Class StickyAssignor

  • All Implemented Interfaces:
    org.apache.kafka.clients.consumer.internals.PartitionAssignor

    public class StickyAssignor
    extends org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

    This class is not supported.

    The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either:

    • the numbers of topic partitions assigned to consumers differ by at most one; or
    • each consumer that has 2+ fewer topic partitions than some other consumer cannot get any of those topic partitions transferred to it.
    Second, it preserves as many existing assignments as possible when a reassignment occurs. This saves some of the processing overhead incurred when topic partitions move from one consumer to another.

    When starting fresh, it distributes the partitions over consumers as evenly as possible. Even though this may sound similar to how the round robin assignor works, the second example below shows that it is not. During a reassignment, it performs the reassignment in such a way that in the new assignment

    1. topic partitions are still distributed as evenly as possible, and
    2. topic partitions stay with their previously assigned consumers as much as possible.
    Of course, the first goal above takes precedence over the second one.

    Example 1. Suppose there are three consumers C0, C1, C2, four topics t0, t1, t2, t3, and each topic has 2 partitions, resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1. Each consumer is subscribed to all four topics. The assignment with both sticky and round robin assignors will be:

    • C0 [t0p0, t1p1, t3p0]
    • C1 [t0p1, t2p0, t3p1]
    • C2 [t1p0, t2p1]
    Now, let's assume C1 is removed and a reassignment is about to happen. The round robin assignor would produce:
    • C0 [t0p0, t1p0, t2p0, t3p0]
    • C2 [t0p1, t1p1, t2p1, t3p1]
    while the sticky assignor would result in:
    • C0 [t0p0, t1p1, t3p0, t2p0]
    • C2 [t1p0, t2p1, t0p1, t3p1]
    preserving all the previous assignments (unlike the round robin assignor).
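
    The round robin results in Example 1 can be reproduced with a small sketch. It is a simplified stand-in, not Kafka's round robin assignor implementation: it assumes every consumer subscribes to every topic and represents partitions as plain strings. The class name RoundRobinSketch and its assign method are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RoundRobinSketch {
    // Deals the partitions to the consumers in turn, in the order given.
    // Assumption: every consumer is subscribed to every topic.
    static Map<String, List<String>> assign(List<String> consumers, List<String> partitions) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < partitions.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            result.get(consumer).add(partitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of(
                "t0p0", "t0p1", "t1p0", "t1p1", "t2p0", "t2p1", "t3p0", "t3p1");
        // prints {C0=[t0p0, t1p1, t3p0], C1=[t0p1, t2p0, t3p1], C2=[t1p0, t2p1]}
        System.out.println(assign(List.of("C0", "C1", "C2"), partitions));
        // prints {C0=[t0p0, t1p0, t2p0, t3p0], C2=[t0p1, t1p1, t2p1, t3p1]}
        System.out.println(assign(List.of("C0", "C2"), partitions));
    }
}
```

    Because the deal restarts from the first partition once C1 is gone, C0 and C2 each receive partitions the other used to own, which is the movement the sticky assignor avoids.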

    Example 2. There are three consumers C0, C1, C2, and three topics t0, t1, t2, with 1, 2, and 3 partitions respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2. The round robin assignor would come up with the following assignment:

    • C0 [t0p0]
    • C1 [t1p0]
    • C2 [t1p1, t2p0, t2p1, t2p2]
    which is not as balanced as the assignment suggested by sticky assignor:
    • C0 [t0p0]
    • C1 [t1p0, t1p1]
    • C2 [t2p0, t2p1, t2p2]
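
    The balance claim for Example 2 can be checked against the two-part definition given at the top of this page. The sketch below is illustrative only and is not how the assignor itself is implemented; the class BalanceCheck, its methods, and the convention that a partition named "t1p0" belongs to topic "t1" are assumptions made for this example.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

class BalanceCheck {
    // Assumption: partition names look like "t1p0"; the topic is the text before the last 'p'.
    static String topicOf(String partition) {
        return partition.substring(0, partition.lastIndexOf('p'));
    }

    // True if no consumer with 2+ fewer partitions than another consumer
    // could take over one of that consumer's partitions (i.e. is subscribed to its topic).
    // If all sizes differ by at most one, no pair is inspected and the result is true.
    static boolean isBalanced(Map<String, List<String>> assignment,
                              Map<String, Set<String>> subscriptions) {
        for (String poorer : assignment.keySet()) {
            for (String richer : assignment.keySet()) {
                int gap = assignment.get(richer).size() - assignment.get(poorer).size();
                if (gap < 2) {
                    continue;
                }
                for (String partition : assignment.get(richer)) {
                    if (subscriptions.get(poorer).contains(topicOf(partition))) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> subscriptions = Map.of(
                "C0", Set.of("t0"),
                "C1", Set.of("t0", "t1"),
                "C2", Set.of("t0", "t1", "t2"));
        Map<String, List<String>> roundRobin = Map.of(
                "C0", List.of("t0p0"),
                "C1", List.of("t1p0"),
                "C2", List.of("t1p1", "t2p0", "t2p1", "t2p2"));
        Map<String, List<String>> sticky = Map.of(
                "C0", List.of("t0p0"),
                "C1", List.of("t1p0", "t1p1"),
                "C2", List.of("t2p0", "t2p1", "t2p2"));
        System.out.println(isBalanced(roundRobin, subscriptions)); // prints false
        System.out.println(isBalanced(sticky, subscriptions));     // prints true
    }
}
```

    The round robin result fails the check because C1 holds three fewer partitions than C2 and is subscribed to t1, so t1p1 could move to it. In the sticky result C0 holds two fewer partitions than C2, but C2 owns only t2 partitions, which C0 is not subscribed to.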
    Now, if consumer C0 is removed, these two assignors would produce the following assignments. Round Robin (preserves 3 partition assignments):
    • C1 [t0p0, t1p1]
    • C2 [t1p0, t2p0, t2p1, t2p2]
    Sticky (preserves 5 partition assignments):
    • C1 [t1p0, t1p1, t0p0]
    • C2 [t2p0, t2p1, t2p2]
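
    The "preserves N partition assignments" figures above can be recomputed by counting, for each surviving consumer, the partitions it owned both before and after C0 was removed. This is a minimal sketch with partitions as plain strings; the class StickinessCount and the method countPreserved are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.Map;

class StickinessCount {
    // Counts partitions that stay with the same consumer across a reassignment.
    static int countPreserved(Map<String, List<String>> before,
                              Map<String, List<String>> after) {
        int preserved = 0;
        for (Map.Entry<String, List<String>> entry : after.entrySet()) {
            List<String> previous = before.getOrDefault(entry.getKey(), List.of());
            for (String partition : entry.getValue()) {
                if (previous.contains(partition)) {
                    preserved++;
                }
            }
        }
        return preserved;
    }

    public static void main(String[] args) {
        Map<String, List<String>> roundRobinBefore = Map.of(
                "C0", List.of("t0p0"),
                "C1", List.of("t1p0"),
                "C2", List.of("t1p1", "t2p0", "t2p1", "t2p2"));
        Map<String, List<String>> roundRobinAfter = Map.of(
                "C1", List.of("t0p0", "t1p1"),
                "C2", List.of("t1p0", "t2p0", "t2p1", "t2p2"));
        Map<String, List<String>> stickyBefore = Map.of(
                "C0", List.of("t0p0"),
                "C1", List.of("t1p0", "t1p1"),
                "C2", List.of("t2p0", "t2p1", "t2p2"));
        Map<String, List<String>> stickyAfter = Map.of(
                "C1", List.of("t1p0", "t1p1", "t0p0"),
                "C2", List.of("t2p0", "t2p1", "t2p2"));
        System.out.println(countPreserved(roundRobinBefore, roundRobinAfter)); // prints 3
        System.out.println(countPreserved(stickyBefore, stickyAfter));         // prints 5
    }
}
```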

    Impact on ConsumerRebalanceListener

    The sticky assignment strategy can provide some optimization to consumers that have partition cleanup code in their onPartitionsRevoked() callback listeners. The cleanup code is placed in that callback because, with the range or round robin assignor, the consumer cannot expect to keep any of its assigned partitions after a rebalance. The listener code would look like this:
     
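     import java.util.Collection;
     import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
     import org.apache.kafka.common.TopicPartition;
    
     // commitOffsets(), cleanupState(), initializeState() and initializeOffset()
     // stand for application-specific helpers that are not shown here.
     class TheOldRebalanceListener implements ConsumerRebalanceListener {
    
       @Override
       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
         // Any partition may move, so commit and clean up all of them.
         for (TopicPartition partition: partitions) {
           commitOffsets(partition);
           cleanupState(partition);
         }
       }
    
       @Override
       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         for (TopicPartition partition: partitions) {
           initializeState(partition);
           initializeOffset(partition);
         }
       }
     }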
     
     
    As mentioned above, one advantage of the sticky assignor is that, in general, it reduces the number of partitions that actually move from one consumer to another during a reassignment. Consumers can therefore do their cleanup more efficiently. They can still perform the partition cleanup in the onPartitionsRevoked() listener, but it is more efficient to note their partitions before and after the rebalance and to clean up, after the rebalance, only the partitions they have lost (normally not many). The code snippet below clarifies this point:
     
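     import java.util.ArrayList;
     import java.util.Collection;
     import java.util.Collections;
     import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
     import org.apache.kafka.common.TopicPartition;
    
     // difference(a, b) stands for a helper that returns the elements of a that are not in b.
     class TheNewRebalanceListener implements ConsumerRebalanceListener {
       Collection<TopicPartition> lastAssignment = Collections.emptyList();
    
       @Override
       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
         for (TopicPartition partition: partitions)
           commitOffsets(partition);
       }
    
       @Override
       public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
         // Clean up only the partitions lost in this rebalance.
         for (TopicPartition partition: difference(lastAssignment, assignment))
           cleanupState(partition);
    
         // Initialize state only for partitions that are new to this consumer.
         for (TopicPartition partition: difference(assignment, lastAssignment))
           initializeState(partition);
    
         for (TopicPartition partition: assignment)
           initializeOffset(partition);
    
         // Keep a copy so later changes to the passed-in collection cannot affect the saved state.
         this.lastAssignment = new ArrayList<>(assignment);
       }
     }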
     
     
    Any consumer that uses sticky assignment can leverage this listener like this: consumer.subscribe(topics, new TheNewRebalanceListener());
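
    The difference(...) helper used by TheNewRebalanceListener is not defined on this page. One possible shape for it is sketched below as a generic set difference; the class name AssignmentDiff is hypothetical, and the helper is shown with strings so that it runs without the Kafka client on the classpath. It works the same way for Collection<TopicPartition>, since the result depends only on equals() and hashCode().

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class AssignmentDiff {
    // Returns the elements of `left` that are not in `right`,
    // keeping left's iteration order and leaving both inputs unmodified.
    static <T> Set<T> difference(Collection<T> left, Collection<T> right) {
        Set<T> result = new LinkedHashSet<>(left);
        result.removeAll(right);
        return result;
    }

    public static void main(String[] args) {
        List<String> lastAssignment = List.of("t0p0", "t1p0");
        List<String> assignment = List.of("t1p0", "t2p0");
        System.out.println(difference(lastAssignment, assignment)); // prints [t0p0] (lost)
        System.out.println(difference(assignment, lastAssignment)); // prints [t2p0] (gained)
    }
}
```

    Copying into a new set before calling removeAll keeps the caller's collections untouched, which matters here because the listener reuses both the previous and the current assignment afterwards.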
    • Nested Class Summary

      • Nested classes/interfaces inherited from interface org.apache.kafka.clients.consumer.internals.PartitionAssignor

        org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment, org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription
    • Constructor Summary

      Constructors 
      Constructor Description
      StickyAssignor()  
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      java.util.Map<java.lang.String,java.util.List<TopicPartition>> assign(java.util.Map<java.lang.String,java.lang.Integer> partitionsPerTopic, java.util.Map<java.lang.String,org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription> subscriptions)  
      java.lang.String name()  
      void onAssignment(org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment assignment)  
      org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription subscription(java.util.Set<java.lang.String> topics)  
      • Methods inherited from class org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

        assign, partitions, put
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • StickyAssignor

        public StickyAssignor()
    • Method Detail

      • assign

        public java.util.Map<java.lang.String,java.util.List<TopicPartition>> assign(java.util.Map<java.lang.String,java.lang.Integer> partitionsPerTopic,
                                                                                     java.util.Map<java.lang.String,org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription> subscriptions)
        Specified by:
        assign in class org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
      • onAssignment

        public void onAssignment(org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment assignment)
        Specified by:
        onAssignment in interface org.apache.kafka.clients.consumer.internals.PartitionAssignor
        Overrides:
        onAssignment in class org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
      • subscription

        public org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription subscription(java.util.Set<java.lang.String> topics)
        Specified by:
        subscription in interface org.apache.kafka.clients.consumer.internals.PartitionAssignor
        Overrides:
        subscription in class org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
      • name

        public java.lang.String name()