diff --git a/snuba/lw_deletions/strategy.py b/snuba/lw_deletions/strategy.py index 322d004708..60d356929f 100644 --- a/snuba/lw_deletions/strategy.py +++ b/snuba/lw_deletions/strategy.py @@ -73,6 +73,8 @@ def __init__( self.__metrics = metrics self.__last_ongoing_mutations_check: Optional[float] = None self.__redis_client = get_redis_client(RedisClientKey.CONFIG) + self.__local_inflight_count = 0 + self.__mutations_issued_this_submit = 0 def poll(self) -> None: self.__next_step.poll() @@ -99,6 +101,7 @@ def _is_execute_enabled(self, conditions: Sequence[ConditionsBag]) -> bool: return org_ids_delete_allowlist.issuperset(query_org_ids) def submit(self, message: Message[ValuesBatch[KafkaPayload]]) -> None: + self.__mutations_issued_this_submit = 0 decode_messages = [rapidjson.loads(m.payload.value) for m in message.value.payload] conditions = self.__formatter.format(decode_messages) @@ -233,6 +236,14 @@ def _execute_delete_by_partition( tracking_key = f"lw_delete_partitions:{self.__storage_name}:{cond_hash}" ttl = settings.LW_DELETES_PARTITION_TRACKING_TTL + per_submit_budget = typing.cast( + int, + get_int_config( + "lw_deletes_per_submit_budget", + default=settings.LW_DELETES_PER_SUBMIT_BUDGET, + ), + ) + for partition_date in partition_dates: member = f"{table}:{partition_date}" days_delta = (datetime.strptime(partition_date, "%Y-%m-%d") - datetime.now()).days @@ -249,6 +260,11 @@ def _execute_delete_by_partition( ) continue + if self.__mutations_issued_this_submit >= per_submit_budget: + raise TooManyOngoingMutationsError( + f"per-submit budget of {per_submit_budget} mutations exhausted" + ) + self._check_ongoing_mutations(skip_throttle=True) partition_condition = equals( FunctionCall(None, "toMonday", (column(self.__partition_column),)), # type: ignore[arg-type] @@ -285,6 +301,8 @@ def _execute_single_delete( attribution_info=self._get_attribute_info(), query_settings=query_settings, ) + self.__local_inflight_count += 1 + self.__mutations_issued_this_submit += 1 self.__metrics.timing( "execute_delete_query_ms", (time.time() - start) * 1000, @@ -299,7 +317,31 @@ def _execute_single_delete( else: raise LWDeleteQueryException(exc.message) + delay_ms = get_int_config("lw_delete_inter_mutation_delay_ms", default=0) + if delay_ms and delay_ms > 0: + time.sleep(delay_ms / 1000.0) + def _check_ongoing_mutations(self, skip_throttle: bool = False) -> None: + max_ongoing_mutations = typing.cast( + int, + get_int_config( + "max_ongoing_mutations_for_delete", + default=settings.MAX_ONGOING_MUTATIONS_FOR_DELETE, + ), + ) + + # Fast path: local counter already exceeds limit, no need to query CH + if self.__local_inflight_count > max_ongoing_mutations: + now = time.time() + if ( + self.__last_ongoing_mutations_check is not None + and now - self.__last_ongoing_mutations_check < 1.0 + ): + raise TooManyOngoingMutationsError( + f"local inflight count {self.__local_inflight_count} exceeds max {max_ongoing_mutations}" + ) + # Fall through to reconcile with CH + now = time.time() if ( not skip_throttle @@ -312,13 +354,8 @@ def _check_ongoing_mutations(self, skip_throttle: bool = False) -> None: start = time.time() ongoing_mutations = _num_ongoing_mutations(self.__storage.get_cluster(), self.__tables) self.__last_ongoing_mutations_check = time.time() - max_ongoing_mutations = typing.cast( - int, - get_int_config( - "max_ongoing_mutations_for_delete", - default=settings.MAX_ONGOING_MUTATIONS_FOR_DELETE, - ), - ) + # Reconcile: trust CH as source of truth for completions + self.__local_inflight_count = ongoing_mutations self.__metrics.timing("ongoing_mutations_query_ms", (time.time() - start) * 1000) if ongoing_mutations > max_ongoing_mutations: raise TooManyOngoingMutationsError( diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index 9fce9e168b..e3b7cdc924 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -470,6 +470,7 @@ class RedisClusters(TypedDict): MAX_ONGOING_MUTATIONS_FOR_DELETE = 5 LW_DELETES_PARTITION_TRACKING_TTL = 3600 +LW_DELETES_PER_SUBMIT_BUDGET = 5 SNQL_DISABLED_DATASETS: set[str] = set([]) ENDPOINT_GET_TRACE_PAGINATION_MAX_ITEMS: int = 0 # 0 means no limit diff --git a/tests/lw_deletions/test_lw_deletions.py b/tests/lw_deletions/test_lw_deletions.py index c2cba43ccf..559ccda3dc 100644 --- a/tests/lw_deletions/test_lw_deletions.py +++ b/tests/lw_deletions/test_lw_deletions.py @@ -20,6 +20,7 @@ from snuba.redis import RedisClientKey, get_redis_client from snuba.utils.streams.topics import Topic as SnubaTopic from snuba.web.bulk_delete_query import DeleteQueryMessage +from snuba.web.delete_query import TooManyOngoingMutationsError ROWS_CONDITIONS = { 5: {"project_id": [1], "group_id": [1, 2, 3, 4]}, @@ -485,3 +486,229 @@ def test_partition_date_filtering(mock_execute: Mock, mock_num_mutations: Mock) ] assert len(filtered_calls) == 1 assert filtered_calls[0][1]["value"] == 2 + + +@patch("snuba.lw_deletions.strategy._execute_query") +@pytest.mark.redis_db +def test_local_inflight_counter_reconciles_with_ch(mock_execute: Mock) -> None: + """ + Verify that the local in-flight counter reconciles with CH on each check. + When CH returns 0 (stale), the counter resets, allowing more deletes. + When CH returns a high value, the counter reflects it and blocks. + """ + commit_step = Mock() + metrics = Mock() + storage = get_writable_storage(StorageKey("search_issues")) + + state.set_config("max_ongoing_mutations_for_delete", 3) + state.set_config("lw_deletes_split_by_partition_search_issues", 1) + state.set_config("lw_deletes_per_submit_budget", 100) + + partition_dates = ["2024-01-01", "2024-01-08", "2024-01-15"] + + format_query = FormatQuery(commit_step, storage, SearchIssuesFormatter(), metrics) + + # CH always returns 0 (stale) — all 3 partitions should execute + with ( + patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=0), + patch.object( + format_query, + "_FormatQuery__partition_column", + "receive_timestamp", + ), + patch.object( + FormatQuery, + "_get_partition_dates", + return_value=partition_dates, + ), + ): + strategy = BatchStepCustom( + max_batch_size=8, + max_batch_time=1000, + next_step=format_query, + increment_by=increment_by, + ) + strategy.submit(_make_single_message()) + strategy.join(2.0) + + # CH says 0 each time → local counter reconciles to 0 → all execute + assert mock_execute.call_count == 3 + assert commit_step.submit.call_count == 1 + + +@patch("snuba.lw_deletions.strategy._execute_query") +@pytest.mark.redis_db +def test_local_counter_increments_after_each_delete(mock_execute: Mock) -> None: + """ + Verify that _execute_single_delete increments local inflight counter + and that the counter reconciles with CH value on _check_ongoing_mutations. + """ + from snuba.query.query_settings import HTTPQuerySettings + + metrics = Mock() + storage = get_writable_storage(StorageKey("search_issues")) + + state.set_config("max_ongoing_mutations_for_delete", 5) + + format_query = FormatQuery(Mock(), storage, SearchIssuesFormatter(), metrics) + + with patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=0): + # Initial CH check: reconciles local counter to 0 + format_query._check_ongoing_mutations() + assert format_query._FormatQuery__local_inflight_count == 0 # type: ignore[attr-defined] + + # Simulate deletes — counter should increment + query = Mock() + format_query._execute_single_delete("test_table", query, HTTPQuerySettings()) + assert format_query._FormatQuery__local_inflight_count == 1 # type: ignore[attr-defined] + + format_query._execute_single_delete("test_table", query, HTTPQuerySettings()) + assert format_query._FormatQuery__local_inflight_count == 2 # type: ignore[attr-defined] + + # CH check with skip_throttle=True reconciles counter to CH value (0) + format_query._check_ongoing_mutations(skip_throttle=True) + assert format_query._FormatQuery__local_inflight_count == 0 # type: ignore[attr-defined] + + +@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1) +@patch("snuba.lw_deletions.strategy._execute_query") +@pytest.mark.redis_db +def test_per_submit_budget_exhaustion(mock_execute: Mock, mock_num_mutations: Mock) -> None: + """ + When partition splitting produces more partitions than the per-submit budget, + the budget should stop issuing DELETEs and raise TooManyOngoingMutationsError. + On retry, Redis tracking skips already-processed partitions. + We test _execute_delete_by_partition directly for precise control. + """ + from snuba.query.query_settings import HTTPQuerySettings + + metrics = Mock() + storage = get_writable_storage(StorageKey("search_issues")) + + state.set_config("lw_deletes_split_by_partition_search_issues", 1) + state.set_config("lw_deletes_per_submit_budget", 2) + + partition_dates = ["2024-01-01", "2024-01-08", "2024-01-15", "2024-01-22", "2024-01-29"] + + format_query = FormatQuery(Mock(), storage, SearchIssuesFormatter(), metrics) + conditions = SearchIssuesFormatter().format( + [_get_message(10, {"project_id": [1], "group_id": [1]})] + ) + where_clause = Mock() + query_settings = HTTPQuerySettings() + + with ( + patch.object( + format_query, + "_FormatQuery__partition_column", + "receive_timestamp", + ), + patch.object( + FormatQuery, + "_get_partition_dates", + return_value=partition_dates, + ), + patch( + "snuba.web.bulk_delete_query.construct_query", + return_value=Mock(), + ), + ): + table = "search_issues_local_v2" + + # First call: processes 2 partitions, budget exhausted + with pytest.raises(TooManyOngoingMutationsError, match="per-submit budget"): + format_query._execute_delete_by_partition( + table, where_clause, query_settings, conditions + ) + + assert mock_execute.call_count == 2 + + # Redis should have the 2 processed partitions tracked + redis_client = get_redis_client(RedisClientKey.CONFIG) + keys = list(redis_client.scan_iter("lw_delete_partitions:search_issues:*")) + assert len(keys) == 1 + members = redis_client.smembers(keys[0]) + assert len(members) == 2 + + # Retry: reset the per-submit counter (simulating a new submit() call) + mock_execute.reset_mock() + format_query._FormatQuery__mutations_issued_this_submit = 0 # type: ignore[attr-defined] + + with pytest.raises(TooManyOngoingMutationsError, match="per-submit budget"): + format_query._execute_delete_by_partition( + table, where_clause, query_settings, conditions + ) + + # 2 skipped via Redis, 2 more processed + assert mock_execute.call_count == 2 + members = redis_client.smembers(keys[0]) + assert len(members) == 4 + + # Final retry: 4 skipped, 1 remaining fits in budget + mock_execute.reset_mock() + format_query._FormatQuery__mutations_issued_this_submit = 0 # type: ignore[attr-defined] + + format_query._execute_delete_by_partition(table, where_clause, query_settings, conditions) + + assert mock_execute.call_count == 1 + members = redis_client.smembers(keys[0]) + assert len(members) == 5 + + +@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1) +@patch("snuba.lw_deletions.strategy._execute_query") +@patch("snuba.lw_deletions.strategy.time.sleep") +@pytest.mark.redis_db +def test_inter_delete_delay(mock_sleep: Mock, mock_execute: Mock, mock_num_mutations: Mock) -> None: + """ + When lw_delete_inter_mutation_delay_ms is set, time.sleep should be called + after each _execute_single_delete. + """ + commit_step = Mock() + metrics = Mock() + storage = get_writable_storage(StorageKey("search_issues")) + + state.set_config("lw_delete_inter_mutation_delay_ms", 200) + + strategy = BatchStepCustom( + max_batch_size=8, + max_batch_time=1000, + next_step=FormatQuery(commit_step, storage, SearchIssuesFormatter(), metrics), + increment_by=increment_by, + ) + strategy.submit(_make_single_message()) + strategy.join(2.0) + + assert mock_execute.call_count == 1 + # sleep(0.2) should have been called once (200ms) + mock_sleep.assert_called_once_with(0.2) + + +@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1) +@patch("snuba.lw_deletions.strategy._execute_query") +@patch("snuba.lw_deletions.strategy.time.sleep") +@pytest.mark.redis_db +def test_inter_delete_delay_disabled_by_default( + mock_sleep: Mock, mock_execute: Mock, mock_num_mutations: Mock +) -> None: + """ + By default (delay_ms=0), time.sleep should not be called. + """ + commit_step = Mock() + metrics = Mock() + storage = get_writable_storage(StorageKey("search_issues")) + + # Explicitly set to 0 (the default) + state.set_config("lw_delete_inter_mutation_delay_ms", 0) + + strategy = BatchStepCustom( + max_batch_size=8, + max_batch_time=1000, + next_step=FormatQuery(commit_step, storage, SearchIssuesFormatter(), metrics), + increment_by=increment_by, + ) + strategy.submit(_make_single_message()) + strategy.join(2.0) + + assert mock_execute.call_count == 1 + mock_sleep.assert_not_called()