Consumer Application for CDC Binary Data
This example consumes changed data records from MapR Database Binary tables.
Example of Consuming Binary Changed Data Records
In this example, the following occurs:
For changed data records from MapR Database Binary table data, the following are unique: - Initialize the consumer properties using Apache Kafka and MapR Data Platform configuration parameters.
- Display the change data record properties.
- Iterate through the change nodes, determine the type of operation, and retrieve the operation value.
- Display the change data record values by using the ChangeNode interface.
- Subscribe to the stream topic, consume the events, and determine record type.
- An additional package must be imported:
java.nio.ByteBuffer
- There is single value for single fields in documents that can be retrieved through
ChangeNode interface. See the code line:
ByteBuffer value = changeNode.getBinary();
package com.mapr.qa.cdc.tests.binary;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ojai.*;
import org.ojai.store.Connection;
import org.ojai.store.Driver;
import org.ojai.store.DriverManager;
import org.ojai.store.cdc.*;
import java.nio.ByteBuffer;
import java.util.*;
public class CDCBinaryExample {
/**
* Initialize Basic Consumer Properties
*
* @return
*/
public Properties getBasicListnerProperties() {
Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Use MapR CDC Specific Deserializer to parse the change contents
props.put("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
props.put("fetch.min.bytes", "10");
props.put("fetch.wait.max.ms", "5000");
props.put("auto.offset.reset", "earliest");
return props;
}
/**
* Display Utility
*
* @param consumerRecordkey
* @param id
* @param changeDataRecordType
* @param recordOpTime
* @param recordServerOpTime
* @param field
* @param op
* @param changeNodeOpTime
* @param changeNodeServerOpTime
* @param valueType
* @param value
*/
public void display(String consumerRecordkey,
Value id,
ChangeDataRecordType changeDataRecordType,
Long recordOpTime,
Long recordServerOpTime,
String field,
ChangeOp op,
Long changeNodeOpTime,
Long changeNodeServerOpTime,
Value.Type valueType,
ByteBuffer value) {
Connection mConnection = DriverManager.getConnection("ojai:mapr:");
Driver mDriver = mConnection.getDriver();
Document document = mDriver.newDocument();
document.set("consumerRecordkey", consumerRecordkey);
if (id != null)
document.set("id", id);
if (changeDataRecordType != null)
document.set("changeDataRecordType", changeDataRecordType.name());
document.set("recordOpTime", recordOpTime);
document.set("recordServerOpTime", recordServerOpTime);
if (field != null)
document.set("field", field);
document.set("op", op.name());
document.set("changeNodeOpTime", changeNodeOpTime);
document.set("changeNodeServerOpTime", changeNodeServerOpTime);
if (valueType != null)
document.set("valueType", valueType.name());
if (value != null)
document.set("value", new String(value.array()));
System.out.println("\t\n********* Propagated Change **************************\t\n");
System.out.println("\t\n" + document.asJsonString() + "\t\n");
System.out.println("\t\n******************************************************\t\n");
}
/**
* Parse change node contents via iterator
*
* @param consumerRecordkey
* @param changeDataRecord
*/
public void iteratorDisplay(Value id,
ChangeDataRecordType changeDataRecordType,
Long recordOpTime,
Long recordServerOpTime,
String consumerRecordkey,
ChangeDataRecord changeDataRecord) {
for (KeyValue<FieldPath, ChangeNode> fieldChangePair : changeDataRecord) {
// field if operation was done on a field
String field = fieldChangePair.getKey().asJsonString();
// Actual change node object, which holds change values
ChangeNode changeNode = fieldChangePair.getValue();
// Change Op, based on op done can be NULL, PUT, DELETE, DELETE_EXACT
ChangeOp op = changeNode.getOp();
// change node op time
Long changeNodeOpTime = changeNode.getOpTimestamp();
Long changeNodeServerOpTime = changeNode.getServerTimestamp();
// the value type BINARY, if it is non delete operation
Value.Type valueType = changeNode.getType();
// value of the operation
ByteBuffer value = changeNode.getBinary();
// display the change contents
display(consumerRecordkey, id, changeDataRecordType, recordOpTime, recordServerOpTime,
field, op, changeNodeOpTime, changeNodeServerOpTime, valueType, value);
}
}
/**
* Parse change node contents via reader
*
* @param consumerRecordkey
* @param changeDataRecord
*/
public void readerDisplay(Value id,
ChangeDataRecordType changeDataRecordType,
Long recordOpTime,
Long recordServerOpTime,
String consumerRecordkey,
ChangeDataRecord changeDataRecord) {
ChangeEvent changeEvent;
// get reader from the event
ChangeDataReader changeDataReader = changeDataRecord.getReader();
while ((changeEvent = changeDataReader.next()) != null) {
// parse through change events
switch (changeEvent) {
case NODE:
System.out.println("node event get the value type");
Value.Type valueType = changeDataReader.getType();
String field = changeDataReader.getFieldName();
Long serverTimestamp = changeDataReader.getServerTimestamp();
Long opTimestamp = changeDataReader.getOpTimestamp();
ChangeOp op = changeDataReader.getOp();
ByteBuffer value = changeDataReader.getBinary();
display(consumerRecordkey, id, changeDataRecordType,
recordOpTime, recordServerOpTime, field, op, opTimestamp,
serverTimestamp, valueType, value);
break;
}
break;
}
}
/**
* Consume from changelog topics
*
* @param pollTimeout
* @param topics
*/
public void consume(long pollTimeout, String topics, boolean method) {
System.out.println("consume...");
// initialize consumer
KafkaConsumer<String, ChangeDataRecord> consumer = new KafkaConsumer<String, ChangeDataRecord>
(getBasicListnerProperties());
// subscribe to /stream:topic
List<String> topicList = new ArrayList<String>();
topicList.add(topics);
consumer.subscribe(topicList);
consumer.seekToBeginning();
// Get consumer records
ConsumerRecords<String, ChangeDataRecord> consumerRecords = consumer.poll(pollTimeout);
// iterate over consumer records
for (ConsumerRecord<String, ChangeDataRecord> consumerRecord : consumerRecords) {
String consumerRecordkey = consumerRecord.key().trim();
ChangeDataRecord changeDataRecord = consumerRecord.value();
// record key for the change
Value id = changeDataRecord.getId();
// record level op can be either RECORD_INSERT, RECORD_UPDATE, RECORD_DELETE
ChangeDataRecordType changeDataRecordType = changeDataRecord.getType();
// record level op-time & server op-time
Long recordOpTime = changeDataRecord.getOpTimestamp();
Long recordServerOpTime = changeDataRecord.getServerTimestamp();
if (method) {
// Method 1 - via iterator interface
iteratorDisplay(id, changeDataRecordType,
recordOpTime, recordServerOpTime,
consumerRecordkey, changeDataRecord);
} else {
// Method 2 - via reader interface
readerDisplay(id, changeDataRecordType,
recordOpTime, recordServerOpTime,
consumerRecordkey, changeDataRecord);
}
}
consumer.close();
}
/**
* Driver
*
* @param args
*/
public static void main(String[] args) {
Long pollTimeout = Long.parseLong(args[0]);
String topic = args[1];
boolean method = Boolean.parseBoolean(args[2]);
CDCBinaryExample cdcBinaryExample = new CDCBinaryExample();
cdcBinaryExample.consume(pollTimeout, topic, method);
}
}