Class RocksDBStore
- java.lang.Object
-
- org.apache.kafka.streams.state.internals.RocksDBStore
-
- All Implemented Interfaces:
StateStore, BatchWritingStore, KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>, ReadOnlyKeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>
- Direct Known Subclasses:
RocksDBTimestampedStore
public class RocksDBStore extends Object implements KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>, BatchWritingStore
A persistent key-value store based on RocksDB.
-
-
Field Summary
Fields (Modifier and Type, Field, Description):
- protected boolean open
-
Method Summary
Methods (Modifier and Type, Method, Description):
- void addToBatch(KeyValue<byte[],byte[]> record, org.rocksdb.WriteBatch batch)
- KeyValueIterator<org.apache.kafka.common.utils.Bytes,byte[]> all()
  Return an iterator over all keys in this store.
- long approximateNumEntries()
  Return an approximate count of key-value mappings in this store.
- void close()
  Close the storage engine.
- byte[] delete(org.apache.kafka.common.utils.Bytes key)
  Delete the value from the store (if there is one).
- void flush()
  Flush any cached data.
- byte[] get(org.apache.kafka.common.utils.Bytes key)
  Get the value corresponding to this key.
- org.rocksdb.Options getOptions()
- void init(ProcessorContext context, StateStore root)
  Initializes this state store.
- boolean isOpen()
  Is this store open for reading and writing?
- String name()
  The name of this store.
- boolean persistent()
  Return if the storage is persistent or not.
- void put(org.apache.kafka.common.utils.Bytes key, byte[] value)
  Update the value associated with this key.
- void putAll(List<KeyValue<org.apache.kafka.common.utils.Bytes,byte[]>> entries)
  Update all the given key/value pairs.
- byte[] putIfAbsent(org.apache.kafka.common.utils.Bytes key, byte[] value)
  Update the value associated with this key, unless a value is already associated with the key.
- KeyValueIterator<org.apache.kafka.common.utils.Bytes,byte[]> range(org.apache.kafka.common.utils.Bytes from, org.apache.kafka.common.utils.Bytes to)
  Get an iterator over a given range of keys.
- void write(org.rocksdb.WriteBatch batch)
-
-
-
Method Detail
-
init
public void init(ProcessorContext context, StateStore root)
Description copied from interface: StateStore
Initializes this state store.
The implementation of this function must register the root store in the context via the ProcessorContext.register(StateStore, StateRestoreCallback) function, where the first StateStore parameter should always be the passed-in root object, and the second parameter should be an object of the user's implementation of the StateRestoreCallback interface, used for restoring the state store from the changelog.
Note that if the state store engine itself supports bulk writes, users can implement another interface, BatchingStateRestoreCallback, which extends StateRestoreCallback, to provide bulk-load restoration logic instead of restoring one record at a time.
- Specified by:
init in interface StateStore
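The registration contract described above can be sketched in isolation. Everything in this block is a stand-in: SketchContext, SketchStateStore, and SketchRestoreCallback are hypothetical, simplified mirrors of ProcessorContext, StateStore, and StateRestoreCallback, declared only so the example compiles without Kafka Streams on the classpath. It is not how RocksDBStore itself is implemented.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class InitSketchDemo {
    public static void main(String[] args) {
        SketchContext context = new SketchContext();
        SketchWrapperStore outer = new SketchWrapperStore();
        // The outermost store passes itself down as the root.
        outer.init(context, outer);
        // Simulate one changelog record being replayed through the callback.
        context.restoreCallback.restore(
            "k".getBytes(StandardCharsets.UTF_8), "v".getBytes(StandardCharsets.UTF_8));
        System.out.println("root registered: " + (context.registeredStore == outer));
        System.out.println("restored keys: " + outer.inner.data.keySet());
    }
}

// Hypothetical stand-in for StateRestoreCallback.
interface SketchRestoreCallback {
    void restore(byte[] key, byte[] value);
}

// Hypothetical stand-in for StateStore (only init is modelled).
interface SketchStateStore {
    void init(SketchContext context, SketchStateStore root);
}

// Hypothetical stand-in for ProcessorContext (only register is modelled).
class SketchContext {
    SketchStateStore registeredStore;
    SketchRestoreCallback restoreCallback;

    void register(SketchStateStore store, SketchRestoreCallback callback) {
        this.registeredStore = store;
        this.restoreCallback = callback;
    }
}

// Innermost store: registers the passed-in root, not itself.
class SketchInnerStore implements SketchStateStore {
    final Map<String, byte[]> data = new HashMap<>();

    @Override
    public void init(SketchContext context, SketchStateStore root) {
        context.register(root,
            (key, value) -> data.put(new String(key, StandardCharsets.UTF_8), value));
    }
}

// Wrapper store: forwards init to the inner store, keeping the same root.
class SketchWrapperStore implements SketchStateStore {
    final SketchInnerStore inner = new SketchInnerStore();

    @Override
    public void init(SketchContext context, SketchStateStore root) {
        inner.init(context, root);
    }
}
```

The point of the sketch is the first argument to register: the inner store hands over root rather than this, so the context ends up holding the outermost store while the restore callback still writes into the inner one.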
-
name
public String name()
Description copied from interface: StateStore
The name of this store.
- Specified by:
name in interface StateStore
- Returns:
the storage name
-
persistent
public boolean persistent()
Description copied from interface: StateStore
Return if the storage is persistent or not.
- Specified by:
persistent in interface StateStore
- Returns:
true if the storage is persistent, false otherwise
-
isOpen
public boolean isOpen()
Description copied from interface: StateStore
Is this store open for reading and writing?
- Specified by:
isOpen in interface StateStore
- Returns:
true if the store is open
-
put
public void put(org.apache.kafka.common.utils.Bytes key, byte[] value)
Description copied from interface: KeyValueStore
Update the value associated with this key.
- Specified by:
put in interface KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>
- Parameters:
key - The key to associate the value to
value - The value to update; it can be null. If the serialized bytes are also null, it is interpreted as a delete.
-
putIfAbsent
public byte[] putIfAbsent(org.apache.kafka.common.utils.Bytes key, byte[] value)
Description copied from interface: KeyValueStore
Update the value associated with this key, unless a value is already associated with the key.
- Specified by:
putIfAbsent in interface KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>
- Parameters:
key - The key to associate the value to
value - The value to update; it can be null. If the serialized bytes are also null, it is interpreted as a delete.
- Returns:
The old value, or null if there is no such key.
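The return-value and null-handling rules for put and putIfAbsent can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the RocksDB-backed implementation: InMemorySketchStore is a hypothetical class, and it uses String keys in place of Bytes to stay self-contained.

```java
import java.util.TreeMap;

class PutSemanticsDemo {
    public static void main(String[] args) {
        InMemorySketchStore store = new InMemorySketchStore();
        // No value yet, so the new value is stored and null is returned.
        byte[] first = store.putIfAbsent("a", new byte[] {1});
        // A value exists, so it is kept and returned; {2} is not written.
        byte[] second = store.putIfAbsent("a", new byte[] {2});
        System.out.println("first call returned null: " + (first == null));
        System.out.println("second call returned: " + second[0]);
        // A null value is interpreted as a delete.
        store.put("a", null);
        System.out.println("present after put(null): " + (store.get("a") != null));
    }
}

// Hypothetical in-memory stand-in for a key-value store.
class InMemorySketchStore {
    private final TreeMap<String, byte[]> map = new TreeMap<>();

    byte[] get(String key) {
        return map.get(key);
    }

    void put(String key, byte[] value) {
        if (value == null) {
            map.remove(key); // null value: treat as a delete
        } else {
            map.put(key, value);
        }
    }

    byte[] putIfAbsent(String key, byte[] value) {
        byte[] existing = get(key);
        if (existing == null) {
            put(key, value);
        }
        return existing; // old value, or null if there was none
    }
}
```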
-
putAll
public void putAll(List<KeyValue<org.apache.kafka.common.utils.Bytes,byte[]>> entries)
Description copied from interface: KeyValueStore
Update all the given key/value pairs.
- Specified by:
putAll in interface KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>
- Parameters:
entries - A list of entries to put into the store; if the serialized bytes are also null, it is interpreted as a delete.
-
get
public byte[] get(org.apache.kafka.common.utils.Bytes key)
Description copied from interface: ReadOnlyKeyValueStore
Get the value corresponding to this key.
- Specified by:
get in interface ReadOnlyKeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>
- Parameters:
key - The key to fetch
- Returns:
The value, or null if no value is found.
-
delete
public byte[] delete(org.apache.kafka.common.utils.Bytes key)
Description copied from interface: KeyValueStore
Delete the value from the store (if there is one).
- Specified by:
delete in interface KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>
- Parameters:
key - The key
- Returns:
The old value, or null if there is no such key.
-
range
public KeyValueIterator<org.apache.kafka.common.utils.Bytes,byte[]> range(org.apache.kafka.common.utils.Bytes from, org.apache.kafka.common.utils.Bytes to)
Description copied from interface: ReadOnlyKeyValueStore
Get an iterator over a given range of keys. This iterator must be closed after use. The returned iterator must be safe from ConcurrentModificationExceptions and must not return null values. No ordering guarantees are provided.
- Specified by:
range in interface ReadOnlyKeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>
- Parameters:
from - The first key that could be in the range
to - The last key that could be in the range
- Returns:
The iterator for this range.
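Because the iterator must be closed after use, callers typically wrap it in try-with-resources. The sketch below models that pattern with a hypothetical SketchRangeIterator over a TreeMap; it assumes both bounds are inclusive and uses String keys and Integer values purely for illustration. The real KeyValueIterator is not used here.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

class RangeSketchDemo {
    // Sums the values in [from, to], closing the iterator when done.
    static int sumRange(TreeMap<String, Integer> data, String from, String to) {
        int sum = 0;
        try (SketchRangeIterator it = new SketchRangeIterator(data, from, to)) {
            while (it.hasNext()) {
                sum += it.next().getValue();
            }
        } // close() runs here, even if the loop throws
        return sum;
    }

    public static void main(String[] args) {
        TreeMap<String, Integer> data = new TreeMap<>();
        data.put("a", 1);
        data.put("b", 2);
        data.put("c", 3);
        data.put("d", 4);
        System.out.println("sum over [b, c]: " + sumRange(data, "b", "c"));
    }
}

// Hypothetical closeable iterator; stands in for a store's range iterator.
class SketchRangeIterator implements Iterator<Map.Entry<String, Integer>>, Closeable {
    private final Iterator<Map.Entry<String, Integer>> delegate;
    boolean closed = false;

    SketchRangeIterator(TreeMap<String, Integer> data, String from, String to) {
        // Copy the inclusive slice so later writes to `data` cannot cause
        // a ConcurrentModificationException while iterating.
        this.delegate =
            new TreeMap<String, Integer>(data.subMap(from, true, to, true)).entrySet().iterator();
    }

    @Override
    public boolean hasNext() {
        if (closed) {
            throw new IllegalStateException("iterator is closed");
        }
        return delegate.hasNext();
    }

    @Override
    public Map.Entry<String, Integer> next() {
        if (closed) {
            throw new IllegalStateException("iterator is closed");
        }
        return delegate.next();
    }

    @Override
    public void close() {
        closed = true; // a real store would release native resources here
    }
}
```

Copying the slice is only one way to stay safe from concurrent modification; it trades memory for simplicity and is chosen here only to keep the sketch short.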
-
all
public KeyValueIterator<org.apache.kafka.common.utils.Bytes,byte[]> all()
Description copied from interface: ReadOnlyKeyValueStore
Return an iterator over all keys in this store. This iterator must be closed after use. The returned iterator must be safe from ConcurrentModificationExceptions and must not return null values. No ordering guarantees are provided.
- Specified by:
all in interface ReadOnlyKeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>
- Returns:
An iterator of all key/value pairs in the store.
-
approximateNumEntries
public long approximateNumEntries()
Return an approximate count of key-value mappings in this store.
RocksDB cannot return an exact entry count without doing a full scan, so this method relies on the rocksdb.estimate-num-keys property to get an approximate count. The returned size also includes a count of dirty keys in the store's in-memory cache, which may lead to some double-counting of entries and inflate the estimate.
- Specified by:
approximateNumEntries in interface ReadOnlyKeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>
- Returns:
an approximate count of key-value mappings in the store.
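The double-counting mentioned above can be shown with plain sets. This is only an arithmetic illustration of the description, assuming the estimate is formed by adding a persisted-key count to a dirty-key count; the class and method names are hypothetical and no RocksDB property is read.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class ApproxCountDemo {
    // Adds the two counts without checking for overlap.
    static long approximate(Set<String> persisted, Set<String> dirty) {
        return (long) persisted.size() + dirty.size();
    }

    // Counts each distinct key once (the equivalent of a full scan).
    static long exact(Set<String> persisted, Set<String> dirty) {
        Set<String> union = new HashSet<>(persisted);
        union.addAll(dirty);
        return union.size();
    }

    public static void main(String[] args) {
        Set<String> persisted = new HashSet<>(Arrays.asList("a", "b", "c"));
        // "c" is both persisted and dirty in the cache, so it is counted twice.
        Set<String> dirty = new HashSet<>(Arrays.asList("c", "d"));
        System.out.println("approximate: " + approximate(persisted, dirty));
        System.out.println("exact: " + exact(persisted, dirty));
    }
}
```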
-
flush
public void flush()
Description copied from interface: StateStore
Flush any cached data.
- Specified by:
flush in interface StateStore
-
addToBatch
public void addToBatch(KeyValue<byte[],byte[]> record, org.rocksdb.WriteBatch batch) throws org.rocksdb.RocksDBException
- Specified by:
addToBatch in interface BatchWritingStore
- Throws:
org.rocksdb.RocksDBException
-
write
public void write(org.rocksdb.WriteBatch batch) throws org.rocksdb.RocksDBException
- Specified by:
write in interface BatchWritingStore
- Throws:
org.rocksdb.RocksDBException
-
close
public void close()
Description copied from interface: StateStore
Close the storage engine. Note that this function needs to be idempotent since it may be called several times on the same state store.
Users only need to implement this function and should never need to call this API explicitly, as the library calls it automatically when necessary.
- Specified by:
close in interface StateStore
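One common way to meet the idempotency requirement is to guard the release logic with an open flag. The sketch below is a minimal illustration under that assumption: SketchCloseableStore and its releaseCount counter are hypothetical, and the counter stands in for freeing the underlying storage resources.

```java
class IdempotentCloseDemo {
    public static void main(String[] args) {
        SketchCloseableStore store = new SketchCloseableStore();
        store.close();
        store.close(); // second call is a no-op
        System.out.println("open: " + store.isOpen());
        System.out.println("resources released " + store.releaseCount + " time(s)");
    }
}

// Hypothetical store that can be closed any number of times.
class SketchCloseableStore {
    private boolean open = true;
    int releaseCount = 0;

    synchronized void close() {
        if (!open) {
            return; // already closed: nothing to release
        }
        open = false;
        releaseCount++; // stands in for releasing the storage engine's resources
    }

    synchronized boolean isOpen() {
        return open;
    }
}
```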
-
getOptions
public org.rocksdb.Options getOptions()
-
-