Class KTableImpl<K,S,V>
- java.lang.Object
  - org.apache.kafka.streams.kstream.internals.AbstractStream<K,V>
    - org.apache.kafka.streams.kstream.internals.KTableImpl<K,S,V>
- Type Parameters:
K - the key type
S - the source's (parent's) value type
V - the value type
- All Implemented Interfaces:
KTable<K,V>
public class KTableImpl<K,S,V> extends AbstractStream<K,V> implements KTable<K,V>
The implementation class of KTable.
-
-
Field Summary
-
Fields inherited from class org.apache.kafka.streams.kstream.internals.AbstractStream
builder, keySerde, name, streamsGraphNode, subTopologySourceNodes, valSerde
-
-
Constructor Summary
Constructor / Description
KTableImpl(String name, Serde<K> keySerde, Serde<V> valSerde, Set<String> subTopologySourceNodes, String queryableStoreName, ProcessorSupplier<?,?> processorSupplier, StreamsGraphNode streamsGraphNode, InternalStreamsBuilder builder)
-
Method Summary
Modifier and Type / Method / Description
void
enableSendingOldValues()
KTable<K,V>
filter(Predicate<? super K,? super V> predicate)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store.
KTable<K,V>
filter(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
KTable<K,V>
filter(Predicate<? super K,? super V> predicate, Named named)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store.
KTable<K,V>
filter(Predicate<? super K,? super V> predicate, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
KTable<K,V>
filterNot(Predicate<? super K,? super V> predicate)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store.
KTable<K,V>
filterNot(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
KTable<K,V>
filterNot(Predicate<? super K,? super V> predicate, Named named)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store.
KTable<K,V>
filterNot(Predicate<? super K,? super V> predicate, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
<K1,V1> KGroupedTable<K1,V1>
groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector)
Re-groups the records of this KTable using the provided KeyValueMapper and default serializers and deserializers.
<K1,V1> KGroupedTable<K1,V1>
groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector, Grouped<K1,V1> grouped)
Re-groups the records of this KTable using the provided KeyValueMapper and Serdes as specified by Grouped.
<K1,V1> KGroupedTable<K1,V1>
groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector, Serialized<K1,V1> serialized)
Deprecated.
<V1,R> KTable<K,R>
join(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner)
Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store.
<V1,R> KTable<K,R>
join(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner, Named named)
Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store.
<VO,VR> KTable<K,VR>
join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
<VO,VR> KTable<K,VR>
join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
<VR,KO,VO> KTable<K,VR>
join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner)
Join records of this KTable with another KTable using non-windowed inner join.
<VR,KO,VO> KTable<K,VR>
join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable with another KTable using non-windowed inner join.
<VR,KO,VO> KTable<K,VR>
join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named)
Join records of this KTable with another KTable using non-windowed inner join.
<VR,KO,VO> KTable<K,VR>
join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable with another KTable using non-windowed inner join.
<V1,R> KTable<K,R>
leftJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store.
<V1,R> KTable<K,R>
leftJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner, Named named)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store.
<VO,VR> KTable<K,VR>
leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
<VO,VR> KTable<K,VR>
leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
<VR,KO,VO> KTable<K,VR>
leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner)
Join records of this KTable with another KTable using non-windowed left join.
<VR,KO,VO> KTable<K,VR>
leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable with another KTable using non-windowed left join.
<VR,KO,VO> KTable<K,VR>
leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named)
Join records of this KTable with another KTable using non-windowed left join.
<VR,KO,VO> KTable<K,VR>
leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable with another KTable using non-windowed left join.
<VR> KTable<K,VR>
mapValues(ValueMapper<? super V,? extends VR> mapper)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
<VR> KTable<K,VR>
mapValues(ValueMapper<? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
<VR> KTable<K,VR>
mapValues(ValueMapper<? super V,? extends VR> mapper, Named named)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
<VR> KTable<K,VR>
mapValues(ValueMapper<? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
<VR> KTable<K,VR>
mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
<VR> KTable<K,VR>
mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
<VR> KTable<K,VR>
mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store.
<VR> KTable<K,VR>
mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
<V1,R> KTable<K,R>
outerJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store.
<V1,R> KTable<K,R>
outerJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner, Named named)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store.
<VO,VR> KTable<K,VR>
outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
<VO,VR> KTable<K,VR>
outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store.
String
queryableStoreName()
Get the name of the local state store that can be used to query this KTable.
KTable<K,V>
suppress(Suppressed<? super K> suppressed)
Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration.
KStream<K,V>
toStream()
Convert this changelog stream to a KStream.
<K1> KStream<K1,V>
toStream(KeyValueMapper<? super K,? super V,? extends K1> mapper)
Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.
<K1> KStream<K1,V>
toStream(KeyValueMapper<? super K,? super V,? extends K1> mapper, Named named)
Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.
KStream<K,V>
toStream(Named named)
Convert this changelog stream to a KStream.
<VR> KTable<K,VR>
transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store.
<VR> KTable<K,VR>
transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
<VR> KTable<K,VR>
transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized, Named named, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance.
<VR> KTable<K,VR>
transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Named named, String... stateStoreNames)
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store.
KTableValueGetterSupplier<K,V>
valueGetterSupplier()
-
Methods inherited from class org.apache.kafka.streams.kstream.internals.AbstractStream
internalTopologyBuilder, keySerde, valueSerde
-
-
-
-
Constructor Detail
-
KTableImpl
public KTableImpl(String name, Serde<K> keySerde, Serde<V> valSerde, Set<String> subTopologySourceNodes, String queryableStoreName, ProcessorSupplier<?,?> processorSupplier, StreamsGraphNode streamsGraphNode, InternalStreamsBuilder builder)
-
-
Method Detail
-
queryableStoreName
public String queryableStoreName()
Description copied from interface: KTable
Get the name of the local state store that can be used to query this KTable.
- Specified by:
queryableStoreName in interface KTable<K,V>
- Returns:
- the underlying state store name, or null if this KTable cannot be queried.
-
filter
public KTable<K,V> filter(Predicate<? super K,? super V> predicate)
Description copied from interface: KTable
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
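The tombstone rules described above can be modelled without Kafka. The following is a minimal sketch, assuming a toy in-memory table with String keys and Integer values; the class TableFilterSketch and its methods are hypothetical and are not part of the Kafka Streams API. It follows the description literally: the predicate is skipped for tombstones, and a dropped record is forwarded as a tombstone.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

/** Toy model of the filter semantics described above; it does not use Kafka Streams. */
final class TableFilterSketch {
    private final BiPredicate<String, Integer> predicate;
    // Records currently present in the modelled result table.
    private final Map<String, Integer> result = new HashMap<>();
    // Every update forwarded downstream, as "key=value"; "key=null" is a tombstone.
    private final List<String> forwarded = new ArrayList<>();

    TableFilterSketch(BiPredicate<String, Integer> predicate) {
        this.predicate = predicate;
    }

    /** Applies one upstream update; a null value is a tombstone (delete). */
    void update(String key, Integer value) {
        if (value == null) {
            // Tombstone: the predicate is not evaluated.
            // It is forwarded only if there is something to delete.
            if (result.remove(key) != null) {
                forwarded.add(key + "=null");
            }
        } else if (predicate.test(key, value)) {
            result.put(key, value);
            forwarded.add(key + "=" + value);
        } else {
            // Dropped record: forward a tombstone so any earlier value is deleted.
            result.remove(key);
            forwarded.add(key + "=null");
        }
    }

    Map<String, Integer> result() {
        return result;
    }

    List<String> forwarded() {
        return forwarded;
    }
}
```

With the predicate `(k, v) -> v >= 3`, feeding the updates (a, 5), (b, 1), (a, 1), (c, null) forwards a=5 followed by tombstones for b and a. The tombstone for c is not forwarded because nothing was stored for that key, and the result table ends up empty.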
-
filter
public KTable<K,V> filter(Predicate<? super K,? super V> predicate, Named named)
Description copied from interface: KTable
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with default serializers, deserializers, and state store. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
- Specified by:
filter in interface KTable<K,V>
- Parameters:
predicate - a filter Predicate that is applied to each record
named - a Named config used to name the processor in the topology
- Returns:
- a KTable that contains only those records that satisfy the given predicate
- See Also:
KTable.filterNot(Predicate)
-
filter
public KTable<K,V> filter(Predicate<? super K,? super V> predicate, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ... // filtering words
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
K key = "some-word";
ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
- Specified by:
filter in interface KTable<K,V>
- Parameters:
predicate - a filter Predicate that is applied to each record
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains only those records that satisfy the given predicate
- See Also:
KTable.filterNot(Predicate, Materialized)
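The split between local and non-local keys mentioned above can be pictured as a small routing step. The following is only a sketch of that idea, assuming plain Java maps as stand-ins: the class KeyLookupSketch, the key-to-host map, and the remoteFetch function are all hypothetical, and none of them is Kafka Streams API. In a real application the owner of a key would be derived from the metadata exposed by KafkaStreams, and the remote call would be whatever RPC layer the application provides.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

/** Toy model of routing a key lookup; every name here is hypothetical. */
final class KeyLookupSketch {
    private final String localHost;
    // Stand-in for the local ReadOnlyKeyValueStore.
    private final Map<String, String> localStore;
    // Stand-in for metadata discovery: which host owns which key.
    private final Map<String, String> ownerByKey;
    // Stand-in for the custom RPC layer: (host, key) -> value.
    private final BiFunction<String, String, String> remoteFetch;

    KeyLookupSketch(String localHost,
                    Map<String, String> localStore,
                    Map<String, String> ownerByKey,
                    BiFunction<String, String, String> remoteFetch) {
        this.localHost = localHost;
        this.localStore = localStore;
        this.ownerByKey = ownerByKey;
        this.remoteFetch = remoteFetch;
    }

    /** Reads the key locally when this instance owns it, otherwise via the RPC stand-in. */
    Optional<String> get(String key) {
        String owner = ownerByKey.get(key);
        if (owner == null) {
            return Optional.empty(); // no instance is known to host this key
        }
        String value = owner.equals(localHost)
                ? localStore.get(key)
                : remoteFetch.apply(owner, key);
        return Optional.ofNullable(value);
    }
}
```

The design choice worth noting is that the caller never needs to know where a key lives: the lookup either hits the local store or is delegated, and an unknown key yields an empty result rather than an exception.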
-
filter
public KTable<K,V> filter(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Create a new KTable that consists of all records of this KTable which satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do not satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filter for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record is forwarded.
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ... // filtering words
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
K key = "some-word";
ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
- Specified by:
filter in interface KTable<K,V>
- Parameters:
predicate - a filter Predicate that is applied to each record
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains only those records that satisfy the given predicate
- See Also:
KTable.filterNot(Predicate, Materialized)
-
filterNot
public KTable<K,V> filterNot(Predicate<? super K,? super V> predicate)
Description copied from interface: KTable
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.
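Seen on a static snapshot of a table, filterNot keeps exactly the records that filter would drop. The following is a small sketch of that relationship, assuming plain Java maps and java.util.function.BiPredicate as a stand-in for the Kafka Streams Predicate; the class FilterNotSketch is hypothetical, and tombstone handling is deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;

/** Snapshot-level model of filter and filterNot; it does not use Kafka Streams. */
final class FilterNotSketch {
    private FilterNotSketch() {
    }

    /** Keeps the entries that satisfy the predicate. */
    static <K, V> Map<K, V> filter(Map<K, V> table,
                                   BiPredicate<? super K, ? super V> predicate) {
        return select(table, predicate, true);
    }

    /** Keeps the entries that do not satisfy the predicate. */
    static <K, V> Map<K, V> filterNot(Map<K, V> table,
                                      BiPredicate<? super K, ? super V> predicate) {
        return select(table, predicate, false);
    }

    // Shared helper: keep an entry when the predicate result equals keepMatches.
    private static <K, V> Map<K, V> select(Map<K, V> table,
                                           BiPredicate<? super K, ? super V> predicate,
                                           boolean keepMatches) {
        Map<K, V> out = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : table.entrySet()) {
            if (predicate.test(e.getKey(), e.getValue()) == keepMatches) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```

For a table {alice=3, bob=12, carol=7} and the predicate `(k, v) -> v > 5`, filter yields {bob=12, carol=7} and filterNot yields {alice=3}; together the two results partition the input.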
-
filterNot
public KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Named named)
Description copied from interface: KTable
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with default serializers, deserializers, and state store. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.
- Specified by:
filterNot in interface KTable<K,V>
- Parameters:
predicate - a filter Predicate that is applied to each record
named - a Named config used to name the processor in the topology
- Returns:
- a KTable that contains only those records that do not satisfy the given predicate
- See Also:
KTable.filter(Predicate)
-
filterNot
public KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ... // filtering words
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
K key = "some-word";
ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
- Specified by:
filterNot in interface KTable<K,V>
- Parameters:
predicate - a filter Predicate that is applied to each record
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains only those records that do not satisfy the given predicate
- See Also:
KTable.filter(Predicate, Materialized)
-
filterNot
public KTable<K,V> filterNot(Predicate<? super K,? super V> predicate, Named named, Materialized<K,V,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Create a new KTable that consists of all records of this KTable which do not satisfy the given predicate, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. All records that do satisfy the predicate are dropped. For each KTable update, the filter is evaluated based on the current update record and then an update record is produced for the result KTable. This is a stateless record-by-record operation.
Note that filterNot for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded directly if required (i.e., if there is anything to be deleted). Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is forwarded.
To query the local ReadOnlyKeyValueStore it must be obtained via KafkaStreams#store(...):
KafkaStreams streams = ... // filtering words
ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<K, ValueAndTimestamp<V>>timestampedKeyValueStore());
K key = "some-word";
ValueAndTimestamp<V> valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
- Specified by:
filterNot in interface KTable<K,V>
- Parameters:
predicate - a filter Predicate that is applied to each record
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains only those records that do not satisfy the given predicate
- See Also:
KTable.filter(Predicate, Materialized)
-
mapValues
public <VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K,V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value string.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length);
This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than its record stream counterpart, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Specified by:
mapValues in interface KTable<K,V>
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapper that computes a new output value
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
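The interaction between the value mapper and tombstones can be tried out in isolation. The following is a minimal sketch, assuming a toy in-memory result table keyed by String; the class MapValuesSketch is hypothetical and uses java.util.function.Function as a stand-in for ValueMapper, so it is not the Kafka Streams implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy model of the mapValues semantics described above; it does not use Kafka Streams. */
final class MapValuesSketch<V, VR> {
    private final Function<? super V, ? extends VR> mapper;
    // Modelled result table: keys are unchanged, values are mapped.
    private final Map<String, VR> result = new HashMap<>();

    MapValuesSketch(Function<? super V, ? extends VR> mapper) {
        this.mapper = mapper;
    }

    /** Applies one upstream update; a null value is a tombstone (delete). */
    void update(String key, V value) {
        if (value == null) {
            // Tombstone: the mapper is not called, the record is deleted.
            result.remove(key);
        } else {
            result.put(key, mapper.apply(value));
        }
    }

    Map<String, VR> result() {
        return result;
    }
}
```

Using the token-counting mapper `value -> value.split(" ").length`, the updates (k1, "a b c"), (k2, "x"), (k2, null) leave the result table as {k1=3}: k2 is first mapped to 1 and then removed by its tombstone without the mapper being invoked on null.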
-
mapValues
public <VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update, the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value string.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(value -> value.split(" ").length, Named.as("countTokenValue"));
This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Specified by:
mapValues in interface KTable<K,S>
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapper that computes a new output value
named - a Named config used to name the processor in the topology
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
public <VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value and key strings.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues((readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length);
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Specified by:
mapValues in interface KTable<K,S>
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
public <VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with default serializers, deserializers, and state store. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value and key strings.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues((readOnlyKey, value) -> readOnlyKey.split(" ").length + value.split(" ").length, Named.as("countTokenValueAndKey"));
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Specified by:
mapValues in interface KTable<K,S>
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
named - a Named config used to name the processor in the topology
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
public <VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value string.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer>() {
    public Integer apply(String value) {
        return value.split(" ").length;
    }
});
To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Specified by:
mapValues in interface KTable<K,S>
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapper that computes a new output value
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
public <VR> KTable<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapper is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value string.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer>() {
    public Integer apply(String value) {
        return value.split(" ").length;
    }
});
To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Specified by:
mapValues in interface KTable<K,S>
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapper that computes a new output value
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
public <VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value and key strings.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapperWithKey<String, String, Integer>() {
    public Integer apply(String readOnlyKey, String value) {
        return readOnlyKey.split(" ").length + value.split(" ").length;
    }
});
To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams.store(StoreQueryParameters). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Specified by:
mapValues in interface KTable<K,S>
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
mapValues
public <VR> KTable<K,VR> mapValues(ValueMapperWithKey<? super K,? super V,? extends VR> mapper, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type) in the new KTable, with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. For each KTable update, the provided ValueMapperWithKey is applied to the value of the updated record and computes a new value for it, resulting in an updated record for the result KTable. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is a stateless record-by-record operation.
The example below counts the number of tokens in the value and key strings.
KTable<String, String> inputTable = builder.table("topic");
KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapperWithKey<String, String, Integer>() {
    public Integer apply(String readOnlyKey, String value) {
        return readOnlyKey.split(" ").length + value.split(" ").length;
    }
});
To query the local KeyValueStore representing outputTable above, it must be obtained via KafkaStreams#store(...). For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application. The store name to query with is specified by Materialized.as(String) or Materialized.as(KeyValueBytesStoreSupplier).
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. This operation preserves data co-location with respect to the key. Thus, no internal data redistribution is required if a key-based operator (like a join) is applied to the result KTable.
Note that mapValues for a changelog stream works differently than record stream filters, because records with null values (so-called tombstone records) have delete semantics. Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to delete the corresponding record in the result KTable.
- Specified by:
mapValues in interface KTable<K,S>
- Type Parameters:
VR - the value type of the result KTable
- Parameters:
mapper - a ValueMapperWithKey that computes a new output value
named - a Named config used to name the processor in the topology
materialized - a Materialized that describes how the StateStore for the resulting KTable should be materialized. Cannot be null
- Returns:
- a KTable that contains records with unmodified keys and new values (possibly of different type)
-
transformValues
public <VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, String... stateStoreNames)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is similar to KTable.mapValues(ValueMapperWithKey), but more flexible, allowing access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.
If the downstream topology uses aggregation functions (e.g., KGroupedTable.reduce(...), KGroupedTable.aggregate(...), etc.), care must be taken when dealing with state (either held in state-stores or transformer instances) to ensure correct aggregate results. In contrast, if the resulting KTable is materialized (cf. KTable.transformValues(ValueTransformerWithKeySupplier, Materialized, String...)), such concerns are handled for you.
In order to assign a state, the state must be created and registered beforehand:
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
// register store
builder.addStateStore(keyValueStoreBuilder);

KTable outputTable = inputTable.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
new ValueTransformerWithKeySupplier() {
    public ValueTransformerWithKey get() {
        return new ValueTransformerWithKey() {
            private KeyValueStore<String, String> state;

            public void init(ProcessorContext context) {
                this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                // punctuate each 1000ms, can access this.state
                context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
            }

            public NewValueType transform(K readOnlyKey, V value) {
                // can access this.state and use read-only key
                return new NewValueType(readOnlyKey); // or null
            }

            public void close() {
                // can access this.state
            }
        };
    }
}
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
- Specified by:
transformValues in interface KTable<K,S>
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
stateStoreNames - the names of the state stores used by the processor
- Returns:
- a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
KTable.mapValues(ValueMapper), KTable.mapValues(ValueMapperWithKey)
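The init / transform / close lifecycle described above, with state looked up by store name, can be sketched in plain Java. This is a model under stated assumptions, not the Kafka Streams API: KeyedValueTransformer, UpdateCounter, and the map of named stores are hypothetical stand-ins for ValueTransformerWithKey, a user transformer, and the ProcessorContext. Only non-null values are considered.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model; the interface below is a hypothetical stand-in, not a Kafka Streams type.
class StatefulTransformModel {

    /** Mirrors the init/transform/close shape of a keyed value transformer. */
    interface KeyedValueTransformer<K, V, VR> {
        void init(Map<String, Map<K, Integer>> stores);
        VR transform(K readOnlyKey, V value);
        void close();
    }

    /** Counts how many updates each key has seen and appends that count to the value. */
    static class UpdateCounter implements KeyedValueTransformer<String, String, String> {
        private Map<String, Integer> state;

        @Override
        public void init(Map<String, Map<String, Integer>> stores) {
            // Look the store up by its registered name (the role ProcessorContext plays).
            this.state = stores.get("myValueTransformState");
        }

        @Override
        public String transform(String readOnlyKey, String value) {
            // The key is only read, never modified; the new value depends on stored state.
            int seen = state.merge(readOnlyKey, 1, Integer::sum);
            return value + "#" + seen;
        }

        @Override
        public void close() {
            // Nothing to release in this model.
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> stores = new HashMap<>();
        stores.put("myValueTransformState", new HashMap<>());

        UpdateCounter transformer = new UpdateCounter();
        transformer.init(stores);
        System.out.println(transformer.transform("a", "x")); // prints x#1
        System.out.println(transformer.transform("a", "y")); // prints y#2
        System.out.println(transformer.transform("b", "z")); // prints z#1
        transformer.close();
    }
}
```

The output for a record depends on earlier records with the same key, which is what separates this operation from the stateless mapValues.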
-
transformValues
public <VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Named named, String... stateStoreNames)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with default serializers, deserializers, and state store. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. Thus, an input record <K,V> can be transformed into an output record <K:V'>. This is similar to KTable.mapValues(ValueMapperWithKey), but more flexible, allowing access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed.
If the downstream topology uses aggregation functions (e.g., KGroupedTable.reduce(...), KGroupedTable.aggregate(...), etc.), care must be taken when dealing with state (either held in state-stores or transformer instances) to ensure correct aggregate results. In contrast, if the resulting KTable is materialized (cf. KTable.transformValues(ValueTransformerWithKeySupplier, Materialized, String...)), such concerns are handled for you.
In order to assign a state, the state must be created and registered beforehand:
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
// register store
builder.addStateStore(keyValueStoreBuilder);

KTable outputTable = inputTable.transformValues(new ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
new ValueTransformerWithKeySupplier() {
    public ValueTransformerWithKey get() {
        return new ValueTransformerWithKey() {
            private KeyValueStore<String, String> state;

            public void init(ProcessorContext context) {
                this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                // punctuate each 1000ms, can access this.state
                context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
            }

            public NewValueType transform(K readOnlyKey, V value) {
                // can access this.state and use read-only key
                return new NewValueType(readOnlyKey); // or null
            }

            public void close() {
                // can access this.state
            }
        };
    }
}
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
- Specified by:
transformValues in interface KTable<K,S>
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor
- Returns:
- a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
KTable.mapValues(ValueMapper), KTable.mapValues(ValueMapperWithKey)
-
transformValues
public <VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized, String... stateStoreNames)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. This is similar to KTable.mapValues(ValueMapperWithKey), but more flexible, allowing stateful, rather than stateless, record-by-record operation, access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. The resulting KTable is materialized into another state store (additional to the provided state store names) as specified by the user via the Materialized parameter, and is queryable through its given name.
In order to assign a state, the state must be created and registered beforehand:
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
// register store
builder.addStateStore(keyValueStoreBuilder);

KTable outputTable = inputTable.transformValues(
    new ValueTransformerWithKeySupplier() { ... },
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()),
    "myValueTransformState");
Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
new ValueTransformerWithKeySupplier() {
    public ValueTransformerWithKey get() {
        return new ValueTransformerWithKey() {
            private KeyValueStore<String, String> state;

            public void init(ProcessorContext context) {
                this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                // punctuate each 1000ms, can access this.state
                context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
            }

            public NewValueType transform(K readOnlyKey, V value) {
                // can access this.state and use read-only key
                return new NewValueType(readOnlyKey); // or null
            }

            public void close() {
                // can access this.state
            }
        };
    }
}
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
- Specified by:
transformValues in interface KTable<K,S>
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized. Cannot be null
stateStoreNames - the names of the state stores used by the processor
- Returns:
- a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
KTable.mapValues(ValueMapper), KTable.mapValues(ValueMapperWithKey)
-
transformValues
public <VR> KTable<K,VR> transformValues(ValueTransformerWithKeySupplier<? super K,? super V,? extends VR> transformerSupplier, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized, Named named, String... stateStoreNames)
Description copied from interface: KTable
Create a new KTable by transforming the value of each record in this KTable into a new value (with possibly a new type), with the key serde, value serde, and the underlying materialized state storage configured in the Materialized instance. A ValueTransformerWithKey (provided by the given ValueTransformerWithKeySupplier) is applied to each input record value and computes a new value for it. This is similar to KTable.mapValues(ValueMapperWithKey), but more flexible, allowing stateful, rather than stateless, record-by-record operation, access to additional state-stores, and access to the ProcessorContext. Furthermore, via Punctuator.punctuate(long) the processing progress can be observed and additional periodic actions can be performed. The resulting KTable is materialized into another state store (additional to the provided state store names) as specified by the user via the Materialized parameter, and is queryable through its given name.
In order to assign a state, the state must be created and registered beforehand:
// create store
StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"), Serdes.String(), Serdes.String());
// register store
builder.addStateStore(keyValueStoreBuilder);

KTable outputTable = inputTable.transformValues(
    new ValueTransformerWithKeySupplier() { ... },
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("outputTable")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()),
    "myValueTransformState");
Within the ValueTransformerWithKey, the state is obtained via the ProcessorContext. To trigger periodic actions via punctuate(), a schedule must be registered.
new ValueTransformerWithKeySupplier() {
    public ValueTransformerWithKey get() {
        return new ValueTransformerWithKey() {
            private KeyValueStore<String, String> state;

            public void init(ProcessorContext context) {
                this.state = (KeyValueStore<String, String>) context.getStateStore("myValueTransformState");
                // punctuate each 1000ms, can access this.state
                context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
            }

            public NewValueType transform(K readOnlyKey, V value) {
                // can access this.state and use read-only key
                return new NewValueType(readOnlyKey); // or null
            }

            public void close() {
                // can access this.state
            }
        };
    }
}
Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. Setting a new value preserves data co-location with respect to the key.
- Specified by:
transformValues in interface KTable<K,S>
- Type Parameters:
VR - the value type of the result table
- Parameters:
transformerSupplier - an instance of ValueTransformerWithKeySupplier that generates a ValueTransformerWithKey. At least one transformer instance will be created per streaming task. Transformers do not need to be thread-safe.
materialized - an instance of Materialized used to describe how the state store of the resulting table should be materialized. Cannot be null
named - a Named config used to name the processor in the topology
stateStoreNames - the names of the state stores used by the processor
- Returns:
- a KTable that contains records with unmodified key and new values (possibly of different type)
- See Also:
KTable.mapValues(ValueMapper), KTable.mapValues(ValueMapperWithKey)
-
toStream
public <K1> KStream<K1,V> toStream(KeyValueMapper<? super K,? super V,? extends K1> mapper)
Description copied from interface: KTable
Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.
For example, you can compute the new key as the length of the value string.
KTable<String, String> table = builder.table("topic");
KStream<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer>() {
    public Integer apply(String key, String value) {
        return value.length();
    }
});
Setting a new key might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream.
This operation is equivalent to calling table.toStream().selectKey(KeyValueMapper).
Note that KTable.toStream() is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).
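Why a new key can force redistribution is easiest to see in a small plain-Java model. Nothing here is Kafka Streams API: Rec, rekey, and partition are hypothetical names, and the hash-modulo partitioner is an assumption made purely for illustration (it is not Kafka's default partitioner).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model; Rec, rekey and partition are illustrative names, not Kafka Streams API.
class ToStreamRekeyModel {

    /** A keyed record, standing in for a key-value pair in the stream. */
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(K key, V value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + key + ", " + value + ")";
        }
    }

    /** Re-keys every record with the mapper and leaves the value unchanged. */
    static <K, V, K1> List<Rec<K1, V>> rekey(List<Rec<K, V>> updates,
                                             BiFunction<? super K, ? super V, ? extends K1> mapper) {
        List<Rec<K1, V>> out = new ArrayList<>();
        for (Rec<K, V> r : updates) {
            out.add(new Rec<>(mapper.apply(r.key, r.value), r.value));
        }
        return out;
    }

    /** Assumed hash-modulo partitioner, only to show that a new key may map elsewhere. */
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        List<Rec<String, String>> updates = new ArrayList<>();
        updates.add(new Rec<>("alice", "hello"));
        updates.add(new Rec<>("bob", "hi"));

        // New key = length of the value string, as in the example above.
        List<Rec<Integer, String>> stream = rekey(updates, (k, v) -> v.length());
        System.out.println(stream); // prints [(5, hello), (2, hi)]

        // Compare the partition of the old key with that of the new key.
        for (int i = 0; i < updates.size(); i++) {
            System.out.println(partition(updates.get(i).key, 4) + " -> " + partition(stream.get(i).key, 4));
        }
    }
}
```

Since the partition is derived from the key, records that shared a partition under the old key may no longer do so under the new one, so a key-based operator downstream has to bring them back together.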
-
toStream
public <K1> KStream<K1,V> toStream(KeyValueMapper<? super K,? super V,? extends K1> mapper, Named named)
Description copied from interface: KTable
Convert this changelog stream to a KStream using the given KeyValueMapper to select the new key.
For example, you can compute the new key as the length of the value string.
    KTable<String, String> table = builder.table("topic");
    KStream<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer>() {
        @Override
        public Integer apply(String key, String value) {
            return value.length();
        }
    });
Setting a new key might result in an internal data redistribution if a key-based operator (like an aggregation or join) is applied to the result KStream.
This operation is equivalent to calling table.toStream().selectKey(KeyValueMapper).
Note that KTable.toStream() is a logical operation and only changes the "interpretation" of the stream, i.e., each record of this changelog stream is no longer treated as an updated record (cf. KStream vs KTable).
- Specified by:
toStream in interface KTable<K,V>
- Type Parameters:
K1 - the new key type of the result stream
- Parameters:
mapper - a KeyValueMapper that computes a new key for each record
named - a Named config used to name the processor in the topology
- Returns:
a KStream that contains the same records as this KTable
-
suppress
public KTable<K,V> suppress(Suppressed<? super K> suppressed)
Description copied from interface: KTable
Suppress some updates from this changelog stream, determined by the supplied Suppressed configuration. This controls which updates downstream table and stream operations will receive.
-
join
public <V1,R> KTable<K,R> join(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner)
Description copied from interface: KTable
Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            |
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:C>     | <K1:C>    |             | <K1:b>     | <K1:ValueJoiner(C,b)>
           | <K1:C>    | <K1:null>   |            | <K1:null>
- Specified by:
join in interface KTable<K,V>
- Type Parameters:
V1 - the value type of the other KTable
R - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
- See Also:
KTable.leftJoin(KTable, ValueJoiner), KTable.outerJoin(KTable, ValueJoiner)
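The update rules of the inner join can be traced with a small plain-Java model. This is a sketch of the documented semantics only, not the internal join processor and not Kafka Streams code: the class InnerJoinSketch, its methods, and the use of String keys and values are assumptions made for illustration, and a BiFunction stands in for ValueJoiner.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of the documented inner-join semantics.
final class InnerJoinSketch {
    private final Map<String, String> thisState = new HashMap<>();
    private final Map<String, String> otherState = new HashMap<>();
    private final Map<String, String> result = new HashMap<>();
    private final BiFunction<String, String, String> joiner;

    InnerJoinSketch(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    // Both methods return the emitted result update, or null if nothing is emitted.
    String updateThis(String key, String value) {
        return update(thisState, key, value);
    }

    String updateOther(String key, String value) {
        return update(otherState, key, value);
    }

    private String update(Map<String, String> side, String key, String value) {
        if (key == null) {
            return null; // records with a null key are dropped
        }
        // (1) Update the state of the table that received the record.
        if (value == null) {
            side.remove(key); // tombstone: delete semantics
        } else {
            side.put(key, value);
        }
        // (2) Read both states; an inner join needs a value on each side.
        String left = thisState.get(key);
        String right = otherState.get(key);
        if (left != null && right != null) {
            String joined = joiner.apply(left, right);
            result.put(key, joined);
            return "<" + key + ":" + joined + ">";
        }
        // No match: forward a tombstone only if there is something to delete.
        return result.remove(key) != null ? "<" + key + ":null>" : null;
    }

    public static void main(String[] args) {
        InnerJoinSketch join =
                new InnerJoinSketch((l, r) -> "ValueJoiner(" + l + "," + r + ")");
        System.out.println(join.updateThis("K1", "A"));   // null (nothing emitted)
        System.out.println(join.updateOther("K1", "b"));  // <K1:ValueJoiner(A,b)>
        System.out.println(join.updateThis("K1", "C"));   // <K1:ValueJoiner(C,b)>
        System.out.println(join.updateOther("K1", null)); // <K1:null>
    }
}
```

The main method replays the four rows of the example table in order; the last call shows a tombstone on the other side deleting the previously emitted result.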
-
join
public <V1,R> KTable<K,R> join(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner, Named named)
Description copied from interface: KTable
Join records of this KTable with another KTable's records using non-windowed inner equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            |
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:C>     | <K1:C>    |             | <K1:b>     | <K1:ValueJoiner(C,b)>
           | <K1:C>    | <K1:null>   |            | <K1:null>
- Specified by:
join in interface KTable<K,V>
- Type Parameters:
V1 - the value type of the other KTable
R - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
- See Also:
KTable.leftJoin(KTable, ValueJoiner), KTable.outerJoin(KTable, ValueJoiner)
-
join
public <VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            |
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:C>     | <K1:C>    |             | <K1:b>     | <K1:ValueJoiner(C,b)>
           | <K1:C>    | <K1:null>   |            | <K1:null>
- Specified by:
join in interface KTable<K,V>
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
- See Also:
KTable.leftJoin(KTable, ValueJoiner, Materialized), KTable.outerJoin(KTable, ValueJoiner, Materialized)
-
join
public <VO,VR> KTable<K,VR> join(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Join records of this KTable with another KTable's records using non-windowed inner equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            |
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:C>     | <K1:C>    |             | <K1:b>     | <K1:ValueJoiner(C,b)>
           | <K1:C>    | <K1:null>   |            | <K1:null>
- Specified by:
join in interface KTable<K,V>
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key
- See Also:
KTable.leftJoin(KTable, ValueJoiner, Materialized), KTable.outerJoin(KTable, ValueJoiner, Materialized)
-
outerJoin
public <V1,R> KTable<K,R> outerJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner)
Description copied from interface: KTable
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding other KTable's state the provided ValueJoiner will be called with null value for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
           |           | <K1:null>   |            | <K1:null>
- Specified by:
outerJoin in interface KTable<K,V>
- Type Parameters:
V1 - the value type of the other KTable
R - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
- See Also:
KTable.join(KTable, ValueJoiner), KTable.leftJoin(KTable, ValueJoiner)
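The outer-join rows of the example table can be reproduced with a small plain-Java model. This is a sketch that follows the example table, not the internal join processor and not Kafka Streams code: the class OuterJoinSketch, its methods, and the String key and value types are assumptions for illustration, and a BiFunction stands in for ValueJoiner. The joiner is called whenever at least one side holds a value for the key, with null for the missing side.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of the outer-join example table.
final class OuterJoinSketch {
    private final Map<String, String> thisState = new HashMap<>();
    private final Map<String, String> otherState = new HashMap<>();
    private final Map<String, String> result = new HashMap<>();
    private final BiFunction<String, String, String> joiner;

    OuterJoinSketch(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    // Both methods return the emitted result update, or null if nothing is emitted.
    String updateThis(String key, String value) {
        return update(thisState, key, value);
    }

    String updateOther(String key, String value) {
        return update(otherState, key, value);
    }

    private String update(Map<String, String> side, String key, String value) {
        if (key == null) {
            return null; // records with a null key are dropped
        }
        if (value == null) {
            side.remove(key); // tombstone: delete semantics
        } else {
            side.put(key, value);
        }
        String left = thisState.get(key);
        String right = otherState.get(key);
        // Outer join: one present side is enough; the missing side is passed as null.
        if (left != null || right != null) {
            String joined = joiner.apply(left, right);
            result.put(key, joined);
            return "<" + key + ":" + joined + ">";
        }
        // Both sides are empty: forward a tombstone only if a result existed.
        return result.remove(key) != null ? "<" + key + ":null>" : null;
    }

    public static void main(String[] args) {
        OuterJoinSketch join =
                new OuterJoinSketch((l, r) -> "ValueJoiner(" + l + "," + r + ")");
        System.out.println(join.updateThis("K1", "A"));   // <K1:ValueJoiner(A,null)>
        System.out.println(join.updateOther("K1", "b"));  // <K1:ValueJoiner(A,b)>
        System.out.println(join.updateThis("K1", null));  // <K1:ValueJoiner(null,b)>
        System.out.println(join.updateOther("K1", null)); // <K1:null>
    }
}
```

Only the last call yields a tombstone, because that is the first point at which neither table holds a value for K1.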
-
outerJoin
public <V1,R> KTable<K,R> outerJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner, Named named)
Description copied from interface: KTable
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding other KTable's state the provided ValueJoiner will be called with null value for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
           |           | <K1:null>   |            | <K1:null>
- Specified by:
outerJoin in interface KTable<K,V>
- Type Parameters:
V1 - the value type of the other KTable
R - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
- See Also:
KTable.join(KTable, ValueJoiner), KTable.leftJoin(KTable, ValueJoiner)
-
outerJoin
public <VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding other KTable's state the provided ValueJoiner will be called with null value for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
           |           | <K1:null>   |            | <K1:null>
- Specified by:
outerJoin in interface KTable<K,V>
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
- See Also:
KTable.join(KTable, ValueJoiner), KTable.leftJoin(KTable, ValueJoiner)
-
outerJoin
public <VO,VR> KTable<K,VR> outerJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed outer equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join or left-join, all records from both input KTables will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record that does not find a corresponding record in the corresponding other KTable's state the provided ValueJoiner will be called with null value for the corresponding other value to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:ValueJoiner(null,b)>
           |           | <K1:null>   |            | <K1:null>
- Specified by:
outerJoin in interface KTable<K,V>
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of both KTables
- See Also:
KTable.join(KTable, ValueJoiner), KTable.leftJoin(KTable, ValueJoiner)
-
leftJoin
public <V1,R> KTable<K,R> leftJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner)
Description copied from interface: KTable
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:null>
           |           | <K1:null>   |            |
- Specified by:
leftJoin in interface KTable<K,V>
- Type Parameters:
V1 - the value type of the other KTable
R - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of left KTable
- See Also:
KTable.join(KTable, ValueJoiner), KTable.outerJoin(KTable, ValueJoiner)
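The asymmetry of the left join shows up clearly in a small plain-Java model. This is a sketch of the documented semantics only, not the internal join processor and not Kafka Streams code: the class LeftJoinSketch, its methods, and the String key and value types are assumptions for illustration, and a BiFunction stands in for ValueJoiner. A result exists only while the left table holds a value for the key; the right value may be null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of the documented left-join semantics.
final class LeftJoinSketch {
    private final Map<String, String> leftState = new HashMap<>();
    private final Map<String, String> rightState = new HashMap<>();
    private final Map<String, String> result = new HashMap<>();
    private final BiFunction<String, String, String> joiner;

    LeftJoinSketch(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    // Both methods return the emitted result update, or null if nothing is emitted.
    String updateLeft(String key, String value) {
        return update(leftState, key, value);
    }

    String updateRight(String key, String value) {
        return update(rightState, key, value);
    }

    private String update(Map<String, String> side, String key, String value) {
        if (key == null) {
            return null; // records with a null key are dropped
        }
        if (value == null) {
            side.remove(key); // tombstone: delete semantics
        } else {
            side.put(key, value);
        }
        String left = leftState.get(key);
        String right = rightState.get(key);
        // Left join: the left value must exist; the right value is passed as-is (may be null).
        if (left != null) {
            String joined = joiner.apply(left, right);
            result.put(key, joined);
            return "<" + key + ":" + joined + ">";
        }
        // No left value: forward a tombstone only if a result existed.
        return result.remove(key) != null ? "<" + key + ":null>" : null;
    }

    public static void main(String[] args) {
        LeftJoinSketch join =
                new LeftJoinSketch((l, r) -> "ValueJoiner(" + l + "," + r + ")");
        System.out.println(join.updateLeft("K1", "A"));    // <K1:ValueJoiner(A,null)>
        System.out.println(join.updateRight("K1", "b"));   // <K1:ValueJoiner(A,b)>
        System.out.println(join.updateLeft("K1", null));   // <K1:null>
        System.out.println(join.updateRight("K1", null));  // null (nothing emitted)
    }
}
```

The final call emits nothing because the result for K1 was already deleted by the left tombstone, which corresponds to the empty last cell of the example table.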
-
leftJoin
public <V1,R> KTable<K,R> leftJoin(KTable<K,V1> other, ValueJoiner<? super V,? super V1,? extends R> joiner, Named named)
Description copied from interface: KTable
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with default serializers, deserializers, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:null>
           |           | <K1:null>   |            |
- Specified by:
leftJoin in interface KTable<K,V>
- Type Parameters:
V1 - the value type of the other KTable
R - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
named - a Named config used to name the processor in the topology
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of left KTable
- See Also:
KTable.join(KTable, ValueJoiner), KTable.outerJoin(KTable, ValueJoiner)
-
leftJoin
public <VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable | thisState | otherKTable | otherState | result updated record
<K1:A>     | <K1:A>    |             |            | <K1:ValueJoiner(A,null)>
           | <K1:A>    | <K1:b>      | <K1:b>     | <K1:ValueJoiner(A,b)>
<K1:null>  |           |             | <K1:b>     | <K1:null>
           |           | <K1:null>   |            |
- Specified by:
leftJoin in interface KTable<K,V>
- Type Parameters:
VO - the value type of the other KTable
VR - the value type of the result KTable
- Parameters:
other - the other KTable to be joined with this KTable
joiner - a ValueJoiner that computes the join result for a pair of matching records
materialized - an instance of Materialized used to describe how the state store should be materialized. Cannot be null
- Returns:
a KTable that contains join-records for each key and values computed by the given ValueJoiner, one for each matched record-pair with the same key plus one for each non-matching record of left KTable
- See Also:
KTable.join(KTable, ValueJoiner, Materialized), KTable.outerJoin(KTable, ValueJoiner, Materialized)
-
leftJoin
public <VO,VR> KTable<K,VR> leftJoin(KTable<K,VO> other, ValueJoiner<? super V,? super VO,? extends VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface: KTable
Join records of this KTable (left input) with another KTable's (right input) records using non-windowed left equi join, with the Materialized instance for configuration of the key serde, the result table's value serde, and state store. The join is a primary key join with join attribute thisKTable.key == otherKTable.key. In contrast to inner-join, all records from left KTable will produce an output record (cf. below). The result is an ever updating KTable that represents the current (i.e., processing time) result of the join.
The join is computed by (1) updating the internal state of one KTable and (2) performing a lookup for a matching record in the current (i.e., processing time) internal state of the other KTable. This happens in a symmetric way, i.e., for each update of either this or the other input KTable the result gets updated.
For each KTable record that finds a corresponding record in the other KTable's state the provided ValueJoiner will be called to compute a value (with arbitrary type) for the result record. Additionally, for each record of left KTable that does not find a corresponding record in the right KTable's state the provided ValueJoiner will be called with rightValue = null to compute a value (with arbitrary type) for the result record. The key of the result record is the same as for both joining input records.
Note that records with null values (so-called tombstone records) have delete semantics. For example, for left input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly to delete a record in the result KTable if required (i.e., if there is anything to be deleted).
Input records with null key will be dropped and no join computation is performed.
Example:
thisKTable thisState otherKTable otherState result updated record <K1:A> <K1:A> <K1:ValueJoiner(A,null)> <K1:A> <K1:b> <K1:b> <K1:ValueJoiner(A,b)> <K1:null> <K1:b> <K1:null> <K1:null> - Specified by:
leftJoin
in interfaceKTable<K,S>
- Type Parameters:
VO
- the value type of the otherKTable
VR
- the value type of the resultKTable
- Parameters:
other
- the otherKTable
to be joined with thisKTable
joiner
- aValueJoiner
that computes the join result for a pair of matching recordsnamed
- aNamed
config used to name the processor in the topologymaterialized
- an instance ofMaterialized
used to describe how the state store should be materialized. Cannot benull
- Returns:
- a
KTable
that contains join-records for each key and values computed by the givenValueJoiner
, one for each matched record-pair with the same key plus one for each non-matching record of leftKTable
- See Also:
KTable.join(KTable, ValueJoiner, Materialized)
,KTable.outerJoin(KTable, ValueJoiner, Materialized)
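The update sequence from the leftJoin example can be traced with a small plain-Java simulation. This is only a sketch of the documented semantics, not Kafka Streams code: the class LeftJoinSketch, its methods, and the string-concatenating joiner are hypothetical names introduced for illustration, and both tables are assumed to hold String keys and values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Plain-Java simulation of the documented left-join semantics (hypothetical helper, not Kafka code). */
final class LeftJoinSketch {
    private final Map<String, String> leftState = new HashMap<>();
    private final Map<String, String> rightState = new HashMap<>();
    private final Map<String, String> result = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();
    private final BiFunction<String, String, String> joiner;

    LeftJoinSketch(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    /** Update of the left (this) table; a null value is a tombstone. */
    void updateLeft(String key, String value) {
        if (key == null) return;                  // null keys are dropped
        if (value == null) {
            leftState.remove(key);
            // The joiner is not called; forward a tombstone only if there is something to delete.
            if (result.remove(key) != null) emit(key, null);
            return;
        }
        leftState.put(key, value);
        join(key);
    }

    /** Update of the right (other) table; a null value is a tombstone. */
    void updateRight(String key, String value) {
        if (key == null) return;                  // null keys are dropped
        if (value == null) rightState.remove(key); else rightState.put(key, value);
        // Only keys present on the left side produce output in a left join.
        if (leftState.containsKey(key)) join(key);
    }

    private void join(String key) {
        // The right value is null when the right table has no matching record.
        String joined = joiner.apply(leftState.get(key), rightState.get(key));
        result.put(key, joined);
        emit(key, joined);
    }

    private void emit(String key, String value) {
        emitted.add("<" + key + ":" + value + ">");
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        LeftJoinSketch tables = new LeftJoinSketch((left, right) -> left + "+" + right);
        tables.updateLeft("K1", "A");     // no right match yet: joiner(A, null)
        tables.updateRight("K1", "b");    // match: joiner(A, b)
        tables.updateLeft("K1", null);    // left tombstone deletes the result record
        tables.updateRight("K1", null);   // nothing left to join: no output
        System.out.println(tables.emitted());
    }
}
```

With the joiner above, the four updates emit three result records, mirroring the rows of the example: the last right-side tombstone produces no output because the left state no longer contains K1.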
-
groupBy
public <K1,V1> KGroupedTable<K1,V1> groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector)
Description copied from interface:KTable
Re-groups the records of this KTable using the provided KeyValueMapper and default serializers and deserializers. Each KeyValue pair of this KTable is mapped to a new KeyValue pair by applying the provided KeyValueMapper. Re-grouping a KTable is required before an aggregation operator can be applied to the data (cf. KGroupedTable). The KeyValueMapper selects a new key and value (which should both have unmodified type). If the new record key is null the record will not be included in the resulting KGroupedTable.
Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().
All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resulting KGroupedTable is partitioned on the new key.
If the key or value type is changed, it is recommended to use KTable.groupBy(KeyValueMapper, Grouped) instead.
- Specified by:
groupBy
in interfaceKTable<K,S>
- Type Parameters:
K1
- the key type of the resultKGroupedTable
V1
- the value type of the resultKGroupedTable
- Parameters:
selector
- aKeyValueMapper
that computes a new grouping key and value to be aggregated- Returns:
- a
KGroupedTable
that contains the re-grouped records of the originalKTable
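As a rough illustration of what re-grouping means, the following plain-Java sketch maps every record of an in-memory table to a new key-value pair and skips records whose new key is null. It does not use Kafka Streams: RegroupSketch and regroup are hypothetical names, a BiFunction stands in for the KeyValueMapper, a Map.Entry stands in for KeyValue, and no repartition topic is involved.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** In-memory illustration of re-grouping a table by a newly selected key (not Kafka code). */
final class RegroupSketch {

    static <K, V, K1, V1> Map<K1, List<V1>> regroup(
            Map<K, V> table,
            BiFunction<? super K, ? super V, Map.Entry<K1, V1>> selector) {
        Map<K1, List<V1>> grouped = new HashMap<>();
        for (Map.Entry<K, V> record : table.entrySet()) {
            Map.Entry<K1, V1> mapped = selector.apply(record.getKey(), record.getValue());
            if (mapped.getKey() == null) {
                continue; // records with a null new key are not included in the result
            }
            // Collect the values per new key; an aggregation would be applied to these.
            grouped.computeIfAbsent(mapped.getKey(), k -> new ArrayList<>()).add(mapped.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, String> regionByUser = new HashMap<>();
        regionByUser.put("alice", "eu");
        regionByUser.put("bob", "us");
        regionByUser.put("carol", "eu");
        regionByUser.put("dave", null); // no region: the selected key is null, so the record is skipped

        // Select the region as the new key and the user name as the new value.
        Map<String, List<String>> usersByRegion =
                regroup(regionByUser, (user, region) -> new SimpleEntry<>(region, user));
        System.out.println(usersByRegion);
    }
}
```

In the real API the grouped records would then be passed to an aggregation on the KGroupedTable; here the per-key lists merely show which records end up together.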
-
groupBy
@Deprecated public <K1,V1> KGroupedTable<K1,V1> groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector, Serialized<K1,V1> serialized)
Deprecated.
Description copied from interface: KTable
Re-groups the records of this KTable using the provided KeyValueMapper and Serdes as specified by Serialized. Each KeyValue pair of this KTable is mapped to a new KeyValue pair by applying the provided KeyValueMapper. Re-grouping a KTable is required before an aggregation operator can be applied to the data (cf. KGroupedTable). The KeyValueMapper selects a new key and value (both of which may be the same type or a new type). If the new record key is null the record will not be included in the resulting KGroupedTable.
Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is an internally generated name, and "-repartition" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().
All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resulting KGroupedTable is partitioned on the new key.
- Specified by:
groupBy
in interfaceKTable<K,S>
- Type Parameters:
K1
- the key type of the resultKGroupedTable
V1
- the value type of the resultKGroupedTable
- Parameters:
selector
- aKeyValueMapper
that computes a new grouping key and value to be aggregatedserialized
- theSerialized
instance used to specifySerdes
- Returns:
- a
KGroupedTable
that contains the re-grouped records of the originalKTable
-
groupBy
public <K1,V1> KGroupedTable<K1,V1> groupBy(KeyValueMapper<? super K,? super V,KeyValue<K1,V1>> selector, Grouped<K1,V1> grouped)
Description copied from interface:KTable
Re-groups the records of this KTable using the provided KeyValueMapper and Serdes as specified by Grouped. Each KeyValue pair of this KTable is mapped to a new KeyValue pair by applying the provided KeyValueMapper. Re-grouping a KTable is required before an aggregation operator can be applied to the data (cf. KGroupedTable). The KeyValueMapper selects a new key and value (where both could be the same type or a new type). If the new record key is null the record will not be included in the resulting KGroupedTable.
Because a new key is selected, an internal repartitioning topic will be created in Kafka. This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "<name>" is either provided via Grouped.as(String) or an internally generated name. You can retrieve all generated internal topic names via Topology.describe().
All data of this KTable will be redistributed through the repartitioning topic by writing all update records to and rereading all updated records from it, such that the resulting KGroupedTable is partitioned on the new key.
- Specified by:
groupBy
in interfaceKTable<K,S>
- Type Parameters:
K1
- the key type of the resultKGroupedTable
V1
- the value type of the resultKGroupedTable
- Parameters:
selector
- aKeyValueMapper
that computes a new grouping key and value to be aggregatedgrouped
- theGrouped
instance used to specifySerdes
and the name for a repartition topic if repartitioning is required.- Returns:
- a
KGroupedTable
that contains the re-grouped records of the originalKTable
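The naming scheme "${applicationId}-<name>-repartition" described for this overload can be written out as a small helper. This is a sketch for reasoning about topic names only: the class, the method, and the placeholder for the internally generated name are hypothetical, and the authoritative names are the ones reported by Topology.describe().

```java
/** Builds a repartition topic name following the documented pattern (hypothetical helper). */
final class RepartitionTopicNameSketch {

    /**
     * @param applicationId the value configured under APPLICATION_ID_CONFIG
     * @param groupedName   the name given via Grouped.as(String), or null if none was given
     * @param generatedName placeholder for the internally generated name used as a fallback
     */
    static String repartitionTopicName(String applicationId, String groupedName, String generatedName) {
        // A user-provided name takes the place of the internally generated one.
        String name = groupedName != null ? groupedName : generatedName;
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(String[] args) {
        // "GENERATED-NAME" is a stand-in; the real generated names are internal to Kafka Streams.
        System.out.println(repartitionTopicName("my-app", "by-region", "GENERATED-NAME"));
        System.out.println(repartitionTopicName("my-app", null, "GENERATED-NAME"));
    }
}
```

Providing an explicit name keeps the topic name stable when the topology changes, whereas the shape of generated names should not be relied upon.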
-
valueGetterSupplier
public KTableValueGetterSupplier<K,V> valueGetterSupplier()
-
enableSendingOldValues
public void enableSendingOldValues()
-
join
public <VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner)
Description copied from interface:KTable
Join records of this KTable with another KTable using non-windowed inner join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Specified by:
join
in interfaceKTable<K,S>
- Type Parameters:
VR
- the value type of the resultKTable
KO
- the key type of the otherKTable
VO
- the value type of the otherKTable
- Parameters:
other
- the otherKTable
to be joined with thisKTable
. Keyed by KO.foreignKeyExtractor
- aFunction
that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner
- aValueJoiner
that computes the join result for a pair of matching records- Returns:
- a
KTable
that contains the result of joining this table withother
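To make the foreign key inner join concrete, here is a plain-Java sketch over two in-memory maps. It is a snapshot computation under stated assumptions, not the Kafka Streams implementation: ForeignKeyJoinSketch, innerJoin, and customerIdOf are hypothetical names, a BiFunction stands in for the ValueJoiner, and order values are assumed to be strings of the form "customerId:item".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Snapshot-style illustration of a foreign key inner join between two tables (not Kafka code). */
final class ForeignKeyJoinSketch {

    static <K, V, KO, VO, VR> Map<K, VR> innerJoin(
            Map<K, V> left,
            Map<KO, VO> right,
            Function<V, KO> foreignKeyExtractor,
            BiFunction<V, VO, VR> joiner) {
        Map<K, VR> result = new HashMap<>();
        for (Map.Entry<K, V> record : left.entrySet()) {
            // The join key comes from the left VALUE, not from the left key.
            KO foreignKey = foreignKeyExtractor.apply(record.getValue());
            if (foreignKey == null) {
                continue; // a null extracted key is ignored as invalid
            }
            VO otherValue = right.get(foreignKey);
            if (otherValue == null) {
                continue; // inner join: no matching record on the right, no result
            }
            // The result stays keyed by the left table's key.
            result.put(record.getKey(), joiner.apply(record.getValue(), otherValue));
        }
        return result;
    }

    /** Extracts the customer id from "customerId:item"; returns null if the id part is empty. */
    static String customerIdOf(String order) {
        String id = order.substring(0, order.indexOf(':'));
        return id.isEmpty() ? null : id;
    }

    public static void main(String[] args) {
        Map<String, String> orders = new HashMap<>();
        orders.put("o1", "c1:book");
        orders.put("o2", "c9:pen");  // references a customer that does not exist
        orders.put("o3", ":ink");    // no customer id at all

        Map<String, String> customers = new HashMap<>();
        customers.put("c1", "Ada");

        Map<String, String> joined = innerJoin(
                orders, customers, ForeignKeyJoinSketch::customerIdOf,
                (order, customer) -> order + "@" + customer);
        System.out.println(joined);
    }
}
```

Only order o1 appears in the result: o2 has no matching customer and o3 yields a null foreign key.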
-
join
public <VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named)
Description copied from interface:KTable
Join records of this KTable with another KTable using non-windowed inner join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Specified by:
join
in interfaceKTable<K,S>
- Type Parameters:
VR
- the value type of the resultKTable
KO
- the key type of the otherKTable
VO
- the value type of the otherKTable
- Parameters:
other
- the otherKTable
to be joined with thisKTable
. Keyed by KO.foreignKeyExtractor
- aFunction
that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner
- aValueJoiner
that computes the join result for a pair of matching recordsnamed
- aNamed
config used to name the processor in the topology- Returns:
- a
KTable
that contains the result of joining this table withother
-
join
public <VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface:KTable
Join records of this KTable with another KTable using non-windowed inner join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Specified by:
join
in interfaceKTable<K,S>
- Type Parameters:
VR
- the value type of the resultKTable
KO
- the key type of the otherKTable
VO
- the value type of the otherKTable
- Parameters:
other
- the otherKTable
to be joined with thisKTable
. Keyed by KO.foreignKeyExtractor
- aFunction
that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner
- aValueJoiner
that computes the join result for a pair of matching recordsmaterialized
- aMaterialized
that describes how theStateStore
for the resultingKTable
should be materialized. Cannot benull
- Returns:
- a
KTable
that contains the result of joining this table withother
-
join
public <VR,KO,VO> KTable<K,VR> join(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface:KTable
Join records of this KTable with another KTable using non-windowed inner join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Specified by:
join
in interfaceKTable<K,S>
- Type Parameters:
VR
- the value type of the resultKTable
KO
- the key type of the otherKTable
VO
- the value type of the otherKTable
- Parameters:
other
- the otherKTable
to be joined with thisKTable
. Keyed by KO.foreignKeyExtractor
- aFunction
that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner
- aValueJoiner
that computes the join result for a pair of matching recordsnamed
- aNamed
config used to name the processor in the topologymaterialized
- aMaterialized
that describes how theStateStore
for the resultingKTable
should be materialized. Cannot benull
- Returns:
- a
KTable
that contains the result of joining this table withother
-
leftJoin
public <VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner)
Description copied from interface:KTable
Join records of this KTable with another KTable using non-windowed left join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Specified by:
leftJoin
in interfaceKTable<K,S>
- Type Parameters:
VR
- the value type of the resultKTable
KO
- the key type of the otherKTable
VO
- the value type of the otherKTable
- Parameters:
other
- the otherKTable
to be joined with thisKTable
. Keyed by KO.foreignKeyExtractor
- aFunction
that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner
- aValueJoiner
that computes the join result for a pair of matching records- Returns:
- a
KTable
that contains the result of joining this table with other
-
leftJoin
public <VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named)
Description copied from interface:KTable
Join records of this KTable with another KTable using non-windowed left join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Specified by:
leftJoin
in interfaceKTable<K,S>
- Type Parameters:
VR
- the value type of the resultKTable
KO
- the key type of the otherKTable
VO
- the value type of the otherKTable
- Parameters:
other
- the otherKTable
to be joined with thisKTable
. Keyed by KO.foreignKeyExtractor
- aFunction
that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner
- aValueJoiner
that computes the join result for a pair of matching recordsnamed
- aNamed
config used to name the processor in the topology- Returns:
- a
KTable
that contains the result of joining this table withother
-
leftJoin
public <VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Named named, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface:KTable
Join records of this KTable with another KTable using non-windowed left join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Specified by:
leftJoin
in interfaceKTable<K,S>
- Type Parameters:
VR
- the value type of the resultKTable
KO
- the key type of the otherKTable
VO
- the value type of the otherKTable
- Parameters:
other
- the otherKTable
to be joined with thisKTable
. Keyed by KO.foreignKeyExtractor
- aFunction
that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner
- aValueJoiner
that computes the join result for a pair of matching recordsnamed
- aNamed
config used to name the processor in the topologymaterialized
- aMaterialized
that describes how theStateStore
for the resultingKTable
should be materialized. Cannot benull
- Returns:
- a
KTable
that contains the result of joining this table withother
-
leftJoin
public <VR,KO,VO> KTable<K,VR> leftJoin(KTable<KO,VO> other, Function<V,KO> foreignKeyExtractor, ValueJoiner<V,VO,VR> joiner, Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Description copied from interface:KTable
Join records of this KTable with another KTable using non-windowed left join.
This is a foreign key join, where the joining key is determined by the foreignKeyExtractor.
- Specified by:
leftJoin
in interfaceKTable<K,S>
- Type Parameters:
VR
- the value type of the resultKTable
KO
- the key type of the otherKTable
VO
- the value type of the otherKTable
- Parameters:
other
- the otherKTable
to be joined with thisKTable
. Keyed by KO.foreignKeyExtractor
- aFunction
that extracts the key (KO) from this table's value (V). If the result is null, the update is ignored as invalid.joiner
- aValueJoiner
that computes the join result for a pair of matching recordsmaterialized
- aMaterialized
that describes how theStateStore
for the resultingKTable
should be materialized. Cannot benull
- Returns:
- a
KTable
that contains the result of joining this table withother
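For the left variant of the foreign key join, the practical consequence is that the joiner has to cope with a missing right-hand value. The plain-Java sketch below illustrates this under the assumption that an unmatched left record is joined with a null right value, while a null extracted key is still ignored as stated in the parameter description. ForeignKeyLeftJoinSketch and its methods are hypothetical names and no Kafka Streams classes are used.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Snapshot-style illustration of a foreign key left join between two tables (not Kafka code). */
final class ForeignKeyLeftJoinSketch {

    static <K, V, KO, VO, VR> Map<K, VR> leftJoin(
            Map<K, V> left,
            Map<KO, VO> right,
            Function<V, KO> foreignKeyExtractor,
            BiFunction<V, VO, VR> joiner) {
        Map<K, VR> result = new HashMap<>();
        for (Map.Entry<K, V> record : left.entrySet()) {
            KO foreignKey = foreignKeyExtractor.apply(record.getValue());
            if (foreignKey == null) {
                continue; // a null extracted key is ignored as invalid
            }
            // Assumption of this sketch: with no match, the joiner receives null for the right value.
            VO otherValue = right.get(foreignKey);
            result.put(record.getKey(), joiner.apply(record.getValue(), otherValue));
        }
        return result;
    }

    /** A joiner that tolerates a missing right-hand value. */
    static String describe(String order, String customer) {
        return order + "@" + (customer == null ? "unknown" : customer);
    }

    public static void main(String[] args) {
        Map<String, String> customerIdByOrder = new HashMap<>();
        customerIdByOrder.put("o1", "c1");
        customerIdByOrder.put("o2", "c9"); // no such customer

        Map<String, String> customers = new HashMap<>();
        customers.put("c1", "Ada");

        // Here the left value itself is the foreign key, so the extractor is the identity.
        Map<String, String> joined = leftJoin(
                customerIdByOrder, customers, Function.identity(),
                ForeignKeyLeftJoinSketch::describe);
        System.out.println(joined);
    }
}
```

Unlike the inner join, order o2 stays in the result even though customer c9 is unknown, which is why the joiner checks for null before using the right-hand value.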
-
-