HPE Ezmeral Data Fabric Database Binary Connector for Apache Spark Integration with Basic Spark

This page describes integration between Apache Spark and HBase APIs.

This section describes Spark integration with HBase APIs at the lowest and simplest levels. All other interaction points are built upon the concepts described here.

At the root of all integration with Spark and HBase APIs is the HBaseContext. The HBaseContext takes in HBase configurations and pushes them to the Spark executors. This allows you to have an HBase Connection per Spark executor in a static location.

HBaseContext Usage Example

This example shows how HBaseContext can be used to do a foreachPartition on an RDD in Scala:
// Create the Spark context and an HBase configuration, then wrap both in an
// HBaseContext, which ships the configuration to the executors.
val sc = new SparkContext("local", "test")
val config = HBaseConfiguration.create()
...
val hbaseContext = new HBaseContext(sc, config)

// rdd holds (rowKey, values) pairs, where values is a sequence of
// (columnFamily, qualifier, value) byte-array tuples.
rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
  // One BufferedMutator per partition batches the puts.
  val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("/apps/my_table"))
  it.foreach((putRecord) => {
    val put = new Put(putRecord._1)
    putRecord._2.foreach((putValue) =>
      put.addColumn(putValue._1, putValue._2, putValue._3))
    bufferedMutator.mutate(put)
  })
  // Flush buffered mutations and release the mutator before the partition ends.
  bufferedMutator.flush()
  bufferedMutator.close()
})
Here is the same example implemented in Java:
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
String tableName = "/apps/my_table";  // example table path

try {
  List<byte[]> list = new ArrayList<>();
  list.add(Bytes.toBytes("1"));
  ...
  list.add(Bytes.toBytes("5"));

  JavaRDD<byte[]> rdd = jsc.parallelize(list);
  Configuration conf = HBaseConfiguration.create();
  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);

  hbaseContext.foreachPartition(
    rdd,
    new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
      public void call(Tuple2<Iterator<byte[]>, Connection> t) throws Exception {
        // t._2() is the per-partition Connection; open a Table for reads
        // and a BufferedMutator for batched writes.
        Table table = t._2().getTable(TableName.valueOf(tableName));
        BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
        while (t._1().hasNext()) {
          byte[] b = t._1().next();
          // For each input key, check whether the row exists and mutate it if so.
          Result r = table.get(new Get(b));
          if (r.getExists()) {
            mutator.mutate(new Put(b));
          }
        }

        mutator.flush();
        mutator.close();
        table.close();
      }
    });
} finally {
  jsc.stop();
}

All functionality between Spark and the HBase client is supported in both Scala and Java, with the exception of SparkSQL, which supports any language that Spark supports. This section focuses on Scala examples.

The examples above show how to do a foreachPartition with a Connection object. A number of other Spark base functions are supported out of the box; a short usage sketch follows the list:

bulkPut: Enables massively parallel sending of puts to HBase.
bulkDelete: Enables massively parallel sending of deletes to HBase.
bulkGet: Enables massively parallel sending of gets to HBase to create a new RDD.
mapPartition: Enables a Spark map function with a Connection object, allowing full access to HBase.
hBaseRDD: Simplifies a distributed scan to create an RDD.
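
As a rough illustration of how two of these functions are invoked, here is a minimal sketch that reuses the sc and hbaseContext built in the Scala example above. The table path and row data are illustrative, and the signatures follow the open-source HBase-Spark module:
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Put, Scan}
import org.apache.hadoop.hbase.util.Bytes

// bulkPut: each RDD record is converted to a Put and written in parallel.
// Records here are (rowKey, columnFamily, qualifier, value) byte arrays.
val putRdd = sc.parallelize(Seq(
  (Bytes.toBytes("1"), Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value1"))))
hbaseContext.bulkPut[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])](
  putRdd,
  TableName.valueOf("/apps/my_table"),
  (r) => new Put(r._1).addColumn(r._2, r._3, r._4))

// hBaseRDD: a distributed scan that yields an RDD of (rowKey, Result) pairs.
val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf("/apps/my_table"), new Scan())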

You can see examples of these functions in the source code of the HBase-Spark module.