We all know that an Observable emits data or events and that an Observer receives them by subscribing to it. There are several types of Observables and Observers, and there are a number of ways to create an Observable. In this article, I explain the different types of Observables and the scenarios where you can use them.

If you are new to RxJava, Introduction To Reactive Programming is a good place to get started.

Types of Observables & Observers

Observables differ from one another in the way they produce data and in the number of emissions each one makes. We have to choose the best-suited Observable for the use case. Let’s say we are expecting an emission from an Observable, but the source can end up with nothing to emit (RxJava 2 does not allow null items); in that case the wrong Observable type has been chosen.

Overall, RxJava 2 provides the following Observables.

  • Observable
  • Single
  • Maybe
  • Flowable
  • Completable

Corresponding to each Observable, we have the following Observers.

  • Observer
  • SingleObserver
  • MaybeObserver
  • Subscriber (used by Flowable)
  • CompletableObserver

All Observables vary from one another in the number of emissions they make. The table below lists each Observable, its Observer and the number of emissions.

Observable  | Observer            | # of emissions
------------|---------------------|-----------------
Observable  | Observer            | Multiple or none
Single      | SingleObserver      | One
Maybe       | MaybeObserver       | One or none
Flowable    | Subscriber          | Multiple or none
Completable | CompletableObserver | None
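
The emission counts in the table can be checked with a small sketch that runs on a plain JVM, without Android. It assumes RxJava 2 (the io.reactivex packages) is on the classpath. The class EmissionCounts and its method names are made up for this illustration, and the blocking calls are used only to keep the demo short.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;

// Hypothetical helper class; only the io.reactivex types are real RxJava 2 APIs.
final class EmissionCounts {

    // Observable: zero or more items. count() collapses them into a Single<Long>.
    static long observableCount() {
        return Observable.just("a", "b", "c").count().blockingGet();
    }

    // Single: exactly one item (or an error).
    static String singleValue() {
        return Single.just("only").blockingGet();
    }

    // Maybe: one item or none. blockingGet() returns null when the Maybe completes empty.
    static String maybeValue(boolean present) {
        Maybe<String> source = present ? Maybe.just("found") : Maybe.<String>empty();
        return source.blockingGet();
    }

    // Flowable: zero or more items, like Observable, but with backpressure support.
    static long flowableCount() {
        return Flowable.range(1, 100).count().blockingGet();
    }

    // Completable: no items at all. blockingGet() returns null on success, or the error.
    static boolean completableSucceeded() {
        return Completable.complete().blockingGet() == null;
    }

    public static void main(String[] args) {
        System.out.println("Observable items: " + observableCount());
        System.out.println("Single item: " + singleValue());
        System.out.println("Maybe with item: " + maybeValue(true));
        System.out.println("Maybe without item: " + maybeValue(false));
        System.out.println("Flowable items: " + flowableCount());
        System.out.println("Completable succeeded: " + completableSucceeded());
    }
}
```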

Let’s move on to each Observable and try it with an example. Note that the examples here are mostly a syntactic illustration of how an Observable and its Observer work together. We will look at real use cases later in this article series.

1. Observable & Observer

Observable is probably the most used type of all. An Observable can emit zero, one or many items.

In the example below, we have an Observable that emits Note items one by one. We could also emit the whole list of Notes at once, but if you want to apply an operator to each note (in other words, transform the emitted data), emitting the Notes individually is the better option.

// emitting Notes one at a time
Observable<Note>

// emitting the whole list of notes at once; in this case a Single is usually the better option
Observable<List<Note>>
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import java.util.ArrayList;
import java.util.List;

import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public class ObserverActivity extends AppCompatActivity {

    private static final String TAG = ObserverActivity.class.getSimpleName();
    private Disposable disposable;

    /**
     * Simple Observable emitting multiple Notes
     * -
     * Observable : Observer
     */

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_observer);

        Observable<Note> notesObservable = getNotesObservable();

        Observer<Note> notesObserver = getNotesObserver();

        // do the work on a background thread, deliver the results on the main thread
        notesObservable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(notesObserver);
    }

    private Observer<Note> getNotesObserver() {
        return new Observer<Note>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                disposable = d;
            }

            @Override
            public void onNext(Note note) {
                Log.d(TAG, "onNext: " + note.getNote());
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };
    }

    private Observable<Note> getNotesObservable() {
        final List<Note> notes = prepareNotes();

        return Observable.create(new ObservableOnSubscribe<Note>() {
            @Override
            public void subscribe(ObservableEmitter<Note> emitter) throws Exception {
                for (Note note : notes) {
                    if (!emitter.isDisposed()) {
                        emitter.onNext(note);
                    }
                }

                // all notes are emitted
                if (!emitter.isDisposed()) {
                    emitter.onComplete();
                }
            }
        });
    }

    private List<Note> prepareNotes() {
        List<Note> notes = new ArrayList<>();
        notes.add(new Note(1, "Buy tooth paste!"));
        notes.add(new Note(2, "Call brother!"));
        notes.add(new Note(3, "Watch Narcos tonight!"));
        notes.add(new Note(4, "Pay power bill!"));
        return notes;
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}
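public class Note {
    int id;
    String note;

    public Note(int id, String note) {
        this.id = id;
        this.note = note;
    }

    public String getNote() { return note; }
    // other getters and setters
}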
Output
onSubscribe
onNext: Buy tooth paste!
onNext: Call brother!
onNext: Watch Narcos tonight!
onNext: Pay power bill!
onComplete
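
To illustrate why emitting notes one by one makes per-item operators easy, here is a small sketch that runs on a plain JVM. It assumes RxJava 2 on the classpath and uses plain Strings instead of the Note class to stay self-contained. The class NoteTitles and its methods are hypothetical names.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; the operators used are standard RxJava 2 operators.
final class NoteTitles {

    // Each note is a separate emission, so map() is applied item by item.
    static List<String> upperCased(List<String> notes) {
        return Observable.fromIterable(notes)
                .map(note -> note.toUpperCase())
                .toList()        // collects the items back into a Single<List<String>>
                .blockingGet();  // blocking is for the demo only
    }

    // The whole list is one emission, so map() sees the list, not a single note.
    static int listSize(List<String> notes) {
        return Observable.just(notes)
                .map(list -> list.size())
                .blockingFirst();
    }

    public static void main(String[] args) {
        List<String> notes = Arrays.asList("Buy tooth paste!", "Call brother!");
        System.out.println(upperCased(notes));
        System.out.println(listSize(notes));
    }
}
```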

2. Single & SingleObserver

Single always emits exactly one value or signals an error. The same job can be done with an Observable that makes a single emission, but Single guarantees that there is either one item or an error. A typical use case for Single is a network call, as the response arrives all at once.

The example below always emits a single Note. Another example could be fetching a Note from the database by its ID. In that case we need to make sure that the Note is present in the database, as a Single must always emit a value.

  • Notice that SingleObserver has no onNext() method; instead, the data is received in the onSuccess(Note note) method.
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Single;
import io.reactivex.SingleEmitter;
import io.reactivex.SingleObserver;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public class SingleObserverActivity extends AppCompatActivity {

    private static final String TAG = SingleObserverActivity.class.getSimpleName();
    private Disposable disposable;

    /**
     * Single Observable emitting single Note
     * Single Observable is more useful in making network calls
     * where you expect a single response object to be emitted
     * -
     * Single : SingleObserver
     */

    // TODO - link to Retrofit  tutorial
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_single_observer);

        Single<Note> noteObservable = getNoteObservable();

        SingleObserver<Note> singleObserver = getSingleObserver();

        // do the work on a background thread, deliver the result on the main thread
        noteObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(singleObserver);

    }

    private SingleObserver<Note> getSingleObserver() {
        return new SingleObserver<Note>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                disposable = d;
            }

            @Override
            public void onSuccess(Note note) {
                Log.d(TAG, "onSuccess: " + note.getNote());
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
        };
    }

    private Single<Note> getNoteObservable() {
        return Single.create(new SingleOnSubscribe<Note>() {
            @Override
            public void subscribe(SingleEmitter<Note> emitter) throws Exception {
                Note note = new Note(1, "Buy milk!");
                emitter.onSuccess(note);
            }
        });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}
Output
onSubscribe
onSuccess: Buy milk!
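
The roles of subscribeOn() and observeOn() in such a call can be seen in a sketch that runs on a plain JVM: subscribeOn() picks the thread on which the source does its work, and observeOn() picks the thread on which the result is delivered. It assumes RxJava 2 on the classpath. Schedulers.single() stands in for AndroidSchedulers.mainThread(), which is only available on Android, and SingleThreads is a made-up class name.

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

// Hypothetical demo class showing which thread runs which part of the chain.
final class SingleThreads {

    // Returns {name of the thread that did the work, name of the thread that got the result}.
    static String[] workAndResultThreads() {
        return Single.fromCallable(() -> Thread.currentThread().getName()) // stand-in for a network call
                .subscribeOn(Schedulers.io())      // the callable runs on an I/O thread
                .observeOn(Schedulers.single())    // everything below runs on another thread
                .map(worker -> new String[] {worker, Thread.currentThread().getName()})
                .blockingGet();                    // blocking is for the demo only
    }

    public static void main(String[] args) {
        String[] threads = workAndResultThreads();
        System.out.println("work done on:    " + threads[0]);
        System.out.println("result seen on:  " + threads[1]);
        System.out.println("caller thread:   " + Thread.currentThread().getName());
    }
}
```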

3. Maybe & MaybeObserver

A Maybe may or may not emit a value. Use it when the item you are expecting is optional.

The example below always emits a value; the benefit of Maybe becomes clearer when it is used in real apps.

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;

import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Maybe;
import io.reactivex.MaybeEmitter;
import io.reactivex.MaybeObserver;
import io.reactivex.MaybeOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public class MaybeObserverActivity extends AppCompatActivity {

    private static final String TAG = MaybeObserverActivity.class.getSimpleName();
    private Disposable disposable;

    /**
     * Consider an example of getting a note from the db by ID
     * There is a possibility of not finding the note by ID in the db
     * In this situation, Maybe can be used
     * -
     * Maybe : MaybeObserver
     */

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_maybe_observer);

        Maybe<Note> noteObservable = getNoteObservable();

        MaybeObserver<Note> noteObserver = getNoteObserver();

        noteObservable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(noteObserver);
    }

    private MaybeObserver<Note> getNoteObserver() {
        return new MaybeObserver<Note>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }

            @Override
            public void onSuccess(Note note) {
                Log.d(TAG, "onSuccess: " + note.getNote());
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };
    }

    /**
     * Emits optional data (0 or 1 emission)
     * But for now it emits 1 Note always
     */
    private Maybe<Note> getNoteObservable() {
        return Maybe.create(new MaybeOnSubscribe<Note>() {
            @Override
            public void subscribe(MaybeEmitter<Note> emitter) throws Exception {
                Note note = new Note(1, "Call brother!");
                if (!emitter.isDisposed()) {
                    emitter.onSuccess(note);
                }
            }
        });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}
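
The lookup-by-ID case mentioned in the code comment above could be sketched as follows. This is a minimal plain-JVM sketch assuming RxJava 2 on the classpath. The in-memory map stands in for the database, and NoteLookup, findNote and describe are hypothetical names. It relies on Maybe.fromCallable() treating a null result as "no item" and completing without a value.

```java
import io.reactivex.Maybe;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; the map is a stand-in for a real database.
final class NoteLookup {
    private static final Map<Integer, String> NOTES = new HashMap<>();

    static {
        NOTES.put(1, "Call brother!");
    }

    // Emits the note if it exists, otherwise completes empty (no null item, no error).
    static Maybe<String> findNote(int id) {
        return Maybe.fromCallable(() -> NOTES.get(id));
    }

    // Reports which callback fired for the given id.
    static String describe(int id) {
        StringBuilder log = new StringBuilder();
        findNote(id).subscribe(
                note -> log.append("onSuccess: ").append(note),
                error -> log.append("onError: ").append(error.getMessage()),
                () -> log.append("onComplete"));
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(1));   // note exists
        System.out.println(describe(42));  // note is missing
    }
}
```

When the note is missing, only onComplete is called; onSuccess and onComplete never both fire for the same Maybe.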

4. Completable & CompletableObserver

A Completable does not emit any data; instead it reports the status of a task, either success or failure. Use it when you want to perform a task and do not expect any value back. A use case would be updating some data on the server with a PUT request.

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;

import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.observers.model.Note;
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.CompletableObserver;
import io.reactivex.CompletableOnSubscribe;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public class CompletableObserverActivity extends AppCompatActivity {

    private static final String TAG = CompletableObserverActivity.class.getSimpleName();
    private Disposable disposable;

    /**
     * Completable won't emit any item, instead it returns
     * Success or failure state
     * Consider an example of making a PUT request to server to update
     * something where you are not expecting any response but the
     * success status
     * -
     * Completable : CompletableObserver
     */
    // TODO - link to Retrofit  tutorial
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_completable_observer);

        Note note = new Note(1, "Home work!");

        Completable completableObservable = updateNote(note);

        CompletableObserver completableObserver = completableObserver();

        completableObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(completableObserver);
    }


    /**
     * Assume this makes a PUT request to the server to update the Note
     */
    private Completable updateNote(Note note) {
        return Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter emitter) throws Exception {
                if (!emitter.isDisposed()) {
                    Thread.sleep(1000);
                    emitter.onComplete();
                }
            }
        });
    }

    private CompletableObserver completableObserver() {
        return new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                disposable = d;
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: Note updated successfully!");
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
        };
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}
Output
onSubscribe
onComplete: Note updated successfully!
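
Both outcomes, success and failure, can be tried out with a plain-JVM sketch. It assumes RxJava 2 on the classpath. The in-memory map is only a stand-in for the server, the validation rule is invented for the demo, and NoteUpdater is a hypothetical class name.

```java
import io.reactivex.Completable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; the map plays the role of the server.
final class NoteUpdater {
    private static final Map<Integer, String> SERVER = new HashMap<>();

    // Stand-in for a PUT request: completes without a value, or fails with an exception.
    static Completable updateNote(int id, String text) {
        return Completable.fromAction(() -> {
            if (text == null || text.isEmpty()) {
                throw new IllegalArgumentException("Note text must not be empty");
            }
            SERVER.put(id, text);
        });
    }

    // Reports which callback fired.
    static String describe(int id, String text) {
        StringBuilder log = new StringBuilder();
        updateNote(id, text).subscribe(
                () -> log.append("onComplete"),
                error -> log.append("onError: ").append(error.getMessage()));
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(1, "Home work!"));
        System.out.println(describe(1, ""));
    }
}
```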

5. Flowable & Subscriber

Flowable should be used when a source generates far more events or data than the Observer can handle. As per the docs, Flowable is a good fit when the source generates 10k+ events and the subscriber can’t consume them all.

In the example below, the Flowable emits the numbers from 1 to 100, and the reduce operator adds them all up and emits the final value. Because reduce() with a seed value returns a Single, the example subscribes with a SingleObserver.

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import info.androidhive.rxandroidexamples.R;
import io.reactivex.Flowable;
import io.reactivex.SingleObserver;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.schedulers.Schedulers;

public class FlowableObserverActivity extends AppCompatActivity {

    private static final String TAG = FlowableObserverActivity.class.getSimpleName();
    private Disposable disposable;

    /**
     * Simple example of Flowable just to show the syntax
     * The use of Flowable is best explained together with backpressure
     * Read the link below to learn the best use cases for Flowable
     * https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-2.0#when-to-use-flowable
     * -
     * Flowable : SingleObserver
     */

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_flowable_observer);

        Flowable<Integer> flowableObservable = getFlowableObservable();

        SingleObserver<Integer> observer = getFlowableObserver();

        flowableObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .reduce(0, new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer result, Integer number) {
                        //Log.e(TAG, "Result: " + result + ", new number: " + number);
                        return result + number;
                    }
                })
                .subscribe(observer);
    }

    private SingleObserver<Integer> getFlowableObserver() {
        return new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                disposable = d;
            }

            @Override
            public void onSuccess(Integer integer) {
                Log.d(TAG, "onSuccess: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
        };
    }

    private Flowable<Integer> getFlowableObservable() {
        return Flowable.range(1, 100);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}
Output
onSubscribe
onSuccess: 5050
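
When no operator such as reduce() is applied, a Flowable is consumed by a Reactive Streams Subscriber, which has to request items explicitly. The sketch below shows that handshake on a plain JVM. It assumes RxJava 2 and its reactive-streams dependency on the classpath, and OneByOneSubscriber is a made-up class name.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical demo class: a Subscriber that asks for one item at a time.
final class OneByOneSubscriber {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        Flowable.range(1, 3).subscribe(new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                log.add("onSubscribe");
                s.request(1); // nothing is emitted until items are requested
            }

            @Override
            public void onNext(Integer number) {
                log.add("onNext: " + number);
                subscription.request(1); // ready for one more item
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```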

Backpressure

When Flowable is used, overflowing emissions have to be handled with a Backpressure strategy. Otherwise an exception such as MissingBackpressureException or OutOfMemoryError is thrown. Backpressure is a very important topic and needs a separate article to explain it clearly.
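
Until that article exists, here is a small, hedged sketch of two of the strategies RxJava 2 offers through the BackpressureStrategy enum. The source pushes 1000 numbers without caring about demand, while the subscriber requests only 10. With DROP the surplus is silently discarded, with ERROR the stream fails with MissingBackpressureException. It assumes RxJava 2 on the classpath and uses RxJava’s own TestSubscriber to control the demand. BackpressureDemo and its method names are hypothetical.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.exceptions.MissingBackpressureException;
import io.reactivex.subscribers.TestSubscriber;

// Hypothetical demo class comparing two backpressure strategies.
final class BackpressureDemo {

    // A source that pushes 1000 numbers without checking how many were requested.
    static Flowable<Integer> eagerSource(BackpressureStrategy strategy) {
        return Flowable.create(emitter -> {
            for (int i = 1; i <= 1000; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, strategy);
    }

    // DROP: the subscriber requests 10 items; everything beyond that is discarded.
    static int deliveredWithDrop() {
        TestSubscriber<Integer> subscriber = eagerSource(BackpressureStrategy.DROP).test(10);
        return subscriber.valueCount();
    }

    // ERROR: the first item that exceeds the demand ends the stream with an error.
    static boolean errorStrategyFails() {
        TestSubscriber<Integer> subscriber = eagerSource(BackpressureStrategy.ERROR).test(10);
        return subscriber.errorCount() == 1
                && subscriber.errors().get(0) instanceof MissingBackpressureException;
    }

    public static void main(String[] args) {
        System.out.println("Delivered with DROP: " + deliveredWithDrop());
        System.out.println("ERROR strategy failed: " + errorStrategyFails());
    }
}
```

Which strategy fits depends on the data: dropping is fine for something like frequent sensor readings, while buffering or failing may be preferable when every item matters.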

  • Sai Babu

    Thank you Ravi, understood diff types of observables and observers.

  • Jeff

    Thanks Ravi, i have clear understanding about the type of observer and observable and how to implement in which case and condition how to use, but could you help me give some example how and when to use the backpressure? from the description you said it’s really important when combining with Flowable

    • Flowable is used when there is a huge amount of data to be consumed and the Observer can’t keep up with it. An example is device sensor data on which you perform some operations. In this case you can use Flowable with a Backpressure strategy. I have yet to prepare a solid example on Flowable.

      I am working on Android Architecture components, I’ll continue this series once I am done with it.

      • Jeff

        Ok thanks a lot for reply, I hope that you can continue the support for flowable observer methods so that i can i have clear used when this observer to used and how to handle the large data which you have mention….

  • Akash

    Hi Ravi,

    Thanks for the great post!
    In the first two examples here, you are observing on background thread (Schedulers.io) and subscribing on main thread. Is there any specific reason for doing this ?

    • It shouldn’t be. It’s probably my mistake.

  • Ganesh

    Thanks Ravi for this extremely good and useful post. Now I clearly understood that How to select require Observable and Observer? and How to use it? before use blindly 🙂

    Please share the links of your tutorials like RxBus, Event handling using RxJava2.

  • pradeepkumar reddy

    can you please share sample code to create observable for sqlite query ?

    • Consider using this library.
      https://github.com/square/sqlbrite

      • pradeepkumar reddy

        did you create any tutorial for using sqlbrite library ?

        As of now, i’m using the following code. My usecase is, user types a search string and for that search string i will make a LIKE pattern query from sqlite db. I’m using Observable.fromCallable() for the sqlite db query.

        Can you please let me know if there are any issues in the following code.

        disposable.add(RxTextView.textChangeEvents(binding.etLocalInstSearch)
                .skipInitialValue()
                .debounce(500, TimeUnit.MILLISECONDS)
                .filter(new Predicate<TextViewTextChangeEvent>() {
                    @Override
                    public boolean test(TextViewTextChangeEvent textViewTextChangeEvent) {
                        return textViewTextChangeEvent.text().toString().length() > 2;
                    }
                })
                .flatMap(new Function<TextViewTextChangeEvent, ObservableSource<List>>() {
                    @Override
                    public ObservableSource<List> apply(final TextViewTextChangeEvent tvTextChangeEvent) {
                        return Observable.fromCallable(new Callable<List>() {
                            @Override
                            public List call() {
                                // make the LIKE query for the search string and get the response
                                return getLocalDBInstSearchResult(tvTextChangeEvent.text().toString());
                            }
                        });
                    }
                })
                .distinctUntilChanged()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(getInstSearchResultObserver()));

  • Jaswant Singh Sodha

    Great Tutorial, now I have understood different types of Observable and Observer and their usage.

  • Shams Babadjanov