Skip to content

Multi exec on cluster #3611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f9cc550
feat(cluster): support for transactions on cluster-aware client
robertosantamaria-scopely Aug 8, 2023
9244b67
fix: remove deprecated argument
robertosantamaria-scopely Apr 25, 2025
ea3fc59
remove attributions from code
robertosantamaria-scopely Apr 25, 2025
a68359b
Refactor ClusterPipeline to use execution strategies
vladvildanov Apr 30, 2025
17e2118
Refactored strategy to use composition
vladvildanov May 2, 2025
0087b4f
Added test cases
vladvildanov May 2, 2025
a851942
Merge branch 'master' of github.com:redis/redis-py into multi-exec-on…
vladvildanov May 2, 2025
137c931
Sync with master
vladvildanov May 2, 2025
c081bd3
Filter tests, ensure that tests are working after refactor
vladvildanov May 6, 2025
08812e3
Added test case
vladvildanov May 6, 2025
d673c29
Merge branch 'master' into multi-exec-on-cluster
vladvildanov May 6, 2025
c14f540
Revert port changes
vladvildanov May 6, 2025
0b4716b
Merge branch 'multi-exec-on-cluster' of github.com:robertosantamaria-…
vladvildanov May 6, 2025
00c862e
Improved exception handling
vladvildanov May 6, 2025
e03f9e1
Change visibility of variable to public
vladvildanov May 6, 2025
ea3826f
Changed variable ref
vladvildanov May 6, 2025
35da8ab
Changed ref type
vladvildanov May 6, 2025
2343beb
Added documentation
vladvildanov May 7, 2025
1718614
Merge branch 'master' into multi-exec-on-cluster
vladvildanov May 7, 2025
d0d0b62
Refactored retries, fixed comments, fixed linters
vladvildanov May 8, 2025
f5bffde
Merge branch 'multi-exec-on-cluster' of github.com:robertosantamaria-…
vladvildanov May 8, 2025
4e35527
Merge branch 'master' into multi-exec-on-cluster
vladvildanov May 8, 2025
a1cbc2b
Added word to a wordlist
vladvildanov May 8, 2025
90210e8
Merge branch 'multi-exec-on-cluster' of github.com:robertosantamaria-…
vladvildanov May 8, 2025
90cd170
Revert port changes
vladvildanov May 8, 2025
13f6116
Added quotes
vladvildanov May 8, 2025
073531b
Fixed docs
vladvildanov May 8, 2025
a3f6366
Updated CONNECTION_ERRORS
vladvildanov May 8, 2025
6d1b150
Codestyle fixes
vladvildanov May 8, 2025
de5ca35
Updated docs
vladvildanov May 12, 2025
096ac29
Merge branch 'master' into multi-exec-on-cluster
vladvildanov May 12, 2025
a3de190
Revert import
vladvildanov May 12, 2025
8c7757b
Merge branch 'multi-exec-on-cluster' of github.com:robertosantamaria-…
vladvildanov May 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ APM
ARGV
BFCommands
CacheImpl
CAS
CFCommands
CMSCommands
ClusterNode
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ vagrant/.vagrant
.cache
.eggs
.idea
.vscode
.coverage
env
venv
Expand Down
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Support transactions in ClusterPipeline
* Removing support for RedisGraph module. RedisGraph support is deprecated since Redis Stack 7.2 (https://redis.com/blog/redisgraph-eol/)
* Fix lock.extend() typedef to accept float TTL extension
* Update URL in the readme linking to Redis University
Expand Down
119 changes: 105 additions & 14 deletions docs/advanced_features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ the server.

.. code:: python

>>> rc = RedisCluster()
>>> with rc.pipeline() as pipe:
... pipe.set('foo', 'value1')
... pipe.set('bar', 'value2')
Expand All @@ -177,20 +178,110 @@ the server.
... pipe.set('foo1', 'bar1').get('foo1').execute()
[True, b'bar1']

Please note: - RedisCluster pipelines currently only support key-based
commands. - The pipeline gets its ‘read_from_replicas’ value from the
cluster’s parameter. Thus, if read from replications is enabled in the
cluster instance, the pipeline will also direct read commands to
replicas. - The ‘transaction’ option is NOT supported in cluster-mode.
In non-cluster mode, the ‘transaction’ option is available when
executing pipelines. This wraps the pipeline commands with MULTI/EXEC
commands, and effectively turns the pipeline commands into a single
transaction block. This means that all commands are executed
sequentially without any interruptions from other clients. However, in
cluster-mode this is not possible, because commands are partitioned
according to their respective destination nodes. This means that we can
not turn the pipeline commands into one transaction block, because in
most cases they are split up into several smaller pipelines.
Please note:

- RedisCluster pipelines currently only support key-based commands.
- The pipeline gets its ‘load_balancing_strategy’ value from the
cluster’s parameter. Thus, if read from replications is enabled in
the cluster instance, the pipeline will also direct read commands to
replicas.


Transactions in clusters
~~~~~~~~~~~~~~~~~~~~~~~~

Transactions are supported in cluster-mode with one caveat: all keys of
all commands issued on a transaction pipeline must reside on the
same slot. This is similar to the limitation of multikey commands in
cluster. The reason behind this is that the Redis engine does not offer
a mechanism to block or exchange key data across nodes on the fly. A
client may add some logic to abstract engine limitations when running
on a cluster, such as the pipeline behavior explained on the previous
block, but there is no simple way that a client can enforce atomicity
across nodes on a distributed system.

The compromise of limiting the transaction pipeline to same-slot keys
is exactly that: a compromise. While this behavior is different from
non-transactional cluster pipelines, it simplifies migration of clients
from standalone to cluster under some circumstances. Note that application
code that issues multi/exec commands on a standalone client without
embedding them within a pipeline would eventually get ‘AttributeError’.
With this approach, if the application uses ‘client.pipeline(transaction=True)’,
then switching the client with a cluster-aware instance would simplify
code changes (to some extent). This may be true for application code that
makes use of hash keys, since its transactions may already be
mapping all commands to the same slot.

An alternative is some kind of two-step commit solution, where a slot
validation is run before the actual commands are run. This could work
with controlled node maintenance but does not cover single node failures.

Given the cluster limitations for transactions, by default pipeline isn't in
transactional mode. To enable transactional context set:

.. code:: python

>>> p = rc.pipeline(transaction=True)

After entering the transactional context you can add commands to a transactional
context, by one of the following ways:

.. code:: python

>>> p = rc.pipeline(transaction=True) # Chaining commands
>>> p.set("key", "value")
>>> p.get("key")
>>> response = p.execute()

Or

.. code:: python

>>> with rc.pipeline(transaction=True) as pipe: # Using context manager
... pipe.set("key", "value")
... pipe.get("key")
... response = pipe.execute()

As you see there's no need to explicitly send `MULTI/EXEC` commands to control context start/end
`ClusterPipeline` will take care of it.

To ensure that different keys will be mapped to a same hash slot on the server side
prepend your keys with the same hash tag, the technique that allows you to control
keys distribution.
More information `here <https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/#hash-tags>`_

.. code:: python

>>> with rc.pipeline(transaction=True) as pipe:
... pipe.set("{tag}foo", "bar")
... pipe.set("{tag}bar", "foo")
... pipe.get("{tag}foo")
... pipe.get("{tag}bar")
... response = pipe.execute()

CAS Transactions
~~~~~~~~~~~~~~~~~~~~~~~~

If you want to apply optimistic locking for certain keys, you have to execute
`WATCH` command in transactional context. `WATCH` command follows the same limitations
as any other multi key command - all keys should be mapped to the same hash slot.

However, the difference between CAS transaction and normal one is that you have to
explicitly call MULTI command to indicate the start of transactional context, WATCH
command itself and any subsequent commands before MULTI will be immediately executed
on the server side so you can apply optimistic locking and get necessary data before
transaction execution.

.. code:: python

>>> with rc.pipeline(transaction=True) as pipe:
... pipe.watch("mykey") # Apply locking by immediately executing command
... val = pipe.get("mykey") # Immediately retrieves value
... val = val + 1 # Increment value
... pipe.multi() # Starting transaction context
... pipe.set("mykey", val) # Command will be pipelined
... response = pipe.execute() # Returns OK or None if key was modified in the meantime


Publish / Subscribe
-------------------
Expand Down
6 changes: 6 additions & 0 deletions redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
BusyLoadingError,
ChildDeadlockedError,
ConnectionError,
CrossSlotTransactionError,
DataError,
InvalidPipelineStack,
InvalidResponse,
OutOfMemoryError,
PubSubError,
ReadOnlyError,
RedisClusterException,
RedisError,
ResponseError,
TimeoutError,
Expand Down Expand Up @@ -56,15 +59,18 @@ def int_or_str(value):
"ConnectionError",
"ConnectionPool",
"CredentialProvider",
"CrossSlotTransactionError",
"DataError",
"from_url",
"default_backoff",
"InvalidPipelineStack",
"InvalidResponse",
"OutOfMemoryError",
"PubSubError",
"ReadOnlyError",
"Redis",
"RedisCluster",
"RedisClusterException",
"RedisError",
"ResponseError",
"Sentinel",
Expand Down
15 changes: 12 additions & 3 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from redis.commands.core import Script
from redis.connection import (
AbstractConnection,
Connection,
ConnectionPool,
SSLConnection,
UnixDomainSocketConnection,
Expand Down Expand Up @@ -1297,9 +1298,15 @@ class Pipeline(Redis):

UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

def __init__(self, connection_pool, response_callbacks, transaction, shard_hint):
def __init__(
self,
connection_pool: ConnectionPool,
response_callbacks,
transaction,
shard_hint,
):
self.connection_pool = connection_pool
self.connection = None
self.connection: Optional[Connection] = None
self.response_callbacks = response_callbacks
self.transaction = transaction
self.shard_hint = shard_hint
Expand Down Expand Up @@ -1434,7 +1441,9 @@ def pipeline_execute_command(self, *args, **options) -> "Pipeline":
self.command_stack.append((args, options))
return self

def _execute_transaction(self, connection, commands, raise_on_error) -> List:
def _execute_transaction(
self, connection: Connection, commands, raise_on_error
) -> List:
cmds = chain([(("MULTI",), {})], commands, [(("EXEC",), {})])
all_cmds = connection.pack_commands(
[args for args, options in cmds if EMPTY_RESPONSE not in options]
Expand Down
Loading
Loading