Getting Started With Spark 2.x Streaming and Kafka
I’ve been digging into Spark more and more lately, and I had some trouble finding up-to-date tutorials on getting started with Kafka and Spark Streaming (especially for Spark 2.x and Kafka 0.10), particularly ones that let you run your own code easily. While running streaming jobs with spark-shell is not really recommended, I find it very convenient for getting started, as you don’t even need to compile the code.
Up and running with Kafka
First things first: you need a Kafka producer running. You can find the official quickstart guide here: https://kafka.apache.org/quickstart, but for the sake of simplicity I will repeat the relevant parts here.
1. Get the kafka distribution:
- Download https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
- Un-tar it:
tar -xzf kafka_2.11-0.11.0.1.tgz
- From now on I refer to the folder where it was un-tarred as [kafka_home]/
2. Run zookeeper
$ [kafka_home]/bin/zookeeper-server-start.sh [kafka_home]/config/zookeeper.properties
3. Run kafka server
$ [kafka_home]/bin/kafka-server-start.sh [kafka_home]/config/server.properties
4. Create topic
$ [kafka_home]/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
5. Run kafka producer
$ [kafka_home]/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
You can skip creating the consumer.
Setup Spark 2.x
First we need to download and set up Spark.
1. Get spark
- Download https://www.apache.org/dyn/closer.lua/spark/spark-2.1.2/spark-2.1.2-bin-hadoop2.7.tgz
- Un-tar it:
tar -xzf spark-2.1.2-bin-hadoop2.7.tgz
- From now on I refer to the folder where it was un-tarred as [spark_home]
- This guide also works with at least Spark 2.2; you can pick your poison here: https://spark.apache.org/downloads.html
2. Verify Spark works
- You can verify that spark-shell works by launching it
[spark_home]/bin/spark-shell
- Use CTRL + C to quit
Create Spark Streaming Application
Let’s create a new folder for our streaming applications. Let’s call it “kafka-spark-stream-app”.
So now the folder structure should look something like:
/[kafka_home]
/[spark_home]
/kafka-spark-stream-app
Let’s create a file for the word count streaming example. Use any text editor to create a file called /kafka-spark-stream-app/kafkaSparkStream.scala with the following content:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Create the context with a 1 second batch size (sc is provided by spark-shell)
val ssc = new StreamingContext(sc, Seconds(1))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream"
)

// Must match the topic that was created and that the console producer writes to
val topics = Array("test")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Take the message values, split them into words and count each word per batch
val lines = stream.map(_.value)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()
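To see what the flatMap / map / reduceByKey chain computes, here is a minimal sketch of the same logic on plain Scala collections, with no Spark or Kafka involved. The object name WordCountSketch and the sample batch are made up for illustration. Note that in the streaming script the counts are computed separately for every 1 second batch; they are not accumulated across batches.

```scala
// Plain Scala collections standing in for the message values of one micro-batch.
object WordCountSketch {
  // Same steps as the streaming script: split lines into words,
  // pair each word with 1, then sum the ones per word.
  def countWords(lines: Seq[String]): Map[String, Long] =
    lines
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Hypothetical batch: two messages typed into the console producer
    val batch = Seq("hello spark", "hello kafka")
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
    // prints (hello,2), (kafka,1), (spark,1), one tuple per line
  }
}
```

Here groupBy plus a sum plays the role that reduceByKey(_ + _) plays on the stream.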
To run this, we still need to add a few jars to the classpath, so let’s create a subfolder /kafka-spark-stream-app/jars and add two jars there:
- kafka-clients-0.10.2.1.jar
- spark-streaming-kafka-0-10_2.11-2.1.1.jar
Both can be found, for example, on mvnrepository.com. So now the folder structure should look like this:
/[kafka_home]
/[spark_home]
/kafka-spark-stream-app
/kafka-spark-stream-app/kafkaSparkStream.scala
/kafka-spark-stream-app/jars/kafka-clients-0.10.2.1.jar
/kafka-spark-stream-app/jars/spark-streaming-kafka-0-10_2.11-2.1.1.jar
*If you use a different version of Spark, make sure you use the corresponding version of the spark-streaming-kafka library as well.
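If you are unsure which jar matches your installation, the naming pattern of the file above can be reproduced from the Spark and Scala versions. The following is only a sketch: kafkaIntegrationJar is a hypothetical helper, not part of Spark, and it assumes the jar names keep following the pattern shown in the folder listing.

```scala
// Hypothetical helper: derives the expected jar file name from version strings,
// following the pattern spark-streaming-kafka-0-10_<scala binary version>-<spark version>.jar
def kafkaIntegrationJar(sparkVersion: String, scalaVersion: String): String = {
  // Keep only major.minor of the Scala version, e.g. "2.11.8" becomes "2.11"
  val scalaBinary = scalaVersion.split('.').take(2).mkString(".")
  s"spark-streaming-kafka-0-10_$scalaBinary-$sparkVersion.jar"
}

// Example with fixed version strings
println(kafkaIntegrationJar("2.1.1", "2.11.8"))
// prints spark-streaming-kafka-0-10_2.11-2.1.1.jar

// Inside spark-shell, where sc is predefined, the versions can be read directly:
// println(kafkaIntegrationJar(sc.version, scala.util.Properties.versionNumberString))
```

Pasting the function into spark-shell and running the commented-out line shows the file name to look for on mvnrepository.com.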
Running the spark streaming script
Now all that is left is to run the script, assuming you have Kafka running as set up at the beginning of this article.
$ [spark_home]/bin/spark-shell --jars ../kafka-spark-stream-app/jars/spark-streaming-kafka-0-10_2.11-2.1.1.jar,../kafka-spark-stream-app/jars/kafka-clients-0.10.2.1.jar -i ../kafka-spark-stream-app/kafkaSparkStream.scala
Basically we just tell spark-shell which two jars the script requires and where they are, and with the switch -i we tell spark-shell to run the script from the given file. The relative ../ paths assume that you run the command from inside [spark_home] and that the folders sit side by side as shown above. Now post some text from the Kafka console producer and watch the Spark Streaming application print out the word counts of the given phrase.
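If nothing is printed, one possible cause is that the messages were sent before the stream started: with a group id that has no committed offsets, the Kafka consumer default for auto.offset.reset is latest, so only messages produced afterwards are read. Below is a hedged sketch of a kafkaParams variant that starts from the beginning of the topic instead. The two extra settings are standard Kafka consumer options that the script above does not set, and the group id is a made-up name.

```scala
// Deserializers are given as class-name strings here so that this snippet
// compiles even without the Kafka jars on the classpath.
val stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> stringDeserializer,
  "value.deserializer" -> stringDeserializer,
  "group.id" -> "word-count-demo", // hypothetical group id
  // Start from the oldest available message when the group has no committed offsets
  "auto.offset.reset" -> "earliest",
  // Do not let the consumer commit offsets automatically in the background
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
```

With these settings and no manual offset commits, each restart of the script should read the topic from the start again, which is handy for experimenting but probably not what you want in a real job.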
Followup
Of course it does not make sense to run any real Spark Streaming application like this, but it’s very convenient to be able to run scripts without having to set up a proper Scala project. I will follow up soon on how to wrap the application in a proper sbt project.