Class ThreadCache
- java.lang.Object
-
- org.apache.kafka.streams.state.internals.ThreadCache
-
public class ThreadCache extends Object
An in-memory LRU cache store similar to MemoryLRUCache,
but byte-based rather than record-based.
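
The difference between a byte-based and a record-based bound can be illustrated with a small sketch. The class below is hypothetical and is not the Kafka implementation: the name ByteBoundedLruSketch, the String keys, and the simplified size estimate (key length plus value length) are all assumptions made for illustration. It evicts least recently used entries until the total estimated size fits within a byte budget, regardless of how many records that takes.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of a byte-bounded LRU cache; not the Kafka implementation. */
final class ByteBoundedLruSketch {
    private final long maxBytes;
    private long currentBytes = 0;
    private long evictions = 0;
    // accessOrder = true keeps the least recently used entry at the head of the map.
    private final LinkedHashMap<String, byte[]> entries = new LinkedHashMap<>(16, 0.75f, true);

    ByteBoundedLruSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    void put(String key, byte[] value) {
        byte[] previous = entries.put(key, value);
        if (previous != null) {
            currentBytes -= sizeOf(key, previous); // replacing an entry frees its old size
        }
        currentBytes += sizeOf(key, value);
        evictUntilWithinLimit();
    }

    byte[] get(String key) {
        return entries.get(key); // a hit also marks the entry as most recently used
    }

    long sizeInBytes() {
        return currentBytes;
    }

    int entryCount() {
        return entries.size();
    }

    long evictions() {
        return evictions;
    }

    // Remove entries from the least recently used end until the byte budget is met.
    private void evictUntilWithinLimit() {
        Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (currentBytes > maxBytes && it.hasNext()) {
            Map.Entry<String, byte[]> eldest = it.next();
            currentBytes -= sizeOf(eldest.getKey(), eldest.getValue());
            it.remove();
            evictions++;
        }
    }

    // Simplified size estimate (an assumption): key characters plus value bytes.
    private static long sizeOf(String key, byte[] value) {
        return key.length() + value.length;
    }
}
```

With a record-based bound, one very large value counts the same as a tiny one; with the byte-based bound sketched here, a single large put may evict several smaller entries.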
-
-
Nested Class Summary
Modifier and Type   Class   Description
static interface
ThreadCache.DirtyEntryFlushListener
-
Constructor Summary
Constructor   Description
ThreadCache(org.apache.kafka.common.utils.LogContext logContext, long maxCacheSizeBytes, StreamsMetricsImpl metrics)
-
Method Summary
Modifier and Type   Method   Description
void
addDirtyEntryFlushListener(String namespace, ThreadCache.DirtyEntryFlushListener listener)
Add a listener that is called each time an entry is evicted from the cache or an explicit flush is called.
org.apache.kafka.streams.state.internals.ThreadCache.MemoryLRUCacheBytesIterator
all(String namespace)
org.apache.kafka.streams.state.internals.LRUCacheEntry
delete(String namespace, org.apache.kafka.common.utils.Bytes key)
long
evicts()
void
flush(String namespace)
long
flushes()
org.apache.kafka.streams.state.internals.LRUCacheEntry
get(String namespace, org.apache.kafka.common.utils.Bytes key)
long
gets()
static String
nameSpaceFromTaskIdAndStore(String taskIDString, String underlyingStoreName)
The thread cache maintains a set of NamedCaches whose names are a concatenation of the task ID and the underlying store name.
void
put(String namespace, org.apache.kafka.common.utils.Bytes key, org.apache.kafka.streams.state.internals.LRUCacheEntry value)
void
putAll(String namespace, List<KeyValue<org.apache.kafka.common.utils.Bytes,org.apache.kafka.streams.state.internals.LRUCacheEntry>> entries)
org.apache.kafka.streams.state.internals.LRUCacheEntry
putIfAbsent(String namespace, org.apache.kafka.common.utils.Bytes key, org.apache.kafka.streams.state.internals.LRUCacheEntry value)
long
puts()
org.apache.kafka.streams.state.internals.ThreadCache.MemoryLRUCacheBytesIterator
range(String namespace, org.apache.kafka.common.utils.Bytes from, org.apache.kafka.common.utils.Bytes to)
long
size()
static String
taskIDfromCacheName(String cacheName)
Given a cache name of the form taskid-storename, return the task ID.
static String
underlyingStoreNamefromCacheName(String cacheName)
Given a cache name of the form taskid-storename, return the store name.
-
-
-
Constructor Detail
-
ThreadCache
public ThreadCache(org.apache.kafka.common.utils.LogContext logContext, long maxCacheSizeBytes, StreamsMetricsImpl metrics)
-
-
Method Detail
-
puts
public long puts()
-
gets
public long gets()
-
evicts
public long evicts()
-
flushes
public long flushes()
-
nameSpaceFromTaskIdAndStore
public static String nameSpaceFromTaskIdAndStore(String taskIDString, String underlyingStoreName)
The thread cache maintains a set of NamedCaches whose names are a concatenation of the task ID and the underlying store name. This method creates those names.
Parameters:
taskIDString - Task ID
underlyingStoreName - Underlying store name
Returns:
the cache name formed from the task ID and the underlying store name
-
taskIDfromCacheName
public static String taskIDfromCacheName(String cacheName)
Given a cache name of the form taskid-storename, return the task ID.
Parameters:
cacheName - a cache name of the form taskid-storename
Returns:
the task ID
-
underlyingStoreNamefromCacheName
public static String underlyingStoreNamefromCacheName(String cacheName)
Given a cache name of the form taskid-storename, return the store name.
Parameters:
cacheName - a cache name of the form taskid-storename
Returns:
the underlying store name
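
The taskid-storename convention used by nameSpaceFromTaskIdAndStore, taskIDfromCacheName, and underlyingStoreNamefromCacheName can be sketched without any Kafka dependency. The class and method names below are hypothetical stand-ins, not the ThreadCache methods themselves. The sketch assumes that the separator is a single hyphen and that the task ID contains no hyphen, so it splits at the first hyphen only and leaves hyphens inside the store name intact.

```java
/** Hypothetical illustration of the taskid-storename naming scheme; not the Kafka implementation. */
final class CacheNameSketch {
    private CacheNameSketch() { }

    // Build a cache name of the form taskid-storename.
    static String nameSpace(String taskId, String storeName) {
        return taskId + "-" + storeName;
    }

    // Everything before the first hyphen is treated as the task ID (assumption).
    static String taskId(String cacheName) {
        return cacheName.split("-", 2)[0];
    }

    // Everything after the first hyphen is treated as the store name (assumption).
    // Throws ArrayIndexOutOfBoundsException if the name contains no hyphen.
    static String storeName(String cacheName) {
        return cacheName.split("-", 2)[1];
    }

    public static void main(String[] args) {
        String name = nameSpace("0_1", "word-counts");
        System.out.println(name);
        System.out.println(taskId(name));
        System.out.println(storeName(name));
    }
}
```

Splitting with a limit of 2 is a design choice of this sketch: it keeps the round trip lossless for store names that themselves contain hyphens, as long as the task ID does not.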
-
addDirtyEntryFlushListener
public void addDirtyEntryFlushListener(String namespace, ThreadCache.DirtyEntryFlushListener listener)
Add a listener that is called each time an entry is evicted from the cache or an explicit flush is called.
Parameters:
namespace - the cache namespace the listener is registered for
listener - the listener to call
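
The per-namespace listener pattern can be sketched as follows. Everything here is hypothetical: FlushListenerSketch, its FlushListener interface, and the String key and value types are illustrative assumptions and do not reproduce ThreadCache.DirtyEntryFlushListener or its signature. The sketch covers only the explicit-flush path; in the real cache the listener is also called when entries are evicted.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of per-namespace flush listeners; not the Kafka implementation. */
final class FlushListenerSketch {
    /** Hypothetical callback, invoked once per dirty entry when a namespace is flushed. */
    interface FlushListener {
        void onFlush(String key, String value);
    }

    private final Map<String, FlushListener> listeners = new HashMap<>();
    // Dirty (not yet flushed) entries per namespace, kept in insertion order.
    private final Map<String, LinkedHashMap<String, String>> dirty = new HashMap<>();

    void addListener(String namespace, FlushListener listener) {
        listeners.put(namespace, listener);
    }

    void put(String namespace, String key, String value) {
        // A later put for the same key overwrites the pending value.
        dirty.computeIfAbsent(namespace, ns -> new LinkedHashMap<>()).put(key, value);
    }

    void flush(String namespace) {
        LinkedHashMap<String, String> pending = dirty.remove(namespace);
        if (pending == null) {
            return; // nothing dirty in this namespace
        }
        FlushListener listener = listeners.get(namespace);
        if (listener != null) {
            pending.forEach(listener::onFlush);
        }
    }
}
```

Because repeated writes to the same key collapse into one pending entry, the listener in this sketch sees only the latest value per key at flush time, and flushing one namespace leaves the others untouched.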
-
flush
public void flush(String namespace)
-
get
public org.apache.kafka.streams.state.internals.LRUCacheEntry get(String namespace, org.apache.kafka.common.utils.Bytes key)
-
put
public void put(String namespace, org.apache.kafka.common.utils.Bytes key, org.apache.kafka.streams.state.internals.LRUCacheEntry value)
-
putIfAbsent
public org.apache.kafka.streams.state.internals.LRUCacheEntry putIfAbsent(String namespace, org.apache.kafka.common.utils.Bytes key, org.apache.kafka.streams.state.internals.LRUCacheEntry value)
-
putAll
public void putAll(String namespace, List<KeyValue<org.apache.kafka.common.utils.Bytes,org.apache.kafka.streams.state.internals.LRUCacheEntry>> entries)
-
delete
public org.apache.kafka.streams.state.internals.LRUCacheEntry delete(String namespace, org.apache.kafka.common.utils.Bytes key)
-
range
public org.apache.kafka.streams.state.internals.ThreadCache.MemoryLRUCacheBytesIterator range(String namespace, org.apache.kafka.common.utils.Bytes from, org.apache.kafka.common.utils.Bytes to)
-
all
public org.apache.kafka.streams.state.internals.ThreadCache.MemoryLRUCacheBytesIterator all(String namespace)
-
size
public long size()
-
-