Reordering Futures in Java Streams

Futures are an interesting concept in concurrent and parallel programming. I’ve written about them several times, most recently in Advanced Futures With Play Framework. Streams, introduced in Java 8, operate on a sequence of objects, transforming and filtering them. Combining futures and streams in Java isn’t always easy. Some time ago a problem arose in code where slow futures in a stream blocked other objects in the stream from being processed. Usually you’d use parallel streams in this case, but for operational reasons parallel streams had the wrong threading characteristics. Therefore I thought about ways to unblock streams.

Let’s start with a simple example. We have a stream of items and want to call an external API for each of them. The API call is asynchronous and potentially does IO, so it takes some time:

We iterate over the stream and make the call for each item. The call returns a CompletableFuture, so our stream becomes a stream of futures. At the end we collect the items of the stream into a List with a stream collector, in this case Collectors.toList(). As a minor challenge, the collect call returns List<CompletableFuture<Integer>>, but for easier consumption we would like to have CompletableFuture<List<Integer>>. Luckily the futures library from Spotify provides an allAsList call, which turns a List of futures into a future of a List.
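A minimal sketch of this first example, using only the JDK. We assume a hypothetical callApi(int) that sleeps for the given number of milliseconds and returns the value plus one, which fits the numbers in the output discussed below. Instead of Spotify’s allAsList we use a small JDK-only stand-in of the same name built on CompletableFuture.allOf; the class name FuturesInStreams and the daemon thread pool are also our own choices.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class FuturesInStreams {
    // Daemon threads, so the JVM can exit without shutting this pool down.
    static final ExecutorService IO = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Hypothetical slow API: sleeps `millis` ms, then returns millis + 1.
    static CompletableFuture<Integer> callApi(int millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return millis + 1;
        }, IO);
    }

    // JDK-only stand-in for Spotify's allAsList: List of futures -> future of List.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // every future is done here, so join() returns at once
                        .collect(Collectors.toList()));
    }

    static List<Integer> run(List<Integer> items) {
        List<CompletableFuture<Integer>> futures = items.stream()
                .map(FuturesInStreams::callApi) // returns immediately; nothing blocks here
                .collect(Collectors.toList());
        CompletableFuture<List<Integer>> all = allAsList(futures);
        return all.join(); // the only place where we wait
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(300, 1000, 101, 20)));
    }
}
```

Because allOf only completes once every future has completed, the join() calls inside thenApply never wait, and the result list keeps the order of the input items.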

When running the code, it outputs:

The stream processes all items without blocking (4x Stream 1), then all API calls finish (Done), and afterwards the results are printed as 301, 1001, 102, 21. As the stream operates on futures, which do not block, our code continues immediately after the stream is created, without waiting for stream processing to end. Only when we call join() in the last line does our code block and wait until all futures have finished.

Streams support not only mapping but also filtering. Now we want to filter on the values of the futures. As our stream needs the value of a future, we have to wait for the future to complete and pull the value from the future context into the stream context with f.join():
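A sketch of this filtering step, again with the hypothetical callApi (sleep, then return the value plus one) and a made-up threshold of 100. A sequential stream pushes each element through the whole pipeline before it pulls the next one, so the join() serializes the API calls:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class BlockingFilter {
    // Hypothetical slow API: sleeps `millis` ms on the default async executor, then returns millis + 1.
    static CompletableFuture<Integer> callApi(int millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return millis + 1;
        });
    }

    static List<Integer> run(List<Integer> items) {
        return items.stream()
                .map(BlockingFilter::callApi)
                .map(CompletableFuture::join) // waits here before the next item enters the pipeline
                .filter(v -> v > 100)         // hypothetical threshold
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(run(Arrays.asList(300, 1000, 101, 20)));
        // Total time is roughly the sum of all delays, since the calls never overlap.
        System.out.println((System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```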

Running this code is not as nice as before. The stream processes one item, then blocks until the call finishes, then processes the next item.

As can be seen from the output, the longer running task (1000) blocks (join) processing of further stream items. Most often this is solved by using parallel streams. A parallel version would look like this:
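A sketch of the parallel version of the same pipeline, with the same hypothetical callApi and threshold. Here the API calls run on a separate daemon pool of our choosing, so that their sleeps don’t occupy the common ForkJoinPool the parallel stream itself runs on:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ParallelFilter {
    // Hypothetical dedicated pool for the API calls (daemon threads, no shutdown needed).
    static final ExecutorService IO = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Hypothetical slow API: sleeps `millis` ms, then returns millis + 1.
    static CompletableFuture<Integer> callApi(int millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return millis + 1;
        }, IO);
    }

    static List<Integer> run(List<Integer> items) {
        return items.parallelStream()
                .map(ParallelFilter::callApi)
                .map(CompletableFuture::join)  // blocks only the worker thread handling this item
                .filter(v -> v > 100)          // hypothetical threshold
                .collect(Collectors.toList()); // results keep the encounter order of the source
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(300, 1000, 101, 20)));
    }
}
```

How many items wait at the same time depends on the size of the common pool, not on anything we configure in the pipeline.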

As expected, the output again shows all items being processed; the 1000ms API call no longer blocks processing of the other items:

If we want more control over how many threads are used and when, and we also want a single-threaded stream, we can implement a custom stream collector. With a custom collector we can reorder the futures of the stream and move those that have completed to the front. Our custom collector is called FutureReorderCollector. It gathers all futures and creates a new stream with a new ordering: finished futures are moved forward and running futures are held back.
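The reordering rests on a JDK building block: an ExecutorCompletionService hands back results in the order its tasks finish, not the order in which they were submitted. A minimal sketch, with hypothetical delays standing in for API calls and a class name of our own:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {
    // Runs one sleeping task per delay and returns the delays in completion order.
    static List<Integer> inCompletionOrder(List<Integer> delays) {
        ExecutorService executor = Executors.newFixedThreadPool(delays.size());
        try {
            CompletionService<Integer> service = new ExecutorCompletionService<>(executor);
            for (int delay : delays) {
                service.submit(() -> {
                    Thread.sleep(delay);
                    return delay;
                });
            }
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < delays.size(); i++) {
                results.add(service.take().get()); // next finished task, whatever its submission position
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Shortest delay first, regardless of submission order.
        System.out.println(inCompletionOrder(Arrays.asList(1000, 300, 20)));
    }
}
```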

The output is now the same as before, even though we are not using parallel streams and still have a long running (1000ms) API call.

The long running call is moved to the end of the stream, so stream execution blocks only as long as necessary.

Our collector needs an executor to process those futures. An optimized version could work with the executor already used by the futures, but in this case we provide a custom ExecutorService, which we need to shut down once we have finished our work.

The implementation of the custom collector is simple. It collects all futures into a custom FutureCompletionService and creates a new stream in the finisher with Stream.generate(f::take).limit(f.size()).

The collector uses a custom completion service, where we add futures to a JDK CompletionService and block in take until the next finished future can be polled. One implementation detail is counting the submitted tasks with an AtomicInteger. This count is used to limit the stream generated in FutureReorderCollector.
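A sketch of how FutureCompletionService and FutureReorderCollector might fit together, following the description above: each future is submitted to a JDK ExecutorCompletionService, an AtomicInteger counts the submissions, and the finisher builds Stream.generate(f::take).limit(f.size()). The two class names come from the text, but the bodies are our own reconstruction. The reorder factory method, the callApi helper, the two thread pools and the filter threshold are hypothetical, and this version yields completed values rather than the futures themselves.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hands back the values of already-created futures in the order they complete.
class FutureCompletionService<T> {
    private final CompletionService<T> service;
    private final AtomicInteger submitted = new AtomicInteger();

    FutureCompletionService(Executor executor) {
        this.service = new ExecutorCompletionService<>(executor);
    }

    void add(CompletableFuture<T> future) {
        service.submit(future::join); // an executor thread waits on this future
        submitted.incrementAndGet();
    }

    T take() {
        try {
            return service.take().get(); // blocks until the next future has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // join() already wraps failures in CompletionException; avoid wrapping twice.
            Throwable cause = e.getCause();
            throw cause instanceof CompletionException
                    ? (CompletionException) cause
                    : new CompletionException(cause);
        }
    }

    int size() {
        return submitted.get();
    }
}

class FutureReorderCollector {
    // Hypothetical factory: gathers futures, then streams their values in completion order.
    static <T> Collector<CompletableFuture<T>, FutureCompletionService<T>, Stream<T>> reorder(Executor executor) {
        return Collector.of(
                () -> new FutureCompletionService<T>(executor),
                FutureCompletionService::add,
                (left, right) -> {
                    throw new UnsupportedOperationException("sequential streams only");
                },
                f -> Stream.generate(f::take).limit(f.size()));
    }

    // Hypothetical slow API: sleeps `millis` ms, then returns millis + 1.
    static CompletableFuture<Integer> callApi(int millis, Executor io) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return millis + 1;
        }, io);
    }

    static List<Integer> run(List<Integer> items) {
        ExecutorService io = Executors.newCachedThreadPool();      // runs the API calls
        ExecutorService waiters = Executors.newCachedThreadPool(); // used by the collector
        try {
            return items.stream()                 // sequential, single-threaded stream
                    .map(i -> callApi(i, io))
                    .collect(reorder(waiters))    // finished futures move to the front
                    .filter(v -> v > 100)         // hypothetical threshold
                    .collect(Collectors.toList());
        } finally {
            io.shutdown();
            waiters.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(300, 1000, 101, 20)));
    }
}
```

In this sketch each pending future occupies one thread of the waiters pool while it blocks in join(), which is why both pools are shut down in the finally block. The combiner throws because the collector is meant for sequential streams only, and limit(f.size()) ensures take() is never called more often than futures were added.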

Streams and futures are powerful concepts in Java, but sometimes it’s not easy to combine them. With a custom collector like this one, you can change how streams process their items.