Class StreamsConfig

  • Direct Known Subclasses:
    QuietStreamsConfig

    public class StreamsConfig
    extends AbstractConfig
    Configuration for a KafkaStreams instance. Can also be used to configure the Kafka Streams internal KafkaConsumer, KafkaProducer and Admin. To avoid consumer/producer/admin property conflicts, you should prefix those properties using consumerPrefix(String), producerPrefix(String) and adminClientPrefix(String), respectively.

    Example:

    
     // potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
     Properties streamsProperties = new Properties();
     streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
     // or
     streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
    
     // suggested:
     Properties streamsProperties = new Properties();
     // sets "metadata.max.age.ms" to 1 minute for consumer only
     streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
     // sets "metadata.max.age.ms" to 1 minute for producer only
     streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
    
     StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
     
    This instance can also be used to pass custom configurations to different modules (e.g. passing a special config to your customized serde class). The consumer/producer/admin prefix can also be used to distinguish custom config values that share the same config name but are passed to different clients.

    Example:
    
     Properties streamsProperties = new Properties();
     // sets "my.custom.config" to "foo" for consumer only
     streamsProperties.put(StreamsConfig.consumerPrefix("my.custom.config"), "foo");
     // sets "my.custom.config" to "bar" for producer only
     streamsProperties.put(StreamsConfig.producerPrefix("my.custom.config"), "bar");
     // sets "my.custom.config2" to "boom" for all clients universally
     streamsProperties.put("my.custom.config2", "boom");
    
     // as a result, inside the producer's serde class configure(..) function,
     // users can now read both key-value pairs "my.custom.config" -> "bar"
     // and "my.custom.config2" -> "boom" from the config map
     StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
     
    When increasing ProducerConfig.MAX_BLOCK_MS_CONFIG to be more resilient to unavailable brokers, you should also increase ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG using the following guidance:
         max.poll.interval.ms > max.block.ms
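
The guidance above can be sketched as follows. This is a minimal illustration that uses only java.util.Properties; the literal keys assume that producerPrefix(String) and consumerPrefix(String) prepend "producer." and "consumer." respectively, and the class name, method name and headroom parameter are hypothetical.

```java
import java.util.Properties;

final class BlockingTimeouts {
    // Assumed to equal StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG).
    static final String PRODUCER_MAX_BLOCK_MS = "producer.max.block.ms";
    // Assumed to equal StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG).
    static final String CONSUMER_MAX_POLL_INTERVAL_MS = "consumer.max.poll.interval.ms";

    /**
     * Builds properties in which max.poll.interval.ms is strictly greater
     * than max.block.ms by the given (hypothetical) headroom.
     */
    static Properties withMaxBlockMs(long maxBlockMs, int headroomMs) {
        if (maxBlockMs <= 0 || headroomMs <= 0) {
            throw new IllegalArgumentException("maxBlockMs and headroomMs must be positive");
        }
        // Fails fast with ArithmeticException if the sum does not fit into an int.
        int maxPollIntervalMs = Math.toIntExact(Math.addExact(maxBlockMs, (long) headroomMs));

        Properties props = new Properties();
        props.put(PRODUCER_MAX_BLOCK_MS, maxBlockMs);                // stored as Long
        props.put(CONSUMER_MAX_POLL_INTERVAL_MS, maxPollIntervalMs); // stored as Integer
        return props;
    }

    public static void main(String[] args) {
        // 2 minutes of producer blocking, 1 extra minute before the consumer is considered stalled.
        Properties props = withMaxBlockMs(120_000L, 60_000);
        System.out.println(props);
    }
}
```

The resulting Properties object could then be passed to the StreamsConfig constructor alongside the remaining settings.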
     
    Kafka Streams requires at least the following properties to be set:
    By default, Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
    • "group.id" (<application.id>) - Streams client will always use the application ID as the consumer group ID
    • "enable.auto.commit" (false) - Streams client will always disable auto committing
    • "partition.assignment.strategy" (StreamsPartitionAssignor) - Streams client will always use its own partition assignor
    If "processing.guarantee" is set to "exactly_once", Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
    See Also:
    KafkaStreams(org.apache.kafka.streams.Topology, Properties), ConsumerConfig, ProducerConfig
    • Constructor Detail

      • StreamsConfig

        public StreamsConfig​(Map<?,​?> props)
        Create a new StreamsConfig using the given properties.
        Parameters:
        props - properties that specify Kafka Streams and internal consumer/producer configuration
      • StreamsConfig

        protected StreamsConfig​(Map<?,​?> props,
                                boolean doLog)
    • Method Detail

      • producerPrefix

        public static String producerPrefix​(String producerProp)
        Prefix a property with PRODUCER_PREFIX. This is used to isolate producer configs from other client configs.
        Parameters:
        producerProp - the producer property to be masked
        Returns:
        PRODUCER_PREFIX + producerProp
      • adminClientPrefix

        public static String adminClientPrefix​(String adminClientProp)
        Prefix a property with ADMIN_CLIENT_PREFIX. This is used to isolate admin configs from other client configs.
        Parameters:
        adminClientProp - the admin client property to be masked
        Returns:
        ADMIN_CLIENT_PREFIX + adminClientProp
      • topicPrefix

        public static String topicPrefix​(String topicProp)
        Prefix a property with TOPIC_PREFIX. This is used to provide default topic configs that are applied when creating internal topics.
        Parameters:
        topicProp - the topic property to be masked
        Returns:
        TOPIC_PREFIX + topicProp
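
As an illustration of the documented return value, the sketch below builds default internal-topic settings with a local stand-in for topicPrefix(String). It assumes TOPIC_PREFIX is the string "topic."; the class name and the chosen values are hypothetical, and only java.util.Properties is used.

```java
import java.util.Properties;

final class InternalTopicDefaults {
    // Assumption: mirrors the value of StreamsConfig.TOPIC_PREFIX.
    static final String TOPIC_PREFIX = "topic.";

    // Local stand-in for StreamsConfig.topicPrefix(String): TOPIC_PREFIX + topicProp.
    static String topicPrefix(String topicProp) {
        return TOPIC_PREFIX + topicProp;
    }

    static Properties build() {
        Properties props = new Properties();
        // Illustrative defaults for internal topics: 50 MiB segments, 2 in-sync replicas.
        props.put(topicPrefix("segment.bytes"), 50 * 1024 * 1024);
        props.put(topicPrefix("min.insync.replicas"), 2);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

In application code the real StreamsConfig.topicPrefix(String) would replace the local helper.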
      • configDef

        public static ConfigDef configDef()
        Return a copy of the config definition.
        Returns:
        a copy of the config definition
      • postProcessParsedConfig

        protected Map<String,​Object> postProcessParsedConfig​(Map<String,​Object> parsedValues)
        Description copied from class: AbstractConfig
        Called directly after user configs are parsed (and thus default values are set). This allows changing default values for "secondary defaults" if required.
        Overrides:
        postProcessParsedConfig in class AbstractConfig
        Parameters:
        parsedValues - unmodifiable map of current configuration
        Returns:
        a map of updates that should be applied to the configuration (will be validated to prevent bad updates)
      • getProducerConfigs

        public Map<String,​Object> getProducerConfigs​(String clientId)
        Get the configs for the producer. Properties using the prefix PRODUCER_PREFIX take precedence over their non-prefixed versions, except for ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, where the non-prefixed version is always used because reading from and writing to is only supported within the same Kafka cluster.
        Parameters:
        clientId - the client ID to use for the producer
        Returns:
        Map of the producer configuration.
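
The precedence rule described for getProducerConfigs can be illustrated with a simplified stand-alone sketch. It is not the library's implementation: it assumes PRODUCER_PREFIX is "producer.", the class and method names are hypothetical, and it omits the filtering of unrelated keys that a real client configuration would need.

```java
import java.util.HashMap;
import java.util.Map;

final class PrefixPrecedenceSketch {
    static final String PRODUCER_PREFIX = "producer.";        // assumed prefix value
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /** Resolves producer settings: prefixed keys win, except for bootstrap.servers. */
    static Map<String, Object> resolveProducerConfigs(Map<String, Object> all) {
        Map<String, Object> resolved = new HashMap<>();

        // Pass 1: start from the non-prefixed values.
        for (Map.Entry<String, Object> e : all.entrySet()) {
            if (!e.getKey().startsWith(PRODUCER_PREFIX)) {
                resolved.put(e.getKey(), e.getValue());
            }
        }
        // Pass 2: strip the prefix and let prefixed values override,
        // but never override bootstrap.servers.
        for (Map.Entry<String, Object> e : all.entrySet()) {
            if (e.getKey().startsWith(PRODUCER_PREFIX)) {
                String name = e.getKey().substring(PRODUCER_PREFIX.length());
                if (!BOOTSTRAP_SERVERS.equals(name)) {
                    resolved.put(name, e.getValue());
                }
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Object> all = new HashMap<>();
        all.put("bootstrap.servers", "broker-a:9092");
        all.put("producer.bootstrap.servers", "broker-b:9092"); // ignored by the rule
        all.put("linger.ms", 5);
        all.put("producer.linger.ms", 100);                     // overrides linger.ms
        System.out.println(resolveProducerConfigs(all));
    }
}
```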
      • getAdminConfigs

        public Map<String,​Object> getAdminConfigs​(String clientId)
        Get the configs for the admin client.
        Parameters:
        clientId - the client ID to use for the admin client
        Returns:
        Map of the admin client configuration.
      • defaultKeySerde

        public Serde defaultKeySerde()
        Return a configured instance of the key Serde class.
        Returns:
        a configured instance of the key Serde class
      • getStreamsInternalStreamFolder

        public String getStreamsInternalStreamFolder()
      • getStreamsInternalStreamNotcompacted

        public String getStreamsInternalStreamNotcompacted()
      • getStreamsInternalStreamCompacted

        public String getStreamsInternalStreamCompacted()
      • getStreamsCliSideAssignmentInternalStream

        public String getStreamsCliSideAssignmentInternalStream()
      • main

        public static void main​(String[] args)