Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2779,7 +2779,7 @@ def __init__(
self._last_error: Optional[Exception] = None
self._retrying = False
self._always_retryable = False
self._multiple_retries = _csot.get_timeout() is not None
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
self._client = mongo_client
self._retry_policy = mongo_client._retry_policy
self._func = func
Expand Down Expand Up @@ -2852,6 +2852,8 @@ async def run(self) -> T:
# ConnectionFailures do not supply a code property
exc_code = getattr(exc, "code", None)
overloaded = exc.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc.has_error_label("RetryableError") and overloaded
if not self._client.options.retry_reads or (
not always_retryable
Expand Down Expand Up @@ -2890,6 +2892,8 @@ async def run(self) -> T:
exc_to_check = exc.error
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded

# Always retry abortTransaction and commitTransaction up to once
Expand Down Expand Up @@ -2943,7 +2947,9 @@ async def run(self) -> T:

def _is_not_eligible_for_retry(self) -> bool:
"""Checks if the exchange is not eligible for retry"""
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
return not self._retryable or (
self._is_retrying() and self._attempt_number >= self._max_retries
)

def _is_retrying(self) -> bool:
"""Checks if the exchange is currently undergoing a retry"""
Expand Down
10 changes: 8 additions & 2 deletions pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2769,7 +2769,7 @@ def __init__(
self._last_error: Optional[Exception] = None
self._retrying = False
self._always_retryable = False
self._multiple_retries = _csot.get_timeout() is not None
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
self._client = mongo_client
self._retry_policy = mongo_client._retry_policy
self._func = func
Expand Down Expand Up @@ -2842,6 +2842,8 @@ def run(self) -> T:
# ConnectionFailures do not supply a code property
exc_code = getattr(exc, "code", None)
overloaded = exc.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc.has_error_label("RetryableError") and overloaded
if not self._client.options.retry_reads or (
not always_retryable
Expand Down Expand Up @@ -2880,6 +2882,8 @@ def run(self) -> T:
exc_to_check = exc.error
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded

# Always retry abortTransaction and commitTransaction up to once
Expand Down Expand Up @@ -2933,7 +2937,9 @@ def run(self) -> T:

def _is_not_eligible_for_retry(self) -> bool:
"""Checks if the exchange is not eligible for retry"""
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
return not self._retryable or (
self._is_retrying() and self._attempt_number >= self._max_retries
)

def _is_retrying(self) -> bool:
"""Checks if the exchange is currently undergoing a retry"""
Expand Down
71 changes: 70 additions & 1 deletion test/asynchronous/test_retryable_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import threading
from test.asynchronous.utils import async_set_fail_point

from pymongo.errors import OperationFailure
from pymongo import MongoClient
from pymongo.common import MAX_ADAPTIVE_RETRIES
from pymongo.errors import OperationFailure, PyMongoError

sys.path[0:0] = [""]

Expand All @@ -38,6 +40,7 @@
)

from pymongo.monitoring import (
CommandFailedEvent,
ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
ConnectionCheckOutFailedReason,
Expand Down Expand Up @@ -145,6 +148,20 @@ async def test_pool_paused_error_is_retryable(self):


class TestRetryableReads(AsyncIntegrationTest):
async def asyncSetUp(self) -> None:
await super().asyncSetUp()
self.setup_client = MongoClient(**async_client_context.client_options)
self.addCleanup(self.setup_client.close)

# TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
def configure_fail_point_sync(self, command_args, off=False) -> None:
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
Comment thread
NoahStapp marked this conversation as resolved.
Outdated
if off:
cmd["mode"] = "off"
cmd.pop("data", None)
self.setup_client.admin.command(cmd)

@async_client_context.require_multiple_mongoses
@async_client_context.require_failCommand_fail_point
async def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self):
Expand Down Expand Up @@ -383,6 +400,58 @@ async def test_03_03_retryable_reads_caused_by_overload_errors_are_retried_on_th
# 6. Assert that both events occurred on the same server.
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id

@async_client_context.require_failCommand_fail_point
@async_client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator]
async def test_overload_then_nonoverload_retries_increased_reads(self) -> None:
# Create a client.
listener = OvertCommandListener()

# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"errorLabels": ["RetryableError", "SystemOverloadedError"],
"errorCode": 91,
},
}

# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
non_overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": ["find"],
"errorCode": 91,
"errorLabels": ["RetryableError"],
},
}

def failed(event: CommandFailedEvent) -> None:
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
if listener.failed_events:
return
assert event.failure["code"] == 91
self.configure_fail_point_sync(non_overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
listener.failed_events.append(event)

listener.failed = failed

client = await self.async_rs_client(event_listeners=[listener])
await client.test.test.insert_one({})

self.configure_fail_point_sync(overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)

Comment thread
NoahStapp marked this conversation as resolved.
with self.assertRaises(PyMongoError):
await client.test.test.find_one()

started_finds = [e for e in listener.started_events if e.command_name == "find"]
self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1)


