RxJavaMath is separate package dedicated to perform mathematical operations on Observables. The operations such as Average, Max, Min, Sum can be applied to any Observable sequence and emit the final calculated value. All the match operators emits single value instead of series of values.

If you are new to RxJava operators, Operators Introduction is a good place to get started.

To get started, add the rxjava-math dependency in your build.gradle

// RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.1.9'

// RxJava math
implementation 'io.reactivex:rxjava-math:1.0.0'

1. Max

Max() operator finds the maximum valued item in the Observable sequence and emits that value.

The below example emits the max value of an integer series.

Integer[] numbers = {5, 101, 404, 22, 3, 1024, 65};

Observable<Integer> observable = Observable.from(numbers);

MathObservable
     .max(observable)
     .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "Max value: " + integer);
            }
        });
Output
Max value: 1024

The same operation can be done on other primitive datatype such as Float, Double, Long. Below examples emits the max value of float numbers.

Observable<Float> floatObservable = Observable.just(10.5f, 14.5f, 11.5f, 5.6f);
        MathObservable.max(floatObservable)
                .subscribe(new Subscriber<Float>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Float aFloat) {
                        Log.d(TAG, "Max of 10.5f, 11.5f, 14.5f: " + aFloat);
                    }
                });
Output
 Max of 10.5f, 11.5f, 14.5f: 14.5

2. Min

Min() operator emits the minimum valued item in the Observable data set.

Integer[] numbers = {5, 101, 404, 22, 3, 1024, 65};

Observable<Integer> observable = Observable.from(numbers);

MathObservable
      .min(observable)
      .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "Min value: " + integer);
            }
        });
Output
Min value: 3

3. Sum

Calculates the sum of all the items emitted by an Observable and emits only the Sum value. In the below example, sumInteger() is used to calculate the sum of Integers. Likewise, we have sumFloat(), sumDouble() and sumLong() available to calculate sum of other primitive datatypes.

Integer[] numbers = {5, 101, 404, 22, 3, 1024, 65};

Observable<Integer> observable = Observable.from(numbers);

MathObservable
         .sumInteger(observable)
         .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "Min value: " + integer);
                    }
                });
Output
Sum: 1624

4. Average

Calculates the average of all the items emitted by an Observable and emits only the Average value.

The below example calculates the average value of integers using averageInteger() method. To calculate average of other datatypes, averageFloat(), averageDouble() and averageLong() are available.

MathObservable
          .averageInteger(observable)
          .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "Average: " + integer);
                    }
                });
Output
Average: 232

5. Count

Counts number of items emitted by an Observable and emits only the count value.

Below, we have an Observable that emits both Male and Female users. We can count number of Male users using count() operator as shown.

  • filter() filters the items by gender by applying user.getGender().equalsIgnoreCase(“male”) on each emitted item.
getUsersObservable()
                .filter(new Predicate<User>() {
                    @Override
                    public boolean test(User user) throws Exception {
                        return user.getGender().equalsIgnoreCase("male");
                    }
                })
                .count()
                .subscribeWith(new SingleObserver<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        
                    }

                    @Override
                    public void onSuccess(Long count) {
                        Log.d(TAG, "Male users count: " + count);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }
                });

private Observable<User> getUsersObservable() {
        String[] maleUsers = new String[]{"Mark", "John", "Trump", "Obama"};
        String[] femaleUsers = new String[]{"Lucy", "Scarlett", "April"};

        final List<User> users = new ArrayList<>();

        for (String name : maleUsers) {
            User user = new User();
            user.setName(name);
            user.setGender("male");

            users.add(user);
        }

        for (String name : femaleUsers) {
            User user = new User();
            user.setName(name);
            user.setGender("female");

            users.add(user);
        }
        return Observable
                .create(new ObservableOnSubscribe<User>() {
                    @Override
                    public void subscribe(ObservableEmitter<User> emitter) throws Exception {
                        for (User user : users) {
                            if (!emitter.isDisposed()) {
                                emitter.onNext(user);
                            }
                        }

                        if (!emitter.isDisposed()) {
                            emitter.onComplete();
                        }
                    }
                }).subscribeOn(Schedulers.io());
    }
Output
Male users count: 4

6. Reduce

Reduce applies a function on each item and emits the final result. First, it applies a function to first item, takes the result and feeds back to same function on second item. This process continuous until the last emission. Once all the items are over, it emits the final result.

Below we have an Observable that emits numbers from 1 to 10. The reduce() operator calculates the sum of all the numbers and emits the final result.

        Observable
                .range(1, 10)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer number, Integer sum) throws Exception {
                        return sum + number;
                    }
                })
                .subscribe(new MaybeObserver<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        disposable = d;
                    }

                    @Override
                    public void onSuccess(Integer integer) {
                        Log.e(TAG, "Sum of numbers from 1 - 10 is: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "onComplete");
                    }
                });
Output
Sum of numbers from 1 - 10 is: 55

7. Mathematical Operation on Custom DataTypes

Not only on primitive datatypes, we can also perform mathematical operators on custom datatypes too. We are going to use Java8 stream API to perform the comparator operations. So, if you are using Android Studio, make sure you enable Java8 support to use the APIs. This causes the android project to target higher API devices i.e minSdkVersion to 24.

Let’s consider an example of finding the elderly person in a list. For this we create a datatype of Person with name and age attributes. Using the Comparator.comparing(), we can easily creates an Observable that emits the max aged person in the list.

List<Person> persons = new ArrayList<>();
persons.addAll(getPersons());

Observable<Person> personObservable = Observable.from(persons);

MathObservable.from(personObservable)
    .max(Comparator.comparing(Person::getAge))
    .subscribe(new Observer<Person>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Person person) {
            Log.d(TAG, "Person with max age: " + person.getName() + ", " + person.getAge() + " yrs");
        }
});


private List<Person> getPersons() {
    List<Person> persons = new ArrayList<>();

    Person p1 = new Person("Lucy", 24);
    persons.add(p1);

    Person p2 = new Person("John", 45);
    persons.add(p2);

    Person p3 = new Person("Obama", 51);
    persons.add(p3);

    return persons;
}
Output:
Person with max age: Obama, 51 yrs

I hope this article cleared explained the usage of RxJava mathematical operators. If you have any suggestions or questions, please do comment below.

If you want to explore more operators, have a look at the lists here.

Author

  • Jace Herondale

    7th one is gold

  • bismeet marwaha

    So many different imports for the Observable class .Ugh.

    • Yup I remember spending a day to figure out the problem with import statements.

  • bismeet marwaha

    Reduce isnt working. It says the overriden method apply doesnot throw the exception.

    • Check your import statements.

    • xxxQDAxxx

      use rx.Observable

  • Hasan El-Hefnawy

    In 1. Max, the code doesn’t work. I noticed you used .from(numbers) instead of .fromArray(numbers) and onCompleted() instead of onComplete() and didn’t add onSubscribe(Subscription s).
    The code worked when I added compile ‘io.reactivex:rxjava:1.1.5’. It seems there is a compatability issue between rxjava2 and rxjava-math.