Using Structured Streaming to Create a Word Count Application

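The example in this section creates a Dataset representing a stream of input lines from Kafka and prints a running word count of those lines to the console. The variables bootstrapServers, subscribeType, topics, and checkpointLocation are placeholders that your application supplies; subscribeType is one of the Kafka source options assign, subscribe, or subscribePattern.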

Using Apache Kafka


Scala

import org.apache.spark.sql.SparkSession

val spark = SparkSession
       .builder
       .appName("StructuredKafkaWordCount")
       .getOrCreate()
import spark.implicits._
// Create a Dataset representing the stream of input lines from Kafka
val lines = spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", bootstrapServers)
       .option(subscribeType, topics)
       .load()
       .selectExpr("CAST(value AS STRING)")
       .as[String]
// Generate a running word count
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
// Run the query that prints the running counts to the console
val query = wordCounts.writeStream
       .outputMode("complete")
       .format("console")
       .option("checkpointLocation", checkpointLocation)
       .start()
query.awaitTermination()
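Java

import java.util.Arrays;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

SparkSession spark = SparkSession.builder().appName("JavaStructuredKafkaWordCount").getOrCreate();
// Create a Dataset representing the stream of input lines from Kafka
Dataset<String> lines = spark
              .readStream().format("kafka")
              .option("kafka.bootstrap.servers", bootstrapServers)
              .option(subscribeType, topics)
              .load().selectExpr("CAST(value AS STRING)")
              .as(Encoders.STRING());
// Generate a running word count
Dataset<Row> wordCounts = lines.flatMap(
      (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
      Encoders.STRING()).groupBy("value").count();
// Run the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
              .outputMode("complete").format("console")
              .option("checkpointLocation", checkpointLocation)
              .start();
query.awaitTermination();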
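Python

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()
# Create a DataFrame representing the stream of input lines from Kafka
lines = spark\
          .readStream.format("kafka")\
          .option("kafka.bootstrap.servers", bootstrapServers)\
          .option(subscribeType, topics)\
          .load()\
          .selectExpr("CAST(value AS STRING)")
# Split the lines into words
words = lines.select(
        # explode turns each item in an array into a separate row
        explode(split(lines.value, ' ')).alias('word'))
# Generate a running word count
wordCounts = words.groupBy('word').count()
# Run the query that prints the running counts to the console
query = wordCounts\
          .writeStream.outputMode('complete').format('console')\
          .option('checkpointLocation', checkpointLocation)\
          .start()
query.awaitTermination()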
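Assuming the query runs in complete output mode, which re-emits the whole result table after each trigger, the streaming aggregation keeps a running total that survives from one micro-batch to the next. The following pure-Python sketch does not use Spark; it only mimics that behavior on made-up input so the semantics can be checked without a Kafka cluster. The micro_batches data and the running_word_count helper are hypothetical.

```python
from collections import Counter

# Hypothetical input: each inner list stands in for the lines
# that arrive from Kafka during one micro-batch.
micro_batches = [
    ["apache spark", "spark streaming"],
    ["kafka spark"],
]


def running_word_count(batches):
    """Yield the full result table after each batch, mimicking complete output mode."""
    counts = Counter()  # state kept between batches, like Spark's aggregation state
    for lines in batches:
        # Split step: break every line into words on a single space
        words = [word for line in lines for word in line.split(" ")]
        # groupBy/count step: fold this batch's words into the running totals
        counts.update(words)
        yield dict(counts)


for batch_id, table in enumerate(running_word_count(micro_batches)):
    print(batch_id, sorted(table.items()))
# 0 [('apache', 1), ('spark', 2), ('streaming', 1)]
# 1 [('apache', 1), ('kafka', 1), ('spark', 3), ('streaming', 1)]
```

The second table includes words from the first batch as well, which is what distinguishes a running count from a per-batch count.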

Using MapR Event Store for Apache Kafka


For MapR Event Store, the topic name consists of the stream path and the topic name, separated by a colon, and the bootstrap servers setting is not used. For example:
val topic: String = "/user/mapr/stream:reviews"
val df1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "maprdemo:9092")
  .option("subscribe", topic)
  .option("", "testgroup")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", false)
  .option("maxOffsetsPerTrigger", 1000)
  .load()
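The MapR example expresses two ideas through its options: the topic name is a stream path and a topic joined by a colon, and maxOffsetsPerTrigger caps how many offsets each micro-batch reads, starting from the earliest offset. The pure-Python sketch below illustrates both under simplifying assumptions. split_mapr_topic and plan_batches are hypothetical helpers, not MapR or Spark APIs, and plan_batches models a single partition with a fixed backlog, whereas Spark splits the limit proportionally across topic partitions and new data can keep arriving.

```python
def split_mapr_topic(name):
    """Hypothetical helper: split '<stream path>:<topic>' into its two parts."""
    # rpartition splits on the last colon, so the path keeps any earlier text intact
    stream_path, sep, topic = name.rpartition(":")
    if not sep or not stream_path or not topic:
        raise ValueError(f"expected '<stream path>:<topic>', got {name!r}")
    return stream_path, topic


def plan_batches(earliest, latest, max_per_trigger):
    """Hypothetical helper: offset ranges [start, end) for ONE partition,
    each covering at most max_per_trigger offsets."""
    ranges = []
    start = earliest  # models startingOffsets = "earliest"
    while start < latest:
        end = min(start + max_per_trigger, latest)
        ranges.append((start, end))
        start = end
    return ranges


print(split_mapr_topic("/user/mapr/stream:reviews"))
# ('/user/mapr/stream', 'reviews')
print(plan_batches(0, 2500, 1000))  # made-up backlog of 2500 offsets
# [(0, 1000), (1000, 2000), (2000, 2500)]
```

With a cap of 1000, a backlog of 2500 offsets is drained over three triggers instead of one large batch, which is the point of setting maxOffsetsPerTrigger when starting from the earliest offset.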