Coroutines & RxJava, an asynchronicity comparison: Operators

Comparing Coroutines & RxJava operators - Part 5

In this blog series we compare Kotlin Coroutines and RxJava operators since they are both trying to solve a common problem in Android development: Asynchronous Programming.

We talked about comparing Kotlin Coroutines and RxJava in part 1 and the Coroutines and RxJava interop library in part 4 of this series. We also talked about an asynchronicity comparison for MVI in part 7.

Now, it’s time to compare how we can transform those streams with operators.

Common operators

Some RxJava operators are available in the Kotlin standard library as part of Kotlin Collections. Here you can see a table comparing simple operators:

Simple operators in RxJava and Coroutines

These operators transform the stream in the same way, even though some of them differ in name. For example, `skip` in RxJava is called `drop` in Coroutines.
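As a rough illustration of that naming overlap, the sketch below applies `drop`, `filter`, and `map` to a plain `List` from the Kotlin standard library. The `transform` function and its input are made up for this example; the comments note the RxJava operator each call corresponds to.

```kotlin
// Hypothetical helper for illustration only: skips the first two values,
// keeps the even ones, and squares them.
fun transform(values: List<Int>): List<Int> =
    values
        .drop(2)                 // RxJava: skip(2)
        .filter { it % 2 == 0 }  // RxJava: filter { ... }
        .map { it * it }         // RxJava: map { ... }

fun main() {
    println(transform(listOf(1, 2, 3, 4, 5, 6))) // prints [16, 36]
}
```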

Create your own operator

Some operators are not part of Kotlin Collections; however, you can create them yourself with little effort. We can replicate the functionality of RxJava's `range` operator with a few lines of code and a simple `for` loop.

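    // `publish` comes from the kotlinx-coroutines-reactive module.
    fun range(
        context: CoroutineContext,
        start: Int,
        count: Int
    ) = publish(context) {
        // Emit `count` consecutive integers, beginning at `start`.
        for (x in start until start + count) send(x)
    }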
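If you would rather try the same loop without the reactive streams dependency, here is a minimal sketch that swaps `publish` for the `produce` channel builder from kotlinx.coroutines. The names `rangeChannel` and `collectRange` are hypothetical and exist only for this example; it is one possible variant, not the version used in the rest of this post.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical variant of range: sends `count` consecutive integers,
// beginning at `start`, and closes the channel when the loop ends.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.rangeChannel(start: Int, count: Int): ReceiveChannel<Int> =
    produce {
        for (x in start until start + count) send(x)
    }

// Hypothetical helper: drains the channel into a list so the result is easy to inspect.
fun collectRange(start: Int, count: Int): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    for (x in rangeChannel(start, count)) received.add(x)
    received
}

fun main() {
    println(collectRange(5, 3)) // prints [5, 6, 7]
}
```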
  

Some other operators require more work. The example below implements RxJava's `Completable.zip` operator, which takes two blocks of code and waits for both of them to finish.

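    // Runs both blocks concurrently and suspends until both have finished.
    // Recent kotlinx.coroutines releases define `async` as an extension on
    // CoroutineScope, hence the `coroutineScope` wrapper.
    suspend fun zip(
        context: CoroutineContext,
        block: () -> Unit,
        block2: () -> Unit
    ) = coroutineScope {
        val deferred1 = async(context) { block() }
        val deferred2 = async(context) { block2() }
        deferred1.await()
        deferred2.await()
    }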
  

As you may have noticed, both functions take a `CoroutineContext` as a parameter. We do that so we can cancel the operator easily by calling `.cancel()` on that context's job.
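To make the cancellation idea concrete, here is a minimal sketch, assuming a recent kotlinx.coroutines release. The `cancelViaContextJob` helper is hypothetical: it starts work with a context that carries its own `Job`, cancels that `Job`, and reports what the caller of `await()` observes.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper for illustration only.
fun cancelViaContextJob(): String = runBlocking {
    val job = Job()
    val context = Dispatchers.Default + job

    // The coroutine becomes a child of `job` because `job` is part of its context.
    val work = async(context) {
        delay(10_000) // stands in for real work; delay is cancellable
        "finished"
    }

    job.cancel() // cancels every coroutine started with this context

    try {
        work.await()
    } catch (e: CancellationException) {
        "cancelled"
    }
}

fun main() {
    println(cancelViaContextJob())
}
```

Note that cancellation is cooperative: a block that never reaches a suspension point, or never checks whether it is still active, keeps running even after its job has been cancelled.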

Complex operators

What about even more complex RxJava operators like `debounce`?

Complex operators in RxJava & Coroutines

For example, you can find `debounce` as an extension function on `ReceiveChannel`, and RxJava's `timeout` has an equivalent in Kotlin Coroutines in `withTimeoutOrNull`.
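As a small sketch of the `timeout` equivalent, the example below wraps a suspending call in `withTimeoutOrNull`. The `slowLookup` and `lookupWithTimeout` functions are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical slow operation that takes about one second.
suspend fun slowLookup(): String {
    delay(1_000)
    return "value"
}

// Returns the lookup result, or null if it does not finish within the timeout.
fun lookupWithTimeout(timeoutMillis: Long): String? = runBlocking {
    withTimeoutOrNull(timeoutMillis) { slowLookup() }
}

fun main() {
    println(lookupWithTimeout(100))   // the lookup is too slow, so this is null
    println(lookupWithTimeout(5_000)) // the lookup finishes in time
}
```

One difference worth keeping in mind: RxJava's `timeout` signals a `TimeoutException` through `onError`, whereas `withTimeoutOrNull` returns `null`. If you prefer an exception, `withTimeout` throws a `TimeoutCancellationException` instead.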

Similarities and differences between Coroutines & RxJava operators

We see that most operators are available in both libraries and, if not, you can easily build them. The only difference I can see in these two libraries is at which point you apply those operators.

Whereas in RxJava you can apply operators before subscribing to the stream, you have to do it after opening a subscription in Coroutines. Let’s see how we map an element in RxJava:

    rxObservable
        .map { result -> map(result) }
        .subscribe { consumeResult(it) }
  

And now how we do it in Coroutines:

    broadcastChannel
        .openSubscription()
        .map { result -> map(result) }
  

In Coroutines, we have to do it after opening a subscription because `map` is an extension function on `ReceiveChannel<E>`. The same applies to other operators such as `filter` and `drop`. Calling `openSubscription()` returns a `SubscriptionReceiveChannel<T>` object, which extends `ReceiveChannel<T>`.

In my opinion, Coroutines require extra work if you want multiple observers to apply the same operators, because each observer has to apply them to its own subscription. Of course you can do it! But it requires more code.

What’s coming next?

Threading is the topic for the sixth part of this series.

If you want to know how to switch threads in Coroutines the same way you do in RxJava with `subscribeOn()` and `observeOn()`, don't miss Part 6 of this blog series.


Manuel Vicente Vivo, Android Software Engineer at Capital One
