Class ProcessorStateManager
- java.lang.Object
-
- org.apache.kafka.streams.processor.internals.ProcessorStateManager
-
public class ProcessorStateManager extends Object
ProcessorStateManager is the source of truth for the current offset for each state store, which is either the read offset during restoring, or the written offset during normal processing. The offset is initialized as null when the state store is registered, and then it can be updated by loading checkpoint file, restore state stores, or passing from the record collector's written offsets. When checkpointing, if the offset is not null it would be written to the file. The manager is also responsible for restoring state stores via their registered restore callback, which is used for both updating standby tasks as well as restoring active tasks.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
ProcessorStateManager.StateStoreMetadata
-
Constructor Summary
Constructors Constructor Description ProcessorStateManager(TaskId taskId, Task.TaskType taskType, boolean eosEnabled, org.apache.kafka.common.utils.LogContext logContext, StateDirectory stateDirectory, ChangelogRegister changelogReader, Map<String,String> storeToChangelogTopic, Collection<TopicPartition> sourcePartitions)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description File
baseDir()
Map<TopicPartition,Long>
changelogOffsets()
void
checkpoint(Map<TopicPartition,Long> writtenOffsets)
void
close()
Close
all stores (even in case of failure).void
flush()
StateStore
getGlobalStore(String name)
StateStore
getStore(String name)
void
registerStore(StateStore store, StateRestoreCallback stateRestoreCallback)
static String
storeChangelogTopic(String applicationId, String storeName, String internalStream)
Task.TaskType
taskType()
-
-
-
Constructor Detail
-
ProcessorStateManager
public ProcessorStateManager(TaskId taskId, Task.TaskType taskType, boolean eosEnabled, org.apache.kafka.common.utils.LogContext logContext, StateDirectory stateDirectory, ChangelogRegister changelogReader, Map<String,String> storeToChangelogTopic, Collection<TopicPartition> sourcePartitions) throws ProcessorStateException
- Throws:
ProcessorStateException
- if the task directory does not exist and could not be created
-
-
Method Detail
-
storeChangelogTopic
public static String storeChangelogTopic(String applicationId, String storeName, String internalStream)
-
getGlobalStore
public StateStore getGlobalStore(String name)
-
baseDir
public File baseDir()
-
registerStore
public void registerStore(StateStore store, StateRestoreCallback stateRestoreCallback)
-
getStore
public StateStore getStore(String name)
-
changelogOffsets
public Map<TopicPartition,Long> changelogOffsets()
-
taskType
public Task.TaskType taskType()
-
flush
public void flush()
- Throws:
TaskMigratedException
- recoverable error sending changelog records that would cause the task to be removedStreamsException
- fatal error when flushing the state store, for example sending changelog records failed or flushing state store get IO errors; such error should cause the thread to die
-
close
public void close() throws ProcessorStateException
Close
all stores (even in case of failure). Log all exceptions and re-throw the first exception that occurred at the end.- Throws:
ProcessorStateException
- if any error happens when closing the state stores
-
checkpoint
public void checkpoint(Map<TopicPartition,Long> writtenOffsets)
-
-