Consumer Application for CDC JSON Data

This example consumes changed data records from MapR Database JSON tables.

Example of Consuming JSON Changed Data Records

In this example, the following occurs:
  • Initialize the consumer properties using Apache Kafka and MapR configuration parameters.
  • Display the change data record properties.
  • Iterate through the change nodes, determine the type of operation, and retrieve the operation value.
  • Retrieve the properties of individual change node (for example: data type, field name, field value, and so on) by using various methods of the ChangeDataReader interface.
  • Display the change data record values by using the ChangeNode interface.
  • Subscribe to the stream topic, consume the events, and determine record type.
For changed data records from MapR Database JSON table data, the following are unique:
  • There are multiple property values that can be retrieved through the ChangeDataReader interface. For example, getDouble or getFloat.
  • There are multiple values for single fields in documents that can be retrieved through ChangeNode interface. See the code line: Value value = changeNode.getValue();
package example.cdps;

import com.mapr.db.MapRDB;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ojai.*;
import org.ojai.store.cdc.*;
import java.util.*;

public class CDPConsumer {

    /**
     * Initialize Basic Consumer Properties
     * @return
     */
    public Properties getBasicListnerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "mfs220.qa.lab:9211");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use MapR CDP Specific Deserializer to parse the change contents
        props.put("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
        props.put("fetch.min.bytes", "10");
        props.put("fetch.wait.max.ms", "5000");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        return props;
    }

    /**
     * Display Utility
     * @param consumerRecordkey
     * @param id
     * @param changeDataRecordType
     * @param recordOpTime
     * @param recordServerOpTime
     * @param field
     * @param op
     * @param changeNodeOpTime
     * @param changeNodeServerOpTime
     * @param valueType
     * @param value
     */
    public void display(String consumerRecordkey,
                        Value id,
                        ChangeDataRecordType changeDataRecordType,
                        Long recordOpTime,
                        Long recordServerOpTime,
                        String field,
                        ChangeOp op,
                        Long changeNodeOpTime,
                        Long changeNodeServerOpTime,
                        Value.Type valueType,
                        Value value) {

        Document document = MapRDB.newDocument();
        document.set("consumerRecordkey",  consumerRecordkey);

        if(id != null)
            document.set("id", id);

        if(changeDataRecordType != null)
            document.set("changeDataRecordType", changeDataRecordType.name());

        document.set("recordOpTime", recordOpTime);
        document.set("recordServerOpTime", recordServerOpTime);

        if(field != null)
            document.set("field", field);

        document.set("op", op.name());

        document.set("changeNodeOpTime", changeNodeOpTime);
        document.set("changeNodeServerOpTime", changeNodeServerOpTime);

        if(valueType != null)
            document.set("valueType", valueType.name());

        if(value != null)
            document.set("value", value);

        System.out.println("\t\n********* Propagated Change **************************\t\n");
        System.out.println("\t\n" + document.asJsonString() + "\t\n");
        System.out.println("\t\n******************************************************\t\n");
    }

    /**
     * Parse change node contents via iterator
     * @param consumerRecordkey
     * @param changeDataRecord
     */
    public void iteratorDisplay(Value id,
                                ChangeDataRecordType changeDataRecordType,
                                Long recordOpTime,
                                Long recordServerOpTime,
                                String consumerRecordkey,
                                ChangeDataRecord changeDataRecord) {

        for (KeyValue<FieldPath, ChangeNode> fieldChangePair : changeDataRecord) {

            // field if operation was done one a field
            String field = fieldChangePair.getKey().asJsonString();

            // Actual change node object, which holds change values
            ChangeNode changeNode = fieldChangePair.getValue();

            // Change Op, based on op done can be NULL, SET, MERGE, DELETE, DELETE_EXACT
            ChangeOp op = changeNode.getOp();

            // change node op time
            Long changeNodeOpTime = changeNode.getOpTimestamp();
            Long changeNodeServerOpTime  = changeNode.getServerTimestamp();

            // the value type if it was non delete operation, such as insert replace etc
            Value.Type valueType = changeNode.getType();

            // value of the operation such as insert value or replace
            Value value = changeNode.getValue();

            // display the change contents
            display(consumerRecordkey, id, changeDataRecordType, recordOpTime, recordServerOpTime,
                    field, op, changeNodeOpTime, changeNodeServerOpTime, valueType, value);
        }
    }

    /**
     * Get Parsed Value
     * @param changeDataReader
     * @param field
     * @param valueType
     * @return
     */
    public Value getValue(ChangeDataReader changeDataReader, String field, Value.Type valueType) {
        Document valDoc = MapRDB.newDocument();

        if(field == null) {
            valDoc.setNull("null");
            field = "null";
        }

        switch (valueType) {
            case NULL:
                valDoc.setNull(field);
                break;
            case BOOLEAN:
                valDoc.set(field, changeDataReader.getBoolean());
                break;
            case STRING:
                valDoc.set(field, changeDataReader.getString());
                break;
            case SHORT:
                valDoc.set(field, changeDataReader.getShort());
                break;
            case BYTE:
                valDoc.set(field, changeDataReader.getByte());
                break;
            case INT:
                valDoc.set(field, changeDataReader.getInt());
                break;
            case LONG:
                valDoc.set(field, changeDataReader.getLong());
                break;
            case FLOAT:
                valDoc.set(field, changeDataReader.getFloat());
                break;
            case DOUBLE:
                valDoc.set(field, changeDataReader.getDouble());
                break;
            case DECIMAL:
                valDoc.set(field, changeDataReader.getDecimal());
                break;
            case DATE:
                valDoc.set(field, changeDataReader.getDate());
                break;
            case TIME:
                valDoc.set(field, changeDataReader.getTime());
                break;
            case TIMESTAMP:
                valDoc.set(field, changeDataReader.getTimestamp());
                break;
            case INTERVAL:
                valDoc.set(field, changeDataReader.getInterval());
                break;
            case BINARY:
                valDoc.set(field, changeDataReader.getBinary());
                break;
            default:
                break;
        }
        return valDoc.getValue(field);
    }
    /**
     * Parse change node contents via reader
     * @param consumerRecordkey
     * @param changeDataRecord
     */
    public void readerDisplay(Value id,
                              ChangeDataRecordType changeDataRecordType,
                              Long recordOpTime,
                              Long recordServerOpTime,
                              String consumerRecordkey,
                              ChangeDataRecord changeDataRecord) {
        System.out.println("Reader");
        ChangeEvent changeEvent;
        // get reader from the event
        ChangeDataReader changeDataReader = changeDataRecord.getReader();

        while ((changeEvent = changeDataReader.next()) != null) {
            // parse through change events
            switch (changeEvent) {
                case NODE:
                    System.out.println("node event get the value type");
                    Value.Type valueType = changeDataReader.getType();
                    String field = changeDataReader.getFieldName();
                    Long serverTimestamp = changeDataReader.getServerTimestamp();
                    Long opTimestamp = changeDataReader.getOpTimestamp();
                    ChangeOp op = changeDataReader.getOp();
                    Value value = getValue(changeDataReader, field, valueType);

                    display(consumerRecordkey, id, changeDataRecordType,
                        recordOpTime, recordServerOpTime, field, op, opTimestamp,
                            serverTimestamp, valueType, value);
                    break;
            }
            break;
        }
    }

    /**
     * Consume from changelog topics
     * @param pollTimeout
     * @param topics
     */
    public void consume(long pollTimeout, String topics, boolean method) {
        System.out.println("consume...");
        // initialize consumer
        KafkaConsumer<String, ChangeDataRecord> consumer = new KafkaConsumer<String, ChangeDataRecord>
                (getBasicListnerProperties());

        // subscribe to /stream:topic
        List<String> topicList = new ArrayList<String>();
        topicList.add(topics);
        consumer.subscribe(topicList);
        consumer.seekToBeginning();

        // Get consumer records
        ConsumerRecords<String, ChangeDataRecord> consumerRecords = consumer.poll(pollTimeout);

        // iterate over consumer records
        for(ConsumerRecord<String, ChangeDataRecord> consumerRecord: consumerRecords) {

            String consumerRecordkey = consumerRecord.key().trim();
            ChangeDataRecord changeDataRecord = consumerRecord.value();

            // record key for the change
            Value id = changeDataRecord.getId();

            // record level op can be either RECORD_INSERT, RECORD_UPDATE, RECORD_DELETE
            ChangeDataRecordType changeDataRecordType = changeDataRecord.getType();

            // record level op-time & server op-time
            Long recordOpTime = changeDataRecord.getOpTimestamp();
            Long recordServerOpTime = changeDataRecord.getServerTimestamp();

            if(method) {
                // Method 1 - via iterator interface
                iteratorDisplay(id, changeDataRecordType,
                        recordOpTime, recordServerOpTime,
                        consumerRecordkey, changeDataRecord);
            } else {
                // Method 2 - via reader interface
                readerDisplay(id, changeDataRecordType,
                        recordOpTime, recordServerOpTime,
                        consumerRecordkey, changeDataRecord);
            }
        }
        consumer.close();
    }

    /**
     * Driver
     * @param args
     */
    public static void main(String[] args) {
        Long pollTimeout = Long.parseLong(args[0]);
        String topic = args[1];
        boolean method = Boolean.parseBoolean(args[2]);
        CDPConsumer cdpConsumer = new CDPConsumer();
        cdpConsumer.consume(pollTimeout, topic, method);
    }
}