Skip to content

Commit 0cfcb58

Browse files
committed
Standalone client POC
1 parent e83ffbb commit 0cfcb58

File tree

5 files changed

+86
-7
lines changed

5 files changed

+86
-7
lines changed

redis/_parsers/base.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,22 +159,28 @@ async def read_response(
159159

160160

161161
_INVALIDATION_MESSAGE = [b"invalidate", "invalidate"]
162+
_MOVING_MESSAGE = [b"MOVING", "MOVING"]
162163

163164

164165
class PushNotificationsParser(Protocol):
165166
"""Protocol defining RESP3-specific parsing functionality"""
166167

167168
pubsub_push_handler_func: Callable
168169
invalidation_push_handler_func: Optional[Callable] = None
170+
node_replacement_push_handler_func: Optional[Callable] = None
169171

170172
def handle_pubsub_push_response(self, response):
171173
"""Handle pubsub push responses"""
172174
raise NotImplementedError()
173175

174176
def handle_push_response(self, response, **kwargs):
175-
if response[0] not in _INVALIDATION_MESSAGE:
177+
msg_type = response[0]
178+
if msg_type not in _INVALIDATION_MESSAGE and msg_type not in _MOVING_MESSAGE:
176179
return self.pubsub_push_handler_func(response)
177-
if self.invalidation_push_handler_func:
180+
if msg_type in _MOVING_MESSAGE and self.node_replacement_push_handler_func:
181+
# push notification for enterprise cluster node replacement
182+
return self.node_replacement_push_handler_func(response)
183+
if msg_type in _INVALIDATION_MESSAGE and self.invalidation_push_handler_func:
178184
return self.invalidation_push_handler_func(response)
179185

180186
def set_pubsub_push_handler(self, pubsub_push_handler_func):
@@ -183,22 +189,32 @@ def set_pubsub_push_handler(self, pubsub_push_handler_func):
183189
def set_invalidation_push_handler(self, invalidation_push_handler_func):
184190
self.invalidation_push_handler_func = invalidation_push_handler_func
185191

192+
def set_node_replacement_push_handler_func(
193+
self, node_replacement_push_handler_func
194+
):
195+
self.node_replacement_push_handler_func = node_replacement_push_handler_func
196+
186197

187198
class AsyncPushNotificationsParser(Protocol):
188199
"""Protocol defining async RESP3-specific parsing functionality"""
189200

190201
pubsub_push_handler_func: Callable
191202
invalidation_push_handler_func: Optional[Callable] = None
203+
node_replacement_push_handler_func: Optional[Callable] = None
192204

193205
async def handle_pubsub_push_response(self, response):
194206
"""Handle pubsub push responses asynchronously"""
195207
raise NotImplementedError()
196208

197209
async def handle_push_response(self, response, **kwargs):
198210
"""Handle push responses asynchronously"""
199-
if response[0] not in _INVALIDATION_MESSAGE:
211+
msg_type = response[0]
212+
if msg_type not in _INVALIDATION_MESSAGE and msg_type not in _MOVING_MESSAGE:
200213
return await self.pubsub_push_handler_func(response)
201-
if self.invalidation_push_handler_func:
214+
if msg_type in _MOVING_MESSAGE and self.node_replacement_push_handler_func:
215+
# push notification for enterprise cluster node replacement
216+
return await self.node_replacement_push_handler_func(response)
217+
if msg_type in _INVALIDATION_MESSAGE and self.invalidation_push_handler_func:
202218
return await self.invalidation_push_handler_func(response)
203219

204220
def set_pubsub_push_handler(self, pubsub_push_handler_func):
@@ -209,6 +225,11 @@ def set_invalidation_push_handler(self, invalidation_push_handler_func):
209225
"""Set the invalidation push handler function"""
210226
self.invalidation_push_handler_func = invalidation_push_handler_func
211227

228+
def set_node_replacement_push_handler_func(
229+
self, node_replacement_push_handler_func
230+
):
231+
self.node_replacement_push_handler_func = node_replacement_push_handler_func
232+
212233

213234
class _AsyncRESPBase(AsyncBaseParser):
214235
"""Base class for async resp parsing"""

redis/_parsers/resp3.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from ..exceptions import ConnectionError, InvalidResponse, ResponseError
55
from ..typing import EncodableT
6+
from ..notifications import NodeReplacementNotification
67
from .base import (
78
AsyncPushNotificationsParser,
89
PushNotificationsParser,
@@ -18,13 +19,25 @@ class _RESP3Parser(_RESPBase, PushNotificationsParser):
1819
def __init__(self, socket_read_size):
1920
super().__init__(socket_read_size)
2021
self.pubsub_push_handler_func = self.handle_pubsub_push_response
22+
self.node_replacement_push_handler_func = (
23+
self.handle_node_replacement_push_response
24+
)
2125
self.invalidation_push_handler_func = None
2226

2327
def handle_pubsub_push_response(self, response):
2428
logger = getLogger("push_response")
2529
logger.debug("Push response: " + str(response))
2630
return response
2731

32+
def handle_node_replacement_push_response(self, response):
33+
logger = getLogger("push_response")
34+
logger.debug("Push response: " + str(response))
35+
36+
host, port = response[2].split(":")
37+
ttl = response[1]
38+
pushed_notification = NodeReplacementNotification(host, port, ttl)
39+
return pushed_notification
40+
2841
def read_response(self, disable_decoding=False, push_request=False):
2942
pos = self._buffer.get_pos() if self._buffer else None
3043
try:
@@ -116,13 +129,17 @@ def _read_response(self, disable_decoding=False, push_request=False):
116129
)
117130
for _ in range(int(response))
118131
]
119-
response = self.handle_push_response(response)
132+
push_response = self.handle_push_response(response)
120133
if not push_request:
121-
return self._read_response(
134+
response = self._read_response(
122135
disable_decoding=disable_decoding, push_request=push_request
123136
)
137+
if isinstance(push_response, NodeReplacementNotification):
138+
return push_response
139+
else:
140+
return response
124141
else:
125-
return response
142+
return push_response
126143
else:
127144
raise InvalidResponse(f"Protocol Error: {raw!r}")
128145

redis/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ def __init__(
244244
cache: Optional[CacheInterface] = None,
245245
cache_config: Optional[CacheConfig] = None,
246246
event_dispatcher: Optional[EventDispatcher] = None,
247+
enable_node_replacement_push_notifications: bool = False,
247248
) -> None:
248249
"""
249250
Initialize a new Redis client.
@@ -297,6 +298,7 @@ def __init__(
297298
"redis_connect_func": redis_connect_func,
298299
"credential_provider": credential_provider,
299300
"protocol": protocol,
301+
"enable_node_replacement_push_notifications": enable_node_replacement_push_notifications,
300302
}
301303
# based on input, setup appropriate connection args
302304
if unix_socket_path is not None:

redis/connection.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,6 +1374,7 @@ def __init__(
13741374
connection_class=Connection,
13751375
max_connections: Optional[int] = None,
13761376
cache_factory: Optional[CacheFactoryInterface] = None,
1377+
enable_node_replacement_push_notifications: bool = False,
13771378
**connection_kwargs,
13781379
):
13791380
max_connections = max_connections or 2**31
@@ -1383,6 +1384,9 @@ def __init__(
13831384
self.connection_class = connection_class
13841385
self.connection_kwargs = connection_kwargs
13851386
self.max_connections = max_connections
1387+
self.enable_node_replacement_push_notifications = (
1388+
enable_node_replacement_push_notifications
1389+
)
13861390
self.cache = None
13871391
self._cache_factory = cache_factory
13881392

@@ -1408,6 +1412,12 @@ def __init__(
14081412
connection_kwargs.pop("cache", None)
14091413
connection_kwargs.pop("cache_config", None)
14101414

1415+
if self.enable_node_replacement_push_notifications:
1416+
if connection_kwargs.get("protocol") not in [3, "3"]:
1417+
raise RedisError(
1418+
"Push notifications on node replacement are only supported with RESP version 3"
1419+
)
1420+
14111421
self._event_dispatcher = self.connection_kwargs.get("event_dispatcher", None)
14121422
if self._event_dispatcher is None:
14131423
self._event_dispatcher = EventDispatcher()
@@ -1694,13 +1704,15 @@ def __init__(
16941704
timeout=20,
16951705
connection_class=Connection,
16961706
queue_class=LifoQueue,
1707+
enable_node_replacement_push_notifications=False,
16971708
**connection_kwargs,
16981709
):
16991710
self.queue_class = queue_class
17001711
self.timeout = timeout
17011712
super().__init__(
17021713
connection_class=connection_class,
17031714
max_connections=max_connections,
1715+
enable_node_replacement_push_notifications=enable_node_replacement_push_notifications,
17041716
**connection_kwargs,
17051717
)
17061718

redis/notifications.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import time
2+
3+
4+
class NodeReplacementNotification:
5+
"""
6+
Initialize a new NodeReplacementNotification.
7+
8+
Args:
9+
new_node_host (str): Hostname or IP address of the new replacement node
10+
new_node_port (int): Port number of the new replacement node
11+
ttl (int): Time-to-live in seconds for this notification
12+
"""
13+
14+
def __init__(self, new_node_host: str, new_node_port: int, ttl: int):
15+
self.new_node_host = new_node_host
16+
self.new_node_port = new_node_port
17+
self.ttl = ttl
18+
self.creation_time = time.time()
19+
20+
def is_expired(self) -> bool:
21+
"""
22+
Check if this notification has expired based on its TTL.
23+
24+
Returns:
25+
bool: True if the notification has expired, False otherwise
26+
"""
27+
return time.time() > (self.creation_time + self.ttl)

0 commit comments

Comments
 (0)