In this article we are going to learn few useful operators that are commonly used in Rx programming. These operators basically filters the data emitted by an Observable. This tutorial explains use of Filter, Skip, SkipLast, Take, TakeLast and Distinct operators with detailed explanation.

If you are new to RxJava operators, Operators Introduction is a good place to get started.

1. Filter()

filter() allows the Observable to emit the only values those passes a test.

The below Observable emits the even numbers by applying a boolean condition on each number. The numbers which passes the test will be emitted and the remaining will be ignored.

  • The filter() method takes a Predicate test and apply the test on each item that is in the list. Below, the condition integer % 2 == 0 is applied on each number and the numbers which returns true, they will be emitted.
       Observable
                .just(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer % 2 == 0;
                    }
                })
                .subscribe(new DisposableObserver<Integer>() {
                    @Override
                    public void onNext(Integer integer) {
                        Log.e(TAG, "Even: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
Output
Even: 2
Even: 4
Even: 6
Even: 8

Let’s consider an example by considering a custom datatype. Below Observable emits list of Users and we want to filter out the users by gender female.

  • getUsersObservable() creates an Observable that emits list of users combining both male and female users.
  • In the filter() method, each user is checked against female gender by user.getGender().equalsIgnoreCase(“female”) condition.
Observable<User> userObservable = getUsersObservable();

userObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .filter(new Predicate<User>() {
                    @Override
                    public boolean test(User user) throws Exception {
                        return user.getGender().equalsIgnoreCase("female");
                    }
                })
                .subscribeWith(new DisposableObserver<User>() {
                    @Override
                    public void onNext(User user) {
                        Log.e(TAG, user.getName() + ", " + user.getGender());
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

private Observable<User> getUsersObservable() {
        String[] maleUsers = new String[]{"Mark", "John", "Trump", "Obama"};
        String[] femaleUsers = new String[]{"Lucy", "Scarlett", "April"};

        final List<User> users = new ArrayList<>();

        for (String name : maleUsers) {
            User user = new User();
            user.setName(name);
            user.setGender("male");

            users.add(user);
        }

        for (String name : femaleUsers) {
            User user = new User();
            user.setName(name);
            user.setGender("female");

            users.add(user);
        }
        return Observable
                .create(new ObservableOnSubscribe<User>() {
                    @Override
                    public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                        for (User user : users) {
                            if (!emitter.isDisposed()) {
                                emitter.onNext(user);
                            }
                        }

                        if (!emitter.isDisposed()) {
                            emitter.onComplete();
                        }
                    }
                }).subscribeOn(Schedulers.io());
    }

public class User {
    String name;
    String gender;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }
}

If you run the example, you can see the female users printed in LogCat.

Output
Lucy, female
Scarlett, female
April, female

2. Skip()

Skip(n) operator skips the emission of first N items emitted by an Observable. Let’s say you have an Observable that emits integers from 1-10 and if skip(4) is operator is used, it skips 1-4 and emits the numbers 5, 6, 7, 8, 9, 10.

        Observable
                .range(1, 10)
                .skip(4)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "Subscribed");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "Completed");
                    }
                });
Output
Subscribed
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onNext: 10
Completed

3. SkipLast()

skipLast(n) skips the last N emissions from an Observable. In the same example, skipLast(4) skips the emission of 7-10 and emits only 1, 2, 3, 4, 5, 6

        Observable
                .range(1, 10)
                .skipLast(4)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "Subscribed");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "Completed");
                    }
                });
Output
Subscribed
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
Completed

4. Take()

take(n) acts exactly opposite to skip. It takes first N emissions of an Observable.

In the below example, take(4) takes first 4 emissions i.e 1, 2, 3, 4 and skips the remaining.

         Observable
                .range(1, 10)
                .take(4)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "Subscribed");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "Completed");
                    }
                });
Output
Subscribed
onNext: 1
onNext: 2
onNext: 3
onNext: 4
Completed

5. TakeLast()

takeLast(n) emits last N items from an Observable.

In the same example, takeLast(4) takes last 4 emissions i.e 7, 8, 9, 10 and skips the remaining.

         Observable
                .range(1, 10)
                .takeLast(4)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "Subscribed");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "Completed");
                    }
                });
Output
Subscribed
onNext: 7
onNext: 8
onNext: 9
onNext: 10
Completed

6. Distinct()

Distinct operator filters out items emitted by an Observable by avoiding duplicate items in the list.

Below, we have list of integers with duplicates. Using distinct(), emission of duplicates can be avoided.

Observable<Integer> numbersObservable = Observable.just(10,10, 15, 20, 100, 200, 100, 300, 20, 100);

        numbersObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .distinct()
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
Output
onNext: 10
onNext: 15
onNext: 20
onNext: 100
onNext: 200
onNext: 300

The distinct operator works very well with primitive datatypes. But if you want to use it with a custom datatype, you need to override the equals() and hashCode() methods.

The below example avoids duplicate notes emitted by an Observable.

  • prepareNotes() returns list of notes including duplicate notes.
  • getNotesObservable() creates an Observable that emits list of notes
  • In Note model, equals() and hashCode() methods are overridden to make comparison between objects work.
Observable<Note> notesObservable = getNotesObservable();

        DisposableObserver<Note> notesObserver = getNotesObserver();

        notesObservable.observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .distinct()
                .subscribeWith(notesObserver);

private DisposableObserver<Note> getNotesObserver() {
        return new DisposableObserver<Note>() {

            @Override
            public void onNext(Note note) {
                Log.e(TAG, "onNext: " + note.getNote());
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        };
    }

    private Observable<Note> getNotesObservable() {
        final List<Note> notes = prepareNotes();

        return Observable.create(new ObservableOnSubscribe<Note>() {
            @Override
            public void subscribe(ObservableEmitter<Note> emitter) throws Exception {
                for (Note note : notes) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(note);
                    }
                }

                if (!emitter.isDisposed()) {
                    emitter.onComplete();
                }
            }
        });
    }

    // Preparing notes including duplicates
    private List<Note> prepareNotes() {
        List<Note> notes = new ArrayList<>();
        notes.add(new Note(1, "Buy tooth paste!"));
        notes.add(new Note(2, "Call brother!"));
        notes.add(new Note(3, "Call brother!"));
        notes.add(new Note(4, "Pay power bill!"));
        notes.add(new Note(5, "Watch Narcos tonight!"));
        notes.add(new Note(6, "Buy tooth paste!"));
        notes.add(new Note(7, "Pay power bill!"));

        return notes;
    }

public class Note {
    int id;
    String note;

    public Note(int id, String note) {
        this.id = id;
        this.note = note;
    }

    public int getId() {
        return id;
    }

    public String getNote() {
        return note;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }

        if (!(obj instanceof Note)) {
            return false;
        }

        return note.equalsIgnoreCase(((Note) obj).getNote());
    }

    @Override
    public int hashCode() {
        int hash = 3;
        hash = 53 * hash + (this.note != null ? this.note.hashCode() : 0);
        return hash;
    }
}
Output
onNext: Buy tooth paste!
onNext: Call brother!
onNext: Pay power bill!
onNext: Watch Narcos tonight!
onComplete

If you want to explore more operators, have look at the list here.

Author

  • Mustufa Ansari

    Hello sIr you tutorials are changing my android developing career! Thank you so much! I just want to ask which android studio theme are you using?