Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions snuba/lw_deletions/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
from snuba import settings
from snuba.attribution import AppID
from snuba.attribution.attribution_info import AttributionInfo
from snuba.cleanup import get_active_partitions
from snuba.clickhouse.errors import ClickhouseError
from snuba.clickhouse.query import Query
from snuba.clusters.cluster import ClickhouseClientSettings
from snuba.datasets.schemas.tables import TableSchema
from snuba.datasets.storage import WritableTableStorage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.lw_deletions.batching import BatchStepCustom, ValuesBatch
Expand Down Expand Up @@ -72,6 +72,7 @@ def __init__(
self.__formatter: Formatter = formatter
self.__metrics = metrics
self.__last_ongoing_mutations_check: Optional[float] = None
self.__redis_client = get_redis_client(RedisClientKey.CONFIG)

def poll(self) -> None:
self.__next_step.poll()
Expand Down Expand Up @@ -151,10 +152,31 @@ def _conditions_hash(self, conditions: Sequence[ConditionsBag]) -> str:
return hashlib.md5("|".join(parts).encode()).hexdigest()[:16]

def _get_partition_dates(self, table: str) -> List[str]:
from snuba.util import decode_part_str

cluster = self.__storage.get_cluster()
database = cluster.get_database()
connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY)
parts = get_active_partitions(connection, self.__storage, database, table)

node = cluster.get_local_nodes()[0]
connection = cluster.get_node_connection(ClickhouseClientSettings.CLEANUP, node)

response = connection.execute(
"""
SELECT DISTINCT partition
FROM system.parts
WHERE database = %(database)s
AND table = %(table)s
AND active = 1
""",
{"database": database, "table": table},
)

schema = self.__storage.get_schema()
assert isinstance(schema, TableSchema)
partition_format = schema.get_partition_format()
assert partition_format is not None
parts = [decode_part_str(part, partition_format) for (part,) in response.results]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated get_active_partitions logic instead of reusing existing function

Medium Severity

The new inline code in _get_partition_dates (the query, schema lookup, decode_part_str call) is an exact copy of get_active_partitions from snuba/cleanup.py. Since get_active_partitions accepts a ClickhousePool as its first parameter and get_node_connection returns a ClickhousePool, the fix only needed to change the connection passed to the existing function — not duplicate all 12+ lines. Maintaining two identical copies risks divergent bug fixes.

Fix in Cursor Fix in Web


