Class StateDirectory
- java.lang.Object
  - org.apache.kafka.streams.processor.internals.StateDirectory
-
public class StateDirectory extends Object
Manages the directories where the state of Tasks owned by a StreamThread
is stored. Handles creation/locking/unlocking/cleaning of the Task Directories. This class is not thread-safe.
Constructor Summary
Constructor: StateDirectory(StreamsConfig config, org.apache.kafka.common.utils.Time time, boolean hasPersistentStores)
Description: Ensures that the state base directory as well as the application's sub-directory are created.
-
Method Summary
Modifier and Type, Method, Description:
void clean()
void cleanRemovedTasks(long cleanupDelayMs)
Remove the directories for any TaskIds that are no longer owned by this StreamThread and aren't locked by either another process or another StreamThread.
File directoryForTask(TaskId taskId)
Get or create the directory for the provided TaskId.
Constructor Detail
-
StateDirectory
public StateDirectory(StreamsConfig config, org.apache.kafka.common.utils.Time time, boolean hasPersistentStores)
Ensures that the state base directory as well as the application's sub-directory are created.
- Parameters:
config - streams application configuration to read the root state directory path
time - system timer used to execute the periodic cleanup procedure
hasPersistentStores - the application, task, and store directories are auto-created as needed only when the application's topology has stores persisted on the local file system; otherwise no directories are created
- Throws:
ProcessorStateException - if the base state directory or application state directory does not exist and could not be created when hasPersistentStores is enabled
Method Detail
-
directoryForTask
public File directoryForTask(TaskId taskId)
Get or create the directory for the provided TaskId.
- Returns:
directory for the TaskId
- Throws:
ProcessorStateException - if the task directory does not exist and could not be created
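The "get or create" contract described above can be illustrated with a minimal, standalone sketch. It does not use Kafka classes and is not the library's implementation: the class name TaskDirectorySketch, the String task name, and the use of IllegalStateException in place of ProcessorStateException are all assumptions made for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Hypothetical stand-in for the get-or-create behaviour; not Kafka's code.
final class TaskDirectorySketch {

    // Returns the directory for taskName under stateDir, creating it if needed.
    static File directoryForTask(File stateDir, String taskName) {
        File taskDir = new File(stateDir, taskName);
        // mkdirs() returns false when the directory already exists (for example,
        // if it was created concurrently), so the existence check is repeated.
        if (!taskDir.isDirectory() && !taskDir.mkdirs() && !taskDir.isDirectory()) {
            // The real method is documented to throw ProcessorStateException here.
            throw new IllegalStateException(
                "task directory [" + taskDir.getPath() + "] doesn't exist and couldn't be created");
        }
        return taskDir;
    }

    public static void main(String[] args) throws IOException {
        File base = Files.createTempDirectory("state-sketch").toFile();
        File first = directoryForTask(base, "0_1");   // creates the directory
        File second = directoryForTask(base, "0_1");  // returns the existing one
        System.out.println(first.equals(second) && first.isDirectory());
    }
}
```

Calling the method twice with the same name yields the same directory; the second call finds it already present and creates nothing.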
-
clean
public void clean()
-
cleanRemovedTasks
public void cleanRemovedTasks(long cleanupDelayMs)
Remove the directories for any TaskIds that are no longer owned by this StreamThread and aren't locked by either another process or another StreamThread.
- Parameters:
cleanupDelayMs - only remove directories if they haven't been modified for at least this amount of time (milliseconds)
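To make the cleanupDelayMs condition concrete, here is a rough standalone sketch of how candidate directories might be selected. It is an assumption-laden illustration, not Kafka's implementation: the class CleanupDelaySketch, the method staleTaskDirs, and the ownedTasks set are hypothetical, and the lock check described above is deliberately left out.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the cleanup-delay rule; not Kafka's code.
final class CleanupDelaySketch {

    // Lists sub-directories of stateDir that are not in ownedTasks and have not
    // been modified for at least cleanupDelayMs. Locking is NOT modelled here.
    static List<File> staleTaskDirs(File stateDir, Set<String> ownedTasks,
                                    long nowMs, long cleanupDelayMs) {
        List<File> stale = new ArrayList<>();
        File[] children = stateDir.listFiles(File::isDirectory);
        if (children == null) {
            return stale; // stateDir is missing or unreadable
        }
        for (File dir : children) {
            boolean owned = ownedTasks.contains(dir.getName());
            boolean oldEnough = nowMs - dir.lastModified() >= cleanupDelayMs;
            if (!owned && oldEnough) {
                stale.add(dir);
            }
        }
        return stale;
    }

    public static void main(String[] args) throws IOException {
        File base = Files.createTempDirectory("cleanup-sketch").toFile();
        new File(base, "0_0").mkdirs();
        new File(base, "0_1").mkdirs();
        long now = System.currentTimeMillis();
        // With a zero delay, every directory that is not owned qualifies.
        System.out.println(staleTaskDirs(base, Set.of("0_1"), now, 0L));
    }
}
```

Passing the current time in as a parameter keeps the selection deterministic and mirrors the constructor's use of an injected Time instance rather than the system clock.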