How to use groupBy in Reactor
Project Reactor is a great reactive streams project that you will probably run into when you want to write reactive code in Spring. It is very powerful and can also be complex to wrap your head around. In this article I will look at the groupBy function of a Flux.
groupBy
The groupBy function splits the current flux into multiple fluxes. Think of it as a router: based on a function you specify, it routes each element to one of the groups. For example, when you have a stream of numbers and perform intFlux.groupBy { it % 2 == 0 }, it will cut the flux into 2 fluxes. One will have a stream of even numbers and the other a stream of odd numbers. The resulting type of this groupBy is Flux<GroupedFlux<Boolean, Int>>. The outer flux emits only 2 GroupedFlux<Boolean, Int> elements. If the source on which the groupBy was applied is infinite, the 2 GroupedFlux objects are also infinite.
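A minimal sketch of the even/odd split described above, assuming reactor-core is on the classpath. The function name splitEvenOdd and the finite source Flux.range(1, 10) are illustrative choices that do not come from the original example; a finite source is used so that the groups complete and their contents can be collected.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical helper: splits the numbers 1..10 into an even and an odd group
// and collects each group into a list, keyed by the result of the grouping function.
fun splitEvenOdd(): Map<Boolean, List<Int>> =
    Flux.range(1, 10)
        .groupBy { it % 2 == 0 }                 // Flux<GroupedFlux<Boolean, Int>>
        .flatMap { group ->
            // Each group is itself a Flux; collect it and pair it with its key.
            group.collectList().map { numbers -> group.key() to numbers }
        }
        .collectMap({ it.first }, { it.second })
        .block()!!

fun main() {
    println(splitEvenOdd())
}
```

The even numbers end up under the key true and the odd numbers under the key false, each in their original order.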
Processing the groups
Given the above example, there are 2 groups in a flux. Now we can write the logic to be performed on each group. Each GroupedFlux can be treated like a regular flux, but with an extra function: key(). This key function returns the result of the grouping function, which is the same for all elements in the group. So in our example it returns true for the group with all the even numbers.
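To illustrate key(), here is a small sketch that tags every element with the key of the group it was routed to. The helper name labelByParity and the label format are made up for this illustration.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical helper: prefixes each number with a label derived from its group's key().
fun labelByParity(): List<String> =
    Flux.range(1, 6)
        .groupBy { it % 2 == 0 }
        .flatMap { group ->
            // key() is the same for every element of this group, so it is read once.
            val label = if (group.key()) "even" else "odd"
            group.map { number -> "$label:$number" }
        }
        .collectList()
        .block()!!

fun main() {
    labelByParity().forEach(::println)
}
```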
There is one little detail which is quite important. We need to make sure that we subscribe to all groups. This sounds trivial, but because it is part of a stream this could easily go wrong.
Let's work with another example in which we divide the numbers into 10 groups: intFlux.groupBy { it % 10 }. The processing of each group will just count how many numbers came through. This is what the countNumbers function does with the help of the increment function:
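import java.util.concurrent.ConcurrentHashMap
import reactor.core.publisher.Flux
import reactor.core.publisher.GroupedFlux
// Number of elements seen so far, per group key.
val countOccurrences = ConcurrentHashMap<Int, Long>()
fun increment(group: Int) = countOccurrences.compute(group) { _, count -> (count ?: 0L) + 1 }
fun countNumbers(group: GroupedFlux<Int, Int>): Flux<Int> =
    group.doOnNext { increment(group.key()) }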
The countNumbers function has to be wired together in the flux with the groupBy:
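import java.util.concurrent.atomic.AtomicInteger
import reactor.core.scheduler.Schedulers
// Assumed declaration: the original snippet does not show how emitCounter is defined.
val emitCounter = AtomicInteger()
Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }
    .groupBy { it % 10 }
    .flatMap(::countNumbers)
    .subscribeOn(Schedulers.parallel())
    .subscribe()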
Simple enough, isn't it? This works, and when we inspect countOccurrences every so often we see something like:
nrs emited: 6324660 Occurrences per group: 0: 634584, 1: 634802, 2: 634804, 3: 634804, 4: 634805, 5: 634805, 6: 634805, 7: 634805, 8: 634806, 9: 634806
nrs emited: 13912044 Occurrences per group: 0: 1391214, 1: 1391220, 2: 1391221, 3: 1391221, 4: 1391222, 5: 1391222, 6: 1391222, 7: 1391222, 8: 1391223, 9: 1391223
nrs emited: 22109057 Occurrences per group: 0: 2210915, 1: 2210921, 2: 2210935, 3: 2210936, 4: 2210936, 5: 2210937, 6: 2210964, 7: 2210966, 8: 2210966, 9: 2210967
nrs emited: 30416867 Occurrences per group: 0: 3041697, 1: 3041703, 2: 3041704, 3: 3041704, 4: 3041704, 5: 3041704, 6: 3041704, 7: 3041705, 8: 3041705, 9: 3041705
nrs emited: 38748273 Occurrences per group: 0: 3874837, 1: 3874843, 2: 3874844, 3: 3874844, 4: 3874844, 5: 3874844, 6: 3874844, 7: 3874845, 8: 3874845, 9: 3874845
nrs emited: 47157048 Occurrences per group: 0: 4715713, 1: 4715719, 2: 4715720, 3: 4715720, 4: 4715720, 5: 4715720, 6: 4715720, 7: 4715720, 8: 4715721, 9: 4715721
nrs emited: 55470463 Occurrences per group: 0: 5547095, 1: 5547106, 2: 5547107, 3: 5547120, 4: 5547121, 5: 5547121, 6: 5547122, 7: 5547122, 8: 5547122, 9: 5547122
nrs emited: 62455436 Occurrences per group: 0: 6245552, 1: 6245557, 2: 6245557, 3: 6245558, 4: 6245558, 5: 6245558, 6: 6245558, 7: 6245558, 8: 6245558, 9: 6245559
nrs emited: 69543352 Occurrences per group: 0: 6954345, 1: 6954351, 2: 6954351, 3: 6954351, 4: 6954351, 5: 6954352, 6: 6954352, 7: 6954352, 8: 6954352, 9: 6954352
The elements are nicely distributed over the groups. Notice that we did not specify an explicit concurrency on the flatMap. If it is left out, it defaults to Queues.SMALL_BUFFER_SIZE, which is 256 (unless configured differently). The groupBy ensures that we only have a limited number of groups, and as long as the number of groups does not exceed 256, this will work perfectly.
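The default can be made visible by passing it explicitly. The sketch below assumes reactor-core is available. The helpers countGroup and countPerGroup are hypothetical, and a finite Flux.range replaces the infinite generator so that the result can be returned.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.GroupedFlux
import reactor.core.publisher.Mono
import reactor.util.concurrent.Queues

// Hypothetical helper: counts the elements of one group and pairs the count with the group key.
fun countGroup(group: GroupedFlux<Int, Int>): Mono<Pair<Int, Long>> =
    group.count().map { count -> group.key() to count }

// Hypothetical helper: counts how many of the numbers 1..total land in each of the 10 groups.
// The default argument spells out the concurrency that flatMap uses when none is given.
fun countPerGroup(total: Int, concurrency: Int = Queues.SMALL_BUFFER_SIZE): Map<Int, Long> =
    Flux.range(1, total)
        .groupBy { it % 10 }
        .flatMap(::countGroup, concurrency)
        .collectMap({ it.first }, { it.second })
        .block()!!

fun main() {
    println("default flatMap concurrency: ${Queues.SMALL_BUFFER_SIZE}")
    println(countPerGroup(1000))
}
```

With 10 groups and a concurrency of at least 10, every group gets a subscriber and each key ends up with the same count.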
Let's look at what will happen when we tune the concurrency to be lower than the number of groups:
Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }
    .groupBy { it % 10 }
    .flatMap(::countNumbers, 9)
    .subscribeOn(Schedulers.parallel())
    .subscribe()
The resulting output is:
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
This will continue forever without any progress. The problem is that we have 10 groups, but only 9 workers. Each worker consumes 1 GroupedFlux, which means that there will be 1 group remaining without a worker. But why does the stream get stuck?
No more demand
To understand why the stream grinds to a halt we should look at the demand. You can read more about it in my blog "Debugging demand in Reactor". After adding the log statements:
import java.util.logging.Level
import reactor.core.publisher.SignalType
fun countNumbers(group: GroupedFlux<Int, Int>): Flux<Int> =
    group
        // The per-group suffix is assumed from the output below (countNumbers-1, countNumbers-2, ...).
        .log("countNumbers-${group.key()}", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
        .doOnNext { increment(group.key()) }
Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }
    .log("groupBy", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
    .groupBy { it % 10 }
    .log("flatMap", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
    .flatMap(::countNumbers, 9)
    .subscribeOn(Schedulers.parallel())
    .subscribe()
The resulting output is like this:
[groupBy] - onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
[flatMap] - onSubscribe([Fuseable] FluxGroupBy.GroupByMain)
[subscribe] - onSubscribe(FluxFlatMap.FlatMapMain)
[subscribe] - request(unbounded)
[flatMap] - request(9)
[groupBy] - request(256)
[groupBy] - onNext(1)
[countNumbers-1] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-1] - request(32)
[countNumbers-1] - onNext(1)
[groupBy] - request(1)
[groupBy] - onNext(2)
[countNumbers-2] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-2] - request(32)
[countNumbers-2] - onNext(2)
[groupBy] - request(1)
[groupBy] - onNext(3)
[countNumbers-3] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-3] - request(32)
[countNumbers-3] - onNext(3)
[groupBy] - request(1)
[groupBy] - onNext(4)
[countNumbers-4] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-4] - request(32)
[countNumbers-4] - onNext(4)
[groupBy] - request(1)
[groupBy] - onNext(5)
[countNumbers-5] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-5] - request(32)
[countNumbers-5] - onNext(5)
[groupBy] - request(1)
[groupBy] - onNext(6)
[countNumbers-6] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-6] - request(32)
[countNumbers-6] - onNext(6)
[groupBy] - request(1)
[groupBy] - onNext(7)
[countNumbers-7] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-7] - request(32)
[countNumbers-7] - onNext(7)
[groupBy] - request(1)
[groupBy] - onNext(8)
[countNumbers-8] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-8] - request(32)
[countNumbers-8] - onNext(8)
[groupBy] - request(1)
[groupBy] - onNext(9)
[countNumbers-9] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-9] - request(32)
[countNumbers-9] - onNext(9)
[groupBy] - request(1)
[groupBy] - onNext(10)
[groupBy] - onNext(11)
[countNumbers-1] - onNext(11)
[groupBy] - request(1)
[groupBy] - onNext(12)
[countNumbers-2] - onNext(12)
[groupBy] - request(1)
[groupBy] - onNext(13)
[countNumbers-3] - onNext(13)
[groupBy] - request(1)
[groupBy] - onNext(14)
[countNumbers-4] - onNext(14)
[groupBy] - request(1)
[groupBy] - onNext(15)
[countNumbers-5] - onNext(15)
[groupBy] - request(1)
[groupBy] - onNext(16)
[countNumbers-6] - onNext(16)
[groupBy] - request(1)
[groupBy] - onNext(17)
[countNumbers-7] - onNext(17)
[groupBy] - request(1)
[groupBy] - onNext(18)
[countNumbers-8] - onNext(18)
[groupBy] - request(1)
[groupBy] - onNext(19)
...
[countNumbers-8] - onNext(468)
[groupBy] - request(1)
[groupBy] - onNext(469)
[countNumbers-9] - onNext(469)
[groupBy] - request(1)
[groupBy] - onNext(470)
[groupBy] - onNext(471)
[countNumbers-1] - onNext(471)
[countNumbers-1] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(472)
[countNumbers-2] - onNext(472)
[countNumbers-2] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(473)
[countNumbers-3] - onNext(473)
[countNumbers-3] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(474)
[countNumbers-4] - onNext(474)
[countNumbers-4] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(475)
[countNumbers-5] - onNext(475)
[countNumbers-5] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(476)
[countNumbers-6] - onNext(476)
[countNumbers-6] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(477)
[countNumbers-7] - onNext(477)
[countNumbers-7] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(478)
[countNumbers-8] - onNext(478)
[countNumbers-8] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(479)
[countNumbers-9] - onNext(479)
[countNumbers-9] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(480)
[groupBy] - onNext(481)
[countNumbers-1] - onNext(481)
[groupBy] - request(1)
[groupBy] - onNext(482)
[countNumbers-2] - onNext(482)
[groupBy] - request(1)
[groupBy] - onNext(483)
[countNumbers-3] - onNext(483)
[groupBy] - request(1)
[groupBy] - onNext(484)
[countNumbers-4] - onNext(484)
[groupBy] - request(1)
[groupBy] - onNext(485)
[countNumbers-5] - onNext(485)
...
[groupBy] - request(1)
[groupBy] - onNext(2558)
[countNumbers-8] - onNext(2558)
[groupBy] - request(1)
[groupBy] - onNext(2559)
[countNumbers-9] - onNext(2559)
[groupBy] - request(1)
[groupBy] - onNext(2560)
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
The logs give a lot of information about what is going on under the hood. First, the onSubscribe event that starts the Flux is passed along the chain. Keep in mind that a Flux is nothing but a definition until you subscribe: "nothing happens until you subscribe". This is called a cold stream. Once the onSubscribe signal has reached the final subscriber at the end of the chain, the demand starts flowing back upstream.
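The "nothing happens until you subscribe" rule can be checked with a counter. This is a minimal sketch: the helper name coldStreamDemo is hypothetical, and blockLast() is used here as the subscribing call so that the function only returns after the flux has completed.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import reactor.core.publisher.Flux

// Hypothetical helper: returns how many elements were seen before and after subscribing.
fun coldStreamDemo(): Pair<Int, Int> {
    val seen = AtomicInteger()
    // Only a definition: no element is produced while the pipeline is being assembled.
    val flux = Flux.range(1, 5).doOnNext { seen.incrementAndGet() }
    val beforeSubscribe = seen.get()
    flux.blockLast()                 // subscribes and waits for completion
    val afterSubscribe = seen.get()
    return beforeSubscribe to afterSubscribe
}

fun main() {
    println(coldStreamDemo()) // prints (0, 5)
}
```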
The subscribe has no back-pressure and can handle everything, so it will request an unbounded demand. The flatMap has a concurrency of 9, so it will send a demand of 9 upstream. Note that this is a demand for 9 elements of type GroupedFlux<Int, Int>, so we request 9 groups. The groupBy by default requests 256 elements. This demand reaches the source, and the source will start emitting 256 elements (if possible, which it is in this case). These 256 elements will be distributed over the 10 groups that are defined by the grouping function. The output above shows that the first time an element is emitted (onNext(1)), the flatMap subscribes to that group and we immediately see demand flowing.
This shows that the subscription to a GroupedFlux only happens once the first element for that group is available. We also see that as soon as the first element is dispatched to a group, the groupBy signals new demand upstream. This happens 8 more times until we reach element 10, which belongs to group 0 (10 % 10 = 0) and would end up in countNumbers-0, but we do not have a worker left for that group. So it stays in the groupBy, which has a demand of 256, but now 1 element cannot be dispatched. Element 11 is dispatched to group 1 again. Every subflux has a demand of 32, as we can see. The elements are divided over the 9 active groups, but the elements for group 0 get stuck.
When 3/4 of the demand for a group is fulfilled, it re-signals demand. This is the request(24). The groupBy, with its buffer of 256, will continuously pass elements downstream when they are available. This goes on until the groupBy holds 256 elements for group 0 (the multiples of 10) that it cannot hand off. The groupBy signalled a demand of 256, and now that entire demand is filled with elements for the group without a worker. There is no more demand and we have full back-pressure. Therefore, the pipeline is now stuck "waiting" for demand for the elements of that group.
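The bookkeeping above can be replayed with a small, single-threaded model in plain Kotlin. This is not Reactor code and makes no claim about its internals: all names (simulateStall, StallResult, replenishAmount) are hypothetical, and the model only counts demand the way the log suggests. A consumed element triggers request(1) upstream, and an element for a group without a worker stays buffered and is never replenished.

```kotlin
// Simplified model of the demand accounting seen in the log; NOT Reactor's implementation.
data class StallResult(val emitted: Int, val consumedPerGroup: Map<Int, Int>, val parked: Int)

fun simulateStall(
    groups: Int = 10,
    workers: Int = 9,
    prefetch: Int = 256,
    maxEmitted: Int = 1_000_000      // safety stop for configurations that never stall
): StallResult {
    var demand = prefetch            // what groupBy has requested from the source
    var emitted = 0
    var parked = 0                   // elements buffered for a group nobody subscribed to
    val subscribed = mutableSetOf<Int>()   // group keys that obtained a flatMap slot
    val consumed = sortedMapOf<Int, Int>()

    while (demand > 0 && emitted < maxEmitted) {
        demand--                     // the source fulfils one unit of demand
        emitted++
        val key = emitted % groups
        if (key !in subscribed && subscribed.size < workers) subscribed += key
        if (key in subscribed) {
            consumed[key] = (consumed[key] ?: 0) + 1
            demand++                 // consumed element: request(1) goes upstream
        } else {
            parked++                 // no worker: the element stays in the buffer
        }
    }
    return StallResult(emitted, consumed, parked)
}

// Arithmetic that matches the request(32) followed by request(24) in the log:
// three quarters of the prefetch are re-requested at a time.
fun replenishAmount(prefetch: Int): Int = prefetch - (prefetch shr 2)

fun main() {
    val result = simulateStall()
    println("emitted=${result.emitted} parked=${result.parked} perGroup=${result.consumedPerGroup}")
    println("replenish for prefetch 32: ${replenishAmount(32)}")
}
```

With the defaults, the model stops after 2560 emitted elements: 256 consumed by each of the groups 1 to 9, and 256 parked for group 0. These are the same numbers as in the output above.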
Conclusion
If you use the groupBy function on a Flux, you must make sure that there are enough subscribers in the flatMap, otherwise your stream will get stuck. For debugging demand in Reactor, the logging functionality is really helpful. I learned a lot while writing this blog and gained even more insight into the internals of Reactor.
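One way to guard against this mistake is to fail fast while the pipeline is assembled. The sketch below is only an illustration under the assumption that the number of groups is known up front. The helper groupWithEnoughWorkers and its parameters are hypothetical and not part of Reactor.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.GroupedFlux

// Hypothetical helper: groups by (value % groupCount) and refuses to build a pipeline
// in which flatMap has fewer slots than there are groups.
fun groupWithEnoughWorkers(
    source: Flux<Int>,
    groupCount: Int,
    concurrency: Int,
    handler: (GroupedFlux<Int, Int>) -> Flux<Int>
): Flux<Int> {
    require(concurrency >= groupCount) {
        "flatMap concurrency ($concurrency) is lower than the number of groups ($groupCount)"
    }
    return source
        .groupBy { it % groupCount }
        .flatMap({ group -> handler(group) }, concurrency)
}

fun main() {
    val processed = groupWithEnoughWorkers(Flux.range(1, 100), groupCount = 10, concurrency = 10) { it }
        .count()
        .block()
    println(processed)
}
```

Calling the helper with a concurrency of 9 for 10 groups throws an IllegalArgumentException instead of producing a pipeline that can stall.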