Reordering Futures in Java Streams

Futures are an interesting concept in concurrent and parallel programming. I've written about them several times, most recently in Advanced Futures With Play Framework. Streams are a concept which came with Java 8 to operate on a stream of objects, transforming and filtering them. Combining Futures and Streams in Java sometimes isn't easy. Some time ago a problem arose in code where slow futures in a stream were blocking other objects in the stream from being processed. Usually you'd use parallel streams in this case, but for operational concerns parallel streams had the wrong thread characteristics. Therefore I've thought about ways to unblock streams.

Let's start with a simple example. We have a simple asynchronous API call and a Stream of items, and we want to call an external API, potentially involving IO, which takes some time:

public static CompletableFuture<Integer> futureApiCall(Integer i) {
    return CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Done => " + i);
        return i;
    });
}

    System.out.println("\n== Without filter");
    List<CompletableFuture<Integer>> r1 = Stream.of(300, 1000, 101, 20)
            .peek(i -> System.out.println("Stream 1 => " + i))
            .map(i -> futureApiCall(i))
            .map(f -> f.thenApply(i -> i + 1))
            .collect(Collectors.toList());

    // Our code immediately runs here, not waiting on Stream processing

    // turn List of Futures into Future of List
    System.out.println(CompletableFutures.allAsList(r1).join());

We iterate over the stream and make the call. The call returns a CompletableFuture, so our stream becomes a Stream of Futures. In the end we collect the items of the stream into a List with a stream collector, in this case Collectors.toList(). As a minor challenge, the collect call returns List<CompletableFuture<T>>, but we would like to have CompletableFuture<List<Integer>> for easier consumption. Luckily the futures library from Spotify provides an allAsList call which turns a List of Futures into a Future of a List.

When running the code it outputs

== Without filter
Stream 1 => 300   
Stream 1 => 1000
Stream 1 => 101
Stream 1 => 20
Done => 101
Done => 20
Done => 300
Done => 1000
[301, 1001, 102, 21]

The stream processes all items without blocking (4x Stream 1), then all API calls finish (Done), and afterwards the results are printed as 301, 1001, 102, 21. As the stream operates on Futures which do not block, our code runs immediately after creation of the stream without waiting for the stream processing to end. Only when we call join() in the last line does our code block and wait until all futures have finished.

Streams support not only mapping but also filtering. We now want to filter on the values of the futures. As our Stream needs to deal with the value of a future, we need to wait for the future to complete and pull the value from the Future context into the Stream context with f.join():

    System.out.println("\n== Filter without reordering");
    List<CompletableFuture<Integer>> r2 = Stream.of(300, 1000, 101, 20)
            .peek(i -> System.out.println("Stream 1 => " + i))
            .map(i -> futureApiCall(i))
            .filter(f ->  f.join() % 2 == 0)
            .map(f -> f.thenApply(i -> i + 1))
            // not necessary with a collector
            // that collects Futures, e.g. from
            // Spotify
            .collect(Collectors.toList());

    // turn List of Futures into Future of List
    System.out.println(CompletableFutures.allAsList(r2).join());

Running this code is not as nice as before. The stream processes one item, then blocks until the call finishes, then processes the next item.

== Filter without reordering
Stream 1 => 300
Done => 300
Stream 1 => 1000
Done => 1000
Stream 1 => 101
Done => 101
Stream 1 => 20
Done => 20
[301, 1001, 21]

As can be seen from the output, the longer running task (1000) blocks (join) processing of further stream items. Most often this is solved by using parallel streams. A parallel version would look like this:

// we want our own pool
ForkJoinPool customThreadPool = new ForkJoinPool(2);
System.out.println("\n== Parallel version");
List<Integer> r4 = CompletableFuture.supplyAsync( () -> Stream.of(300, 1000, 101, 20)
    .parallel()
    .peek(i -> System.out.println("Stream 1 => " + i))
    .map(i -> futureApiCall(i))
    .filter(f ->  f.join() % 2 == 0)
    .map(f -> f.thenApply(i -> i + 1))
    .map(f -> f.join())
    .peek(i -> System.out.println("Stream 2 => " + i))
    .collect(Collectors.toList())
, customThreadPool).join();

As expected, all items are processed again and the 1000ms API call no longer blocks processing of other items:

== Parallel version
Stream 1 => 101
Stream 1 => 1000
Stream 1 => 20
Stream 1 => 300
Done => 20
Stream 2 => 21
Done => 101
Done => 300
Stream 2 => 301
Done => 1000
Stream 2 => 1001
[301, 1001, 21]

If we want more control over when and how many threads are used, and we also want a single-threaded Stream, we can implement a custom stream collector. With a custom collector, we can reorder the futures of the stream and move those that have completed to the front. Our custom collector is called FutureReorderCollector. It gathers all Futures and creates a new stream with a new ordering. Finished futures are moved forward and running futures are held back.

    System.out.println("\n== Filter with reordering");
    List<Integer> r3 = Stream.of(300, 1000, 101, 20)
            .peek(i -> System.out.println("Stream 1 => " + i))
            .map(i -> futureApiCall(i))
            // reorder futures here
            .collect(new FutureReorderCollector<>(executor))
            // new stream here
            .filter(i -> i % 2 == 0)
            .map(i -> i + 1)
            .peek(i -> System.out.println("Stream 2 => " + i))
            .collect(Collectors.toList());

The output now is as before, although we are not using parallel streams and have a long-running (1000ms) API call.

== Filter with reordering
Stream 1 => 300
Stream 1 => 1000
Stream 1 => 101
Stream 1 => 20
Done => 101
Done => 20
Stream 2 => 21
Done => 300
Stream 2 => 301
Done => 1000 
Stream 2 => 1001
[21, 301, 1001]

The long-running future is moved to the end of the stream. Stream execution blocks only as much as needed.

Our collector needs an executor to process those Futures. An optimized version could work with the executor already used by the futures, but in this case we provide a custom ExecutorService which we need to shut down after we've finished work.

    ExecutorService executor = Executors.newFixedThreadPool(4);
    ...
    executor.shutdown();

Implementation of the custom collector is simple. It collects all futures into a custom FutureCompletionService and creates a new Stream in the finisher with Stream.generate(f::take).limit(f.size()).

import java.util.Collections; 
import java.util.Set;
import java.util.concurrent.*;
import java.util.function.*;
import java.util.stream.Collector;
import java.util.stream.Stream;

// DO NOT USE IN PRODUCTION
public class FutureReorderCollector<T>
    implements Collector<CompletableFuture<T>, 
      FutureCompletionService<T>, 
      Stream<T>> {

  private ExecutorService e;

  public FutureReorderCollector(ExecutorService e) {
    this.e = e;
  }

  @Override
  public Supplier<FutureCompletionService<T>> supplier() {
    return () -> new FutureCompletionService<>(e);
  }

  @Override
  public BiConsumer<FutureCompletionService<T>, CompletableFuture<T>> accumulator() {
    return FutureCompletionService::submit;
  }

  // this collector does not support parallel (concurrent) streams,
  // so the combiner should never be called; fail loudly if it is
  @Override
  public BinaryOperator<FutureCompletionService<T>> combiner() {
    return (a, b) -> {
      throw new UnsupportedOperationException("parallel streams are not supported");
    };
  }

  @Override
  public Function<FutureCompletionService<T>, Stream<T>> finisher() {
    return (f) -> Stream.generate(f::take).limit(f.size());
  }

  @Override
  public Set<Characteristics> characteristics() {
    return Collections.singleton(
            Characteristics.UNORDERED
    );
  }
}

The collector uses a custom completion service, where we add futures to a JDK CompletionService and block on polling finished futures in take. One implementation detail is the counting of submitted tasks with an AtomicInteger. This count is used to limit the Stream generated in FutureReorderCollector.

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class FutureCompletionService<T> {
  private CompletionService<T> ecs;
  private AtomicInteger count = new AtomicInteger(0);

  public FutureCompletionService(ExecutorService e) {
      ecs = new ExecutorCompletionService<>(e);
  }

  public void submit(CompletableFuture<T> f) {
    count.incrementAndGet();
    ecs.submit(f::get);
  }

  public T take() {
    T r = null;
    try {
        r = ecs.take().get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    return r;
  }

  public int size() {
    return count.intValue();
  }
}

Streams and futures are powerful concepts in Java. Sometimes it's not easy to combine them. With this code you can implement your own collector to change how streams process their items.

Advanced Futures With Play Framework

Some time ago I wrote A Little Guide on Using Futures for Web Developers to explain how to use Futures in web development to speed up page delivery with parallelism.

Futures in Play

Lately I've played a little with the Play Framework using its async actions, so I want to add to this guide. Play controllers can return asynchronous actions built with Action.async, which produce a Future[Result] that Play then executes.

Asynchronicity is not free. It creates more complicated code and makes bugs harder to detect. Why would we use it then? Mainly to speed up requests by parallelizing code across several cores, doing other work during IO waits, or parallelizing waits.

As an example, we have three IO calls and two HTML fragment renderings. Together our request takes 80 ms.

 io1()                       - 20 ms
 |-> io2()                   - 20 ms
     |-> io3()               - 20 ms
         |-> render1()       - 10 ms
             |-> render2()   - 10 ms
                             = 80 ms

When we asynchronize IO and parallelize rendering across more cores, we reduce request time to just 50 ms, a reduction of nearly 40%. And the more database IO your code has, the more you can parallelize rendering parts of a page, and the more cores you have, the higher the speedup.

 io1()                       - 20 ms
     |-> io2()               - 20 ms
     |-> io3()               - 20 ms
         |-> render1() core1 - 10 ms
         |-> render2() core2 - 10 ms
                             = 50 ms

There are some pitfalls though. If your CPU is highly congested, rendering time may have outliers and take longer, because in the example above render2() might be blocked on the second core. So your request waits for the task on the second core to finish. If you use 16 cores then only one needs to be congested and your request is held up (depending on your threadpool / executor implementation). Async code might also take up more memory, because threads are blocked waiting for IO. This is the case if you do not use truly asynchronous IO (database) drivers; most "async" drivers are not async to the core but use threads themselves to achieve asynchronicity.

Using asynchronous code also takes a little time and thinking, because you can easily add some blocking part by mistake and all your nice parallelism goes out the window.

One Play action in one of my side projects uses Futures for asynchronous code and looks like this:

// PageJdbc.all() -> 
//    get urls of all pages, returns Future[Seq[String]]
// PageJdbc.byUrl() -> 
//    get page for url, return Future[Option[Page]]
// PageJdbc.history() -> 
//    get the history of the page by url, returns Future[Seq[Double]]

def index = Action.async { implicit request: Request[AnyContent] =>
  val dataUrlsF = PageJdbc.all()
    // Map outer Future
    .map(
    // Iterate over the inner Seq
    _.map(u => {
    // call methods first, they would block in "for"
    val titleF = PageJdbc.byUrl(u).map(_.map(_.title).getOrElse("-"))
    val distsF = PageJdbc.history(u).map(dists)
    // Results in Seq[Future[(a,b,c)]]
    for (
      // Future.successful so result type is 
      // Future[(_,_,_)]
      url <- Future.successful(u);
      title <- titleF;
      dists <- distsF
    ) yield (url, title, dists)
  }))
  // results in Future[Seq[Future[(_,_,_)]]]
  // Convert F[S[F[...]]] to F[F[S[..]]] then flat to F[S[...]]
  .map(f => Future.sequence(f)).flatten

  // Returns F[Result[...]]
  for (dataUrls <- dataUrlsF) yield {
    Ok(views.html.index(ExampleForm.form, dataUrls))
  }
}

The code has three database operations, each returning a Future[...]. First, all() returns a list of all urls of type Future[Seq[String]]. The double map calls reach into the list and iterate over every item. The calls to byUrl and history return their own Futures of type Future[Option[Page]] and Future[Seq[Double]].
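
The Future.sequence plus flatten step at the end can be seen in isolation. A minimal sketch with hypothetical values, turning the F[S[F[...]]] shape into F[S[...]]:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// the shape produced by the nested maps: Future[Seq[Future[Int]]]
val nested: Future[Seq[Future[Int]]] =
  Future.successful(Seq(Future.successful(1), Future.successful(2)))

// Future.sequence turns Seq[Future[Int]] into Future[Seq[Int]],
// flatten collapses the resulting Future[Future[Seq[Int]]]
val flat: Future[Seq[Int]] =
  nested.map(futures => Future.sequence(futures)).flatten

val result = Await.result(flat, 1.second)
```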

Our code looks more complicated than a simple for comprehension, but with a simple for comprehension there are some pitfalls, e.g.

 // Does not work (!)
 val dataUrlsF = for (
     // Does not result in a result type of Future[Seq[...]] 
     urls <- PageJdbc.all();
     u <- urls;
     // called when all() finished, ok.
     title <- PageJdbc.byUrl(u).map(_.map(_.title).getOrElse("-"));
     // called when byUrl() finished, not all() #FAIL
     dists <- PageJdbc.history(u).map(dists); 
) yield (u, title, dists) 

The code can be written with nested fors, though, if we translate our working example from nested maps to nested fors. You can recognize the Future.sequence and flatten calls from above.

// returns Future[Seq[(_,_,_)]]
val dataUrlsF = (for (urls <- PageJdbc.all())
  yield for (u <- urls)
    yield {
      val titleF = PageJdbc.byUrl(u).map(_.map(_.title).getOrElse("-"));
      val distsF = PageJdbc.history(u).map(dists);
      for (
        title <- titleF;
        dists <- distsF
      ) yield (u, title, dists)
    }).map(f => Future.sequence(f)).flatten

Though complicated looking, this code is pure parallelism bliss: the controller method returns immediately without blocking or waiting, handing a Future to Play to execute. Also, byUrl and history are called in parallel as they do not depend on each other. Suppose all() returns a list of 10 urls; the code will then create 20 Futures which are worked on in parallel (depending on the number of cores/hyperthreads and the threadpool used). The code does not create 2 Futures sequentially for one url after another, waiting for each Future to succeed, but creates Futures for all 10 urls in one swoop.

Expressed in dependencies

  all()
  |-> byUrl()-1
  |-> history()-1
  |-> byUrl()-2
  |-> history()-2
  |-> byUrl()-3
  |-> history()-3
  ...

instead of

  all() 
  |-> byUrl()-1
      |-> history()-1
          |-> byUrl()-2
              |-> history()-2
                  |-> byUrl()-3
                      |-> history()-3
                          ...

Blocking Threads

You can test this construct of nested fors by running

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent._

def range(i: Int): Future[Seq[Int]] = {
  Future {
    1.to(i)
  }
}

def w(i: Int): Future[Int] = {
  println(s"Create $i")
  val f = Future {
    println(s"Running $i")
    Thread.sleep(1000)
    i
  }
  f.onComplete(_ =>
    println(s"Finished $i")
  )
  f
}

def run() = {
  println(s"Number cores: ${Runtime.getRuntime.availableProcessors}")

  // Same construct of nested fors like in our other example
  val data = (for (seq <- range(5))
    yield for (i <- seq)
      yield {
        val i1F = w(i * 10);
        val i2F = w(i * 10 + 1)
        for (
          i1 <- i1F;
          i2 <- i2F
        ) yield (i1, i2)
      }).map(f => Future.sequence(f)).flatten

  Await.result(data, 20.second)
  println("All finished.")
}

run with

Example.run()

You will see something like

Number cores: 4
Create 10
Running 10    <-- Run 1.
Create 11
Running 11    <-- Run 2.
Create 20
Create 21
Create 30
Create 31
Create 40
Create 41
Create 50
Create 51
Running 20    <-- Run 3.
Running 21    <-- Run 4.
Finished 11   <-- Finish 1.
Finished 21   <-- Finish 2.
Finished 10   ...
Running 31
Running 40
Running 30
Running 41
Finished 31
Finished 40
Running 50
Finished 41
Running 51
Finished 20
Finished 30
Finished 51
Finished 50
All finished.

Why isn't the code running all 2x5 Futures at once as promised above? We use the standard global Scala ExecutionContext. The context limits the number of running threads to your number of cores. When Futures block, e.g. because of IO, no further tasks are executed. Fortunately we can tell Scala that our Future is blocking. When we change the code to

val f = Future {
  blocking {
    println(s"Running $i")
    Thread.sleep(1000)
    i
  }
}

we get the expected results:

Number cores: 4
Create 10
Create 11
...
Create 50
Create 51 
Running 21
Running 20
...
Running 40
Running 31
Finished 21
Finished 10
...
Finished 31
Finished 11
All finished.

Database access with Future

The byUrl method which returns a Page by its url looks like this:

def byUrl(url: String)(implicit db: Database, ec: ExecutionContext): Future[Option[Page]] = {
  new GetPageByUrl(url).executeWithRSF { rs: ResultSet =>
    Page(url = rs.getString(1), title = rs.getString(2), description = rs.getString(3))
  }.map(_.headOption)
}

In our use case we use getOrElse on the Option to get the title. If empty results should be sorted out in our for chain, this gets more complicated. But there is some help for working with Future[Option[...]] in A Little Guide on Using Futures for Web Developers.

The method executeWithRSF uses

Future {
   blocking { 
     ... database stuff ...
   }
}

for parallelism. This can be changed to use a native asynchronous database driver.

More than one Future depends on result of previous Future

Sometimes you have more than one Future that depends on a previous Future, and we can generalize our case from above. If you just call the Futures one after another, as before, they are created sequentially and not executed in parallel.

val r = for (
   x <- Future.successful(1);
   a <- Future.successful(1 + x);
   b <- Future.successful(2 + x);
   c <- Future.successful(3 + x)
) yield (a + b + c)

Here b is executed after a has finished, and c is executed after b has finished.

With this code all three are executed at the same time:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

val r = for ( 
   x <- Future.successful(1);
   a :: b :: c :: _ <- Future.sequence(
      Seq(
          Future.successful(1 + x), 
          Future.successful(2 + x),
          Future.successful(3 + x)
      ))
) yield (a + b + c);
Await.result(r, 5.second)
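
An alternative to Future.sequence is the idiom used in the nested-for example earlier: start the futures by assigning them to vals before the for-comprehension. A sketch (hypothetical values):

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

val x = 1
// start all three futures before entering the for-comprehension,
// so they run concurrently
val aF = Future(1 + x)
val bF = Future(2 + x)
val cF = Future(3 + x)

// the for-comprehension only waits on the already-running futures
val r = for (a <- aF; b <- bF; c <- cF) yield a + b + c
val result = Await.result(r, 5.seconds)
```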

Asynchronous Write/Database IO after Form Validation

How do you use Action.async with form validation? This code uses async futures to fetch a webpage for a url the user has entered. Then the title and description are inserted into a database, where insert returns Future[Option[Int]] with the new primary key of the inserted page.

We need to use a monad transformer FutureO because otherwise we'd end up with Future[Option[Future[Option[Int]]]], which isn't flattenable. FutureO[FutureO[Int]] is flattenable to FutureO[Int], which can then be transformed back to Future[Option[Int]]. The monad transformer takes a Future[Option[...]] as a constructor argument.
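
FutureO is not part of the Scala standard library. A minimal sketch of such a transformer, an assumption about the actual implementation with only map and flatMap, could look like this:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// minimal FutureO sketch: wraps Future[Option[A]] so it can be
// used in for-comprehensions
case class FutureO[A](future: Future[Option[A]]) {
  def map[B](f: A => B)(implicit ec: ExecutionContext): FutureO[B] =
    FutureO(future.map(_.map(f)))

  def flatMap[B](f: A => FutureO[B])(implicit ec: ExecutionContext): FutureO[B] =
    FutureO(future.flatMap {
      case Some(a) => f(a).future             // continue with the inner value
      case None    => Future.successful(None) // short-circuit on None
    })
}

// a present and a missing value
val someF = FutureO(Future.successful(Option(2)))
val noneF = FutureO(Future.successful(Option.empty[Int]))

val sum = for (a <- someF; b <- someF) yield a + b
```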

// Helper for successful Future
def successF[S](block: => S): Future[S] = {
  Future.successful(block)
}

def addUrl = Action.async { implicit request: Request[AnyContent] =>

  val error = { formWithErrors: Form[Data] =>
    successF {
      BadRequest(views.html.index(formWithErrors, List.empty))
    }
  }

  val success = { data: Data =>
    // FutureO monad transformer
    // Otherwise we would end up 
    // with Future[Option[Future[Option[Int]]]]
    (for (page <- FutureO(Http.get(data.url));
        p = {
          val doc = Jsoup.parse(page)
          val title = doc.select("title").first().text()
          val description = doc.select("meta[name=description]").attr("content")
          Page(url = data.url, 
               title = title, 
               description = description)
        };
        key <- FutureO(PageJdbc.insert(p))
    ) yield key)
      // FutureO[Int] to Future[Option[Int]]
      .future
      // Future[Option[Int]]
      .map {
        case Some(key) => 
          Redirect(routes.HomeController.show(key))
        case _ => 
          Redirect(routes.HomeController.index())
    }
  }

  ExampleForm.form.bindFromRequest.fold(error, success)
}

Moving to asynchronous request handling code has sped up many of my projects. Getting into thinking with Futures is a steep learning curve at first but pays off. You can find more about Futures in my A Little Guide on Using Futures for Web Developers.

Comparing Tagged Types and Design by Contract

I'm working on a new mainstream programming language that prevents more bugs than current mainstream languages. One idea it will incorporate is Tagged Types. I wrote about them recently in Real Life Bug Prevention in Scala. Tagged Types can be used to solve different problems. Often they are used as marker types. In the context of this blog post they work as contracts on other types, therefore I call them contract types (CT) from here on. An existing Scala library which implements these with lots of functionality is Refined - there is a good talk about Refined.

Design by Contract for Preventing Bugs

Design by Contract (DbC) is an idea to prevent bugs by specifying constraints on data and constraints on how data relates to other data. The idea of DbC has been floating around for decades but hasn't made it into the mainstream. The main reason I haven't used DbC is that validation happens at runtime instead of compile time.

The idea is to express pre-conditions, post-conditions, and invariants for method calls and classes. Pre-conditions are checked before the execution of a method, post-conditions are checked at the end of the method, and invariants are checked before and after execution of the method.

The canonical DbC example is a Stack (Source)

/**
 *  @inv !isEmpty() implies top() != null   //  no null objects are allowed
 */
public interface Stack<T>
{
  /**
   *  @pre o != null
   *  @post !isEmpty()
   *  @post top() == o
   */
  void push(T o);
  /**
   *  @pre !isEmpty()
   *  @post @return == top()@pre
   */
  T pop();
  /**
   *  @pre !isEmpty()
   */
  T top();
  boolean isEmpty();
}

Contract Type Example

Let's start with a simpler example. We have a method that takes Integers and returns Integers.

def x(i:Int):Int = { ... }

The method has the additional constraint that the argument needs to be a positive integer. Some languages have types for positive integers (or, more likely, non-negative integers including zero), but in Scala we use Int.

With a hypothetical Design by Contract framework (like iContract in Java) we could express the constraint with

// @pre i > 0
def x(i:Int):Int = { ... }

The DbC framework generates checks for our constraint during compilation, which are executed at runtime. If the constraint is violated, an exception is thrown. This is equivalent to using require in Scala, but makes the contract explicit and part of the documented API. With require in Scala this would look like

def x(i:Int):Int = { 
  require(i > 0)
  ...
}

Can this be expressed with contract types? Yes, we supply an Int @@ Positive to the method instead of Int. This way the method can be sure that the argument is a positive Integer.

def x(i:Int @@ Positive):Int = { ... }

How does a developer get the required type to call this method? One way is a factory method that creates the required type from an Int with the contract type Positive tagged on. Developers supply an Int to positive(). If the argument is positive, the method returns Some[Int @@ Positive]; otherwise it returns None. The result can then be supplied to our method.

def positive(x:Int):Option[Int @@ Positive] = {
  // Implemented with if instead of map
  if (x > 0) Some(tag[Positive](x)) else None
}

// tag 3 with @@ Positive and call x
positive(3).map(i => x(i))
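
The @@ type and the tag function come from tagged-type libraries such as shapeless or scalaz. A minimal self-contained sketch of that machinery, illustrative rather than the actual library code:

```scala
// minimal tagging machinery in the style of shapeless/scalaz
object tags {
  trait Tagged[U]
  type @@[+T, U] = T with Tagged[U]

  class Tagger[U] {
    // safe cast: the tag exists only at compile time and is erased at runtime
    def apply[T](t: T): T @@ U = t.asInstanceOf[T @@ U]
  }
  def tag[U]: Tagger[U] = new Tagger[U]
}

import tags._

trait Positive

// factory: proves positivity once, at the boundary
def positive(x: Int): Option[Int @@ Positive] =
  if (x > 0) Some(tag[Positive](x)) else None
```

An Int @@ Positive is still an Int at runtime, so it can be used in arithmetic directly while carrying the proof in its type.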

This way the caller of a method needs to prove that the data satisfies a certain constraint, in this case Positive. If callers can't supply data that satisfies the constraint, they can't call the method. The responsibility to provide correct data lies with the caller, not the implementor of the method. This is a huge shift in development responsibility from

def x(i:Int):Int = { 
  require(i > 0)
  ...
}

to

def x(i:Int @@ Positive):Int = { ... }

although the change in code looks rather insignificant.

Needing a method for conversion looks cumbersome, but this conversion only happens at the interface between two modules or systems and so is rare.

Generation of contract-typed values ties neatly into validation, for example with Scalactic:

def positive_?(x: Int, errorMessage: String): 
  Int @@ Positive Or Every[ErrorMessage] = {
  if (x > 0) Good(tag[Positive](x)) 
  else Bad(One(errorMessage))
}

Positive Performance Side Effect

If the state of constraints is unclear, constraints are checked more than once. For example a system might take a String as input and then check in many places whether the String is empty. With

String @@ NonEmpty

the constraint is checked once and the contract type removes all constraint checks in other parts of the system. Sometimes even duplicated work is removed. Often different parts of a system need data in some form, for example lowercase Strings.

String @@ Lowercase

With contract types the String is converted once and not several times in different parts of the system. This removes work and increases performance.

Examining Contract Types

We should look at more examples of contract types

Int @@ Even
Int @@ Odd
Int @@ NotZero

Float @@ Percentage

List @@ NonEmpty
List @@ Sorted
String @@ NonEmpty

String @@ LowerCase
String @@ Trimmed
String @@ LargerThan3

Sometimes the difference between tagged types that add semantic meaning to data and contract types is blurry. I would consider

UUID @@ UserId

not a contract type though. It's more difficult with

// @pre money.vat == true
// Often expressed by subclassing
Money @@ VAT

// @pre money.netto == true
Money @@ Netto

// @pre money.brutto == true
Money @@ Brutto

and even more difficult with

String @@ SafeHtml
String @@ Html

While the first looks to be a contract type, the second looks more like a tagged type.

Benefits and Drawbacks

Contract Types

  • Correctness is checked at compile time, not runtime
  • Very little additional code needed
  • The burden is on the caller to prove an argument is correct

Design by Contract

  • Can fail at runtime
  • Design by contract is more powerful

Unit Testing

  • Very powerful
  • Additional (and more) code needed
  • Code separate from main code
  • Run at build time, not runtime
  • Different testing mind set

The main benefit of contract types compared to DbC is that they are ensured at the call site instead of in the method. Also they are visible to the caller during development. More powerful systems for specifications exist, like TLA+. These also have the benefit of proving correctness before runtime but have the drawback of much higher complexity. This is fine if you write a distributed database but overkill for web frontends to databases. I consider contract types something between regular types, DbC, and dependent types as in Agda or Idris, where it is possible for types to depend on data.

Conclusion

Contract types solve a special problem. Often types in mainstream languages do not express enough about the data they type, for example that a String is lowercase or a User is logged in. Interfaces in mainstream languages therefore have implicit requirements about the data they receive, which easily leads to bugs. Contract types like Int @@ Positive express the requirement as a type. Although this can be solved with inheritance, interfaces, or type classes, all of these concepts are harder to understand and read than contract types. There is more to be said in another blog post.

Real Life Bug Prevention in Scala

Using Scala for the last 5 years to run eventsofa, I've become a better programmer creating fewer bugs. Most of the code is Java++ Scala compared to more canonical functional Haskell-like Scala. So I'm interested in how Scala used in this way prevents bugs that I would make in Java, Python or Ruby.

There have been two things that made a huge impact, Options and tagged types.

Let's explore both concepts with a small example:

interface UserRepository {
  User forId(String id);
}

interface InvoiceRepository {
  Invoice forAccount(String accountNumber);
}

Option vs. Null

How does one signal that there is no invoice or there is no user? In Java this is often done with Null or by throwing some kind of InvoiceNotFoundException. With Nulls for error handling (Some Thoughts on Error Handling) the code looks like this

User user = users.forId(userId)
if (user == null) {
   ....
} else {
  Invoice invoice = invoices.forAccount(user.accountNumber)
  if (invoice == null) {
     ....
  } else {
    ...
  }
}

Code with exceptions would look similar. This code is noisy, and because developers are not forced to check for Null, sometimes they do not test for error conditions signaled with Null. The result is hard-to-find NullPointerExceptions (I also blame you, Java, for not printing more information in NPEs - take a look at Elm!).

In Scala one would use the Option type to signal that there is no user, returning Some(user) if a user was found and None if there wasn't.

trait UserRepository {
  def forId(id:String):Option[User]
}

trait InvoiceRepository {
  def forAccount(accountNumber:String):Option[Invoice]
}

Code using Option does not need to check the result type of the method call and split the code path into success and error branches as with Null. One code path is used for both the error and the success case. Error handling can be deferred to a later point and errors can be accumulated.

 val invoice = users.forId(userId)
               .flatMap(
                 u => invoices.forAccount(u.accountNumber)
               )

or

 val invoice = users.forId(userId)
               .map(_.accountNumber)
               .flatMap(invoices.forAccount)

For cleaner syntax Scala provides for-comprehension sugar, which expands to the code above but is easier to read:

val invoice = for ( user <- users.forId(userId);
      invoice <- invoices.forAccount(user.accountNumber)
    ) yield {
      ...
}

In Scala it's easy to compose Options, compared to Nulls - which don't compose. Also the code is easier to read with less noise; especially the for version is easy to understand even with more complex dependencies. Code that is easy to read and understand results in fewer misunderstandings and fewer bugs. Error handling is improved as developers write more and better error-handling code. From my experience with Scala, the usage of Option instead of Null or Exceptions leads to far fewer laziness bugs.

Tagged Types

The usage of String for accountNumber and userId is problematic.

trait UserRepository {
  def forId(id:String):Option[User]
}

For developers it's hard to understand what these Strings look like or how to get or create a correct one. There might be different userIds in different String formats, and it is easy to plug a wrongly formatted userId String into a method. The same object might have different names in different method signatures like id, userId, or user. A little more nuanced, the problem shows in

def substractPercentage(percentage:Float)

If you find this code, your first thought is "What is percentage?". To represent 11.5% the Float value could be 11.5 or 0.115. This has bitten me in the past when I assumed 10% for a coupon was represented by 0.10. As a result millions of users got mails with coupons with a value of 0.10% because the developer of that method represented 10% with 10.0.

Where this also rears its head is with money.

 def setAmount(moneyAmount:Double)

First, once and for all: do not use double for money. Second: is this a netto or brutto amount? A bug I've seen several times is when variables representing netto amounts are plugged into methods expecting brutto amounts.

The classic approach would be to create abstract data types (ADTs), classes, case classes or value objects.

def setAmount(moneyAmount:BruttoMoney)

def substractPercentage(amount:Percentage)

The downside with this (see Never, never, never use String in Java (or at least less often 🙂)), and the reason developers don't use it often enough, is that developers need to write more code. Plus, Percentage is harder to understand, to reuse and to calculate with than Float. I can plug a Float into more methods than a Percentage.

In Scala I'm using Tagged Types instead of case classes here:

def findUser(userId:String @@ UserId)

def setAmount(moneyAmount:Money @@ Brutto)

def substractPercentage(amount: Float @@ Percentage)

These have several benefits. It's easy to see that Percentage is a Float, creation can be documented (10% represented by 0.10) and controlled, plus the API is easier to read. All of this leads to fewer misunderstandings and, from my experience, therefore to fewer bugs.

Other examples are

def bonusForUser(user:User @@ Authenticated)

def read(server:Server @@ Connected) 

Tagged types have made my code much better than before.

How about other Scala features to prevent bugs? As I am writing server-side, request-isolated code, immutable data structures didn't help to prevent concurrency bugs in the last years. My only concurrency is fire-and-forget usage of actors for logging, database side effects, sending mails etc. Limited concurrency is used to get data from different data sources, but neither of these use cases shares data.

Sealed traits have helped with checking whether match statements are exhaustive; especially in larger code bases this helps when adding new features. Case classes prevented hashcode bugs as they generate correct hashcode methods by themselves.
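
The exhaustiveness check on sealed traits can be sketched with a hypothetical example:

```scala
// hypothetical example of a sealed trait hierarchy
sealed trait PaymentState
case object Pending extends PaymentState
case object Paid extends PaymentState
case object Refunded extends PaymentState

// if a new PaymentState subtype is added later, the compiler warns
// that this match is no longer exhaustive
def describe(s: PaymentState): String = s match {
  case Pending  => "waiting for payment"
  case Paid     => "paid"
  case Refunded => "refunded"
}
```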

All in all, my Scala code has a reduced level of bugs due to some Scala features compared to my code in Java, Ruby or Python. Any new language I am using would need to support both Option and tagged types.