Kafka Schema Registry Use Cases

Describes typical use cases for Kafka Schema Registry: registering and querying a schema, serializing and deserializing data, and supporting KSQL streams and tables.

Use Case 1: Registering and Querying a Schema for a Kafka Topic

While Kafka topics do not have a schema, having an external store that tracks this metadata for a given Kafka topic helps answer the following questions:
  • What are the different events in any given Kafka topic?
  • What can I put into a given Kafka topic?
  • Do all Kafka events have a similar type of schema?
  • How do I parse and use the data in a given Kafka topic?

Sample workflow code:

The following sample commands register and query schemas for a Kafka topic:

# Register a new version of a schema under the subject "Kafka-key"
        
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8087/subjects/Kafka-key/versions
    {"id":1}
        
# Register a new version of a schema under the subject "Kafka-value"
        
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8087/subjects/Kafka-value/versions
    {"id":1}
        
# List all subjects
        
$ curl -X GET http://localhost:8087/subjects
    ["Kafka-value","Kafka-key"]
        
# List all schema versions registered under the subject "Kafka-value"
        
$ curl -X GET http://localhost:8087/subjects/Kafka-value/versions
    [1]
        
# Fetch a schema by globally unique id 1
        
$ curl -X GET http://localhost:8087/schemas/ids/1
    {"schema":"\"string\""}
        
# Fetch version 1 of the schema registered under subject "Kafka-value"
        
$ curl -X GET http://localhost:8087/subjects/Kafka-value/versions/1
    {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}
        
# Fetch the most recently registered schema under subject "Kafka-value"
        
$ curl -X GET http://localhost:8087/subjects/Kafka-value/versions/latest
    {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}    
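
The same requests can be issued from a program. The following Python sketch is an illustration only, not part of the product: it builds the registration request used in the curl samples and decodes a response like the one returned for version 1. It assumes the registry address http://localhost:8087 from the samples above; the function names are hypothetical. The point it shows is that the Avro schema travels as a JSON string nested inside a JSON object, so it is encoded twice on the way in and decoded twice on the way out.

```python
import json
import urllib.request

# Assumption: same registry address as in the curl samples.
REGISTRY_URL = "http://localhost:8087"


def build_register_request(subject, avro_schema):
    """Build (but do not send) the POST request that registers a schema."""
    # The schema is embedded as a JSON string, so it is serialized twice.
    payload = json.dumps({"schema": json.dumps(avro_schema)})
    return urllib.request.Request(
        url=f"{REGISTRY_URL}/subjects/{subject}/versions",
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


def parse_version_response(body):
    """Decode a 'subjects/<subject>/versions/<n>' response body."""
    doc = json.loads(body)
    # The "schema" field is itself JSON text and needs a second decode.
    return doc["subject"], doc["version"], doc["id"], json.loads(doc["schema"])


request = build_register_request("Kafka-value", {"type": "string"})
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))

# Response body copied from the curl sample for version 1.
sample = '{"subject":"Kafka-value","version":1,"id":1,"schema":"\\"string\\""}'
print(parse_version_response(sample))
```

To send the request against a running registry, pass it to urllib.request.urlopen(request) and read the response body.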

Use Case 2: Serializing and Deserializing Data in a Kafka Topic

In addition to storing the schema metadata for a topic, Kafka Schema Registry also provides mechanisms for reading and writing data to a Kafka topic in supported formats.

You can plug the appropriate Serializer into the KafkaProducer to send messages to Kafka. A Serializer is available for each of the supported formats.

For Avro, use the KafkaAvroSerializer.

Currently, the supported types are the primitive types null, Boolean, Integer, Long, Float, Double, String, and byte[], and the complex type IndexedRecord.

Sending data of any other type to KafkaAvroSerializer causes a SerializationException. Typically, IndexedRecord is used for the value of the Kafka message. In that case, the key of the Kafka message is often one of the primitive types.
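
As a rough illustration of the type rule, the following Python sketch maps each supported Java type to the standard Avro type name that describes it and rejects everything else. It is not the serializer's implementation: keying the table by Java type name, the function avro_schema_for, and the UnsupportedTypeError class (a stand-in for SerializationException) are all assumptions made for this example.

```python
# Java type name -> Avro primitive type name. The Avro names are standard;
# keying the table by Java type name is only a device for this sketch.
AVRO_PRIMITIVES = {
    "null": "null",
    "Boolean": "boolean",
    "Integer": "int",
    "Long": "long",
    "Float": "float",
    "Double": "double",
    "String": "string",
    "byte[]": "bytes",
}


class UnsupportedTypeError(ValueError):
    """Hypothetical stand-in for the SerializationException raised in Java."""


def avro_schema_for(java_type, record_schema=None):
    """Return the Avro schema that would describe a value of java_type."""
    if java_type == "IndexedRecord":
        # A record carries its own schema, so the caller must supply it.
        if record_schema is None:
            raise UnsupportedTypeError("IndexedRecord needs a record schema")
        return record_schema
    if java_type not in AVRO_PRIMITIVES:
        raise UnsupportedTypeError(f"unsupported type: {java_type}")
    return AVRO_PRIMITIVES[java_type]


print(avro_schema_for("Long"))
print(avro_schema_for("byte[]"))
try:
    avro_schema_for("java.util.Date")
except UnsupportedTypeError as err:
    print(err)
```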

For example, when you send a message to a topic t, the schemas for the key and the value are automatically registered in the Kafka Schema Registry under the subjects t-key and t-value, respectively, if the compatibility test passes. The only exception is the null type, which is never registered in the Kafka Schema Registry.
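
The subject naming described above can be sketched in a few lines of Python. The helper names are hypothetical, and the registry address http://localhost:8087 is an assumption carried over from the curl samples in Use Case 1. The sketch derives the subject for a topic's key or value and the URL that returns the most recently registered schema for it, which is a convenient way to check what the serializer registered.

```python
# Assumption: same registry address as in the curl samples.
REGISTRY_URL = "http://localhost:8087"


def subject_name(topic, is_key):
    """Subject under which the key or value schema of a topic is registered."""
    return f"{topic}-key" if is_key else f"{topic}-value"


def latest_schema_url(topic, is_key):
    """URL that returns the most recently registered schema for the subject."""
    return f"{REGISTRY_URL}/subjects/{subject_name(topic, is_key)}/versions/latest"


for is_key in (True, False):
    print(latest_schema_url("t", is_key))
```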

To consume messages from a Kafka topic, plug the corresponding deserializer into the KafkaConsumer in the same way that you plug the serializer into the KafkaProducer.

Use Case 3: Supporting KSQL Streams or Tables in Supported Formats

KSQL requires that you use the Kafka Schema Registry to create KSQL Streams or Tables in supported formats.

Kafka Schema Registry supports the Avro format. To work with topics that contain messages in Avro format, specify VALUE_FORMAT='AVRO' when you create the KSQL stream or table.

Sample workflow code:

The following commands create a stream, register a schema using an Avro console producer, and then create a KSQL stream and a KSQL table over the Avro topics:

# Create a stream
        
$ maprcli stream create -path /sample-stream -produceperm p -consumeperm p -topicperm p
        
# Use Avro console producer to create and register a schema
        
/sample-stream:pageviews-avro-topic

# Create a KSQL stream and a KSQL table over the Avro topics
        
CREATE STREAM pageviews WITH (KAFKA_TOPIC='pageviews-avro-topic', VALUE_FORMAT='AVRO');
        
CREATE TABLE users WITH (KAFKA_TOPIC='users-avro-topic', VALUE_FORMAT='AVRO',
        KEY='userid');