Kafka Schema Registry Demo for JSON Schema

Implements a Kafka Schema Registry demo example that stores and retrieves schemas in JSON Schema format.

Maven Dependencies

Add the following repositories to the POM file to resolve Confluent and MapR dependencies:
<repositories>
   <!-- Confluent repository. HTTPS is used because Maven 3.8.1 and later
        block plain-HTTP repositories by default. -->
   <repository>
       <id>confluent</id>
       <url>https://packages.confluent.io/maven/</url>
   </repository>
   <!-- MapR repository; both release and snapshot artifacts are enabled. -->
   <repository>
       <id>mapr-maven</id>
       <url>https://repository.mapr.com/maven/</url>
       <releases><enabled>true</enabled></releases>
       <snapshots><enabled>true</enabled></snapshots>
   </repository>
</repositories>
The following dependencies are needed for JSON Schema and MapR Kafka:
<!-- Jackson annotations: provides @JsonProperty, used on the User class. -->
<dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.10.5</version>
</dependency>

<!-- Confluent JSON Schema serializer artifact: supplies the
     io.confluent.kafka.serializers.json classes imported by the producer
     and consumer examples.
     NOTE(review): the "-eep-800" version suffix looks like a MapR EEP build;
     confirm it matches the EEP release installed on your cluster. -->
<dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-json-schema-serializer</artifactId>
            <version>6.0.0.0-eep-800</version>
</dependency>

Create a Java class that corresponds to JSON Schema

Create a Java class that includes Jackson annotations, for example:
import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * Simple data holder describing a user; this is the message value type in
 * the producer and consumer examples.
 *
 * <p>Each field is annotated with {@code @JsonProperty} so that Jackson
 * includes it in the JSON representation of the object.
 */
public class User {
    @JsonProperty
    public String firstName;
    @JsonProperty
    public String lastName;
    @JsonProperty
    public short age;

    /**
     * Creates an empty user with all fields at their default values.
     * Presumably required so that the deserializer can instantiate the
     * class reflectively - confirm before removing.
     */
    public User() {}

    /**
     * Creates a fully populated user.
     *
     * @param firstName the user's first name
     * @param lastName  the user's last name
     * @param age       the user's age
     */
    public User(String firstName, String lastName, short age) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
    }

    /**
     * Returns a human-readable description of this user.
     *
     * <p>Plain concatenation is used on purpose: passing the concatenated
     * text to {@code String.format} as the format string would treat any
     * {@code %} inside a name as a format specifier and either throw or
     * alter the output.
     *
     * @return text of the form
     *         {@code first name: <firstName>; last name: <lastName>; age: <age>}
     */
    @Override
    public String toString() {
        return "first name: " + firstName
                + "; last name: " + lastName + "; age: " + age;
    }
}

Create a JSON Schema Producer

  1. Import the following classes for the Kafka producer:
    import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
    import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig;
    import io.demo.example.User;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    
    import java.util.Properties;
    
  2. Configure the following properties for the Event Data Streams:
    // Producer settings: Schema Registry endpoint plus key/value serializers.
    Properties producerProps = new Properties();

    // Where the serializer finds the Schema Registry.
    producerProps.setProperty(
            KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8087");

    // Record keys are Integers.
    producerProps.setProperty(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            IntegerSerializer.class.getName());

    // Record values are written by the KafkaJsonSchemaSerializer.
    producerProps.setProperty(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            KafkaJsonSchemaSerializer.class.getName());

    KafkaProducer<Integer, User> producer = new KafkaProducer<>(producerProps);
  3. Use the following code to send n different objects of the User class to the json-schema_example topic in the /sample-stream stream, where n is an int variable that you define beforehand:
    String topic = "/sample-stream:json-schema_example";

    try {
        // n is the number of records to send; it must be defined before this snippet.
        for (int i = 0; i < n; i++) {
            User user = new User("John" + i, "Doe", (short) (i + 30));

            // The diamond operator keeps the record's key/value types checked
            // by the compiler (a raw ProducerRecord causes an unchecked conversion).
            ProducerRecord<Integer, User> record =
                    new ProducerRecord<>(topic, i, user);

            // The callback receives either the record metadata or the failure.
            producer.send(record, (recordMetadata, e) -> {
                if (e == null) {
                    System.out.println("Success!" );
                    System.out.println(recordMetadata.toString());
                } else {
                    e.printStackTrace();
                }
            });
        }

        producer.flush();
    } finally {
        // Close the producer even if building or sending a record throws.
        producer.close();
    }

Create a JSON Schema Consumer

  1. Import the following classes for the Kafka consumer:
    import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
    import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializerConfig;
    import io.demo.example.User;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
  2. Add the KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE property to the properties of the Kafka Consumer to deserialize the output to the needed class.
    // Consumer settings: Schema Registry endpoint, deserializers, target value class.
    Properties consumerProps = new Properties();

    // Where the deserializer finds the Schema Registry.
    consumerProps.put(
            KafkaJsonSchemaDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://localhost:8087");

    // Record keys are Integers.
    consumerProps.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            IntegerDeserializer.class.getName());

    // Record values are read by the Kafka JSON Schema deserializer ...
    consumerProps.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            KafkaJsonSchemaDeserializer.class.getName());

    // ... and converted into instances of this class.
    consumerProps.put(
            KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE,
            User.class.getName());

    KafkaConsumer<Integer, User> consumer = new KafkaConsumer<>(consumerProps);
  3. Use the following code to read objects of the User.java class from the json-schema_example topic in the /sample-stream stream:
    // Subscribe to the json-schema_example topic of the /sample-stream stream.
    String topic = "/sample-stream:json-schema_example";
    consumer.subscribe(Collections.singletonList(topic));

    try {
        // Poll without end; each batch is printed one record per line as
        // "topic partition offset value".
        for (;;) {
            ConsumerRecords<Integer, User> batch =
                    consumer.poll(Duration.ofMillis(100));

            batch.forEach(rec -> System.out.printf("%s %d %d %s \n",
                    rec.topic(), rec.partition(), rec.offset(), rec.value()));
        }
    } finally {
        // Runs when the loop is left through an exception.
        consumer.close();
    }