Consumer Application for CDC JSON Data
This example consumes changed data records from MapR Database JSON tables.
Example of Consuming JSON Changed Data Records
In this example, the
following occurs:
For changed data records from MapR Database JSON table data, the following are unique: - Initialize the consumer properties using Apache Kafka and MapR configuration parameters.
- Display the change data record properties.
- Iterate through the change nodes, determine the type of operation, and retrieve the operation value.
- Retrieve the properties of individual change node (for example: data type, field name, field value, and so on) by using various methods of the ChangeDataReader interface.
- Display the change data record values by using the ChangeNode interface.
- Subscribe to the stream topic, consume the events, and determine record type.
- There are multiple property values that can be retrieved through the ChangeDataReader interface. For example, getDouble or getFloat.
- There are multiple values for single fields in documents that can be retrieved through
ChangeNode interface. See the code line:
Value value = changeNode.getValue();
package example.cdps;
import com.mapr.db.MapRDB;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.ojai.*;
import org.ojai.store.cdc.*;
import java.util.*;
public class CDPConsumer {
/**
* Initialize Basic Consumer Properties
* @return
*/
public Properties getBasicListnerProperties() {
Properties props = new Properties();
props.put("bootstrap.servers", "mfs220.qa.lab:9211");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Use MapR CDP Specific Deserializer to parse the change contents
props.put("value.deserializer", "com.mapr.db.cdc.ChangeDataRecordDeserializer");
props.put("fetch.min.bytes", "10");
props.put("fetch.wait.max.ms", "5000");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
return props;
}
/**
* Display Utility
* @param consumerRecordkey
* @param id
* @param changeDataRecordType
* @param recordOpTime
* @param recordServerOpTime
* @param field
* @param op
* @param changeNodeOpTime
* @param changeNodeServerOpTime
* @param valueType
* @param value
*/
public void display(String consumerRecordkey,
Value id,
ChangeDataRecordType changeDataRecordType,
Long recordOpTime,
Long recordServerOpTime,
String field,
ChangeOp op,
Long changeNodeOpTime,
Long changeNodeServerOpTime,
Value.Type valueType,
Value value) {
Document document = MapRDB.newDocument();
document.set("consumerRecordkey", consumerRecordkey);
if(id != null)
document.set("id", id);
if(changeDataRecordType != null)
document.set("changeDataRecordType", changeDataRecordType.name());
document.set("recordOpTime", recordOpTime);
document.set("recordServerOpTime", recordServerOpTime);
if(field != null)
document.set("field", field);
document.set("op", op.name());
document.set("changeNodeOpTime", changeNodeOpTime);
document.set("changeNodeServerOpTime", changeNodeServerOpTime);
if(valueType != null)
document.set("valueType", valueType.name());
if(value != null)
document.set("value", value);
System.out.println("\t\n********* Propagated Change **************************\t\n");
System.out.println("\t\n" + document.asJsonString() + "\t\n");
System.out.println("\t\n******************************************************\t\n");
}
/**
* Parse change node contents via iterator
* @param consumerRecordkey
* @param changeDataRecord
*/
public void iteratorDisplay(Value id,
ChangeDataRecordType changeDataRecordType,
Long recordOpTime,
Long recordServerOpTime,
String consumerRecordkey,
ChangeDataRecord changeDataRecord) {
for (KeyValue<FieldPath, ChangeNode> fieldChangePair : changeDataRecord) {
// field if operation was done one a field
String field = fieldChangePair.getKey().asJsonString();
// Actual change node object, which holds change values
ChangeNode changeNode = fieldChangePair.getValue();
// Change Op, based on op done can be NULL, SET, MERGE, DELETE, DELETE_EXACT
ChangeOp op = changeNode.getOp();
// change node op time
Long changeNodeOpTime = changeNode.getOpTimestamp();
Long changeNodeServerOpTime = changeNode.getServerTimestamp();
// the value type if it was non delete operation, such as insert replace etc
Value.Type valueType = changeNode.getType();
// value of the operation such as insert value or replace
Value value = changeNode.getValue();
// display the change contents
display(consumerRecordkey, id, changeDataRecordType, recordOpTime, recordServerOpTime,
field, op, changeNodeOpTime, changeNodeServerOpTime, valueType, value);
}
}
/**
* Get Parsed Value
* @param changeDataReader
* @param field
* @param valueType
* @return
*/
public Value getValue(ChangeDataReader changeDataReader, String field, Value.Type valueType) {
Document valDoc = MapRDB.newDocument();
if(field == null) {
valDoc.setNull("null");
field = "null";
}
switch (valueType) {
case NULL:
valDoc.setNull(field);
break;
case BOOLEAN:
valDoc.set(field, changeDataReader.getBoolean());
break;
case STRING:
valDoc.set(field, changeDataReader.getString());
break;
case SHORT:
valDoc.set(field, changeDataReader.getShort());
break;
case BYTE:
valDoc.set(field, changeDataReader.getByte());
break;
case INT:
valDoc.set(field, changeDataReader.getInt());
break;
case LONG:
valDoc.set(field, changeDataReader.getLong());
break;
case FLOAT:
valDoc.set(field, changeDataReader.getFloat());
break;
case DOUBLE:
valDoc.set(field, changeDataReader.getDouble());
break;
case DECIMAL:
valDoc.set(field, changeDataReader.getDecimal());
break;
case DATE:
valDoc.set(field, changeDataReader.getDate());
break;
case TIME:
valDoc.set(field, changeDataReader.getTime());
break;
case TIMESTAMP:
valDoc.set(field, changeDataReader.getTimestamp());
break;
case INTERVAL:
valDoc.set(field, changeDataReader.getInterval());
break;
case BINARY:
valDoc.set(field, changeDataReader.getBinary());
break;
default:
break;
}
return valDoc.getValue(field);
}
/**
* Parse change node contents via reader
* @param consumerRecordkey
* @param changeDataRecord
*/
public void readerDisplay(Value id,
ChangeDataRecordType changeDataRecordType,
Long recordOpTime,
Long recordServerOpTime,
String consumerRecordkey,
ChangeDataRecord changeDataRecord) {
System.out.println("Reader");
ChangeEvent changeEvent;
// get reader from the event
ChangeDataReader changeDataReader = changeDataRecord.getReader();
while ((changeEvent = changeDataReader.next()) != null) {
// parse through change events
switch (changeEvent) {
case NODE:
System.out.println("node event get the value type");
Value.Type valueType = changeDataReader.getType();
String field = changeDataReader.getFieldName();
Long serverTimestamp = changeDataReader.getServerTimestamp();
Long opTimestamp = changeDataReader.getOpTimestamp();
ChangeOp op = changeDataReader.getOp();
Value value = getValue(changeDataReader, field, valueType);
display(consumerRecordkey, id, changeDataRecordType,
recordOpTime, recordServerOpTime, field, op, opTimestamp,
serverTimestamp, valueType, value);
break;
}
break;
}
}
/**
* Consume from changelog topics
* @param pollTimeout
* @param topics
*/
public void consume(long pollTimeout, String topics, boolean method) {
System.out.println("consume...");
// initialize consumer
KafkaConsumer<String, ChangeDataRecord> consumer = new KafkaConsumer<String, ChangeDataRecord>
(getBasicListnerProperties());
// subscribe to /stream:topic
List<String> topicList = new ArrayList<String>();
topicList.add(topics);
consumer.subscribe(topicList);
consumer.seekToBeginning();
// Get consumer records
ConsumerRecords<String, ChangeDataRecord> consumerRecords = consumer.poll(pollTimeout);
// iterate over consumer records
for(ConsumerRecord<String, ChangeDataRecord> consumerRecord: consumerRecords) {
String consumerRecordkey = consumerRecord.key().trim();
ChangeDataRecord changeDataRecord = consumerRecord.value();
// record key for the change
Value id = changeDataRecord.getId();
// record level op can be either RECORD_INSERT, RECORD_UPDATE, RECORD_DELETE
ChangeDataRecordType changeDataRecordType = changeDataRecord.getType();
// record level op-time & server op-time
Long recordOpTime = changeDataRecord.getOpTimestamp();
Long recordServerOpTime = changeDataRecord.getServerTimestamp();
if(method) {
// Method 1 - via iterator interface
iteratorDisplay(id, changeDataRecordType,
recordOpTime, recordServerOpTime,
consumerRecordkey, changeDataRecord);
} else {
// Method 2 - via reader interface
readerDisplay(id, changeDataRecordType,
recordOpTime, recordServerOpTime,
consumerRecordkey, changeDataRecord);
}
}
consumer.close();
}
/**
* Driver
* @param args
*/
public static void main(String[] args) {
Long pollTimeout = Long.parseLong(args[0]);
String topic = args[1];
boolean method = Boolean.parseBoolean(args[2]);
CDPConsumer cdpConsumer = new CDPConsumer();
cdpConsumer.consume(pollTimeout, topic, method);
}
}