Combining Reactive Streams, Heroku Kafka, and Play Framework
Heroku recently announced early access to the new Heroku Kafka service, and while I’ve heard great things about Apache Kafka, I hadn’t played with it because I’m too lazy to set that kind of stuff up on my own. Now that I can set up a Kafka cluster just by provisioning a Heroku add-on, I figured it was time to give it a try.
If you aren’t familiar with Kafka, it is kind of a next-generation messaging system. It uses pub-sub, scales horizontally, and has built-in message durability and delivery guarantees. Kafka was originally built at LinkedIn but is now used by many companies that need to move massive amounts of data through transformation pipelines.
While learning Kafka I wanted to build something really simple: an event producer that just sends random numbers to a Kafka topic and an event consumer that receives those random numbers and sends them to a browser via a WebSocket. I decided to use Play Framework and the Akka Streams implementation of Reactive Streams.
In Akka Streams there is the pretty standard “Source” and “Sink” where an event producer is a Source and a consumer is a Sink. A “Flow” sits between a Source and a Sink and can optionally transform the events passing through it. In my example there are two apps, each with a Flow. A worker process will send random numbers to Kafka, so its Source will be periodically generated random numbers and its Sink will be Kafka. In the web process the Source is Kafka and the Sink is a WebSocket that will push the random numbers to the browser.
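To make the Source / Flow / Sink idea concrete, here is a tiny sketch with no Kafka involved. It assumes akka-stream 2.6 or newer on the classpath (so the materializer is derived from the implicit ActorSystem); the object name and the values inside it are made up for illustration and are not part of the example apps.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object; not part of the apps described in this post.
object SourceFlowSinkSketch {

  def run(): Seq[String] = {
    // Assumes Akka 2.6+, where an implicit ActorSystem is enough to materialize streams
    implicit val system: ActorSystem = ActorSystem("sketch")
    try {
      // Source: where events come from
      val numbers: Source[Int, NotUsed] = Source(1 to 5)
      // Flow: an optional transformation between the Source and the Sink
      val label: Flow[Int, String, NotUsed] = Flow[Int].map(n => s"number $n")
      // Sink: where events end up (here they are collected into a Seq)
      val collect = Sink.seq[String]

      val result: Future[Seq[String]] = numbers.via(label).runWith(collect)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", "))
}
```

The two apps below have the same shape; only the ends change (ticks into Kafka in the worker, Kafka into a WebSocket in the web app).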
Here is the worker app with some necessary config omitted (check out the full source):
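import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import scala.concurrent.duration._
import scala.util.Random

object RandomNumbers extends App {
  // Emit a tick every 500 ms and turn each tick into a random number rendered as a String
  val tickSource = Source.tick(Duration.Zero, 500.milliseconds, ()).map(_ => Random.nextInt().toString)

  // kafka and app come from the config omitted here (see the full source)
  kafka.sink("RandomNumbers").map { kafkaSink =>
    tickSource
      .map(new ProducerRecord[String, String]("RandomNumbers", _)) // wrap each value in a record for the topic
      .to(kafkaSink)
      .run()(app.materializer)
  }
}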
The tickSource is a Source that generates a new random Int every 500 milliseconds and converts it to a String. That Source is connected to a Kafka Sink with a Flow that transforms each String into a ProducerRecord (for Kafka). This uses the Reactive Kafka library, which is a Reactive Streams API for working with Kafka.
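If you want to see the tick Source working without a Kafka cluster, a rough stand-in is to cap it with take and collect the output instead of sending it to Kafka. This is only a sketch under a few assumptions: akka-stream 2.6+ is on the classpath, the interval is shortened to 100 ms so it finishes quickly, and TickSketch and collect are names I made up for this illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

// Hypothetical example object; the real worker sends to a Kafka Sink instead.
object TickSketch {

  // Collects the first `n` random numbers produced by the tick Source
  def collect(n: Int): Seq[String] = {
    // Assumes Akka 2.6+, where an implicit ActorSystem is enough to materialize streams
    implicit val system: ActorSystem = ActorSystem("tick-sketch")
    try {
      // Same shape as the worker's tickSource, with a shorter interval for the demo
      val tickSource = Source
        .tick(Duration.Zero, 100.milliseconds, ())
        .map(_ => Random.nextInt().toString)

      // A tick Source never completes on its own, so take(n) ends the stream after n elements
      val result = tickSource.take(n.toLong).runWith(Sink.seq[String])
      Await.result(result, 10.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    collect(5).foreach(println)
}
```

Swapping Sink.seq for the Kafka Sink (and dropping the take) gets you back to the worker above.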
On the web app side, Play Framework has built-in support for using Reactive Streams with WebSockets, so all we need is a controller method that creates a Source from a Kafka topic and hooks that to a WebSocket Flow (full source):
def ws = WebSocket.acceptOrResult[Any, String] { _ =>
  kafka.source(Set("RandomNumbers")) match {
    case Failure(e) =>
      // No Kafka connection: reject the WebSocket with an HTTP 500
      Future.successful(Left(InternalServerError("Could not connect to Kafka")))
    case Success(source) =>
      // Ignore messages from the browser and push each Kafka record's value to it
      val flow = Flow.fromSinkAndSource(Sink.ignore, source.map(_.value))
      Future.successful(Right(flow))
  }
}
Notice that the Flow has a Sink.ignore which just says to ignore any incoming messages on the WebSocket (those sent from the browser). Play takes care of all the underlying stuff and then whenever Kafka gets a message on the “RandomNumbers” topic, it will be sent out via the WebSocket.
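Here is a small sketch of what Flow.fromSinkAndSource does on its own, with a plain list standing in for the Kafka Source and another list standing in for messages from the browser. It assumes akka-stream 2.6+ on the classpath; IgnoreIncomingSketch and the sample values are invented for this illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; in the web app Play wires the Flow to a real WebSocket.
object IgnoreIncomingSketch {

  def run(): Seq[String] = {
    // Assumes Akka 2.6+, where an implicit ActorSystem is enough to materialize streams
    implicit val system: ActorSystem = ActorSystem("ws-sketch")
    try {
      // Stand-in for the Kafka Source of random numbers
      val outgoing = Source(List("1", "2", "3"))

      // Input goes to Sink.ignore, output comes from `outgoing`; the two sides are independent
      val wsLikeFlow: Flow[Any, String, NotUsed] =
        Flow.fromSinkAndSource(Sink.ignore, outgoing)

      // Stand-in for messages sent by the browser; they are dropped by Sink.ignore
      val fromBrowser = Source(List("hello", "anyone there?"))

      val received = fromBrowser.via(wsLikeFlow).runWith(Sink.seq[String])
      Await.result(received, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", "))
}
```

Nothing the "browser" sends shows up in the result; only the elements from the outgoing Source do, which is the behavior the WebSocket controller relies on.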
And it all works!
Check out the full source for instructions on how to get this example set up on your machine and on Heroku. Let me know how it goes!