if __name__ == "__main__":
unittest.main()
51 changes: 51 additions & 0 deletions test/asynchronous/test_retryable_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import threading
from test.asynchronous.utils import async_set_fail_point, flaky

from pymongo.common import MAX_ADAPTIVE_RETRIES

sys.path[0:0] = [""]

from test.asynchronous import (
Expand Down Expand Up @@ -784,6 +786,55 @@ def failed(event: CommandFailedEvent) -> None:
# Assert that the error does not contain the error label `NoWritesPerformed`.
assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"]

async def test_overload_then_nonoverload_retries_increased_writes(self) -> None:
# Create a client with retryWrites=true.
listener = OvertCommandListener()

# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["insert"],
"errorLabels": ["RetryableError", "SystemOverloadedError"],
"errorCode": 91,
},
}

# Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and `RetryableWriteError` error labels.
non_overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": ["insert"],
"errorCode": 91,
"errorLabels": ["RetryableError", "RetryableWriteError"],
},
}

def failed(event: CommandFailedEvent) -> None:
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
if listener.failed_events:
return
assert event.failure["code"] == 91
self.configure_fail_point_sync(non_overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
listener.failed_events.append(event)

listener.failed = failed

client = await self.async_rs_client(retryWrites=True, event_listeners=[listener])

self.configure_fail_point_sync(overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
Comment thread
Jibola marked this conversation as resolved.

Comment thread
NoahStapp marked this conversation as resolved.
with self.assertRaises(PyMongoError):
await client.test.test.insert_one({"x": 1})

started_inserts = [e for e in listener.started_events if e.command_name == "insert"]
self.assertEqual(len(started_inserts), MAX_ADAPTIVE_RETRIES + 1)


if __name__ == "__main__":
unittest.main()
71 changes: 70 additions & 1 deletion test/test_retryable_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import threading
from test.utils import set_fail_point

from pymongo.errors import OperationFailure
from pymongo import MongoClient
from pymongo.common import MAX_ADAPTIVE_RETRIES
from pymongo.errors import OperationFailure, PyMongoError

sys.path[0:0] = [""]

Expand All @@ -38,6 +40,7 @@
)

from pymongo.monitoring import (
CommandFailedEvent,
ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
ConnectionCheckOutFailedReason,
Expand Down Expand Up @@ -145,6 +148,20 @@ def test_pool_paused_error_is_retryable(self):


class TestRetryableReads(IntegrationTest):
def setUp(self) -> None:
super().setUp()
self.setup_client = MongoClient(**client_context.client_options)
self.addCleanup(self.setup_client.close)

Comment on lines +152 to +156
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Separate PR) Can we add creating the collection here or do we have several deviations?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you open a ticket detailing what you mean?

Copy link
Copy Markdown
Contributor

@Jibola Jibola Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
def configure_fail_point_sync(self, command_args, off=False) -> None:
cmd = {"configureFailPoint": "failCommand"}
cmd.update(command_args)
if off:
cmd["mode"] = "off"
cmd.pop("data", None)
self.setup_client.admin.command(cmd)

@client_context.require_multiple_mongoses
@client_context.require_failCommand_fail_point
def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self):
Expand Down Expand Up @@ -381,6 +398,58 @@ def test_03_03_retryable_reads_caused_by_overload_errors_are_retried_on_the_same
# 6. Assert that both events occurred on the same server.
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id

@client_context.require_failCommand_fail_point
@client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator]
def test_overload_then_nonoverload_retries_increased_reads(self) -> None:
# Create a client.
listener = OvertCommandListener()

# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": {"times": 1},
"data": {
"failCommands": ["find"],
"errorLabels": ["RetryableError", "SystemOverloadedError"],
"errorCode": 91,
},
}

# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
non_overload_fail_point = {
"configureFailPoint": "failCommand",
"mode": "alwaysOn",
"data": {
"failCommands": ["find"],
"errorCode": 91,
"errorLabels": ["RetryableError"],
},
}

def failed(event: CommandFailedEvent) -> None:
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
if listener.failed_events:
return
assert event.failure["code"] == 91
self.configure_fail_point_sync(non_overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
listener.failed_events.append(event)

listener.failed = failed

client = self.rs_client(event_listeners=[listener])
client.test.test.insert_one({})

self.configure_fail_point_sync(overload_fail_point)
self.addCleanup(self.configure_fail_point_sync, {}, off=True)

with self.assertRaises(PyMongoError):
client.test.test.find_one()

started_finds = [e for e in listener.started_events if e.command_name == "find"]
self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1)


if __name__ == "__main__":
unittest.main()
Loading
Loading