Saving an Apache Spark RDD to an HPE Ezmeral Data Fabric Database JSON Table

Saving an RDD[OJAIDocument] to HPE Ezmeral Data Fabric Database

The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark provides the following API to save an RDD[OJAIDocument] to an HPE Ezmeral Data Fabric Database table:

In Scala, apply the following method to the RDD:
def saveToMapRDB(tablename: String, createTable: Boolean = false, bulkInsert: Boolean = false, idFieldPath: String = DocumentConstants.ID_KEY) : Unit
In Java, apply one of the following methods to a MapRDBJavaSparkContext object:
def saveToMapRDB[D](javaRDD: JavaRDD[D], tableName: String, createTable: Boolean, bulkInsert: Boolean, idField: String): Unit
              
def saveRowRDDToMapRDB(javaRDD: JavaRDD[Row], tableName: String, createTable: Boolean, bulkInsert: Boolean, idField: String): Unit

def saveToMapRDB[K, V <: AnyRef](javaPairRDD: JavaPairRDD[K, V], keyClazz: Class[K], valueClazz: Class[V], tableName: String, createTable: Boolean, bulkInsert: Boolean): Unit
NOTE The only required parameter for these methods is tableName. All other parameters are optional.
In the following example, address and first_name data is loaded from the "/tmp/user_profiles" table, stored as an RDD (userprofilesRDD), and then saved to the "/tmp/user_firstname_and_address" table:
// Scala
import com.mapr.db.spark._

val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                     .where("condition")
                     .select("address", "first_name")

userprofilesRDD.saveToMapRDB("/tmp/user_firstname_and_address")

// Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(sc);
JavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
              .where("condition")
              .select("address", "first_name");
mapRDBSparkContext.saveToMapRDB(userprofilesRDD, "/tmp/user_firstname_and_address", true, false, "_id");

The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark also provides the following API to insert an RDD[OJAIDocument] into an HPE Ezmeral Data Fabric Database table:

NOTE The insertToMapRDB API is available starting in the EEP 4.1.0 release.
// Scala
import com.mapr.db.spark._

// tablename is a String that holds the path of the target table
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
userprofilesRDD.insertToMapRDB(tablename, createTable = true, bulkInsert = false, idFieldPath = "_id")

// Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.apache.spark.sql.SparkSession;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD<OJAIDocument> userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
// userprofilesRDD holds OJAIDocument objects rather than Row objects, so insertToMapRDB is used here
mapRDBSparkContext.insertToMapRDB(userprofilesRDD, tablename);
NOTE The insertToMapRDB API throws an exception if a row with the same ID already exists.

This API supports the following parameters:

Parameter | Default | Description
tableName | Not applicable | The name of the HPE Ezmeral Data Fabric Database table in which you are saving the document.
createTable | false | Creates the table before saving the documents. If the table already exists and createTable is set to true, the API throws an exception.
idFieldPath | _id | Specifies the key to be used for the document.
bulkInsert | false | Loads a group of rows simultaneously. bulkInsert is similar to a bulk load in MapReduce.

Parameter | Default | Description
RDD (JavaRDD or JavaPairRDD) | Not applicable | Specifies the RDD that you are saving to the HPE Ezmeral Data Fabric Database table.
tableName | Not applicable | Specifies the name of the HPE Ezmeral Data Fabric Database table in which you are saving the document.
createTable | false | Creates the table before saving the document. If the table already exists and createTable is set to true, the API throws an exception.
idFieldPath | _id | Specifies the key to be used for the document.
bulkInsert | false | Loads a group of rows simultaneously. bulkInsert is similar to a bulk load in MapReduce.
keyClazz (only for JavaPairRDD) | Not applicable | Specifies the class type of the key in the JavaPairRDD that you are saving to the HPE Ezmeral Data Fabric Database table.
valueClazz (only for JavaPairRDD) | Not applicable | Specifies the class type of the value in the JavaPairRDD that you are saving to the HPE Ezmeral Data Fabric Database table.

In Java, the saveToMapRDB method works with JavaRDD and JavaPairRDD. To save a JavaRDD[Row], use the saveRowRDDToMapRDB method.

The following example saves the RDD, using the idFieldPath parameter to specify the key and setting the bulkInsert value explicitly:

// Scala
import com.mapr.db.spark._

userprofilesRDD.saveToMapRDB("/tmp/user_firstname_and_address",
                             idFieldPath = "user_id",
                             bulkInsert = false)

// Java
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
mapRDBSparkContext.saveToMapRDB(userprofilesRDD, "/tmp/user_firstname_and_address", false, false, "user_id");

The following example saves an RDD of Person objects into the newly created /tmp/UserInfo table:

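// Scala
import com.mapr.db.spark._
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setAppName("json app").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val people = sc.parallelize(getUsers())
people.saveToMapRDB("/tmp/UserInfo", createTable = true)

// Java (assumes a getUsers() helper that returns a java.util.List<Person>)
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf sparkConf = new SparkConf().setAppName("json app").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(sc.sc());
JavaRDD<Person> rdd = sc.parallelize(getUsers());
mapRDBSparkContext.saveToMapRDB(rdd, "/tmp/UserInfo", true);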

The following example shows the getUsers function that allocates the Person objects:

def getUsers(): Array[Person] = {
    val users: Array[Person] =
            
    Array(
       Person("DavUSCalif", "David", "Jones",
           ODate.parse("1947-11-29"),
           Seq("football", "books", "movies"),
           Map("city" -> "milpitas", "street" -> "350 holger way", "Pin" -> 95035)),
            
       Person("PetUSUtah", "Peter", "pan",
           ODate.parse("1974-1-29"),
           Seq("boxing", "music", "movies"),
           Map("city" -> "salt lake", "street" -> "351 lake way", "Pin" -> 89898)),
            
       Person("JamUSAriz", "James", "junior",
           ODate.parse("1968-10-2"),
           Seq("tennis", "painting", "music"),
           Map("city" -> "phoenix", "street" -> "358 pond way", "Pin" -> 67765)),
            
       Person("JimUSCalif", "Jimmy", "gill",
           ODate.parse("1976-1-9"),
           Seq("cricket", "sketching"),
           Map("city" -> "san jose", "street" -> "305 city way", "Pin" -> 95652)),
            
       Person("IndUSCalif", "Indiana", "Jones",
           ODate.parse("1987-5-4"),
           Seq("squash", "comics", "movies"),
           Map("city" -> "sunnyvale", "street" -> "35 town way", "Pin" -> 95985)))
            
     users
  }

Saving a JSON Document to HPE Ezmeral Data Fabric Database

To save a JSON document using the HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark, you must first convert the JSON document into an OJAI document and then save the RDD, as shown in the following example:

// Scala
import com.mapr.db.spark._

val documents = sc.parallelize((1 to 10)
            .map(i => s"""{"_id": "$i", "test": "$i"}"""))
val maprd = documents.map(a => MapRDBSpark.newDocument(a))
maprd.saveToMapRDB("/tmp/testData")

// Java (mapRDBSparkContext is created as in the earlier examples)
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.apache.spark.api.java.JavaSparkContext;

JavaRDD<String> documents = JavaSparkContext.fromSparkContext(sc)
            .parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
            // Each document needs an _id field (not id) to be saved to a table
            .map(i -> "{\"_id\": \"" + i + "\", \"test\": \"" + i + "\"}");
JavaRDD<OJAIDocument> maprd = documents.map(MapRDBSpark::newDocument);
mapRDBSparkContext.saveToMapRDB(maprd, "/tmp/testData");
Each JSON document must contain an _id field before it can be saved into a table. If you only need to convert the JSON data to an OJAI document (without saving it to HPE Ezmeral Data Fabric Database), the _id field is not required. If the HPE Ezmeral Data Fabric Database table already contains a record with the same _id value, HPE Ezmeral Data Fabric Database replaces the record. Otherwise, it inserts a new record.
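
The difference between saving and inserting can be illustrated with a small in-memory model. The following plain-Java sketch does not call the connector: ToyJsonTable and its methods are hypothetical names, and the exception types are stand-ins, because this document does not say which exception the real API throws. The sketch only mirrors the documented behavior: a save replaces a record that has the same _id, an insert rejects a duplicate _id, and both require an _id.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy in-memory stand-in for a JSON table keyed by _id (hypothetical, not the connector). */
final class ToyJsonTable {
    private final Map<String, String> rows = new LinkedHashMap<>();

    /** Models the documented save behavior: replace the record with the same _id, else add it. */
    void save(String id, String json) {
        requireId(id);
        rows.put(id, json);
    }

    /** Models the documented insert behavior: fail if a record with the same _id exists. */
    void insert(String id, String json) {
        requireId(id);
        if (rows.putIfAbsent(id, json) != null) {
            // Stand-in exception type; the real API's exception is not specified here.
            throw new IllegalStateException("A record with _id '" + id + "' already exists");
        }
    }

    String get(String id) {
        return rows.get(id);
    }

    int size() {
        return rows.size();
    }

    /** Both operations need an _id, as the text above requires for saved documents. */
    private static void requireId(String id) {
        if (id == null || id.isEmpty()) {
            throw new IllegalArgumentException("_id is required");
        }
    }

    public static void main(String[] args) {
        ToyJsonTable table = new ToyJsonTable();
        table.save("1", "{\"_id\": \"1\", \"test\": \"first\"}");
        table.save("1", "{\"_id\": \"1\", \"test\": \"second\"}"); // replaces the first record
        System.out.println(table.size() + " record(s): " + table.get("1"));
        try {
            table.insert("1", "{\"_id\": \"1\", \"test\": \"third\"}");
        } catch (IllegalStateException e) {
            System.out.println("insert rejected: " + e.getMessage());
        }
    }
}
```

In this model, choosing between the two operations is a choice between overwriting silently and failing loudly when an _id collides.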

Just as you can load a JSON document into a Scala bean class (see Creating an RDD of a Class), you can save an RDD of Scala class objects to an HPE Ezmeral Data Fabric Database JSON table. saveToMapRDB converts each bean object to a JSON document before saving it.

Table Splits and saveToMapRDB

If the createTable parameter is set to true, saveToMapRDB can use the partition information from the RDD's lineage to create the splits for a new table:

sc.loadFromMapRDB("/tmp/user_profiles").saveToMapRDB("/userProfiles",
                                                     createTable = true)

Suppose the /tmp/user_profiles table has five splits. saveToMapRDB uses this information to create the /userProfiles table with the same number and range of splits. You can also supply this information by using MapRDBSpark.newPartitioner:

sc.loadFromMapRDB("/tmp/user_profiles").keyBy(doc => doc.get("_id"))
  .repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String]
      ("/profiles"))
  .saveToMapRDB("/userProfiles", createTable = true)
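
To make the idea of split ranges concrete, here is a minimal plain-Java sketch of range partitioning: sorted split keys divide the row-key space into contiguous ranges, and each key maps to exactly one range. The ToySplits class, its method names, and the boundary keys are all hypothetical. The sketch does not call the connector, and it compares keys with String.compareTo, which may not match the database's actual key ordering in every case.

```java
import java.util.Arrays;
import java.util.List;

/** Conceptual model of table splits as key ranges (hypothetical, not the connector). */
final class ToySplits {
    // Sorted boundary keys; n boundaries define n + 1 contiguous key ranges.
    private final String[] boundaries;

    ToySplits(List<String> splitKeys) {
        this.boundaries = splitKeys.toArray(new String[0]);
        Arrays.sort(this.boundaries);
    }

    int numRanges() {
        return boundaries.length + 1;
    }

    /**
     * Returns the index of the range that contains the key.
     * A key equal to a boundary belongs to the range that starts at that boundary.
     */
    int rangeFor(String key) {
        int pos = Arrays.binarySearch(boundaries, key);
        // Found: the key starts range pos + 1. Not found: pos encodes -(insertionPoint) - 1.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        // Four made-up boundary keys give five ranges, like the five splits in the text.
        ToySplits splits = new ToySplits(Arrays.asList("e", "j", "p", "u"));
        for (String key : new String[] {"DavUSCalif", "e", "jimmy", "zed"}) {
            System.out.println(key + " falls in range " + splits.rangeFor(key));
        }
    }
}
```

A new table that is created with the same boundary keys as the source table receives each key in the same range index, which is the sense in which the number and range of splits are preserved.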

For more information about partitioning, see Using the Custom Partitioner with the HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark.