Consumer Application for CDC Binary Data

This example consumes changed data records from MapR Database Binary tables.

Example of Consuming Binary Changed Data Records

In this example, the following occurs:
  • Initialize the consumer properties using Apache Kafka and MapR Data Platform configuration parameters.
  • Display the change data record properties.
  • Iterate through the change nodes, determine the type of operation, and retrieve the operation value.
  • Display the change data record values by using the ChangeNode interface.
  • Subscribe to the stream topic, consume the events, and determine record type.
The following points are specific to changed data records from MapR Database Binary tables:
  • An additional package must be imported: java.nio.ByteBuffer
  • Each field in a document has a single value, which can be retrieved through the ChangeNode interface. See the code line: ByteBuffer value = changeNode.getBinary();
package com.mapr.qa.cdc.tests.binary;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ojai.*;
import org.ojai.store.Connection;
import org.ojai.store.Driver;
import org.ojai.store.DriverManager;
import org.ojai.store.cdc.*;

import java.nio.ByteBuffer;
import java.util.*;

/**
 * Example consumer that reads change data records from a changelog topic and prints every
 * propagated change as a JSON document on standard output.
 *
 * <p>Each record can be walked either through the iterator interface
 * ({@link #iteratorDisplay}) or through the reader interface ({@link #readerDisplay}).
 */
public class CDCBinaryExample {

