Accessing the MapR Database in Zeppelin Using the MapR Database OJAI Connector
This section contains examples of Apache Spark jobs that use the MapR Database OJAI Connector for Apache Spark to read and write MapR Database JSON tables. The examples use the Spark Python interpreter, which is available starting in the 1.1 release of the MapR Data Science Refinery product. The Python API in the MapR Database OJAI Connector is available starting in the EEP 4.1 release.
Inserting a Spark DataFrame into a MapR Database JSON Table
The following code sample creates a Spark DataFrame, inserts it into a MapR Database JSON table that is created as part of the insert, and then loads it into another DataFrame:
%spark.pyspark
df = sc.parallelize([ { "_id": "rsmith", "address": { "city": "San Francisco", "line": "100 Main Street", "zip": 94105 }, "dob": "1982-02-03", "first_name": "Robert", "interests": [ "electronics", "music", "sports" ], "last_name": "Smith" }, { "_id": "mdupont", "address": { "city": "San Jose", "line": "1223 Broadway", "zip": 95109 }, "dob": "1982-02-03", "first_name": "Maxime", "interests": [ "sports", "movies", "electronics" ], "last_name": "Dupont" }, { "_id": "jdoe", "address": None, "dob": "1970-06-23", "first_name": "John", "interests": None, "last_name": "Doe" }, { "_id": "dsimon", "address": None, "dob": "1980-10-13", "first_name": "David", "interests": None, "last_name": "Simon" }, { "_id": "alehmann", "address": None, "dob": "1980-10-13", "first_name": "Andrew", "interests": [ "html", "css", "js" ], "last_name": "Lehmann" } ]).toDF()
# Insert into MapR-DB table
spark.insertToMapRDB(df, "/user/mapruser1/table1", create_table=True)
# Load previously inserted data from MapR-DB table
# show() returns None, so keep the DataFrame reference before displaying it
df_loaded = spark.loadFromMapRDB("/user/mapruser1/table1")
df_loaded.show()
Inserting a Spark DataFrame into a MapR Database JSON Table Using Bulk Insert
To bulk insert into a MapR Database JSON table that is created as part of the insert operation, you must first order the records in the DataFrame by the _id field, as the orderBy("_id") call in the following example does:
%spark.pyspark
df = sc.parallelize([ { "_id": "rsmith", "address":{ "city": "San Francisco", "line": "100 Main Street", "zip": 94105 }, "dob": "1982-02-03", "first_name": "Robert", "interests": [ "electronics", "music", "sports" ], "last_name": "Smith" }, { "_id": "mdupont", "address":{ "city": "San Jose", "line": "1223 Broadway", "zip": 95109 }, "dob": "1982-02-03", "first_name": "Maxime", "interests": [ "sports", "movies", "electronics" ], "last_name": "Dupont" },{ "_id": "jdoe", "address": None, "dob": "1970-06-23", "first_name": "John", "interests": None, "last_name": "Doe" },{ "_id": "dsimon", "address": None, "dob": "1980-10-13", "first_name": "David", "interests": None, "last_name": "Simon" },{ "_id": "alehmann", "address": None, "dob": "1980-10-13", "first_name": "Andrew", "interests": [ "html", "css", "js" ], "last_name": "Lehmann" }]).toDF().orderBy("_id")
# Bulk insert into MapR-DB table
spark.insertToMapRDB(df, "/user/mapruser1/table2", create_table=True, bulk_insert=True)
# Load previously inserted data from MapR-DB table
# show() returns None, so keep the DataFrame reference before displaying it
df_loaded = spark.loadFromMapRDB("/user/mapruser1/table2")
df_loaded.show()
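The ordering requirement above can be checked on a small sample before a bulk insert. The following is a minimal plain-Python sketch, not part of the connector: the helper name is_sorted_by_id is hypothetical, and Python string comparison is assumed here as a stand-in for what orderBy("_id") produces on these ASCII identifiers.

```python
def is_sorted_by_id(records):
    """Return True if the _id values appear in ascending order."""
    ids = [record["_id"] for record in records]
    # Compare each _id with its successor; an empty or single-item list is sorted.
    return all(a <= b for a, b in zip(ids, ids[1:]))


# The _id values from the example, in the order they are declared.
records = [
    {"_id": "rsmith"},
    {"_id": "mdupont"},
    {"_id": "jdoe"},
    {"_id": "dsimon"},
    {"_id": "alehmann"},
]

# Sorting by _id mirrors the effect of .orderBy("_id") on the DataFrame.
ordered = sorted(records, key=lambda record: record["_id"])
ordered_ids = [record["_id"] for record in ordered]

print(is_sorted_by_id(records))   # declaration order is not sorted
print(is_sorted_by_id(ordered))   # sorted copy passes the check
print(ordered_ids)
```

In this sketch the records as declared fail the check, which is why the example sorts the DataFrame before calling insertToMapRDB with bulk_insert=True.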
Selecting and Filtering Data when Loading a Spark DataFrame
The following code sample uses projection and filtering when loading a Spark DataFrame from a MapR Database JSON table:
%spark.pyspark
from pyspark.sql.functions import col
df = sc.parallelize([{ "_id": "rsmith", "address": { "city": "San Francisco", "line": "100 Main Street", "zip": 94105 }, "dob": "1982-02-03", "first_name": "Robert", "interests": [ "electronics", "music", "sports" ], "last_name": "Smith" }, { "_id": "mdupont", "address": { "city": "San Jose", "line": "1223 Broadway", "zip": 95109 }, "dob": "1982-02-03", "first_name": "Maxime", "interests": [ "sports", "movies", "electronics" ], "last_name": "Dupont" }]).toDF()
spark.saveToMapRDB(df, "/user/mapruser1/table3", create_table=True)
# Load previously saved data from the MapR-DB table
# Build the projected and filtered DataFrame first; show() returns None
df_loaded_select = spark.loadFromMapRDB("/user/mapruser1/table3")\
    .select("_id", "first_name", "address")\
    .filter(col("first_name") == "Maxime")
df_loaded_select.show()
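To make the expected result of the projection and filter concrete, here is a plain-Python sketch of the same two steps applied to the two sample documents. It does not use Spark or the connector; the helpers project and where_equals are hypothetical names introduced only for this illustration.

```python
# The two documents saved to table3 in the example above.
rows = [
    {"_id": "rsmith",
     "address": {"city": "San Francisco", "line": "100 Main Street", "zip": 94105},
     "dob": "1982-02-03", "first_name": "Robert",
     "interests": ["electronics", "music", "sports"], "last_name": "Smith"},
    {"_id": "mdupont",
     "address": {"city": "San Jose", "line": "1223 Broadway", "zip": 95109},
     "dob": "1982-02-03", "first_name": "Maxime",
     "interests": ["sports", "movies", "electronics"], "last_name": "Dupont"},
]


def project(documents, *fields):
    """Keep only the named fields of each document (analogous to select)."""
    return [{field: doc.get(field) for field in fields} for doc in documents]


def where_equals(documents, field, value):
    """Keep only documents whose field equals value (analogous to filter)."""
    return [doc for doc in documents if doc.get(field) == value]


result = where_equals(project(rows, "_id", "first_name", "address"),
                      "first_name", "Maxime")
print(result)
```

Only the mdupont document remains, reduced to the _id, first_name, and address fields.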
Joining DataFrames when Loading a Spark DataFrame
The following code sample loads a Spark DataFrame from a MapR Database JSON table and joins the DataFrame with a second DataFrame:
%spark.pyspark
df = sc.parallelize([ { "_id": "rsmith", "address": { "city": "San Francisco", "line": "100 Main Street", "zip": 94105 }, "dob": "1982-02-03", "first_name": "Robert", "interests": [ "electronics", "music", "sports" ], "last_name": "Smith" }, { "_id": "mdupont", "address": { "city": "San Jose", "line": "1223 Broadway", "zip": 95109 }, "dob": "1982-02-03", "first_name": "Maxime", "interests": [ "sports", "movies", "electronics" ], "last_name": "Dupont" }, { "_id": "jdoe", "address": None, "dob": "1970-06-23", "first_name": "John", "interests": None, "last_name": "Doe" }, { "_id": "dsimon", "address": None, "dob": "1980-10-13", "first_name": "David", "interests": None, "last_name": "Simon" }, { "_id": "alehmann", "address": None, "dob": "1980-10-13", "first_name": "Andrew", "interests": [ "html", "css", "js" ], "last_name": "Lehmann" } ]).toDF()
dfProfessions = sc.parallelize([ { "_id": "rsmith", "profession": "Engineer" }, { "_id": "alehmann", "profession": "Doctor" }, { "_id": "alehmann", "profession": "Accountant" }, { "_id": "fake", "profession": "Software developer" } ]).toDF()
# Save to MapR-DB table
spark.saveToMapRDB(df, "/user/mapruser1/table4", create_table=True)
# Load previously saved data from MapR-DB Table and join with another DataFrame
# Build the joined DataFrame first; show() returns None
df_loaded_select = spark.loadFromMapRDB("/user/mapruser1/table4")\
    .join(dfProfessions, "_id")
df_loaded_select.show()
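Because DataFrame.join defaults to an inner join, only _id values present in both DataFrames survive. The plain-Python sketch below illustrates which rows that implies for the sample data; it does not use Spark, the helper inner_join_on_id is a hypothetical name, and the people list is trimmed to two fields for brevity. The row order Spark returns is not guaranteed, so the result is sorted here before printing.

```python
# Trimmed versions of the documents saved to table4 (only _id and first_name).
people = [
    {"_id": "rsmith", "first_name": "Robert"},
    {"_id": "mdupont", "first_name": "Maxime"},
    {"_id": "jdoe", "first_name": "John"},
    {"_id": "dsimon", "first_name": "David"},
    {"_id": "alehmann", "first_name": "Andrew"},
]

professions = [
    {"_id": "rsmith", "profession": "Engineer"},
    {"_id": "alehmann", "profession": "Doctor"},
    {"_id": "alehmann", "profession": "Accountant"},
    {"_id": "fake", "profession": "Software developer"},
]


def inner_join_on_id(left, right):
    """Pair every left row with every right row that has the same _id."""
    return [{**l, **r} for l in left for r in right if l["_id"] == r["_id"]]


joined = inner_join_on_id(people, professions)
pairs = sorted((row["_id"], row["profession"]) for row in joined)
print(pairs)
```

In this sketch alehmann appears twice because two profession rows match, while mdupont, jdoe, dsimon, and the unmatched fake entry are dropped.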
Tutorial Examples
The Zeppelin on MapR tutorial includes notebooks with Python and Scala code examples that use the MapR Database OJAI Connector.