Loading Data from MapR Database as an Apache Spark Dataset
You can use one of three ways to load data from MapR Database into an Apache Spark Dataset:
- Load the data into a Dataset.
- Load the data into a DataFrame, and then convert it to a Dataset.
- Load the data into a Dataset using a custom encoder.
Load into a Dataset
Scala, using the SparkSession object:
def loadFromMapRDB[T](table: String, schema: StructType): DataFrame  // call .as[T] on the result to get a Dataset[T]
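import com.mapr.db.spark.sql._
import org.apache.spark.sql.Dataset
import sparkSession.implicits._  // provides the Encoder that .as[T] needs
// T is a placeholder for your case class, for example Person
val ds: Dataset[T] = sparkSession.loadFromMapRDB[T]("/tmp/user_profiles").as[T]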
Java, using the MapRDBJavaSession object:
def loadFromMapRDB[T <: java.lang.Object](tableName: String, schema: StructType, sampleSize: Double, clazz: Class[T]): Dataset[T]
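import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Wrap an existing SparkSession to get the MapR Database load methods
MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Row> ds = maprSession.loadFromMapRDB("/tmp/user_profiles");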
Load into DataFrame and Convert to Dataset
To load the data as a DataFrame, see Loading Data from MapR Database as an Apache Spark DataFrame. To convert the DataFrame to a Dataset, use the as[<type>] method. The <type> can be a Scala case class or any of the basic Scala types for which Spark provides an encoder.
The following code example creates a Dataset[Person] using the as[<type>] method:
import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._
case class Address(Pin: Integer, street: String, city: String)
case class Person(_id: String,
  first_name: String,
  last_name: String, dob: java.sql.Date,
  Interests: Seq[String], address: Address)
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._  // provides the Encoder[Person] used by .as[Person]
val ds = sparkSession.loadFromMapRDB[Person]("/tmp/user_profiles").as[Person]
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
import java.io.Serializable;
import java.sql.Date;
import java.util.List;
import org.apache.spark.sql.Dataset;
public static class Address implements Serializable {
  private Integer pin;
  private String street;
  private String city;
  public Integer getPin() { return pin; }
  public void setPin(Integer pin) { this.pin = pin; }
  public String getStreet() { return street; }
  public void setStreet(String street) { this.street = street; }
  public String getCity() { return city; }
  public void setCity(String city) { this.city = city; }
}
public static class Person implements Serializable {
  private String _id;
  private String firstName;
  private String lastName;
  private Date dob;
  private List<String> interests;  // java.util.List: Java beans cannot use scala.collection.Seq
  private Address address;         // nested bean, matching the Scala Person
  public String get_id() { return _id; }
  public void set_id(String _id) { this._id = _id; }
  public String getFirstName() { return firstName; }
  public void setFirstName(String firstName) { this.firstName = firstName; }
  public String getLastName() { return lastName; }
  public void setLastName(String lastName) { this.lastName = lastName; }
  public Date getDob() { return dob; }
  public void setDob(Date dob) { this.dob = dob; }
  public List<String> getInterests() { return interests; }
  public void setInterests(List<String> interests) { this.interests = interests; }
  public Address getAddress() { return address; }
  public void setAddress(Address address) { this.address = address; }
}
MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Person> ds = maprSession.loadFromMapRDB("/tmp/user_profiles", Person.class);
Load into Dataset Using Custom Encoder
You can create a custom encoder for Java bean classes by calling the Encoders.bean method. Encoders.bean supports only Java bean classes; to create a Dataset of a Scala case class, use the as[<type>] approach described in the previous section. The following example shows how to load into a Dataset by creating a custom encoder for a Java bean class, where beanClass is the bean's Class object (for example, Person.class):
import org.apache.spark.sql.{Encoders, SparkSession}
import com.mapr.db.spark.sql._
// beanClass is the Class object of a Java bean, for example classOf[Person]
val ds = sparkSession.loadFromMapRDB("/tmp/user_profiles")
  .as(Encoders.bean(beanClass))
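import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
// beanClass is the Class object of a Java bean, for example Person.class
Dataset<Person> ds = maprSession.loadFromMapRDB("/tmp/user_profiles").as(Encoders.bean(beanClass));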
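Encoders.bean works only with classes that follow the JavaBean conventions. As far as we know, Spark's bean encoder (at least in Spark 2.x) turns each property that has both a getter and a setter into a column, and it needs a public no-argument constructor to rebuild objects from rows. The stdlib-only Java sketch below does not call Spark. It uses java.beans.Introspector to list a class's read/write properties, which can be a quick sanity check before you pass a class such as Address to Encoders.bean. BeanCheck and its methods are hypothetical helpers written for this illustration. They approximate Spark's schema inference rather than reproduce it.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not part of Spark or MapR) for checking bean conventions.
public class BeanCheck {

    // Copy of the Address bean from the example above.
    public static class Address implements Serializable {
        private Integer pin;
        private String street;
        private String city;
        public Integer getPin() { return pin; }
        public void setPin(Integer pin) { this.pin = pin; }
        public String getStreet() { return street; }
        public void setStreet(String street) { this.street = street; }
        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }
    }

    // Returns, sorted by name, the properties that have both a getter and a setter.
    public static Set<String> readWriteProperties(Class<?> beanClass) {
        try {
            BeanInfo info = Introspector.getBeanInfo(beanClass);
            Set<String> names = new TreeSet<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    names.add(pd.getName());
                }
            }
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalArgumentException("Cannot introspect " + beanClass.getName(), e);
        }
    }

    // getConstructor() only finds public constructors, so success means one exists.
    public static boolean hasPublicNoArgConstructor(Class<?> c) {
        try {
            c.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(readWriteProperties(Address.class));
        System.out.println(hasPublicNoArgConstructor(Address.class));
    }
}
```

Running main prints [city, pin, street] followed by true. The read-only class property that comes from getClass() is dropped by the getter-and-setter filter.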
Filter Pushdown
After you have loaded data into a Dataset, you can apply filters, which the connector can push down to MapR Database. The following example filters on first_name:
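// Scala (the $"..." syntax requires import sparkSession.implicits._)
ds.filter($"first_name" === "David").show()
// Java (requires import static org.apache.spark.sql.functions.col;)
ds.filter(col("first_name").equalTo("David")).show();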
See Projection and Filter Pushdown with Apache Spark DataFrames and Datasets for other examples.
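Conceptually, pushing a filter down means the condition is evaluated where the data is stored, so only matching documents reach Spark. Without pushdown, every document is read and then filtered. The stdlib-only Java sketch below models that difference with an in-memory list of documents. HypotheticalTable, its scan methods, and the matching helper are illustrative stand-ins invented for this sketch. They are not part of the MapR connector or the Spark API, and they do not describe how the connector is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Conceptual model of filter pushdown; HypotheticalTable stands in for a table.
public class PushdownSketch {

    // Keeps only the documents whose field equals the given value.
    public static List<Map<String, Object>> matching(
            List<Map<String, Object>> docs, String field, Object value) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> doc : docs) {
            if (value.equals(doc.get(field))) {
                out.add(doc);
            }
        }
        return out;
    }

    public static class HypotheticalTable {
        private final List<Map<String, Object>> documents;
        private int documentsReturned = 0; // documents that left the "storage layer"

        public HypotheticalTable(List<Map<String, Object>> documents) {
            this.documents = documents;
        }

        // No pushdown: the source returns every document; filtering happens later.
        public List<Map<String, Object>> scanAll() {
            documentsReturned += documents.size();
            return new ArrayList<>(documents);
        }

        // Pushdown: the equality condition is evaluated at the source.
        public List<Map<String, Object>> scanWhereEquals(String field, Object value) {
            List<Map<String, Object>> out = matching(documents, field, value);
            documentsReturned += out.size();
            return out;
        }

        public int documentsReturned() { return documentsReturned; }
    }

    public static void main(String[] args) {
        List<Map<String, Object>> docs = List.of(
            Map.<String, Object>of("_id", "1", "first_name", "David"),
            Map.<String, Object>of("_id", "2", "first_name", "Maria"),
            Map.<String, Object>of("_id", "3", "first_name", "David"));

        HypotheticalTable noPushdown = new HypotheticalTable(docs);
        List<Map<String, Object>> a = matching(noPushdown.scanAll(), "first_name", "David");

        HypotheticalTable withPushdown = new HypotheticalTable(docs);
        List<Map<String, Object>> b = withPushdown.scanWhereEquals("first_name", "David");

        System.out.println(a.equals(b));
        System.out.println(noPushdown.documentsReturned());
        System.out.println(withPushdown.documentsReturned());
    }
}
```

Both approaches return the same two documents. The only difference is that the pushdown scan sends 2 documents out of the source instead of 3, which is the saving that pushdown aims for on large tables.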