    /**
     * Builds the basic consumer properties used to read change data records.
     *
     * @return a new {@link Properties} instance holding the broker address, the key and value
     *         deserializers, the fetch tuning values and the offset reset policy
     */
    public Properties getBasicListnerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use MapR CDC Specific Deserializer to parse the change contents
        props.put("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
        props.put("fetch.min.bytes", "10");
        // NOTE(review): the Apache Kafka consumer setting is spelled "fetch.max.wait.ms";
        // confirm that the key below is actually honored by the client in use.
        props.put("fetch.wait.max.ms", "5000");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    /**
     * Prints one propagated change as a JSON document framed by banner lines.
     *
     * <p>Optional attributes (id, record type, field, op, value type, value) are left out of the
     * document when they are {@code null}.
     *
     * @param consumerRecordkey      key of the consumer record that carried the change
     * @param id                     id reported by the change data record, may be {@code null}
     * @param changeDataRecordType   record level operation type, may be {@code null}
     * @param recordOpTime           record level operation timestamp
     * @param recordServerOpTime     record level server timestamp
     * @param field                  field the change applies to, may be {@code null}
     * @param op                     operation reported for the change node, may be {@code null}
     * @param changeNodeOpTime       change node operation timestamp
     * @param changeNodeServerOpTime change node server timestamp
     * @param valueType              type of the changed value, may be {@code null}
     * @param value                  binary value of the change, may be {@code null}
     */
    public void display(String consumerRecordkey,
                        Value id,
                        ChangeDataRecordType changeDataRecordType,
                        Long recordOpTime,
                        Long recordServerOpTime,
                        String field,
                        ChangeOp op,
                        Long changeNodeOpTime,
                        Long changeNodeServerOpTime,
                        Value.Type valueType,
                        ByteBuffer value) {

        Connection mConnection = DriverManager.getConnection("ojai:mapr:");
        try {
            Driver mDriver = mConnection.getDriver();
            Document document = mDriver.newDocument();
            document.set("consumerRecordkey", consumerRecordkey);

            if (id != null) {
                document.set("id", id);
            }

            if (changeDataRecordType != null) {
                document.set("changeDataRecordType", changeDataRecordType.name());
            }

            document.set("recordOpTime", recordOpTime);
            document.set("recordServerOpTime", recordServerOpTime);

            if (field != null) {
                document.set("field", field);
            }

            // Guarded like the other optional attributes so a missing op cannot abort the output.
            if (op != null) {
                document.set("op", op.name());
            }

            document.set("changeNodeOpTime", changeNodeOpTime);
            document.set("changeNodeServerOpTime", changeNodeServerOpTime);

            if (valueType != null) {
                document.set("valueType", valueType.name());
            }

            if (value != null) {
                document.set("value", toDisplayString(value));
            }

            System.out.println("\t\n********* Propagated Change **************************\t\n");
            System.out.println("\t\n" + document.asJsonString() + "\t\n");
            System.out.println("\t\n******************************************************\t\n");
        } finally {
            // The connection is opened per call, so it has to be released per call as well.
            mConnection.close();
        }
    }

    /**
     * Decodes the readable bytes of a buffer as UTF-8 text for display purposes.
     *
     * <p>Only the bytes between the buffer's position and limit are read, and the caller's
     * buffer is left untouched. Unlike {@code ByteBuffer.array()}, this also works for
     * read-only and direct buffers.
     */
    private static String toDisplayString(ByteBuffer value) {
        // Work on a duplicate so the caller's position/limit are not moved by the read.
        ByteBuffer view = value.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Walks the change nodes of a record through the iterator interface and displays each one.
     *
     * @param id                   id reported by the change data record
     * @param changeDataRecordType record level operation type
     * @param recordOpTime         record level operation timestamp
     * @param recordServerOpTime   record level server timestamp
     * @param consumerRecordkey    key of the consumer record that carried the change
     * @param changeDataRecord     record whose change nodes are iterated
     */
    public void iteratorDisplay(Value id,
                                ChangeDataRecordType changeDataRecordType,
                                Long recordOpTime,
                                Long recordServerOpTime,
                                String consumerRecordkey,
                                ChangeDataRecord changeDataRecord) {

        for (KeyValue<FieldPath, ChangeNode> fieldChangePair : changeDataRecord) {

            // field if operation was done on a field
            String field = fieldChangePair.getKey().asJsonString();

            // Actual change node object, which holds change values
            ChangeNode changeNode = fieldChangePair.getValue();

            // Change Op, based on op done can be NULL, PUT, DELETE, DELETE_EXACT
            ChangeOp op = changeNode.getOp();

            // change node op time
            Long changeNodeOpTime = changeNode.getOpTimestamp();
            Long changeNodeServerOpTime = changeNode.getServerTimestamp();

            // the value type BINARY, if it is non delete operation
            Value.Type valueType = changeNode.getType();

            // value of the operation
            ByteBuffer value = changeNode.getBinary();

            // display the change contents
            display(consumerRecordkey, id, changeDataRecordType, recordOpTime, recordServerOpTime,
                    field, op, changeNodeOpTime, changeNodeServerOpTime, valueType, value);
        }
    }

    /**
     * Walks the change events of a record through the reader interface and displays every
     * {@code NODE} event.
     *
     * <p>Events of any other kind are skipped. The loop runs until the reader returns
     * {@code null}, so all events of the record are visited.
     *
     * @param id                   id reported by the change data record
     * @param changeDataRecordType record level operation type
     * @param recordOpTime         record level operation timestamp
     * @param recordServerOpTime   record level server timestamp
     * @param consumerRecordkey    key of the consumer record that carried the change
     * @param changeDataRecord     record whose reader is consumed
     */
    public void readerDisplay(Value id,
                              ChangeDataRecordType changeDataRecordType,
                              Long recordOpTime,
                              Long recordServerOpTime,
                              String consumerRecordkey,
                              ChangeDataRecord changeDataRecord) {

        ChangeEvent changeEvent;
        // get reader from the event
        ChangeDataReader changeDataReader = changeDataRecord.getReader();

        while ((changeEvent = changeDataReader.next()) != null) {
            // parse through change events
            switch (changeEvent) {
                case NODE: {
                    System.out.println("node event get the value type");
                    Value.Type valueType = changeDataReader.getType();
                    String field = changeDataReader.getFieldName();
                    Long serverTimestamp = changeDataReader.getServerTimestamp();
                    Long opTimestamp = changeDataReader.getOpTimestamp();
                    ChangeOp op = changeDataReader.getOp();
                    ByteBuffer value = changeDataReader.getBinary();

                    display(consumerRecordkey, id, changeDataRecordType,
                            recordOpTime, recordServerOpTime, field, op, opTimestamp,
                            serverTimestamp, valueType, value);
                    break;
                }
                default:
                    // Only NODE events are displayed; keep reading the remaining events.
                    break;
            }
        }
    }

    /**
     * Subscribes to a changelog topic, polls once and displays every change found in the
     * returned records.
     *
     * @param pollTimeout timeout passed to the consumer's {@code poll} call
     * @param topics      name of the topic to subscribe to (a single topic)
     * @param method      {@code true} to walk each record with the iterator interface,
     *                    {@code false} to walk it with the reader interface
     */
    public void consume(long pollTimeout, String topics, boolean method) {
        System.out.println("consume...");
        // initialize consumer
        KafkaConsumer<String, ChangeDataRecord> consumer = new KafkaConsumer<String, ChangeDataRecord>
                (getBasicListnerProperties());

        try {
            // subscribe to /stream:topic
            List<String> topicList = new ArrayList<String>();
            topicList.add(topics);
            consumer.subscribe(topicList);
            consumer.seekToBeginning();

            // Get consumer records
            ConsumerRecords<String, ChangeDataRecord> consumerRecords = consumer.poll(pollTimeout);

            // iterate over consumer records
            for (ConsumerRecord<String, ChangeDataRecord> consumerRecord : consumerRecords) {

                String consumerRecordkey = consumerRecord.key().trim();
                ChangeDataRecord changeDataRecord = consumerRecord.value();

                // record key for the change
                Value id = changeDataRecord.getId();

                // record level op can be either RECORD_INSERT, RECORD_UPDATE, RECORD_DELETE
                ChangeDataRecordType changeDataRecordType = changeDataRecord.getType();

                // record level op-time & server op-time
                Long recordOpTime = changeDataRecord.getOpTimestamp();
                Long recordServerOpTime = changeDataRecord.getServerTimestamp();

                if (method) {
                    // Method 1 - via iterator interface
                    iteratorDisplay(id, changeDataRecordType,
                            recordOpTime, recordServerOpTime,
                            consumerRecordkey, changeDataRecord);
                } else {
                    // Method 2 - via reader interface
                    readerDisplay(id, changeDataRecordType,
                            recordOpTime, recordServerOpTime,
                            consumerRecordkey, changeDataRecord);
                }
            }
        } finally {
            // Release the consumer even when polling or record processing fails.
            consumer.close();
        }
    }

    /**
     * Driver.
     *
     * @param args three positional arguments: the poll timeout (parsed as a long), the topic to
     *             consume from, and a boolean selecting the iterator ({@code true}) or the
     *             reader ({@code false}) interface
     */
    public static void main(String[] args) {
        if (args == null || args.length < 3) {
            System.err.println(
                    "Usage: CDCBinaryExample <pollTimeout> <topic> <useIteratorInterface:true|false>");
            System.exit(1);
            return;
        }
        long pollTimeout = Long.parseLong(args[0]);
        String topic = args[1];
        boolean method = Boolean.parseBoolean(args[2]);
        CDCBinaryExample cdcBinaryExample = new CDCBinaryExample();
        cdcBinaryExample.consume(pollTimeout, topic, method);
    }
}