Skip to content

Commit 3408314

Browse files
committed
Add retention
1 parent d657118 commit 3408314

File tree

2 files changed

+52
-11
lines changed

2 files changed

+52
-11
lines changed

quixstreams/state/rocksdb/timestamped.py

+25-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
"TimestampedPartitionTransaction",
1818
)
1919

20+
DAYS_7 = 7 * 24 * 60 * 60 * 1000
21+
2022

2123
class TimestampedPartitionTransaction(PartitionTransaction):
2224
"""
@@ -39,6 +41,7 @@ def get_last(
3941
self,
4042
timestamp: int,
4143
prefix: Any,
44+
retention: int = DAYS_7,
4245
cf_name: str = "default",
4346
) -> Optional[Any]:
4447
"""Get the latest value for a prefix up to a given timestamp.
@@ -56,8 +59,12 @@ def get_last(
5659
:return: The deserialized value if found, otherwise None.
5760
"""
5861
prefix = self._ensure_bytes(prefix)
59-
# Add +1 because the storage `.iter_items()` is exclusive on the upper bound
60-
key = self._serialize_key(timestamp + 1, prefix)
62+
63+
# Clamp the lower-bound timestamp at 0 so it never goes negative when retention > timestamp
64+
lower_bound = self._serialize_key(max(timestamp - retention, 0), prefix)
65+
# +1 so that `timestamp` itself is included: the upper bound is exclusive
66+
upper_bound = self._serialize_key(timestamp + 1, prefix)
67+
6168
value: Optional[bytes] = None
6269

6370
deletes = self._update_cache.get_deletes(cf_name=cf_name)
@@ -68,13 +75,13 @@ def get_last(
6875

6976
cached = sorted(updates.items(), reverse=True)
7077
for cached_key, cached_value in cached:
71-
if prefix < cached_key < key and cached_key not in deletes:
78+
if lower_bound <= cached_key < upper_bound and cached_key not in deletes:
7279
value = cached_value
7380
break
7481

7582
stored = self._partition.iter_items(
76-
lower_bound=prefix,
77-
upper_bound=key,
83+
lower_bound=lower_bound,
84+
upper_bound=upper_bound,
7885
backwards=True,
7986
cf_name=cf_name,
8087
)
@@ -108,7 +115,19 @@ def set(self, timestamp: int, value: Any, prefix: Any, cf_name: str = "default")
108115
super().set(timestamp, value, prefix, cf_name=cf_name)
109116

110117
def expire(self, timestamp: int, prefix: bytes, cf_name: str = "default"):
111-
key = self._serialize_key(timestamp + 1, prefix)
118+
"""
119+
Delete all entries for a given prefix with timestamps less than the
120+
provided timestamp.
121+
122+
This applies to both the in-memory update cache and the underlying
123+
RocksDB store within the current transaction.
124+
125+
:param timestamp: The upper bound timestamp (exclusive) in milliseconds.
126+
Entries with timestamps strictly less than this will be deleted.
127+
:param prefix: The key prefix.
128+
:param cf_name: Column family name.
129+
"""
130+
key = self._serialize_key(timestamp, prefix)
112131

113132
cached = self._update_cache.get_updates_for_prefix(
114133
prefix=prefix,

tests/test_quixstreams/test_state/test_rocksdb/test_timestamped.py

+27-5
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def test_get_last_ignore_deleted(
5454
tx.set(timestamp=9, value="value9-stored", prefix=b"key")
5555

5656
with transaction() as tx:
57-
tx.expire(timestamp=9, prefix=b"key")
57+
tx.expire(timestamp=10, prefix=b"key")
5858

5959
# Message with timestamp 8 comes out of order after later messages
6060
# got expired in the same transaction.
@@ -114,13 +114,35 @@ def test_get_last_prefix_not_bytes(transaction: TimestampedPartitionTransaction)
114114
assert tx.get_last(timestamp=10, prefix=b'"key"') == "value"
115115

116116

117+
def test_get_last_from_cache_with_retention(
118+
transaction: TimestampedPartitionTransaction,
119+
):
120+
with transaction() as tx:
121+
tx.set(timestamp=5, value="value", prefix=b"key")
122+
assert tx.get_last(timestamp=10, prefix=b"key") == "value"
123+
assert tx.get_last(timestamp=10, prefix=b"key", retention=5) == "value"
124+
assert tx.get_last(timestamp=10, prefix=b"key", retention=4) == None
125+
126+
127+
def test_get_last_from_store_with_retention(
128+
transaction: TimestampedPartitionTransaction,
129+
):
130+
with transaction() as tx:
131+
tx.set(timestamp=5, value="value", prefix=b"key")
132+
133+
with transaction() as tx:
134+
assert tx.get_last(timestamp=10, prefix=b"key") == "value"
135+
assert tx.get_last(timestamp=10, prefix=b"key", retention=5) == "value"
136+
assert tx.get_last(timestamp=10, prefix=b"key", retention=4) == None
137+
138+
117139
def test_expire_cached(transaction: TimestampedPartitionTransaction):
118140
with transaction() as tx:
119141
tx.set(timestamp=1, value="value1", prefix=b"key")
120142
tx.set(timestamp=10, value="value10", prefix=b"key")
121143
tx.set(timestamp=11, value="value11", prefix=b"key")
122144

123-
tx.expire(timestamp=10, prefix=b"key")
145+
tx.expire(timestamp=11, prefix=b"key")
124146

125147
assert tx.get_last(timestamp=10, prefix=b"key") == None
126148
assert tx.get_last(timestamp=11, prefix=b"key") == "value11"
@@ -133,7 +155,7 @@ def test_expire_stored(transaction: TimestampedPartitionTransaction):
133155
tx.set(timestamp=11, value="value11", prefix=b"key")
134156

135157
with transaction() as tx:
136-
tx.expire(timestamp=10, prefix=b"key")
158+
tx.expire(timestamp=11, prefix=b"key")
137159

138160
assert tx.get_last(timestamp=10, prefix=b"key") == None
139161
assert tx.get_last(timestamp=11, prefix=b"key") == "value11"
@@ -146,7 +168,7 @@ def test_expire_idempotent(transaction: TimestampedPartitionTransaction):
146168
with transaction() as tx:
147169
tx.set(timestamp=10, value="value10", prefix=b"key")
148170

149-
tx.expire(timestamp=10, prefix=b"key")
150-
tx.expire(timestamp=10, prefix=b"key")
171+
tx.expire(timestamp=11, prefix=b"key")
172+
tx.expire(timestamp=11, prefix=b"key")
151173

152174
assert tx.get_last(timestamp=10, prefix=b"key") == None

0 commit comments

Comments
 (0)