now = datetime.now()
min_date = (now - timedelta(days=365)).date()
max_date = (now + timedelta(days=7)).date()
Expand Down Expand Up @@ -209,15 +231,16 @@ def _execute_delete_by_partition(

cond_hash = self._conditions_hash(conditions)
tracking_key = f"lw_delete_partitions:{self.__storage_name}:{cond_hash}"
redis_client = get_redis_client(RedisClientKey.CONFIG)
ttl = settings.LW_DELETES_PARTITION_TRACKING_TTL

for partition_date in partition_dates:
member = f"{table}:{partition_date}"
if redis_client.sismember(tracking_key, member):
days_delta = (datetime.strptime(partition_date, "%Y-%m-%d") - datetime.now()).days
partition_week = str(days_delta // 7)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Week offset miscalculated due to time-of-day component

Low Severity

datetime.strptime(partition_date, "%Y-%m-%d") produces midnight, while datetime.now() includes the current time. The .days attribute of the resulting timedelta is systematically one less than the actual calendar-day difference (e.g., today's partition yields .days = -1 instead of 0). This shifts all partition_week tags by roughly one day, so a partition from today gets week "-1" rather than the expected "0".

Fix in Cursor Fix in Web

Copy link
Member

@MeredithAnya MeredithAnya Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking this would be the week number in the year, like `datetime.now().isocalendar().week` — not sure if that makes more or less sense than what you have here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's actually what the AI did first, but I think it makes more sense to normalize it the way we do `weeks_ago` in some existing metrics.

if self.__redis_client.sismember(tracking_key, member):
self.__metrics.increment(
"partition_delete_skipped",
tags={"table": table, "partition_date": partition_date},
tags={"table": table, "partition_week": partition_week},
)
logger.info(
"Skipping already-tracked partition %s for table %s",
Expand All @@ -233,25 +256,25 @@ def _execute_delete_by_partition(
)
partition_where = combine_and_conditions([where_clause, partition_condition])
query = construct_query(self.__storage, table, partition_where)
self._execute_single_delete(table, query, query_settings, partition_date=partition_date)
self._execute_single_delete(table, query, query_settings, partition_week=partition_week)
self.__metrics.increment(
"partition_delete_executed",
tags={"table": table, "partition_date": partition_date},
tags={"table": table, "partition_week": partition_week},
)
redis_client.sadd(tracking_key, member)
self.__redis_client.sadd(tracking_key, member)

redis_client.expire(tracking_key, ttl)
self.__redis_client.expire(tracking_key, ttl)

def _execute_single_delete(
self,
table: str,
query: Query,
query_settings: HTTPQuerySettings,
partition_date: Optional[str] = None,
partition_week: Optional[str] = None,
) -> None:
tags = {"table": table}
if partition_date:
tags["partition_date"] = partition_date
if partition_week:
tags["partition_week"] = partition_week
start = time.time()
try:
_execute_query(
Expand Down
29 changes: 18 additions & 11 deletions tests/lw_deletions/test_lw_deletions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
from arroyo.types import BrokerValue, Message, Partition, Topic

from snuba import state
from snuba.clusters.cluster import ClickhouseNode
from snuba.datasets.storages.factory import get_writable_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.lw_deletions.batching import BatchStepCustom
from snuba.lw_deletions.formatters import SearchIssuesFormatter
from snuba.lw_deletions.strategy import FormatQuery, increment_by
from snuba.lw_deletions.types import ConditionsType
from snuba.redis import RedisClientKey, get_redis_client
from snuba.util import Part
from snuba.utils.streams.topics import Topic as SnubaTopic
from snuba.web.bulk_delete_query import DeleteQueryMessage

Expand Down Expand Up @@ -437,11 +437,8 @@ def test_split_by_partition_fallback(mock_execute: Mock, mock_num_mutations: Moc

@patch("snuba.lw_deletions.strategy._num_ongoing_mutations", return_value=1)
@patch("snuba.lw_deletions.strategy._execute_query")
@patch("snuba.lw_deletions.strategy.get_active_partitions")
@pytest.mark.redis_db
def test_partition_date_filtering(
mock_get_partitions: Mock, mock_execute: Mock, mock_num_mutations: Mock
) -> None:
def test_partition_date_filtering(mock_execute: Mock, mock_num_mutations: Mock) -> None:
"""
When _get_partition_dates encounters partition dates outside the valid window
(last 12 months through 7 days from now), those dates should be
Expand All @@ -456,16 +453,26 @@ def test_partition_date_filtering(
bogus_old = (now - timedelta(days=500)).strftime("%Y-%m-%d")
bogus_future = (now + timedelta(days=30)).strftime("%Y-%m-%d")

mock_get_partitions.return_value = [
Part(name="p1", date=now - timedelta(days=30), retention_days=90),
Part(name="p2", date=now - timedelta(days=60), retention_days=90),
Part(name="p3", date=now - timedelta(days=500), retention_days=90),
Part(name="p4", date=now + timedelta(days=30), retention_days=90),
# Build mock system.parts response: each row is a (partition_string,) tuple
# matching the (retention_days, 'YYYY-MM-DD') format used by search_issues
mock_results = Mock()
mock_results.results = [
(f"(90, '{(now - timedelta(days=30)).strftime('%Y-%m-%d')}')",),
(f"(90, '{(now - timedelta(days=60)).strftime('%Y-%m-%d')}')",),
(f"(90, '{(now - timedelta(days=500)).strftime('%Y-%m-%d')}')",),
(f"(90, '{(now + timedelta(days=30)).strftime('%Y-%m-%d')}')",),
]
mock_connection = Mock()
mock_connection.execute.return_value = mock_results

format_query = FormatQuery(Mock(), storage, SearchIssuesFormatter(), metrics)
cluster = storage.get_cluster()
dummy_node = ClickhouseNode("localhost", 9000)

with patch.object(storage.get_cluster(), "get_query_connection", return_value=Mock()):
with (
patch.object(cluster, "get_local_nodes", return_value=[dummy_node]),
patch.object(cluster, "get_node_connection", return_value=mock_connection),
):
result = format_query._get_partition_dates("search_issues_local_v2")

assert result == sorted([valid_date_1, valid_date_2])
Expand Down
Loading