Using Structured Streaming to Create a Word Count Application

The example in this section creates a dataset representing a stream of input lines from Kafka and prints out a running word count of the input lines to the console.

Using Apache Kafka

Example (Scala, then Java, then Python)

// Scala version.
// Obtain (or reuse) the SparkSession for the word count application.
val spark = SparkSession.builder
  .appName("StructuredKafkaWordCount")
  .getOrCreate()

// Bring the session's implicit encoders into scope for .as[String] and flatMap below.
import spark.implicits._

// Streaming reader over the Kafka source; subscribeType and topics select what is consumed.
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()

// Keep only the record value, cast to a string, as a Dataset[String] of input lines.
val lines = kafkaStream
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// Split every line on single spaces, then keep a running count per word
// (the word column of the resulting Dataset[String] is grouped as "value").
val words = lines.flatMap(line => line.split(" "))
val wordCounts = words.groupBy("value").count()

// Start the query that prints the running counts to the console in complete output mode.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", checkpointLocation)
  .start()

// Block until the streaming query terminates.
query.awaitTermination()
// Java version.
// Obtain (or reuse) the SparkSession for the word count application.
SparkSession spark = SparkSession.builder()
    .appName("JavaStructuredKafkaWordCount")
    .getOrCreate();

// Read from the Kafka source and keep each record's value, cast to a string, as one input line.
Dataset<String> lines = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option(subscribeType, topics)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING());

// Splits one input line into words on single spaces.
FlatMapFunction<String, String> splitIntoWords =
    line -> Arrays.asList(line.split(" ")).iterator();

// Running count per distinct word (the word column is grouped as "value").
Dataset<Row> wordCounts = lines
    .flatMap(splitIntoWords, Encoders.STRING())
    .groupBy("value")
    .count();

// Start the query that prints the running counts to the console in complete output mode.
StreamingQuery query = wordCounts.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

// Block until the streaming query terminates.
query.awaitTermination();
# Python version.
# Obtain (or reuse) the SparkSession for the word count application.
spark = (
    SparkSession.builder
    .appName("StructuredKafkaWordCount")
    .getOrCreate()
)

# Create a DataFrame representing the stream of input lines from Kafka,
# keeping only each record's value cast to a string.
lines = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option(subscribeType, topics)
    .load()
    .selectExpr("CAST(value AS STRING)")
)

# Split each line on single spaces; explode turns each item of the
# resulting array into a separate row, exposed as the 'word' column.
word_column = explode(split(lines.value, ' ')).alias('word')
words = lines.select(word_column)

# Generate a running word count.
wordCounts = words.groupBy('word').count()

# Start the query that prints the running counts to the console in complete output mode.
query = (
    wordCounts.writeStream
    .outputMode('complete')
    .format('console')
    .start()
)

# Block until the streaming query terminates.
query.awaitTermination()

Using MapR Event Store for Apache Kafka

Example

For MapR Event Store, the topic name consists of the stream path and the topic name, separated by a colon (stream-path:topic-name), and the bootstrap servers value is not used. For example:
// MapR Event Store topic, written as "<stream path>:<topic name>".
// Declared as a val because it is never reassigned in this example.
val topic: String = "/user/mapr/stream:reviews"

// Streaming DataFrame that subscribes to the topic above through the "kafka" source.
val df1 = spark.readStream
  .format("kafka")
  // This page states that bootstrap servers are not used with MapR Event Store;
  // the option is kept as in the original example. NOTE(review): confirm whether it can be omitted.
  .option("kafka.bootstrap.servers", "maprdemo:9092")
  .option("subscribe", topic)
  // NOTE(review): Spark documents Kafka consumer settings with a "kafka." prefix;
  // confirm that an un-prefixed "group.id" is honoured by the connector in use.
  .option("group.id", "testgroup")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", false)
  .option("maxOffsetsPerTrigger", 1000)
  .load()