HPE Ezmeral Data Fabric Database JSON MapReduce: Sample App

This sample application reads records (JSON documents) from a JSON table and inserts new documents into another JSON table.

After reading records from the source JSON table, the application aggregates the data in those records by author, creates new JSON documents that contain the aggregated data, and then inserts the new documents into another JSON table. Each source record contains the name of an author and the title of one book that the author has written.

The JSON documents have this structure:
{
    "_id" : <string or binary>,
    "authorid":"<string>",
    "name":"<string>",
    "book":{
        "id":<int>,
        "title":"<string>"
    }
}
The structure of each aggregated document will look like this:
{
    "_id" : "<string>",
    "books":[
        {
            "id":<int>,
            "title":"<string>"
        },
        ...
    ]
}
The _id field of each aggregated document contains the value of the authorid field from the source records.
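For example, two hypothetical source records for the same author (the IDs, author names, and titles below are illustrative values only, not part of the sample dataset):
{ "_id" : "1", "authorid" : "a100", "name" : "Jane Doe", "book" : { "id" : 1, "title" : "First Book" } }
{ "_id" : "2", "authorid" : "a100", "name" : "Jane Doe", "book" : { "id" : 2, "title" : "Second Book" } }
would be combined into a single aggregated document:
{ "_id" : "a100", "books" : [ { "id" : 1, "title" : "First Book" }, { "id" : 2, "title" : "Second Book" } ] }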

Prerequisites

  • Ensure that your user ID has the -readAce and -writeAce privileges on the volumes where you plan to create the source and destination tables.
  • Create the source JSON table. You can create the source table and populate it with sample records by running sample_dataset.txt from the mapr dbshell utility.
    $ mapr dbshell < sample_dataset.txt
  • Create the destination JSON table. A simple way to create this table is to use the create command in the HPE Ezmeral Data Fabric Database Shell (JSON Tables) utility.
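For example, assuming a hypothetical destination table path of /apps/books_by_author (substitute your own path), you could pipe the create command to dbshell:
$ echo "create /apps/books_by_author" | mapr dbshell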

Compiling and Running

To compile an application, use the following command:
javac -cp <classpath> <java source file(s)>
To launch an application, use the following command:
java -cp <classpath>:. -Djava.library.path=/opt/mapr/lib <main class> <command line arguments>

To run the application, supply the paths and names of the source and destination tables as arguments:

CombineBookList <source_table> <destination_table>
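For example, on a cluster node, assuming that hadoop classpath returns the Hadoop, OJAI, and Data Fabric Database JARs that the application needs (otherwise substitute the appropriate classpath), and that /apps/books and /apps/books_by_author are illustrative source and destination table paths:
$ javac -cp $(hadoop classpath) CombineBookList.java
$ java -cp $(hadoop classpath):. -Djava.library.path=/opt/mapr/lib CombineBookList /apps/books /apps/books_by_author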

Code Walkthrough

  private static Job createSubmittableJob(Configuration conf, String[] otherArgs)
      throws IOException {

    srcTable = otherArgs[0];
    destTable = otherArgs[1];

    Job job = new Job(conf, NAME + "_" + destTable);
    job.setJarByClass(CombineBookList.class);
    MapRDBMapReduceUtil.configureTableInputFormat(job, srcTable);
    job.setMapperClass(CombineBookListMapper.class);
    MapRDBMapReduceUtil.setMapOutputKeyValueClass(job);
    MapRDBMapReduceUtil.configureTableOutputFormat(job, destTable);
    job.setReducerClass(CombineBookListReducer.class);
    MapRDBMapReduceUtil.setOutputKeyValueClass(job);
    job.setNumReduceTasks(1);
    return job;
  }

The createSubmittableJob() method uses methods that are in the MapRDBMapReduceUtil class to perform the following tasks:

Set the input format to the default table input format
You can call the configureTableInputFormat() method, passing in the job and also passing in the path and name of the source table:
MapRDBMapReduceUtil.configureTableInputFormat(job, srcTable);
The default behavior is to do the following:
  • Set the serialization class for Document and Value objects. These interfaces are part of the OJAI (Open JSON Application Interface) API.
  • Set the field INPUT_TABLE in TableInputFormat to the path and name of the source table, and pass this value to the configuration for the MapReduce application.
  • Set the input format class for the job to TableInputFormat.
If you want to customize TableInputFormat, you can set it explicitly, as you would normally set the input format for a job:
job.setInputFormatClass(TableInputFormat.class);
Set the type for keys and values that are output from the mapper
You can call the setMapOutputKeyValueClass() method to use the default type for keys and values:
MapRDBMapReduceUtil.setMapOutputKeyValueClass(job);
If you want to customize the output keys and values, you can call Job.setMapOutputKeyClass() and Job.setMapOutputValueClass() as you would normally for MapReduce applications.
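For this sample, whose mapper emits Value keys and Document values, the explicit calls would presumably be equivalent to the following (a sketch, not taken from the sample source):
job.setMapOutputKeyClass(Value.class);
job.setMapOutputValueClass(Document.class);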
Set the output format to the default table output format
You can call the configureTableOutputFormat() method, passing in the job and also passing in the path and name of the destination table, which must already exist at runtime:
MapRDBMapReduceUtil.configureTableOutputFormat(job, destTable);
The default behavior is to do the following:
  • Set the field OUTPUT_TABLE in TableOutputFormat to the path and name of the destination table, and pass this value to the configuration for the MapReduce application.
  • Set the output format class for the job to TableOutputFormat.
If you want to customize TableOutputFormat, you can set it explicitly, as you would normally set the output format for a job:
job.setOutputFormatClass(TableOutputFormat.class);
You also have the option of using the BulkLoadOutputFormat class for bulk loading.
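A minimal sketch of that option, assuming that BulkLoadOutputFormat accepts the same destination-table configuration as TableOutputFormat and that the destination table was created with bulk loading enabled:
MapRDBMapReduceUtil.configureTableOutputFormat(job, destTable);
job.setOutputFormatClass(BulkLoadOutputFormat.class);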
Set the type of the keys and values that are output from the reducer
You can call the setOutputKeyValueClass() method to use the default type for keys and values:
MapRDBMapReduceUtil.setOutputKeyValueClass(job);
If you want to customize the output keys and values, you can call Job.setOutputKeyClass() and Job.setOutputValueClass() as you would normally for MapReduce applications.
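The sample's main() method is not shown in this walkthrough. A minimal driver sketch that submits the job built by createSubmittableJob() could look like the following; the usage message and argument handling are illustrative, not taken from the sample source:

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: CombineBookList <source_table> <destination_table>");
      System.exit(2);
    }
    Configuration conf = new Configuration();
    // Build the job as shown above and wait for it to finish.
    Job job = createSubmittableJob(conf, args);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }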

The map() method in the mapper class CombineBookListMapper receives the value of the _id field of each document as its key (a Value object) and the full JSON document as its value (a Document object). The mapper does nothing with the key. For each record, the mapper writes the value of the authorid field and the full JSON document to the context.

  public static class CombineBookListMapper extends Mapper<Value, Document, Value, Document> {
    @Override
    public void map(Value key, Document record, Context context) throws IOException, InterruptedException {
      context.write(record.getValue("authorid"), record);
    }
  }

Both the Value and Document interfaces are part of the OJAI (Open JSON Application Interface) API. For details, see the javadoc for the OJAI API.

The reduce() method in the reducer class CombineBookListReducer takes the map output key, which is the value of the authorid field, and the map output values, which are an iterable collection of Document objects that each contain a full record. For each author ID, the reducer creates a new document. For each document in the collection, the reducer extracts the value of the book field and adds that value to a list, which the reducer then stores in the books field of the new JSON document.

  public static class CombineBookListReducer extends Reducer<Value, Document, Value, Document> {

    @Override
    public void reduce(Value key, Iterable<Document> values,
        Context context) throws IOException, InterruptedException {
      // Create a new document to hold the aggregated result for this author.
      Document d = MapRDB.newDocument();
      List<Document> books = new ArrayList<Document>();

      // Collect the book subdocument from each of the author's records.
      for (Document b : values) {
        books.add((Document)b.getValue("book"));
      }

      // Use the authorid key as the _id of the new document, store the
      // collected books in the books field, and emit the document.
      d.setId(key);
      d.set("books", books);
      context.write(key, d);
    }
  }

The MapRDB class is part of the HPE Ezmeral Data Fabric Database JSON API, not the HPE Ezmeral Data Fabric Database JSON MapReduce API.
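For reference, compiling the classes shown in this walkthrough would typically rely on imports along the following lines. The Hadoop and OJAI package names are standard; the com.mapr.db and com.mapr.db.mapreduce package names are assumptions based on the class names used above.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.ojai.Document;
import org.ojai.Value;

// Assumed package locations for the Data Fabric Database classes:
import com.mapr.db.MapRDB;
import com.mapr.db.mapreduce.MapRDBMapReduceUtil;
import com.mapr.db.mapreduce.TableInputFormat;
import com.mapr.db.mapreduce.TableOutputFormat;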