MutableSharedFlow is kind of complicated
Starting with Kotlin Coroutines version 1.5.0, BroadcastChannel and ConflatedBroadcastChannel were marked as ObsoleteCoroutinesApi, and developers should now use SharedFlow and StateFlow instead.
Kotlin documentation even gives a handy guide on how to migrate from these channels to respective Flow APIs:
To migrate BroadcastChannel usage to SharedFlow, start by replacing usages of the BroadcastChannel(capacity) constructor with MutableSharedFlow(0, extraBufferCapacity=capacity) (broadcast channel does not replay values to new subscribers). Replace send and trySend calls with emit and tryEmit, and convert subscribers’ code to flow operators.
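As a concrete sketch of that guide (the event bus and the capacity of 16 are made up for illustration):

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

// Before (obsolete API), hypothetically:
//   val events = BroadcastChannel<String>(capacity = 16)
//   events.trySend("event")

// After, following the guide: the former channel capacity becomes
// extra buffer space, and replay stays 0 because broadcast channels
// never replayed values to new subscribers.
val events = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 16)

fun send(event: String) {
    events.tryEmit(event) // replaces trySend
}
```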
Ok, but maybe you are wondering about this extraBufferCapacity parameter. What does it actually do? And doesn’t MutableSharedFlow already have a replay parameter? What’s the difference?
Documentation
In cases like these, it’s often useful (alas not always helpful) to check out what the documentation has to say about these two parameters:
replay - the number of values replayed to new subscribers (cannot be negative, defaults to zero).
extraBufferCapacity - the number of values buffered in addition to replay. emit does not suspend while there is a buffer space remaining (optional, cannot be negative, defaults to zero).
I’m not sure about you, but to me, this does not really answer the questions posed above. I know what replay does, but after reading the documentation, extraBufferCapacity remains as elusive as it was before I looked into it. And some of the consequences of this parameter are not obvious at all after a first read-through.
Time to dig in deeper and play around with these concepts.
Replay
The replay parameter is easy to explain, and those familiar with RxJava will know it very well. Essentially, creating a MutableSharedFlow with replay=x is the same thing as creating a ReplaySubject in RxJava.
Let’s look at this piece of code:
runBlocking {
    val testFlow = MutableSharedFlow<String>()
    testFlow.tryEmit("a")
    val job = testFlow
        .onEach { println(it) }
        .launchIn(this)
    delay(100) // to give enough time for println to be executed before cancelling job
    job.cancel()
}
What would be the expected output of this piece of code?
…
Well, there’s actually no output. Why?
That’s because the value is emitted to the testFlow first, and the testFlow is collected only after the value has been emitted. Nobody is listening to the event at the time this line of code is executed: testFlow.tryEmit("a").
So if you want to cache events until the flow is actually collected, you can use the replay parameter. Its value is an integer that specifies how many of the last events should be cached. For this example, let’s settle on replay=1. Let’s now examine this piece of code:
runBlocking {
    val testFlow = MutableSharedFlow<String>(replay = 1)
    testFlow.tryEmit("a")
    testFlow.tryEmit("b")
    val job = testFlow
        .onEach { println(it) }
        .launchIn(this)
    delay(100) // to give enough time for println to be executed before cancelling job
    job.cancel()
}
What’s the output?
…
It’s b. Why?
We have specified replay=1, which means the last 1 event is cached for all future collectors. Two events were emitted to testFlow, a and b (in that order). The last emitted event is b, and therefore that’s the one that will be replayed.
It’s important to note that if we added one or more collectors even later, they’d still receive this b value. As per this example:
runBlocking {
    val testFlow = MutableSharedFlow<String>(replay = 1)
    testFlow.tryEmit("a")
    testFlow.tryEmit("b")
    val job = testFlow
        .onEach { println(it) }
        .launchIn(this)
    delay(100) // to give enough time for println to be executed before cancelling job
    val job2 = testFlow
        .onEach { println(it) }
        .launchIn(this)
    job.cancel()
    delay(100)
    job2.cancel()
}
This time the output is:
b
b
That’s because with replay the cache is kept for all future collectors.
Ok, great, but sometimes you don’t want this behaviour. Sometimes, if nobody is listening, that’s fine: you want to drop those events and not cache them. For example, let’s say your MutableSharedFlow sends geolocation data to collectors. You might not want the next collector to receive some cached value; you’d rather wait for the next (more up-to-date) value. In such cases, specifying replay=0 (or not specifying any value, because the default is 0) is the way to go. (For Rx folks, this would be roughly equivalent to using a PublishSubject.)
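A minimal sketch of such a geolocation flow (the Location type is made up for illustration):

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical type, just for this example.
data class Location(val lat: Double, val lon: Double)

// replay defaults to 0, so a collector that subscribes late never
// receives a stale cached fix; it simply waits for the next emission.
val locationUpdates = MutableSharedFlow<Location>()
```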
So what could possibly go wrong with this approach?
Actually, there’s one gotcha you need to be careful about.
tryEmit and non-blocking CoroutineScope
In the previous examples we were collecting the flow in a blocking CoroutineScope (created by runBlocking {}). In practical situations in your applications you’ll likely use different coroutine scopes, either ones provided by your platform, or ones you create yourself using the CoroutineScope() function.
The code could be spread across multiple files, and let’s say you’d like to use tryEmit instead of emit, because you want to emit events from a normal function, not a suspending one (outside of a coroutine).
This piece of code here simulates those conditions:
fun main() {
    val testSharedFlow = MutableSharedFlow<String>()
    runBlocking {
        CoroutineScope(Job()).launch {
            testSharedFlow
                .onStart { println("start") }
                .collect { println(it) }
        }
        delay(100) // to give the collector enough time to start
    }
    testSharedFlow.tryEmit("a")
    testSharedFlow.tryEmit("b")
}
So what is this code going to output? Events are emitted after a collector started collecting, so we should see both a and b emitted, right?
Nope
The surprising answer is that we’ll only receive start as the output of the above piece of code. What gives?
This is going to be a bit complicated to answer, but stick with me, please.
We have to think about the two parties involved in this process: Emitters and Collectors. Emitters are the ones pushing events to the MutableSharedFlow; Collectors are reading events from the flow. Emitters and Collectors are independent of each other. Emitters try to emit an event to the MutableSharedFlow and they don’t necessarily wait for Collectors to collect them.
What the tryEmit method does, in order not to block the thread until an event is collected, is send the value to MutableSharedFlow’s cache. If you used replay=1 or more, you’d specify some buffer size (1 or more) and the event would be remembered in the cache until it is collected by a collector. Without that, the cache size is 0 and the event is simply dropped.
But as we’ve discussed above, there are cases when using replay is unacceptable. How do we resolve this issue?
Option 1: emit instead of tryEmit
One way to go would be to use emit instead of tryEmit:
fun main() {
    val testSharedFlow = MutableSharedFlow<String>()
    runBlocking {
        CoroutineScope(Job()).launch {
            testSharedFlow
                .onStart { println("start") }
                .collect { println(it) }
        }
        delay(100) // to give the collector enough time to start
    }
    runBlocking {
        testSharedFlow.emit("a")
        testSharedFlow.emit("b")
    }
}
This time the output will look like this:
start
a
b
But why does this even work in the first place? It’s because emit is a suspending function. emit can actually wait for collectors to process the event and continue with the rest of the code only after that.
Ok, but that’s cheating a little bit. Remember how we said above that we would like to use tryEmit specifically so we could emit events outside of a coroutine? emit being a suspending function forces us to call it within a coroutine. How can we make tryEmit work?
Option 2: extraBufferCapacity
If you still remember how this article started, we were curious about the extraBufferCapacity parameter. Let’s first see it in practice and then discuss what is going on. Following the migration guide, we’d probably end up with something like this:
fun main() {
    val testSharedFlow = MutableSharedFlow<String>(extraBufferCapacity = 1)
    runBlocking {
        CoroutineScope(Job()).launch {
            testSharedFlow
                .onStart { println("start") }
                .collect { println(it) }
        }
        delay(1000) // to give the collector enough time to start
    }
    testSharedFlow.tryEmit("a")
    testSharedFlow.tryEmit("b")
    runBlocking { delay(100) } // to give enough time for println to be executed before execution finishes
}
The output of this piece of code will be:
start
a
Ok, so why do we now see only a? Where did b go? And why do we even see a?
Let’s go back to what we said about the tryEmit method:
What the tryEmit method does, in order not to block the thread until an event is collected, is send the value to MutableSharedFlow’s cache.
That’s right. And when you create a mutable shared flow like this, MutableSharedFlow<String>(), the internal cache size is 0. As mentioned before, since the cache size is 0, the event is immediately dropped. Now you’re probably starting to figure it out. The extraBufferCapacity parameter specifies a cache size for events that are sent to the MutableSharedFlow by tryEmit (or in some cases even emit) without causing any replay of events for new collectors. And since in our example above extraBufferCapacity is set to 1, only one event is cached, and that is a.
But why a? Why not b?
This sounds fishy, doesn’t it? Specifying replay=1 in a similar example outputs b, while extraBufferCapacity=1 outputs a. That’s weird. Why? The reason is that the replay cache attempts to replay the last events passed to it. So when a new event comes in and the replay cache holds just 1 value, it will overwrite that value. It sort of makes sense.
extraBufferCapacity, on the other hand, is there only to provide a cushion for fast Emitters and slow Collectors, that is, for when Emitters produce events faster than Collectors can collect them. By default, the first events are cached, and once the cache is full, any further event is simply dropped (similar to what happens when extraBufferCapacity is 0, which is actually the default value).
But I don’t want to drop NEW events :-(
As I wrote above, this is the default behaviour. This default behaviour, however, might not fit your needs. Luckily for us, there is a way to change it, by specifying the onBufferOverflow parameter.
As usual, let’s check what the documentation has to say about this parameter:
onBufferOverflow - configures an emit action on buffer overflow. Optional, defaults to suspending attempts to emit a value. Values other than BufferOverflow.SUSPEND are supported only when replay > 0 or extraBufferCapacity > 0. Buffer overflow can happen only when there is at least one subscriber that is not ready to accept the new value. In the absence of subscribers only the most recent replay values are stored and the buffer overflow behavior is never triggered and has no effect.
And which values of BufferOverflow are available? They are SUSPEND, DROP_OLDEST and DROP_LATEST. The SUSPEND strategy is explained in the documentation above and matches what I wrote earlier about the emit method, adding a few more details about WHEN it actually suspends and when it can execute immediately.
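As a sketch of the remaining strategy: DROP_LATEST keeps whatever is already buffered and discards the incoming value, mirroring the default dropping behaviour of tryEmit described above. Assuming the same shape as the earlier examples:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    val testSharedFlow = MutableSharedFlow<String>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_LATEST
    )
    runBlocking {
        CoroutineScope(Job()).launch {
            testSharedFlow
                .onStart { println("start") }
                .collect { println(it) }
        }
        delay(100) // to give the collector enough time to start
    }
    testSharedFlow.tryEmit("a") // fits into the buffer
    testSharedFlow.tryEmit("b") // buffer full: the incoming value is dropped
    runBlocking { delay(100) } // to give the collector enough time to print
}
```

Mirroring the DROP_OLDEST example below, this would typically print start followed by a: the older buffered value survives and the newer one is lost.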
You may have guessed that, in order to keep the new events and drop the old ones, the BufferOverflow.DROP_OLDEST strategy should be used. Indeed, this piece of code:
fun main() {
    val testSharedFlow = MutableSharedFlow<String>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    runBlocking {
        CoroutineScope(Job()).launch {
            testSharedFlow
                .onStart { println("start") }
                .collect { println(it) }
        }
        delay(100) // to give the collector enough time to start
    }
    testSharedFlow.tryEmit("a")
    testSharedFlow.tryEmit("b")
    runBlocking { delay(100) }
}
will output:
start
b
Which is probably what you usually want.
Of course, it may be prudent to increase extraBufferCapacity to an even larger value if you don’t want to miss any events.
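For instance, a configuration along these lines (the buffer size of 64 and the one-shot UI event use case are illustrative assumptions, not a recommendation from the library):

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

// A sketch of a one-shot UI event flow that tolerates bursts:
// no replay for late collectors, a cushion of 64 events, and the
// oldest event dropped on overflow so new events are never lost.
val uiEvents = MutableSharedFlow<String>(
    replay = 0,
    extraBufferCapacity = 64,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
```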
Final thoughts
Hopefully this article has shed a little more light on the proper usage of MutableSharedFlow. As you can see, using MutableSharedFlow just out of the box is not as easy as it sounds. Also, the migration pattern described by the documentation, while equivalent to the previous BroadcastChannel APIs, is not necessarily what you need (you may need to specify onBufferOverflow = BufferOverflow.DROP_OLDEST in your application, depending on your use case).
Note: In light of this article, I have suggested dropping the default values for the replay and extraBufferCapacity parameters, because I think the default values lead to unexpected behaviour, and unnoticed bugs could be introduced into many applications out there. You can support this proposal here:
https://github.com/Kotlin/kotlinx.coroutines/issues/2387#issuecomment-850774203