Getting Started With Spark 2.x Streaming and Kafka

I’ve been digging into Spark more and more lately, and I had some trouble finding up-to-date tutorials on getting started with Kafka and Spark Streaming (especially for Spark 2.x and Kafka 0.10), and especially ones that let you run your own code easily. While running streaming jobs with spark-shell is not really recommended, I find it a very convenient way to get started, as you don’t even need to compile the code.

Up and running with Kafka

First things first: you need a Kafka producer running. The official Kafka quickstart guide covers this in full, but for the sake of simplicity I will repeat the relevant parts here.

1. Get the kafka distribution:

2. Run zookeeper

  • $ [kafka_home]/bin/ config/

3. Run kafka server

  • $ [kafka_home]/bin/ config/

4. Create topic

  • $ [kafka_home]/bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

5. Run kafka producer

  • $ [kafka_home]/bin/ --broker-list localhost:9092 --topic test

You can skip creating the consumer.

Setup Spark 2.x

First we need to download and set up Spark.

1. Get spark

2. Verify Spark works

  • You can verify that spark-shell works by launching it: [spark_home]/bin/spark-shell
  • Use CTRL + C to quit

Create Spark Streaming Application

Let’s create a new folder for our streaming applications. Let’s call it “kafka-spark-stream-app”.
So now the folder structure should look something like:

Let’s create a file for the word count streaming example. Use any text editor to create a file called /kafka-spark-stream-app/kafkaSparkStream.scala

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Create the context with a 1 second batch size
// (sc is the SparkContext that spark-shell provides)
val ssc = new StreamingContext(sc, Seconds(1))

// Consumer configuration for the local broker started earlier
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream"
)

// Must match the topic created in the Kafka setup above
val topics = Array("test")

// Direct stream of ConsumerRecord[String, String] from the subscribed topics
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Split each message value into words and count them within each batch
val lines = stream.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

// Start the computation and block until it is stopped
ssc.start()
ssc.awaitTermination()
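
To make the word count logic easier to follow, here is a minimal plain-Scala sketch of what happens to the lines of a single batch. It needs neither Spark nor Kafka. The helper name countWords and the sample batches are made up for illustration, and groupBy plus a sum stands in for reduceByKey, which only exists on Spark's pair streams.

```scala
// Plain-Scala sketch of the per-batch word count (no Spark required).
// countWords is a hypothetical helper, not part of any Spark API.
def countWords(batch: Seq[String]): Map[String, Long] =
  batch
    .flatMap(_.split(" "))        // split every line into words
    .map(word => (word, 1L))      // pair each word with a count of 1
    .groupBy(_._1)                // group the pairs by word
    .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the counts per word

// Two made-up batches, e.g. what arrived during two consecutive seconds
val firstBatch  = Seq("hello world", "hello kafka")
val secondBatch = Seq("hello spark")

val firstCounts  = countWords(firstBatch)  // hello -> 2, world -> 1, kafka -> 1
val secondCounts = countWords(secondBatch) // hello -> 1, spark -> 1
```

Each batch is counted on its own, which mirrors how the streaming script behaves: the counts printed for one batch interval do not carry over into the next.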

To run this, we still need to add a few jars to the classpath, so let’s create a subfolder /kafka-spark-stream-app/jars and add two jars there: the spark-streaming-kafka-0-10 integration jar and the kafka-clients jar.

Both can be found in public Maven repositories. The app folder should now contain kafkaSparkStream.scala and a jars subfolder holding the two jars.

*If you use a different version of Spark, make sure you use the matching version of the spark-streaming-kafka library as well.
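
As a rough aid for picking the matching jar, the sketch below builds the expected file name from a Scala version and a Spark version. It assumes the naming pattern visible in the jar used in this post (artifact name, underscore, Scala binary version, dash, Spark version). The helper names are hypothetical and not part of Spark.

```scala
// Hypothetical helpers; they only encode the file name pattern of the jar used in this post.

// "2.11.8" -> "2.11": the Scala binary version is the first two components
def scalaBinaryVersion(fullScalaVersion: String): String =
  fullScalaVersion.split('.').take(2).mkString(".")

// Expected file name of the Kafka 0.10 integration jar for a given Scala and Spark version
def kafkaIntegrationJar(fullScalaVersion: String, sparkVersion: String): String =
  s"spark-streaming-kafka-0-10_${scalaBinaryVersion(fullScalaVersion)}-${sparkVersion}.jar"

val jarName = kafkaIntegrationJar("2.11.8", "2.1.1")
// jarName == "spark-streaming-kafka-0-10_2.11-2.1.1.jar"
```

Inside spark-shell, sc.version and scala.util.Properties.versionNumberString should report the two versions to plug in.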

Running the spark streaming script

Now all that is left is to run the script, assuming you have Kafka running as set up at the beginning of this article.

$ [spark_home]/bin/spark-shell --jars ../kafka-spark-stream-app/jars/spark-streaming-kafka-0-10_2.11-2.1.1.jar,../kafka-spark-stream-app/jars/kafka-clients- -i ../kafka-spark-stream-app/kafkaSparkStream.scala

Basically, we just tell spark-shell which two jars the script requires and where they are (the --jars switch takes a comma-separated list), and with the -i switch we tell spark-shell to run the script from the given file. Now post some text from the Kafka console producer and watch the Spark Streaming application print the word counts of the given phrase.
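
If the jar list grows, it can be easier to assemble the arguments programmatically. The following is only a sketch: sparkShellArgs is a hypothetical helper, and the jar and script paths are placeholders rather than the real file names. It shows the one detail that is easy to get wrong, namely that the jars are joined with commas and no spaces.

```scala
// Hypothetical helper that assembles the spark-shell arguments used in this post.
def sparkShellArgs(jars: Seq[String], script: String): Seq[String] =
  Seq(
    "--jars", jars.mkString(","), // comma-separated list, no spaces between entries
    "-i", script                  // script file that spark-shell should run
  )

// Placeholder paths for illustration only
val args = sparkShellArgs(
  Seq("jars/first.jar", "jars/second.jar"),
  "kafkaSparkStream.scala"
)
// args == Seq("--jars", "jars/first.jar,jars/second.jar", "-i", "kafkaSparkStream.scala")
```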


Of course, it does not make sense to run any real Spark Streaming application like this, but it’s very convenient to be able to run scripts without having to set up a proper Scala project. I will follow up soon on how to wrap the application in a proper sbt project.