diff --git a/snuba/lw_deletions/strategy.py b/snuba/lw_deletions/strategy.py index 38ab1dc189..322d004708 100644 --- a/snuba/lw_deletions/strategy.py +++ b/snuba/lw_deletions/strategy.py @@ -19,10 +19,10 @@ from snuba import settings from snuba.attribution import AppID from snuba.attribution.attribution_info import AttributionInfo -from snuba.cleanup import get_active_partitions from snuba.clickhouse.errors import ClickhouseError from snuba.clickhouse.query import Query from snuba.clusters.cluster import ClickhouseClientSettings +from snuba.datasets.schemas.tables import TableSchema from snuba.datasets.storage import WritableTableStorage from snuba.datasets.storages.storage_key import StorageKey from snuba.lw_deletions.batching import BatchStepCustom, ValuesBatch @@ -72,6 +72,7 @@ def __init__( self.__formatter: Formatter = formatter self.__metrics = metrics self.__last_ongoing_mutations_check: Optional[float] = None + self.__redis_client = get_redis_client(RedisClientKey.CONFIG) def poll(self) -> None: self.__next_step.poll() @@ -151,10 +152,31 @@ def _conditions_hash(self, conditions: Sequence[ConditionsBag]) -> str: return hashlib.md5("|".join(parts).encode()).hexdigest()[:16] def _get_partition_dates(self, table: str) -> List[str]: + from snuba.util import decode_part_str + cluster = self.__storage.get_cluster() database = cluster.get_database() - connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY) - parts = get_active_partitions(connection, self.__storage, database, table) + + node = cluster.get_local_nodes()[0] + connection = cluster.get_node_connection(ClickhouseClientSettings.CLEANUP, node) + + response = connection.execute( + """ + SELECT DISTINCT partition + FROM system.parts + WHERE database = %(database)s + AND table = %(table)s + AND active = 1 + """, + {"database": database, "table": table}, + ) + + schema = self.__storage.get_schema() + assert isinstance(schema, TableSchema) + partition_format = schema.get_partition_format() + assert partition_format is not None + parts = [decode_part_str(part, partition_format) for (part,) in response.results] + now = datetime.now() min_date = (now - timedelta(days=365)).date() max_date = (now + timedelta(days=7)).date() @@ -209,15 +231,16 @@ def _execute_delete_by_partition( cond_hash = self._conditions_hash(conditions) tracking_key = f"lw_delete_partitions:{self.__storage_name}:{cond_hash}" - redis_client = get_redis_client(RedisClientKey.CONFIG) ttl = settings.LW_DELETES_PARTITION_TRACKING_TTL for partition_date in partition_dates: member = f"{table}:{partition_date}" - if redis_client.sismember(tracking_key, member): + days_delta = (datetime.strptime(partition_date, "%Y-%m-%d") - datetime.now()).days + partition_week = str(days_delta // 7) + if self.__redis_client.sismember(tracking_key, member): self.__metrics.increment( "partition_delete_skipped", - tags={"table": table, "partition_date": partition_date}, + tags={"table": table, "partition_week": partition_week}, ) logger.info( "Skipping already-tracked partition %s for table %s", @@ -233,25 +256,25 @@ def _execute_delete_by_partition( ) partition_where = combine_and_conditions([where_clause, partition_condition]) query = construct_query(self.__storage, table, partition_where) - self._execute_single_delete(table, query, query_settings, partition_date=partition_date) + self._execute_single_delete(table, query, query_settings, partition_week=partition_week) self.__metrics.increment( "partition_delete_executed", - tags={"table": table, "partition_date": partition_date}, + tags={"table": table, "partition_week": partition_week}, ) - redis_client.sadd(tracking_key, member) + self.__redis_client.sadd(tracking_key, member) - redis_client.expire(tracking_key, ttl) + self.__redis_client.expire(tracking_key, ttl) def _execute_single_delete( self, table: str, query: Query, query_settings: HTTPQuerySettings, - partition_date: Optional[str] = None, + partition_week: Optional[str] = None, ) -> None: tags = {"table": table} - if partition_date: - tags["partition_date"] = partition_date + if partition_week: + tags["partition_week"] = partition_week start = time.time() try: _execute_query( diff --git a/tests/lw_deletions/test_lw_deletions.py b/tests/lw_deletions/test_lw_deletions.py index 3cb6cec0ef..c2cba43ccf 100644 --- a/tests/lw_deletions/test_lw_deletions.py +++ b/tests/lw_deletions/test_lw_deletions.py @@ -10,6 +10,7 @@ from arroyo.types import BrokerValue, Message, Partition, Topic from snuba import state +from snuba.clusters.cluster import ClickhouseNode from snuba.datasets.storages.factory import get_writable_storage from snuba.datasets.storages.storage_key import StorageKey from snuba.lw_deletions.batching import BatchStepCustom @@ -17,7 +18,6 @@ from snuba.lw_deletions.strategy import FormatQuery, increment_by from snuba.lw_deletions.types import ConditionsType from snuba.redis import RedisClientKey, get_redis_client -from snuba.util import Part from snuba.utils.streams.topics import Topic as SnubaTopic from snuba.web.bulk_delete_query import DeleteQueryMessage @@ -437,11 +437,8 @@ def test_split_by_partition_fallback(mock_execute: Mock, mock_num_mutations: Moc @patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1) @patch("snuba.lw_deletions.strategy._execute_query") -@patch("snuba.lw_deletions.strategy.get_active_partitions") @pytest.mark.redis_db -def test_partition_date_filtering( - mock_get_partitions: Mock, mock_execute: Mock, mock_num_mutations: Mock -) -> None: +def test_partition_date_filtering(mock_execute: Mock, mock_num_mutations: Mock) -> None: """ When _get_partition_dates encounters partition dates outside the valid window (last 12 months through 7 days from now), those dates should be @@ -456,16 +453,26 @@ def test_partition_date_filtering( bogus_old = (now - timedelta(days=500)).strftime("%Y-%m-%d") bogus_future = (now + timedelta(days=30)).strftime("%Y-%m-%d") - mock_get_partitions.return_value = [ - Part(name="p1", date=now - timedelta(days=30), retention_days=90), - Part(name="p2", date=now - timedelta(days=60), retention_days=90), - Part(name="p3", date=now - timedelta(days=500), retention_days=90), - Part(name="p4", date=now + timedelta(days=30), retention_days=90), + # Build mock system.parts response: each row is a (partition_string,) tuple + # matching the (retention_days, 'YYYY-MM-DD') format used by search_issues + mock_results = Mock() + mock_results.results = [ + (f"(90, '{(now - timedelta(days=30)).strftime('%Y-%m-%d')}')",), + (f"(90, '{(now - timedelta(days=60)).strftime('%Y-%m-%d')}')",), + (f"(90, '{(now - timedelta(days=500)).strftime('%Y-%m-%d')}')",), + (f"(90, '{(now + timedelta(days=30)).strftime('%Y-%m-%d')}')",), ] + mock_connection = Mock() + mock_connection.execute.return_value = mock_results format_query = FormatQuery(Mock(), storage, SearchIssuesFormatter(), metrics) + cluster = storage.get_cluster() + dummy_node = ClickhouseNode("localhost", 9000) - with patch.object(storage.get_cluster(), "get_query_connection", return_value=Mock()): + with ( + patch.object(cluster, "get_local_nodes", return_value=[dummy_node]), + patch.object(cluster, "get_node_connection", return_value=mock_connection), + ): result = format_query._get_partition_dates("search_issues_local_v2") assert result == sorted([valid_date_1, valid_date_2])