Category Archives: Scala

Getting Started With Spark 2.x Streaming and Kafka

I’ve been digging into Spark more and more lately, and I had some trouble finding up-to-date tutorials on getting started with Kafka and Spark Streaming (especially for Spark 2.x and Kafka 0.10), particularly ones that let you run your own code easily. While running streaming jobs with spark-shell is not really recommended, I find it a very convenient way to get started, as you don’t even need to compile the code.

Up and running with Kafka

First things first, you need a Kafka producer running. The official Kafka quickstart guide covers this in full, but for the sake of simplicity I will repeat the relevant parts here.

1. Get the kafka distribution:

2. Run zookeeper

  • $ [kafka_home]/bin/ config/

3. Run kafka server

  • $ [kafka_home]/bin/ config/

4. Create topic

  • $ [kafka_home]/bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

5. Run kafka producer

  • $ [kafka_home]/bin/ --broker-list localhost:9092 --topic test

You can skip creating the consumer.

Setup Spark 2.x

First we need to download and set up Spark.

1. Get spark

2. Verify Spark works

  • You can verify that spark-shell works by launching it: [spark_home]/bin/spark-shell
  • Use CTRL + C to quit

Create Spark Streaming Application

Let’s create a new folder for our streaming applications and call it “kafka-spark-stream-app”. The run command later in this post refers to it as ../kafka-spark-stream-app, so it should sit next to the directory you launch spark-shell from.

For the word count streaming example, use any text editor to create a file called /kafka-spark-stream-app/kafkaSparkStream.scala:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Create the context with a 1 second batch size (sc is provided by spark-shell)
val ssc = new StreamingContext(sc, Seconds(1))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream"
)

// Must match the topic created and written to in the Kafka steps above
val topics = Array("test")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Each Kafka record carries one line of text in its value
val lines = stream.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

// Nothing runs until the context is started
ssc.start()
ssc.awaitTermination()
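To see what the word count part of the script computes for a single batch, here is a minimal sketch on plain Scala collections, with no Spark or Kafka involved. The sample lines and the helper name countWords are made up for illustration, and groupBy followed by a sum stands in for reduceByKey.

```scala
// Plain-collections stand-in for one streaming batch (no Spark needed).
// countWords is a hypothetical helper, not part of the Spark API.
def countWords(lines: Seq[String]): Map[String, Long] =
  lines
    .flatMap(_.split(" "))     // same split as in the streaming script
    .map(word => (word, 1L))   // pair every word with a count of one
    .groupBy(_._1)             // local equivalent of grouping by key
    .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

// Two made-up lines, as if typed into the Kafka console producer
val batch = Seq("hello kafka", "hello spark streaming")
val counts = countWords(batch)
```

In the streaming job the same computation is repeated for every one-second batch, so the counts are per batch rather than running totals.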

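To run this, we still need to add a few jars to the classpath, so let’s create a subfolder /kafka-spark-stream-app/jars and add two jars there: the spark-streaming-kafka-0-10 jar (spark-streaming-kafka-0-10_2.11-2.1.1.jar in this example) and the kafka-clients jar.

Both are published as Maven artifacts, so they can be downloaded from a public Maven repository. The folder should now contain kafkaSparkStream.scala and the jars subfolder with the two jars.

*In case you use a different version of Spark, make sure you have the corresponding version of the spark-streaming-kafka library as well.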

Running the spark streaming script

Now all that is left is to run the script, assuming you have Kafka running as set up at the beginning of this article.

$ [spark_home]/bin/spark-shell --jars ../kafka-spark-stream-app/jars/spark-streaming-kafka-0-10_2.11-2.1.1.jar,../kafka-spark-stream-app/jars/kafka-clients- -i ../kafka-spark-stream-app/kafkaSparkStream.scala

Basically we just tell spark-shell which two jars the script requires and where they are, and with the -i switch we tell spark-shell to run the script from the given file. Now post some text from the Kafka console producer and watch the Spark Streaming application print out the word counts of the given phrase.

Of course it does not make sense to run any real Spark Streaming application like this, but it’s very convenient to be able to run scripts without having to set up a proper Scala project. I will follow up soon on how to wrap the application in a proper sbt project.

Why Scala?


Recently we started a new project and I’m happy to say we had quite a lot of freedom to choose the tech stack we wanted to implement it with.

Technically the project did not seem too difficult, basically just aggregating data from a few different web service APIs. So it looked like a good opportunity to take a small risk and try something new. We decided to go with Scala and Play Framework, and here is why:

Reactive Model

I had previously done some smaller projects using node.js and event-driven programming. I definitely think that “reactive” is the way to go and that it makes sense to learn to do it properly. The thing I was missing was a proper type system, which leads to…

Static Typing

Scala’s type system is extremely powerful, and type inference keeps the code compact and concise.


After (too) many years of mostly Java development it was definitely time for something more powerful. Scala is pretty much as functional as a language can be. Some purists may argue that it’s not purely functional like Haskell, but in real-world situations Scala is as functional as they come.

Pattern matching in particular deserves to be mentioned as one of my favourite features. If you manage to specify most of your data model in case classes, life gets a lot easier.
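As a small illustration of how case classes and pattern matching work together, here is a sketch with a made-up data model. None of the names (Payment, Card, Invoice, Cash, describe) come from our project.

```scala
// Hypothetical data model for illustration only.
sealed trait Payment
case class Card(holder: String, lastDigits: String) extends Payment
case class Invoice(company: String, dueInDays: Int) extends Payment
case object Cash extends Payment

// Because Payment is sealed, the compiler warns if a case is missing here.
def describe(payment: Payment): String = payment match {
  case Card(holder, digits)                => s"card of $holder ending in $digits"
  case Invoice(company, days) if days > 30 => s"long-term invoice for $company"
  case Invoice(company, _)                 => s"invoice for $company"
  case Cash                                => "cash"
}
```

The match both checks the type and pulls out the fields in one step, which is what makes a case-class-based data model pleasant to work with.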

Cake Pattern

The cake pattern seems to be the go-to way of wiring Scala apps together. It basically allows you to do modular design and dependency injection without using any library. It does involve a bit of boilerplate, but I think that statically typed, compile-time checked dependency injection is better than any separate library, even at the price of a little boilerplate.
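A minimal sketch of the idea, assuming a made-up user lookup as the dependency. All names here (UserRepositoryComponent, UserServiceComponent, Application and so on) are hypothetical and only show the shape of the pattern.

```scala
// Component that declares a dependency other components can ask for.
trait UserRepositoryComponent {
  def userRepository: UserRepository
  trait UserRepository {
    def find(id: Int): Option[String]
  }
}

// The self-type is the "injection": this component can only be mixed into
// something that also provides a UserRepositoryComponent.
trait UserServiceComponent { self: UserRepositoryComponent =>
  def userService: UserService = new UserService
  class UserService {
    def greet(id: Int): String =
      userRepository.find(id).map(name => s"Hello, $name").getOrElse("Unknown user")
  }
}

// One concrete implementation of the dependency.
trait InMemoryUserRepositoryComponent extends UserRepositoryComponent {
  val userRepository: UserRepository = new UserRepository {
    private val users = Map(1 -> "Ada")
    def find(id: Int): Option[String] = users.get(id)
  }
}

// Wiring happens at compile time by mixing the components together.
object Application extends UserServiceComponent with InMemoryUserRepositoryComponent
```

Leaving out the repository component when defining Application is a compile error rather than a runtime surprise, and tests can mix in a different repository implementation.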

Java Interoperability

If Java has one strength, it’s the plenitude of well-tested libraries. Using Java libraries from Scala is trivially easy.
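A short sketch of what that looks like in practice, using only classes from the Java standard library. The key names and the date are arbitrary example values.

```scala
import java.time.LocalDate
import java.util.concurrent.ConcurrentHashMap

// Java classes are instantiated and called exactly like Scala ones.
val cache = new ConcurrentHashMap[String, LocalDate]()
cache.put("release", LocalDate.of(2015, 1, 15)) // arbitrary example date

// Option(...) turns a possibly-null Java return value into a safe Scala value.
val releaseYear: Option[Int] = Option(cache.get("release")).map(_.getYear)
val missing: Option[Int] = Option(cache.get("unknown")).map(_.getYear)
```

Wrapping Java return values in Option at the boundary is a common way to keep nulls from leaking into the rest of the Scala code.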


Scala recently turned 10 years old and the language is definitely mature enough. It still evolves, but the latest stable releases are worthy of the name.

Play Framework has reached version 2.3.7, and the accompanying Activator makes starting projects very easy. Activator has a pretty decent template mechanism and you get a bunch of templates to choose from when you start a new project.

Sbt, the Scala build tool, has evolved along with Scala. It’s regularly updated and has a working plugin system. It comes with a nice REPL. It might not look fancy, but it gets the job done.

When it comes to IDEs you have basically two fine choices: the Eclipse-based Scala IDE and IntelliJ. I personally found IntelliJ’s Scala and Play plugins to work better and eventually settled on that. The only downside is that the Play plugin requires the registered (paid) version.


Scala compiles to Java bytecode, so the performance is just as good. Static typing allows the compiler to optimize better. For the benchmarks I chose to compare Scala to another modern JVM language with dynamic typing. Of course this is just a small sample, but Scala is faster across the board. It is good to keep in mind that performance should usually be one of the last criteria when selecting a language, but it’s nice to know that when push comes to shove, Scala will deliver. There is a reason why internet giants like Twitter and LinkedIn chose Scala.


There you have it, our reasoning for choosing Scala. About 4 months into the project it still looks like a good choice. Don’t get me wrong, it has not been a walk in the park and we’ve had some difficulties and problems, but that’s the topic of an upcoming post.

JSON values to typed Id:s in Scala & Play

I recently published a post about how to deal with JSON objects using Scala’s case classes and Play Framework. To keep up with the theme here is another post about the same topic, but this time it’s specifically about types.

The JSON format does have types, but they are not very visible in the actual content, and how they are mapped to data types at the receiving end depends a lot on the developer’s first look at the incoming data. The majority of content seems to be in String format, and that seems like a safe choice. After all, you can fairly safely represent a number as a String as long as you don’t process the data in any meaningful way. You would not be so lucky doing it the other way around.

This logic seems pretty valid especially if you are not certain about the format of the data. Maybe the value just happened to be a number this time, but it might include a letter next time and then using anything but String would result in a runtime exception and we definitely don’t want that. Going all-in with String does have some unfortunate side effects and some problems might be creeping into your code.

def doIt(someId: String, someOtherId: String, foo: String, bar: String)

It’s really easy to mix up the parameter order when you have methods like this, and what’s the point of having static typing if you deal mostly with strings anyway? In the worst case it will “kind of” work, but the results are wrong. Moreover, code like this is a pain to refactor.

So instead of a mess like this, wouldn’t it be nice to deal with properly typed values? (From here on I concentrate on id values. If you need to pass on many values, you probably have other design flaws as well.)

def doIt(someId: SomeId, someOtherId: SomeOtherId, foo: String, bar: String)
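A minimal sketch of what the typed signature buys us, using plain wrapper case classes. The method body and the example values are made up, and the wrappers here are simpler than the id classes developed in the rest of this post.

```scala
// Hypothetical id wrappers and method body, mirroring the signature above.
case class SomeId(value: String)
case class SomeOtherId(value: BigDecimal)

def doIt(someId: SomeId, someOtherId: SomeOtherId, foo: String, bar: String): String =
  s"${someId.value}/${someOtherId.value}: $foo $bar"

val result = doIt(SomeId("abc"), SomeOtherId(BigDecimal(42)), "foo", "bar")

// Swapping the two ids no longer compiles (type mismatch):
// doIt(SomeOtherId(BigDecimal(42)), SomeId("abc"), "foo", "bar")
```

The two String parameters foo and bar can of course still be swapped by accident, which is why the wrapping pays off most for values that are passed around a lot, such as ids.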

Now we have also regained the ability to trust the developer’s best friend – the compiler. Of course there is nothing new or fancy about wrapping values to classes, but what turned out to be tricky was to maintain the handy JSON-parsing that comes with Play Framework and case classes without too much boilerplate. So, how do we actually achieve this?

First solution: Implicit conversion to typed Id -classes

Well first of all we implemented a proper base trait representing any typed id case class which we will declare later on.

trait BaseId[V] {  val value: V  }

Based on that basic trait we can then implement another trait for every value type we want to support.

trait StringBaseId extends BaseId[String]
trait NumberBaseId extends BaseId[BigDecimal]

What we now need is an implicit conversion from a JSON primitive type to an instance of a typed id implementation. We do this using an implicit class for every primitive type we support. For example, String-based ids are implemented like this:

implicit class StringTypedIdFormat[I <: BaseId[String]](factory: Factory[String, I]) 
    extends Format[I] {
  // Only a JSON string can become a String-based id
  def reads(json: JsValue): JsResult[I] = json match {
    case JsString(value) => JsSuccess(factory(value))
    case _ => JsError(s"Unexpected JSON value $json")
  }
  def writes(id: I): JsValue = JsString(id.value)
}

The provided factory is used to instantiate a concrete implementation based on a String. The Factory type is declared as follows:

type Factory[V, I <: BaseId[V]] = V => I

In the next step we declare concrete id classes for each id type we need. Staying consistent with the method signature of the previous example, we write:

case class SomeId(value: String) extends StringBaseId
case class SomeOtherId(value: BigDecimal) extends NumberBaseId

Both case classes extend their corresponding base traits. The case classes representing the JSON objects would look like this:

case class SomeObject(id:SomeId, name:String)
case class SomeOtherObject(id:SomeOtherId, name:String, value:Number)

To get an implicit conversion between JSON and those two case classes we must provide implicit read and write functions or JSON combinator formats:

implicit val someObjectFormat: Format[SomeObject] = Json.format[SomeObject]
implicit val someOtherObjectFormat: Format[SomeOtherObject] = Json.format[SomeOtherObject]

These formats won’t work yet, because we still need an implicit conversion between JSON and the typed id case classes. We can finally provide those formats using the helper classes declared previously. For the two ids we declare:

implicit val someIdFormat: Format[SomeId] = 
  new StringTypedIdFormat[SomeId](SomeId.apply _)
implicit val someOtherIdFormat: Format[SomeOtherId] = 
  new NumberTypedIdFormat[SomeOtherId](SomeOtherId.apply _)

The default apply method of the id case classes can be used directly as the declared factory method to generate the concrete instance of the case class. We can then implicitly convert between JSON primitive type id values and our typed id case classes in Scala.

A simple test to demonstrate the conversion:

val someObjectAsJson: JsValue = Json.parse("""
  { "id": "111", "name": "someName" }
""")

"Parsing generic id object" should {
  "SomeId will be parsed correctly" in {
    someObjectAsJson.as[SomeObject] === SomeObject(SomeId("111"), "someName")
  }
}

We can further simplify the id format declaration by adding an additional method to Play’s Json object:

object TypedId {

  //implicit conversion to extended json object
  implicit def fromJson(json: Json.type) = TypedId

  //extended format function
  def idformat[I <: StringBaseId](fact: Factory[String, I]) = 
    new StringTypedIdFormat[I](fact)
  def idformat[I <: NumberBaseId](fact: Factory[BigDecimal, I]) = 
    new NumberTypedIdFormat[I](fact)
}

Now we can replace the format declaration with the following simpler version:

implicit val someIdFormat: Format[SomeId] = 
  Json.idformat[SomeId](SomeId.apply _)
implicit val someOtherIdFormat: Format[SomeOtherId] = 
  Json.idformat[SomeOtherId](SomeOtherId.apply _)

We still need to declare the factory method because we can’t instantiate a typed class at runtime.

We are not completely happy with this solution, because we still have to declare a concrete case class as well as an implicit format for every id class we create. We are looking for a more generic way to declare such ids, and it could look something like this:

def doIt(someId: StringId[SomeObject], someOtherId: NumberId[SomeOtherObject], foo: String, bar: String)

By the way:
All the base implementations can be found on GitHub.

Mike Toggweiler, a partner @ Tegonal co-authored this post.

Parsing json with over 22 fields with case classes

We recently started to develop a new product using Play Framework in the back end and AngularJS on the client side. The back end is rather light and mainly consists of aggregating data from different web services. We do have a local database, and for that we chose MongoDB. So what we have is many sources of data, and pretty much all of them provide data in JSON format. Needless to say, JSON parsing has to work flawlessly.

For the most part that worked great. With Play you can create a case class with all the same fields as the corresponding JSON and then you just create a format for it in the companion object. This is easy and elegant and works great… until…

…until you notice that one of the web services you are calling returns objects with more than 22 fields. That is a problem, because Scala 2.10 does not allow more than 22 fields in a case class. Later Scala 2 versions lift that limit, but functions and tuples, which Play’s JSON combinators build on, are still capped at 22. Normally that’s fine, and having that many fields is usually a red flag about poor design, but sometimes the web services you use simply have more than 22 fields in their response objects.

case class FooBar(
   field01: String,
   field02: String,
   field03: String,
// ... many fields ...
   field22: String,
   field23: String)

// compilation errors

object FooBar{
   implicit val foobarFormat: Format[FooBar] = Json.format[FooBar]
}

Luckily there is a pretty neat way around this, and it might even improve your design. You can use nested case classes, and all you need to do is implement a wrapper writer and reader for the enclosing class. But first you create case classes that each represent a subset of the fields of the response.

case class Foo(
   field01: String,
   field02: String,
// ... many fields ...
   field09: String,
   field10: String)

object Foo{
   implicit val fooFormat: Format[Foo] = Json.format[Foo]
}

case class Bar(
   field11: String,
   field12: String,
// ... many fields ...
   field21: String,
   field22: String,
   field23: String)

object Bar{
   implicit val barFormat: Format[Bar] = Json.format[Bar]
}

So now we have smaller classes that each contain a subset of the original fields, and the only thing missing is a “wrapper” class that represents the complete object. This is rather simple, and in this case it has two fields, namely the two classes I just defined.

case class FooBar(
   foo: Foo,
   bar: Bar)

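object FooBar {
   // Both readers start from the root path, so each picks its fields from the same flat object
   implicit val foobarReads: Reads[FooBar] = (
      (JsPath).read[Foo] and
      (JsPath).read[Bar])(FooBar.apply _)

   // Both writers write at the root path, so the result is a single flat object again
   implicit val foobarWrites: Writes[FooBar] = (
      (JsPath).write[Foo] and
      (JsPath).write[Bar])(unlift(FooBar.unapply))
}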
Now in Scala you can access the fields quite neatly using dot notation, while the JSON is still serialized back to the original flat format. This parsing can be easily tested.

val foobarJson: JsValue = Json.parse("""
  { "field01":"value1", 
    ...
    "field23":"value23" }
""")

"Parsing json to nested case classes" should {
  "work just fine" in {
    val foobar = foobarJson.as[FooBar]
    foobar.foo.field01 === "value1"
    // ...
    foobar.bar.field23 === "value23"
  }
}
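The nesting idea itself does not depend on Play. As a rough sketch of it in plain Scala, a flat JSON object can be modelled as a Map[String, String]. The helper names (readFoo, readBar, readFooBar, writeFooBar) are made up, the classes are cut down to two fields each, and this is a stand-in for the Reads and Writes above, not Play's API.

```scala
// Cut-down versions of the classes from this post, two fields each.
case class Foo(field01: String, field02: String)
case class Bar(field22: String, field23: String)
case class FooBar(foo: Foo, bar: Bar)

// Both readers look at the same flat object, each picking its own subset of keys.
def readFoo(flat: Map[String, String]): Foo = Foo(flat("field01"), flat("field02"))
def readBar(flat: Map[String, String]): Bar = Bar(flat("field22"), flat("field23"))
def readFooBar(flat: Map[String, String]): FooBar = FooBar(readFoo(flat), readBar(flat))

// Writing merges the two subsets back into one flat object.
def writeFooBar(fb: FooBar): Map[String, String] =
  Map("field01" -> fb.foo.field01, "field02" -> fb.foo.field02) ++
  Map("field22" -> fb.bar.field22, "field23" -> fb.bar.field23)

val flat = Map("field01" -> "value1", "field02" -> "value2",
               "field22" -> "value22", "field23" -> "value23")
val parsed = readFooBar(flat)
```

Reading and then writing gives back the original flat object, which is the round trip the wrapper Reads and Writes are meant to provide.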