Changes in Kafka 2.6.1

Describes several differences to note when upgrading from Kafka 2.1.1 to 2.6.1.

Classpath change
  • Kafka 2.6.1 uses classes from kafka-eventstreams.jar instead of mapr-streams.jar to access the cluster.
  • If an application fails with a ClassNotFoundException or NoClassDefFoundError for classes in packages under com.mapr.kafka.eventstreams.*, verify that kafka-eventstreams.jar is in the Java classpath. You can find kafka-eventstreams.jar in the /opt/mapr/lib/ directory, or you can download it from the Maven repository.
Scala changes
  • Scala version 2.11 is no longer supported. Scala versions 2.12 and 2.13 are supported.
  • Scala code leveraging the NewTopic(String, int, short) constructor with literal values must explicitly call toShort on the second literal.
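The effect can be sketched with a hypothetical stand-in class that mirrors the constructor overloads of NewTopic (TopicSpec is not a real Kafka class; it exists only to show the overload-resolution behavior):

```scala
import java.util.Optional

object NewTopicLiteralDemo {
  // Hypothetical stand-in mirroring NewTopic's constructor overloads;
  // not the real Kafka class, just enough to show the issue.
  class TopicSpec private () {
    def this(name: String, numPartitions: Int, replicationFactor: Short) = this()
    def this(name: String, numPartitions: Optional[Integer],
             replicationFactor: Optional[java.lang.Short]) = this()
  }

  // With multiple overloads in scope, Scala does not narrow the Int
  // literal to Short during overload resolution:
  // val bad = new TopicSpec("events", 3, 1)      // fails to compile
  val ok = new TopicSpec("events", 3, 1.toShort)  // explicit toShort compiles
}
```

With only a single constructor, Scala would narrow the literal 1 from Int to Short automatically; once a second overload is present, the conversion must be written explicitly.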
RocksDB change
  • Kafka Streams version 2.6.1 requires RocksDB version 5.18.4.
Default consumer group id
  • The default consumer group id has been changed from the empty string ("") to null. Consumers that use the new default group id will not be able to subscribe to topics and fetch or commit offsets. The empty string as consumer group id is deprecated but will be supported until a future major release. Old clients that rely on the empty string group id will now have to explicitly provide it as part of the consumer configuration. For more information, see KIP-289.
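For example, a consumer that previously relied on the old empty-string default now needs the group id set explicitly in its configuration (property names are standard Kafka consumer settings; the group name is a placeholder):

```properties
# consumer.properties (illustrative): group.id must be set explicitly.
# Without it, a consumer can still read via assign(), but cannot
# subscribe() to topics or fetch/commit offsets.
group.id=my-consumer-group
```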
client.dns.lookup
The default value for the client.dns.lookup configuration has been changed from default to use_all_dns_ips. If a hostname resolves to multiple IP addresses, clients and brokers will now attempt to connect to each IP in sequence until the connection is successfully established. For more information, see KIP-602.
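In client configuration terms, the new default looks like the following (the broker address is a placeholder; setting the property explicitly is optional since it is now the default):

```properties
# client.properties (illustrative): a hostname that resolves to several
# IPs is tried one IP at a time until a connection succeeds
bootstrap.servers=broker.example.com:9092
client.dns.lookup=use_all_dns_ips
```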
DSL
  • Use the DSL operator, cogroup(), to aggregate multiple streams together at once.
  • The Kafka Streams DSL has switched the store types it uses. While this change is mostly transparent to users, some corner cases may require code changes.
KStream.toTable() API
Use the KStream.toTable() API to translate an input event stream into a KTable.
Serde type Void
Use the Serde type, Void, to represent null keys or null values from an input topic.
Sticky partitioning
The DefaultPartitioner now uses a sticky partitioning strategy. This means that records for a specific topic with null keys and no assigned partition will be sent to the same partition until the batch is ready to be sent. When a new batch is created, a new partition is chosen. This decreases produce latency, but it may result in uneven distribution of records across partitions in edge cases. Generally, users will not be impacted, but the difference may be noticeable in tests and other situations that produce records for a very short amount of time.
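The behavior described above can be sketched in a few lines. This is an illustrative model only, not Kafka's actual DefaultPartitioner implementation:

```scala
import scala.util.Random

// Minimal sketch of "sticky" partition choice for null-key records
// (illustrative; not Kafka's DefaultPartitioner code).
class StickyChooser(numPartitions: Int) {
  private var current: Int = Random.nextInt(numPartitions)

  // Every null-key record in the current batch goes to the same partition...
  def partition(): Int = current

  // ...until the batch is sent, at which point a new partition is picked.
  def onNewBatch(): Unit = {
    if (numPartitions > 1) {
      var next = Random.nextInt(numPartitions)
      while (next == current) next = Random.nextInt(numPartitions)
      current = next
    }
  }
}
```

Because every record in a batch targets one partition, batches fill faster and are sent sooner, which is where the latency improvement comes from.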
Rebalancing
  • Incremental cooperative rebalancing has been added to the consumer group protocol. It allows consumers to keep all of their assigned partitions during a rebalance and, at the end, revoke only those partitions that must be migrated to another consumer to balance the overall cluster. The ConsumerCoordinator chooses the latest RebalanceProtocol that is commonly supported by all of the consumer's supported assignors.
  • Kafka Connect now uses a new rebalancing protocol based on incremental cooperative rebalancing. The new protocol does not require stopping all tasks during a rebalancing phase between Connect workers. Instead, only the tasks that need to be exchanged between workers are stopped, and they are restarted in a follow-up rebalance. The new Connect protocol is enabled by default. For details on how it works, and on how to enable the old eager rebalancing behavior, see the incremental cooperative rebalancing design.
Deprecated APIs
  • Deprecated UsePreviousTimeOnInvalidTimestamp and replaced it with UsePartitionTimeOnInvalidTimestamp.
  • Added support for querying stale stores (for high availability) and the stores belonging to a specific partition. KafkaStreams.store(String, QueryableStoreType) is deprecated and replaced with KafkaStreams.store(StoreQueryParameters).
  • The internal PartitionAssignor interface has been deprecated and replaced with a new ConsumerPartitionAssignor in the public API. Some methods/signatures are slightly different between the two interfaces. Users implementing a custom PartitionAssignor should migrate to the new interface as soon as possible.
  • The blocking KafkaConsumer#committed methods have been extended to accept a collection of partitions rather than a single partition, reducing the number of request/response round trips between clients and brokers when fetching committed offsets for the consumer group. The old single-partition overloads are deprecated; we recommend updating your code to use the new methods.
  • The default consumer group id has been changed from the empty string ("") to null, and the empty string group id is deprecated, as described in Default consumer group id above.