Unraveling the Code: Kotlin’s Edge Over Java Streams

This blog is inspired by the Devoxx talk titled "If Streams Are So Great, Let's Use Them Everywhere... Right??" by Maurice Naftalin and José Paumard. You can watch the full talk here on YouTube. In the talk, Maurice and José explore various examples that highlight the strengths of Java Streams, but also demonstrate how they can become overly complex and verbose in certain situations. In this blog, we will explore these examples and see how we can implement these snippets in Kotlin. Will the Kotlin code be easier, or do we run into the same complexity as with Java?

In this blog, we are going to explore the following examples:

  1. Finding the First Word Longer Than Three Characters
  2. Finding a Word of Length 3 with Its Index
  3. Creating the Cross Product of Two Ranges
  4. Grouping Cities by Country
  5. Finding the Country with the Least Number of Cities
  6. Finding All Countries with the Minimum Number of Cities
  7. Reading and Processing Temperature Data from a File

After examining these examples, we'll wrap up with a conclusion summarizing our findings.

Example 1: Finding the First Word Longer Than Three Characters

Java Code Examples

Classical Java Looping

Let's start with a Java snippet that splits a line by spaces and returns the first word longer than three characters using classical Java looping:

String splitLoop(String line) {
     var pattern = Pattern.compile(" ");
     var words = pattern.split(line);
     for (var word : words) {
         if (word.length() > 3) {
             return word;
         }
     }
     throw new NoSuchElementException("No word longer than 3 characters found");
}

This snippet demonstrates the traditional imperative approach in Java. It's straightforward but involves several steps: compiling a pattern, splitting the string, looping through the results, and manually throwing an exception if no match is found.

Java Streams Version

With Java Streams, we can be more expressive and concise. Here's the same functionality implemented using Streams:

String splitStream(String line) {
    var pattern = Pattern.compile(" ");
    return pattern.splitAsStream(line)
            .filter(word -> word.length() > 3)
            .findFirst()
            .orElseThrow();
}

The Streams version is more declarative, clearly stating what we want to achieve rather than how to do it step-by-step.

Kotlin Implementation

Now, let's see how we can implement the same functionality in Kotlin:

fun splitKotlin(line: String): String {
    return line.split(" ")
        .first { it.length > 3 }
}

Analysis

The Kotlin version demonstrates a more powerful and concise approach compared to both Java implementations. The key to its effectiveness lies in the first function, which accepts a lambda to specify precisely what we're looking for. It's worth noting, however, that while this approach is more elegant, the NoSuchElementException that would be thrown if no matching word is found is implicit here, unlike the Java versions where the exception handling is more explicit.
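As a hedged side note (the method and class names here are ours, not from the talk), the same Streams pipeline can make the no-match case explicit by returning an Optional instead of throwing, which is the Java counterpart of Kotlin's firstOrNull:

```java
import java.util.Optional;
import java.util.regex.Pattern;

class SplitSafe {
    // Variant of splitStream that surfaces "no match" in the return type
    // instead of throwing NoSuchElementException.
    static Optional<String> splitStreamSafe(String line) {
        return Pattern.compile(" ").splitAsStream(line)
                .filter(word -> word.length() > 3)
                .findFirst();
    }

    public static void main(String[] args) {
        if (!splitStreamSafe("a bb cccc").equals(Optional.of("cccc")))
            throw new AssertionError();
        if (!splitStreamSafe("a bb").isEmpty())
            throw new AssertionError();
    }
}
```

Whether the throwing or the Optional-returning variant is preferable depends on whether an absent match is an error in the caller's domain.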

Example 2: Finding a Word of Length 3 with Its Index

For our next example, we'll try to find a word with exactly three characters and return both the word and its index in the original string. This adds a layer of complexity to our previous example.

Java Implementation

In Java, we'll use a record to represent our result:

record IndexWord(int index, String value) { }

Classical Java Looping

Here's how we might implement this using a traditional loop:

IndexWord splitLoop(String line) {
    var pattern = Pattern.compile(" ");
    var words = pattern.split(line);
    for (int index = 0; index < words.length; index++) {
        if (words[index].length() == 3) {
            return new IndexWord(index, words[index]);
        }
    }
    throw new NoSuchElementException("Not found");
}

This implementation is straightforward, but requires the reader to reason through all paths. The early return inside the loop is crucial to understanding the function's behavior.

Java Streams Version

To accomplish the same task using Streams, we need a way to index the elements. We can use IntStream for this purpose:

IndexWord splitStream(String line) {
    var pattern = Pattern.compile(" ");
    var words = pattern.split(line);
    return IntStream.range(0, words.length)
            .filter(index -> words[index].length() == 3)
            .mapToObj(index -> new IndexWord(index, words[index]))
            .findFirst()
            .orElseThrow();
}

The Streams version uses IntStream.range to generate the indices and filters on the word found at each index. This works, but it forces us to think in terms of array positions rather than the words themselves, and the combination of filter, mapToObj, and findFirst is noticeably more ceremonial than the loop.

Kotlin Implementation

In Kotlin, we use a data class instead of a record:

data class IndexWord(val index: Int, val value: String)

The implementation then becomes:

fun splitIndexStream(line: String): IndexWord =
    line.split(" ")
        .withIndex()
        .map { (index, value) -> IndexWord(index, value) }
        .first { it.value.length == 3 }

This Kotlin implementation reads naturally from top to bottom. The withIndex function pairs each element with its position, which we destructure into index and value and map onto our IndexWord data class. first then selects the first word of length 3, throwing a NoSuchElementException if none exists, just like the Java versions.

Analysis

This example shows how the approaches diverge once an index is needed alongside the value.

The imperative loop handles the index naturally but buries the selection logic inside control flow. The Java Streams version has to fall back on IntStream.range and array indexing, because the Stream API offers no built-in way to pair elements with their indices. Kotlin's withIndex closes exactly that gap: the pipeline stays a straightforward chain of collection operations, with no detour through index ranges.

As we continue through these examples, a pattern starts to emerge: Kotlin often provides a balance between the clarity of imperative code and the power of functional operations, resulting in solutions that are both expressive and easy to understand.

Example 3: Creating the Cross Product of Two Ranges

For our next example, we'll create the cross product of two ranges, specifically for the range 0 to 3. This example demonstrates how different approaches handle nested operations.

Java Implementations

Imperative Java Solution

Let's start with the imperative Java solution:

var resultLoop = new ArrayList<Pair>();
for (int i = 0; i < 4; i++) {
    for (int j = 0; j < 4; j++) {
        resultLoop.add(new Pair(i, j));
    }
}

This imperative approach is straightforward and easily understandable for anyone familiar with Java. It uses nested loops to create all possible pairs of numbers from the given ranges.

Java Streams Version

Now, let's look at how we can achieve the same result using Java Streams:

var resultStream = IntStream.range(0, 4)
        .boxed()
        .flatMap(a -> IntStream.range(0, 4)
                .mapToObj(b -> new Pair(a, b)))
        .toList();

This Streams version aims to be more declarative, using flatMap to combine the results of the inner stream operations. The use of boxed() here is crucial. We need to use boxed() because we want to flatMap the inner stream into the outer stream, where the inner stream has a different type than the outer stream. Specifically, we're going from int to Pair. But because the outer stream is initially a stream of primitives, this direct mapping is not possible. With boxed(), we convert it to a Stream<Integer>, changing it from primitives to objects. This allows us to then map to other object types, such as Pair.
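As an aside, the boxed() call can also be sidestepped: if the outer IntStream is converted with mapToObj right away, flatMap already operates on an object stream. A small self-contained sketch (the class and method names are ours):

```java
import java.util.List;
import java.util.stream.IntStream;

class CrossProduct {
    record Pair(int first, int second) { }

    // mapToObj turns the primitive IntStream into a Stream<Stream<Pair>>,
    // so flatMap(s -> s) can flatten it without an explicit boxed() step.
    static List<Pair> crossProduct(int n) {
        return IntStream.range(0, n)
                .mapToObj(a -> IntStream.range(0, n)
                        .mapToObj(b -> new Pair(a, b)))
                .flatMap(s -> s)
                .toList();
    }

    public static void main(String[] args) {
        if (crossProduct(4).size() != 16) throw new AssertionError();
    }
}
```

This trades one conversion (boxed()) for another (flatMap(s -> s)); either way, the primitive-to-object boundary has to be crossed explicitly somewhere in the Java pipeline.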

Kotlin Implementation

The Kotlin version looks similar to the Java Streams version, but with some notable simplifications:

val result = (0..3)
    .flatMap { i ->
        (0..3)
            .map { j -> Pair(i, j) }
    }

Analysis

This example showcases how different approaches handle more complex operations like creating a cross product. The imperative Java solution, while verbose, is straightforward and easily understood by most Java developers. It clearly shows the nested structure of the operation through its use of nested loops.

The Java Streams version attempts to make the operation more declarative, but introduces some complexity. A key point to note is the use of boxed() in this version. This method is necessary because we want to flatMap the inner stream into the outer stream, where the inner stream has a different type than the outer stream (from int to Pair). Since the outer stream is initially a stream of primitives (IntStream), this direct mapping is not possible. The boxed() method converts the IntStream to a Stream<Integer>, changing it from a stream of primitives to a stream of objects. This conversion allows us to then map to other object types, such as Pair. This necessity for explicit type handling adds a layer of complexity to the Java Streams version.

The Kotlin version strikes a balance between the declarative style of Streams and the simplicity of the imperative approach. It's visually similar to the Java Streams version, but with some key advantages. Kotlin's range operator .. is more concise than IntStream.range(). The Kotlin version also doesn't need boxed() as Kotlin handles the type conversion implicitly. Furthermore, the toList() call is unnecessary in Kotlin as the result is already a List.

While the Kotlin and Java Streams versions are quite similar in structure, the Kotlin version appears cleaner and more straightforward. It maintains the functional style and declarative nature of the Streams approach, but with less boilerplate and type juggling. This example demonstrates how Kotlin can offer the benefits of functional programming constructs while avoiding some of the verbosity that can creep into Java Streams code.

As we progress through these examples, we continue to see how Kotlin's design choices and standard library can lead to code that is both functional and readable, often simplifying operations that require more verbose handling in Java. Kotlin's ability to handle type conversions implicitly in such scenarios showcases its design philosophy of reducing boilerplate while maintaining type safety.

Example 4: Grouping Cities by Country

Our next example demonstrates how different approaches handle grouping operations. We'll group a list of cities by their country and count how many cities are in each country.

Java Implementations

Imperative Java Solution

Let's start with the imperative Java solution:

Map<Country, Long> cityCountPerCountry = new HashMap<>();
for (var city : Cities.cities) {
    cityCountPerCountry.merge(city.country(), 1L, Long::sum);
}

This imperative approach is clear and straightforward. It iterates through the list of cities, using the merge method of HashMap to count the occurrences of each country. The only potential downside is the need to mutate the map during the process.
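To make merge's three-argument contract concrete, here is a minimal sketch on hypothetical word data (not the Cities example): when the key is absent the value 1L is stored, otherwise Long::sum combines the existing and new values.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MergeDemo {
    // Counts occurrences using Map.merge: insert 1L on first sight,
    // otherwise sum the stored count with 1L.
    static Map<String, Long> countWords(List<String> words) {
        var counts = new HashMap<String, Long>();
        for (var word : words) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        if (!countWords(List.of("a", "b", "a")).equals(Map.of("a", 2L, "b", 1L)))
            throw new AssertionError();
    }
}
```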

Java Streams Version

Now, let's look at how we can achieve the same result using Java Streams:

Map<Country, Long> cityCountPerCountry =
        Cities.cities.stream()
                .collect(
                        Collectors.groupingBy(
                                City::country,
                                Collectors.counting()
                        )
                );

The Java Streams version leverages the groupingBy collector, which is specifically designed for such grouping operations. While it's a powerful tool, the syntax can be somewhat convoluted, especially for developers new to Streams. The nesting of collectors (groupingBy and counting) may not be immediately intuitive.

Kotlin Implementation

Here's how we can implement the same functionality in Kotlin:

val citiesSizeStream = cities.groupBy { it.country }.mapValues { it.value.size }

This Kotlin implementation showcases the power and readability of Kotlin's standard library. The groupBy function, available on any list, allows for straightforward grouping operations. Following this, the mapValues call efficiently counts the items in each group. This approach combines the declarative style seen in Streams with Kotlin's more intuitive syntax, resulting in a concise and easily understandable solution.

Analysis

This example showcases how different approaches handle grouping operations, a common task in data processing.

Kotlin's implementation stands out for its simplicity and expressiveness. It achieves the grouping and counting in a single, easily readable line of code, without the need for specialized collectors or explicit mutation of a map. This example further demonstrates how Kotlin's design choices and rich standard library can lead to more intuitive and concise code, especially for common operations like grouping and counting.

As we continue to explore these examples, we see a consistent pattern: Kotlin often provides a balance between the clarity of imperative code and the power of functional operations, resulting in solutions that are both expressive and easy to understand.

Example 5: Finding the Country with the Least Number of Cities

Our next example demonstrates how to find the country with the least number of cities using different approaches.

Java Implementations

Java Collections Approach

Let's start with the Java Collections approach:

var result = Collections.min(cityCountPerCountry.entrySet(), Map.Entry.comparingByValue());

This solution is clear and concise. It directly uses the Collections.min() method with a custom comparator. While effective, this approach requires knowledge of specific utility methods in the Collections framework, which might not be immediately obvious to all developers.
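For illustration, a minimal self-contained version of this call on hypothetical country/count data (the class and method names are ours):

```java
import java.util.Collections;
import java.util.Map;

class MinEntryDemo {
    // Collections.min with Map.Entry.comparingByValue finds the entry
    // whose value is smallest; ties are broken by encounter order.
    static Map.Entry<String, Long> minByValue(Map<String, Long> counts) {
        return Collections.min(counts.entrySet(), Map.Entry.comparingByValue());
    }

    public static void main(String[] args) {
        var min = minByValue(Map.of("NL", 3L, "BE", 1L));
        if (!min.getKey().equals("BE")) throw new AssertionError();
    }
}
```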

Java Streams Version

Now, let's look at how we can achieve the same result using Java Streams:

var result = CitiesStream.getCountryLongMap().entrySet()
        .stream()
        .min(Map.Entry.comparingByValue())
        .orElseThrow();

The Streams version is more discoverable and arguably easier to understand. It clearly expresses the intent of finding the minimum value from the stream of map entries.

Kotlin Implementation

Here's how we can implement the same functionality in Kotlin:

val result = citiesSizeStream.minByOrNull { it.value }!!

This Kotlin implementation is even more concise. It directly uses the minByOrNull function on the map, specifying that we want to find the minimum based on the value of each entry. The !! operator is used here to assert that the result is non-null, though in production code, a safer null-handling approach might be preferred.

It's worth noting that we can apply the minByOrNull function immediately on the map without calling entrySet() first, as would be necessary in Java. This leads to simpler, more discoverable code during development, effectively removing an extra step that's required in the Java versions.

It's also interesting to note an inconsistency in Kotlin's standard library. While we used first in earlier examples, which throws a NoSuchElementException if the collection is empty, here we use minByOrNull. The minBy function was deprecated in favor of minByOrNull, even though firstOrNull exists alongside first rather than replacing it. This inconsistency in the API design is something to be aware of when working with Kotlin collections.

Analysis

This example highlights different approaches to finding a minimum value in a collection or map.

The Java Collections approach is succinct but requires specific knowledge of utility methods. The Java Streams version offers better discoverability and readability, clearly expressing the operation's intent.

Kotlin's implementation stands out for its brevity. It leverages Kotlin's extension functions on collections, allowing for a very concise expression of the desired operation. However, the contrast between the first/firstOrNull pair and the deprecation of minBy in favor of minByOrNull shows that even well-designed languages can have quirks in their APIs.

These implementations demonstrate how different language features and standard library designs can affect the way we express common operations. While all three achieve the same result, they differ in terms of discoverability, conciseness, and the level of language-specific knowledge required.

Example 6: Finding All Countries with the Minimum Number of Cities

Our previous example had a limitation: it only found one country with the minimum number of cities, but there could be multiple countries with the same minimum. In this example, we'll address this by finding all countries that have the minimum number of cities.

Java Implementations

Imperative Java Approach

Let's start with the imperative Java approach:

var map = new TreeMap<Long, List<Country>>();
for (var countryCount : cityCountPerCountry.entrySet()) {
    // This initial value must be a mutable List, because we add data to it later.
    map.computeIfAbsent(countryCount.getValue(), _ -> new ArrayList<>()).add(countryCount.getKey());
}
var result = map.firstEntry();

This solution leverages a TreeMap, which keeps its entries sorted by key. We populate this map with the count of cities as the key and a list of countries as the value. The computeIfAbsent method is used to initialize a new list if needed and add the country to it. Finally, we retrieve the first entry, which corresponds to the minimum count.

While this code is relatively concise, it can be challenging to ensure it's bug-free due to the use of mutable collections. The logic, involving mutable lists and maps, may not be immediately clear at first glance.
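To see why the comment about mutability matters, here is a small self-contained version of the same inversion on hypothetical data; replacing new ArrayList<>() with an immutable List.of() would make the subsequent add() throw UnsupportedOperationException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ComputeIfAbsentDemo {
    // Inverts a count map into a TreeMap keyed by count; the list created
    // by computeIfAbsent must be mutable because add() is called on it.
    static TreeMap<Long, List<String>> invert(Map<String, Long> counts) {
        var map = new TreeMap<Long, List<String>>();
        for (var e : counts.entrySet()) {
            map.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        return map;
    }

    public static void main(String[] args) {
        var min = invert(Map.of("NL", 2L, "BE", 2L, "DE", 3L)).firstEntry();
        if (min.getKey() != 2L || min.getValue().size() != 2)
            throw new AssertionError();
    }
}
```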

Java Streams Version

Now, let's look at the Java Streams approach:

TreeMap<Long, List<Country>> countriesCountPerCity =
        cityCountPerCountry.entrySet()
                .stream()
                .collect(
                        Collectors.groupingBy(
                                Map.Entry::getValue,
                                TreeMap::new,
                                Collectors.mapping(
                                        Map.Entry::getKey,
                                        Collectors.toList()
                                )
                        )
                );
var result = countriesCountPerCity.firstEntry();

This Streams version uses a nested collector to group countries by their city count. While it achieves the desired result, the code is quite complex and not easily understandable at a glance. The use of nested collectors (groupingBy and mapping) makes this solution particularly challenging to write and comprehend, even for developers well-versed in Java Streams.

Kotlin Implementation

Here's how we can implement the same functionality in Kotlin:

val allMinCities = citiesSizeStream.entries
    .groupBy({ it.value }) { it.key }
    .minByOrNull { it.key }!!

The Kotlin implementation stands out for its simplicity and readability. It first groups the entries by their value (city count), transforming the values to be the country. Then it finds the entry with the minimum key (which represents the minimum city count). The result is a pair where the key is the minimum count and the value is a list of all countries with that count.

Analysis

This example highlights the stark differences between the approaches when dealing with a more complex data manipulation task. The Kotlin version stands out as the simplest and most readable, leveraging the language's powerful standard library functions to express a complex operation in just three lines of easily understandable code. This demonstrates Kotlin's ability to maintain clarity and conciseness even as the complexity of the task increases.

Example 7: Reading and Processing Temperature Data from a File

Our final example demonstrates how to read a file containing temperature data, skip comments, and handle invalid data. The file format looks like this:

# temperatures
25.12
1.3
@@@@@@@@@@@@@@@@@
-3.2

Java Implementations

Imperative Java Approach

Let's start with the imperative Java approach:

static List<Float> readLoop(Path file) throws IOException {
    try (var reader = Files.newBufferedReader(file)) {
        var floats = new ArrayList<Float>();
        var line = reader.readLine();
        while (line != null) {
            if (!line.startsWith("#")) {
                try {
                    var f = Float.parseFloat(line);
                    floats.add(f);
                } catch (NumberFormatException _) {
                    // Ignoring invalid float lines
                }
            }
            line = reader.readLine();
        }
        return Collections.unmodifiableList(floats);
    }
}

This imperative approach handles multiple concerns:

  1. File opening and closing (using try-with-resources)
  2. Line-by-line reading
  3. Skipping comments
  4. Parsing valid floats and ignoring invalid ones
  5. Collecting results in a mutable list
  6. Returning an unmodifiable list

While functional, the code mixes business logic with technical details, making it harder to understand and maintain.

Java Streams Version 1

Now, let's look at a Java Streams approach:

static List<Float> readStreamV1(Path file) throws IOException {
    try (var lines = Files.lines(file)) {
        return lines
                .filter(line -> !line.startsWith("#"))
                .filter(line -> {
                    try {
                        var f = Float.parseFloat(line);
                        return true;
                    } catch (NumberFormatException _) {
                        return false;
                    }
                })
                .map(Float::parseFloat)
                .toList();
    }
}

This version is more readable, separating the concerns more clearly. However, it still requires try-with-resources for file handling and has a duplicated parsing step.
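One hedged way to remove the duplicated parse without reaching for mapMulti is a helper that returns Optional&lt;Float&gt;, flattened via Optional::stream. Note that the helper, class name, and the Stream-taking signature (used here so the parsing logic stands alone, without a file) are ours, not from the talk:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

class ReadStreamOptional {
    // Parses a line at most once; invalid lines become Optional.empty().
    static Optional<Float> tryParseFloat(String line) {
        try {
            return Optional.of(Float.parseFloat(line));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    // Optional.stream() inside flatMap drops the empty results, so the
    // pipeline filters and converts in a single pass.
    static List<Float> parse(Stream<String> lines) {
        return lines
                .filter(line -> !line.startsWith("#"))
                .flatMap(line -> tryParseFloat(line).stream())
                .toList();
    }

    public static void main(String[] args) {
        var result = parse(Stream.of("# temperatures", "25.12", "@@@", "-3.2"));
        if (!result.equals(List.of(25.12f, -3.2f))) throw new AssertionError();
    }
}
```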

Java Streams Version 2

We can further improve the Streams version using mapMulti:

static List<Float> readStreamV2(Path file) throws IOException {
    try (var lines = Files.lines(file)) {
        return lines
                .filter(line -> !line.startsWith("#"))
                .<Float>mapMulti((line, downstream) -> {
                    try {
                        var f = Float.parseFloat(line);
                        downstream.accept(f);
                    } catch (NumberFormatException _) {
                        // Ignoring invalid float lines
                    }
                })
                .toList();
    }
}

This version eliminates the duplicate parsing but introduces the more complex mapMulti operation.

Kotlin Implementation

Here's how we can implement the same functionality in Kotlin:

fun readStreamKt(file: Path): List<Float> =
    file.useLines { lines ->
        lines
            .filterNot { it.startsWith("#") }
            .mapNotNull { it.toFloatOrNull() }
            .toList()
    }

The Kotlin implementation stands out for its simplicity and readability. It leverages Kotlin's standard library functions to express the complex operation in just a few lines of easily understandable code.

It's crucial to note that the .toList() call is inside the useLines block. This is very important because useLines returns a Sequence<String>, which is lazily evaluated. If we were to return the Sequence<Float> (by omitting .toList() or placing it outside useLines), and then try to use it after the useLines block has completed, we would get an exception as the underlying file stream would already be closed. By calling .toList() inside useLines, we ensure that all lines are processed and collected into a list while the file is still open.
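The same pitfall exists on the Java side with Files.lines: once try-with-resources closes the stream, any terminal operation fails with an IllegalStateException. A small demonstration (the class and method names are ours):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.util.stream.Stream;

class ClosedStreamDemo {
    // Leaks the stream out of the try-with-resources block, then shows
    // that a terminal operation on the closed stream throws.
    static boolean consumeAfterClose() {
        Stream<String> leaked;
        try {
            var file = Files.createTempFile("temps", ".txt");
            Files.writeString(file, "25.12\n1.3\n");
            try (var lines = Files.lines(file)) {
                leaked = lines;
            }
        } catch (IOException e) {
            return false;
        }
        try {
            leaked.count();            // stream already closed here
            return false;
        } catch (IllegalStateException expected) {
            return true;               // terminal op on a closed stream fails
        }
    }

    public static void main(String[] args) {
        if (!consumeAfterClose()) throw new AssertionError();
    }
}
```

Kotlin's rule of keeping .toList() inside useLines is the direct analogue of keeping all terminal operations inside the try-with-resources block in Java.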

Analysis

This example highlights the stark differences between the approaches when dealing with a complex file processing task involving multiple concerns.

The imperative Java version, while comprehensive, mixes different levels of abstraction, making it harder to understand and maintain. The Java Streams versions improve readability but still require explicit resource management and exception handling.

The Kotlin version shines in its simplicity and expressiveness. It uses useLines for automatic resource management, filterNot for clear intent in skipping comments, and mapNotNull with toFloatOrNull to elegantly handle parsing and invalid data. This approach separates concerns effectively and reduces boilerplate, resulting in code that's both concise and easy to understand.

This final example powerfully demonstrates Kotlin's ability to simplify complex operations through its thoughtful standard library design and language features, leading to more maintainable and readable code.

Conclusion

Throughout this exploration of various coding challenges, from simple string manipulations to complex file processing tasks, we've seen a consistent pattern emerge. Kotlin, in comparison to both imperative Java and Java Streams, consistently demonstrates a remarkable ability to simplify code while maintaining readability and functionality.

Key takeaways from our comparison:

  1. Simplicity: Kotlin code generally appears simpler to both read and write. The language's design and standard library functions often allow for more intuitive expressions of complex operations.

  2. Discoverability: Most, if not all, of the Kotlin APIs we used were easily discoverable through IDE autocompletion. This feature significantly enhances the developer experience, making it easier to explore and utilize the language's capabilities.

  3. Conciseness: Kotlin solutions were consistently shorter than their Java counterparts. This brevity allows developers to express complex operations in fewer lines of code, potentially reducing the chances of errors and improving maintainability.

  4. Readability: Despite being more concise, Kotlin code maintains, and often enhances, readability. The language's design choices and expressive syntax allow for code that clearly communicates intent.

  5. Powerful Standard Library: Kotlin's standard library provides a rich set of functions that make common programming tasks more straightforward. Functions like groupBy, mapNotNull, and useLines demonstrate how well-designed library functions can significantly simplify code.

  6. Balance: Kotlin seems to strike a good balance between the clarity of imperative code and the power of functional programming constructs, often resulting in solutions that combine the best of both worlds.

While Java, especially with the addition of Streams, has made significant strides in enabling more functional and expressive code, Kotlin appears to take this a step further. It offers a language design and standard library that consistently allow for cleaner, more intuitive solutions across a wide range of programming tasks. Notably, while working with Java often requires choosing between imperative and functional styles (as we've seen in cases where imperative code sometimes looks easier than the equivalent Streams version), Kotlin seems to eliminate this dilemma. In Kotlin, the most straightforward and readable solution often naturally combines both paradigms, removing the need for an explicit choice between styles.

Project Reactor

How does limitRate work in Reactor

Project Reactor is a great reactive streams project that you will probably run into when you want to write reactive code in Spring. It is very powerful and can also be complex to wrap your head around. In this article I will look at the limitRate function of a Flux.

The first time I ran into limitRate I thought it would help in limiting/throttling the amount of events flowing downstream. And according to the documentation this is the case:

Ensure that backpressure signals from downstream subscribers are split into batches capped at the provided prefetchRate when propagated upstream, effectively rate limiting the upstream Publisher.

This means that limitRate will split big requests from downstream into smaller requests. It also states that this is effectively rate limiting the publisher.

Typically used for scenarios where consumer(s) request a large amount of data (eg. Long.MAX_VALUE) but the data source behaves better or can be optimized with smaller requests (eg. database paging, etc...). All data is still processed, unlike with limitRequest(long) which will cap the grand total request amount.

According to this documentation it is typically useful when the request to upstream is unlimited: the rate limiter can cut it up into smaller pieces. While there might be a use case for this, I think it is far more useful for rate limiting the number of requests from downstream to upstream.

Too many demand requests

Let's look at a scenario where we want to process messages from PubSub using Spring.


fun process(msg: AcknowledgeablePubsubMessage): Mono<String> = ...

pubSubReactiveFactory.poll("exampleSubscription", 1000 /* not important with limited demand*/)
  .flatMap(::process, 16)
  ...
  .subscribe()

In the above sample, there will be an initial demand of 16 elements going up to the source. The PubSubReactiveFactory will request 16 elements from PubSub and send them downstream. Whenever one of the workers in the flatMap is done, it sends a request(1) upstream, and the PubSubReactiveFactory requests one element from PubSub. A fraction later, another demand may reach the source, requiring yet another call to PubSub for one more element. The pipeline is effectively transformed into one that pulls message per message from PubSub, so the handling time per message is pull latency plus processing time. Doing a request for just one element is very wasteful, certainly when processing time is well within the deadline bounds and having a buffer makes sense.

Limiting number of demand requests

The best way to minimize the impact of pulling messages from a source is to make sure we pull more than one message per request. This is exactly what limitRate can do. It limits the number of demand requests to the source by grouping them together. Internally, limitRate has a buffer from which it can feed the consumers downstream, while making sure to refill the buffer in time by requesting elements from the source. By default, "in time" means when the buffer is 75% depleted.

When limitRate(100) is used, it will first demand 100 elements from the source to fill the buffer. The moment elements arrive, limitRate can send them downstream as long as there is demand. When the buffer only has 25 elements left (75% depleted), it will issue a request(75) to the source to refill the buffer.

This makes sure the source can emit batches of events, making the latency overhead much less of an issue. The limitRate function is then more of a performance booster than a throttler.
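To make the replenishment arithmetic concrete, here is a toy model of that policy in plain Java. This is explicitly not Reactor's implementation, just the bookkeeping: prefetch the full buffer up front, then request 75% of the buffer each time that many elements have been consumed.

```java
import java.util.ArrayList;
import java.util.List;

class LimitRateSketch {
    // Returns the sequence of upstream requests a limitRate(prefetch)
    // operator would make while 'consumed' elements flow downstream
    // one by one, under the default 75% replenishment policy.
    static List<Integer> upstreamRequests(int prefetch, int consumed) {
        int lowTide = prefetch - prefetch / 4;  // 75 for a prefetch of 100
        var requests = new ArrayList<Integer>();
        requests.add(prefetch);                 // initial request(100)
        int sinceRefill = 0;
        for (int i = 0; i < consumed; i++) {
            sinceRefill++;
            if (sinceRefill == lowTide) {       // buffer 75% depleted
                requests.add(lowTide);          // request(75)
                sinceRefill = 0;
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        // Consuming 150 elements triggers the initial prefetch plus two refills.
        if (!upstreamRequests(100, 150).equals(List.of(100, 75, 75)))
            throw new AssertionError();
    }
}
```

This matches the shape of the request log shown below: one request(100) followed by a steady drumbeat of request(75).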

Example

Let's create an example to show the impact of limitRate. The source in this example can have unlimited outstanding requests and adds a 200ms latency to delivering the elements that are requested. Processing takes somewhere between 10 and 15ms per element.

val scheduler = Executors.newSingleThreadScheduledExecutor() // used by sink.onRequest below
val start = Instant.now()
val job = Flux.create<Int> { sink ->
  sink.onRequest { demand ->
    scheduler.schedule({
      repeat(demand.toInt()) {
        sink.next(nextInt())
      }
    }, 200, TimeUnit.MILLISECONDS)
  }
}
  .log("demandflow", Level.INFO, SignalType.REQUEST)
  .limitRate(100)
  .flatMap({ nr ->
    Mono.fromCallable { nr.toString() }.delayElement(Duration.ofMillis(nextLong(10, 15)))
  }, 16)
  .subscribeOn(Schedulers.parallel())
  .take(1000)
  .doOnComplete {
    println("Time: ${Duration.between(start, Instant.now())}")
  }
  .subscribe()

Without limitRate

If we start the code above with the line limitRate(100) commented, we get the following result:

20:46:29.092 [parallel-1 ] INFO  demandflow - request(16)
20:46:29.367 [parallel-3 ] INFO  demandflow - request(1)
20:46:29.367 [parallel-8 ] INFO  demandflow - request(1)
20:46:29.368 [parallel-9 ] INFO  demandflow - request(1)
20:46:29.369 [parallel-1 ] INFO  demandflow - request(1)
20:46:29.369 [parallel-1 ] INFO  demandflow - request(1)
20:46:29.370 [parallel-10] INFO  demandflow - request(3)
20:46:29.370 [parallel-10] INFO  demandflow - request(1)
20:46:29.371 [parallel-2 ] INFO  demandflow - request(1)
20:46:29.371 [parallel-2 ] INFO  demandflow - request(1)
20:46:29.371 [parallel-2 ] INFO  demandflow - request(1)
...
20:46:42.551 [parallel-7 ] INFO  demandflow - request(1)
20:46:42.561 [parallel-10] INFO  demandflow - request(1)
20:46:42.732 [parallel-2 ] INFO  demandflow - request(1)
20:46:42.733 [parallel-3 ] INFO  demandflow - request(1)
20:46:42.735 [parallel-6 ] INFO  demandflow - request(1)
20:46:42.736 [parallel-4 ] INFO  demandflow - request(1)
20:46:42.736 [parallel-5 ] INFO  demandflow - request(1)
20:46:42.737 [parallel-7 ] INFO  demandflow - request(1)
20:46:42.739 [parallel-8 ] INFO  demandflow - request(1)

Time: PT13.752124S

After the first 16 elements that were demanded, it will mostly request 1 element at a time. Sometimes multiple requests are bundled together. As you can see, processing took over 13 seconds. When run with limitRate(100) enabled, we get a completely different result:

20:49:55.068 [parallel-1 ] INFO  demandflow - request(100)
20:49:55.407 [parallel-7 ] INFO  demandflow - request(75)
20:49:55.644 [parallel-4 ] INFO  demandflow - request(75)
20:49:55.884 [parallel-9 ] INFO  demandflow - request(75)
20:49:56.125 [parallel-3 ] INFO  demandflow - request(75)
20:49:56.362 [parallel-8 ] INFO  demandflow - request(75)
20:49:56.601 [parallel-12] INFO  demandflow - request(75)
20:49:56.843 [parallel-12] INFO  demandflow - request(75)
20:49:57.082 [parallel-8 ] INFO  demandflow - request(75)
20:49:57.320 [parallel-8 ] INFO  demandflow - request(75)
20:49:57.560 [parallel-8 ] INFO  demandflow - request(75)
20:49:57.794 [parallel-5 ] INFO  demandflow - request(75)
20:49:58.034 [parallel-9 ] INFO  demandflow - request(75)
20:49:58.270 [parallel-3 ] INFO  demandflow - request(75)

Time: PT3.273889S

The first request is 100 to fill the initial buffer, and then every so often we see a request for 75 elements to refill it. With this configuration the processing took only a bit over 3 seconds. The impact of the 200ms latency is now minimized by requesting batches of elements.

Conclusion

The limitRate function is very useful for limiting the number of demand requests flowing upstream. Rather than throttling the number of messages the pipeline can process, it actually greatly improves performance. This function has helped me a lot in improving the performance of processing pipelines subscribing to a PubSub source.

Project Reactor

How to use groupBy in Reactor


Project Reactor is a great reactive streams project that you will probably run into when you want to write reactive code in Spring. It is very powerful, but it can also be complex to wrap your head around. In this article I will look at the groupBy function of a Flux.

groupBy

The groupBy function splits the current flux into multiple fluxes. See it like a router: based on a function you specify, it routes each message to one of the groups. For example, when you have a stream of numbers and perform intFlux.groupBy { it % 2 == 0 }, it will cut the flux into 2 fluxes. One will have a stream of even numbers and the other a stream of odd numbers. The resulting type of this groupBy is Flux<GroupedFlux<Boolean, Int>>. The outer flux is actually a finite stream of 2 GroupedFlux<Boolean, Int> elements. If the source on which the groupBy was applied was infinite, the 2 GroupedFlux objects are also infinite.

Processing the groups

Given the above example, there are 2 groups in the flux. Now we can write the logic to be performed on each group. Each GroupedFlux can be treated like a regular flux, but with one extra function: key(). This key function returns the result of the grouping function for all elements in this group, so in our example true for all the even numbers.
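As a minimal sketch of the even/odd example, using a finite source so each group completes, we can collect each GroupedFlux into a list tagged with its key():

```kotlin
import reactor.core.publisher.Flux

// Split a finite flux of numbers into an even and an odd group,
// then collect each GroupedFlux into a list keyed by key().
val groups = Flux.range(1, 6)
    .groupBy { it % 2 == 0 }
    .flatMap { group -> group.collectList().map { group.key() to it } }
    .collectList()
    .block()!!
    .toMap()

// groups[true] holds the even numbers, groups[false] the odd ones.
```

Note that the flatMap subscribes to each inner GroupedFlux, which, as explained next, is essential.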

There is one little detail which is quite important. We need to make sure that we subscribe to all groups. This sounds trivial, but because it is part of a stream this could easily go wrong.

Let's work with another example in which we divide the numbers into 10 groups: intFlux.groupBy { it % 10 }. Each group will just count how many numbers came through. This is what the countNumbers function does, with the help of the increment function:

val countOccurrences = ConcurrentHashMap<Int, Long>()  

fun increment(group: Int) = countOccurrences.compute(group) { _, k -> (k ?: 0) + 1 }  

fun countNumbers(group: GroupedFlux<Int, Int>): Flux<Int> =  
    group.doOnNext { increment(group.key()) }

The countNumbers function has to be wired together in the flux with the groupBy:

Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }  
 .groupBy { it % 10 }  
 .flatMap(::countNumbers)  
 .subscribeOn(Schedulers.parallel())  
 .subscribe()

Simple enough, isn't it? This works, and when we inspect countOccurrences every so often we see something like:

nrs emited: 6324660 Occurrences per group: 0: 634584, 1: 634802, 2: 634804, 3: 634804, 4: 634805, 5: 634805, 6: 634805, 7: 634805, 8: 634806, 9: 634806
nrs emited: 13912044 Occurrences per group: 0: 1391214, 1: 1391220, 2: 1391221, 3: 1391221, 4: 1391222, 5: 1391222, 6: 1391222, 7: 1391222, 8: 1391223, 9: 1391223
nrs emited: 22109057 Occurrences per group: 0: 2210915, 1: 2210921, 2: 2210935, 3: 2210936, 4: 2210936, 5: 2210937, 6: 2210964, 7: 2210966, 8: 2210966, 9: 2210967
nrs emited: 30416867 Occurrences per group: 0: 3041697, 1: 3041703, 2: 3041704, 3: 3041704, 4: 3041704, 5: 3041704, 6: 3041704, 7: 3041705, 8: 3041705, 9: 3041705
nrs emited: 38748273 Occurrences per group: 0: 3874837, 1: 3874843, 2: 3874844, 3: 3874844, 4: 3874844, 5: 3874844, 6: 3874844, 7: 3874845, 8: 3874845, 9: 3874845
nrs emited: 47157048 Occurrences per group: 0: 4715713, 1: 4715719, 2: 4715720, 3: 4715720, 4: 4715720, 5: 4715720, 6: 4715720, 7: 4715720, 8: 4715721, 9: 4715721
nrs emited: 55470463 Occurrences per group: 0: 5547095, 1: 5547106, 2: 5547107, 3: 5547120, 4: 5547121, 5: 5547121, 6: 5547122, 7: 5547122, 8: 5547122, 9: 5547122
nrs emited: 62455436 Occurrences per group: 0: 6245552, 1: 6245557, 2: 6245557, 3: 6245558, 4: 6245558, 5: 6245558, 6: 6245558, 7: 6245558, 8: 6245558, 9: 6245559
nrs emited: 69543352 Occurrences per group: 0: 6954345, 1: 6954351, 2: 6954351, 3: 6954351, 4: 6954351, 5: 6954352, 6: 6954352, 7: 6954352, 8: 6954352, 9: 6954352

The elements are nicely distributed over the groups. Notice that we did not specify an explicit concurrency on the flatMap. If it is left out, it defaults to Queues.SMALL_BUFFER_SIZE, which is 256 (unless configured differently). The groupBy gives us only a limited number of groups, and as long as the number of groups stays below 256, this will work perfectly.
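That default can be inspected directly (assuming the reactor.bufferSize.small system property has not been overridden):

```kotlin
import reactor.util.concurrent.Queues

// The default flatMap concurrency when none is given explicitly.
// Backed by the reactor.bufferSize.small system property, default 256.
val defaultConcurrency = Queues.SMALL_BUFFER_SIZE
println(defaultConcurrency)
```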

Let's look at what will happen when we tune the concurrency to be lower than the number of groups:

Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }  
 .groupBy { it % 10 }  
 .flatMap(::countNumbers, 9)  
 .subscribeOn(Schedulers.parallel())  
 .subscribe()

The resulting output is:

nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256

This will continue forever without any progress. The problem is that we have 10 groups, but only 9 workers. Each worker consumes 1 GroupedFlux, which means that there will be 1 group remaining without a worker. But why does the stream get stuck?

No more demand

To understand why the stream grinds to a halt we should look at the demand. You can read more about it in my blog "Debugging demand in Reactor". After adding the log statements:

fun countNumbers(group: GroupedFlux<Int, Int>): Flux<Int> =
    group
        .log("countNumbers", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
        .doOnNext { increment(group.key()) }

Flux.generate<Int> { it.next(emitCounter.incrementAndGet()) }  
 .log("groupBy", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
 .groupBy { it % 10 }
 .log("flatMap", Level.INFO, SignalType.REQUEST, SignalType.ON_SUBSCRIBE, SignalType.ON_NEXT)
 .flatMap(::countNumbers, 9)
 .subscribeOn(Schedulers.parallel())  
 .subscribe()

The resulting output is like this:

[groupBy] - onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
[flatMap] - onSubscribe([Fuseable] FluxGroupBy.GroupByMain)
[subscribe] - onSubscribe(FluxFlatMap.FlatMapMain)
[subscribe] - request(unbounded)
[flatMap] - request(9)
[groupBy] - request(256)
[groupBy] - onNext(1)
[countNumbers-1] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-1] - request(32)
[countNumbers-1] - onNext(1)
[groupBy] - request(1)
[groupBy] - onNext(2)
[countNumbers-2] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-2] - request(32)
[countNumbers-2] - onNext(2)
[groupBy] - request(1)
[groupBy] - onNext(3)
[countNumbers-3] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-3] - request(32)
[countNumbers-3] - onNext(3)
[groupBy] - request(1)
[groupBy] - onNext(4)
[countNumbers-4] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-4] - request(32)
[countNumbers-4] - onNext(4)
[groupBy] - request(1)
[groupBy] - onNext(5)
[countNumbers-5] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-5] - request(32)
[countNumbers-5] - onNext(5)
[groupBy] - request(1)
[groupBy] - onNext(6)
[countNumbers-6] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-6] - request(32)
[countNumbers-6] - onNext(6)
[groupBy] - request(1)
[groupBy] - onNext(7)
[countNumbers-7] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-7] - request(32)
[countNumbers-7] - onNext(7)
[groupBy] - request(1)
[groupBy] - onNext(8)
[countNumbers-8] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-8] - request(32)
[countNumbers-8] - onNext(8)
[groupBy] - request(1)
[groupBy] - onNext(9)
[countNumbers-9] - onSubscribe([Fuseable] FluxGroupBy.UnicastGroupedFlux)
[countNumbers-9] - request(32)
[countNumbers-9] - onNext(9)
[groupBy] - request(1)
[groupBy] - onNext(10)
[groupBy] - onNext(11)
[countNumbers-1] - onNext(11)
[groupBy] - request(1)
[groupBy] - onNext(12)
[countNumbers-2] - onNext(12)
[groupBy] - request(1)
[groupBy] - onNext(13)
[countNumbers-3] - onNext(13)
[groupBy] - request(1)
[groupBy] - onNext(14)
[countNumbers-4] - onNext(14)
[groupBy] - request(1)
[groupBy] - onNext(15)
[countNumbers-5] - onNext(15)
[groupBy] - request(1)
[groupBy] - onNext(16)
[countNumbers-6] - onNext(16)
[groupBy] - request(1)
[groupBy] - onNext(17)
[countNumbers-7] - onNext(17)
[groupBy] - request(1)
[groupBy] - onNext(18)
[countNumbers-8] - onNext(18)
[groupBy] - request(1)
[groupBy] - onNext(19)
...
[countNumbers-8] - onNext(468)
[groupBy] - request(1)
[groupBy] - onNext(469)
[countNumbers-9] - onNext(469)
[groupBy] - request(1)
[groupBy] - onNext(470)
[groupBy] - onNext(471)
[countNumbers-1] - onNext(471)
[countNumbers-1] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(472)
[countNumbers-2] - onNext(472)
[countNumbers-2] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(473)
[countNumbers-3] - onNext(473)
[countNumbers-3] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(474)
[countNumbers-4] - onNext(474)
[countNumbers-4] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(475)
[countNumbers-5] - onNext(475)
[countNumbers-5] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(476)
[countNumbers-6] - onNext(476)
[countNumbers-6] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(477)
[countNumbers-7] - onNext(477)
[countNumbers-7] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(478)
[countNumbers-8] - onNext(478)
[countNumbers-8] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(479)
[countNumbers-9] - onNext(479)
[countNumbers-9] - request(24)
[groupBy] - request(1)
[groupBy] - onNext(480)
[groupBy] - onNext(481)
[countNumbers-1] - onNext(481)
[groupBy] - request(1)
[groupBy] - onNext(482)
[countNumbers-2] - onNext(482)
[groupBy] - request(1)
[groupBy] - onNext(483)
[countNumbers-3] - onNext(483)
[groupBy] - request(1)
[groupBy] - onNext(484)
[countNumbers-4] - onNext(484)
[groupBy] - request(1)
[groupBy] - onNext(485)
[countNumbers-5] - onNext(485)
...
[groupBy] - request(1)
[groupBy] - onNext(2558)
[countNumbers-8] - onNext(2558)
[groupBy] - request(1)
[groupBy] - onNext(2559)
[countNumbers-9] - onNext(2559)
[groupBy] - request(1)
[groupBy] - onNext(2560)
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256
nrs emitted: 2560 Occurrences per group: 1: 256, 2: 256, 3: 256, 4: 256, 5: 256, 6: 256, 7: 256, 8: 256, 9: 256

The logs give a lot of information about what is going on under the hood. First, the onSubscribe event that starts the Flux is passed along. Keep in mind that a Flux is nothing but a definition until you subscribe: "nothing happens until you subscribe". This is called a cold stream. When the onSubscribe signal reaches the last element in the stream, the demand starts flowing back upstream.

The subscribe has no back-pressure needs and can handle everything, so it requests unbounded demand. The flatMap has a concurrency of 9, so it sends a demand of 9 upstream. Note that this is a demand for 9 elements of type GroupedFlux<Int, Int>, so we request 9 groups. The groupBy has the default behaviour of requesting a demand of 256 elements. This reaches the source, and the source starts emitting 256 elements (if possible, which it is in this case). These 256 elements are distributed over the 10 groups defined by the grouping function. The output above shows that the first time an element is emitted (onNext(1)), the flatMap subscribes to that group and we immediately see demand flowing.

This shows that the subscription to a GroupedFlux only happens once the first element for that group is available. We also see that the first element was dispatched to a group, and immediately the groupBy signals new demand upstream. This happens 8 more times until we reach element 10, which would end up in countNumbers-10, but we do not have a processor for that group. So it stays in the groupBy, which has a demand of 256, but now 1 element cannot be dispatched. Element 11 is dispatched to group 1 again. As we can see, every subflux has a demand of 32. The elements are divided over the 9 active groups, but the elements for group 10 get stuck.

When 3/4 of the demand for a group is fulfilled, it re-signals demand. This is the request(24). The groupBy, with a buffer of 256, continuously passes elements downstream when they are available. This goes on until the groupBy holds 256 elements for group 10 and has to keep them. The groupBy indicated a demand of 256, and now all of that demand is filled with elements for group 10. There is no more demand and we have full back-pressure. The pipeline is now stuck, "waiting" for demand for the elements of group 10.
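The stall is easy to reproduce in a self-contained sketch: with 9 workers for 10 groups, the bounded block() below times out instead of completing (the source size, take count, and 2-second timeout are arbitrary choices for the experiment):

```kotlin
import java.time.Duration
import reactor.core.publisher.Flux

// 10 groups but only 9 flatMap subscribers: elements for the tenth
// group pile up inside groupBy until all of its demand is exhausted,
// so the pipeline never emits 5000 elements and block() times out.
val outcome = runCatching {
    Flux.range(1, 100_000)
        .groupBy { it % 10 }
        .flatMap({ group -> group }, 9)
        .take(5_000)
        .collectList()
        .block(Duration.ofSeconds(2))
}
// outcome.isFailure: the blocking read timed out.
```

Bumping the concurrency to 10 (or removing it) makes the same pipeline complete normally.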

Conclusion

If you use the groupBy function on a Flux, you must make sure that there are enough subscribers in the flatMap, otherwise your stream will get stuck. As described in "Debugging demand in Reactor", the logging functionality is really helpful here. I learned a lot while writing this blog and got even more insight into the internals of Reactor.

Project Reactor

Debugging demand in Reactor


Project Reactor is a great reactive streams project that you will probably run into when you want to write reactive code in Spring. It is very powerful and can also be complex to wrap your head around. Something that can be confusing is how demand flows upstream and messages flow downstream.

Getting insight in flow of demand

On any Flux it is possible to show the demand by using the log function. With this function you can specify which SignalTypes you want to be logged. Let's look at an example:

val counter = AtomicLong()  

fun process(nr: Long): Mono<Long> =  
    Mono.just(nr).delayElement(Duration.ofMillis(nextLong(1, 25)))  

Flux.generate<Long> { it.next(counter.incrementAndGet()) }  
 .log("beforeFlatmap", Level.INFO, SignalType.REQUEST)  
 .flatMap(::process)  
 .log("beforeTake", Level.INFO, SignalType.REQUEST)  
 .take(100)  
 .log("beforeSubscribe", Level.INFO, SignalType.REQUEST)  
 .subscribeOn(Schedulers.parallel())  
 .subscribe()  

Thread.sleep(4000)  
println("Counter: ${counter.get()}")

When run, this will print:

13:43:15.197 [parallel-1] INFO beforeSubscribe - request(unbounded)
13:43:15.200 [parallel-1] INFO beforeTake - request(unbounded)
13:43:15.200 [parallel-1] INFO beforeFlatmap - | request(256)
13:43:15.251 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.251 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.251 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.252 [parallel-6] INFO beforeFlatmap - | request(1)
13:43:15.252 [parallel-8] INFO beforeFlatmap - | request(1)
...
13:43:15.260 [parallel-2] INFO beforeFlatmap - | request(4)
13:43:15.260 [parallel-2] INFO beforeFlatmap - | request(12)
13:43:15.260 [parallel-2] INFO beforeFlatmap - | request(2)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(6)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(7)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(3)
13:43:15.261 [parallel-2] INFO beforeFlatmap - | request(2)
13:43:15.262 [parallel-2] INFO beforeFlatmap - | request(3)
13:43:15.262 [parallel-2] INFO beforeFlatmap - | request(3)
13:43:15.262 [parallel-2] INFO beforeFlatmap - | request(1)
Counter: 350

The request lines in the logs show the demand flowing up (towards the source) and give us insight into what happens with the demand. The first demand is sent when the stream is subscribed to. Remember, demand flows upstream, so in our code from bottom to top. The subscribe function will always request an unbounded amount of events. Next we reach the take function, which doesn't change the demand and also sends unbounded demand. So up until this point we do not have any back-pressure control. Or, said differently, these functions can keep up with anything upstream may send. Next we hit the flatMap with its default concurrency (256). The flatMap changes the demand: there are only 256 workers, so it can only process 256 messages at a time. Therefore it signals a demand of 256. This demand reaches the source, and the source can now emit 256 elements. When a task in the flatMap is done, it does not encounter any back-pressure, because the demand downstream is unbounded. This means that when a task is done it can immediately emit the message and signal new demand by requesting 1 extra message.

When 100 messages have reached the take function, the stream is completed. However, in the end we see that many more messages were emitted from the source, namely 350. This happens because everything is happening at the same time: when a task in the flatMap is done, it signals demand by requesting a new element. Therefore more messages can be emitted than the 100 requested.

Conclusion

Using log on a Flux can greatly help in understanding what's going on under the covers. We've seen in the above example that even trivial flows lead to interesting discoveries.