Project Reactor

How does limitRate work in Reactor

Project Reactor is a great reactive streams project that you will probably run into when you want to write reactive code in Spring. It is very powerful and can also be complex to wrap your head around. In this article I will look at the limitRate function of a Flux.

The first time I ran into limitRate, I thought it would help with limiting or throttling the number of events flowing downstream. According to the documentation, this is the case:

Ensure that backpressure signals from downstream subscribers are split into batches capped at the provided prefetchRate when propagated upstream, effectively rate limiting the upstream Publisher.

This means that limitRate will split big requests from downstream into smaller requests. It also states that this is effectively rate limiting the publisher.

Typically used for scenarios where consumer(s) request a large amount of data (eg. Long.MAX_VALUE) but the data source behaves better or can be optimized with smaller requests (eg. database paging, etc...). All data is still processed, unlike with limitRequest(long) which will cap the grand total request amount.

According to this documentation, it is typically useful when the demand sent upstream is unbounded. The rate limiter can cut this up into smaller pieces. While there might be a use case for this, I think it is far more useful for limiting the number of demand requests going from downstream to upstream.

Too many demand requests

Let's look at a scenario where we want to process messages from PubSub using Spring.


fun process(msg: AcknowledgeablePubsubMessage): Mono<String> = ...

pubSubReactiveFactory.poll("exampleSubscription", 1000 /* not important with limited demand*/)
  .flatMap(::process, 16)
  ...
  .subscribe()

In the sample above, an initial demand of 16 elements goes up to the source. The PubSubReactiveFactory will request 16 elements from PubSub and send them downstream. Whenever one of the workers in the flatMap is done, it sends a request(1) upstream, and the PubSubReactiveFactory requests one element from PubSub. A fraction later, another demand may reach the source, which then needs to make an extra call to PubSub to get 1 extra element. The pipeline is effectively transformed so that it pulls messages from PubSub one at a time, and the handling time per message becomes pull latency + processing time. Requesting just 1 element at a time is very wasteful, certainly when processing time is well within the deadline bounds and having a buffer makes sense.

Limiting the number of demand requests

The best way to minimize the impact of pulling messages from a source is to make sure we pull more than 1 message per request. This is exactly what limitRate can do. It limits the number of demand requests to the source by grouping them together. Internally, limitRate has a buffer from which it can feed the consumers downstream, while making sure the buffer is refilled in time by requesting elements from the source. By default, "in time" means when the buffer is 75% depleted.

When limitRate(100) is used, it will first demand 100 elements from the source to fill the buffer. As soon as elements arrive, limitRate can send them downstream as long as there is demand. Once 75 elements have been passed downstream and only 25 are left in the buffer (75% depleted), it sends request(75) to the source to refill the buffer.
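The replenishing behaviour described above can be modelled in a few lines of plain Kotlin. This is only a sketch of the arithmetic, not Reactor's implementation: the function names are made up for this illustration, and the 75% threshold is taken from the description above.

```kotlin
// Number of consumed elements after which the buffer is refilled:
// 75% of the prefetch, so 75 for limitRate(100).
fun replenishThreshold(prefetch: Int): Int = prefetch - (prefetch shr 2)

// Lists the request(n) signals the source would see while `total`
// elements are passed downstream through a buffer of size `prefetch`.
fun simulateRequests(total: Int, prefetch: Int): List<Int> {
    val limit = replenishThreshold(prefetch)
    val requests = mutableListOf(prefetch) // initial request fills the buffer
    var consumedSinceLastRequest = 0
    repeat(total) {
        consumedSinceLastRequest++
        if (consumedSinceLastRequest == limit) {
            requests += limit              // buffer is 75% depleted: refill
            consumedSinceLastRequest = 0
        }
    }
    return requests
}

fun main() {
    val requests = simulateRequests(total = 1000, prefetch = 100)
    println(requests.first())            // 100
    println(requests.drop(1).distinct()) // [75]
    println(requests.size)               // 14
}
```

For 1000 elements the model produces one request(100) followed by thirteen request(75) signals, so the source is called 14 times instead of roughly once per element.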

This makes sure the source can emit batches of events, making the latency overhead much less of an issue. The limitRate function is therefore more of a performance booster than a throttler.

Example

Let's create an example to show the impact of limitRate. The source in this example can have unlimited outstanding requests and adds a 200ms latency to every request for elements. Processing takes somewhere between 10 and 15ms.

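import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SignalType
import reactor.core.scheduler.Schedulers
import java.time.Duration
import java.time.Instant
import java.util.concurrent.TimeUnit
import java.util.logging.Level
import kotlin.random.Random

// Assumption: the scheduler is not shown in the original snippet; any Reactor Scheduler will do.
val scheduler = Schedulers.single()

val start = Instant.now()
val job = Flux.create<Int> { sink ->
  // Simulated source: every demand signal is answered after a 200ms delay
  sink.onRequest { demand ->
    scheduler.schedule({
      repeat(demand.toInt()) {
        sink.next(Random.nextInt())
      }
    }, 200, TimeUnit.MILLISECONDS)
  }
}
  // Log only the demand (request) signals that reach the source
  .log("demandflow", Level.INFO, SignalType.REQUEST)
  .limitRate(100)
  // 16 concurrent workers, each taking 10-15ms per element
  .flatMap({ nr ->
    Mono.fromCallable { nr.toString() }.delayElement(Duration.ofMillis(Random.nextLong(10, 15)))
  }, 16)
  .subscribeOn(Schedulers.parallel())
  .take(1000)
  .doOnComplete {
    println("Time: ${Duration.between(start, Instant.now())}")
  }
  .subscribe()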

Without limitRate

If we run the code above with the line limitRate(100) commented out, we get the following result:

20:46:29.092 [parallel-1 ] INFO  demandflow - request(16)
20:46:29.367 [parallel-3 ] INFO  demandflow - request(1)
20:46:29.367 [parallel-8 ] INFO  demandflow - request(1)
20:46:29.368 [parallel-9 ] INFO  demandflow - request(1)
20:46:29.369 [parallel-1 ] INFO  demandflow - request(1)
20:46:29.369 [parallel-1 ] INFO  demandflow - request(1)
20:46:29.370 [parallel-10] INFO  demandflow - request(3)
20:46:29.370 [parallel-10] INFO  demandflow - request(1)
20:46:29.371 [parallel-2 ] INFO  demandflow - request(1)
20:46:29.371 [parallel-2 ] INFO  demandflow - request(1)
20:46:29.371 [parallel-2 ] INFO  demandflow - request(1)
...
20:46:42.551 [parallel-7 ] INFO  demandflow - request(1)
20:46:42.561 [parallel-10] INFO  demandflow - request(1)
20:46:42.732 [parallel-2 ] INFO  demandflow - request(1)
20:46:42.733 [parallel-3 ] INFO  demandflow - request(1)
20:46:42.735 [parallel-6 ] INFO  demandflow - request(1)
20:46:42.736 [parallel-4 ] INFO  demandflow - request(1)
20:46:42.736 [parallel-5 ] INFO  demandflow - request(1)
20:46:42.737 [parallel-7 ] INFO  demandflow - request(1)
20:46:42.739 [parallel-8 ] INFO  demandflow - request(1)

Time: PT13.752124S

After the first 16 elements that were demanded, it will request mostly 1 at a time. Sometimes multiple requests are bundled together. As you can see, processing this took over 13s. When run with limitRate(100) enabled, we get a completely different result:

20:49:55.068 [parallel-1 ] INFO  demandflow - request(100)
20:49:55.407 [parallel-7 ] INFO  demandflow - request(75)
20:49:55.644 [parallel-4 ] INFO  demandflow - request(75)
20:49:55.884 [parallel-9 ] INFO  demandflow - request(75)
20:49:56.125 [parallel-3 ] INFO  demandflow - request(75)
20:49:56.362 [parallel-8 ] INFO  demandflow - request(75)
20:49:56.601 [parallel-12] INFO  demandflow - request(75)
20:49:56.843 [parallel-12] INFO  demandflow - request(75)
20:49:57.082 [parallel-8 ] INFO  demandflow - request(75)
20:49:57.320 [parallel-8 ] INFO  demandflow - request(75)
20:49:57.560 [parallel-8 ] INFO  demandflow - request(75)
20:49:57.794 [parallel-5 ] INFO  demandflow - request(75)
20:49:58.034 [parallel-9 ] INFO  demandflow - request(75)
20:49:58.270 [parallel-3 ] INFO  demandflow - request(75)

Time: PT3.273889S

The first request is for 100 elements to fill the initial buffer, and then every so often we see a request for 75 elements to refill it. With this configuration the processing took only a bit over 3 seconds. The impact of the 200ms latency is now minimized by requesting batches of elements.
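As a rough sanity check on these numbers, the run times can be estimated from the latency alone. The sketch below is a back-of-the-envelope model, not a measurement. It assumes that without limitRate each of the 16 flatMap slots pays the full 200ms pull latency plus about 12.5ms of processing per element, and that with limitRate(100) the batches arrive strictly one after another. The function names are made up for this illustration.

```kotlin
import kotlin.math.ceil

// Without limitRate: every element costs one pull round trip plus processing,
// spread over `concurrency` flatMap slots working in parallel.
fun estimateWithoutBatchingMs(total: Int, concurrency: Int, latencyMs: Double, processingMs: Double): Double =
    total.toDouble() / concurrency * (latencyMs + processingMs)

// With limitRate(prefetch): the first batch holds `prefetch` elements and each
// refill adds 75% of that. The latency is paid once per batch that is needed
// to receive `total` elements, which gives a lower bound on the run time.
fun latencyFloorWithBatchingMs(total: Int, prefetch: Int, latencyMs: Double): Double {
    val refill = prefetch - prefetch / 4
    val batches = 1 + ceil((total - prefetch).toDouble() / refill).toInt()
    return batches * latencyMs
}

fun main() {
    println(estimateWithoutBatchingMs(1000, 16, 200.0, 12.5)) // 13281.25
    println(latencyFloorWithBatchingMs(1000, 100, 200.0))     // 2600.0
}
```

Both values are in the same ballpark as the measured 13.75s and 3.27s. The difference is processing time and scheduling overhead, which this simple model ignores.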

Conclusion

The limitRate function is very useful to limit the number of demand requests flowing upstream. Instead of limiting the number of messages that can be processed by the pipeline, it actually greatly improves the performance. This function has helped me a lot to improve the performance of processing pipelines subscribing to a PubSub source.

How to use groupBy in Reactor

Project Reactor is a great reactive streams project that you will probably run into when you want to write reactive code in Spring. It is very powerful and can also be complex to wrap your head around. In this article I will look at the groupBy function of a Flux.

groupBy

The groupBy function splits the current flux into multiple fluxes. Think of it as a router: based on a function you specify, it routes each message to one of the groups. For example, when you have a stream of numbers and perform intFlux.groupBy { it % 2 == 0 }, it will cut the flux into 2 fluxes. One will have a stream of even numbers and the other a stream of odd numbers. The resulting type of this groupBy is Flux<GroupedFlux<Boolean, Int>>. The outer flux is actually a finite stream of 2 GroupedFlux<Boolean, Int> elements. If the source on which the groupBy was applied is infinite, the 2 GroupedFlux objects are also infinite.

Processing the groups

Given the above example, there are 2 groups in a flux. Now we can write the logic to be performed on each group. Each GroupedFlux can be treated like a regular flux, but with an extra function: key(). This key function returns the result of the grouping function for all elements in this group. So in our example it returns true for the group of even numbers.

There is one little detail which is quite important. We need to make sure that we subscribe to all groups. This sounds trivial, but because it is part of a stream this could easily go wrong.

Let's work with another example in which we divide the numbers into 10 groups: intFlux.groupBy { it % 10 }. The processing for each group will just count how many numbers came through. This is what the countNumbers function does with the help of the increment function:

val countOccurrences = ConcurrentHashMap<Int, Long>()  

fun increment(group: Int) = countOccurrences.compute(group) { _, k -> (k ?: 0) + 1 }  

fun countNumbers(group: GroupedFlux<Int, Int>): Flux<Int> =  
    group.doOnNext { increment(group.key()) }

The countNumbers function has to be wired together in the flux with the groupBy:

Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }  
 .groupBy { it % 10 }  
 .flatMap(::countNumbers)  
 .subscribeOn(Schedulers.parallel())  
 .subscribe()

Simple enough, isn't it? This works, and when we inspect countOccurrences every so often we see something like:

nrs emited: 6324660 Occurrences per group: 0: 634584, 1: 634802, 2: 634804, 3: 634804, 4: 634805, 5: 634805, 6: 634805, 7: 634805, 8: 634806, 9: 634806
nrs emited: 13912044 Occurrences per group: 0: 1391214, 1: 1391220, 2: 1391221, 3: 1391221, 4: 1391222, 5: 1391222, 6: 1391222, 7: 1391222, 8: 1391223, 9: 1391223
nrs emited: 22109057 Occurrences per group: 0: 2210915, 1: 2210921, 2: 2210935, 3: 2210936, 4: 2210936, 5: 2210937, 6: 2210964, 7: 2210966, 8: 2210966, 9: 2210967
nrs emited: 30416867 Occurrences per group: 0: 3041697, 1: 3041703, 2: 3041704, 3: 3041704, 4: 3041704, 5: 3041704, 6: 3041704, 7: 3041705, 8: 3041705, 9: 3041705
nrs emited: 38748273 Occurrences per group: 0: 3874837, 1: 3874843, 2: 3874844, 3: 3874844, 4: 3874844, 5: 3874844, 6: 3874844, 7: 3874845, 8: 3874845, 9: 3874845
nrs emited: 47157048 Occurrences per group: 0: 4715713, 1: 4715719, 2: 4715720, 3: 4715720, 4: 4715720, 5: 4715720, 6: 4715720, 7: 4715720, 8: 4715721, 9: 4715721
nrs emited: 55470463 Occurrences per group: 0: 5547095, 1: 5547106, 2: 5547107, 3: 5547120, 4: 5547121, 5: 5547121, 6: 5547122, 7: 5547122, 8: 5547122, 9: 5547122
nrs emited: 62455436 Occurrences per group: 0: 6245552, 1: 6245557, 2: 6245557, 3: 6245558, 4: 6245558, 5: 6245558, 6: 6245558, 7: 6245558, 8: 6245558, 9: 6245559
nrs emited: 69543352 Occurrences per group: 0: 6954345, 1: 6954351, 2: 6954351, 3: 6954351, 4: 6954351, 5: 6954352, 6: 6954352, 7: 6954352, 8: 6954352, 9: 6954352

The elements are nicely distributed over the groups. Notice that we did not specify an explicit concurrency on the flatMap. If it is left out, it defaults to Queues.SMALL_BUFFER_SIZE, which is 256 (unless configured differently). The groupBy ensures that we only have a limited number of groups, and as long as the number of groups stays at or below 256, this will work perfectly.

Let's look at what will happen when we tune the concurrency to be lower than the number of groups:

Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }  
 .groupBy { it % 10 }  
 .flatMap(::countNumbers, 9)  
 .subscribeOn(Schedulers.parallel())  
 .subscribe()

The resulting output is:

nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256

This will continue forever without any progress. The problem is that we have 10 groups, but only 9 workers. Each worker consumes 1 GroupedFlux, which means that there will be 1 group remaining without a worker. But why does the stream get stuck?

No more demand

To understand why the stream grinds to a halt we should look at the demand. You can read more about it in my blog "Debugging demand in Reactor". After adding the log statements:

fun countNumbers(group: GroupedFlux<Int, Int>): Flux<Int> =
    group
        .log("countNumbers", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
        .doOnNext { increment(group.key()) }

Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }  
 .log("groupBy", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
 .groupBy { it % 10 }
 .log("flatMap", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
 .flatMap(::countNumbers, 9)
 .subscribeOn(Schedulers.parallel())  
 .subscribe()

The resulting output is like this:

[groupBy] - onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
[flatMap] - onSubscribe([Fuseable] FluxGroupBy.GroupByMain)
[subscribe] - onSubscribe(FluxFlatMap.FlatMapMain)
[subscribe] - request(unbounded)
[flatMap] - request(9)
[groupBy] - request(256)
[groupBy] - onNext(1)
[countNumbers-1] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-1] - request(32)
[countNumbers-1] - onNext(1)
[groupBy] - request(1)
[groupBy] - onNext(2)
[countNumbers-2] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-2] - request(32)
[countNumbers-2] - onNext(2)
[groupBy] - request(1)
[groupBy] - onNext(3)
[countNumbers-3] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-3] - request(32)
[countNumbers-3] - onNext(3)
[groupBy] - request(1)
[groupBy] - onNext(4)
[countNumbers-4] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-4] - request(32)
[countNumbers-4] - onNext(4)
[groupBy] - request(1)
[groupBy] - onNext(5)
[countNumbers-5] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-5] - request(32)
[countNumbers-5] - onNext(5)
[groupBy] - request(1)
[groupBy] - onNext(6)
[countNumbers-6] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-6] - request(32)
[countNumbers-6] - onNext(6)
[groupBy] - request(1)
[groupBy] - onNext(7)
[countNumbers-7] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-7] - request(32)
[countNumbers-7] - onNext(7)
[groupBy] - request(1)
[groupBy] - onNext(8)
[countNumbers-8] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-8] - request(32)
[countNumbers-8] - onNext(8)
[groupBy] - request(1)
[groupBy] - onNext(9)
[countNumbers-9] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-9] - request(32)
[countNumbers-9] - onNext(9)
[groupBy] - request(1)
[groupBy] - onNext(10)
[groupBy] - onNext(11)
[countNumbers-1] - onNext(11)
[groupBy] - request(1)
[groupBy] - onNext(12)
[countNumbers-2] - onNext(12)
[groupBy] - request(1)
[groupBy] - onNext(13)
[countNumbers-3] - onNext(13)
[groupBy] - request(1)
[groupBy] - onNext(14)
[countNumbers-4] - onNext(14)
[groupBy] - request(1)
[groupBy] - onNext(15)
[countNumbers-5] - onNext(15)
[groupBy] - request(1)
[groupBy] - onNext(16)
[countNumbers-6] - onNext(16)
[groupBy] - request(1)
[groupBy] - onNext(17)
[countNumbers-7] - onNext(17)
[groupBy] - request(1)
[groupBy] - onNext(18)
[countNumbers-8] - onNext(18)
[groupBy] - request(1)
[groupBy] - onNext(19)
...
[countNumbers-8] - onNext(468)
[groupBy] - request(1)
[groupBy] - onNext(469)
[countNumbers-9] - onNext(469)
[groupBy] - request(1)
[groupBy] - onNext(470)
[groupBy] - onNext(471)
[countNumbers-1] - onNext(471)
[countNumbers-1] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(472)
[countNumbers-2] - onNext(472)
[countNumbers-2] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(473)
[countNumbers-3] - onNext(473)
[countNumbers-3] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(474)
[countNumbers-4] - onNext(474)
[countNumbers-4] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(475)
[countNumbers-5] - onNext(475)
[countNumbers-5] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(476)
[countNumbers-6] - onNext(476)
[countNumbers-6] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(477)
[countNumbers-7] - onNext(477)
[countNumbers-7] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(478)
[countNumbers-8] - onNext(478)
[countNumbers-8] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(479)
[countNumbers-9] - onNext(479)
[countNumbers-9] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(480)
[groupBy] - onNext(481)
[countNumbers-1] - onNext(481)
[groupBy] - request(1)
[groupBy] - onNext(482)
[countNumbers-2] - onNext(482)
[groupBy] - request(1)
[groupBy] - onNext(483)
[countNumbers-3] - onNext(483)
[groupBy] - request(1)
[groupBy] - onNext(484)
[countNumbers-4] - onNext(484)
[groupBy] - request(1)
[groupBy] - onNext(485)
[countNumbers-5] - onNext(485)
...
[groupBy] - request(1)
[groupBy] - onNext(2558)
[countNumbers-8] - onNext(2558)
[groupBy] - request(1)
[groupBy] - onNext(2559)
[countNumbers-9] - onNext(2559)
[groupBy] - request(1)
[groupBy] - onNext(2560)
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256

The logs give a lot of information about what is going on under the hood. At first the onSubscribe event that starts the Flux is passed along. Keep in mind that a Flux is nothing but a definition until you subscribe: "nothing happens until you subscribe". This is called a cold stream. When the onSubscribe signal reaches the final subscriber at the end of the stream, the demand starts flowing back upstream.

The subscribe applies no back-pressure and can handle everything, so it will request an unbounded demand. The flatMap has a concurrency of 9, so it will send a demand of 9 upstream. Note that this is a demand for 9 elements of type GroupedFlux<Int, Int>, so we request 9 groups. The groupBy has the default behaviour of requesting a demand of 256 elements. This will reach the source, and the source will start emitting 256 elements (if possible, which it is in this case). These 256 elements will be distributed over the 10 groups that are defined by the grouping function. The output above shows that the first time an element is emitted (onNext(1)), the flatMap subscribes to that group and we immediately see demand flowing.

This shows that the subscription to a GroupedFlux only happens once the first element for that group is available. We also see that as soon as the first element is dispatched to a group, the groupBy signals new demand upstream. This happens 8 more times until we reach element 10, which belongs to the tenth group (key 0), but the flatMap has no slot left to subscribe to that group. So the element stays in the groupBy, which has a demand of 256, of which 1 element now cannot be dispatched. Element 11 is dispatched to group 1 again. Every subflux has a demand of 32, as we can see. The elements are divided over the 9 active groups, but the elements for the tenth group get stuck.

When 3/4 of the demand for a group is fulfilled, the group re-signals demand. This is the request(24). The groupBy with a buffer of 256 continuously passes elements downstream when they are available. This goes on until the groupBy holds 256 elements for the tenth group (key 0), which nobody is subscribed to, and it has to keep them. The groupBy signalled a demand of 256, and now all of that demand has been filled with elements for the tenth group. There is no more demand and we have full back-pressure. Therefore, the pipeline is now stuck "waiting" for demand for the elements of the tenth group.
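The stall can be reproduced with a small model in plain Kotlin. This is a simplified sketch under stated assumptions, not Reactor's implementation: it ignores threads and the per-group demand of 32, and only tracks how much demand the groupBy has outstanding at the source. All names in it are made up for this illustration.

```kotlin
// Outcome of the model: how many elements the source emitted, how many
// each subscribed group received, and whether demand dried up.
data class StallResult(val emitted: Int, val delivered: Map<Int, Int>, val stalled: Boolean)

fun simulateGroupBy(
    groups: Int,
    concurrency: Int,
    prefetch: Int = 256,          // demand groupBy signals to the source
    maxEmissions: Int = 100_000   // safety cap for the case that never stalls
): StallResult {
    val subscribedKeys = mutableSetOf<Int>()   // groups that got a flatMap slot
    val delivered = sortedMapOf<Int, Int>()    // elements handed to each group
    var outstandingDemand = prefetch
    var emitted = 0

    while (outstandingDemand > 0 && emitted < maxEmissions) {
        emitted++
        outstandingDemand--
        val key = emitted % groups
        // A new group is only subscribed to while flatMap has a free slot.
        if (key !in subscribedKeys && subscribedKeys.size < concurrency) {
            subscribedKeys += key
        }
        if (key in subscribedKeys) {
            delivered[key] = (delivered[key] ?: 0) + 1
            outstandingDemand++   // element dispatched, so request(1) upstream
        }
        // Otherwise the element is parked and the demand is not replenished.
    }
    return StallResult(emitted, delivered, stalled = outstandingDemand == 0)
}

fun main() {
    val stuck = simulateGroupBy(groups = 10, concurrency = 9)
    // emitted=2560 stalled=true perGroup={1=256, 2=256, 3=256, 4=256, 5=256, 6=256, 7=256, 8=256, 9=256}
    println("emitted=${stuck.emitted} stalled=${stuck.stalled} perGroup=${stuck.delivered}")

    val fine = simulateGroupBy(groups = 10, concurrency = 10)
    // emitted=100000 stalled=false
    println("emitted=${fine.emitted} stalled=${fine.stalled}")
}
```

With 9 slots for 10 groups the model stops at exactly 2560 emitted elements and 256 elements per active group, matching the output above. With 10 slots the demand never runs out.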

Conclusion

If you use the groupBy function in a Flux, you must make sure that the flatMap has at least as many concurrent subscribers as there are groups, otherwise your stream will get stuck. The logging functionality described in "Debugging demand in Reactor" is really helpful for finding this kind of problem. I learned a lot while I was writing this blog and got even more insight into the internals of Reactor.

Debugging demand in Reactor

Project Reactor is a great reactive streams project that you will probably run into when you want to write reactive code in Spring. It is very powerful and can also be complex to wrap your head around. Something that can be confusing is how demand flows upstream and messages flow downstream.

Getting insight into the flow of demand

In any Flux it is possible to show demand by using the log function on a flux. With this function you can specify what SignalType you want to be logged. Let's look at an example:

val counter = AtomicLong()  

fun process(nr: Long): Mono<Long> =  
    Mono.just(nr).delayElement(Duration.ofMillis(nextLong(1, 25)))  

Flux.generate<Long> { it.next(counter.incrementAndGet()) }  
 .log("beforeFlatmap", Level.INFO, SignalType.REQUEST)  
 .flatMap(::process)  
 .log("beforeTake", Level.INFO, SignalType.REQUEST)  
 .take(100)  
 .log("beforeSubscribe", Level.INFO, SignalType.REQUEST)  
 .subscribeOn(Schedulers.parallel())  
 .subscribe()  

Thread.sleep(4000)  
println("Counter: ${counter.get()}")

When run, this will print:

13:43:15.197 [parallel-1] INFO beforeSubscribe - request(unbounded)
13:43:15.200 [parallel-1] INFO beforeTake - request(unbounded)
13:43:15.200 [parallel-1] INFO beforeFlatmap - | request(256)
13:43:15.251 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.251 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.251 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.252 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.252 [parallel-8] INFO beforeFlatmap - | request(1)
...
13:43:15.260 [parallel-2] INFO beforeFlatmap - | request(4)
13:43:15.260 [parallel-2] INFO beforeFlatmap - | request(12)
13:43:15.260 [parallel-2] INFO beforeFlatmap - | request(2)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(6)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(7)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(3)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(2)
13:43:15.262 [parallel-2] INFO beforeFlatmap - | request(3)
13:43:15.262 [parallel-2] INFO beforeFlatmap - | request(3)
13:43:15.262 [parallel-2] INFO beforeFlatmap - | request(1)
Counter: 350

The request lines in the logs show the demand flowing up (towards the source) and give us insight into what happens with that demand. The first demand is sent when the stream is subscribed to. Remember, demand flows upstream, so in our code from bottom to top. The subscribe function here requests an unbounded number of events. Next we reach the take function, which doesn't change the demand and also sends unbounded demand. So up until this point we do not have any back-pressure control. Or said differently, these functions can keep up with anything upstream may send.

Next we hit the flatMap, with its default concurrency (256). The flatMap changes the demand. There are only 256 workers, so it can only process 256 messages at a time. Therefore it signals a demand of 256. This demand reaches the source, and the source can now emit 256 elements. When a task in the flatMap is done, it will not encounter any back-pressure, because the demand downstream is unbounded. This means that when a task is done, it can immediately emit the message and signal new demand by requesting 1 extra message.

When 100 messages have reached the take function, the stream is completed. However, in the end we see that many more messages were emitted by the source, namely 350. This happens because everything is happening at the same time. The flatMap requested 256 elements up front, and whenever a task in the flatMap is done, it signals demand for a new element. By the time take has seen its 100 results, the source has already served the initial 256 plus most of that replenished demand. Therefore more messages are emitted than the 100 that take lets through.

Conclusion

Using log on a Flux can greatly help in understanding what's going on under the covers. We've seen in the example above that even in trivial flows it leads to interesting discoveries.