Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 44 additions & 7 deletions snuba/lw_deletions/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ def __init__(
self.__metrics = metrics
self.__last_ongoing_mutations_check: Optional[float] = None
self.__redis_client = get_redis_client(RedisClientKey.CONFIG)
self.__local_inflight_count = 0
self.__mutations_issued_this_submit = 0

def poll(self) -> None:
self.__next_step.poll()
Expand All @@ -99,6 +101,7 @@ def _is_execute_enabled(self, conditions: Sequence[ConditionsBag]) -> bool:
return org_ids_delete_allowlist.issuperset(query_org_ids)

def submit(self, message: Message[ValuesBatch[KafkaPayload]]) -> None:
self.__mutations_issued_this_submit = 0
decode_messages = [rapidjson.loads(m.payload.value) for m in message.value.payload]
conditions = self.__formatter.format(decode_messages)

Expand Down Expand Up @@ -233,6 +236,14 @@ def _execute_delete_by_partition(
tracking_key = f"lw_delete_partitions:{self.__storage_name}:{cond_hash}"
ttl = settings.LW_DELETES_PARTITION_TRACKING_TTL

per_submit_budget = typing.cast(
int,
get_int_config(
"lw_deletes_per_submit_budget",
default=settings.LW_DELETES_PER_SUBMIT_BUDGET,
),
)

for partition_date in partition_dates:
member = f"{table}:{partition_date}"
days_delta = (datetime.strptime(partition_date, "%Y-%m-%d") - datetime.now()).days
Expand All @@ -249,6 +260,11 @@ def _execute_delete_by_partition(
)
continue

if self.__mutations_issued_this_submit >= per_submit_budget:
raise TooManyOngoingMutationsError(
f"per-submit budget of {per_submit_budget} mutations exhausted"
)

self._check_ongoing_mutations(skip_throttle=True)
partition_condition = equals(
FunctionCall(None, "toMonday", (column(self.__partition_column),)), # type: ignore[arg-type]
Expand Down Expand Up @@ -285,6 +301,8 @@ def _execute_single_delete(
attribution_info=self._get_attribute_info(),
query_settings=query_settings,
)
self.__local_inflight_count += 1
self.__mutations_issued_this_submit += 1
self.__metrics.timing(
"execute_delete_query_ms",
(time.time() - start) * 1000,
Comment on lines 301 to 308
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The _execute_single_delete function catches and logs specific ClickhouseError exceptions but doesn't re-raise them, causing failed deletions to be incorrectly marked as successful and skipped on retry.
Severity: CRITICAL

Suggested Fix

Re-raise the exception after it is logged within the except block in the _execute_single_delete function. This will ensure that the failure propagates, preventing the partition from being marked as successfully processed in Redis and allowing the deletion to be retried.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: snuba/lw_deletions/strategy.py#L301-L308

Potential issue: In the `_execute_single_delete` function, when a delete operation fails
with a `ClickhouseError` whose error code is listed in
`LW_DELETE_NON_RETRYABLE_CLICKHOUSE_ERROR_CODES` (such as `TIMEOUT_EXCEEDED` or
`MEMORY_LIMIT_EXCEEDED`), the exception is caught and logged, but not re-raised. This
causes the system to incorrectly treat the failed deletion as successful. As a result,
the `partition_delete_executed` metric is incremented, and the partition is marked as
processed in Redis. This prevents any future attempts to delete the data in that
partition, leading to permanent data retention.

Did we get this right? 👍 / 👎 to inform future reviews.

Expand All @@ -299,7 +317,31 @@ def _execute_single_delete(
else:
raise LWDeleteQueryException(exc.message)

delay_ms = get_int_config("lw_delete_inter_mutation_delay_ms", default=0)
if delay_ms and delay_ms > 0:
time.sleep(delay_ms / 1000.0)

def _check_ongoing_mutations(self, skip_throttle: bool = False) -> None:
max_ongoing_mutations = typing.cast(
int,
get_int_config(
"max_ongoing_mutations_for_delete",
default=settings.MAX_ONGOING_MUTATIONS_FOR_DELETE,
),
)

# Fast path: local counter already exceeds limit, no need to query CH
if self.__local_inflight_count > max_ongoing_mutations:
now = time.time()
if (
self.__last_ongoing_mutations_check is not None
and now - self.__last_ongoing_mutations_check < 1.0
):
raise TooManyOngoingMutationsError(
f"local inflight count {self.__local_inflight_count} exceeds max {max_ongoing_mutations}"
)
# Fall through to reconcile with CH

now = time.time()
if (
not skip_throttle
Expand All @@ -312,13 +354,8 @@ def _check_ongoing_mutations(self, skip_throttle: bool = False) -> None:
start = time.time()
ongoing_mutations = _num_ongoing_mutations(self.__storage.get_cluster(), self.__tables)
self.__last_ongoing_mutations_check = time.time()
max_ongoing_mutations = typing.cast(
int,
get_int_config(
"max_ongoing_mutations_for_delete",
default=settings.MAX_ONGOING_MUTATIONS_FOR_DELETE,
),
)
# Reconcile: trust CH as source of truth for completions
self.__local_inflight_count = ongoing_mutations
self.__metrics.timing("ongoing_mutations_query_ms", (time.time() - start) * 1000)
if ongoing_mutations > max_ongoing_mutations:
raise TooManyOngoingMutationsError(
Expand Down
1 change: 1 addition & 0 deletions snuba/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ class RedisClusters(TypedDict):

MAX_ONGOING_MUTATIONS_FOR_DELETE = 5
LW_DELETES_PARTITION_TRACKING_TTL = 3600
LW_DELETES_PER_SUBMIT_BUDGET = 5
SNQL_DISABLED_DATASETS: set[str] = set([])

ENDPOINT_GET_TRACE_PAGINATION_MAX_ITEMS: int = 0 # 0 means no limit
Expand Down
227 changes: 227 additions & 0 deletions tests/lw_deletions/test_lw_deletions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from snuba.redis import RedisClientKey, get_redis_client
from snuba.utils.streams.topics import Topic as SnubaTopic
from snuba.web.bulk_delete_query import DeleteQueryMessage
from snuba.web.delete_query import TooManyOngoingMutationsError

ROWS_CONDITIONS = {
5: {"project_id": [1], "group_id": [1, 2, 3, 4]},
Expand Down Expand Up @@ -485,3 +486,229 @@ def test_partition_date_filtering(mock_execute: Mock, mock_num_mutations: Mock)
]
assert len(filtered_calls) == 1
assert filtered_calls[0][1]["value"] == 2


@patch("snuba.lw_deletions.strategy._execute_query")
@pytest.mark.redis_db
def test_local_inflight_counter_reconciles_with_ch(mock_execute: Mock) -> None:
    """
    Verify that the local in-flight counter reconciles with CH on each check.
    When CH returns 0 (stale), the counter resets, allowing more deletes.
    When CH returns a high value, the counter reflects it and blocks.
    """
    commit_step = Mock()
    metrics = Mock()
    storage = get_writable_storage(StorageKey("search_issues"))

    # Low mutation ceiling so the counter would block if it failed to reconcile;
    # generous per-submit budget so that limit cannot interfere with this test.
    state.set_config("max_ongoing_mutations_for_delete", 3)
    state.set_config("lw_deletes_split_by_partition_search_issues", 1)
    state.set_config("lw_deletes_per_submit_budget", 100)

    # Three partitions -> three per-partition DELETE mutations expected.
    partition_dates = ["2024-01-01", "2024-01-08", "2024-01-15"]

    format_query = FormatQuery(commit_step, storage, SearchIssuesFormatter(), metrics)

    # CH always returns 0 (stale) — all 3 partitions should execute
    with (
        patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=0),
        # "_FormatQuery__partition_column" is the name-mangled private attribute
        # of FormatQuery; patch.object on the instance overrides it directly.
        patch.object(
            format_query,
            "_FormatQuery__partition_column",
            "receive_timestamp",
        ),
        patch.object(
            FormatQuery,
            "_get_partition_dates",
            return_value=partition_dates,
        ),
    ):
        # BatchStepCustom batches the kafka payloads and forwards the batch to
        # FormatQuery.submit() on join/flush.
        strategy = BatchStepCustom(
            max_batch_size=8,
            max_batch_time=1000,
            next_step=format_query,
            increment_by=increment_by,
        )
        strategy.submit(_make_single_message())
        strategy.join(2.0)

    # CH says 0 each time → local counter reconciles to 0 → all execute
    assert mock_execute.call_count == 3
    assert commit_step.submit.call_count == 1


@patch("snuba.lw_deletions.strategy._execute_query")
@pytest.mark.redis_db
def test_local_counter_increments_after_each_delete(mock_execute: Mock) -> None:
    """
    Verify that _execute_single_delete increments local inflight counter
    and that the counter reconciles with CH value on _check_ongoing_mutations.
    """
    # Local import mirrors the pattern used elsewhere in this test module.
    from snuba.query.query_settings import HTTPQuerySettings

    metrics = Mock()
    storage = get_writable_storage(StorageKey("search_issues"))

    state.set_config("max_ongoing_mutations_for_delete", 5)

    format_query = FormatQuery(Mock(), storage, SearchIssuesFormatter(), metrics)

    # _num_ongoing_mutations is the CH-backed source of truth; pinning it to 0
    # lets us observe the purely local increment/reconcile behavior.
    with patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=0):
        # Initial CH check: reconciles local counter to 0
        format_query._check_ongoing_mutations()
        # "_FormatQuery__local_inflight_count" is the name-mangled private
        # counter attribute — read directly to assert on internal state.
        assert format_query._FormatQuery__local_inflight_count == 0  # type: ignore[attr-defined]

        # Simulate deletes — counter should increment
        query = Mock()
        format_query._execute_single_delete("test_table", query, HTTPQuerySettings())
        assert format_query._FormatQuery__local_inflight_count == 1  # type: ignore[attr-defined]

        format_query._execute_single_delete("test_table", query, HTTPQuerySettings())
        assert format_query._FormatQuery__local_inflight_count == 2  # type: ignore[attr-defined]

        # CH check with skip_throttle=True reconciles counter to CH value (0)
        format_query._check_ongoing_mutations(skip_throttle=True)
        assert format_query._FormatQuery__local_inflight_count == 0  # type: ignore[attr-defined]


@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
@patch("snuba.lw_deletions.strategy._execute_query")
@pytest.mark.redis_db
def test_per_submit_budget_exhaustion(mock_execute: Mock, mock_num_mutations: Mock) -> None:
    """
    When partition splitting produces more partitions than the per-submit budget,
    the budget should stop issuing DELETEs and raise TooManyOngoingMutationsError.
    On retry, Redis tracking skips already-processed partitions.
    We test _execute_delete_by_partition directly for precise control.
    """
    from snuba.query.query_settings import HTTPQuerySettings

    metrics = Mock()
    storage = get_writable_storage(StorageKey("search_issues"))

    # Budget of 2 vs 5 partitions -> exhaustion after the 2nd DELETE per call.
    state.set_config("lw_deletes_split_by_partition_search_issues", 1)
    state.set_config("lw_deletes_per_submit_budget", 2)

    partition_dates = ["2024-01-01", "2024-01-08", "2024-01-15", "2024-01-22", "2024-01-29"]

    format_query = FormatQuery(Mock(), storage, SearchIssuesFormatter(), metrics)
    conditions = SearchIssuesFormatter().format(
        [_get_message(10, {"project_id": [1], "group_id": [1]})]
    )
    where_clause = Mock()
    query_settings = HTTPQuerySettings()

    with (
        # Name-mangled private attribute; patched on the instance.
        patch.object(
            format_query,
            "_FormatQuery__partition_column",
            "receive_timestamp",
        ),
        patch.object(
            FormatQuery,
            "_get_partition_dates",
            return_value=partition_dates,
        ),
        # construct_query is stubbed so where_clause can be an opaque Mock.
        patch(
            "snuba.web.bulk_delete_query.construct_query",
            return_value=Mock(),
        ),
    ):
        table = "search_issues_local_v2"

        # First call: processes 2 partitions, budget exhausted
        with pytest.raises(TooManyOngoingMutationsError, match="per-submit budget"):
            format_query._execute_delete_by_partition(
                table, where_clause, query_settings, conditions
            )

        assert mock_execute.call_count == 2

        # Redis should have the 2 processed partitions tracked
        redis_client = get_redis_client(RedisClientKey.CONFIG)
        keys = list(redis_client.scan_iter("lw_delete_partitions:search_issues:*"))
        assert len(keys) == 1
        members = redis_client.smembers(keys[0])
        assert len(members) == 2

        # Retry: reset the per-submit counter (simulating a new submit() call)
        mock_execute.reset_mock()
        format_query._FormatQuery__mutations_issued_this_submit = 0  # type: ignore[attr-defined]

        with pytest.raises(TooManyOngoingMutationsError, match="per-submit budget"):
            format_query._execute_delete_by_partition(
                table, where_clause, query_settings, conditions
            )

        # 2 skipped via Redis, 2 more processed
        assert mock_execute.call_count == 2
        members = redis_client.smembers(keys[0])
        assert len(members) == 4

        # Final retry: 4 skipped, 1 remaining fits in budget
        mock_execute.reset_mock()
        format_query._FormatQuery__mutations_issued_this_submit = 0  # type: ignore[attr-defined]

        # No exception this time: the single remaining partition fits in budget.
        format_query._execute_delete_by_partition(table, where_clause, query_settings, conditions)

        assert mock_execute.call_count == 1
        members = redis_client.smembers(keys[0])
        assert len(members) == 5


@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
@patch("snuba.lw_deletions.strategy._execute_query")
@patch("snuba.lw_deletions.strategy.time.sleep")
@pytest.mark.redis_db
def test_inter_delete_delay(mock_sleep: Mock, mock_execute: Mock, mock_num_mutations: Mock) -> None:
    """
    With lw_delete_inter_mutation_delay_ms configured, every
    _execute_single_delete must be followed by a time.sleep call.
    """
    state.set_config("lw_delete_inter_mutation_delay_ms", 200)

    downstream = Mock()
    metrics_backend = Mock()
    writable_storage = get_writable_storage(StorageKey("search_issues"))
    formatter_step = FormatQuery(
        downstream, writable_storage, SearchIssuesFormatter(), metrics_backend
    )

    batch_step = BatchStepCustom(
        max_batch_size=8,
        max_batch_time=1000,
        next_step=formatter_step,
        increment_by=increment_by,
    )
    batch_step.submit(_make_single_message())
    batch_step.join(2.0)

    # Exactly one DELETE was issued for the single message...
    assert mock_execute.call_count == 1
    # ...and it triggered a single 200ms pause, expressed in seconds.
    mock_sleep.assert_called_once_with(0.2)


@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
@patch("snuba.lw_deletions.strategy._execute_query")
@patch("snuba.lw_deletions.strategy.time.sleep")
@pytest.mark.redis_db
def test_inter_delete_delay_disabled_by_default(
    mock_sleep: Mock, mock_execute: Mock, mock_num_mutations: Mock
) -> None:
    """
    With the delay config at its default of 0, no time.sleep call is made
    between deletes.
    """
    # Explicitly set to 0 (the default)
    state.set_config("lw_delete_inter_mutation_delay_ms", 0)

    downstream = Mock()
    metrics_backend = Mock()
    writable_storage = get_writable_storage(StorageKey("search_issues"))
    formatter_step = FormatQuery(
        downstream, writable_storage, SearchIssuesFormatter(), metrics_backend
    )

    batch_step = BatchStepCustom(
        max_batch_size=8,
        max_batch_time=1000,
        next_step=formatter_step,
        increment_by=increment_by,
    )
    batch_step.submit(_make_single_message())
    batch_step.join(2.0)

    # The delete ran, but no inter-mutation pause was inserted.
    assert mock_execute.call_count == 1
    mock_sleep.assert_not_called()
Loading