Interface ConsumerRebalanceListener


  • public interface ConsumerRebalanceListener
    A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the consumer changes.

    This is applicable when the consumer lets Kafka manage group membership automatically. If the consumer directly assigns partitions, those partitions will never be reassigned and this callback is not applicable.

    When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group change or the subscription of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure. Rebalances can also be triggered by changes affecting the subscribed topics (e.g. when the number of partitions is administratively adjusted).

    There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in the onPartitionsRevoked(Collection) call, we can ensure that the offset gets saved any time the partition assignment changes.

    Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example, consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the number of page views per user for each five-minute window. Let's say the topic is partitioned by user ID so that all events for a particular user go to a single consumer instance. The consumer can keep a running tally of actions per user in memory and only flush these out to a remote data store when its cache gets too big. However, if a partition is reassigned, the consumer may want to flush this cache automatically before the new owner takes over consumption.
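
    The cache-flush idea above can be sketched with JDK types only. This is a minimal illustration, not part of the Kafka API: the class PageViewTally, its methods, and the flushed list (a stand-in for the remote data store) are all hypothetical, plain integers stand in for TopicPartition, and the five-minute windowing is left out. In a real consumer, flushRevoked would be called from onPartitionsRevoked(Collection).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory tally of page views per user, grouped by source partition.
class PageViewTally {
    // partition number -> (user ID -> view count); plain integers stand in for TopicPartition
    private final Map<Integer, Map<String, Long>> countsByPartition = new HashMap<>();

    // Stand-in for the remote data store: one "partition/user=count" entry per flushed tally.
    final List<String> flushed = new ArrayList<>();

    // Called once for each consumed page-view record.
    void record(int partition, String userId) {
        countsByPartition
            .computeIfAbsent(partition, p -> new HashMap<>())
            .merge(userId, 1L, Long::sum);
    }

    // Intended to be called from onPartitionsRevoked: flush and drop only the revoked partitions.
    void flushRevoked(Collection<Integer> revokedPartitions) {
        for (Integer partition : revokedPartitions) {
            Map<String, Long> counts = countsByPartition.remove(partition);
            if (counts == null) {
                continue; // nothing cached for this partition
            }
            counts.forEach((user, n) -> flushed.add(partition + "/" + user + "=" + n));
        }
    }

    // Number of partitions that still have cached counts.
    int cachedPartitions() {
        return countsByPartition.size();
    }
}
```

    Flushing only the revoked partitions keeps the tallies for partitions the consumer still owns in memory, so a rebalance that moves a single partition does not force a full cache flush.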

    This callback will only execute in the user thread as part of the poll(long) call whenever partition assignment changes.

    It is guaranteed that all consumer processes will invoke onPartitionsRevoked prior to any process invoking onPartitionsAssigned. So if offsets or other state are saved in the onPartitionsRevoked call, they are guaranteed to be saved by the time the process taking over that partition has its onPartitionsAssigned callback invoked to load the state.

    Here is pseudo-code for a callback implementation for saving offsets:

     
       import java.util.Collection;
       import org.apache.kafka.clients.consumer.Consumer;
       import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
       import org.apache.kafka.common.TopicPartition;
    
       public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
           private final Consumer<?,?> consumer;
    
           public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
               this.consumer = consumer;
           }
    
           @Override
           public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
               // save the offsets in an external store using some custom code not described here;
               // the partition is passed along so that each offset can be stored under its own key
               for (TopicPartition partition : partitions)
                  saveOffsetInExternalStore(partition, consumer.position(partition));
           }
    
           @Override
           public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
               // read the offsets from an external store using some custom code not described here
               for (TopicPartition partition : partitions)
                  consumer.seek(partition, readOffsetFromExternalStore(partition));
           }
       }
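
    The external store itself is left undefined in the pseudo-code above. As a rough sketch of what such a store has to do, here is a JDK-only, in-memory stand-in. Everything in it is an assumption made for illustration: the class name InMemoryOffsetStore, its save and read methods, the "topic-partition" key format, and the default-offset fallback are hypothetical, and a real store would need to be durable and shared between consumer processes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an external offset store, keyed by "topic-partition".
class InMemoryOffsetStore {
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // What onPartitionsRevoked would call with the value of consumer.position(partition).
    void save(String topic, int partition, long nextOffset) {
        offsets.put(key(topic, partition), nextOffset);
    }

    // What onPartitionsAssigned would call before consumer.seek(partition, offset).
    // Returns defaultOffset when no offset has been saved for the partition yet.
    long read(String topic, int partition, long defaultOffset) {
        return offsets.getOrDefault(key(topic, partition), defaultOffset);
    }
}
```

    Because every revocation callback runs before any assignment callback, an offset written through save by the previous owner is visible to the new owner's read, provided both processes talk to the same store.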
     
     
    • Method Summary

      Modifier and Type Method Description
      void onPartitionsAssigned(java.util.Collection<TopicPartition> partitions)
      A callback method the user can implement to provide handling of customized offsets on completion of a successful partition re-assignment.
      void onPartitionsRevoked(java.util.Collection<TopicPartition> partitions)
      A callback method the user can implement to provide handling of offset commits to a customized store at the start of a rebalance operation.
    • Method Detail

      • onPartitionsRevoked

        void onPartitionsRevoked(java.util.Collection<TopicPartition> partitions)
        A callback method the user can implement to provide handling of offset commits to a customized store at the start of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer stops fetching data. It is recommended that offsets be committed in this callback, either to Kafka or to a custom offset store, to prevent duplicate data.

        For examples of how to use this API, see the Usage Examples section of KafkaConsumer.

        NOTE: This method is only called before rebalances. It is not called prior to KafkaConsumer.close().

        It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible for a WakeupException or InterruptException to be raised from one of these nested invocations. In this case, the exception will be propagated to the current invocation of KafkaConsumer.poll(java.time.Duration) in which this callback is being executed. This means it is not necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.

        Parameters:
        partitions - The list of partitions that were assigned to the consumer on the last rebalance
        Throws:
        WakeupException - If raised from a nested call to KafkaConsumer
        InterruptException - If raised from a nested call to KafkaConsumer
      • onPartitionsAssigned

        void onPartitionsAssigned(java.util.Collection<TopicPartition> partitions)
        A callback method the user can implement to provide handling of customized offsets on completion of a successful partition re-assignment. This method will be called after the partition re-assignment completes and before the consumer starts fetching data, and only as the result of a poll(long) call.

        It is guaranteed that all the processes in a consumer group will execute their onPartitionsRevoked(Collection) callback before any instance executes its onPartitionsAssigned(Collection) callback.

        It is common for the assignment callback to use the consumer instance in order to query offsets. It is possible for a WakeupException or InterruptException to be raised from one of these nested invocations. In this case, the exception will be propagated to the current invocation of KafkaConsumer.poll(java.time.Duration) in which this callback is being executed. This means it is not necessary to catch these exceptions and re-attempt to wake up or interrupt the consumer thread.

        Parameters:
        partitions - The list of partitions that are now assigned to the consumer (may include partitions previously assigned to the consumer)
        Throws:
        WakeupException - If raised from a nested call to KafkaConsumer
        InterruptException - If raised from a nested call to KafkaConsumer