Using Timestamps on Streams and Topics

This section provides code examples for passing timestamps and for setting and retrieving timestamp types on MapR Event Store For Apache Kafka streams and topics.

Passing Timestamp Value

The timestamp value can be passed as part of the ProducerRecord, for example:

ProducerRecord<String, String> producerRecord = 
        new ProducerRecord<String, String>(topicName, partition, timestamp, key, value);

NOTE: The timestamp value is retained if the timestamp type is createtime. If the timestamp type is logappendtime, the timestamp value is ignored and the server timestamp is used instead.
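
The rule in the note can be modeled in a few lines of plain Java. This is only an illustrative sketch: the class TimestampRuleSketch, its Type enum, and the effectiveTimestamp method are hypothetical names introduced here and are not part of the MapR or Kafka client APIs. It also assumes the usual Kafka convention that a record timestamp is a count of milliseconds since the Unix epoch.

```java
import java.time.Instant;

// Illustrative model only: these names are hypothetical and are not part of
// the MapR or Kafka client APIs.
final class TimestampRuleSketch {

    // Mirrors the two timestamp types named in the note.
    enum Type { CREATE_TIME, LOG_APPEND_TIME }

    // Returns the timestamp that would be stored with a message.
    static long effectiveTimestamp(Type type, long producerTimestamp, long serverTimestamp) {
        // createtime retains the producer-supplied value;
        // logappendtime ignores it and uses the server timestamp.
        return type == Type.CREATE_TIME ? producerTimestamp : serverTimestamp;
    }

    public static void main(String[] args) {
        // Assumption: timestamps are milliseconds since the Unix epoch.
        long producerTimestamp = Instant.parse("2020-01-01T00:00:00Z").toEpochMilli();
        long serverTimestamp = System.currentTimeMillis();

        System.out.println(effectiveTimestamp(Type.CREATE_TIME, producerTimestamp, serverTimestamp));
        System.out.println(effectiveTimestamp(Type.LOG_APPEND_TIME, producerTimestamp, serverTimestamp));
    }
}
```

In a real producer, a value such as producerTimestamp would be passed as the timestamp argument of the ProducerRecord constructor shown earlier. Whether it is kept depends on the timestamp type of the topic.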

Retrieving Timestamp Type

The following code example sets and retrieves the timestamp type. It performs these steps:
  • Creates a stream with a default timestamp type of LogAppendTime.
  • Creates a topic with a specific timestamp type of CreateTime.
  • Retrieves the topic's timestamp type.
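// Imports assume the MapR Event Store client libraries are on the classpath;
// verify the package names against your client version.
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.common.record.TimestampType;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.TopicDescriptor;

// streamName and topicName are String variables supplied by the caller
        Configuration conf = new Configuration();
        Admin streamAdmin = Streams.newAdmin(conf);

// Create a stream whose default timestamp type is LogAppendTime
        StreamDescriptor sDesc = Streams.newStreamDescriptor();
        sDesc.setDefaultTimestampType(TimestampType.LOG_APPEND_TIME);
        streamAdmin.createStream(streamName, sDesc);

// Create a topic whose timestamp type is CreateTime
        TopicDescriptor tDesc = Streams.newTopicDescriptor();
        tDesc.setTimestampType(TimestampType.CREATE_TIME);
        streamAdmin.createTopic(streamName, topicName, tDesc);

// Retrieve the topic's timestamp type
        TopicDescriptor rDesc = streamAdmin.getTopicDescriptor(streamName, topicName);
        System.out.println(rDesc.getTimestampType().name);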
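
The relationship between the stream default and the topic-specific setting in the example can be sketched as a simple fallback. This is a hedged model, not the MapR implementation: the class TopicTimestampTypeSketch and its resolve method are hypothetical, and the sketch assumes that a topic created without its own timestamp type takes the stream's default.

```java
import java.util.Optional;

// Hypothetical model of how a topic's timestamp type could be resolved;
// these names are not part of the MapR or Kafka client APIs.
final class TopicTimestampTypeSketch {

    enum Type { CREATE_TIME, LOG_APPEND_TIME }

    // Assumption: a topic-specific type, when present, takes precedence
    // over the stream's default timestamp type.
    static Type resolve(Type streamDefault, Optional<Type> topicSpecific) {
        return topicSpecific.orElse(streamDefault);
    }

    public static void main(String[] args) {
        // Matches the example: stream default LogAppendTime, topic set to CreateTime.
        System.out.println(resolve(Type.LOG_APPEND_TIME, Optional.of(Type.CREATE_TIME)));
        // A topic with no type of its own falls back to the stream default.
        System.out.println(resolve(Type.LOG_APPEND_TIME, Optional.empty()));
    }
}
```

Under these assumptions, the first call corresponds to the topic created in the example, and the second to a topic created on the same stream without a timestamp type of its own.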