Using the Custom Partitioner with the MapR Database OJAI Connector for Apache Spark

In any distributed computing system, partitioning data well is crucial for performance. Apache Spark lets you register a custom partitioner to control how the data in a pipeline is partitioned. The MapR Database OJAI Connector for Apache Spark includes a custom partitioner that you can use to optimally partition the data in an RDD.

The connector's custom partitioner accepts keys of the following types:

  • String
  • ByteBuffer (as serializable ByteBuffer)

You can register this custom partitioner with either the partitionBy function or the repartitionAndSortWithinPartitions function.

The connector supports two versions of the custom partitioner. One version takes a MapR Database JSON table as an input. The partition information of the table is used to partition the data, so the saveToMapRDB call can use a bulkInsert to store the data. The bulkInsert option requires that you have the data already sorted on the _id key.

The other version of the custom partitioner takes an array of splits as an input.

Specifying tablename for the Partitioner

If you already have a table that has been created and partitioned based on a set of keys, you can specify that the RDD be partitioned in the same way (using the same set of keys). In the following example, /dsttable is provided as the reference for the partitioner, and the data read from /srctable is partitioned to match it:

sc.loadFromMapRDB("/srctable")
            .keyBy(doc => doc._id[String])
            .repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String]("/dsttable"))
            .saveToMapRDB("/dsttable", createTable = false, bulkInsert = true)

Specifying a String Seq as an Array of Splits

In the following example, the first line creates an array of three splits: id1, id4, and id7. The rest of the example partitions the RDD based on that array of splits:

val dstSplits: Array[String] = (1 to 9 by 3).map("id" + _).toArray
val partitionRDD = sc.loadFromMapRDB("/srctable")
            .keyBy(doc => doc._id[String])
            .repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String](dstSplits))
            .saveToMapRDB("/dsttable", createTable = true, bulkInsert = true)
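To make the role of the split array concrete, here is a minimal sketch in plain Scala of how a sorted array of split points can map keys to partition indexes. It is an illustration only, not the connector's implementation. The names SplitPartitionSketch, partitionFor, and numPartitions are hypothetical, and the rule that a key equal to a split point falls into the partition starting at that split is an assumption.

```scala
object SplitPartitionSketch {
  // Assumption: sortedSplits is in ascending lexicographic order, and a key
  // equal to a split point belongs to the partition that starts at that split.
  def partitionFor(key: String, sortedSplits: Array[String]): Int =
    sortedSplits.count(split => split.compareTo(key) <= 0)

  // N split points divide the key space into N + 1 ranges.
  def numPartitions(sortedSplits: Array[String]): Int =
    sortedSplits.length + 1

  def main(args: Array[String]): Unit = {
    // Same splits as in the example above: id1, id4, id7
    val splits = (1 to 9 by 3).map("id" + _).toArray
    println(s"partitions: ${numPartitions(splits)}")
    for (key <- Seq("id0", "id1", "id3", "id4", "id10", "id9"))
      println(s"$key -> partition ${partitionFor(key, splits)}")
  }
}
```

Note that String keys compare lexicographically, so in this sketch "id10" sorts between "id1" and "id4" rather than after "id9".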

Specifying a ByteBuffer Seq as an Array of Splits

Suppose you have an array of byte buffers to use as the array of splits for the partitioner. You must convert the byte buffers to serializable byte buffers first:

// Converting bytebuffer to serializable bytebuffer
val dstSplits = arrayOfByteBuffer.map(x => MapRDBSpark.serializableBinaryValue(x))
sc.loadFromMapRDB("/srctable")
  //KeyBy serializable bytebuffer
  .keyBy(doc => doc.getBinarySerializable(binaryField))
  .repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner(dstSplits))
  .saveToMapRDB("/dsttable", createTable = true, bulkInsert = true)
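A likely reason for the conversion step is that java.nio.ByteBuffer does not implement java.io.Serializable, while Spark must serialize keys and partitioners to ship them between processes. The following plain-Scala sketch illustrates the idea with a hypothetical wrapper, BytesKey, which copies the buffer contents into a byte array. It is not the connector's serializable ByteBuffer class, and roundTrip is only a helper for the demonstration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Hypothetical stand-in for a serializable byte buffer; NOT the connector's class.
final class BytesKey(val bytes: Array[Byte]) extends java.io.Serializable {
  def toByteBuffer: ByteBuffer = ByteBuffer.wrap(bytes)
}

object BytesKey {
  // Copies the remaining bytes without moving the source buffer's position.
  def from(buf: ByteBuffer): BytesKey = {
    val copy = new Array[Byte](buf.remaining())
    buf.duplicate().get(copy)
    new BytesKey(copy)
  }
}

object SerializableKeySketch {
  // Serializes a value with Java serialization and reads it back.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(value)
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }

  def main(args: Array[String]): Unit = {
    val original = ByteBuffer.wrap(Array[Byte](1, 2, 3))
    // ByteBuffer itself is not Serializable
    println(classOf[java.io.Serializable].isAssignableFrom(classOf[ByteBuffer]))
    // The wrapper survives a serialization round trip with the same contents
    val restored = roundTrip(BytesKey.from(original))
    println(restored.toByteBuffer == original)
  }
}
```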

Specifying tablename for the Partitioner with ByteBuffer as Id Fields

Suppose you have a table whose keys are binary (ByteBuffer) values, and you have an RDD of rows. You can repartition the RDD based on the partitions of the table. The following example reads the documents from /srctable, but you could use any table. The second line calls keyBy on the document ID, returned as a serializable ByteBuffer. In the last line, /dsttable is the table whose partition information is used to partition the RDD, which is keyed by serializable ByteBuffers:

sc.loadFromMapRDB("/srctable")
  .keyBy(doc => doc.getIdBinarySerializable())
  .repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[ByteBuffer]("/dsttable"))

NOTE: You must provide the key type of the pair RDD on which the partitioning is specified. If the IDs are serializable ByteBuffers, specify ByteBuffer. Otherwise, specify String.

After the data is partitioned with the custom partitioner, all downstream transformations should preserve the partitioning. The following code applies the partitioner to an RDD:

user_profiles.repartitionAndSortWithinPartitions(MapRDBSpark.newPartitioner[String](<table-name>))

Or you can use the partitionBy function on the RDD:

user_profiles.partitionBy(MapRDBSpark.newPartitioner[String](<table-name>))
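The advice that downstream transformations should not change the partitioning can be checked with plain Spark. The sketch below uses Spark's built-in HashPartitioner as a stand-in for the connector's partitioner (an assumption made so the example runs without a MapR cluster) and a local master. It shows that mapValues keeps the partitioner on the resulting RDD, while map discards it because map may change the keys. The names PartitionerPreservation and partitionerFlags are hypothetical.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionerPreservation {
  // Returns (partitioner kept by mapValues, partitioner kept by map).
  def partitionerFlags(sc: SparkContext): (Boolean, Boolean) = {
    val partitioned = sc
      .parallelize(Seq("id1" -> 1, "id4" -> 2, "id7" -> 3))
      .partitionBy(new HashPartitioner(4)) // stand-in for the custom partitioner

    // mapValues cannot change keys, so Spark keeps the partitioner.
    val kept = partitioned.mapValues(_ * 10)
    // map may change keys, so Spark drops the partitioner.
    val dropped = partitioned.map { case (k, v) => (k, v * 10) }

    (kept.partitioner.isDefined, dropped.partitioner.isDefined)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("partitioner-preservation").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (afterMapValues, afterMap) = partitionerFlags(sc)
      println(s"partitioner after mapValues: $afterMapValues")
      println(s"partitioner after map: $afterMap")
    } finally {
      sc.stop()
    }
  }
}
```

Inspecting rdd.partitioner in this way is a quick check that a pipeline still carries the intended partitioner before a bulk save.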

The key type of the data passed to the table-based partitioner must match the key type of the table whose name you supply. The number of partitions is calculated from the table's existing tablet information, so the partitioner yields a single partition if the table is not pre-split.

For a table created with the bulkInsert option set to true, one of the following applies:

  • If the table is pre-split, the result can have more than one partition.
  • If the table is not pre-split, the result has a single partition, provided that no partition information is available from the RDD lineage.