Commit 91be4a0

Multi exec on cluster (#3611)
* feat(cluster): support for transactions on cluster-aware client

  Adds support for transactions based on multi/watch/exec on clusters.
  Transactions in this mode are limited to a single hash slot.

  Contributed-by: Scopely <[email protected]>

* fix: remove deprecated argument
* remove attributions from code
* Refactor ClusterPipeline to use execution strategies
* Refactored strategy to use composition
* Added test cases
* Sync with master
* Filter tests, ensure that tests are working after refactor
* Added test case
* Revert port changes
* Improved exception handling
* Change visibility of variable to public
* Changed variable ref
* Changed ref type
* Added documentation
* Refactored retries, fixed comments, fixed linters
* Added word to a wordlist
* Revert port changes
* Added quotes
* Fixed docs
* Updated CONNECTION_ERRORS
* Codestyle fixes
* Updated docs
* Revert import

---------

Co-authored-by: vladvildanov <[email protected]>
Co-authored-by: Vladyslav Vildanov <[email protected]>
1 parent 02b2ab6 commit 91be4a0

10 files changed, +1601 -357 lines changed

.github/wordlist.txt

+1
@@ -2,6 +2,7 @@ APM
 ARGV
 BFCommands
 CacheImpl
+CAS
 CFCommands
 CMSCommands
 ClusterNode

.gitignore

+1
@@ -9,6 +9,7 @@ vagrant/.vagrant
 .cache
 .eggs
 .idea
+.vscode
 .coverage
 env
 venv

CHANGES

+1
@@ -1,3 +1,4 @@
+* Support transactions in ClusterPipeline
 * Removing support for RedisGraph module. RedisGraph support is deprecated since Redis Stack 7.2 (https://redis.com/blog/redisgraph-eol/)
 * Fix lock.extend() typedef to accept float TTL extension
 * Update URL in the readme linking to Redis University

docs/advanced_features.rst

+105 -14
@@ -167,6 +167,7 @@ the server.
 
 .. code:: python
 
+    >>> rc = RedisCluster()
     >>> with rc.pipeline() as pipe:
     ...     pipe.set('foo', 'value1')
     ...     pipe.set('bar', 'value2')
@@ -177,20 +178,110 @@ the server.
     ...     pipe.set('foo1', 'bar1').get('foo1').execute()
     [True, b'bar1']
 
-Please note: - RedisCluster pipelines currently only support key-based
-commands. - The pipeline gets its ‘read_from_replicas’ value from the
-cluster’s parameter. Thus, if read from replications is enabled in the
-cluster instance, the pipeline will also direct read commands to
-replicas. - The ‘transaction’ option is NOT supported in cluster-mode.
-In non-cluster mode, the ‘transaction’ option is available when
-executing pipelines. This wraps the pipeline commands with MULTI/EXEC
-commands, and effectively turns the pipeline commands into a single
-transaction block. This means that all commands are executed
-sequentially without any interruptions from other clients. However, in
-cluster-mode this is not possible, because commands are partitioned
-according to their respective destination nodes. This means that we can
-not turn the pipeline commands into one transaction block, because in
-most cases they are split up into several smaller pipelines.
+Please note:
+
+- RedisCluster pipelines currently only support key-based commands.
+- The pipeline gets its ‘load_balancing_strategy’ value from the
+  cluster’s parameter. Thus, if read from replications is enabled in
+  the cluster instance, the pipeline will also direct read commands to
+  replicas.
+
+
+Transactions in clusters
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+Transactions are supported in cluster-mode with one caveat: all keys of
+all commands issued on a transaction pipeline must reside on the
+same slot. This is similar to the limitation of multikey commands in
+cluster. The reason behind this is that the Redis engine does not offer
+a mechanism to block or exchange key data across nodes on the fly. A
+client may add some logic to abstract engine limitations when running
+on a cluster, such as the pipeline behavior explained on the previous
+block, but there is no simple way that a client can enforce atomicity
+across nodes on a distributed system.
+
+The compromise of limiting the transaction pipeline to same-slot keys
+is exactly that: a compromise. While this behavior is different from
+non-transactional cluster pipelines, it simplifies migration of clients
+from standalone to cluster under some circumstances. Note that application
+code that issues multi/exec commands on a standalone client without
+embedding them within a pipeline would eventually get ‘AttributeError’.
+With this approach, if the application uses ‘client.pipeline(transaction=True)’,
+then switching the client with a cluster-aware instance would simplify
+code changes (to some extent). This may be true for application code that
+makes use of hash keys, since its transactions may already be
+mapping all commands to the same slot.
+
+An alternative is some kind of two-step commit solution, where a slot
+validation is run before the actual commands are run. This could work
+with controlled node maintenance but does not cover single node failures.
+
+Given the cluster limitations for transactions, by default pipeline isn't in
+transactional mode. To enable transactional context set:
+
+.. code:: python
+
+    >>> p = rc.pipeline(transaction=True)
+
+After entering the transactional context you can add commands to a transactional
+context, by one of the following ways:
+
+.. code:: python
+
+    >>> p = rc.pipeline(transaction=True) # Chaining commands
+    >>> p.set("key", "value")
+    >>> p.get("key")
+    >>> response = p.execute()
+
+Or
+
+.. code:: python
+
+    >>> with rc.pipeline(transaction=True) as pipe: # Using context manager
+    ...     pipe.set("key", "value")
+    ...     pipe.get("key")
+    ...     response = pipe.execute()
+
+As you see there's no need to explicitly send `MULTI/EXEC` commands to control context start/end
+`ClusterPipeline` will take care of it.
+
+To ensure that different keys will be mapped to a same hash slot on the server side
+prepend your keys with the same hash tag, the technique that allows you to control
+keys distribution.
+More information `here <https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/#hash-tags>`_
+
+.. code:: python
+
+    >>> with rc.pipeline(transaction=True) as pipe:
+    ...     pipe.set("{tag}foo", "bar")
+    ...     pipe.set("{tag}bar", "foo")
+    ...     pipe.get("{tag}foo")
+    ...     pipe.get("{tag}bar")
+    ...     response = pipe.execute()
+
+CAS Transactions
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+If you want to apply optimistic locking for certain keys, you have to execute
+`WATCH` command in transactional context. `WATCH` command follows the same limitations
+as any other multi key command - all keys should be mapped to the same hash slot.
+
+However, the difference between CAS transaction and normal one is that you have to
+explicitly call MULTI command to indicate the start of transactional context, WATCH
+command itself and any subsequent commands before MULTI will be immediately executed
+on the server side so you can apply optimistic locking and get necessary data before
+transaction execution.
+
+.. code:: python
+
+    >>> with rc.pipeline(transaction=True) as pipe:
+    ...     pipe.watch("mykey")       # Apply locking by immediately executing command
+    ...     val = pipe.get("mykey")   # Immediately retrieves value
+    ...     val = val + 1             # Increment value
+    ...     pipe.multi()              # Starting transaction context
+    ...     pipe.set("mykey", val)    # Command will be pipelined
+    ...     response = pipe.execute() # Returns OK or None if key was modified in the meantime
+
 
 Publish / Subscribe
 -------------------
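
Note on the hash-tag requirement described in the docs/advanced_features.rst change above: the sketch below is not part of this commit. It reimplements, in plain Python, the slot rule from the Redis Cluster specification (CRC16/XMODEM of the key, or of the hash-tag content when a non-empty tag is present, modulo 16384), so an application could check up front whether the keys of a planned transaction share a slot. The names crc16_xmodem, key_slot and same_slot are hypothetical helpers chosen for illustration, and the check is only an assumption about how one might use the rule client-side.

```python
from typing import Iterable

SLOT_COUNT = 16384  # number of hash slots in a Redis Cluster


def crc16_xmodem(data: bytes) -> int:
    """CRC16 with polynomial 0x1021 and initial value 0 (XMODEM variant)."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Hash slot of a key, honoring the first non-empty {hash tag}."""
    raw = key.encode("utf-8")
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        # An empty tag such as "{}" is ignored and the whole key is hashed.
        if end != -1 and end > start + 1:
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % SLOT_COUNT


def same_slot(keys: Iterable[str]) -> bool:
    """True when every key maps to one slot (hypothetical pre-check helper)."""
    return len({key_slot(k) for k in keys}) <= 1


print(same_slot(["{tag}foo", "{tag}bar"]))  # True: both hash only "tag"
print(same_slot(["foo", "bar"]))            # False: different slots
```

A check like this can run before building a transactional pipeline. It does not replace the server-side validation, since slot ownership can change while the transaction is in flight.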

redis/__init__.py

+6
@@ -16,11 +16,14 @@
     BusyLoadingError,
     ChildDeadlockedError,
     ConnectionError,
+    CrossSlotTransactionError,
     DataError,
+    InvalidPipelineStack,
     InvalidResponse,
     OutOfMemoryError,
     PubSubError,
     ReadOnlyError,
+    RedisClusterException,
     RedisError,
     ResponseError,
     TimeoutError,
@@ -56,15 +59,18 @@ def int_or_str(value):
     "ConnectionError",
     "ConnectionPool",
     "CredentialProvider",
+    "CrossSlotTransactionError",
     "DataError",
     "from_url",
     "default_backoff",
+    "InvalidPipelineStack",
     "InvalidResponse",
     "OutOfMemoryError",
     "PubSubError",
     "ReadOnlyError",
     "Redis",
     "RedisCluster",
+    "RedisClusterException",
     "RedisError",
     "ResponseError",
     "Sentinel",

redis/client.py

+12 -3
@@ -34,6 +34,7 @@
 from redis.commands.core import Script
 from redis.connection import (
     AbstractConnection,
+    Connection,
     ConnectionPool,
     SSLConnection,
     UnixDomainSocketConnection,
@@ -1297,9 +1298,15 @@ class Pipeline(Redis):
 
     UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
 
-    def __init__(self, connection_pool, response_callbacks, transaction, shard_hint):
+    def __init__(
+        self,
+        connection_pool: ConnectionPool,
+        response_callbacks,
+        transaction,
+        shard_hint,
+    ):
         self.connection_pool = connection_pool
-        self.connection = None
+        self.connection: Optional[Connection] = None
         self.response_callbacks = response_callbacks
         self.transaction = transaction
         self.shard_hint = shard_hint
@@ -1434,7 +1441,9 @@ def pipeline_execute_command(self, *args, **options) -> "Pipeline":
         self.command_stack.append((args, options))
         return self
 
-    def _execute_transaction(self, connection, commands, raise_on_error) -> List:
+    def _execute_transaction(
+        self, connection: Connection, commands, raise_on_error
+    ) -> List:
         cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
         all_cmds = connection.pack_commands(
             [args for args, options in cmds if EMPTY_RESPONSE not in options]
