Class StreamTask
- java.lang.Object
-
- org.apache.kafka.streams.processor.internals.AbstractTask
-
- org.apache.kafka.streams.processor.internals.StreamTask
-
- All Implemented Interfaces:
ProcessorNodePunctuator
,Task
public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, Task
A StreamTask is associated with aPartitionGroup
, and is assigned to a StreamThread for processing.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.kafka.streams.processor.internals.Task
Task.State, Task.TaskType
-
-
Field Summary
-
Fields inherited from class org.apache.kafka.streams.processor.internals.AbstractTask
id, inputPartitions, stateDirectory, stateMgr, topology
-
Fields inherited from interface org.apache.kafka.streams.processor.internals.Task
LATEST_OFFSET
-
-
Constructor Summary
Constructors Constructor Description StreamTask(TaskId id, Set<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[],byte[]> mainConsumer, StreamsConfig config, StreamsMetricsImpl streamsMetrics, StateDirectory stateDirectory, ThreadCache cache, org.apache.kafka.common.utils.Time time, ProcessorStateManager stateMgr, RecordCollector recordCollector, InternalProcessorContext processorContext)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[],byte[]>> records)
Adds records to queues.Map<TopicPartition,Long>
changelogOffsets()
void
closeClean()
Must be idempotent.void
closeCleanAndRecycleState()
Attempt a clean close but do not close the underlying statevoid
closeDirty()
Must be idempotent.boolean
commitNeeded()
boolean
commitRequested()
Whether or not a request has been made to commit the current statevoid
completeRestoration()
boolean
hasRecordsQueued()
void
initializeIfNeeded()
boolean
isActive()
boolean
isProcessable(long wallClockTime)
An active task is processable if its buffer contains data for all of its input source topic partitions, or if it is enforced to be processableboolean
maybePunctuateStreamTime()
Possibly trigger registered stream-time punctuation functions if current partition group timestamp has reached the defined stamp Note, this is only called in the presence of new recordsboolean
maybePunctuateSystemTime()
Possibly trigger registered system-time punctuation functions if current system timestamp has reached the defined stamp Note, this is called irrespective of the presence of new recordsvoid
postCommit()
This should only be called if the attempted commit succeeded for this taskMap<TopicPartition,OffsetAndMetadata>
prepareCommit()
boolean
process(long wallClockTime)
Process one record.InternalProcessorContext
processorContext()
void
punctuate(ProcessorNode<?,?> node, long timestamp, PunctuationType type, Punctuator punctuator)
Map<TopicPartition,Long>
purgeableOffsets()
void
recordProcessBatchTime(long processBatchTime)
void
recordProcessTimeRatioAndBufferSize(long allTaskProcessMs, long now)
void
resume()
- resume the taskCancellable
schedule(long interval, PunctuationType type, Punctuator punctuator)
Schedules a punctuation for the processorvoid
suspend()
String
toString()
Produces a string representation containing useful information about a Task.String
toString(String indent)
Produces a string representation containing useful information about a Task starting with the given indent.void
update(Set<TopicPartition> topicPartitions, Map<String,List<String>> allTopologyNodesToSourceTopics)
Updates input partitions and topology after rebalance-
Methods inherited from class org.apache.kafka.streams.processor.internals.AbstractTask
changelogPartitions, getStore, id, inputPartitions, isClosed, markChangelogAsCorrupted, revive, state
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
Methods inherited from interface org.apache.kafka.streams.processor.internals.Task
changelogPartitions, getStore, id, inputPartitions, isClosed, markChangelogAsCorrupted, needsInitializationOrRestoration, revive, state
-
-
-
-
Constructor Detail
-
StreamTask
public StreamTask(TaskId id, Set<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[],byte[]> mainConsumer, StreamsConfig config, StreamsMetricsImpl streamsMetrics, StateDirectory stateDirectory, ThreadCache cache, org.apache.kafka.common.utils.Time time, ProcessorStateManager stateMgr, RecordCollector recordCollector, InternalProcessorContext processorContext)
-
-
Method Detail
-
initializeIfNeeded
public void initializeIfNeeded()
- Specified by:
initializeIfNeeded
in interfaceTask
- Throws:
LockException
- could happen when multi-threads within the single instance, could retryTimeoutException
- if initializing record collector timed outStreamsException
- fatal error, should close the thread
-
completeRestoration
public void completeRestoration()
- Specified by:
completeRestoration
in interfaceTask
- Throws:
TimeoutException
- if fetching committed offsets timed out
-
prepareCommit
public Map<TopicPartition,OffsetAndMetadata> prepareCommit()
- Specified by:
prepareCommit
in interfaceTask
- Returns:
- offsets that should be committed for this task
-
postCommit
public void postCommit()
This should only be called if the attempted commit succeeded for this task- Specified by:
postCommit
in interfaceTask
-
closeClean
public void closeClean()
Description copied from interface:Task
Must be idempotent.- Specified by:
closeClean
in interfaceTask
-
closeDirty
public void closeDirty()
Description copied from interface:Task
Must be idempotent.- Specified by:
closeDirty
in interfaceTask
-
update
public void update(Set<TopicPartition> topicPartitions, Map<String,List<String>> allTopologyNodesToSourceTopics)
Description copied from interface:Task
Updates input partitions and topology after rebalance- Specified by:
update
in interfaceTask
- Overrides:
update
in classAbstractTask
-
closeCleanAndRecycleState
public void closeCleanAndRecycleState()
Description copied from interface:Task
Attempt a clean close but do not close the underlying state- Specified by:
closeCleanAndRecycleState
in interfaceTask
-
isProcessable
public boolean isProcessable(long wallClockTime)
An active task is processable if its buffer contains data for all of its input source topic partitions, or if it is enforced to be processable
-
process
public boolean process(long wallClockTime)
Process one record.- Specified by:
process
in interfaceTask
- Returns:
- true if this method processes a record, false if it does not process a record.
- Throws:
TaskMigratedException
- if the task producer got fenced (EOS only)
-
recordProcessBatchTime
public void recordProcessBatchTime(long processBatchTime)
- Specified by:
recordProcessBatchTime
in interfaceTask
-
recordProcessTimeRatioAndBufferSize
public void recordProcessTimeRatioAndBufferSize(long allTaskProcessMs, long now)
- Specified by:
recordProcessTimeRatioAndBufferSize
in interfaceTask
-
punctuate
public void punctuate(ProcessorNode<?,?> node, long timestamp, PunctuationType type, Punctuator punctuator)
- Specified by:
punctuate
in interfaceProcessorNodePunctuator
- Throws:
IllegalStateException
- if the current node is not nullTaskMigratedException
- if the task producer got fenced (EOS only)
-
purgeableOffsets
public Map<TopicPartition,Long> purgeableOffsets()
- Specified by:
purgeableOffsets
in interfaceTask
-
addRecords
public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[],byte[]>> records)
Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped and not added to the queue for processing- Specified by:
addRecords
in interfaceTask
- Parameters:
partition
- the partitionrecords
- the records
-
schedule
public Cancellable schedule(long interval, PunctuationType type, Punctuator punctuator)
Schedules a punctuation for the processor- Parameters:
interval
- the interval in millisecondstype
- the punctuation type- Throws:
IllegalStateException
- if the current node is not null
-
maybePunctuateStreamTime
public boolean maybePunctuateStreamTime()
Possibly trigger registered stream-time punctuation functions if current partition group timestamp has reached the defined stamp Note, this is only called in the presence of new records- Specified by:
maybePunctuateStreamTime
in interfaceTask
- Throws:
TaskMigratedException
- if the task producer got fenced (EOS only)
-
maybePunctuateSystemTime
public boolean maybePunctuateSystemTime()
Possibly trigger registered system-time punctuation functions if current system timestamp has reached the defined stamp Note, this is called irrespective of the presence of new records- Specified by:
maybePunctuateSystemTime
in interfaceTask
- Throws:
TaskMigratedException
- if the task producer got fenced (EOS only)
-
commitRequested
public boolean commitRequested()
Whether or not a request has been made to commit the current state- Specified by:
commitRequested
in interfaceTask
-
processorContext
public InternalProcessorContext processorContext()
-
toString
public String toString()
Produces a string representation containing useful information about a Task. This is useful in debugging scenarios.
-
toString
public String toString(String indent)
Produces a string representation containing useful information about a Task starting with the given indent. This is useful in debugging scenarios.- Returns:
- A string representation of the Task instance.
-
commitNeeded
public boolean commitNeeded()
- Specified by:
commitNeeded
in interfaceTask
-
changelogOffsets
public Map<TopicPartition,Long> changelogOffsets()
- Specified by:
changelogOffsets
in interfaceTask
- Returns:
- the offsets of all the changelog partitions associated with this task, indicating the current positions of the logged state stores of the task.
-
hasRecordsQueued
public boolean hasRecordsQueued()
-
-