Introduction
Structured concurrency is a concept you need to understand to take full advantage of Kotlin coroutines.
Today’s post is the first of a series about Structured concurrency: an introduction that takes the challenge of explaining the concepts behind it in a straightforward and intuitive manner. It aims to trigger in you the “Aha!” moment that will make you say: Why doesn’t every single concurrency library use structured concurrency?! Stay tuned ;)
Unstructured concurrency
To understand something and build a good intuition around the subject, it is often helpful to start from the problems it tries to solve in the first place. So let’s consider a simple example program that does not use structured concurrency and see what problems can arise.
The following code creates three concurrent tasks that run in the background on separate threads. Each task’s job is to fetch a bunch of data from a remote service using the `fetchData` function and add the resulting number to the global `sum`.
```kotlin
val sum = AtomicInteger(0)

// Create 3 concurrent tasks that compute the total sum in parallel
repeat(3) {
    CompletableFuture.runAsync {
        val data: Int = fetchData()
        sum.addAndGet(data)
    }
}

// Do something useful in the meantime while the sum is being computed
Thread.sleep(2000)

// And then use the sum.
println("The final sum is: $sum")
```
You should have already spotted a few problems in the snippet above:
- We never wait for the created tasks (futures) to finish their job before using the final `sum`. The `fetchData` call might take longer than expected in some of the futures, and we end up using an incomplete sum without being aware of it.
- What if one of the futures fails? The `fetchData()` call might raise an exception because of a networking problem. The future’s failure will go unnoticed and will never be propagated to the outside. In other words, we can leak exceptions, which once again leaves our sum incorrect without our knowledge.
- Last but not least: what if one of the futures gets stuck indefinitely and never ends? Let’s say the `fetchData` call hangs and never returns because of a deadlock or a bug in its implementation. In this case, our future and the thread hosting it will keep running in the background forever, consuming resources unnecessarily. We will likely never be aware of it and will have no way of shutting it down. In other words, we can leak threads!
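To make the second problem concrete, here is a minimal sketch in which one of the three tasks fails. The `fetchData(taskId)` stub and the `unstructuredSum` wrapper are assumptions made for the demo; they are not part of the original snippet. Each future is awaited only to make the result deterministic, and its outcome is deliberately ignored, as in the code above.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the remote call: the task with id 1 fails.
fun fetchData(taskId: Int): Int {
    if (taskId == 1) throw IllegalStateException("network error in task $taskId")
    return 10
}

fun unstructuredSum(): Int {
    val sum = AtomicInteger(0)
    val futures = List(3) { taskId ->
        CompletableFuture.runAsync { sum.addAndGet(fetchData(taskId)) }
    }
    // Wait for every task to settle, but swallow its outcome: nobody ever
    // looks at whether a future completed normally or exceptionally.
    futures.forEach { it.exceptionally { null }.join() }
    return sum.get()
}

fun main() {
    // No exception reaches this point, yet one contribution is missing.
    println("The final sum is: ${unstructuredSum()}") // prints "The final sum is: 20"
}
```

The program exits normally and reports 20 instead of 30, with no hint that one task threw.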
We can try to fix the problems above by adding some boilerplate code to keep track of the futures in a list, wait for them one by one to finish, and forward any exception that would be thrown.
```kotlin
val sum = AtomicInteger(0)

// A list to keep track of the created futures
val futures = ArrayList<Future<Void>>()

repeat(3) {
    val future = CompletableFuture.runAsync {
        val data: Int = fetchData()
        sum.addAndGet(data)
    }

    // Add the future to the list
    futures.add(future)
}

// Do something useful in the meantime in parallel
Thread.sleep(2000)

// Wait for all the futures and propagate exceptions
for (future in futures) {
    future.get()
}

// And then use the sum.
println("The final sum is: $sum")
```
What we did here is actually make our concurrency more structured: instead of launching the background futures into thin air and losing sight of them, we keep track of them in a list and wait for their completion. That is exactly what we mean by structured concurrency: the start and the end of the concurrent paths are clear and explicit.
However, not only is this boilerplate code cumbersome to write, it is also far from being perfect:
- What if the first future encountered by our `for` loop throws an exception? In that case, the `for` loop stops iterating and propagates the exception to the outside without waiting for the remaining futures. So if one of them hangs or takes an unreasonably long time to finish, we could still be leaking threads! To solve this edge case, we need to catch the exception and cancel the remaining futures that are still running before propagating the error to the outside. That requires much more involved boilerplate code than what we wrote.
- We didn’t handle user cancellation. What if we want to let the user cancel the sum computation midway? Doing so would require even more boilerplate code to listen for cancellation signals and propagate them, one by one, to the futures we launched.
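As a rough idea of what the "cancel the remaining futures" boilerplate from the first point could look like, here is a sketch. It assumes an `ExecutorService` instead of `CompletableFuture.runAsync`, because `CompletableFuture.cancel` does not interrupt the running thread. The helper `awaitAllOrCancel` and the three demo tasks are hypothetical.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.concurrent.Future

// Hypothetical helper: waits for every future and, if waiting fails for any
// reason, cancels the ones that are still pending before rethrowing.
fun <T> awaitAllOrCancel(futures: List<Future<T>>): List<T> {
    try {
        return futures.map { it.get() }
    } catch (e: Exception) {
        // cancel(true) interrupts the worker thread; it has no effect on
        // futures that have already completed.
        futures.forEach { it.cancel(true) }
        throw e
    }
}

fun failingSum(): String {
    val executor = Executors.newFixedThreadPool(3)
    return try {
        val futures: List<Future<Int>> = listOf(
            executor.submit(Callable<Int> { throw IllegalStateException("network error") }),
            executor.submit(Callable<Int> { Thread.sleep(60_000); 1 }), // a very slow task
            executor.submit(Callable<Int> { 2 }),
        )
        "The final sum is: ${awaitAllOrCancel(futures).sum()}"
    } catch (e: ExecutionException) {
        "Computation failed: ${e.cause?.message}"
    } finally {
        executor.shutdown()
    }
}

fun main() {
    println(failingSum()) // prints "Computation failed: network error"
}
```

Even this version leaves the second point open: nothing here lets a caller cancel the whole computation from the outside.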
In summary, correctly structuring concurrency by hand is complex and requires a lot of boilerplate code to make it bug-free and safe in all cases.
The good news is: structured concurrency is baked into the design of the Coroutines library, and we do not have to write any boilerplate code to use it and take advantage of it! So let’s check this out!
Structured concurrency with Kotlin Coroutines
Let’s rewrite our code using coroutines and see what that looks like:
```kotlin
suspend fun distributedSum() {
    val sum = AtomicInteger(0)

    // The coroutine scope acts like a parent that keeps track of
    // all the child coroutines created inside it
    coroutineScope {
        // Create 3 coroutines that compute the total sum concurrently
        repeat(3) {
            launch {
                val data: Int = fetchDataAsync()
                sum.addAndGet(data)
            }
        }
    }

    println("The final sum is: $sum")
}
```
This short and extremely simple code is already free from all the problems mentioned in the previous section: It cannot leak exceptions or coroutines, it will never print an incomplete sum or a wrong one, and cancellation is already possible!
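If you want to run the snippet yourself, the following self-contained sketch should do, assuming the `kotlinx-coroutines-core` library is on the classpath. The `fetchDataAsync` stub (a short delay that returns 2) is made up for the demo, and `distributedSum` returns the sum instead of printing it so that the result is easy to check.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the remote call used in this post.
suspend fun fetchDataAsync(): Int {
    delay(100) // simulated network latency
    return 2
}

suspend fun distributedSum(): Int {
    val sum = AtomicInteger(0)
    coroutineScope {
        repeat(3) {
            launch { sum.addAndGet(fetchDataAsync()) }
        }
    }
    // Reached only after all three child coroutines have completed.
    return sum.get()
}

fun main() = runBlocking {
    println("The final sum is: ${distributedSum()}") // prints "The final sum is: 6"
}
```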
Feels like magic? Let’s unpack it step by step to understand how.
```kotlin
coroutineScope { // this: CoroutineScope (receiver)
    ...
}
```
We start by calling the `coroutineScope` function and giving it a Kotlin lambda inside which we can create any number of background coroutines using `launch`. We are able to use `launch` inside this lambda because its receiver [1] is a `CoroutineScope` instance, on which `launch` is defined as an extension function.
```kotlin
coroutineScope { // this: CoroutineScope
    repeat(3) {
        // equivalent to: this.launch
        launch {
            ...
        }
    }
}
```
Any background coroutine created within this block is attached as a child to this `CoroutineScope` instance, which keeps track of its completion and failure. It is similar to what we tried to achieve by manually keeping track of the completable futures in a list. But instead of a list, the `CoroutineScope` uses a fancier hierarchical data structure called `Job`. We will dig deeper into this data structure later on in this post.
Here are the goodies that come with this setup:
- We don’t need to keep track of the background coroutines ourselves: the `CoroutineScope` already does, and the `coroutineScope` call will suspend until all of them complete or fail. Thus, we know for sure that after this call, all our background coroutines are done, and we can then confidently use the computed sum.
- If any of the background coroutines fails for any reason, the exception is caught by the `coroutineScope` and propagated to the outside. But first, it takes care of canceling the remaining coroutines that are still running [2], thus making sure that no background coroutine is still pending after the exception is propagated.
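The second point can be observed with a small experiment. In this sketch, assuming `kotlinx-coroutines-core` is available, one child fails after 100 ms while its sibling would otherwise sleep for ten seconds. The function name `failingScope` and the `log` list are illustrative only.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun failingScope(log: MutableList<String>) {
    coroutineScope {
        launch {
            try {
                delay(10_000) // simulates a slow sibling
                log.add("sibling finished")
            } catch (e: CancellationException) {
                log.add("sibling cancelled")
                throw e // cancellation must always be rethrown
            }
        }
        launch {
            delay(100)
            throw IllegalStateException("network error")
        }
    }
}

fun main() = runBlocking {
    val log = mutableListOf<String>()
    try {
        failingScope(log)
    } catch (e: IllegalStateException) {
        log.add("caught: ${e.message}")
    }
    println(log) // prints "[sibling cancelled, caught: network error]"
}
```

The sibling is cancelled before the exception reaches the caller, so nothing is left running once `failingScope` has thrown.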
Our background coroutines now have a clear and explicit lifetime defined by the `coroutineScope` block, beyond which it is impossible to leak any coroutines or exceptions. That’s the core idea of structured concurrency: “every time our control splits into multiple concurrent paths, we make sure they join up again” [3]. As a result, our program instantly becomes safer and easier to reason about.
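The explicit lifetime also covers cancellation from the outside. Here is a minimal sketch, again assuming `kotlinx-coroutines-core`: the whole computation runs in one parent coroutine, and cancelling that parent reaches every child. The function name `cancelledSum` and the ten-second delay are made up for the demo.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns how many of the three children were still able to finish.
fun cancelledSum(): Int = runBlocking {
    val completed = AtomicInteger(0)
    val computation = launch {
        coroutineScope {
            repeat(3) {
                launch {
                    delay(10_000) // hypothetical slow fetch
                    completed.incrementAndGet()
                }
            }
        }
    }
    delay(100)                  // let the children start
    computation.cancelAndJoin() // e.g. the user pressed "Cancel"
    completed.get()
}

fun main() {
    println("Children that finished: ${cancelledSum()}") // prints "Children that finished: 0"
}
```

A single `cancelAndJoin()` call on the parent is enough; no per-child bookkeeping is needed.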
The CoroutineScope and the Jobs hierarchy
Let’s take a closer look at the signature of the `launch` coroutine builder function:
```kotlin
public fun CoroutineScope.launch(...): Job
```
The `launch` function requires a `CoroutineScope` to be usable. But why is that?
The reason is to prevent background coroutines from being created out of thin air, without a parent `CoroutineScope` instance to keep track of them. This is the case for all coroutine builder functions, which is how Kotlin enforces structured concurrency.
The `CoroutineScope` instance keeps track of its child coroutines using a hierarchical data structure called `Job`. You can think of a job as a simple tree that:
- Stores child `Job` instances to keep track of them (which can themselves have other nested child `Job` instances).
- And has a status (`active`, `completed`, etc.).
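Both aspects of a `Job` can be inspected directly. The sketch below, which assumes `kotlinx-coroutines-core`, reads the scope’s `Job` from its coroutine context, counts its children, and checks the status of one child before and after completion. The function name `inspectJobs` and the observation strings are illustrative.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun inspectJobs(): List<String> = runBlocking {
    val observations = mutableListOf<String>()

    val firstChild: Job = coroutineScope {
        // The scope's own Job lives in its coroutine context.
        val scopeJob = coroutineContext[Job]!!

        val first = launch { delay(100) }
        launch { delay(100) }

        observations += "children: ${scopeJob.children.count()}"
        observations += "first child active: ${first.isActive}"
        first // handed out only so that we can inspect it afterwards
    }

    // coroutineScope has returned, so every child must be done by now.
    observations += "first child completed: ${firstChild.isCompleted}"
    observations
}

fun main() {
    inspectJobs().forEach(::println)
    // children: 2
    // first child active: true
    // first child completed: true
}
```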
Coroutine builder functions (like `launch`) also create their own `CoroutineScope` instance as the receiver of their lambda block, making it possible to create nested coroutines! That enables building complex hierarchies of concurrently running background coroutines without ever leaking a single one of them.
```kotlin
coroutineScope { // this: CoroutineScope
    launch(CoroutineName("Coroutine A")) { // this: CoroutineScope (nested CoroutineScope with its own Job)
        launch(CoroutineName("Coroutine A.1")) { ... }
        launch(CoroutineName("Coroutine A.2")) { ... }
    }

    launch(CoroutineName("Coroutine B")) { ... }
}
```
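The code above would create the following hierarchy: the job of the `coroutineScope` call sits at the root, with `Coroutine A` and `Coroutine B` as its children, and `Coroutine A.1` and `Coroutine A.2` as the children of `Coroutine A`.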
The `coroutineScope` call will suspend and not complete until `Coroutine A` and `Coroutine B` are done. `Coroutine A`, on the other hand, will not complete until `Coroutine A.1` and `Coroutine A.2` are done. If `Coroutine A.1` fails for some reason, then `Coroutine A` will fail as a whole, propagating the exception to the parent job, causing `coroutineScope` itself to fail. Any coroutine still running in this hierarchy will get canceled during the process.
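The failure scenario described above can be reproduced with a short sketch, assuming `kotlinx-coroutines-core`. `Coroutine A.1` throws after 100 ms, and a hypothetical helper `launchSlow` records which long-running coroutines end up cancelled. The function name `failingHierarchy` is also made up.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: launches a named coroutine that sleeps for a long
// time and records its name if it gets cancelled.
fun CoroutineScope.launchSlow(name: String, cancelled: MutableList<String>): Job =
    launch(CoroutineName(name)) {
        try {
            delay(10_000)
        } catch (e: CancellationException) {
            cancelled.add(name)
            throw e
        }
    }

// Returns the error seen by the caller and the names of the cancelled coroutines.
fun failingHierarchy(): Pair<String?, List<String>> = runBlocking {
    val cancelled = mutableListOf<String>()
    val error = try {
        coroutineScope {
            launch(CoroutineName("Coroutine A")) {
                launch(CoroutineName("Coroutine A.1")) {
                    delay(100)
                    throw IllegalStateException("A.1 failed")
                }
                launchSlow("Coroutine A.2", cancelled)
            }
            launchSlow("Coroutine B", cancelled)
        }
        null
    } catch (e: IllegalStateException) {
        e.message
    }
    error to cancelled.sorted()
}

fun main() {
    println(failingHierarchy()) // prints "(A.1 failed, [Coroutine A.2, Coroutine B])"
}
```

The failure of `Coroutine A.1` travels up through `Coroutine A` to the scope, and both `Coroutine A.2` and `Coroutine B` are cancelled along the way.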
Difference between CoroutineScope and CoroutineContext
One aspect that may confuse a lot of Kotliners is the difference between a `CoroutineContext` and a `CoroutineScope`.
If you check the source code of `CoroutineScope`, you will find the following:
```kotlin
public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}
```
Why do we need a `CoroutineScope` then? Why not just use `CoroutineContext` instead? The difference between the two concepts, as outlined by Roman Elizarov, lies in their intended purpose.
A `CoroutineContext` is simply a data structure holding zero or more context elements. It works much like a `HashMap` and serves as a container that coroutines and their schedulers can access at any point in time to retrieve useful information, like the `Dispatcher`, the coroutine’s name, etc.
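The map analogy can be tried out directly: elements are combined with `+` and looked up with `[]`, using the element type’s companion object as the key. This is a small sketch assuming `kotlinx-coroutines-core`; the function name `describeContext` and the name `"fetcher"` are arbitrary.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job

fun describeContext(): List<String> {
    // Two elements combined into one context.
    val context: CoroutineContext = CoroutineName("fetcher") + Dispatchers.Default
    // Adding an element whose key is already present replaces the old one.
    val renamed = context + CoroutineName("other")

    return listOf(
        "name: ${context[CoroutineName]?.name}",
        // Dispatchers are stored under the ContinuationInterceptor key.
        "has dispatcher: ${context[ContinuationInterceptor] === Dispatchers.Default}",
        "has job: ${context[Job] != null}",
        "empty context has name: ${EmptyCoroutineContext[CoroutineName] != null}",
        "name after override: ${renamed[CoroutineName]?.name}",
    )
}

fun main() {
    describeContext().forEach(::println)
    // name: fetcher
    // has dispatcher: true
    // has job: false
    // empty context has name: false
    // name after override: other
}
```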
On the other hand, a `CoroutineScope` is like a “manager and a tracker for coroutines”. It is the entity that can create background coroutines using coroutine builder functions defined as extension functions on this interface (like `launch`).
To be able to fulfill its job of creating such coroutines and keeping track of them, the `CoroutineScope` basically needs two things:
- A `Job` instance to keep track of the child coroutines (to enforce structured concurrency).
- Some optional coroutine context elements that we want to propagate to all child coroutines created inside this scope (e.g. the Dispatcher).
It turns out that both of the requirements above can be satisfied by having a single `coroutineContext` property:
- The `coroutineContext` property can store the `Job` instance that will serve as a parent to the child coroutines created by this `CoroutineScope`. All the functions that instantiate a `CoroutineScope` will create such a `Job` and put it there.
- It will also hold any additional context element that we want to propagate to the child coroutines.
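Both roles of the `coroutineContext` property show up in a small experiment. This sketch assumes `kotlinx-coroutines-core` and builds a scope by hand with the `CoroutineScope(...)` factory function. The name `"sum-scope"` and the function `inspectScope` are made up. The child reports on its own context from the inside, while it is still running.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.runBlocking

fun inspectScope(): List<String> = runBlocking {
    val scopeJob = Job()
    // The scope's context holds the parent Job plus an element to propagate.
    val scope = CoroutineScope(scopeJob + CoroutineName("sum-scope"))

    val report = scope.async {
        // Inside the child, coroutineContext is the child's own context.
        val ownJob = coroutineContext[Job]!!
        listOf(
            "inherited name: ${coroutineContext[CoroutineName]?.name}",
            "has its own Job: ${ownJob !== scopeJob}",
            "is a child of the scope's Job: ${ownJob in scopeJob.children}",
        )
    }.await()

    scope.cancel() // release the hand-made scope once we are done with it
    report
}

fun main() {
    inspectScope().forEach(::println)
    // inherited name: sum-scope
    // has its own Job: true
    // is a child of the scope's Job: true
}
```

The name is inherited as-is, whereas the `Job` is not copied: the child gets a fresh `Job` that is attached to the scope’s one.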
Roman Elizarov explains this concept clearly in his article about the topic. I highly encourage you to read it as well as his other writings about Kotlin ;).
Conclusion
Structured concurrency makes your concurrency-intensive programs safer and easier to understand. It frees you from all the boilerplate code that you would otherwise need to write to synchronize your background tasks and make sure you are not leaking any of them.
The next topic of this series will be about Error handling and cancellation, maybe one of the trickiest aspects of structured concurrency. Stay tuned if you don’t want to miss out on that one ;) .
[1] The lambda receiver is the instance referred to by the `this` keyword inside the lambda’s block.
[2] This behavior can be overridden using supervisor jobs. We will talk more about this in the second part of this series, on error handling and cancellation.
[3] Quote taken from “Notes on structured concurrency, or: Go statement considered harmful”.