Skip to content

Commit 1fc4ac1

Browse files
HyeockJinKimclaude
andauthored
feat(BA-3002): Replace time() with Redis TIME command in ValkeyScheduleClient (#6695)
Co-authored-by: Claude <[email protected]>
1 parent 8dc0e74 commit 1fc4ac1

File tree

3 files changed

+39
-26
lines changed

3 files changed

+39
-26
lines changed

changes/6695.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Replace time() with Redis TIME command in ValkeyScheduleClient

src/ai/backend/common/clients/valkey_client/valkey_schedule/client.py

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from collections.abc import Mapping, Sequence
22
from dataclasses import dataclass
3-
from time import time
43
from typing import Optional, Self, cast
54
from uuid import UUID
65

@@ -130,9 +129,20 @@ def _get_route_health_key(self, route_id: str) -> str:
130129
"""
131130
return f"route:health:{route_id}"
132131

133-
def _is_health_status_valid(self, status: str, timestamp_str: str) -> bool:
132+
async def _get_redis_time(self) -> int:
134133
"""
135-
Check if health status is healthy and timestamp is not stale.
134+
Get current Unix timestamp from Redis server using TIME command.
135+
This ensures consistent timestamps across distributed systems.
136+
137+
:return: Current Unix timestamp in seconds
138+
"""
139+
result = await self._client.client.time()
140+
seconds_bytes, _ = result
141+
return int(seconds_bytes)
142+
143+
async def _validate_health_status(self, status: str, timestamp_str: str) -> bool:
144+
"""
145+
Validate health status by checking if it's healthy and timestamp is not stale.
136146
137147
:param status: The status string ("1" for healthy, "0" for unhealthy)
138148
:param timestamp_str: The timestamp string value from Redis
@@ -142,7 +152,7 @@ def _is_health_status_valid(self, status: str, timestamp_str: str) -> bool:
142152
return False
143153
try:
144154
timestamp = int(timestamp_str)
145-
current_time = int(time())
155+
current_time = await self._get_redis_time()
146156
return (current_time - timestamp) <= MAX_HEALTH_STALENESS_SEC
147157
except (ValueError, TypeError):
148158
return False
@@ -323,10 +333,10 @@ async def get_route_health_status(self, route_id: str) -> Optional[HealthStatus]
323333
data = {k.decode(): v.decode() for k, v in result.items()}
324334

325335
# Parse boolean values using validation helper (checks both status and staleness)
326-
readiness = self._is_health_status_valid(
336+
readiness = await self._validate_health_status(
327337
data.get("readiness", "0"), data.get("last_readiness", "0")
328338
)
329-
liveness = self._is_health_status_valid(
339+
liveness = await self._validate_health_status(
330340
data.get("liveness", "0"), data.get("last_liveness", "0")
331341
)
332342
last_check = int(data["last_check"]) if "last_check" in data else 0
@@ -345,7 +355,7 @@ async def initialize_routes_health_status_batch(self, route_ids: list[str]) -> N
345355
if not route_ids:
346356
return
347357

348-
current_time = str(int(time()))
358+
current_time = str(await self._get_redis_time())
349359
batch = Batch(is_atomic=False)
350360

351361
for route_id in route_ids:
@@ -373,7 +383,7 @@ async def update_route_readiness(self, route_id: str, readiness: bool) -> None:
373383
key = self._get_route_health_key(route_id)
374384
data: Mapping[str | bytes, str | bytes] = {
375385
"readiness": "1" if readiness else "0",
376-
"last_readiness": str(int(time())),
386+
"last_readiness": str(await self._get_redis_time()),
377387
}
378388

379389
batch = Batch(is_atomic=False)
@@ -393,7 +403,7 @@ async def update_route_liveness(self, route_id: str, liveness: bool) -> None:
393403
key = self._get_route_health_key(route_id)
394404
data: Mapping[str | bytes, str | bytes] = {
395405
"liveness": "1" if liveness else "0",
396-
"last_liveness": str(int(time())),
406+
"last_liveness": str(await self._get_redis_time()),
397407
}
398408

399409
batch = Batch(is_atomic=False)
@@ -415,7 +425,7 @@ async def check_route_health_status(
415425
if not route_ids:
416426
return {}
417427

418-
current_time = str(int(time()))
428+
current_time = str(await self._get_redis_time())
419429
batch = Batch(is_atomic=False)
420430

421431
# Single batch: update last_check, refresh TTL, and get all data
@@ -447,13 +457,15 @@ async def check_route_health_status(
447457
# Parse existing data
448458
data = {k.decode(): v.decode() for k, v in result.items()}
449459
# Parse boolean values using validation helper (checks both status and staleness)
460+
readiness = await self._validate_health_status(
461+
data.get("readiness", "0"), data.get("last_readiness", "0")
462+
)
463+
liveness = await self._validate_health_status(
464+
data.get("liveness", "0"), data.get("last_liveness", "0")
465+
)
450466
health_statuses[route_id] = HealthStatus(
451-
readiness=self._is_health_status_valid(
452-
data.get("readiness", "0"), data.get("last_readiness", "0")
453-
),
454-
liveness=self._is_health_status_valid(
455-
data.get("liveness", "0"), data.get("last_liveness", "0")
456-
),
467+
readiness=readiness,
468+
liveness=liveness,
457469
last_check=int(data["last_check"]) if "last_check" in data else 0,
458470
)
459471

@@ -470,7 +482,7 @@ async def update_routes_readiness_batch(self, route_readiness: Mapping[str, bool
470482
if not route_readiness:
471483
return
472484

473-
current_time = str(int(time()))
485+
current_time = str(await self._get_redis_time())
474486
batch = Batch(is_atomic=False)
475487
for route_id, readiness in route_readiness.items():
476488
key = self._get_route_health_key(route_id)

tests/common/clients/valkey_client/test_valkey_schedule_client.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,36 +61,36 @@ async def _set_stale_health_data(self, client: ValkeyScheduleClient, route_id: s
6161
)
6262

6363
@pytest.mark.asyncio
64-
async def test_is_health_status_valid_with_valid_timestamp(
64+
async def test_validate_health_status_with_valid_timestamp(
6565
self, valkey_schedule_client: ValkeyScheduleClient
6666
) -> None:
6767
"""Test validation with healthy status and fresh timestamp"""
6868
fresh_timestamp = str(int(time()))
69-
assert valkey_schedule_client._is_health_status_valid("1", fresh_timestamp) is True
69+
assert await valkey_schedule_client._validate_health_status("1", fresh_timestamp) is True
7070

7171
@pytest.mark.asyncio
72-
async def test_is_health_status_valid_with_stale_timestamp(
72+
async def test_validate_health_status_with_stale_timestamp(
7373
self, valkey_schedule_client: ValkeyScheduleClient
7474
) -> None:
7575
"""Test validation with healthy status but stale timestamp"""
7676
stale_timestamp = str(int(time()) - MAX_HEALTH_STALENESS_SEC - 10)
77-
assert valkey_schedule_client._is_health_status_valid("1", stale_timestamp) is False
77+
assert await valkey_schedule_client._validate_health_status("1", stale_timestamp) is False
7878

7979
@pytest.mark.asyncio
80-
async def test_is_health_status_valid_with_unhealthy_status(
80+
async def test_validate_health_status_with_unhealthy_status(
8181
self, valkey_schedule_client: ValkeyScheduleClient
8282
) -> None:
8383
"""Test validation with unhealthy status regardless of timestamp"""
8484
fresh_timestamp = str(int(time()))
85-
assert valkey_schedule_client._is_health_status_valid("0", fresh_timestamp) is False
85+
assert await valkey_schedule_client._validate_health_status("0", fresh_timestamp) is False
8686

8787
@pytest.mark.asyncio
88-
async def test_is_health_status_valid_with_invalid_timestamp(
88+
async def test_validate_health_status_with_invalid_timestamp(
8989
self, valkey_schedule_client: ValkeyScheduleClient
9090
) -> None:
9191
"""Test validation with invalid timestamp formats"""
92-
assert valkey_schedule_client._is_health_status_valid("1", "invalid") is False
93-
assert valkey_schedule_client._is_health_status_valid("1", "") is False
92+
assert await valkey_schedule_client._validate_health_status("1", "invalid") is False
93+
assert await valkey_schedule_client._validate_health_status("1", "") is False
9494

9595
@pytest.mark.asyncio
9696
async def test_initialize_routes_health_status_batch(

0 commit comments

Comments
 (0)