Introduction
Coroutines are one of Kotlin’s killer features! They do a great job simplifying concurrency-intensive programs, making them feel more accessible and intuitive to write.
However, coroutines are also easy to misuse. One common mistake is accidentally making blocking calls inside coroutines (we will see what that means in a second), which makes the whole application slower and less responsive. It might not even be you who introduced the blocking call in the first place, but a library function marked as suspend that you are using deep inside your program! And when this happens, chances are that it will take more than a few minutes to troubleshoot and figure out why.
This blog post is here to hopefully save you from wasting your time as I once did! 😅 We will see how to quickly detect illegitimate blocking calls with BlockHound. But before that, we will cover some theory around how coroutines work, what causes them to be blocked, and a few strategies to handle blocking calls within a coroutines-based application. If you are already a coroutines expert and are not interested in the theory part, feel free to skip to the last part of this post, where we demo the usage of BlockHound.
The symptoms: what causes coroutines to become slow?
When you start feeling that your coroutines-based application is becoming slow and unresponsive, in most cases (bets on the table!), the cause is one of these two:
- The CPU of your machine is the bottleneck! This is probably the first thing to check. The coroutines machinery is a complex beast and needs CPU to run! Orchestrating and scheduling the many coroutines you have in your application is, in fact, a CPU-intensive task. If all the cores of your machine are nearing 100% when your program is running, then the whole machine will become slow, not only your coroutines! The way out in this case is to optimize your program to use less CPU, distribute the work more evenly across the available cores, or simply upgrade your machine.
- You are inadvertently blocking your coroutines! This mistake is unfortunately easy to make but often hard to find and recover from. This is what we will focus on in the rest of this post.
How is a coroutine blocked?
Let’s take a step back and try to form a high-level understanding of how coroutines work behind the scenes and what blocking them means.
    launch {
        println("Hey I'm a coroutine!!")
        delay(5000) // suspends
        println("I'm done!")
    }
Coroutines are often referred to as “lightweight threads”. When a coroutine is created, like in the example above, it runs concurrently alongside other coroutines. However, the analogy with threads stops pretty much here.
Coroutines are run by one or a small number of threads (typically the number of cores in your machine). Those threads are called workers in Kotlin’s jargon. One worker (thread) juggles several coroutines. It starts working on a coroutine, then stops its execution in the middle (suspension) and picks up a completely unrelated coroutine from where that one had previously stopped (continuation). This suspension and continuation all happen behind the scenes with the help of some magic from the Kotlin compiler!
A worker (thread) is only allowed to jump from one coroutine to another when it meets a suspension point inside the coroutine it is currently running. A suspension point is declared via the suspendCoroutine function or the like. In practice, however, you will rarely encounter those functions directly. Instead, you will typically use higher-level functions marked with the suspend keyword that use these lower-level constructs behind the scenes.
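To make the idea of a suspension point more concrete, here is a minimal sketch of how a callback-based API could be wrapped into a suspend function with suspendCoroutine. The fetchAsync function and its timer thread are hypothetical stand-ins for a real asynchronous library; only suspendCoroutine, resume and runBlocking come from Kotlin itself.

```kotlin
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-based API: a daemon timer thread invokes the callback later.
private val timer = Executors.newSingleThreadScheduledExecutor { runnable ->
    Thread(runnable, "timer").apply { isDaemon = true }
}

fun fetchAsync(value: String, onDone: (String) -> Unit) {
    timer.schedule(Runnable { onDone(value.uppercase()) }, 100L, TimeUnit.MILLISECONDS)
}

// The suspension point: the coroutine is parked here, and the worker thread is
// free to run other coroutines until the callback calls resume().
suspend fun fetch(value: String): String = suspendCoroutine { continuation ->
    fetchAsync(value) { result -> continuation.resume(result) }
}

fun main(): Unit = runBlocking {
    println(fetch("hello"))
}
```

In real code you would more often reach for suspendCancellableCoroutine from kotlinx.coroutines, which additionally supports cancellation, but the principle is the same.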
Problems start when your coroutine takes a long time executing code without suspending. Consider the following example:
    suspend fun httpRequest(url: String): String {
        ...
    }

    fun saveResponse(response: String, filename: String) {
        ...
    }

    launch {
        val response = httpRequest("https://example.com/data.json") // Takes about 5s!
        saveResponse(response, filename = "/tmp/response.txt") // Takes about 2s!
    }
In the code above, the coroutine created via the launch builder starts by performing an HTTP request to fetch some remote data. This call takes around 5s. As it is suspending (i.e., the function is marked with the suspend keyword, and we assume the author’s implementation is correctly suspending), our worker is not blocked during that time! It will jump out of this coroutine’s execution to progress on other coroutines while the HTTP call is in progress. Thus, the worker’s time here is not being wasted. So far, so good!
Now, 5s have elapsed, the HTTP call is done, and our worker thread comes back to resume from where it left off. The next step is to write the response into a file. This step takes around 2s. However, the call is not suspending! (Note the absence of the suspend keyword in the saveResponse function’s signature.) Consequently, the worker thread has no other choice than to sit idle waiting for the call to finish. It cannot jump out to make progress on any other coroutine. In other words, our worker thread is blocked!
In a multithreaded runtime like Kotlin’s, the default coroutine dispatcher usually has multiple worker threads, so the issue is not as bad as if it were to happen in a single-threaded runtime like JavaScript’s or Python’s. But still, if you block all your worker threads this way, no progress is made on any of the other coroutines, and that’s when you start noticing slowness or unresponsiveness in your app!
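The difference is easy to observe with a small experiment. The sketch below is only an illustration: it runs two coroutines on the single thread that runBlocking provides (so there is exactly one worker to block) and measures how long they take when they pause with delay versus Thread.sleep. The runTwo helper is a name made up for this example.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Runs two coroutines on the single thread owned by runBlocking and
// returns the total elapsed time in milliseconds.
fun runTwo(pause: suspend () -> Unit): Long = measureTimeMillis {
    runBlocking {
        repeat(2) { launch { pause() } }
    }
}

fun main() {
    // Suspending pause: the worker switches to the second coroutine while the first waits.
    val suspending = runTwo { delay(300) }
    // Blocking pause: the worker is stuck, so the coroutines run one after the other.
    val blocking = runTwo { Thread.sleep(300) }
    println("suspending: $suspending ms, blocking: $blocking ms")
}
```

With the suspending pause, both waits overlap and the total stays close to a single pause. With the blocking one, the pauses add up, because the only worker cannot leave the first coroutine until Thread.sleep returns.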
What should I do if I need to block inside my coroutines?
In the previous section, we have seen that blocking calls inside coroutines should be avoided. However, in reality, this is not something we can do all the time! Sometimes you might need to use a library function that synchronously performs blocking tasks, and you can’t find an asynchronous alternative (with futures or callbacks). There are, unfortunately, tons of examples in the Java ecosystem. A few that I encountered myself include the Kubernetes Java client and the Kafka consumer client.
So how can we perform those blocking calls within our coroutines-based application while keeping it performant and responsive?
The general idea is to run them in a separate worker pool or on dedicated threads to keep your main worker pool [1], where most of your coroutines are running, free from blocking code. There are multiple ways to achieve this in practice.
Using Dispatchers.IO
The first solution is the simplest and the most widely used.
The idea is to isolate your blocking code by running it within Dispatchers.IO, a global shared pool of worker threads whose primary purpose is to run blocking tasks. This worker pool is limited to 64 threads by default (or the number of cores, if that is larger), making it less sensitive to blocking tasks. You would need to block all 64 threads simultaneously for it to start causing slowness or unresponsiveness in your app!
This dispatcher can be used as follows:
    launch {
        val response = httpClient.request()
        // Switch to the IO pool for the blocking file write
        withContext(Dispatchers.IO) {
            saveResponse(response, filename = "/tmp/response.txt")
        }
    }
It is easy to feel safe using this dispatcher widely across your application whenever you have a blocking call to make. However, you need to be careful: this pool has a limited number of workers. And as it is global, shared across your whole application, and might also be used by your library functions without you knowing, it’s much easier to exhaust than you might think! It happened to me more than once that I ran into performance issues in my coroutines and then realized that I was saturating the Dispatchers.IO threads!
That’s why I’m usually reluctant to use this dispatcher unless I’m writing a small application or the predictability of performance is not critical.
The good news is: these issues can largely be mitigated by using Dispatchers.IO views, a feature available since version 1.6.0 of kotlinx-coroutines-core, making this dispatcher much more useful and safer to use!
Creating views from the Dispatchers.IO
Since version 1.6.0 of kotlinx-coroutines-core, it is possible to create “views” from coroutine dispatchers. This feature, especially when used on Dispatchers.IO, is a very effective way to isolate blocking calls.
Let’s check an example of the usage before explaining the underlying concepts:
    val dispatcher = Dispatchers.IO.limitedParallelism(100)

    launch {
        val response = httpClient.request()
        withContext(dispatcher) {
            saveResponse(response, filename = "/tmp/response.txt")
        }
    }
The limitedParallelism function creates a “view” of the current dispatcher that limits the parallelism to the given value (100 in the example above). The resulting view uses the original dispatcher for execution but guarantees that no more than the specified number of coroutines are executed simultaneously.
Generally, a view cannot effectively run more coroutines in parallel than its original dispatcher allows. However, Dispatchers.IO has a unique property of elasticity: views created from it are not bound by the thread limit of Dispatchers.IO itself, so you can create as many views as you want, each with the desired level of parallelism. The Dispatchers.IO documentation describes this important behavior in more detail. That’s the reason why, in the example above, we were able to create a view with a parallelism level of 100 even though Dispatchers.IO is capped at 64 threads! That was not a mistake 😉.
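If you want to convince yourself that a view really caps the number of coroutines running at once, a rough experiment like the one below can help. It assumes kotlinx-coroutines-core 1.6.0 or later; the peakConcurrency helper is a name made up for this sketch, and Thread.sleep stands in for a real blocking call.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Launches `tasks` blocking jobs on a view of Dispatchers.IO and returns the
// highest number of jobs that were observed running at the same time.
fun peakConcurrency(limit: Int, tasks: Int): Int = runBlocking {
    val view = Dispatchers.IO.limitedParallelism(limit)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    coroutineScope {
        repeat(tasks) {
            launch(view) {
                val now = running.incrementAndGet()
                peak.updateAndGet { previous -> maxOf(previous, now) }
                Thread.sleep(50) // stand-in for a blocking call
                running.decrementAndGet()
            }
        }
    }
    // coroutineScope only returns once all the jobs above have completed
    peak.get()
}

fun main() {
    println("peak with a limit of 3: ${peakConcurrency(limit = 3, tasks = 12)}")
}
```

The reported peak should never exceed the limit passed to limitedParallelism, no matter how many tasks are launched on the view.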
This is my default approach for handling blocking calls in most situations unless I am using an older version of Coroutines.
Using a dedicated thread pool as a Coroutine Dispatcher
You can also create a dedicated thread pool to offload your blocking calls. This approach is a good alternative if you don’t want to rely on the shared Dispatchers.IO, or if you are on a version of Coroutines that predates dispatcher views.
    // newFixedThreadPoolContext takes a thread count and a name ("blocking-pool" is arbitrary)
    val dispatcher = newFixedThreadPoolContext(10, "blocking-pool")

    // or
    // val dispatcher = Executors.newCachedThreadPool().asCoroutineDispatcher()

    launch(dispatcher) {
        ...
    }

    // It becomes your responsibility to close the resulting dispatcher!
    dispatcher.close()
The resulting dispatcher becomes your responsibility to manage. In particular, you need to remember to close it when you are done with it.
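One way to avoid forgetting the close() call, sketched here under the assumption that the pool is only needed for a bounded piece of work, is to rely on Kotlin’s use function: executor-based dispatchers implement Closeable, so the pool is shut down when the block exits. The function name and pool size are arbitrary choices for this example.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

// Runs a blocking task on a dedicated pool and returns the name of the thread
// that executed it. `use` closes the dispatcher, even if the task throws.
fun blockingWorkOnDedicatedPool(): String =
    Executors.newFixedThreadPool(2).asCoroutineDispatcher().use { dispatcher ->
        runBlocking {
            withContext(dispatcher) {
                Thread.sleep(100) // stand-in for a blocking call
                Thread.currentThread().name
            }
        }
    }

fun main() {
    println("blocking work ran on: ${blockingWorkOnDedicatedPool()}")
}
```

For a pool that lives as long as the application, you would instead close the dispatcher from your shutdown logic.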
Spinning up a dedicated thread to run a long-running blocking task
I often use this approach when I have a long-running blocking task that produces some data that I want to pass to my coroutines through a Channel or a Flow. A nice example of this is using the Kafka consumer client and making it poll records from Kafka as a Flow:
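    import kotlinx.coroutines.channels.awaitClose
    import kotlinx.coroutines.channels.trySendBlocking
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.callbackFlow
    import kotlinx.coroutines.isActive
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.consumer.KafkaConsumer
    import java.time.Duration
    import kotlin.concurrent.thread

    fun <K, V> KafkaConsumer<K, V>.pollAsFlow(topics: List<String>): Flow<ConsumerRecord<K, V>> =
        callbackFlow {
            thread(name = "kafka-consumer-flow-thread") {
                try {
                    subscribe(topics)
                    // `isActive` here refers to the producer scope of the `callbackFlow`
                    while (isActive) {
                        // Consume new records from Kafka (blocking)
                        val records = poll(Duration.ofSeconds(3))
                        for (record in records) {
                            // Send the record downstream using the flow's `trySendBlocking` method
                            trySendBlocking(record).getOrThrow()
                        }
                        commitSync() // Blocking
                    }
                } catch (exception: Exception) {
                    // Closes the flow's channel with the failure
                    close(exception)
                } finally {
                    close()
                }
            }

            awaitClose {}
        }

    // usage
    launch {
        consumer
            .pollAsFlow(listOf("my_topic"))
            .collect { println(it) }
    }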
If you are not familiar with flows, I highly recommend checking them out! They will very likely level up your asynchronous programming skills!
Using BlockHound to detect blocking calls inside coroutines
Enough with theory! Let’s get to the fun part!
In practice, one efficient way to detect blocking calls inside your coroutines is by using BlockHound.
BlockHound is a Java agent built by the Reactor team that detects blocking calls made from non-blocking threads. When it finds one, BlockHound throws an exception with a helpful error message pointing us to the exact place where the blocking call occurred. It is suitable for local debugging during development as well as for unit and integration tests.
But first, we need to tell BlockHound which threads we consider non-blocking (i.e., the threads in which we want to forbid blocking calls). This is done through integrations. BlockHound already provides built-in integrations with Reactor and RxJava, allowing it to accurately detect illegitimate blocking calls in both frameworks’ event loops.
Fortunately for us, the Kotlin team also built a BlockHound integration for Kotlin Coroutines. It is packaged & distributed via the kotlinx-coroutines-debug project.
This integration configures BlockHound to intercept blocking calls in coroutines launched in the Dispatchers.Default worker pool. This is an important thing to keep in mind when using this integration: blocking calls in coroutines spun up in other dispatchers (including those you build yourself) will not be intercepted. If this part is not clear, no worries, it will become more apparent after the demo below.
To demonstrate the usage of BlockHound, let’s consider a short piece of Kotlin code with a suspend function containing an ugly blocking call:
    /**
     * A suspend function with an awful blocking sleep!
     */
    suspend fun httpCall() {
        Thread.sleep(200)
        delay(5000)
    }

    fun main(): Unit = runBlocking {
        httpCall()
    }
Obviously, in reality, the evil blocking calls inside your coroutines will rarely be this visible! Usually, you will have to find them hidden deep inside your functions in unexpected places (such as middlewares, callbacks, etc.).
To use BlockHound, we need to add its Maven coordinates to our build, along with kotlinx-coroutines-debug, which provides the coroutines BlockHound integration. The example below uses Gradle:
    dependencies {
        implementation(platform("org.jetbrains.kotlin:kotlin-bom"))

        implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
        implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2")

        implementation("org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.5.2")
        implementation("io.projectreactor.tools:blockhound:1.0.6.RELEASE")
    }
To prepare BlockHound to monitor our threads, we should call the BlockHound.install() method and supply it with a CoroutinesBlockHoundIntegration instance:
    import kotlinx.coroutines.debug.CoroutinesBlockHoundIntegration
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking
    import reactor.blockhound.BlockHound

    /**
     * A suspend function with an awful blocking sleep!
     */
    suspend fun httpCall() {
        Thread.sleep(200)
        delay(5000)
    }

    fun main(): Unit = runBlocking {
        BlockHound.install(CoroutinesBlockHoundIntegration())
        httpCall()
    }
Now, if you run this program, surprisingly, nothing happens! BlockHound did not intercept our blocking call. As I mentioned above, this is due to how this integration is built: it only intercepts blocking calls when running in the context of Dispatchers.Default (or Dispatchers.Main), whereas a plain runBlocking runs its coroutine on the calling thread. So let’s modify our program to use Dispatchers.Default instead:
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.debug.CoroutinesBlockHoundIntegration
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.runBlocking
    import reactor.blockhound.BlockHound

    /**
     * A suspend function with an awful blocking sleep!
     */
    suspend fun httpCall() {
        Thread.sleep(200)
        delay(5000)
    }

    fun main(): Unit = runBlocking(Dispatchers.Default) {
        BlockHound.install(CoroutinesBlockHoundIntegration())
        httpCall()
    }
Now, if we rerun our program, we will get a nice error message pointing to the exact place where the blocking call is happening:
    Exception in thread "main" reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Thread.sleep
        at java.base/java.lang.Thread.sleep(Thread.java)
        at coroutines.debug.MainKt.httpCall(Main.kt:13)
        at coroutines.debug.MainKt$main$1.invokeSuspend(Main.kt:19)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
Conclusion
Accidental blocking calls are one of the main causes of performance issues in Coroutines-based applications. They can come from your library functions, middlewares, or other unexpected places. Using BlockHound in your automated tests should allow you to detect them preemptively before they reach your production environment. Once you identify your guilty blocking calls, you can use the strategies we have seen earlier to deal with them.
Last but not least, I am currently creating an online course dedicated to learning Kotlin Coroutines. If you are interested, make sure to check it out and subscribe for updates to be notified of its release 😉.
[1] Usually the Dispatchers.Default.