Loading Data from MapR Database as an Apache Spark Dataset

You can use one of three ways to load data from MapR Database into an Apache Spark Dataset:

  • Load the data into a Dataset.
  • Load the data into a DataFrame, and then convert it to a Dataset.
  • Load the data into a Dataset using a custom encoder.

Load into a Dataset

For loading as a Dataset, apply the following method on a SparkSession object:
def loadFromMapRDB[T](table: String, schema : StructType).as [T]: Dataset
              
import com.mapr.db.spark.sql._
              
val ds = sparkSession.loadFromMapRDB[T]("/tmp/user_profiles").as [T]: Dataset
For loading as a Dataset, apply the following method on a MapRDBJavaSession object:
def loadFromMapRDB[T <: java.lang.Object](tableName: String, schema: StructType, sampleSize: Double, clazz: Class[T]): Dataset[T]
              
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
              
Dataset<Row> ds = maprSession.loadFromMapRDB("/tmp/user_profiles");       
NOTE The only required parameter to the methods is tableName. All the others are optional.

Load into DataFrame and Convert to Dataset

To load the data as a DataFrame, see Loading Data from MapR Database as an Apache Spark DataFrame. To convert the DataFrame to a Dataset, use the as[<type>] method. The <type> can be any of the basic types in Scala.

The following code example creates a Dataset[Person] using the as[<type>] method:

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._
              
case class Address(Pin: Integer, street: String, city: String)
              
case class Person (_id:String, 
              first_name:String,
              last_name: String, dob: java.sql.Date,
              Interests: Seq[String, address: Address)
              
val ds = sparkSession.loadFromMapRDB[Person]("/tmp/user_profiles").as[Person]
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
              
public static class Address implements Serializable {
     private Integer pin;
     private String street;
     private String city;
     
     public Integer getPin() { return pin; }
     public void setPin(Integer pin) { this.pin = pin; }
     public String getStreet() { return street; }
     public void setStreet(String street) { this.street = street; }
     public String getCity() { return city; }
     public void setCity(String city) { this.city = city; }
}

public static class Person implements Serializable {
     private String _id;
     private String firstName;
     private String lastName;
     private Date dob;
     private Seq<String> interests;
              public String get_id() { return _id; }
              public void set_id(String _id) { this._id = _id; }
              public String getFirstName() { return firstName; }
              public void setFirstName(String firstName) { this.firstName = firstName; }
              public String getLastName() { return lastName; }
              public void setLastName(String lastName) { this.lastName = lastName; }
              public Date getDob() { return dob; }
              public void setDob(Date dob) { this.dob = dob; }
              public Seq<String> getInterests() { return interests; }
              public void setInterests(Seq<String> interests) { this.interests = interests; }
              }
Dataset<Person> ds = maprSession.loadFromMapRDB(tableName, Person.class);

Load into Dataset Using Custom Encoder

You can create a custom encoder for Java bean classes by calling the Encoders.bean method. Encoders.bean only support Java classes. To create a Dataset of the Scala class, the previous code can be used. The following example shows how to load into a Dataset by creating a custom encoder for a Java class named beanClass:

import org.apache.spark.sql.SparkSession        
import com.mapr.db.spark.sql._ 
            
val ds = sparkSession.loadFromMapRDB("/tmp/user_profiles")
         .as(Encoders.bean(beanClass))
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
            
maprSession.loadFromMapRDB("/tmp/user_profiles").as(Encoders.bean(beanClass);

Filter Pushdown

After you have loaded data into a Dataset, you can apply filter pushdowns. The following example filters on first_name:

ds.filter($"first_name" === "David")
ds.filter(col("first_name").equalTo("David")).show();

See Projection and Filter Pushdown with Apache Spark DataFrames and Datasets for other examples.