Advanced Futures With Play Framework

Some time ago I wrote A Little Guide on Using Futures for Web Developers, which explains how Futures can speed up page delivery in web development through parallelism.

Futures in Play

Lately I've played a little with the Play Framework and its async actions, so I want to add to that guide. Play controllers define asynchronous actions with Action.async: instead of a Result, the action body returns a Future[Result], and Play sends the response once that Future has completed.

Asynchronicity does not come for free. It makes code more complicated and errors harder to detect. Why use it then? Mainly to speed up requests by spreading work over several cores, doing other work during IO waits, or waiting on several IO calls in parallel.

As an example, take a request with three IO calls and two HTML fragment renderings. Run one after another, the request takes 80 ms.

 io1()                       - 20 ms
 |-> io2()                   - 20 ms
     |-> io3()               - 20 ms
         |-> render1()       - 10 ms
             |-> render2()   - 10 ms
                             = 80 ms

When we make the IO asynchronous and spread rendering over more cores, the request time drops from 80 ms to 50 ms, a reduction of nearly 40% (a 1.6x speedup). The more database IO your code has, the more parts of a page you can render in parallel, and the more cores you have, the higher the speedup.

 io1()                       - 20 ms
     |-> io2()               - 20 ms
     |-> io3()               - 20 ms
         |-> render1() core1 - 10 ms
         |-> render2() core2 - 10 ms
                             = 50 ms
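The two timelines can be simulated with plain Scala Futures. This is a minimal sketch, not code from a real application: the hypothetical step() stands in for the IO calls and renderings by sleeping for the given number of milliseconds, and zip starts two steps together once their common predecessor has finished.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Timeline {
  // Hypothetical simulated step: sleeps `ms` milliseconds, then returns its label.
  // `blocking` lets the global ExecutionContext add threads while it sleeps.
  def step(label: String, ms: Long): Future[String] =
    Future(blocking { Thread.sleep(ms); label })

  // Runs `body` and returns its result plus the elapsed milliseconds
  def timed[T](body: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000)
  }

  // Every step waits for the previous one: 20 + 20 + 20 + 10 + 10 = 80 ms
  def sequential(): Future[Seq[String]] =
    for {
      a  <- step("io1", 20)
      b  <- step("io2", 20)
      c  <- step("io3", 20)
      r1 <- step("render1", 10)
      r2 <- step("render2", 10)
    } yield Seq(a, b, c, r1, r2)

  // io2/io3 start together after io1, render1/render2 start together
  // after both IO calls: 20 + 20 + 10 = 50 ms
  def parallel(): Future[Seq[String]] =
    for {
      a        <- step("io1", 20)
      (b, c)   <- step("io2", 20).zip(step("io3", 20))
      (r1, r2) <- step("render1", 10).zip(step("render2", 10))
    } yield Seq(a, b, c, r1, r2)

  // Returns (sequential ms, parallel ms)
  def measure(): (Long, Long) = {
    val (seqResult, seqMs) = timed(Await.result(sequential(), 5.seconds))
    val (parResult, parMs) = timed(Await.result(parallel(), 5.seconds))
    require(seqResult == parResult) // same work, different schedule
    (seqMs, parMs)
  }
}
```

On a machine with spare cores, Timeline.measure() should return something close to (80, 50) plus some scheduling overhead.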

There are some pitfalls, though. If your CPU is highly congested, rendering times can have outliers: in the example above, render2() might have to wait because the second core is busy, and the whole request then waits for it. With 16 cores, a single congested core is enough to hold up your request (depending on your thread pool / executor implementation). Async code can also use more memory, because threads sit blocked while waiting for IO. This happens when you don't use truly asynchronous IO (database) drivers, and most "async" drivers are not asynchronous to the core but use threads themselves to achieve asynchronicity.
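The outlier effect is easy to reproduce: a request that waits for all of its parallel parts is only as fast as the slowest part. In this sketch the hypothetical render() simulates a page part with Thread.sleep, and one part is artificially delayed to stand in for a task stuck on a congested core.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SlowestWins {
  // Hypothetical page part that takes `ms` milliseconds to render
  def render(ms: Long): Future[Long] =
    Future(blocking { Thread.sleep(ms); ms })

  // Renders all parts in parallel and waits for every one of them;
  // returns (part durations, elapsed ms for the whole page)
  def page(partMs: Seq[Long]): (Seq[Long], Long) = {
    val start = System.nanoTime()
    val parts = Await.result(Future.sequence(partMs.map(render)), 5.seconds)
    (parts, (System.nanoTime() - start) / 1000000)
  }
}
```

With SlowestWins.page(Seq.fill(15)(10L) :+ 200L), fifteen parts finish after about 10 ms, but the page still takes at least 200 ms because of the single delayed part.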

Writing asynchronous code also takes some time and thought, because it is easy to add a blocking part by mistake, and then all your nice parallelism goes out the window.

One Play action in one of my side projects uses Futures for asynchronous code and looks like this:

// PageJdbc.all() ->
//    get urls of all pages, returns Future[Seq[String]]
// PageJdbc.byUrl() ->
//    get page for url, returns Future[Option[Page]]
// PageJdbc.history() ->
//    get all titles the page has had over time, by url, returns Future[Seq[String]]
// dists is a helper function (not shown) applied to that history

// Assumes an implicit ExecutionContext in scope, e.g. injected into the controller
def index = Action.async { implicit request: Request[AnyContent] =>
  val dataUrlsF = PageJdbc.all()
    // Map the outer Future ...
    .map(_.map(u => {
      // ... and iterate over the inner Seq of urls.
      // Create both Futures before the for so they run in parallel;
      // created inside the for, history() would wait for byUrl()
      val titleF = PageJdbc.byUrl(u).map(_.map(_.title).getOrElse("-"))
      val distsF = PageJdbc.history(u).map(dists)
      // One Future[(url, title, dists)] per url
      for (
        title <- titleF;
        dists <- distsF
      ) yield (u, title, dists)
    }))
    // Results in Future[Seq[Future[(_,_,_)]]]
    // Future.sequence turns F[S[F[...]]] into F[F[S[...]]], flatten into F[S[...]]
    .map(f => Future.sequence(f)).flatten

  // Returns Future[Result]
  for (dataUrls <- dataUrlsF) yield {
    Ok(views.html.index(ExampleForm.form, dataUrls))
  }
}

The code has three database operations, each returning a Future[...]. First, all() returns the urls of all pages as a Future[Seq[String]]. The two nested map calls reach into the Future and then iterate over every url in the Seq. The calls to byUrl and history return their own Futures of type Future[Option[Page]] and Future[Seq[String]].
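To see the types at each step without Play or a database, here is a sketch with hypothetical in-memory stand-ins: StubPages.all, byUrl and history replace the PageJdbc methods, and dists is replaced by a placeholder that simply counts the history entries. Only the map / Future.sequence / flatten chain mirrors the action above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object StubPages {
  case class Page(url: String, title: String, description: String)

  // Hypothetical in-memory data instead of a database
  private val pages = Map(
    "a.com" -> Page("a.com", "A", "first"),
    "b.com" -> Page("b.com", "B", "second"))

  def all(): Future[Seq[String]] = Future(Seq("a.com", "b.com", "c.com"))
  def byUrl(url: String): Future[Option[Page]] = Future(pages.get(url))
  def history(url: String): Future[Seq[String]] = Future(Seq(url, url.toUpperCase))
  // Placeholder for dists: here it only counts the history entries
  def dists(titles: Seq[String]): Int = titles.size

  def data(): Future[Seq[(String, String, Int)]] =
    all()
      .map(_.map { u =>                     // Future[Seq[Future[(...)]]]
        val titleF = byUrl(u).map(_.map(_.title).getOrElse("-"))
        val distsF = history(u).map(dists)
        for (title <- titleF; d <- distsF) yield (u, title, d)
      })
      .map(fs => Future.sequence(fs))        // Future[Future[Seq[(...)]]]
      .flatten                               // Future[Seq[(...)]]
}
```

Here "c.com" has no stored page, so its title falls back to "-" just like in the action.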

Our code looks more complicated than a single for comprehension, but a single for comprehension has some pitfalls, e.g.:

 // Does not compile (!)
 val dataUrlsF = for (
     urls <- PageJdbc.all();
     // A Seq generator inside a Future for does not type-check,
     // there is no way to get a Future[Seq[...]] out of it
     u <- urls;
     // called when all() finished, ok.
     title <- PageJdbc.byUrl(u).map(_.map(_.title).getOrElse("-"));
     // called only when byUrl() finished, not right after all() #FAIL
     dists <- PageJdbc.history(u).map(dists)
 ) yield (u, title, dists)

The working example can, however, be written with nested fors by translating its nested maps into nested fors. You will recognize the Future.sequence and flatten calls from above.

// returns Future[Seq[(_,_,_)]]
val dataUrlsF = (for (urls <- PageJdbc.all())
  yield for (u <- urls)
    yield {
      val titleF = PageJdbc.byUrl(u).map(_.map(_.title).getOrElse("-"));
      val distsF = PageJdbc.history(u).map(dists);
      for (
        title <- titleF;
        dists <- distsF
      ) yield (u, title, dists)
    }).map(f => Future.sequence(f)).flatten

Though it looks complicated, this code is pure parallelism bliss: the controller method immediately returns a Future for Play to complete, without blocking or waiting. Also, byUrl and history are called in parallel because they do not depend on each other. If all() returns a list of 10 urls, the code creates 20 Futures which are worked on in parallel (limited by the number of cores/hyperthreads and the thread pool used). It does not create the 2 Futures for one url, wait for them to succeed and then move on to the next url; it creates the Futures for all 10 urls in one sweep.

Expressed in dependencies

  all()
  |-> byUrl()-1
  |-> history()-1
  |-> byUrl()-2
  |-> history()-2
  |-> byUrl()-3
  |-> history()-3
  ...

instead of

  all() 
  |-> byUrl()-1
      |-> history()-1
          |-> byUrl()-2
              |-> history()-2
                  |-> byUrl()-3
                      |-> history()-3
                          ...
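The standard library also offers Future.traverse, which combines mapping over a collection and Future.sequence in one call; together with flatMap on the outer Future it can replace the map / Future.sequence / flatten chain. This is an alternative sketch, not the code from the project: title, history and all are hypothetical lookups that each sleep 100 ms. Because traverse calls its function for every url right away, all 20 lookups are started without waiting for each other.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Traverse {
  // Hypothetical slow lookups standing in for byUrl and history
  def title(url: String): Future[String] =
    Future(blocking { Thread.sleep(100); s"Title of $url" })
  def history(url: String): Future[Int] =
    Future(blocking { Thread.sleep(100); url.length })

  def all(): Future[Seq[String]] =
    Future.successful((1 to 10).map(i => s"page$i"))

  // flatMap + Future.traverse instead of map + Future.sequence + flatten
  def data(): Future[Seq[(String, String, Int)]] =
    all().flatMap { urls =>
      Future.traverse(urls) { u =>
        // both lookups for one url start together
        val titleF = title(u)
        val histF = history(u)
        titleF.zip(histF).map { case (t, h) => (u, t, h) }
      }
    }
}
```

Run one after another, the 20 lookups would take about 2 seconds; started together, they finish in roughly the time of a single lookup plus thread start-up.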

Blocking Threads

You can test this construct of nested fors by running

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.duration._

object Example {
  // A Future that yields the numbers 1 to i
  def range(i: Int): Future[Seq[Int]] = {
    Future {
      1.to(i)
    }
  }

  // A Future that sleeps for one second and logs when it is
  // created, when it starts running and when it finishes
  def w(i: Int): Future[Int] = {
    println(s"Create $i")
    val f = Future {
      println(s"Running $i")
      Thread.sleep(1000)
      i
    }
    f.onComplete(_ =>
      println(s"Finished $i")
    )
    f
  }

  def run(): Unit = {
    println(s"Number cores: ${Runtime.getRuntime.availableProcessors}")

    // Same construct of nested fors as in our other example
    val data = (for (seq <- range(5))
      yield for (i <- seq)
        yield {
          val i1F = w(i * 10)
          val i2F = w(i * 10 + 1)
          for (
            i1 <- i1F;
            i2 <- i2F
          ) yield (i1, i2)
        }).map(f => Future.sequence(f)).flatten

    Await.result(data, 20.second)
    println("All finished.")
  }
}

Run it with

Example.run()

You will see something like

Number cores: 4
Create 10
Running 10    <-- Run 1.
Create 11
Running 11    <-- Run 2.
Create 20
Create 21
Create 30
Create 31
Create 40
Create 41
Create 50
Create 51
Running 20    <-- Run 3.
Running 21    <-- Run 4.
Finished 11   <-- Finish 1.
Finished 21   <-- Finish 2.
Finished 10   ...
Running 31
Running 40
Running 30
Running 41
Finished 31
Finished 40
Running 50
Finished 41
Running 51
Finished 20
Finished 30
Finished 51
Finished 50
All finished.

Why isn't the code running all 2x5 Futures at once, as promised above? We use Scala's standard global ExecutionContext, which by default limits the number of running threads to the number of cores. When all of these threads block, e.g. on IO or, as here, in Thread.sleep, no other Future can run. Fortunately we can tell Scala that the code inside a Future is blocking by wrapping it in blocking; the global ExecutionContext then starts additional threads. When we change the code to

val f = Future {
  blocking {
    println(s"Running $i")
    Thread.sleep(1000)
    i
  }
}

we get the expected results:

Number cores: 4
Create 10
Create 11
...
Create 50
Create 51 
Running 21
Running 20
...
Running 40
Running 31
Finished 21
Finished 10
...
Finished 31
Finished 11
All finished.

Database access with Future

The byUrl method which returns a Page by its url looks like this:

def byUrl(url: String)(implicit db: Database, ec: ExecutionContext): Future[Option[Page]] = {
  new GetPageByUrl(url).executeWithRSF { rs: ResultSet =>
    Page(url = rs.getString(1), title = rs.getString(2), description = rs.getString(3))
  }.map(_.headOption)
}

In our use case we use getOrElse on the Option to get the title. If empty results should instead be filtered out in our for chain, things get more complicated. A Little Guide on Using Futures for Web Developers has some help for working with Future[Option[...]].
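One simple way to drop missing pages, not necessarily the approach from the guide, is to sequence the Seq[Future[Option[Page]]] first and then flatten the resulting Seq[Option[Page]]. The byUrl below is a hypothetical stand-in that only finds urls starting with "known".

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object DropMissing {
  case class Page(url: String, title: String)

  // Hypothetical lookup: only some urls have a stored page
  def byUrl(url: String): Future[Option[Page]] =
    Future(if (url.startsWith("known")) Some(Page(url, url.toUpperCase)) else None)

  // Instead of getOrElse("-"), drop urls without a page:
  // Seq[Future[Option[Page]]] -> Future[Seq[Option[Page]]] -> Future[Seq[String]]
  def titles(urls: Seq[String]): Future[Seq[String]] =
    Future.sequence(urls.map(byUrl)).map(_.flatten.map(_.title))
}
```

All lookups still start in parallel; the None results are only removed once every Future has completed.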

The method executeWithRSF uses

Future {
   blocking { 
     ... database stuff ...
   }
}

to run the blocking database call on its own thread, in parallel with other work. This could be changed to use a native asynchronous database driver.
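Another common option, discussed in Play's documentation on thread pools, is to run blocking JDBC calls on a dedicated ExecutionContext so they cannot starve the default one. This sketch assumes a fixed pool of 4 threads (an arbitrary size; in practice it would usually match the database connection pool), and runBlocking is a hypothetical helper, not the author's executeWithRSF.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DbPool {
  // Fixed pool reserved for blocking database calls (size 4 is an assumption)
  private val executor = Executors.newFixedThreadPool(4)
  val dbContext: ExecutionContext = ExecutionContext.fromExecutorService(executor)

  // Hypothetical helper: runs a blocking call on the dedicated pool,
  // so at most 4 such calls run at the same time
  def runBlocking[T](body: => T): Future[T] = Future(body)(dbContext)

  // Pool threads are not daemon threads, so shut the pool down on exit
  def shutdown(): Unit = executor.shutdown()
}
```

Compared to blocking, which lets the global pool grow, a fixed pool caps the number of concurrent database calls; further calls queue up until a thread is free.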

More than one Future depends on the result of a previous Future

Sometimes several Futures depend on one previous Future, which generalizes our case from above. If you simply chain them in a for comprehension, as before, the Futures are created one after another and are not executed in parallel.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

val r = for (
   x <- Future.successful(1);
   a <- Future(1 + x);
   b <- Future(2 + x);
   c <- Future(3 + x)
   ) yield (a + b + c)

Here b is only started after a has finished, and c only after b has finished.

With the following code all three run at the same time:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

val r = for (
   x <- Future.successful(1);
   // all three Futures are created before Future.sequence waits for them
   Seq(a, b, c) <- Future.sequence(
      Seq(
          Future(1 + x),
          Future(2 + x),
          Future(3 + x)
      ))
) yield (a + b + c)
Await.result(r, 5.second)
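The difference becomes measurable when the dependent Futures actually take time. In this sketch the hypothetical slow() sleeps 200 ms; fannedOut() uses the same trick as the Play action above, creating the Futures first and only then combining them in a for.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FanOut {
  // Hypothetical slow computation, 200 ms each
  def slow(v: Int): Future[Int] = Future(blocking { Thread.sleep(200); v })

  // Created inside the for: each step waits for the previous one (~600 ms)
  def chained(): Future[Int] =
    for {
      x <- Future.successful(1)
      a <- slow(1 + x)
      b <- slow(2 + x)
      c <- slow(3 + x)
    } yield a + b + c

  // Created first, then combined: all three run at the same time (~200 ms)
  def fannedOut(): Future[Int] =
    Future.successful(1).flatMap { x =>
      val aF = slow(1 + x)
      val bF = slow(2 + x)
      val cF = slow(3 + x)
      for (a <- aF; b <- bF; c <- cF) yield a + b + c
    }

  // Returns the result and the elapsed milliseconds
  def timed(f: => Future[Int]): (Int, Long) = {
    val start = System.nanoTime()
    val r = Await.result(f, 5.seconds)
    (r, (System.nanoTime() - start) / 1000000)
  }
}
```

Both variants compute 2 + 3 + 4 = 9; only the elapsed time differs.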

Asynchronous Write/Database IO after Form Validation

How do you use Action.async with form validation? The following code asynchronously fetches the web page for a url the user has entered. Its title and description are then inserted into a database, where insert returns a Future[Option[Int]] with the primary key of the inserted page.

We need the monad transformer FutureO because otherwise we would end up with a Future[Option[Future[Option[Int]]]], which cannot be flattened. A FutureO[FutureO[Int]], on the other hand, can be flattened to a FutureO[Int], which can then be turned back into a Future[Option[Int]]. The monad transformer takes a Future[Option[...]] as its constructor argument.
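FutureO itself is not shown here. A minimal version that supports the for comprehension below only needs map and flatMap; this is a sketch under that assumption, and the FutureO from the guide may differ in its details.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Minimal sketch of a Future[Option[A]] monad transformer;
// `future` unwraps it back into a Future[Option[A]]
case class FutureO[A](future: Future[Option[A]]) {
  // Continue with f only if a value is present; None short-circuits
  def flatMap[B](f: A => FutureO[B])(implicit ec: ExecutionContext): FutureO[B] =
    FutureO(future.flatMap {
      case Some(a) => f(a).future
      case None    => Future.successful(None)
    })

  def map[B](f: A => B)(implicit ec: ExecutionContext): FutureO[B] =
    FutureO(future.map(_.map(f)))
}
```

If any step yields None, the rest of the for comprehension is skipped and .future completes with None, which the action below maps to a redirect to the index page.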

// Helper for successful Future
def successF[S](block: => S): Future[S] = {
  Future.successful(block)
}

def addUrl = Action.async { implicit request: Request[AnyContent] =>

  val error = { formWithErrors: Form[Data] =>
    successF {
      BadRequest(views.html.index(formWithErrors, List.empty))
    }
  }

  val success = { data: Data =>
    // FutureO monad transformer
    // Otherwise we would end up 
    // with Future[Option[Future[Option[Int]]]]
    (for (page <- FutureO(Http.get(data.url));
        p = {
          val doc = Jsoup.parse(page)
          val title = doc.select("title").first().text()
          val description = doc.select("meta[name=description]").attr("content")
          Page(url = data.url, 
               title = title, 
               description = description)
        };
        key <- FutureO(PageJdbc.insert(p))
    ) yield key)
      // FutureO[Int] to Future[Option[Int]]
      .future
      // Future[Option[Int]]
      .map {
        case Some(key) => 
          Redirect(routes.HomeController.show(key))
        case _ => 
          Redirect(routes.HomeController.index())
    }
  }

  ExampleForm.form.bindFromRequest.fold(error, success)
}

Moving to asynchronous request handling has sped up many of my projects. Learning to think in Futures is a steep learning curve at first but pays off. You can find more about Futures in my A Little Guide on Using Futures for Web Developers.