Writing a Spark Stream Word Count Application to HPE Ezmeral Data Fabric Database

The example in this section shows a Spark Structured Streaming word count application that reads lines from Kafka and writes the running word counts to HPE Ezmeral Data Fabric Database.

Examples (Scala, Java, and Python)

val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .getOrCreate()
              
import spark.implicits._
//Create a DataSet representing the stream of input lines from Kafka
val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
              
//Generate a running word count
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
              
//Run the query that saves the result to MapR-DB
val query = wordCounts.writeStream
      .format(MapRDBSourceConfig.Format)
      .option(MapRDBSourceConfig.TablePathOption, resultTable)
      .option(MapRDBSourceConfig.CreateTableOption, true)
      .option(MapRDBSourceConfig.IdFieldPathOption, "value")
      .outputMode("complete")
      .start()
              
query.awaitTermination()  
// Java version of the structured streaming Kafka word count.
// bootstrapServers, subscribeType, topics and resultTable are expected to be
// defined by the surrounding application.
SparkSession spark = SparkSession
           .builder()
           .appName("JavaStructuredKafkaWordCount")
           .getOrCreate();
              
// Create a Dataset representing the stream of input lines from Kafka.
// Only the record value is kept, cast to a string.
Dataset<String> lines = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", bootstrapServers)
            .option(subscribeType, topics)
            .load()
            .selectExpr("CAST(value AS STRING)")
            .as(Encoders.STRING());
              
// Generate a running word count: split each line on single spaces,
// then count the occurrences of each distinct word (column "value").
Dataset<Row> wordCounts = lines.flatMap(
        (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
        Encoders.STRING()).groupBy("value").count();
              
// Run the query that saves the result to HPE Ezmeral Data Fabric Database (MapR-DB).
// The table at resultTable is created on demand and the "value" (word) column
// is used as the document id.
// Note: the call chain must not be terminated before start(); start() is what
// returns the StreamingQuery assigned here.
StreamingQuery query = wordCounts.writeStream()
            .format(MapRDBSourceConfig.Format())
            .option(MapRDBSourceConfig.TablePathOption(), resultTable)
            .option(MapRDBSourceConfig.CreateTableOption(), true)
            .option(MapRDBSourceConfig.IdFieldPathOption(), "value")
            .outputMode("complete")
            .start();
              
// Block until the streaming query terminates.
query.awaitTermination();
# Python version of the structured streaming Kafka word count.
# bootstrapServers, subscribeType, topics and table_path are expected to be
# defined by the surrounding application; explode and split are presumably
# imported from pyspark.sql.functions.
spark = SparkSession\
           .builder\
           .appName("StructuredKafkaWordCount")\
           .getOrCreate()
              
# Create a DataFrame representing the stream of input lines from Kafka.
# Only the record value is kept, cast to a string.
lines = spark\
          .readStream\
          .format("kafka")\
          .option("kafka.bootstrap.servers", bootstrapServers)\
          .option(subscribeType, topics)\
          .load()\
          .selectExpr("CAST(value AS STRING)")
              
# Split the lines into words
words = lines.select(
    # Explode turns each item in an array into a separate row
    explode(
        split(lines.value, ' ')
    ).alias('word')
)
              
# Generate a running word count
wordCounts = words.groupBy('word').count()
              
# Run the query that saves the result to HPE Ezmeral Data Fabric Database (MapR-DB).
# The id field must name a column of wordCounts; the word column was aliased
# to 'word' above, so 'word' (not 'value') is the document id.
query = wordCounts\
              .writeStream\
              .format("com.mapr.db.spark.streaming") \
              .option("tablePath", table_path) \
              .option("createTable", True) \
              .option("idFieldPath", "word") \
              .outputMode('complete')\
              .start()
              
# Block until the streaming query terminates.
query.awaitTermination()