Today, we are going to learn a few important mapping operators of RxJava: Map, FlatMap, ConcatMap and SwitchMap. This article summarises how each operator is used, the differences between them, and the use cases that help you choose the operator that best fits your requirement.

If you are new to RxJava operators, Operators Introduction is a good place to get started.

In short, Map, FlatMap, ConcatMap and SwitchMap each apply a function to the items emitted by an Observable.

  • Map transforms each item emitted by a source Observable and emits the transformed item.
  • FlatMap, SwitchMap and ConcatMap also apply a function to each emitted item, but the function returns an Observable rather than a single item, and that inner Observable can emit items of its own.
  • FlatMap and ConcatMap work in much the same way. Both combine the items emitted by the inner Observables into a single Observable.
  • The difference between FlatMap and ConcatMap is the order in which the items are emitted.
  • FlatMap merges the inner Observables, so their items can interleave and the source order is not guaranteed.
  • ConcatMap preserves the order of items. Its main disadvantage is that it subscribes to the next inner Observable only after the current one completes, so the inner work runs one after another rather than concurrently.
  • SwitchMap is a bit different from FlatMap and ConcatMap. SwitchMap unsubscribes from the previous inner Observable whenever the source emits a new item, so it only emits items from the most recent inner Observable.
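
The first two points above (a function that returns an item versus a function that returns a whole sequence) can be tried without any RxJava dependency, because java.util.stream has the same pair of operators. The sketch below is only an analogy: the class and method names are made up for illustration, and unlike RxJava's flatMap, Stream.flatMap always keeps the source order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, not part of the article's project.
final class MapVersusFlatMap {

    // map: one input item becomes exactly one output item.
    static List<String> upperCase(List<String> names) {
        return names.stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // flatMap: the function returns a whole stream per input item,
    // and the operator flattens those inner streams into one output.
    static List<String> letters(List<String> names) {
        return names.stream()
                .flatMap(name -> Stream.of(name.split("")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("ab", "cd");
        System.out.println(upperCase(names)); // [AB, CD]
        System.out.println(letters(names));   // [a, b, c, d]
    }
}
```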

Now, let’s see how each operator works with the help of an example.

1. Map()

The Map operator transforms each item emitted by an Observable and emits the modified item.

Let’s say we have an Observable that makes a network call (assume the network call is made) and emits User objects with a name and gender. Our requirement, however, needs an email address for each user, which is missing from the network response. We can add it to each User object by applying the Map() operator.

  • getUsersObservable(): assume this method makes a network call and fetches user objects. It returns an Observable that emits User objects with a name and gender.
  • map() applies a Function<User, User> to each User object, adds an email address and returns the modified User object.
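import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import java.util.ArrayList;
import java.util.List;

import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.operators.model.User;
import io.reactivex.Observable;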
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

public class MapOperatorActivity extends AppCompatActivity {

    private static final String TAG = MapOperatorActivity.class.getSimpleName();
    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_map_operator);

        getUsersObservable()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<User, User>() {
                    @Override
                    public User apply(User user) throws Exception {
                        // modifying user object by adding email address
                        // turning user name to uppercase
                        user.setEmail(String.format("%s@rxjava.wtf", user.getName()));
                        user.setName(user.getName().toUpperCase());
                        return user;
                    }
                })
                .subscribe(new Observer<User>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "onSubscribe");
                        disposable = d;
                    }

                    @Override
                    public void onNext(User user) {
                        Log.e(TAG, "onNext: " + user.getName() + ", " + user.getGender() + ", " + user.getEmail());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "All users emitted!");
                    }
                });
    }

    /**
     * Assume this method makes a network call and fetches users.
     * Returns an Observable that emits the users one by one.
     * Each User has a name and gender, but the email is missing.
     */
    private Observable<User> getUsersObservable() {
        String[] names = new String[]{"mark", "john", "trump", "obama"};

        final List<User> users = new ArrayList<>();
        for (String name : names) {
            User user = new User();
            user.setName(name);
            user.setGender("male");

            users.add(user);
        }
        return Observable
                .create(new ObservableOnSubscribe<User>() {
                    @Override
                    public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                        for (User user : users) {
                            if (!emitter.isDisposed()) {
                                emitter.onNext(user);
                            }
                        }

                        if (!emitter.isDisposed()) {
                            emitter.onComplete();
                        }
                    }
                }).subscribeOn(Schedulers.io());
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}
public class User {
    String name;
    String email;
    String gender;
    Address address;

    // getters and setters
}

If you run the example, you will notice that an email address has been added to each User and that the name has been converted to uppercase.

Output
onSubscribe
onNext: MARK, male, mark@rxjava.wtf
onNext: JOHN, male, john@rxjava.wtf
onNext: TRUMP, male, trump@rxjava.wtf
onNext: OBAMA, male, obama@rxjava.wtf
All users emitted!

2. FlatMap()

To better understand FlatMap, consider a scenario where one network call fetches Users with a name and gender, and a second network call returns the address of a given user. The requirement is to create an Observable that emits Users with name, gender and address properties. To achieve this, you fetch the users first, then make a separate network call for each user to fetch their address. The FlatMap operator makes this easy.

  • getUsersObservable(): assume it makes a network call and returns an Observable that emits User objects (name and gender).
  • getAddressObservable(): assume it makes another network call just to fetch the user’s address. It also returns an Observable, which emits the same User with the address added to the existing name and gender.
  • flatMap() calls getAddressObservable() each time a User is emitted and gets back an Observable that emits that User including the address field.
  • Finally, flatMap() merges these per-user Observables and returns a single Observable.
  • Thread.sleep(sleepTime) is added to simulate network latency.
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.operators.model.Address;
import info.androidhive.rxandroidexamples.operators.model.User;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

public class FlatMapActivity extends AppCompatActivity {

    private static final String TAG = FlatMapActivity.class.getSimpleName();

    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_flat_map);

        getUsersObservable()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(new Function<User, Observable<User>>() {

                    @Override
                    public Observable<User> apply(User user) throws Exception {

                        // getting each user address by making another network call
                        return getAddressObservable(user);
                    }
                })
                .subscribe(new Observer<User>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "onSubscribe");
                        disposable = d;
                    }

                    @Override
                    public void onNext(User user) {
                        Log.e(TAG, "onNext: " + user.getName() + ", " + user.getGender() + ", " + user.getAddress().getAddress());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "All users emitted!");
                    }
                });
    }

    /**
     * Assume this is a network call
     * returns the User with the address field added
     */
    private Observable<User> getAddressObservable(final User user) {

        final String[] addresses = new String[]{
                "1600 Amphitheatre Parkway, Mountain View, CA 94043",
                "2300 Traverwood Dr. Ann Arbor, MI 48105",
                "500 W 2nd St Suite 2900 Austin, TX 78701",
                "355 Main Street Cambridge, MA 02142"
        };

        return Observable
                .create(new ObservableOnSubscribe<User>() {
                    @Override
                    public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                        Address address = new Address();
                        // pick any of the sample addresses at random
                        address.setAddress(addresses[new Random().nextInt(addresses.length)]);
                        if (!emitter.isDisposed()) {
                            user.setAddress(address);


                            // Generate network latency of random duration
                            int sleepTime = new Random().nextInt(1000) + 500;

                            Thread.sleep(sleepTime);
                            emitter.onNext(user);
                            emitter.onComplete();
                        }
                    }
                }).subscribeOn(Schedulers.io());
    }

    /**
     * Assume this is a network call to fetch users
     * returns Users with name and gender but missing address
     */
    private Observable<User> getUsersObservable() {
        String[] maleUsers = new String[]{"Mark", "John", "Trump", "Obama"};

        final List<User> users = new ArrayList<>();

        for (String name : maleUsers) {
            User user = new User();
            user.setName(name);
            user.setGender("male");

            users.add(user);
        }

        return Observable
                .create(new ObservableOnSubscribe<User>() {
                    @Override
                    public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                        for (User user : users) {
                            if (!emitter.isDisposed()) {
                                emitter.onNext(user);
                            }
                        }

                        if (!emitter.isDisposed()) {
                            emitter.onComplete();
                        }
                    }
                }).subscribeOn(Schedulers.io());
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}

If you run this example, you will see output like the one below. Here, the name and gender come from one Observable and the address comes from another. Also notice that the items do not arrive in the order of the source Observable, and the order can change each time you run the example.

Output
onSubscribe
onNext: John, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
onNext: Obama, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
onNext: Mark, male, 1600 Amphitheatre Parkway, Mountain View, CA 94043
onNext: Trump, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
All users emitted!
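
Why the order changes can be modelled in plain Java, with no RxJava involved: all the simulated address calls start at the same time, and each user is recorded as soon as its own call finishes. The sketch below is an analogy under stated assumptions, not how flatMap is implemented; the class name, the method name and the fixed delays are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper class, not part of the article's project.
final class FlatMapOrderModel {

    // Starts one simulated "address call" per user at the same time and
    // records users in the order in which the calls finish.
    static List<String> completionOrder(Map<String, Long> delayMillisByUser) {
        ExecutorService pool =
                Executors.newFixedThreadPool(Math.max(1, delayMillisByUser.size()));
        List<String> emitted = Collections.synchronizedList(new ArrayList<>());
        try {
            List<CompletableFuture<Void>> calls = new ArrayList<>();
            for (Map.Entry<String, Long> entry : delayMillisByUser.entrySet()) {
                calls.add(CompletableFuture.runAsync(() -> {
                    sleep(entry.getValue());      // simulated network latency
                    emitted.add(entry.getKey());  // stands in for onNext
                }, pool));
            }
            // Wait until every simulated call has finished.
            CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(emitted);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> delays = new LinkedHashMap<>();
        delays.put("Mark", 400L);
        delays.put("John", 100L);
        delays.put("Trump", 300L);
        delays.put("Obama", 200L);
        System.out.println(completionOrder(delays));
    }
}
```

With the delays used in main, the list should normally come out as John, Obama, Trump, Mark: the shortest call wins, regardless of the source order. In the FlatMap example the delays are random, which is why the order differs from run to run.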

3. ConcatMap()

Now consider the same example as FlatMap, but with the operator replaced by ConcatMap. Both operators emit the same items; only the order in which they are emitted differs.

  • ConcatMap() maintains the order of items and waits for the current Observable to complete before subscribing to the next one.
  • ConcatMap is more suitable when you need to maintain the order of execution.
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import info.androidhive.rxandroidexamples.R;
import info.androidhive.rxandroidexamples.operators.model.Address;
import info.androidhive.rxandroidexamples.operators.model.User;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

public class ConcatMapOperatorActivity extends AppCompatActivity {

    private static final String TAG = ConcatMapOperatorActivity.class.getSimpleName();

    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_concat_map);
        getUsersObservable()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .concatMap(new Function<User, Observable<User>>() {

                    @Override
                    public Observable<User> apply(User user) throws Exception {

                        // getting each user address by making another network call
                        return getAddressObservable(user);
                    }
                })
                .subscribe(new Observer<User>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "onSubscribe");
                        disposable = d;
                    }

                    @Override
                    public void onNext(User user) {
                        Log.e(TAG, "onNext: " + user.getName() + ", " + user.getGender() + ", " + user.getAddress().getAddress());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "All users emitted!");
                    }
                });
    }

    /**
     * Assume this is a network call
     * returns the User with the address field added
     */
    private Observable<User> getAddressObservable(final User user) {

        final String[] addresses = new String[]{
                "1600 Amphitheatre Parkway, Mountain View, CA 94043",
                "2300 Traverwood Dr. Ann Arbor, MI 48105",
                "500 W 2nd St Suite 2900 Austin, TX 78701",
                "355 Main Street Cambridge, MA 02142"
        };

        return Observable
                .create(new ObservableOnSubscribe<User>() {
                    @Override
                    public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                        Address address = new Address();
                        // pick any of the sample addresses at random
                        address.setAddress(addresses[new Random().nextInt(addresses.length)]);
                        if (!emitter.isDisposed()) {
                            user.setAddress(address);


                            // Generate network latency of random duration
                            int sleepTime = new Random().nextInt(1000) + 500;

                            Thread.sleep(sleepTime);
                            emitter.onNext(user);
                            emitter.onComplete();
                        }
                    }
                }).subscribeOn(Schedulers.io());
    }

    /**
     * Assume this is a network call to fetch users
     * returns Users with name and gender but missing address
     */
    private Observable<User> getUsersObservable() {
        String[] maleUsers = new String[]{"Mark", "John", "Trump", "Obama"};

        final List<User> users = new ArrayList<>();

        for (String name : maleUsers) {
            User user = new User();
            user.setName(name);
            user.setGender("male");

            users.add(user);
        }

        return Observable
                .create(new ObservableOnSubscribe<User>() {
                    @Override
                    public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                        for (User user : users) {
                            if (!emitter.isDisposed()) {
                                emitter.onNext(user);
                            }
                        }

                        if (!emitter.isDisposed()) {
                            emitter.onComplete();
                        }
                    }
                }).subscribeOn(Schedulers.io());
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}

If you run the example, you will see that the items follow the order of the source Observable, i.e. Mark, John, Trump, Obama, and that this order is the same on every run.

Output
onSubscribe
onNext: Mark, male, 1600 Amphitheatre Parkway, Mountain View, CA 94043
onNext: John, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
onNext: Trump, male, 2300 Traverwood Dr. Ann Arbor, MI 48105
onNext: Obama, male, 1600 Amphitheatre Parkway, Mountain View, CA 94043
All users emitted!
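
The price of that guaranteed order is time. A plain-Java model of the behaviour (again an analogy with made-up names and delays, not RxJava's implementation) runs the simulated address calls strictly one after another, so the total time is roughly the sum of all delays rather than the longest single delay.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of the article's project.
final class ConcatMapOrderModel {

    // Runs the simulated "address calls" one at a time: the next call
    // starts only after the previous one has finished.
    static List<String> sequentialOrder(Map<String, Long> delayMillisByUser) {
        List<String> emitted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : delayMillisByUser.entrySet()) {
            sleep(entry.getValue());      // simulated network latency
            emitted.add(entry.getKey());  // stands in for onNext
        }
        return emitted;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> delays = new LinkedHashMap<>();
        delays.put("Mark", 400L);
        delays.put("John", 100L);
        delays.put("Trump", 300L);
        delays.put("Obama", 200L);

        long start = System.nanoTime();
        List<String> order = sequentialOrder(delays);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

        // The order always matches the source; the elapsed time is
        // about the sum of the delays (roughly one second here).
        System.out.println(order + " after ~" + elapsedMillis + " ms");
    }
}
```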

FlatMap & ConcatMap – Flight Tickets Listing

Flight Ticket Listing is a good example of the FlatMap and ConcatMap operators used with Retrofit network calls.

4. SwitchMap()

SwitchMap(), on the other hand, is quite different from FlatMap and ConcatMap. Each time the source emits an item, SwitchMap switches to the newest inner Observable and emits items only from it.

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;


import java.util.concurrent.TimeUnit;

import info.androidhive.rxandroidexamples.R;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

public class SwitchMapOperatorActivity extends AppCompatActivity {

    private static final String TAG = SwitchMapOperatorActivity.class.getSimpleName();

    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_switch_map_operator);

        Observable<Integer> integerObservable =
                Observable.fromArray(new Integer[]{1, 2, 3, 4, 5, 6});

        // only 6 is emitted: each new item disposes the previous inner
        // Observable before its one-second delay has elapsed
        integerObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .switchMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        return Observable.just(integer).delay(1, TimeUnit.SECONDS);
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                        disposable = d;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "All users emitted!");
                    }
                });
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposable.dispose();
    }
}
Output
onSubscribe
onNext: 6
All users emitted!
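
Only 6 appears because every new item cancels the delayed emission that is still pending for the previous item. The plain-Java sketch below models that "cancel the previous, keep the latest" behaviour with a ScheduledExecutorService. It is an analogy rather than RxJava's implementation, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the article's project.
final class SwitchMapModel {

    // For every source item, cancels the still-pending delayed emission of
    // the previous item and schedules a new one, so only the latest survives.
    static List<Integer> latestOnly(List<Integer> source, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Integer> emitted = Collections.synchronizedList(new ArrayList<>());
        ScheduledFuture<?> pending = null;
        try {
            for (Integer item : source) {
                if (pending != null) {
                    pending.cancel(false); // stands in for disposing the previous inner Observable
                }
                pending = scheduler.schedule(() -> {
                    emitted.add(item);     // stands in for onNext
                }, delayMillis, TimeUnit.MILLISECONDS);
            }
            if (pending != null) {
                pending.get();             // wait for the last delayed emission
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        System.out.println(latestOnly(Arrays.asList(1, 2, 3, 4, 5, 6), 1000L));
    }
}
```

The model relies on the items arriving much faster than the delay, exactly as in the SwitchMap example, where six integers are emitted back to back and each inner Observable waits one second.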

SwitchMap – Android Instant Search

Explaining SwitchMap with a simple Integer emission doesn’t say much about the operator. I suggest you read the Android Instant Search example to understand a real use case of SwitchMap.

Choosing between Map operators

  • Consider the Map operator when an offline (local) operation needs to be applied to the emitted data. As explained in the article, we received something from the server that doesn’t fulfil our requirement. In that case, Map can be used to alter the emitted data.
  • Choose FlatMap when order is not important. Let’s say you are building an airline ticket fare app that fetches the prices of each airline separately and displays them on the screen. Both FlatMap and ConcatMap can be used for this. But if order is not important and you want to send all the network calls simultaneously, I would choose FlatMap over ConcatMap. With ConcatMap, fetching the prices takes much longer, because ConcatMap makes the calls one after another in order to maintain item order.
  • SwitchMap is best suited when you want to discard earlier responses and keep only the latest one. Let’s say you are writing an instant search app that sends a search query to the server each time the user types something. Multiple requests with different queries will be sent to the server, but we only want to show the result of the most recently typed query. For this case, SwitchMap is the best operator to use.
  • Another use case for SwitchMap is a feed screen in which the feed is refreshed each time the user performs pull-to-refresh. Here SwitchMap is well suited, as it ignores older feed responses and considers only the latest request.

I hope this article explained the Map operators well. If you want to explore more RxJava operators, check here.

  • Ersultan Utemisov

    very nice tutorial, thanks Ravi

  • Raviraj Desai

    Thanks Ravi

  • karthick Ramanathan

    hi Ravi, one doubt .In Flat map to add users address why thread with sleep time is used, i could’t understand

    • We are assuming the address is fetched by making network call. So just to simulate the network delay, we are using a Timer.

      • karthick Ramanathan

        Thanks

  • Ahmed Abdeen

    Very nice tutorial. Thank you so much Ravi!

  • Raul Yang

    very good, i understood FlatMap better than before, thank you.

  • pradeepkumar reddy

    did you create any tutorial for using sqlbrite library ?

    As of now, i’m using the following code. My usecase is, user types a search string and for that search string i will make a LIKE pattern query from sqlite db. I’m using Observable.fromCallable() for the sqlite db query.

    Can you please let me know if there are any issues in the following code.

    disposable.add(RxTextView.textChangeEvents(binding.etLocalInstSearch)
            .skipInitialValue()
            .debounce(500, TimeUnit.MILLISECONDS)
            .filter(new Predicate<TextViewTextChangeEvent>() {
                @Override
                public boolean test(TextViewTextChangeEvent textViewTextChangeEvent) {
                    return textViewTextChangeEvent.text().toString().length() > 2;
                }
            })
            .flatMap(new Function<TextViewTextChangeEvent, ObservableSource<List>>() {
                @Override
                public ObservableSource<List> apply(final TextViewTextChangeEvent tvTextChangeEvent) {
                    return Observable.fromCallable(new Callable<List>() {
                        @Override
                        public List call() {
                            // make the LIKE query for the search string and get the response
                            return getLocalDBInstSearchResult(tvTextChangeEvent.text().toString());
                        }
                    });
                }
            })
            .distinctUntilChanged()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(getInstSearchResultObserver()));

  • Fesi Nagetive

    Ravi you are simply awesome

  • loki luck

    Great toturial! You save my life 🙂

  • Nupur

    Very nice tutorial and well explained

  • Pavel Shakhtarin

    The best tutorial for beginners so far in my opinion, thx man :3

  • Hasan El-Hefnawy

    In 1. Map(), public void onNext(User user),
    It should be user.getEmail() instead of user.getAddress().getAddress().

  • Anh Tuan Tran

    In 2.FlatMap
    May I know where is Address class?

    • Theri Roki

      same question for me too

  • Suyog Dorlikar

    It is very useful tutorial for Rx-Java
    Thank you

  • Suyog Dorlikar

    It is best tutorials for beginners

  • Suyog Dorlikar

    Please can you make tutorials on dagger 2
    It is very helpful to study on dagger