Class StateDirectory


  • public class StateDirectory
    extends Object
    Manages the directories where the state of Tasks owned by a StreamThread is stored. Handles creation, locking, unlocking, and cleaning of the task directories. This class is not thread-safe.
    • Constructor Detail

      • StateDirectory

        public StateDirectory(StreamsConfig config,
                              org.apache.kafka.common.utils.Time time,
                              boolean hasPersistentStores)
        Ensures that the state base directory as well as the application's sub-directory are created.
        Parameters:
        config - streams application configuration to read the root state directory path
        time - system timer used to execute periodic cleanup procedure
        hasPersistentStores - whether the application's topology has stores persisted on the local file system. Only when this is true are the corresponding application, task, and store directories auto-created as needed; otherwise no directories are created.
        Throws:
        ProcessorStateException - if the base state directory or application state directory does not exist and could not be created when hasPersistentStores is enabled.
    • Method Detail

      • directoryForTask

        public File directoryForTask(TaskId taskId)
        Get or create the directory for the provided TaskId.
        Returns:
        directory for the TaskId
        Throws:
        ProcessorStateException - if the task directory does not exist and could not be created
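        The get-or-create contract described above can be illustrated with a minimal, self-contained sketch. It does not use the Kafka Streams classes: TaskDirectorySketch, the String task name, and the use of IllegalStateException in place of ProcessorStateException are all assumptions made for illustration, and the real implementation may differ.

```java
import java.io.File;

/** Hypothetical stand-in for illustration only; not the Kafka Streams implementation. */
final class TaskDirectorySketch {
    private final File appDir;

    TaskDirectorySketch(File appDir) {
        this.appDir = appDir;
    }

    /** Returns the directory for the given task name, creating it if it is missing. */
    File directoryForTask(String taskName) {
        File taskDir = new File(appDir, taskName);
        // mkdirs() returns false when nothing was created, so check isDirectory()
        // again afterwards to tolerate a directory that appeared in the meantime.
        if (!taskDir.isDirectory() && !taskDir.mkdirs() && !taskDir.isDirectory()) {
            // Assumption: IllegalStateException stands in for ProcessorStateException.
            throw new IllegalStateException(
                "task directory [" + taskDir.getPath() + "] does not exist and could not be created");
        }
        return taskDir;
    }

    public static void main(String[] args) {
        File base = new File(System.getProperty("java.io.tmpdir"), "state-dir-sketch");
        File dir = new TaskDirectorySketch(base).directoryForTask("0_1");
        System.out.println(dir.getPath() + " exists: " + dir.isDirectory());
    }
}
```

        Calling directoryForTask twice with the same name returns the same path; the second call finds the existing directory and creates nothing.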
      • clean

        public void clean()
      • cleanRemovedTasks

        public void cleanRemovedTasks(long cleanupDelayMs)
        Removes the directories for any TaskIds that are no longer owned by this StreamThread and are not locked by another process or another StreamThread.
        Parameters:
        cleanupDelayMs - only remove directories if they haven't been modified for at least this amount of time (milliseconds)
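        The effect of cleanupDelayMs can be sketched as follows. This is a simplified illustration under stated assumptions, not the library's code: StaleTaskDirCleanerSketch, the ownedTasks set of directory names, and the explicit nowMs parameter are hypothetical, and the lock checks against other processes and threads are omitted entirely.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch for illustration only; locking is deliberately left out. */
final class StaleTaskDirCleanerSketch {

    /**
     * Deletes sub-directories of appDir that are not in ownedTasks and have not
     * been modified for at least cleanupDelayMs. Returns the names that were removed.
     */
    static List<String> cleanRemovedTasks(File appDir, Set<String> ownedTasks,
                                          long nowMs, long cleanupDelayMs) {
        List<String> removed = new ArrayList<>();
        File[] taskDirs = appDir.listFiles(File::isDirectory);
        if (taskDirs == null) {
            return removed; // appDir is missing or unreadable: nothing to clean
        }
        for (File dir : taskDirs) {
            if (ownedTasks.contains(dir.getName())) {
                continue; // still owned, never touched
            }
            long idleMs = nowMs - dir.lastModified();
            if (idleMs >= cleanupDelayMs && deleteRecursively(dir)) {
                removed.add(dir.getName());
            }
        }
        return removed;
    }

    /** Deletes a file or a directory tree; returns false as soon as a delete fails. */
    private static boolean deleteRecursively(File file) {
        File[] children = file.listFiles();
        if (children != null) {
            for (File child : children) {
                if (!deleteRecursively(child)) {
                    return false;
                }
            }
        }
        return file.delete();
    }
}
```

        Passing the current time in explicitly mirrors the Time argument taken by the constructor and keeps the age check easy to test.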