Introduction
In part 1 of this series, we introduced the concept of structured concurrency. We showed how it immediately makes concurrent programs safer, more correct, and easier to reason about. If you missed this part, I definitely recommend checking it out!
To achieve this safety in every situation, including when things fail, structured concurrency is very strict and opinionated about the way exceptions are handled and propagated throughout coroutine job hierarchies. This behavior can sometimes seem counterintuitive and confusing if it is not understood correctly. This is precisely what we will cover in this part 2 of the series. More specifically, we will go over the following topics:
- Exception handling inside a coroutine scope
- The confusing part!
- Using supervisorScope when child failures are not fatal
- Cancellation and blocking code
Ready? Let’s go!
Exception handling inside a coroutine scope
Let’s start our journey with a simple example: we want to create a pipeline between a Producer coroutine and a Consumer coroutine that exchange data through a channel. Both coroutines run concurrently within a coroutine scope:
- The Producer continuously calls the fetchDataFromRemote() function to download some data from a remote service before piping it out to a channel.
- The Consumer continuously polls data from the channel and prints it on the screen.
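```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// coroutineScope is a suspending function, so runPipeline must be suspend too
suspend fun runPipeline() {
    coroutineScope {
        val channel = Channel<String>()

        // Producer: fetches data and pipes it into the channel
        launch(CoroutineName("Producer")) {
            while (true) {
                val data = fetchDataFromRemote()
                channel.send(data)
            }
        }

        // Consumer: prints every element received from the channel
        launch(CoroutineName("Consumer")) {
            for (data in channel) {
                println(data)
            }
        }
    }
}
```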
In the setup above, our coroutine job hierarchy would look like the following:
Now, let’s imagine that the fetchDataFromRemote() call throws an exception (e.g. a connectivity error).
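```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

suspend fun runPipeline() {
    coroutineScope {
        val channel = Channel<String>()

        launch(CoroutineName("Producer")) {
            while (true) {
                // Boom!! Throws an exception!!!
                val data = fetchDataFromRemote()
                channel.send(data)
            }
        }

        launch(CoroutineName("Consumer")) {
            for (data in channel) {
                println(data)
            }
        }
    }
}
```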
This would cause the Producer coroutine to fail. What do you think would happen then? Will the exception go unnoticed, since we did not log it anywhere? Will we end up with the Consumer coroutine running alone forever, desperately waiting for some data to arrive in the channel?
Fortunately, none of those bad things would happen, thanks to structured concurrency: the parent job notices the Producer coroutine’s failure and immediately cancels the other children within the scope (to prevent them from staying active forever) before bubbling the exception up to the outside.
Let’s review this workflow step by step (an animation below will further summarize it):
- It all starts with the fetchDataFromRemote() call throwing an exception, causing the Producer coroutine to fail.
- The parent job notices the failure of one of its children and, consequently, marks itself as failed and cancels all its other children that are still running.
- The cancelled child coroutines throw a CancellationException on their next call to a suspending function (or immediately if they are already suspended). That’s how they exit.
- Once all child coroutines have stopped, the original exception is rethrown out of the coroutineScope call.
These steps are summarized in the animation below¹:
This is how structured concurrency saves us from leaking exceptions and coroutines when failures occur!
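To see this behavior in action, here is a minimal, self-contained sketch, assuming kotlinx-coroutines-core is on the classpath. FakeRemote is a hypothetical stand-in for fetchDataFromRemote() that succeeds twice and then throws, and runFailingPipeline() is an illustrative helper that records what the Consumer received, whether it was cancelled, and which exception coroutineScope rethrew.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical stand-in for fetchDataFromRemote(): returns two items,
// then throws on the third call to simulate a connectivity error.
class FakeRemote {
    private var calls = 0

    suspend fun fetchDataFromRemote(): String {
        delay(10) // simulate network latency
        calls++
        if (calls > 2) throw IllegalStateException("connectivity error")
        return "item-$calls"
    }
}

// Runs the Producer/Consumer pipeline and reports what the Consumer received,
// the exception rethrown by coroutineScope, and whether the Consumer was cancelled.
fun runFailingPipeline(): Triple<List<String>, Throwable?, Boolean> = runBlocking {
    val remote = FakeRemote()
    val received = mutableListOf<String>()
    var consumerCancelled = false

    val failure = runCatching {
        coroutineScope {
            val channel = Channel<String>()

            launch(CoroutineName("Producer")) {
                while (true) channel.send(remote.fetchDataFromRemote())
            }

            launch(CoroutineName("Consumer")) {
                try {
                    for (data in channel) received += data
                } catch (e: CancellationException) {
                    consumerCancelled = true // cancelled by the failing parent job
                    throw e // always rethrow cancellation
                }
            }
        }
    }.exceptionOrNull()

    Triple(received, failure, consumerCancelled)
}

fun main() {
    val result = runFailingPipeline()
    println("received=${result.first}, rethrown=${result.second}, consumerCancelled=${result.third}")
}
```

Running it should show that the Consumer received the first two items, was cancelled instead of waiting forever on the channel, and that the IllegalStateException surfaced from coroutineScope.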
The confusing part
The error handling behavior we saw above is desirable in most scenarios. But it can also be very confusing, especially when we want to handle the exceptions of the child coroutines ourselves.
Let’s demonstrate this with a slightly different example: we want to fetch the GitHub profile of a user and print it on the screen. We decide that failures are not fatal: if we fail to fetch the GitHub profile of the user (e.g. because of a connectivity error), we just want to print an error message on the screen and prevent the exception from bubbling up.
```kotlin
coroutineScope {
    val profile = async(CoroutineName("Fetcher")) {
        // This call might fail!!
        fetchGithubProfile("wlezzar")
    }

    try {
        println(profile.await())
    } catch (err: Exception) {
        println("Failed to fetch data from github: ${err}")
    }
}
```
The associated job hierarchy would look like this:
We might think that catching the exception at the .await() call would safely prevent the exception from bubbling up. Unfortunately, that’s not the case! In fact, if you run the program above and simulate a failure in the fetchGithubProfile call, the whole program will crash, and the exception will propagate outside of the coroutineScope. Why is that?
Again, the explanation is the same as in the previous example: by the time we catch the exception at the .await() call, it is already too late. The associated child coroutine (started with async) has already failed, because we did not catch the exception inside the async { ... } block. Thus, the same mechanics as before apply: the parent job notices its child’s failure, marks itself as failed, cancels the remaining child coroutines, and bubbles the exception up. The try ... catch around .await() only keeps us from exiting immediately and gives us a chance to react to the failure (e.g. to clean up resources).
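The following sketch reproduces this surprise, assuming kotlinx-coroutines-core is on the classpath. The fetchGithubProfile() below is a hypothetical stand-in that always fails, and awaitCatchDemo() is an illustrative helper that logs which catch blocks actually run.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for fetchGithubProfile() that always fails.
suspend fun fetchGithubProfile(user: String): String {
    delay(10) // simulate network latency
    throw IllegalStateException("cannot reach GitHub for $user")
}

// Records what happens: the try...catch around await() runs, yet
// coroutineScope still rethrows because the async child already failed.
fun awaitCatchDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    try {
        coroutineScope {
            val profile = async(CoroutineName("Fetcher")) {
                fetchGithubProfile("wlezzar")
            }
            try {
                log += "profile: ${profile.await()}"
            } catch (err: Exception) {
                log += "caught at await()"
            }
        }
        log += "scope completed normally" // never reached
    } catch (err: IllegalStateException) {
        log += "scope rethrew: ${err.message}"
    }
    log
}

fun main() {
    awaitCatchDemo().forEach(::println)
}
```

Both catch blocks run: the local one at .await(), and then the outer one, because coroutineScope still fails with the child’s exception.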
To prevent this scenario from happening, we have two options:
- Either we catch the exception inside the async block itself. This would prevent the child coroutine from failing in the first place.
- Or we use supervisorScope instead of coroutineScope to ignore child coroutine failures.
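The first option can be sketched as follows, assuming kotlinx-coroutines-core is on the classpath. Wrapping the outcome in the standard library’s Result type is just one possible design choice, and fetchGithubProfile(user, fail) and loadProfile() are hypothetical helpers for illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for fetchGithubProfile(); fails when asked to.
suspend fun fetchGithubProfile(user: String, fail: Boolean): String {
    delay(10) // simulate network latency
    if (fail) throw IllegalStateException("cannot reach GitHub")
    return "profile of $user"
}

// The exception is caught *inside* the async block, so the child completes
// normally (holding a Result) and the coroutineScope never fails.
suspend fun loadProfile(fail: Boolean): String = coroutineScope {
    val profile = async(CoroutineName("Fetcher")) {
        try {
            Result.success(fetchGithubProfile("wlezzar", fail))
        } catch (err: CancellationException) {
            throw err // never swallow cancellation
        } catch (err: Exception) {
            Result.failure(err)
        }
    }
    profile.await().fold(
        onSuccess = { it },
        onFailure = { "Failed to fetch data from github: ${it.message}" }
    )
}

fun main() = runBlocking {
    println(loadProfile(fail = false))
    println(loadProfile(fail = true))
}
```

Rethrowing CancellationException keeps the child cancellable; only genuine failures are turned into a Result.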
Using supervisorScope when child failures are not fatal
Sometimes, as in the last example, child coroutine failures are not fatal and should not cause the entire job hierarchy to fail. Instead, we would like to react to those failures in our own way. This is why supervisorScope exists: it lets us do exactly that.
supervisorScope is a slightly modified version of coroutineScope. Both have the same purpose: creating a new coroutine scope with an explicit lifetime in which we can start child coroutines. The difference between the two is that supervisorScope uses a SupervisorJob as the parent job holding the child coroutines, whereas coroutineScope uses a regular Job.
SupervisorJob is a special implementation of Job that ignores child failures: when they occur, it does not cancel the other child coroutines and does not bubble up the exception.
Here is how it can be used in the previous example:
```kotlin
supervisorScope {
    val profile = async(CoroutineName("Fetcher")) {
        fetchGithubProfile("wlezzar")
    }

    try {
        println(profile.await())
    } catch (err: Exception) {
        println("Failed to fetch profile data: ${err}")
    }
}
```
Now, when a failure happens in the fetchGithubProfile call, it will still cause the child coroutine to fail, but the parent job (SupervisorJob) will ignore this failure, and the supervisorScope will not bubble it up to the outside.
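Here is a runnable sketch of that behavior, assuming kotlinx-coroutines-core is on the classpath. The always-failing fetchGithubProfile() and the extra "Sibling" coroutine are hypothetical additions, used to show that a failing child neither cancels its sibling nor makes supervisorScope throw.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for fetchGithubProfile() that always fails.
suspend fun fetchGithubProfile(user: String): String {
    delay(10) // simulate network latency
    throw IllegalStateException("cannot reach GitHub")
}

// Inside supervisorScope, the failing "Fetcher" does not cancel its sibling,
// and the scope completes normally once we have handled the failure.
fun supervisorDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    supervisorScope {
        val profile = async(CoroutineName("Fetcher")) { fetchGithubProfile("wlezzar") }
        val sibling = async(CoroutineName("Sibling")) {
            delay(50) // still running when the Fetcher fails
            "sibling finished"
        }
        try {
            log += profile.await()
        } catch (err: Exception) {
            log += "Failed to fetch profile data: ${err.message}"
        }
        log += sibling.await()
    }
    log += "scope completed normally"
    log
}

fun main() {
    supervisorDemo().forEach(::println)
}
```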
One critical point to keep in mind: whenever we use a supervisor job as a parent in a scope, it becomes very important to catch the exceptions of child coroutines ourselves, otherwise they would be lost!
The way we handle a child coroutine’s exception when using a supervisor job as a parent depends on whether this coroutine was started with launch or async:
- Failures of coroutines started with async are caught by wrapping the .await() call inside a try...catch.
- On the other hand, failures of coroutines started with launch can be handled using coroutine exception handlers (CoroutineExceptionHandler).
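For the launch case, a minimal sketch looks like this, assuming kotlinx-coroutines-core is on the classpath. It installs a CoroutineExceptionHandler on one launched child inside a supervisorScope; handlerDemo() is a hypothetical helper that collects what happened into a list.

```kotlin
import kotlinx.coroutines.*

// A launched child inside supervisorScope reports its failure to the
// CoroutineExceptionHandler in its context instead of failing the scope.
fun handlerDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val handler = CoroutineExceptionHandler { _, exception ->
        log += "handler got: ${exception.message}"
    }
    supervisorScope {
        launch(handler) {
            throw IllegalStateException("child failed")
        }
        launch {
            delay(10)
            log += "sibling still ran" // not cancelled by the failure above
        }
    }
    log += "scope completed"
    log
}

fun main() {
    handlerDemo().forEach(::println)
}
```

Without the handler, the failure of the first child would still not cancel the scope, but it would be reported to the thread’s default uncaught exception handling instead of our own code.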
Cancellation and blocking code
When failures occur, the parent job is responsible for triggering the cancellation of its child coroutines. However, cancellation is cooperative! This means that when coroutines are asked to cancel, they need to cooperate by listening for those cancellation signals and reacting appropriately (e.g. by exiting). You cannot force them to! A coroutine that does this is said to be cooperative with cancellation, or cancellable.
In the Kotlin coroutines world, you rarely need to worry about making your coroutines cooperative, because all the suspending functions provided by kotlinx.coroutines (like delay, consuming from a channel, etc.) are already cancellable. Thus, your code will usually be cancellable right away without any extra effort.
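A quick way to observe this, assuming kotlinx-coroutines-core is on the classpath: the sketch below cancels a coroutine suspended in a 10-second delay and measures how long join() takes. The helper name cancelSuspendedJob() is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// A coroutine suspended in delay() reacts to cancel() immediately: join()
// returns long before the 10-second delay would have elapsed.
fun cancelSuspendedJob(): Pair<Long, Boolean> = runBlocking {
    val job = launch { delay(10_000) }
    delay(50) // give the child time to start and suspend inside delay()
    val elapsedMs = measureTimeMillis {
        job.cancel()
        job.join()
    }
    elapsedMs to job.isCancelled
}

fun main() {
    val (elapsedMs, cancelled) = cancelSuspendedJob()
    println("join() returned after $elapsedMs ms, cancelled=$cancelled")
}
```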
However, when running blocking code inside coroutines, you do need to worry about making it cooperative with cancellation.
Here is a hypothetical example:
```kotlin
// Blocking code
fun someSlowBlockingCode() {
    // Do some computation
}

val job = launch(Dispatchers.IO) {
    someSlowBlockingCode()
}

job.cancel()
job.join() // Will hang until the blocking code is done!
```
The call to someSlowBlockingCode() is not coroutine-aware: it does not suspend and never reacts to cancellation. Consequently, cancelling the associated job will not interrupt the blocking function. Instead, it will keep running until it’s done.
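The following sketch makes the problem measurable, assuming kotlinx-coroutines-core is on the classpath. Here someSlowBlockingCode() is a hypothetical body that simply calls Thread.sleep(500) to stand in for slow blocking work, and cancelBlockingJob() reports how long join() waited and whether the blocking call ran to completion.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.system.measureTimeMillis

// Hypothetical blocking function: Thread.sleep() stands in for slow work
// that neither suspends nor checks for cancellation.
fun someSlowBlockingCode() {
    Thread.sleep(500)
}

// Cancels the job while the blocking call is running and measures how long
// join() waits. Returns the elapsed time and whether the blocking call finished.
fun cancelBlockingJob(): Pair<Long, Boolean> = runBlocking {
    val finished = AtomicBoolean(false)
    val job = launch(Dispatchers.IO) {
        someSlowBlockingCode()
        finished.set(true) // still reached: nothing interrupted the call
    }
    delay(50) // let the blocking call start
    val elapsedMs = measureTimeMillis {
        job.cancel()
        job.join() // waits for the remaining ~450 ms
    }
    elapsedMs to finished.get()
}

fun main() {
    val (elapsedMs, finished) = cancelBlockingJob()
    println("join() waited $elapsedMs ms, blocking call finished=$finished")
}
```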
How can we fix this?
There are several approaches, depending on whether or not you own the implementation of the blocking code.
One approach to making blocking code cooperative with cancellation is to wrap it inside a runInterruptible block, which is built specifically for that purpose.
```kotlin
val job = launch(Dispatchers.IO) {
    runInterruptible {
        someSlowBlockingCode()
    }
}

job.cancel()
job.join()
```
When the coroutine is cancelled, runInterruptible interrupts the underlying thread before throwing a CancellationException to the outside. However, for this to work, the blocking call underneath must itself respond to thread interrupts, which is not guaranteed! This is usually the case when:
- The thread is blocked on Object.wait(), Thread.sleep(), or Thread.join().
- The code uses java.util.concurrent structures, most of which are interruptible.
- The code uses Java NIO (but not java.io).
You can find more information about thread interrupts in the Thread.interrupt() javadoc.
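Since Thread.sleep() is one of the interruptible cases listed above, it makes a convenient test subject. In this sketch (assuming kotlinx-coroutines-core is on the classpath), the hypothetical someSlowBlockingCode() sleeps for 5 seconds, yet the job cancels almost immediately because runInterruptible interrupts the sleeping thread.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical blocking function built on Thread.sleep(), which responds
// to thread interrupts by throwing InterruptedException.
fun someSlowBlockingCode() {
    Thread.sleep(5_000)
}

fun cancelInterruptibleJob(): Pair<Long, Boolean> = runBlocking {
    val job = launch(Dispatchers.IO) {
        // On cancellation, runInterruptible interrupts this thread and turns
        // the resulting InterruptedException into a CancellationException.
        runInterruptible {
            someSlowBlockingCode()
        }
    }
    delay(50) // let the blocking call start
    val elapsedMs = measureTimeMillis {
        job.cancel()
        job.join() // returns quickly instead of waiting ~5 s
    }
    elapsedMs to job.isCancelled
}

fun main() {
    val (elapsedMs, cancelled) = cancelInterruptibleJob()
    println("join() returned after $elapsedMs ms, cancelled=$cancelled")
}
```

Replacing Thread.sleep() with a busy loop that never checks the interrupt flag would bring back the original hang, which is exactly the limitation described above.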
Despite not being a bulletproof solution, the runInterruptible function is still very useful in many situations, especially when you don’t control the implementation of the underlying blocking code.
Otherwise, if you do own the blocking code’s implementation, another way to make it cooperative with cancellation is to periodically check the isActive property of the enclosing coroutine scope. When this value is false, the enclosing scope has been cancelled, and the blocking code should exit. Here is an example:
```kotlin
fun someSlowBlockingCode(scope: CoroutineScope) {
    while (scope.isActive) {
        // Do some short-lived blocking operation
    }
}

val job = launch(Dispatchers.IO) {
    // Pass the coroutine's own scope so the loop can observe cancellation
    someSlowBlockingCode(this)
}

job.cancel()
job.join()
```
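To check that the isActive approach really stops the work, here is a runnable variant, assuming kotlinx-coroutines-core is on the classpath. The step counter, the 20-step cap, and Thread.sleep(50) as the "short-lived blocking operation" are all hypothetical choices for illustration; join() should return after at most the step in progress rather than after the full ~1 s.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.system.measureTimeMillis

// Hypothetical blocking worker: runs up to 20 short blocking steps (~1 s total)
// and checks scope.isActive before each one.
fun someSlowBlockingCode(scope: CoroutineScope, steps: AtomicInteger) {
    while (scope.isActive && steps.get() < 20) {
        Thread.sleep(50) // one short-lived blocking operation
        steps.incrementAndGet()
    }
}

fun cancelCooperativeJob(): Pair<Long, Int> = runBlocking {
    val steps = AtomicInteger(0)
    val job = launch(Dispatchers.IO) {
        someSlowBlockingCode(this, steps) // `this` is the coroutine's own scope
    }
    delay(120)
    val elapsedMs = measureTimeMillis {
        job.cancel()
        job.join() // waits at most for the step in progress
    }
    elapsedMs to steps.get()
}

fun main() {
    val (elapsedMs, steps) = cancelCooperativeJob()
    println("join() returned after $elapsedMs ms, after $steps of 20 steps")
}
```

The granularity of each blocking step bounds how quickly cancellation takes effect, so shorter steps mean a more responsive exit.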
Conclusion
Error handling in the context of coroutines is certainly one of the most challenging parts to understand and to master. Hopefully, this post gave you some clarity on its inner workings and the reasons why it was designed this way.
So far, we have only seen how to create coroutine scopes inside suspend functions using coroutineScope and supervisorScope. In the third and last part of this series, we will discover another useful way of creating independent scopes that are not tied to the lifetime of a function call. This can be achieved using the CoroutineScope factory function. We will also talk about context elements and how they allow us to easily pass data to child coroutines. Stay tuned!
¹ The names used to express the statuses of the jobs in the different steps are for clarity and illustration purposes. Please check the documentation of Job for the actual state machine and status names.