Reactive Web Request Batching with Scala and Play Framework

At first glance it seems silly to do batching in the reactive world. When I first started with reactive programming I thought I wouldn’t have to worry about things like resource starvation. After all, the reactive magic bullet was *magical*! But my magic bullet fizzled when it hit downstream resource constraints, and I ended up needing batching.

With a reactive web client library like Play Framework’s, I can easily spin up tens of thousands of web requests, in parallel, using very few resources. But what if that saturates the server I’m making requests to? In an ideal world I could get backpressure, but most web endpoints don’t provide a way to do that. So we just have to be nicer to the server and batch the requests. (Sidenote: How do you know how nice you should be to the service, e.g. what batch size to use?)

So in some scenarios you might just need to batch a bunch of web requests. Let’s say you need to make 100 requests and you’ve figured out that you should only do 10 at a time. Then once all 100 have completed you need to do some aggregation of the results. Of course you could block until each batch is complete and mutate a shared value that accumulates the results. But we can do better than that! You can still use immutable data and non-blocking requests with batching.

In Play I have a /echo?num=1 request handler that just returns the number passed into it:

def echo(num: Int) = Action {
  Logger.info(s"num=$num")
  Ok(num.toString)
}

Now in another async controller I want to send 10,000 requests to the echo controller but batched 256 at a time, then return the aggregated results. First I need a function that will handle the batching:

private def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] = {
  // when the results for the previous batches have been completed, start a new batch
  results.flatMap { responses =>
    // create the web requests for this batch
    val batchFutures: Seq[Future[String]] = batch.map(ws.url(_).get().map(_.body))

    // sequence the futures for this batch into a single future
    val batchFuture: Future[Seq[String]] = Future.sequence(batchFutures)

    // when this batch is complete, append the responses to the existing responses
    batchFuture.map { batchResponses =>
      Logger.info("Finished a batch")
      responses ++ batchResponses
    }
  }
}

This processBatch function takes a Future holding the previously accumulated results (a sequence of Strings) and a batch of URLs, and returns a Future holding those results plus the results from the batch. Once the results that were passed in have completed, the batch is processed by creating a web request for each URL. Each request is mapped to just the String from the response body. All of the requests in the batch are then sequenced into a single Future that completes when every request in the batch has completed. Once that future completes, the results of the batch are appended to the previously completed results.
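
To try that step outside of Play, here is a minimal sketch of the same function with the request passed in as a parameter. The fakeFetch function is a hypothetical stand-in for ws.url(_).get().map(_.body), and the global execution context is an assumption made only to keep the example self-contained:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object BatchStep {
  // Hypothetical stand-in for ws.url(url).get().map(_.body)
  def fakeFetch(url: String): Future[String] = Future(s"body of $url")

  // Same shape as processBatch above, with the request function passed in
  def processBatch(fetch: String => Future[String])(
      results: Future[Seq[String]],
      batch: Seq[String]
  ): Future[Seq[String]] =
    results.flatMap { responses =>
      // the batch's requests are only created once the earlier results are in
      Future.sequence(batch.map(fetch)).map(responses ++ _)
    }

  def main(args: Array[String]): Unit = {
    val previous = Future.successful(Seq("body of /a"))
    val combined = processBatch(fakeFetch)(previous, Seq("/b", "/c"))
    // Await is used here only so the demo can print something before exiting
    println(Await.result(combined, 5.seconds))
  }
}
```

Future.sequence keeps the responses in the same order as the URLs in the batch, so the accumulated sequence ends up in request order.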

This could also be written using Scala’s fancy for comprehension:

for {
  responses <- results
  batchFuture = Future.sequence(batch.map(ws.url(_).get().map(_.body)))
  batchResponses <- batchFuture
} yield {
  Logger.info("Finished a batch")
  responses ++ batchResponses
}
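
The order of the lines in that for comprehension matters, because a Scala Future starts running as soon as it is created. The batch's requests have to be created after the `responses <- results` line, otherwise they would fire before the previous batches are done. A small sketch of that behavior, using a hypothetical fakeRequest that just counts how many requests have been started:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object DeferredStart {
  // Returns the number of started requests before and after the
  // "previous batches" future completes.
  def demo(): (Int, Int) = {
    val started = new AtomicInteger(0)

    // Hypothetical request: counts each call, then returns a canned body
    def fakeRequest(): Future[String] = {
      started.incrementAndGet()
      Future.successful("ok")
    }

    // Stands in for the previous batches, which are not complete yet
    val previous = Promise[Seq[String]]()

    val next = for {
      responses      <- previous.future
      batchResponses <- Future.sequence(Seq.fill(3)(fakeRequest()))
    } yield responses ++ batchResponses

    val before = started.get        // no requests created yet
    previous.success(Seq.empty)     // previous batches finish
    Await.result(next, 5.seconds)   // blocking only for the demo
    (before, started.get)
  }

  def main(args: Array[String]): Unit =
    println(demo()) // prints (0,3)
}
```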

Here is an async controller that uses the processBatch function:

def index = Action.async { implicit request =>

  // ints 1 to 10000
  val numbers: Seq[Int] = 1 to 10000

  // turn each number into a url
  val urls: Seq[String] = numbers.map(routes.Application.echo(_).absoluteURL())

  // split into groups of 256
  val batches: Iterator[Seq[String]] = urls.grouped(256)

  // futures for all of the response strings
  val futureResponses = batches.foldLeft(Future.successful(Seq.empty[String]))(processBatch)

  // render the list of responses when they are all complete
  futureResponses.map { responses =>
    Ok(responses.toString)
  }

}

A sequence of 10,000 URLs, /echo?num=1 to /echo?num=10000, is assembled. That sequence is then partitioned into groups of 256. Then comes the reactive batching magic: take the batches and do a foldLeft starting with an already completed Future of an empty sequence of Strings, calling the processBatch function for each batch and accumulating the results. The foldLeft itself returns right away; it only chains the batches together so that each one starts when the one before it completes. Once the future returned from the foldLeft completes, the results are turned into a String and returned in the Ok response.
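
To check that the fold really limits how many requests are in flight, here is a rough sketch that runs the same fold without Play. The fakeEcho function is a hypothetical stand-in for a request to the echo handler, the in-flight counter exists only for this demo, and the global execution context is an assumption:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object BatchedFold {
  // Runs `total` fake requests in batches of `batchSize` and returns
  // the responses plus the highest number of requests seen in flight.
  def run(total: Int, batchSize: Int): (Seq[String], Int) = {
    val lock = new Object
    var inFlight = 0
    var peak = 0

    // Hypothetical stand-in for a request to /echo?num=...
    def fakeEcho(num: Int): Future[String] = {
      lock.synchronized { inFlight += 1; peak = math.max(peak, inFlight) }
      Future(num.toString).andThen { case _ =>
        lock.synchronized { inFlight -= 1 }
      }
    }

    def processBatch(results: Future[Seq[String]], batch: Seq[Int]): Future[Seq[String]] =
      for {
        responses      <- results
        batchResponses <- Future.sequence(batch.map(fakeEcho))
      } yield responses ++ batchResponses

    // foldLeft only chains the batches; nothing blocks here
    val all = (1 to total)
      .grouped(batchSize)
      .foldLeft(Future.successful(Seq.empty[String]))(processBatch)

    // Await is for the demo only; the controller maps over the future instead
    val responses = Await.result(all, 30.seconds)
    (responses, lock.synchronized(peak))
  }

  def main(args: Array[String]): Unit = {
    val (responses, peak) = run(total = 100, batchSize = 10)
    println(s"responses=${responses.size}, peak in flight=$peak")
  }
}
```

The peak never goes above the batch size, because a batch's requests are only created after every request in the previous batch has completed. The trade-off is that each batch waits for its slowest request before the next one starts.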

There you have reactive batching of web requests! Check out the full source.