Reactive Web Request Batching with Scala and Play Framework
At first glance it seems silly to do batching in the reactive world. When I first started with reactive programming I thought I wouldn’t have to worry about things like resource starvation. After all, the reactive magic bullet was *magical*! But my magic bullet fizzled when it hit downstream resource constraints, and I ended up needing batching after all.
With a reactive web client library like Play Framework’s, I can easily spin up tens of thousands of web requests, in parallel, using very few resources. But what if that saturates the server I’m making requests to? In an ideal world I could get backpressure, but most web endpoints don’t provide a way to do that. So we just have to be nicer to the server and batch the requests. (Sidenote: How do you know how nice you should be to the service, e.g. batch size?)
So in some scenarios you might just need to batch a bunch of web requests. Let’s say you need to make 100 requests and you’ve figured out that you should only do 10 at a time. Then once all 100 have completed you need to do some aggregation of the results. Of course you could block until each batch is complete and mutate a shared value that accumulates the results. But we can do better than that! You can still use immutable data and non-blocking requests with batching.
In Play I have a /echo?num=1 request handler that just returns the number passed into it:
def echo(num: Int) = Action {
  Logger.info(s"num=$num")
  Ok(num.toString)
}
Now in another async controller I want to send 10,000 requests to the echo controller but batched 256 at a time, then return the aggregated results. First I need a function that will handle the batching:
// assumes a WSClient named `ws` and an implicit ExecutionContext are in scope
private def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] = {
  // when the results for the previous batches have been completed, start a new batch
  results.flatMap { responses =>
    // create the web requests for this batch
    val batchFutures: Seq[Future[String]] = batch.map(ws.url(_).get().map(_.body))
    // sequence the futures for this batch into a single future
    val batchFuture: Future[Seq[String]] = Future.sequence(batchFutures)
    // when this batch is complete, append the responses to the existing responses
    batchFuture.map { batchResponses =>
      Logger.info("Finished a batch")
      responses ++ batchResponses
    }
  }
}
This processBatch function takes a Future that holds the previously accumulated results, which are a sequence of Strings. It also takes a batch of URLs and returns a Future that holds the previously accumulated results plus the results from the batch. When the results that were passed in have all been completed, the batch is processed by creating a web request for each URL. Each request is transformed to just the String from the response body. All of the requests in the batch are then transformed into a single Future that completes when all of the requests in the batch have been completed. Once that future completes, the results of the batch are combined with the previously completed results.
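To try that step outside of a Play app, here is a minimal sketch under a few assumptions: `fetch` is a hypothetical stand-in for `ws.url(url).get().map(_.body)`, the object name is made up, and the global execution context is used for convenience. The `Await` is only there so the demo can return a plain value; a controller would stay non-blocking.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ProcessBatchSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for a real web request returning the response body
  def fetch(url: String): Future[String] = Future(s"body of $url")

  // Same shape as processBatch: wait for the earlier results, then run this batch
  def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] =
    results.flatMap { responses =>
      // Future.sequence keeps the responses in the same order as the batch
      Future.sequence(batch.map(fetch)).map(batchResponses => responses ++ batchResponses)
    }

  def demo(): Seq[String] = {
    val previous = Future.successful(Seq("body of /echo?num=1"))
    val combined = processBatch(previous, Seq("/echo?num=2", "/echo?num=3"))
    // Blocking here is for demonstration only
    Await.result(combined, 5.seconds)
  }
}
```

Calling `ProcessBatchSketch.demo()` should give the earlier result followed by the two new ones, in batch order.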
This could also be written using Scala’s fancy for comprehension:
for {
  responses <- results
  batchFuture = Future.sequence(batch.map(ws.url(_).get().map(_.body)))
  batchResponses <- batchFuture
} yield {
  Logger.info("Finished a batch")
  responses ++ batchResponses
}
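One detail worth checking in either form: Scala futures start running as soon as they are created, so the batch only waits its turn because its requests are created inside the flatMap (or after the first generator of the for comprehension). Here is a small sketch of that behavior, assuming a hypothetical `request` function that counts how many times it has been called; the object name and the counter are made up for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object LazyBatchStart {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Returns (requests started before the previous results completed,
  //          requests started overall, combined results)
  def demo(): (Int, Int, Seq[String]) = {
    val started = new AtomicInteger(0)

    // Hypothetical request: counts the call synchronously, then echoes the URL
    def request(url: String): Future[String] = {
      started.incrementAndGet()
      Future.successful(url)
    }

    def processBatch(results: Future[Seq[String]], batch: Seq[String]): Future[Seq[String]] =
      for {
        responses <- results
        // evaluated only once `results` has completed
        batchResponses <- Future.sequence(batch.map(request))
      } yield responses ++ batchResponses

    val previous = Promise[Seq[String]]()
    val combined = processBatch(previous.future, Seq("a", "b"))
    val startedBefore = started.get() // previous results are still pending here
    previous.success(Seq("z"))
    val all = Await.result(combined, 5.seconds) // blocking for demonstration only
    (startedBefore, started.get(), all)
  }
}
```

If the batch futures were instead built in a val before the flatMap or for, the requests would fire immediately and the batching would quietly disappear.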
Here is an async controller that uses the processBatch function:
def index = Action.async { implicit request =>
  // ints 1 to 10000
  val numbers: Seq[Int] = 1 to 10000
  // turn each number into a url
  val urls: Seq[String] = numbers.map(routes.Application.echo(_).absoluteURL())
  // split into groups of 256
  val batches: Iterator[Seq[String]] = urls.grouped(256)
  // futures for all of the response strings
  val futureResponses = batches.foldLeft(Future.successful(Seq.empty[String]))(processBatch)
  // render the list of responses when they are all complete
  futureResponses.map { responses =>
    Ok(responses.toString)
  }
}
A sequence of 10,000 URLs, /echo?num=1 to /echo?num=10000, is assembled. That sequence is then partitioned into groups of 256. Then the reactive batching magic… Take the batches and do a foldLeft starting with a Future of an empty sequence of String, calling the processBatch function for each batch, accumulating the results. Once the future returned from the foldLeft completes, the results are turned into a String and returned in the Ok response.
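To see that the fold really keeps at most one batch in flight, here is a self-contained sketch that swaps the web requests for a hypothetical `fetch` and records the highest number of requests running at once. The object name, the counters, and the use of the global execution context are all assumptions for the sake of the example, not part of the Play code.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FoldBatchesSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Returns the responses plus the highest number of requests seen in flight
  def run(total: Int, batchSize: Int): (Seq[String], Int) = {
    val inFlight = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    // Hypothetical stand-in for a web request; tracks how many are running
    def fetch(n: Int): Future[String] = {
      val now = inFlight.incrementAndGet()
      maxInFlight.updateAndGet(m => math.max(m, now))
      Future {
        inFlight.decrementAndGet()
        n.toString
      }
    }

    def processBatch(results: Future[Seq[String]], batch: Seq[Int]): Future[Seq[String]] =
      results.flatMap { responses =>
        Future.sequence(batch.map(fetch)).map(batchResponses => responses ++ batchResponses)
      }

    // split into batches, then chain them one after another with foldLeft
    val batches: Iterator[Seq[Int]] = (1 to total).grouped(batchSize)
    val all = batches.foldLeft(Future.successful(Seq.empty[String]))(processBatch)

    // Blocking here is for demonstration only
    (Await.result(all, 30.seconds), maxInFlight.get())
  }
}
```

With `FoldBatchesSketch.run(1000, 256)` the responses should come back in their original order, and the recorded maximum should never exceed the batch size, because each batch is only created after the previous one has fully completed.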
There you have reactive batching of web requests! Check out the full source.