Projection and Filter Pushdown with Apache Spark DataFrames and Datasets

Projection and filter pushdown improve query performance by reducing the work done at the Spark layer. When you apply the select and filter methods to DataFrames and Datasets, the HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark pushes those projections and filters down to HPE Ezmeral Data Fabric Database where possible.

Projection Pushdown

Projection pushdown minimizes data transfer between HPE Ezmeral Data Fabric Database and the Apache Spark engine by omitting unnecessary fields from table scans. It is especially beneficial when a table contains many columns.

When you invoke the select method on a DataFrame, as in the following examples, the connector pushes the projection down:

Scala:

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

val df = sparkSession.loadFromMapRDB("/tmp/user_profiles")
df.select("_id", "first_name", "last_name")

Java:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Row> df = maprSession.loadFromMapRDB("/tmp/user_profiles");
df.select("_id", "first_name", "last_name");

Python:

from pyspark.sql import SparkSession

df = spark_session.loadFromMapRDB("/tmp/user_profiles")
df.select("_id", "first_name", "last_name")

The equivalent examples using Datasets are as follows (Python is omitted because the Dataset API is available only in Scala and Java):

Scala:

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._
import sparkSession.implicits._

val ds = sparkSession.loadFromMapRDB[Person]("/tmp/user_profiles").as[Person]
ds.select("_id", "first_name", "last_name")

Java:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Row> ds = maprSession.loadFromMapRDB("/tmp/user_profiles", Person.class);
ds.select("_id", "first_name", "last_name");

Filter Pushdown

Filter pushdown improves performance by reducing the amount of data passed between HPE Ezmeral Data Fabric Database and the Apache Spark engine when filtering data.

Consider the following examples:

Scala:

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

val df = sparkSession.loadFromMapRDB("/tmp/user_profiles")
df.filter("first_name = 'Bill'")

Java:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Row> df = maprSession.loadFromMapRDB("/tmp/user_profiles");
df.filter("first_name = 'Bill'");

Python:

from pyspark.sql import SparkSession

df = spark_session.loadFromMapRDB("/tmp/user_profiles")
df.filter("first_name = 'Bill'")

The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark pushes the filter first_name = 'Bill' down to HPE Ezmeral Data Fabric Database.
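
As with projections, you can check the filter pushdown by inspecting the physical plan. The following is a minimal Scala sketch; where exactly the pushed predicate appears (for example, in a PushedFilters entry on the scan node) varies by Spark and connector version:

Scala:

df.filter("first_name = 'Bill'").explain()  // the pushed predicate should be listed with the table scan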

The equivalent examples using Datasets are as follows:

Scala:

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._
import sparkSession.implicits._

val ds = sparkSession.loadFromMapRDB[Person]("/tmp/user_profiles").as[Person]
ds.filter($"first_name" === "Bill")

Java:

import static org.apache.spark.sql.functions.col;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(sparkSession);
Dataset<Person> ds = maprSession.loadFromMapRDB("/tmp/user_profiles").as(Encoders.bean(Person.class));
ds.filter(col("first_name").equalTo("Bill"));

The following examples filter the rows in which first_name is either "David" or "Peter". The Scala and Java examples here and below assume that col and not are imported from org.apache.spark.sql.functions; the Python examples assume from pyspark.sql.functions import col.

Scala:

df.filter($"first_name" === "David" || $"first_name" === "Peter")

Java:

df.filter(col("first_name").equalTo("David").or(col("first_name").equalTo("Peter")));

Python:

df.filter((col("first_name") == "David") | (col("first_name") == "Peter"))

The following examples retrieve only the rows in which first_name is "David" and last_name is "Jones":

Scala:

df.filter($"first_name" === "David" && $"last_name" === "Jones")

Java:

df.filter(col("first_name").equalTo("David").and(col("last_name").equalTo("Jones")));

Python:

df.filter((col("first_name") == "David") & (col("last_name") == "Jones"))

The following examples use a not condition to return the rows in which first_name is not "David" and last_name is not "Peter":

Scala:

df.filter(not($"first_name" === "David" || $"last_name" === "Peter"))

Java:

df.filter(not(col("first_name").equalTo("David").or(col("last_name").equalTo("Peter"))));

Python:

df.filter(~((col("first_name") == "David") | (col("last_name") == "Peter")))

The HPE Ezmeral Data Fabric Database OJAI Connector pushes down all of the filters shown in the earlier examples. It can push down the following types of filters, provided that the field is not an Array or Map:

  • Equal To (=)
  • Not Equal To (!=)
  • Less Than (<)
  • Less Than or Equal To (<=)
  • Greater Than (>)
  • Greater Than or Equal To (>=)
  • In Predicate (IN)
  • Like predicate (LIKE)
  • AND, OR
  • NOT
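
As an illustration of the last few entries, the following minimal Scala sketch uses Spark's standard isin and like column methods to express IN and LIKE predicates on scalar fields; given the rules above, both should be eligible for pushdown. The field names are the ones assumed throughout this section:

Scala:

df.filter($"last_name".isin("Jones", "Smith"))  // IN predicate
df.filter($"first_name".like("Da%"))            // LIKE predicate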

Restrictions

Pushdowns with DataFrames and Datasets are not supported in the following scenarios:

  • Filters on complex types, including arrays, maps, and structs
    For example, a filter on a field in a map, as shown in the following examples, is not pushed down:

    Scala: df.filter($"address.city" === "Milpitas")
    Java: df.filter(col("address.city").equalTo("Milpitas"));
    Python: df.filter(col("address.city") == "Milpitas")
  • Filters with functions sizeof, typeof, and matches

    Spark SQL does not support these functions.

  • Projections on complex types, including arrays, maps, and structs
    For example, if you select an element of an array, as shown in the following examples, the projection is not pushed down:

    Scala: ds.select($"hobbies"(0))
    Java: df.select(col("hobbies").getItem(0));
    Python: df.select(col("hobbies").getItem(0))

These limitations do not apply to pushdowns on RDDs. An alternative is to apply the pushdown using an RDD and then convert the RDD to a DataFrame, as in the sketch that follows.
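
The following minimal Scala sketch illustrates that alternative for the map-field filter shown earlier. It assumes the where and field helpers of the connector's RDD API and the toDF conversion described in the connector's RDD documentation; check that documentation for the exact signatures in your connector version:

Scala:

import com.mapr.db.spark._
import com.mapr.db.spark.sql._
import com.mapr.db.spark.field

val sc = sparkSession.sparkContext
val rdd = sc.loadFromMapRDB("/tmp/user_profiles")
  .where(field("address.city") === "Milpitas")  // condition applied at the RDD level
val df = rdd.toDF[Person]()                     // convert to a DataFrame for further processing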

NOTE: HPE Ezmeral Data Fabric Database 6.0 introduces support for Secondary Indexes, but the HPE Ezmeral Data Fabric Database OJAI Connector for Spark does not currently leverage them.