Reordering Futures in Java Streams

Futures are an interesting concept in concurrent and parallel programming. I've written about them several time, the last time in Advanced Futures With Play Framework. Streams are a concept which came with Java 8 to operate on a stream of objects, transform and filter them. Combining Futures and Streams in Java sometimes isn't easy. Some time ago a problem arised in code where slow futures in a stream were blocking other objects in the stream to be processed. Usually you'd use parallel streams in this case, but for operations concerns parallel streams had the wrong thread characteristic. Therefor I've thought about ways to unblock streams.

Lets start with a simple example. We have a simple asynchronous API call and a Stream of items and want to call an external API potentially with IO which takes some time:

We iterate over the stream and make the call. The call returns CompletableFuture so our stream becomes a Stream of Futures. In the end we collect the items of the stream into a List with a stream collector, in this case Collectors.toList(). As a minor challenge the collect call returns List<CompletableFuture<T>>, but we would like to have CompletableFuture<List<Integer>> for easier consumption. Luckily the futures library from Spotify provides an allAsList call which turns a List of Futures into a Future of List.

When running the code it outputs

The stream processes all items without blocking (4x Stream 1), then all API calls finish (Done) and afterwards the results are printed as 301, 1001, 102, 21. As the stream operates on Futures which do not block, our code immediatly runs after creation of the stream without waiting for the stream processing to end. Only when we call join() in the last line, our code blocks and waits until all futures have finished.

Streams are not only mapping but also support filtering. We now want to filter on the values of futures. As our Stream needs to deal with the value of a future, we need to wait on the future to complete, pull the value from the Future context into the Stream context with f.join():

Running this code is not as nice as before. The stream processes one item, then blocks until the call finishes, then processes the next item.

As can be seen from the output, the longer running task (1000) blocks (join) processing of futher stream items. Most often this is solved by using parallel streams. A parallel version would look like this

As expected the output again processes all items, the 1000ms API call does no longer block processing of other items:

If we want to have more control over when and how many threads are used at what time, and we also want single threaded Stream, we can implement a custom stream collector. With a custom collector, we can reorder the futures of the stream and move those that have completed to the front. Our custom collector is called FutureReorderCollector. It gathers all Futures and creates a new stream with a new ordering. Finished futures are now moved forward and running futures are kept back.

Output now is as before, although we are not using parallel streams and have a long running (1000ms) API call.

The long running Thread is moved to the end of the stream. Stream execution is blocked only as much as needed.

Our collector needs an executor to process those Futures. An optimized version could work with the executor already used by the futures. But in this case we provide a custom ExecutorService which we need to shut down after we finished work.

Implementation of the custom collector is simple. It collects all futures into a custom FutureCompletionService and creates an new Stream in the finisher with Stream.generate(f::take).limit(f.size()).

The collector uses a custom completion service, were we add futures to a JDK CompletionService and block on polling finished futures in take. One implementation detail is the counting of submitted tasks with AtomicInteger. This is used to limit the Stream generated in FutureReorderCollector.

Streams and futures are powerful concepts in Java. Sometimes it's not easy to combine them. With this code you can implement your own collector to change how streams process their items.

Advanced Futures With Play Framework

Some time ago I wrote A Little Guide on Using Futures for Web Developers to explain how to use Futures in web development to speed up page delivery with parallelism.

Futures in Play

Lately I've played a little with the Play Framework using its async actions so I want to add to this guide. Play controllers can return async actions in the form of Future[Action[...]] which then are executed.

Asynchronicity is not for free. It creates more complicated code and is harder to detect. Why would we use it then? Mainly to speed up requests by parallelizing code to several cores, do other work during IO waits or parallelize waits.

For an example we have three IO calls and two html fragments renderings. Together our request takes 80ms.

When we asynchronize IO and parallelize rendering to more cores we reduce request time to just 50 ms. This is nearly a 50% speed up. And the more database IO your code has and the more you can parallelize rendering parts of a page and the more cores you have, the higher the speedup.

