Kafka Schema Registry Demo for JSON Schema
Implements a Kafka Schema Registry demo example that stores and retrieves schemas in JSON Schema format.
Maven Dependencies
Add the following repositories to the POM file to resolve Confluent and MapR
dependencies:
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
<repository>
<id>mapr-maven</id>
<url>https://repository.mapr.com/maven/</url>
<releases><enabled>true</enabled></releases>
<snapshots><enabled>true</enabled></snapshots>
</repository>
</repositories>
The following dependencies are needed for JSON Schema and MapR
Kafka:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.10.5</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>6.0.0.0-eep-800</version>
</dependency>
Create a Java class that corresponds to JSON Schema
Create a Java class that includes Jackson annotations, for
example:
import com.fasterxml.jackson.annotation.JsonProperty;
public class User {
@JsonProperty
public String firstName;
@JsonProperty
public String lastName;
@JsonProperty
public short age;
public User() {}
public User(String firstName, String lastName, short age) {
this.firstName = firstName;
this.lastName = lastName;
this.age = age;
}
public String toString() {
return String.format("first name: " + firstName
+ "; last name: " + lastName + "; age: " + age);
}
}
Create a JSON Schema Producer
- Import the following properties for the Kafka
Producer:
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer; import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig; import io.demo.example.User; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerSerializer; import java.util.Properties;
- Configure the following properties for the Event Data
Streams:
Properties properties = new Properties(); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); // Configure the KafkaJsonSchemaSerializer. properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class.getName()); // Schema registry location. properties.setProperty(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087"); KafkaProducer<Integer, User> producer = new KafkaProducer<>(properties);
- Use the following code to send
n
different objects of classUser.java
to the topicjson-schema_example
in the/sample-stream
stream:String topic = "/sample-stream:json-schema_example"; for (int i = 0; i < n; i++) { User user = new User("John" + i, "Doe", (short) (i + 30)); ProducerRecord<Integer, User> record = new ProducerRecord(topic, i, user); producer.send(record, (recordMetadata, e) -> { if (e == null) { System.out.println("Success!" ); System.out.println(recordMetadata.toString()); } else { e.printStackTrace(); } }); } producer.flush(); producer.close();
Create a JSON Schema Consumer
- Import the following properties for the Kafka
Consumer:
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer; import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializerConfig; import io.demo.example.User; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.IntegerDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties;
- Add the
KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE
property to the properties of the Kafka Consumer to deserialize the output to the needed class.Properties properties = new Properties(); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); //Use Kafka JSON Schema Deserializer. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class.getName()); //A class that the message value should be deserialized to. properties.put(KafkaJsonSchemaDeserializerConfig.JSON_VALUE_TYPE, User.class.getName()); //Schema registry location. properties.put(KafkaJsonSchemaDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8087"); KafkaConsumer<Integer, User> consumer = new KafkaConsumer<>(properties);
- Use the following code to read objects of the
User.java
class from thejson-schema_example
topic in the/sample-stream
stream:String topic = "/sample-stream:json-schema_example"; consumer.subscribe(Collections.singletonList(topic)); try { while (true) { ConsumerRecords<Integer, User> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { User userRecord = record.value(); System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), userRecord); }); } } finally { consumer.close(); }