Coroutines and RxJava — An Asynchronicity Comparison Part 1

Asynchronous Programming

Introduction

In this blog series, I will compare Kotlin Coroutines and RxJava, since both try to solve a common problem in Android development: asynchronous programming.

Use Case: Async Heavy Objects Creation for a Fast App Launch!

Launch-time performance matters! If you want your app to launch really fast, how you handle object creation is important.

Some computations can take a while (e.g. initializing heavy objects). You don’t want them to run on the UI thread! If they do, your app will skip frames and the user will have a laggy experience. What can cause this? SDK initialization, creating crypto objects, etc.

For our first use case, we just want to perform a heavy operation on a background thread.

RxJava

Even though RxJava is a Reactive Extensions library for the JVM, you can also use it to run asynchronous tasks in the background.

In this scenario, we set aside RxJava’s ability to transfer streams of elements; we just want something to be initialized.

Which RxJava type do we need to achieve this? A Completable!

As per RxJava documentation: a Completable represents a deferred computation without any value but only indication for completion or exception.

How Can We Create a Completable that Initializes the Objects We Want?

    fun initializeObjectsAsync(): Completable {
        return Completable.create { emitter ->
            try {
                heavyInitialization()
                // Signal success unless the subscriber has already disposed
                if (!emitter.isDisposed) {
                    emitter.onComplete()
                }
            } catch (e: Exception) {
                // Forward the failure unless the subscriber has already disposed
                if (!emitter.isDisposed) {
                    emitter.onError(e)
                }
            }
        }
    }

As you can see, we’ve created a function that returns a Completable object. Inside the function, we create our custom Completable with `Completable.create`, which hands us an emitter (the object we use to signal the outcome to whoever subscribes).

After performing the heavy initialization, we use the emitter to signal that it succeeded. If there was an error, we signal the error that occurred instead. The emitter is of type CompletableEmitter, and `onComplete` and `onError` are the methods it offers to communicate the result to the subscriber.

Another way you can do this is with `Completable.fromCallable()`, which completes when the callable returns and signals an error if it throws:

    fun initializeObjectsAsync(): Completable {
        return Completable.fromCallable {
            heavyInitialization()
        }
    }

How Can We Consume That Function?

Observables created this way, and by extension Completables, are cold in RxJava. That means the code inside `Completable.create` is executed only when we subscribe. Something to keep in mind is that it will be executed again every time you subscribe to it.
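
The cold behavior can be checked with a small counter. This is a minimal sketch that assumes RxJava 2 (the `io.reactivex` package); `runCount` and `coldInitialization` are hypothetical names used only for illustration, and the heavy work is replaced by a counter increment.

```kotlin
import io.reactivex.Completable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter: how many times the body of Completable.create has run.
val runCount = AtomicInteger(0)

// Hypothetical stand-in for initializeObjectsAsync(); the "heavy work" is just a counter bump.
fun coldInitialization(): Completable =
    Completable.create { emitter ->
        runCount.incrementAndGet()
        if (!emitter.isDisposed) emitter.onComplete()
    }

fun main() {
    val completable = coldInitialization()
    println(runCount.get()) // prints 0: creating the Completable runs nothing

    completable.subscribe() // first subscription runs the body
    completable.subscribe() // second subscription runs it again
    println(runCount.get()) // prints 2
}
```

No scheduler is set here, so each `subscribe()` call runs the body synchronously on the calling thread, which is what makes the count predictable.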

We have to subscribe to the Completable we created in the `initializeObjectsAsync` function we created above.

    fun initializeObjects() {
        initializeObjectsAsync()
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                // The initialization succeeded!
                // We can perform UI changes here
            }, {
                // An Error occurred!
            })
    }

How do we tell RxJava we want the initialization to be executed on a background thread? We use the `subscribeOn` operator, which tells RxJava to run the code inside `Completable.create` on a thread provided by the given scheduler.

Since we want to make some UI changes when the initialization has finished, we can use the `observeOn` operator to tell RxJava we want to listen for the result on the Android main thread.
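
To see the effect of `subscribeOn` outside an Android app, the following sketch records which thread runs the work. It assumes RxJava 2; `threadUsedForWork` is a hypothetical helper, and `blockingAwait()` is used only so that the demo can wait for the result. `observeOn(AndroidSchedulers.mainThread())` is left out because it needs the Android runtime.

```kotlin
import io.reactivex.Completable
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: returns the name of the thread that ran the Completable's body.
fun threadUsedForWork(): String {
    var workThread = ""
    Completable.fromAction { workThread = Thread.currentThread().name }
        .subscribeOn(Schedulers.computation()) // run the body on a computation thread
        .blockingAwait()                       // demo only: block until it completes
    return workThread
}

fun main() {
    println("caller: ${Thread.currentThread().name}")
    println("work:   ${threadUsedForWork()}")
}
```

The second line should name a thread from RxJava's computation pool rather than the caller's thread.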

After defining the threading, we subscribe to kick off the Completable and get notified when it completes. We do that with `.subscribe(successBlockOfCode, failureBlockOfCode)`. We pass two blocks of code: the first one handles the success scenario, and the second one handles the failure scenario.

If we want this code to be executed when we create an Android Activity, for example, we can call this method in the onCreate method.

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        initializeObjects()
    }

How can we do the same with Coroutines?

Kotlin Coroutines

With Coroutines this is even simpler! Conceptually, a coroutine is similar to a thread in various ways: we can write sequential code that can be run on a particular thread.

I will write another blog post about threading in both RxJava and Coroutines. For now, you need to know that we can define the threading policy of a coroutine inside the CoroutineContext with a CoroutineDispatcher. We can create a coroutine with a specific CoroutineContext with a CoroutineBuilder.

A CoroutineBuilder is a function that creates a Coroutine, runs a block of code, and gives you access to its result in some form. Examples of CoroutineBuilder are: launch, async, runBlocking…
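
The three builders can be compared side by side. This is a sketch that assumes kotlinx.coroutines 1.x, where `launch` and `async` are extensions on `CoroutineScope` (the snippets in this post use an earlier, experimental API); `buildersDemo` is a hypothetical helper.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: uses each builder once and records what happened.
fun buildersDemo(): List<String> = runBlocking {
    // runBlocking: bridges regular code and coroutines by blocking the caller until the block ends
    val log = mutableListOf<String>()

    // launch: starts a coroutine that produces no result and returns a Job
    val job = launch { log.add("launch ran") }

    // async: starts a coroutine that produces a result and returns a Deferred
    val deferred = async { 21 * 2 }

    job.join() // wait for the launched coroutine
    log.add("async returned ${deferred.await()}")
    log
}

fun main() {
    buildersDemo().forEach(::println)
}
```

Everything here runs on the single thread that `runBlocking` occupies, so the plain `MutableList` is safe to share in this demo.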

Assuming we want to call the `heavyInitialization()` method as we did before in the `onCreate` method, we can create a Coroutine with the CoroutineBuilder launch and perform the heavy initialization in the block of code that is being run.

    fun initializeObjects() {
        launch(CommonPool) {
            heavyInitialization()
        }
    }

The CoroutineDispatcher CommonPool is similar to RxJava’s `Schedulers.computation()`. This will run the code in the background and we don’t have to worry about anything else.

Let’s imitate the example we built with RxJava: we want to know when the heavy objects have finished initializing, and we want to handle errors.

    fun initializeObjects() {
        launch(CommonPool) {
            try {
                heavyInitialization()
                // The initialization succeeded!
                withContext(UI) {
                    // We can perform UI changes here
                }
            } catch (e: Exception) {
                // An Error occurred!
            }
        }
    }

Since the code inside a Coroutine (what is called the suspending lambda) is executed sequentially, the line after `heavyInitialization()` will be executed when the initialization succeeds.

As before, we can wrap the call inside a try/catch block for the error handling. It works in the same way.

How can we swap to the UI thread and perform the UI changes? There’s a function that you can call inside a Coroutine called `withContext` that uses another CoroutineContext to run the block of code inside it. Basically, it will swap to the Android UI thread to execute that code.

The UI CoroutineContext that we see in the example is not part of the standard Kotlin Coroutines library. Since it’s Android specific, it’s available in this other library: `org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlin_coroutines_version`.
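
In kotlinx.coroutines 1.0 and later, `CommonPool` and `UI` were replaced by `Dispatchers.Default` and `Dispatchers.Main`, and `launch` has to be called on a `CoroutineScope`. The sketch below shows one possible shape of the same flow under that newer API. The parameters `background`, `ui`, `heavyInitialization` and `onResult` are assumptions made so that the example also runs off Android, where no main-thread dispatcher is installed.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical variant of initializeObjects() for kotlinx.coroutines 1.x.
fun CoroutineScope.initializeObjects(
    background: CoroutineDispatcher = Dispatchers.Default, // plays the role of CommonPool
    ui: CoroutineDispatcher = Dispatchers.Main,            // plays the role of UI
    heavyInitialization: () -> Unit,
    onResult: (Throwable?) -> Unit
): Job = launch(background) {
    // Run the heavy work in the background and remember a failure, if any
    val error = try {
        heavyInitialization()
        null
    } catch (e: Exception) {
        e
    }
    // Swap to the UI dispatcher to report the outcome
    withContext(ui) { onResult(error) }
}

fun main() = runBlocking {
    // Off Android there is no main dispatcher, so this demo passes Dispatchers.Default for both roles.
    initializeObjects(
        ui = Dispatchers.Default,
        heavyInitialization = { Thread.sleep(50) },
        onResult = { error -> println(if (error == null) "initialized" else "failed: $error") }
    ).join()
}
```

In an Android app the scope would typically be tied to the lifecycle of the screen, so that the work stops when the screen goes away.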

Second Use Case: Background Processing With a Fibonacci Example

For our second use case, apart from executing something in the background, we also want to return a value. Now, we’d like to calculate the Fibonacci of a number when the user presses a button.

Imagine we have the following code to calculate the Fibonacci of a number:

    private fun fib(n: Long): Long {
        return if (n <= 1) n
        else fib(n - 1) + fib(n - 2)
    }
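
To get a feel for why this belongs in the background, the sketch below runs the same recursion with a call counter added. The `calls` variable is an addition for illustration only; the number of calls grows roughly as fast as the Fibonacci numbers themselves.

```kotlin
// Hypothetical counter added for illustration: number of fib() invocations.
var calls = 0L

// Same naive recursion as above, plus the counter.
fun fib(n: Long): Long {
    calls++
    return if (n <= 1) n else fib(n - 1) + fib(n - 2)
}

fun main() {
    for (n in listOf(10L, 20L, 30L)) {
        calls = 0
        val result = fib(n)
        println("fib($n) = $result after $calls calls")
    }
    // fib(10) = 55 after 177 calls
    // fib(20) = 6765 after 21891 calls
    // fib(30) = 832040 after 2692537 calls
}
```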
  

How can we calculate this in the background and return the result?

RxJava

We’ve seen most of these concepts in our first use case. We need an RxJava type that can return an object. We can use a Single for that!

    fun fibonacciAsync(number: Long): Single<Long> =
        Single.create<Long> { emitter ->
            val result = fib(number)
            // Deliver the result unless the subscriber has already disposed
            if (!emitter.isDisposed) {
                emitter.onSuccess(result)
            }
        }

The code inside `Single.create` will be executed when we subscribe to it, and we can consume it in the same way we did before. When we receive the user interaction, we just subscribe to the Single that `fibonacciAsync` returns.

You can also use the fromCallable function that we saw in the Completable example:

    fun fibonacciAsync(number: Long): Single<Long> =
        Single.fromCallable {
            // The value of the last expression is what the Single emits
            fib(number)
        }

We pass the number we want as a parameter of that function, and it is then used inside the `Single.create` (or `Single.fromCallable`) block of code. We can get that number from an EditText, for example.

    @OnClick(R.id.my_button)
    fun onButtonClicked() {
        fibonacciAsync(numberInputEditText.text.toString().toLong())
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ fibonacciNumber ->
                // Update UI with the result (text expects a CharSequence, not a Long)
                myTextView.text = fibonacciNumber.toString()
            }, {
                // Error happened
            })
    }

Every time the user clicks on the button, we’re going to calculate a new Fibonacci number. If the user changes the value inside the numberInputEditText, the result will be different!
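
To try the Single outside an Android app, a sketch along these lines may help. It assumes RxJava 2 and replaces the UI callback with `blockingGet()`, which is fine in a console demo but should never be called on the Android main thread.

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

fun fib(n: Long): Long = if (n <= 1) n else fib(n - 1) + fib(n - 2)

fun fibonacciAsync(number: Long): Single<Long> =
    Single.fromCallable { fib(number) }

fun main() {
    val result = fibonacciAsync(20)
        .subscribeOn(Schedulers.computation()) // compute on a background thread
        .blockingGet()                         // demo only: wait for the value
    println(result) // prints 6765
}
```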

Kotlin Coroutines

This example is as easy as the one above. When the user taps on the button, we want to start a Coroutine and calculate the Fibonacci of the number we want. We already know everything we need to do it:

    @OnClick(R.id.my_button)
    fun onButtonClicked() {
        // Read the input on the UI thread, before moving to the background
        val number = numberInputEditText.text.toString().toLong()
        launch(CommonPool) {
            val result = fib(number)
            withContext(UI) {
                fibonacciResultTextView.text = result.toString()
            }
        }
    }
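
For a closer parallel with `fibonacciAsync`, the computation can also be wrapped in a suspending function that returns the value directly. This is a sketch assuming kotlinx.coroutines 1.x, where `Dispatchers.Default` takes the place of `CommonPool`; the name `fibonacci` is hypothetical.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun fib(n: Long): Long = if (n <= 1) n else fib(n - 1) + fib(n - 2)

// Hypothetical suspending counterpart of fibonacciAsync():
// computes on a background dispatcher and resumes the caller with the result.
suspend fun fibonacci(number: Long): Long =
    withContext(Dispatchers.Default) { fib(number) }

fun main() = runBlocking {
    println(fibonacci(10)) // prints 55
}
```

A caller that is already running in a coroutine on the UI dispatcher could assign the returned value to a TextView on the next line, without an explicit callback.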
  

What’s Coming Next?

The second part of this series will be about Cancelling Execution!

How do you cancel an Observable? And a Coroutine? Don’t miss it next week!


Manuel Vicente Vivo, Android Software Engineer at Capital One