There are some pitfalls though. If your CPU is highly congested, rendering time may have outliers and take longer, because in the example above render2() might be blocked on the second core. So your request waits for the task on the second core to finish. If you use 16 cores then only one needs to be congested and your request is hold up (depending on your threadpool / executor implementation). Async code might also take up more memory, because threads are blocked waiting for IO. This is the case if you do not use real async IO (database) drivers and most "async" drivers are not async to the core but use threads themselves to achieve asynchronicity.

Using asynchronous code also takes a little time and thinking, because you can easily by mistake add some blocking part and all your nice parallelism goes out the window.

One Play action in one of my side projects uses Futures for asynchronous code and looks like this:

The code has three database operations with each returning a Future[..]. First all() returns a list of all urls of type Future[Seq[String]]. The double map calls get into the list and iterates over every item. The calls to byUrl and history return their own Futures of type Future[Option[Page]] and Future[Seq[String]].

Our code looks more complicated than a simple for comprehension but with a simple for comprehension there are some pitfalls e.g.

The code can be written with nested fors though if we translate our working example from before from nested maps to nested fors. You can recognize the Future.sequence and flatten calls from above.

Though complicated looking this code is pure parallelism bliss as the controller method returns immediatly without blocking or waiting with a Future for Play to execute. Also byUrl and history get called in parallel as they do not depend on each other. Suppose all() returns a list of 10 urls the code will then create 20 Futures which are worked on in parallel (depending on the number of cores/hyperthreads and the threadpool used). The code does not create 2 Futures sequentially for one url after another waiting for each Future to succeed but creates Futures for all 10 urls in one swoop.

Expressed in dependencies

instead of

Blocking Threads

You can test this construct of nested fors by running

run with

You will see something like

Why isn't the code running all 2x5 Futures at once as promised above? We use the global standard Scala ExecutionContext. The context limits the running threads to your number of cores. When we have blocking Futures e.g. because of IO, no more threads are executed. Fortunately we can tell Scala if our Future is blocking. When we change the code to

we get the expected results:

Database access with Future

The byUrl method which returns a Page by its url looks like this:

In our usecase we use getOrElse for working with the Option to get the title. If empty results should be sorted out in our for chain, this gets more complicated. But there is some help working with Future[Option[...]] in A Little Guide on Using Futures for Web Developers.

The method executeWithRSF uses

for parallelism. This can be changed to use a native asynchronous database driver.

More than one Future depends on result of previous Future

Sometimes you have more than one Future that depends on a previous Future and we can generalize our case from above. If you just call Futures one after each other, as before the Futures are created sequentially and not executed in parallel.

Here b is executed after a has finished, c is executed after b is finished.

With this code all three are executed at the same time:

Asynchronous Write/Database IO after Form Validation

How do you use Action.async with Form Validation? This code uses async futures to receive a webpage for an url the user has entered. Then the title and description is inserted into a database where insert returns Future[Option[Int]] with the new primary key of the inserted page.

We need to use a monad transformer FutureO because otherwise we'll end up with Future[Option[Future[Option[Int]]]] which isn't flatable. FutureO[FutureO[Int]] is flatable to FutureO[Int] which then can be transformed to Future[Option[Int]]. The monad transformer takes Future[Option[...]] as a constructor argument.

Moving to asynchronous request handling code has sped up many of my projects. Getting into thinking with Futures is a steep learning curve at first but pays of. You can finde more about Futures in my A Little Guide on Using Futures for Web Developers.

Comparing Tagged Types and Design by Contract

I'm working on a new mainstream programming language that prevents more bugs than current mainstream languages. One idea it will incorporate are Tagged Types. I wrote about them recently in Real Life Bug Prevention in Scala. Tagged Types can be used to solve different problems. Often they are used as marker types. In the context of this blog posts they work as contracts on other types therefor I call them contract types (CT) from here on. An existing Scala library which implements those with lots of functionality is Refined - good talk about Refined.

