Loading Data into a DataFrame Using Schema Inference

If you do not know the schema of the data, you can use schema inference to load data into a DataFrame. This section describes how to use schema inference and the restrictions that apply.

When you do not specify a schema or a type when loading data, schema inference triggers automatically. The HPE Ezmeral Data Fabric Database OJAI Connector for Apache Spark internally samples documents from the HPE Ezmeral Data Fabric Database JSON table and determines a schema based on that data sample. By default, the sample size is 1000 documents. Alternatively, you can specify a sample size parameter. The parameter is optional in the loadFromMapRDB call and is named sampleSize. The following example specifies using a sample size of 100 documents:

Scala:

import org.apache.spark.sql.SparkSession
import com.mapr.db.spark.sql._

// sampleSize is passed as a named argument
val df = sparkSession.loadFromMapRDB(tableName, sampleSize = 100)

Java:

import com.mapr.db.spark.sql.api.java.MapRDBJavaSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

MapRDBJavaSession maprSession = new MapRDBJavaSession(spark);
Dataset<Row> df = maprSession.loadFromMapRDB(tableName, 100);

Python:

from pyspark.sql import SparkSession

df = spark.loadFromMapRDB(table_name, 100)

IMPORTANT: Because schema inference relies on data sampling, it is non-deterministic and is not well suited for production use where you need predictable results. Inferring the schema requires reading sample rows from the table, so execution time varies with the number of rows in the source table.
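
To see why sampling makes the inferred schema unpredictable, consider the following minimal Python sketch. It does not use the connector: infer_fields is a hypothetical helper that models inference as the union of field names seen in a sample, and the documents are made-up data.

```python
import random

def infer_fields(sample):
    """Model schema inference as the set of field names seen in the sample."""
    fields = set()
    for doc in sample:
        fields.update(doc.keys())
    return fields

# Made-up documents: only the last one carries the "phone" field.
docs = [{"_id": str(i), "name": "user%d" % i} for i in range(9)]
docs.append({"_id": "9", "name": "user9", "phone": "555-0100"})

# A sample that happens to skip the last document misses "phone" ...
schema_a = infer_fields(docs[:5])
# ... while a sample that includes it picks the field up.
schema_b = infer_fields(docs[5:])

# A random sample can produce either outcome from one run to the next.
schema_random = infer_fields(random.sample(docs, 5))
```

Two loads of the same table can therefore yield different schemas whenever a field appears in only a small share of the documents.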

Sampling Using Reader Functions

An alternative to sampling data using the loadFromMapRDB call is to use reader functions.

To use the DataFrame reader function (Scala only), call the following method:

val df = sparkSession.read.maprdb(tableName)

To use the reader function with basic Spark, call the read function on a SQLContext object as follows:

Scala:

import org.apache.spark.sql.SQLContext

val df = sqlContext.read.format("com.mapr.db.spark.sql")
         .option("tableName", <table-name>)
         .option("sampleSize", 100).load()

Java:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

Dataset<Row> df = sqlContext.read()
                  .format("com.mapr.db.spark.sql")
                  .option("tableName", <table-name>).load();

Python:

from pyspark.sql import SQLContext

df = sql_context.read\
     .format("com.mapr.db.spark.sql.DefaultSource")\
     .option("tableName", <table-name>).load()

Type Conflict Resolution When Sampling

When sampling data during schema inference, you might encounter conflicting value types within a field. The connector uses the following rules to resolve type conflicts:

  • If the two conflicting types are each one of the following, the resolved type is the wider of the two types:
    • ByteType
    • ShortType
    • IntegerType
    • LongType
    • FloatType
    • DoubleType

    The type list above is arranged in increasing order of width. For example, if one document contains a field of type ByteType and the other contains a field of type FloatType, the resultant type is FloatType.

  • If one of the types is DecimalType, the resultant type is DecimalType only if DecimalType is the wider of the two types.
  • If the two types are StructType, each with different fields, then the resultant type is a new StructType that contains all the fields in each StructType.
  • If the two types are ArrayType, each with different element types, then the resultant type is a new ArrayType where the type of the elements in the array is resolved using the aforementioned rules.
  • If none of the above rules can be used for resolving type conflicts, then during data conversion, the load reports a ConflictType exception.
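
The rules above can be modeled with a short Python sketch. This is an illustration only, not the connector's implementation. Scalar types are represented as strings, a StructType as a dict of field name to type, and an ArrayType as a one-element list holding its element type. ConflictTypeError is a hypothetical stand-in for the ConflictType exception. The DecimalType rule is omitted, resolving fields shared by two structs recursively is an assumption, and for simplicity the sketch raises immediately rather than during data conversion.

```python
# Numeric types from the list above, in increasing order of width.
NUMERIC_ORDER = ["ByteType", "ShortType", "IntegerType",
                 "LongType", "FloatType", "DoubleType"]

class ConflictTypeError(Exception):
    """Hypothetical stand-in for the connector's ConflictType exception."""

def resolve(a, b):
    """Resolve two conflicting types according to the rules above."""
    if a == b:
        return a
    # Two numeric types: the wider one wins.
    if (isinstance(a, str) and isinstance(b, str)
            and a in NUMERIC_ORDER and b in NUMERIC_ORDER):
        return max(a, b, key=NUMERIC_ORDER.index)
    # Two structs: the result holds every field from both.
    if isinstance(a, dict) and isinstance(b, dict):
        merged = dict(a)
        for name, typ in b.items():
            # Assumption: a field present in both structs is resolved recursively.
            merged[name] = resolve(merged[name], typ) if name in merged else typ
        return merged
    # Two arrays: resolve the element types.
    if isinstance(a, list) and isinstance(b, list):
        return [resolve(a[0], b[0])]
    # No rule applies.
    raise ConflictTypeError("cannot resolve %r with %r" % (a, b))

widened = resolve("ByteType", "FloatType")
merged_struct = resolve({"a": "IntegerType"}, {"b": "StringType"})
merged_array = resolve(["ShortType"], ["LongType"])
```

Here widened is FloatType, matching the example in the text, while resolving StringType against a struct raises ConflictTypeError because no rule covers that pair.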

For example, suppose the Name field contains String values in some rows and, in other rows, a map with first_name and last_name as nested fields. During schema inference, the conflict resolution logic encounters two different types for the same field, StringType and MapType. It notes the conflict and returns a ConflictType exception later, when converting the data during the load.

By default, conflict exceptions occur during data conversion. To change this so that the exception is returned during the conflict resolution stage, set the FailOnConflict option to true:

val df = spark.read.maprdb(<tableName>, Map("sampleSize" -> 100, "FailOnConflict" -> true))
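
The difference between the two timings can be illustrated with a small Python model. It is a sketch under simplifying assumptions, not connector code: infer, convert, the CONFLICT marker, and ConflictTypeError are hypothetical names, and Python type names stand in for Spark types.

```python
class ConflictTypeError(Exception):
    """Hypothetical stand-in for the connector's ConflictType exception."""

CONFLICT = "ConflictType"  # marker recorded for a field that cannot be resolved

def infer(docs, fail_on_conflict=False):
    """Infer {field: type name}; Python type names are a simplification."""
    schema = {}
    for doc in docs:
        for field, value in doc.items():
            seen = type(value).__name__
            if field not in schema:
                schema[field] = seen
            elif schema[field] != seen:
                if fail_on_conflict:
                    # Fail fast, during conflict resolution.
                    raise ConflictTypeError("conflict in field %r" % field)
                schema[field] = CONFLICT  # note the conflict and defer the error
    return schema

def convert(docs, schema):
    """Convert documents; a conflict noted during inference surfaces only here."""
    for field, typ in schema.items():
        if typ == CONFLICT:
            raise ConflictTypeError("conflict in field %r" % field)
    return [dict(doc) for doc in docs]

# Made-up documents mirroring the Name example above.
docs = [{"Name": "Ann"},
        {"Name": {"first_name": "Bob", "last_name": "Lee"}}]

schema = infer(docs)  # succeeds; the conflict is only recorded
```

In this model, convert(docs, schema) raises the deferred error, whereas infer(docs, fail_on_conflict=True) raises before any data is converted.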

Invalid Schemas

When using schema inference, missing and extra fields are resolved in the following ways:

  • If a field in the inferred schema is missing in the HPE Ezmeral Data Fabric Database JSON document, the field is set to null.
  • If there are fields in an HPE Ezmeral Data Fabric Database JSON document that are not in the inferred schema, the load returns an InvalidSchema exception.
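
The two cases can be sketched in plain Python as follows. This is a simplified model rather than the connector's code: conform and InvalidSchemaError are hypothetical names, the schema is reduced to a list of field names, and None plays the role of null.

```python
class InvalidSchemaError(Exception):
    """Hypothetical stand-in for the connector's InvalidSchema exception."""

def conform(doc, schema_fields):
    """Fit one document to the inferred schema."""
    # Extra fields that the inferred schema does not know about are an error.
    extra = set(doc) - set(schema_fields)
    if extra:
        raise InvalidSchemaError(
            "fields not in inferred schema: %s" % sorted(extra))
    # Fields missing from the document are filled with None (null).
    return {field: doc.get(field) for field in schema_fields}

schema_fields = ["_id", "name", "phone"]

# "phone" is missing from the document, so it becomes None.
row = conform({"_id": "1", "name": "Ann"}, schema_fields)
```

Passing a document with a field such as email, which is absent from schema_fields, raises InvalidSchemaError in this model. This is the situation that arises when the sample used for inference did not contain a field that exists elsewhere in the table.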