software engineering Apr 26, 2018

Coroutines and RxJava — An Asynchronicity Comparison (Part 4): Interop Library

Introduction

In this blog series I will compare Kotlin Coroutines and RxJava since they are both trying to solve a common problem in Android development: Asynchronous Programming.

In Part 3, we finished talking about publish — a Coroutine with a channel built in that runs the suspending lambda inside the Coroutine every time you open a subscription.

In Part 3 we said that publish is part of the interop library. That’s what this article is about: the Coroutines and RxJava interop library.

Why Would You Need The Interop Library?

The most obvious answer is that you’re migrating from one library to the other. Migrating the whole codebase in one go increases the risk of something going wrong. A better approach is to migrate feature by feature. In that case, you need both libraries in your project and the ability to consume one on top of the other.

The less obvious answer is that you want to consume Coroutines from Java code. Suspending functions are awkward to call from Java because the Kotlin compiler adds an extra Continuation parameter to their signatures. For this reason, you need a bridge that Java can consume. RxJava is one option.
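To make the Java case concrete, here is a minimal sketch of such a bridge. It assumes the stable 1.x package names (`kotlinx.coroutines.rx2`; the experimental releases use `kotlinx.coroutines.experimental.rx2` instead) and relies on the rxSingle builder covered later in this article. The names `loadGreeting` and `loadGreetingRx` are hypothetical and exist only for illustration.

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.delay
import kotlinx.coroutines.rx2.rxSingle

// Hypothetical suspending function. Its compiled signature takes an extra
// Continuation parameter, which makes it awkward to call from Java.
suspend fun loadGreeting(name: String): String {
    delay(100) // stand-in for real asynchronous work
    return "Hello, $name"
}

// Java-friendly wrapper: callers only see an ordinary RxJava Single.
// The coroutine starts when someone subscribes to the Single.
fun loadGreetingRx(name: String): Single<String> =
    rxSingle { loadGreeting(name) }
```

Java code can then call `loadGreetingRx` like any other method that returns a Single and subscribe to the result.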

Getting Started

If you want to use the Interop library, add this dependency to your project.

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:$kotlin_coroutines_version"

This will also pull in Coroutines and RxJava 2 as transitive dependencies, in case you don’t already have them in your project.

Coroutines on Top of RxJava

Let’s go back to the example we used in Part 2: a timer that emits an item every second.

Observable.interval(1, TimeUnit.SECONDS)

This is RxJava code. How can we consume this using Coroutines?

We can use the openSubscription extension function on the Observable. It subscribes to the Observable and returns a `SubscriptionReceiveChannel`, from which you can consume the elements as if they were emitted by a Channel.

Observable.interval(1, TimeUnit.SECONDS)
    .openSubscription().use { channel ->
        for (value in channel) {
            consumeValue(value)
        }
    }

You can also consume just the first element sent by an RxJava source of information with the await family of extension functions. You have `.await()` available on a Single, `.awaitFirst()` available on an Observable, etc.

This is how you can use the `awaitFirstOrDefault(value)` extension function.

val value = Observable.interval(1, TimeUnit.SECONDS)
                .awaitFirstOrDefault(-1)
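The await functions are suspending, so they have to be called from inside a Coroutine. The following is a small sketch of three of them side by side, assuming the stable 1.x package names (`kotlinx.coroutines.rx2`). The function name `awaitExamples` and the sample sources are made up for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.rx2.await
import kotlinx.coroutines.rx2.awaitFirst
import kotlinx.coroutines.rx2.awaitFirstOrDefault

// runBlocking is used here only to get a Coroutine in a plain function.
fun awaitExamples(): List<Any> = runBlocking {
    // Suspends until the Single emits its value.
    val fromSingle = Single.just("done").await()

    // Suspends until the first item arrives, then stops listening.
    val first = Observable.just(1, 2, 3).awaitFirst()

    // The source completes without emitting, so the default is returned.
    val fallback = Observable.empty<Int>().awaitFirstOrDefault(-1)

    listOf(fromSingle, first, fallback)
}
```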

What about the other way around? How can we consume Coroutines using RxJava?

RxJava on Top of Coroutines

There are some extension functions we can use to consume Channels and Coroutines using RxJava.

Consuming Coroutines Using RxJava

You can convert any Job to a Completable with the Job.asCompletable extension function.

val job = launch {
    heavyComputation()
}
job.asCompletable(CommonPool).subscribe({
    // Job completed
})

Let’s imagine we have a Coroutine performing a heavy computation. We can convert that Job to a Completable and subscribe to it as if it had been created with RxJava.

In that example, we’re passing `CommonPool` as a parameter to the extension function: that’s the CoroutineContext from which the resulting completable will be signaled.

You can also use Deferred.asSingle in the same way.

val deferred = async {
    heavyComputation()
}
deferred.asSingle(CommonPool).subscribe({
    // Deferred completed; the result is available as `it`
}, {
    // Error happened
})
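For readers on a stable 1.x release, the two conversions above can be sketched as follows. This is an assumption-laden variant rather than the article's original code: `CommonPool` is replaced by `Dispatchers.Default`, the builders are called on an explicit CoroutineScope, and `heavyComputation` is a hypothetical stand-in that returns 42.

```kotlin
import io.reactivex.Completable
import io.reactivex.Single
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.rx2.asCompletable
import kotlinx.coroutines.rx2.asSingle

// Hypothetical stand-in for the heavy computation.
suspend fun heavyComputation(): Int {
    delay(100)
    return 42
}

// Scope used to start the Coroutines in this sketch.
val scope = CoroutineScope(Dispatchers.Default)

// Deferred -> Single: the Single emits the Deferred's result.
fun computeAsSingle(): Single<Int> =
    scope.async { heavyComputation() }.asSingle(Dispatchers.Default)

// Job -> Completable: the Completable completes when the Job finishes.
fun computeAsCompletable(): Completable =
    scope.launch { heavyComputation() }.asCompletable(Dispatchers.Default)
```

Note that in both functions the Coroutine starts as soon as launch or async is called, not when the returned Single or Completable is subscribed to.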

CoroutineBuilders

There are also CoroutineBuilders that return an RxJava source of information. Like any other CoroutineBuilder, each one creates a new Coroutine and runs the suspending lambda inside it. Here that happens every time you subscribe.

Methods available: rxCompletable, rxMaybe, rxSingle, rxObservable and rxFlowable.

How can we use rxCompletable, for example?

rxCompletable {
    // Suspending lambda
}.subscribe()

Unsubscribing will cancel the Coroutine.
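The cancellation behaviour can be observed with a small experiment. This is only a sketch, assuming the stable 1.x package names. The function name `cancelOnDispose`, the flag and the timings are invented for illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.rx2.rxCompletable
import java.util.concurrent.atomic.AtomicBoolean

// Returns whether the suspending lambda ran to the end.
fun cancelOnDispose(): Boolean {
    val finished = AtomicBoolean(false)

    val disposable = rxCompletable {
        delay(200)          // cancellable suspension point
        finished.set(true)  // only reached if the Coroutine is not cancelled
    }.subscribe()

    // Disposing the subscription cancels the Coroutine behind it.
    disposable.dispose()

    // Wait longer than the delay to show the lambda never resumed.
    Thread.sleep(500)
    return finished.get()
}
```

Because the subscription is disposed before the delay elapses, the flag is expected to stay false.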

Consuming Channels using RxJava

You can also use the ReceiveChannel.asObservable extension function to convert any channel to a hot reactive observable.

What’s Coming Next?

The fifth part in our series will be all about Operators!

Are all those amazing RxJava operators available in Coroutines? Which ones are supported by default? Which ones do I have to implement?

Manuel Vicente Vivo
Android Software Engineer at Capital One
@manuelvicnt

DISCLOSURE STATEMENT: These opinions are those of the author. Unless noted otherwise in this post, Capital One is not affiliated with, nor is it endorsed by, any of the companies mentioned. All trademarks and other intellectual property used or displayed are the ownership of their respective owners. This article is © 2018 Capital One.