Skip to content

Commit d2b2b9c

Browse files
committed
Add expiration mechanism
1 parent 3408314 commit d2b2b9c

File tree

2 files changed

+58
-12
lines changed

2 files changed

+58
-12
lines changed

quixstreams/state/rocksdb/timestamped.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
)
1919

2020
DAYS_7 = 7 * 24 * 60 * 60 * 1000
21+
EXPIRATION_COUNTER = 0
2122

2223

2324
class TimestampedPartitionTransaction(PartitionTransaction):
@@ -55,13 +56,17 @@ def get_last(
5556
5657
:param timestamp: The upper bound timestamp (inclusive) in milliseconds.
5758
:param prefix: The key prefix.
59+
:param retention: The retention period in milliseconds.
5860
:param cf_name: The column family name.
5961
:return: The deserialized value if found, otherwise None.
6062
"""
63+
global EXPIRATION_COUNTER
64+
6165
prefix = self._ensure_bytes(prefix)
6266

6367
# Negative retention is not allowed
64-
lower_bound = self._serialize_key(max(timestamp - retention, 0), prefix)
68+
lower_bound_timestamp = max(timestamp - retention, 0)
69+
lower_bound = self._serialize_key(lower_bound_timestamp, prefix)
6570
# +1 because upper bound is exclusive
6671
upper_bound = self._serialize_key(timestamp + 1, prefix)
6772

@@ -96,6 +101,10 @@ def get_last(
96101
# iterating backwards from the upper bound.
97102
break
98103

104+
if not EXPIRATION_COUNTER % 1000:
105+
self._expire(lower_bound_timestamp, prefix, cf_name=cf_name)
106+
EXPIRATION_COUNTER += 1
107+
99108
return self._deserialize_value(value) if value is not None else None
100109

101110
@validate_transaction_status(PartitionTransactionStatus.STARTED)
@@ -114,7 +123,7 @@ def set(self, timestamp: int, value: Any, prefix: Any, cf_name: str = "default")
114123
prefix = self._ensure_bytes(prefix)
115124
super().set(timestamp, value, prefix, cf_name=cf_name)
116125

117-
def expire(self, timestamp: int, prefix: bytes, cf_name: str = "default"):
126+
def _expire(self, timestamp: int, prefix: bytes, cf_name: str = "default"):
118127
"""
119128
Delete all entries for a given prefix with timestamps less than the
120129
provided timestamp.

tests/test_quixstreams/test_state/test_rocksdb/test_timestamped.py

+47-10
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from contextlib import contextmanager
22
from typing import Any
3+
from unittest import mock
34

45
import pytest
56

7+
from quixstreams.state.rocksdb.partition import Marker
68
from quixstreams.state.rocksdb.timestamped import (
79
TimestampedPartitionTransaction,
810
TimestampedStore,
@@ -54,13 +56,8 @@ def test_get_last_ignore_deleted(
5456
tx.set(timestamp=9, value="value9-stored", prefix=b"key")
5557

5658
with transaction() as tx:
57-
tx.expire(timestamp=10, prefix=b"key")
58-
59-
# Message with timestamp 8 comes out of order after later messages
60-
# got expired in the same transaction.
61-
# TODO: Should we in this case "unexpire" the timestamp 9 message?
59+
tx._expire(timestamp=10, prefix=b"key")
6260
tx.set(timestamp=8, value="value8-cached", prefix=b"key")
63-
6461
assert tx.get_last(timestamp=10, prefix=b"key") == "value8-cached"
6562

6663

@@ -136,13 +133,53 @@ def test_get_last_from_store_with_retention(
136133
assert tx.get_last(timestamp=10, prefix=b"key", retention=4) == None
137134

138135

136+
@mock.patch("quixstreams.state.rocksdb.timestamped.EXPIRATION_COUNTER", 1000)
137+
def test_get_last_from_cache_with_expire_call(
138+
transaction: TimestampedPartitionTransaction,
139+
):
140+
with transaction() as tx:
141+
tx.set(timestamp=5, value="value1", prefix=b"key")
142+
assert tx._update_cache.get_updates_for_prefix(prefix=b"key") == {
143+
b"key|\x00\x00\x00\x00\x00\x00\x00\x05": b'"value1"',
144+
}
145+
146+
# Expiration counter is exhausted for this `get_last` call
147+
# `_expire` method is called with `lower_bound_timestamp` = 10 - 4 = 6
148+
# Everything below timestamp 6 gets expired.
149+
assert tx.get_last(timestamp=10, prefix=b"key", retention=4) == None
150+
assert tx._update_cache.get_updates_for_prefix(prefix=b"key") == {}
151+
152+
153+
@mock.patch("quixstreams.state.rocksdb.timestamped.EXPIRATION_COUNTER", 1000)
154+
def test_get_last_from_store_with_expire_call(
155+
transaction: TimestampedPartitionTransaction,
156+
):
157+
key = b"key|\x00\x00\x00\x00\x00\x00\x00\x05"
158+
159+
with transaction() as tx:
160+
tx.set(timestamp=5, value="value1", prefix=b"key")
161+
162+
with transaction() as tx:
163+
assert tx._partition.get(key) == b'"value1"'
164+
assert not tx._update_cache.get_deletes()
165+
166+
# Expiration counter is exhausted for this `get_last` call
167+
# `_expire` method is called with `lower_bound_timestamp` = 10 - 4 = 6
168+
# Everything belowe timestamp 6 gets expired.
169+
assert tx.get_last(timestamp=10, prefix=b"key", retention=4) == None
170+
assert key in tx._update_cache.get_deletes()
171+
172+
with transaction() as tx:
173+
assert tx._partition.get(key) is Marker.UNDEFINED
174+
175+
139176
def test_expire_cached(transaction: TimestampedPartitionTransaction):
140177
with transaction() as tx:
141178
tx.set(timestamp=1, value="value1", prefix=b"key")
142179
tx.set(timestamp=10, value="value10", prefix=b"key")
143180
tx.set(timestamp=11, value="value11", prefix=b"key")
144181

145-
tx.expire(timestamp=11, prefix=b"key")
182+
tx._expire(timestamp=11, prefix=b"key")
146183

147184
assert tx.get_last(timestamp=10, prefix=b"key") == None
148185
assert tx.get_last(timestamp=11, prefix=b"key") == "value11"
@@ -155,7 +192,7 @@ def test_expire_stored(transaction: TimestampedPartitionTransaction):
155192
tx.set(timestamp=11, value="value11", prefix=b"key")
156193

157194
with transaction() as tx:
158-
tx.expire(timestamp=11, prefix=b"key")
195+
tx._expire(timestamp=11, prefix=b"key")
159196

160197
assert tx.get_last(timestamp=10, prefix=b"key") == None
161198
assert tx.get_last(timestamp=11, prefix=b"key") == "value11"
@@ -168,7 +205,7 @@ def test_expire_idempotent(transaction: TimestampedPartitionTransaction):
168205
with transaction() as tx:
169206
tx.set(timestamp=10, value="value10", prefix=b"key")
170207

171-
tx.expire(timestamp=11, prefix=b"key")
172-
tx.expire(timestamp=11, prefix=b"key")
208+
tx._expire(timestamp=11, prefix=b"key")
209+
tx._expire(timestamp=11, prefix=b"key")
173210

174211
assert tx.get_last(timestamp=10, prefix=b"key") == None

0 commit comments

Comments
 (0)