Kotlin — Compose: Observable flow with MutableSharedFlow

By Tezov. Published in ITNEXT. 3 min read. Dec 17, 2023

In this story, I’m going to demonstrate a straightforward observable flow that can be listened to once, forever, or until a condition is met. To achieve this, we will use MutableSharedFlow.

Photo: Tobias Carlsson on Unsplash

MutableSharedFlow

MutableSharedFlow can be used in various ways, and I won’t list them all. Here is my use case: I need a flow that I can listen to. The listener receives a value only when it is emitted. If a value was emitted before the listener registered to the flow, it won’t be retransmitted. The flow doesn’t need a replay cache; I only keep a single extra buffer slot so that tryEmit can accept a value without suspending.

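import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

class Emitter<T : Any>(
    replay: Int = 0,               // no replay: late listeners miss earlier values
    extraBufferCapacity: Int = 1,  // one slot so tryEmit does not need to suspend
    onBufferOverflow: BufferOverflow = BufferOverflow.DROP_OLDEST
) {
    internal val flow = MutableSharedFlow<T>(
        replay = replay,
        extraBufferCapacity = extraBufferCapacity,
        onBufferOverflow = onBufferOverflow
    )

    // Non-suspending emit, usable outside a coroutine.
    fun tryEmit(event: T) = flow.tryEmit(event)

    // Suspending emit, for callers already inside a coroutine.
    suspend fun emit(event: T) = flow.emit(event)

    // Read-only handle to hand out to listeners.
    val createCollector get() = Collector(this)
}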

As you can see, since I want to be able to reuse it in other contexts, I allow passing arguments that override the default values.
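The no-replay behavior described above can be checked outside Compose. The following is a minimal sketch, assuming only kotlinx-coroutines-core on the classpath; the function name firstValueSeenByLateSubscriber is hypothetical and the flow is built with the same default arguments as Emitter.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the first value seen by a listener
// that subscribes only after "early" has already been emitted.
fun firstValueSeenByLateSubscriber(): String = runBlocking {
    val flow = MutableSharedFlow<String>(
        replay = 0,
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    // Nobody is listening yet and replay is 0, so this value is lost.
    flow.tryEmit("early")

    // UNDISPATCHED runs the coroutine up to its first suspension point,
    // so the subscription exists before the next emission.
    val received = async(start = CoroutineStart.UNDISPATCHED) { flow.first() }

    flow.tryEmit("late")
    received.await()
}
```

With these settings the late listener should only ever see "late"; raising replay to 1 would be one way to deliver the most recent earlier value as well.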

Collector

To separate the roles, I added another class that I called “Collector.” When you create an Emitter, you must keep it private from all listeners and only give them access to the collector.

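import kotlinx.coroutines.CoroutineScope

class Collector<T : Any>(
    private val emitter: Emitter<T>
) {
    // Receive a single value, then stop listening.
    fun once(scope: CoroutineScope, block: suspend (T) -> Unit) = emitter.flow.collectOnce(scope, block)

    // Receive every value while the scope is active.
    fun forever(scope: CoroutineScope, block: suspend (T) -> Unit) = emitter.flow.collectForever(scope, block)

    // Receive values until the block returns true.
    fun until(scope: CoroutineScope, block: suspend (T) -> Boolean) = emitter.flow.collectUntil(scope, block)
}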

The Collector class is simply a thin wrapper that holds the emitter privately and relies on extensions that I added directly to MutableSharedFlow for convenience.

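import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch

// firstOrNull keeps collecting until its predicate returns true.

// Deliver one value, then stop.
fun <T : Any> MutableSharedFlow<T>.collectOnce(scope: CoroutineScope, block: suspend (T) -> Unit) = scope.launch {
    firstOrNull {
        if (isActive) block(it)
        true
    }
}

// Deliver every value; stop only when the coroutine is no longer active.
fun <T : Any> MutableSharedFlow<T>.collectForever(scope: CoroutineScope, block: suspend (T) -> Unit) = scope.launch {
    firstOrNull {
        if (isActive) {
            block(it)
            false
        } else {
            true
        }
    }
}

// Deliver values until block returns true or the coroutine is no longer active.
fun <T : Any> MutableSharedFlow<T>.collectUntil(scope: CoroutineScope, block: suspend (T) -> Boolean) = scope.launch {
    firstOrNull {
        if (isActive) {
            block(it)
        } else {
            true
        }
    }
}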

These handy extensions improve readability: I don’t need to remember the meaning of the Boolean returned to firstOrNull (true = stop listening).
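To make the meaning of that Boolean concrete, here is a small sketch on a finite flow. It assumes kotlinx-coroutines-core; valuesSeenUntil and stopAt are hypothetical names used only for this illustration.

```kotlin
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records every value handed to the predicate
// and stops as soon as the predicate returns true.
fun valuesSeenUntil(stopAt: Int): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    flowOf(1, 2, 3, 4, 5).firstOrNull { value ->
        seen += value      // plays the role of block(it)
        value == stopAt    // true = stop listening, false = keep going
    }
    seen
}
```

Calling valuesSeenUntil(3) collects 1, 2 and 3 and then stops. If the predicate never matches, a finite flow simply completes; a MutableSharedFlow never completes, so in that case the listener keeps running until its scope is cancelled.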

How to use it

Now, each time I need to publish some values and make them listenable, I just need to do this:

class CustomClass {

    // Keep the emitter private; listeners only get the collector.
    private val notifier = Emitter<Something>()
    val collect get() = notifier.createCollector

    fun doSomeWork1() {
        // ... something that takes time and is done in the background
        notifier.tryEmit(Something())
    }

    suspend fun doSomeWork2() {
        // ... something that takes time and is done inside a coroutine scope
        notifier.emit(Something())
    }

}

// Inside a @Composable function
val myCustomClass = remember { CustomClass() }
val coroutineScope = rememberCoroutineScope()
LaunchedEffect(Unit) {
    with(myCustomClass.collect) {
        once(coroutineScope) {
            // receives only one value, then this listener is never called again
        }
        forever(coroutineScope) {
            // receives all the values as long as the coroutineScope is active
        }
        until(coroutineScope) {
            // no idea how long you will receive values:
            // the last expression is the result, and true stops the listener
            egg cameBefore chicken // placeholder Boolean expression
        }
    }
}
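The three listening modes can also be tried without Compose. The sketch below is an illustration under assumptions, not the repository code: it needs only kotlinx-coroutines-core, inlines the firstOrNull predicates instead of reusing Emitter and Collector so that it stays self-contained, and runListeners is a hypothetical name.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns what the once, forever and until
// listeners received after 1, 2 and 3 were emitted.
fun runListeners(): Triple<List<Int>, List<Int>, List<Int>> = runBlocking {
    val flow = MutableSharedFlow<Int>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val once = mutableListOf<Int>()
    val forever = mutableListOf<Int>()
    val until = mutableListOf<Int>()

    // UNDISPATCHED makes each listener subscribe before anything is emitted.
    val jobs = listOf(
        launch(start = CoroutineStart.UNDISPATCHED) { flow.firstOrNull { once += it; true } },
        launch(start = CoroutineStart.UNDISPATCHED) { flow.firstOrNull { forever += it; false } },
        launch(start = CoroutineStart.UNDISPATCHED) { flow.firstOrNull { until += it; it >= 2 } }
    )

    for (value in 1..3) {
        flow.emit(value)
        yield() // let the listeners handle the value before the next emission
    }

    // The forever listener never stops on its own; cancel whatever is left.
    jobs.forEach { it.cancelAndJoin() }
    Triple(once, forever, until)
}
```

In this single-threaded runBlocking setup, the once listener ends up with 1, the forever listener with 1, 2 and 3, and the until listener with 1 and 2. In Compose, cancelling the scope plays the role of cancelAndJoin here.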

That’s a simple way to remove RxJava from your project and transition into the Flow world. I’m using this class in this story about transition animation.

Source code and final thoughts

You’ll find the source code in this repository:

tezov.medium.adr.navigation_animation

This repository corresponds to a story about navigation animation. While I haven’t written the story yet, as soon as it’s done, I’ll return to provide a link for reference.

Don’t hesitate to comment or request more information. If you find this story helpful, consider following or subscribing to the newsletter. Thanks.

I’ll also gladly accept any support that helps me write more content.


Started out with a background in C real-time systems. Nowadays, I'm a fanatic Android developer and Swift lover.