Integrate Spark with Kafka
From EEP-5.0.0, Spark can be integrated with Kafka-1.0. You can configure a Spark application to produce Kafka messages.
About this task
NOTE Starting from EEP-8.0.0, HPE Ezmeral Data Fabric does not support spark-streaming-kafka-producer. To learn about Kafka integration on Apache Spark 3.1.2 and later in HPE Ezmeral Data Fabric, see Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher).
Procedure
- Add the following dependency:
    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-producer_2.11
    version = <spark_version>-mapr-<mapr_eco_version>
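For an sbt build, the coordinates above could be declared roughly as follows. This is a minimal sketch, not an official build file: the resolver name and repository URL are assumptions about where the MapR artifacts are hosted, and the version placeholders are left exactly as in the step above for you to fill in.

```scala
// build.sbt (sketch)

// Assumption: the MapR-built artifacts are resolved from the MapR Maven repository.
resolvers += "mapr-releases" at "https://repository.mapr.com/maven/"

// The artifactId already carries the Scala suffix (_2.11), so plain % is used, not %%.
// Replace the placeholders with the Spark and EEP versions of your cluster.
libraryDependencies +=
  "org.apache.spark" % "spark-streaming-kafka-producer_2.11" % "<spark_version>-mapr-<mapr_eco_version>"
```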
- When you write the Spark program, import and use classes from:
    org.apache.spark.streaming.kafka.producer._
    org.apache.spark.streaming.dstream.DStream
  The import of org.apache.spark.streaming.kafka.producer._ adds the following method to DStream:
    sendToKafka(topic: String, conf: ProducerConf)
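A wildcard import can make a new method appear on an existing type through Scala's implicit classes. The following self-contained sketch illustrates that general mechanism only; it assumes, without confirmation from this page, that the producer package works in a similar way. `FakeStream`, `FakeProducerConf`, and the `producer` object are hypothetical stand-ins, and the method returns the records it would send instead of contacting Kafka.

```scala
// Hypothetical stand-in for a stream of batches; this is not Spark's DStream.
final case class FakeStream[T](batches: Seq[Seq[T]])

object producer {
  // Hypothetical config type that mirrors the shape of ProducerConf in the text.
  final case class FakeProducerConf(bootstrapServers: List[String])

  // Once `producer._` is imported, every FakeStream gains sendToKafka.
  implicit class StreamFunctions[T](stream: FakeStream[T]) {
    // Returns a description of each record instead of sending it anywhere.
    def sendToKafka(topic: String, conf: FakeProducerConf): Seq[String] =
      stream.batches.flatten.map(m => s"$topic <- $m")
  }
}

object Demo {
  import producer._ // without this import, sendToKafka does not compile

  def run(): Seq[String] = {
    val stream = FakeStream(Seq(Seq("a", "b"), Seq("c")))
    stream.sendToKafka("events", FakeProducerConf(List("broker1:9092")))
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Removing the `import producer._` line makes the call to `sendToKafka` fail at compile time, which is why the step above lists the producer package import explicitly.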
- In the code below, calling sendToKafka will send numMessages messages to the set of topics specified by the topics parameter:
    // Build the producer configuration from a comma-separated list of brokers.
    val producerConf = new ProducerConf(
      bootstrapServers = kafkaBrokers.split(",").toList)
    // Create numMessages string payloads.
    val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
    val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
    // Wrap the RDD in a stream.
    val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)
    // Send the contents of each RDD in the stream to Kafka.
    dStream.foreachRDD(_.sendToKafka(topics, producerConf))
    // Print the number of elements in each batch.
    dStream.count().print()
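The fragment above relies on names it does not define (ssc, kafkaBrokers, topics, numMessages, Item). A minimal sketch of a complete application around it might look like the following. The `Item` case class, the object name, the argument order, the 2-second batch interval, and the `makeItems` helper are assumptions made for illustration; only `ProducerConf` and `sendToKafka` come from the steps above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}
import org.apache.spark.streaming.kafka.producer._

// Hypothetical payload type; the fragment above only shows Item(i, i).
case class Item(id: Int, value: Int)

object KafkaProducerSketch {
  // Pure helper so the payloads can be inspected without a cluster.
  def makeItems(n: Int): Seq[String] =
    (0 until n).map(i => Item(i, i).toString)

  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      System.err.println("Usage: KafkaProducerSketch <kafkaBrokers> <topics> <numMessages>")
      sys.exit(1)
    }
    val Array(kafkaBrokers, topics, numMessages) = args

    // Assumed batch interval of 2 seconds.
    val sparkConf = new SparkConf().setAppName("KafkaProducerSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val producerConf = new ProducerConf(
      bootstrapServers = kafkaBrokers.split(",").toList)

    val defaultRDD: RDD[String] =
      ssc.sparkContext.parallelize(makeItems(numMessages.toInt))
    val dStream: DStream[String] =
      new ConstantInputDStream[String](ssc, defaultRDD)

    // Send every batch to Kafka and print the batch size.
    dStream.foreachRDD(_.sendToKafka(topics, producerConf))
    dStream.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because ConstantInputDStream emits the same RDD on every batch interval, this sketch sends numMessages messages per batch until the application is stopped, not once in total.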