The Rx way of doing things

2017, Aug 28    

We have been trying to include more and more reactive approach in apps that we build at moldedbits. Now that I am little more comfortable with reactive way of doing things I decided to write a blog on reactive vs imperative way of solving a problem. I will try to explain how reactive programming makes developer’s life easy by doing more in less code.

Problem Statement

Consider following problem statement

  1. You have to make an API call which returns a map of ObjectId to Object
  2. You need to convert this map to a list of objects which contain object-id
  3. Now you need to filter some objects from this list
  4. For each of these filtered objects you need to do another API call.
  5. Above API calls should be done sequentially and each subsequent API call should be done after a delay of 100 ms.

This is a real life scenario. I had written implementation for this in one of our Android app. We use Retrofit for networking, RxJava-2 for reactive programming and Retrolambda for Java8 like syntax and boilerplate reduction.

Now think about how you will implement this in imperative world.

The Imperative Way

Let me help you. In an imperative world

  1. You will probably do an API call with help of Retrofit(if you are using retrofit) or in a AsyncTask.
  2. When this API returns you will iterate the map and convert it in a list on main thread (off-course you can do it in a different thread but I want to keep things simple).
  3. After which you would do a second iteration for filtering out desired objects.
  4. Now things get interesting as you need to perform API calls for each of these remaining objects in list. You probably will need to write a Service or AsyncTask which now performs synchronous API calls after a sleep of 100ms.

Now that is lot of work and I am not even considering edge cases and error handling for sake of simplicity. Think of the case when API returns a large list of objects and you can’t afford to process that on main thread.

Well, Rx does all this heavy lifting for you. Lets see how.

The Rx Way

Reactive programming is totally a different approach towards writing code. Everything in reactive world is considered a stream. These streams are immutable and there are plethora of operators available which operate on these streams. Real power of reactive programming lies in these tried and tested operators.

Rx makes task of switching between the thread as easy as having an apple pie. Our above said problem statement can be accomplished by mere 10 lines of code with RxJava. Here is a sample from production code

APIProvider.getInstance().getService().getMyObjects(userName)
  .subscribeOn(Schedules.io())
  .map(MyObject::toList)
  .flatMap(rx.Observable::from)
  .filter(myObject->myObject.getName().equals(Constants.SOME_NAME))
  // adding a 10ms delay in subsequent api calls
  .flatMap(myObject -> Observable.timer(100, TimeUnit.MILLISECONDS).map(y -> myObject))
  .doOnNext(rule -> {
      // deleting my object one by one
      APIProvider.getInstance()
        .getService().deleteMyObject(userName, rule.getId())
        .subscribe();
  }).subscribe();

lets understand this code line by line

APIProvider.getInstance().getService().getMyObjects(userName)

Its Retrofit way of doing an API call. This API returns a map of MyObjectId to MyObject. An example JSON looks like this

{
  "1": {
    "name": "object1"
  },
  "2": {
    "name": "object2"
  }
}
.subscribeOn(Schedules.io())

subscribeOn is an Rx operator which directs RxJava to start processing on scheduler thread (a background thread). Just this line lets you switch your processing to a background thread. How cool is that!

.map(MyObject::toList)

Map is transformation operator which allows us to convert things. Here we are converting Map to List. toList is a static method written in MyObject class. This method converts a Map<ObjectId, MyObject> to List<MyObject>. List version has object id’s in object itself. MyObject::toList is Java 8 way of calling this method, Its named as Method Reference. Transformed data structure now looks like this

[
  {
    "id": "1",
    "name": "object1"
  },
  {
    "id": "2",
    "name": "object2"
  }
]
.flatMap(rx.Observable::from)

This statement allows us to receive each MyObject as separate event on observable stream. Before this we were having whole List or Map as single event on observable stream. To perform filtering and API calls on each of these separately we need to have them as independent events on stream.

.filter(myObject->myObject.getName().equals(Constants.SOME_NAME))

This is pretty obvious. filter operator allows us to filter out events which do not match filtering criteria. Any object whose name does not match with Constants.SOME_NAME is not allowed to propagate further in stream.

.flatMap(myObject -> Observable.timer(100, TimeUnit.MILLISECONDS).map(y -> myObject))

This interesting piece of code is allowing us to add a delay of 100 milliseconds between subsequent results. Inside flatmap we are creating another stream using Timer observable. As per documentation Timer returns an Observable that emits a single number zero after a delay period you specify. Since, we do not want to loose our MyObjects we need to map this zero back to MyObject. Outer flatmap is making sure that event are received on current stream.

doOnNext(myObject -> {
    // deleting my object one by one
    APIProvider.getInstance()
      .getService().deleteMyObject(userName, myObject.getId())
      .subscribe();
})

Finally, since we have nice stream of MyObjects including a delay of 100ms we can perform our API calls. That is what is happening inside doOnNext operator. doOnNext operator allows us add a hook on each event in a stream.

.subscribe()

Fire! this is what this operator does. This the simplest version of Subscribe operator that we have used here. You can read more about it in docs.

If you have noticed, all this work is happening in a background thread without us having to worry about anything. In my case I had no need to process API results in Android main thread but in case if you do you can just use ObserveOn operator to switch threads any time.

So what do you think about it? Let me know in comments.

Happy Coding !!

Note: This article was Originally published at The Rx way of doing things