Loading Data from HPE Ezmeral Data Fabric Database as an Apache Spark RDD

You can use the following API to load JSON-format data from a HPE Ezmeral Data Fabric Database table into an Apache Spark RDD of a JSON document:

For loading as an RDD, apply the following method on a SparkContext object:
def loadFromMapRDB[T](table: String): RDD[T]
For loading as an RDD, apply the following method on a MapRDBJavaSparkContext object:
mapRDBSparkContext.loadFromMapRDB(tableName: String, clazz: Class)
NOTE The only required parameter to the methods is tableName. All the others are optional.

The following example creates a userprofilesRDD by calling loadFromMapRDB from SparkContext (Scala) or MapRDBSparkContext (Java) and supplying the table ("/tmp/user_profiles"):

import com.mapr.db.spark._
            
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
 
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(sc);
JavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")   

The following example creates a usersInfo RDD by calling loadFromMapRDB from SparkContext (Scala) or MapRDBSparkContext (Java) and supplying the table ("/tmp/UserInfo"):

import com.mapr.db.spark._
            
val usersInfo = sc.loadFromMapRDB("/tmp/UserInfo")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
            
MapRDBJavaRDD<OJAIDocument> usersInfo = mapRDBSparkContext.loadFromMapRDB("/tmp/UserInfo")
In the previous example, the usersInfo data contains the following information:
  • Address (map type)
  • Date of birth (date type)
  • First name (string type)
  • Interests (string type)
  • Last name (string type)

The following prints the fields and shows the output for a sample user:

usersInfo.foreach(println(_))
usersInfo.foreach(System.out::println);
{
  "address":
     {"Pin":95035,"city":"milpitas","street":"350 holger way"},
  "dob":"1947-11-29",
  "first_name":"David",
  "interests":["football","books","movies"],
  "last_name":"Jones"
}

The following example shows a join operation performed on two different JSON documents using address.city as the join key:

import com.mapr.db.spark._
            
val maprd1 = sc.loadFromMapRDB("/tmp/user_profiles")
            
val maprd2 = sc.loadFromMapRDB("/tmp/user_income")
            
val collection = maprd1.map(a => (a.`address.city`[String],a))
.cogroup(maprd2.map(a=>(a.`address.city`[String],a)))
.map(a => (a._1,a._2._1.size,a._2._2.size)).collect
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import scala.Tuple2;
import scala.Tuple3;
import java.util.Collection;
            
MapRDBJavaRDD<OJAIDocument> maprd1 = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
MapRDBJavaRDD<OJAIDocument> maprd2 = mapRDBSparkContext.loadFromMapRDB("/tmp/user_income");

List collection = maprd1.mapToPair(a -> new Tuple2<>(a.getString("address.city"), a))
.cogroup(maprd2.mapToPair(a -> new Tuple2<>(a.getString("address.city"), a)))
.map(a -> new Tuple3<>(a._1, ((Collection<?>)a._2._1).size(), ((Collection<?>)a._2._2).size()))
.collect();

The resulting RDD, collection, contains the count of the users in the user_profiles and user_income HPE Ezmeral Data Fabric Database tables.

The following example adds a new field into all the JSON documents:

import com.mapr.db.spark._
          
val maprd = sc.loadFromMapRDB("/tmp/user_profiles")
val documents = maprd.map(a => { a.`address.country` = "USA"; a}).collect
documents.saveToMapRDB("/tmp/cleaned_user_profiles")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
          
MapRDBJavaRDD<OJAIDocument> maprd =   mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles");
List<OJAIDocument> documents = maprd.map(a -> {a.set("address.country", "USA"); return a;})
                                          .collect();
mapRDBSparkContext.saveToMapRDB(documents, "/tmp/cleaned_user_profiles");

Improving Performance by Using Projection Pushdown and Filter Pushdown

To improve performance, you can supply a WHERE clause and projection fields to the loadFromMapRDB API. In the following example, a condition is supplied to the loadFromMapRDB function and only certain fields are specified in the SELECT clause:

import com.mapr.db.spark._
              
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                        .where([condition])
                        .select("address",
                                "first_name",
                                "_id",
                                "last_name")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.ojai.store.QueryCondition;
              
MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
                                                  .where([condition])
                                                  .select("address",
                                                          "first_name",
                                                          "_id",
                                                          "last_name");          

The data is loaded based on the condition. The condition is pushed down to the server, and the server returns data based on the filtering. Only the fields specified in the SELECT clause are projected.

In the following example, the WHERE clause is used as a filter condition:

import com.mapr.db.spark._
              
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                      .where(field("salary") >= 100)
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.ojai.store.QueryCondition;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
.where(MapRDB.newCondition().is("salary", QueryCondition.Op.GREATER_OR_EQUAL, 100));

The userprofilesRDD includes only those documents with a salary field greater than 100.

By specifying an _id field, you can find and retrieve a row for a given key:

import com.mapr.db.spark._
              
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles")
                      .where(field("_id") === "k2")
import com.mapr.db.spark.api.java.MapRDBJavaSparkContext;
import org.ojai.store.QueryCondition;

MapRDBJavaSparkContext mapRDBSparkContext = new MapRDBJavaSparkContext(spark.sparkContext());
MapRDBJavaRDD userprofilesRDD = mapRDBSparkContext.loadFromMapRDB("/tmp/user_profiles")
                  .where(MapRDB.newCondition().is("_id", QueryCondition.Op.EQUAL, "k2"));

WHERE Clause Semantics

The loadFromMapRDB API supports a WHERE clause to push down the filter to the JSON document API, ensuring that only relevant documents are propagated to the RDD.

You can use two options to provide the filter condition:

  • Scala domain-specific language (DSL)
  • QueryCondition (from OJAI API)

Following is an example of using loadFromMapRDB and supplying a condition by using Scala DSL:

Condition isDoe = field("last_name") === "Doe" 
val userprofilesRDD = sc.loadFromMapRDB("/tmp/user_profiles").where(isDoe)

For more information about using Scala DSL, see Scala DSL for Specifying Filter Conditions.

Following is an example of passing the condition using the QueryCondition API:

val maprd  = sc.loadFromMapRDB(tableName)
             .where(MapRDB.newCondition()
             .is("_id", QueryCondition.Op.EQUAL, "k2")
             .build())
MapRDBJavaRDD rdd = mapRDBJavaSparkContext.loadFromMapRDB(tableName)
            .where(MapRDB.newCondition().is("_id", QueryCondition.Op.EQUAL, "k2").build());

For more information about QueryCondition, see Querying with Conditions.

NOTE For additional information, see Java Examples in the source code.