Running Spark Jobs in Zeppelin

This section contains code samples for different types of Apache Spark jobs that you can run in your Apache Zeppelin notebook. You can run these examples using either the Livy or Spark interpreter. The Spark interpreter is available starting in the 1.1 release of the MapR Data Science Refinery product.

Before running these examples, make sure you have configured the interpreter you plan to use, either Livy or Spark.
NOTE Examples in this section use Hadoop commands to access files in the MapR File System. If you have a MapR File System mount point in your container, you can replace the Hadoop commands with standard shell commands. Refer to Running Shell Commands in Zeppelin for an example of how to do this.
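
For example, if the cluster file system is mounted inside your container (the mount point below, /mapr/<cluster-name>, is a placeholder; substitute the path used in your environment), the hadoop fs commands in the PySpark setup step could instead be written as ordinary shell commands. This is a sketch, not part of the original example:

%sh
# Mount-point alternative to hadoop fs; adjust paths for your cluster
mkdir -p /mapr/<cluster-name>/user/mapruser1/examples/src/main/resources/
cp /opt/mapr/spark/spark-<version>/examples/src/main/resources/people.txt /mapr/<cluster-name>/user/mapruser1/examples/src/main/resources/
cp /opt/mapr/spark/spark-<version>/examples/src/main/resources/people.json /mapr/<cluster-name>/user/mapruser1/examples/src/main/resources/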

Running a Spark Job Using PySpark

The following example shows how to run a Spark job using Python. Make sure you have installed Python on your MapR cluster.

  1. Before running the sample PySpark code, copy the files that the code references to the MapR File System:
    %sh
    hadoop fs -mkdir -p /user/mapruser1/examples/src/main/resources/
    hadoop fs -put /opt/mapr/spark/spark-<version>/examples/src/main/resources/people.txt /user/mapruser1/examples/src/main/resources/
    hadoop fs -put /opt/mapr/spark/spark-<version>/examples/src/main/resources/people.json /user/mapruser1/examples/src/main/resources/
  2. Run the PySpark code specifying either %livy.pyspark or %spark.pyspark as your interpreter:
    from pyspark.sql.types import *
    # Read the sample file from the MapR File System; the relative path resolves
    # to the user home directory (here /user/mapruser1/)
    sc = spark.sparkContext
    lines = sc.textFile("examples/src/main/resources/people.txt")
    parts = lines.map(lambda l: l.split(","))
    people = parts.map(lambda p: (p[0], p[1].strip()))
    # Define a schema and apply it to create a DataFrame
    schemaString = "name age"
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)
    schemaPeople = spark.createDataFrame(people, schema)
    # Register the DataFrame as a temporary view so it can be queried with SQL
    schemaPeople.createOrReplaceTempView("people")
    results = spark.sql("SELECT name FROM people")
    results.show()
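
Because the code registers a temporary view named people, you can also query it from a separate paragraph using the SQL interpreter of the same interpreter group, if it is enabled in your notebook. This is a sketch that assumes the paragraph runs in the same session as the PySpark code, which is normally the case within one interpreter group; use %livy.sql with Livy or %spark.sql with the Spark interpreter:

%spark.sql
SELECT name, age FROM people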

Running a Spark Job Using SparkR

The following SparkR code example creates a table and queries it using HiveQL. Make sure you have installed SparkR on your MapR cluster. Set your interpreter to either %livy.sparkr or %spark.r, depending on whether you are using Livy or Spark.
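
The SparkR code loads the kv1.txt sample data file that ships with Spark. Before running it, copy that file to the MapR File System; this setup step is a sketch that assumes the standard Spark examples location under /opt/mapr/spark:

%sh
hadoop fs -mkdir -p /user/mapruser1/examples/src/main/resources/
hadoop fs -put /opt/mapr/spark/spark-<version>/examples/src/main/resources/kv1.txt /user/mapruser1/examples/src/main/resources/

After the file is in place, run the SparkR code: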

# Initialize the SparkR session with Hive support (the interpreter may have done this already)
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA INPATH '/user/mapruser1/examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
print(results)
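
If you plan to run the Spark SQL examples later in this section, which create a table with the same name, you may want to drop the src table when you are done with this example (a suggested cleanup, not part of the original sample):

sql("DROP TABLE IF EXISTS src")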

Reading a JSON File Using Spark

Set your interpreter to either %livy.spark or %spark, depending on whether you are using Livy or Spark.

The following example loads a JSON file into a Spark DataFrame and then displays it:

val pathJSON = "/user/mapruser1/examples/src/main/resources/people.json"
val df = spark.read.json(pathJSON)
df.show() 

Querying Using Spark

Set your interpreter to either %livy.spark or %spark, depending on whether you are using Livy or Spark.

The following example extends the JSON file reading example by issuing various queries on the data read into the DataFrame:

import spark.implicits._
val pathJSON = "/user/mapruser1/examples/src/main/resources/people.json"
val df = spark.read.json(pathJSON)
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()
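
The same DataFrame can also be queried with SQL by registering it as a temporary view. The view name people_json below is illustrative, not part of the original example:

df.createOrReplaceTempView("people_json")
spark.sql("SELECT name FROM people_json WHERE age IS NOT NULL").show()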

Querying Hive Tables Using Spark SQL

The following two examples query Hive tables using Spark SQL. Make sure the hive-site.xml configuration file from your Hive cluster is available in your Zeppelin container; see Hive Tables for the detailed steps.

If the code snippets in these examples do not specify an interpreter, specify %livy.spark to use the Livy interpreter or %spark to use the Spark interpreter.
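
Example 1 loads the kv1.txt sample data file that ships with Spark. If the file is not already in the MapR File System, or if an earlier LOAD DATA INPATH statement has since moved it into the Hive warehouse, copy it again before running the example. This setup step is a sketch that assumes the standard Spark examples location:

%sh
hadoop fs -put /opt/mapr/spark/spark-<version>/examples/src/main/resources/kv1.txt /user/mapruser1/examples/src/main/resources/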

Example 1
  1. Run the following code to create Hive tables and issue various select statements against them:
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.SparkSession
    case class Record(key: Int, value: String)
    import spark.implicits._
    import spark.sql
    
    // Create table, loading data from text file
    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    sql("LOAD DATA INPATH '/user/mapruser1/examples/src/main/resources/kv1.txt' INTO TABLE src")
    sql("SELECT * FROM src").show()
    sql("SELECT COUNT(*) FROM src").show()
    
    // Find records where key < 10, ordering the result
    val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
    
    // Rows returned by the query can be accessed by ordinal or by pattern matching
    val stringsDS = sqlDF.map { case Row(key: Int, value: String) => s"Key: $key, Value: $value" }
    stringsDS.show()
    
    // Create a second, temporary view from a DataFrame and join it with the first table
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
  2. Drop the table created in the previous code snippet:
    sql("DROP TABLE src").show()
Example 2
  1. Using the shell interpreter, create the source data file for your Hive table and copy it to the MapR File System:
    %sh
    cat > /tmp/test.data << EOF
    John,Smith
    Brian,May
    Rodger,Taylor
    John,Deacon
    Max,Plank
    Freddie,Mercury
    Albert,Einstein
    Fedor,Dostoevsky
    Lev,Tolstoy
    Niccolo,Paganini
    EOF
    hadoop fs -put /tmp/test.data /user/mapruser1/
  2. Create, load, and query the Hive table:
    sql ("create table test_hive(first_name string, last_name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
    sql("load data inpath '/user/mapruser1/test.data' overwrite into table test_hive")
    sql("CREATE TABLE test_hive_parquet (first_name string, last_name string) STORED AS PARQUET")
    sql("INSERT OVERWRITE TABLE test_hive_parquet SELECT first_name, last_name FROM test_hive")
    sql("SELECT * from test_hive_parquet").show()