Design by Contract for Preventing Bugs

Design by Contract (DbC) is a an idea to prevent bugs by specifying constraints on data and constraints on how data relates to other data. The idea of DbC has been floating around for decades, but hasn't made it into mainstream. The main reason I haven't used DbC is because validation happens at runtime instead of compile time.

The idea is to express pre-conditions, post-conditions and invariants for method calls and classes. Pre-conditions are checked before the execution of a method, the post-conditions are checked at the end of the method, invariants are checked before and after execution of the method.

The canonical DbC example is a Stack (Source)

Contract Type Example

Lets start with a simpler example. We have a method that takes Integers and returns Integers.

The method has the additional constraint that the argument needs to be a positive integer. In some languages there are types for positive integers (or more likely non-negative including zero) but in Scala we use Int.

With a hypothetical Design by Contract framework (like iContract in Java) we could express the constraint with

The DbC framework creates checks during compilation for our constraint which are executed at runtime. If the constraint is violated, an exception is thrown. This is equivalent to using require in Scala, but makes the contract explicit and part of the documented API. With require in Scala this would look like

Can this be expressed with contract types? Yes, we supply an Int @@ Positive to the method instead of Int. This way the method can be sure that the argument is a positive Integer.

How does a developer get the required type to call this method? One way is to have a factory method to create the required type from Int with a contract type Positive tagged on. Developers supply an Int to positive(). If the argument is positive, the method returns Some[Int @@ Positive] otherwise it returns None. The result can then be supplied to our method.

This way the caller of a method needs to prove that data satisfies a certain constraint, in this case Positive. If the caller can't supply data that satisfies a constraint, he can't call the method. The responsibility to provide correct data lies with the caller, not the implementor of the method. This is a huge shift in development responsibility from


although the change in code looks rather insignificant.

Needing a method for conversion looks cumbersome, but this conversion only happens at the interfacing between two modules or systems and so is rare.

Generation of contract typed values ties neatly into validation, for example with Scalactic

Positive Performance Side Effect

If the state of constraints is unclear, constraints are checked more than once. For example a system might take a String as input and then checks in many places if the String is empty. With

the constraint is checked once and the contract type removes all constraint checks in other parts of the system. Sometimes even duplicated work is removed. Often different parts of a system need data in some form, for example lowercase Strings.

With constract types the String is converted once and not several times in different parts of the system. This removes work and increases performance.

Examining Contract Types

We should look at more examples of contract types

Sometimes the difference between tagged types that add semantic meaning to data and contract types is blurry. I would consider

not a contract type though. It's more difficult with

and even more difficult with

While the first looks to be a contract type, the second looks more like a tagged type.

Benefits and Drawbacks

Contract Types

  • Correctness is checked at compile time, not runtime
  • Very few additional code needed
  • The burden is on the caller to prove an argument is correct

Design by Contract

  • Can fail at runtime
  • Design by contract is more powerful

Unit Testing

  • Very powerful
  • Additional (and more) code needed
  • Code seperate from main code
  • Run at compile time
  • Different testing mind set

The main benefit of contract types compared to DbC are that they are ensured at the call site instead of in the method. Also they are visible to the caller during development. More powerful systems for specifications exist, like TLA+. These also have the benefit to prove correctness at comile time but have the drawback of a much higher complexity. This is fine if you write a distributed database but overkill for web frontends to databases. I consider contract types something between regular types, DbC and dependent types like in Agda or Idris where it is possible for types to depend on data.


Contract types solve a special problem. Often types in mainstream languages do not express enough about the data they type, for example that a String is lowercase or a User is logged in. Interfaces in mainstream languages therefor have implicit requirements about the data they receive which easily leads to bugs. Contract types like Int @@ Positive express the requirement as a type. Although this can be solved by inheritance, interfaces or type classes, all of these concepts are harder to understand and read than contract types. There is more to be said in another blog post.