diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 2b7d856d41..a9f2ba2b5c 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -26,12 +26,9 @@ else fi # List the packages. -uv sync ${UV_ARGS} --reinstall +uv sync ${UV_ARGS} --reinstall --quiet uv pip list -# Ensure we go back to base environment after the test. -trap "uv sync" EXIT HUP - # Start the test runner. uv run ${UV_ARGS} .evergreen/scripts/run_tests.py "$@" diff --git a/justfile b/justfile index 74ebb48823..7ac5bd33ff 100644 --- a/justfile +++ b/justfile @@ -1,10 +1,11 @@ # See https://just.systems/man/en/ for instructions set shell := ["bash", "-c"] +# Do not modify the lock file when running justfile commands. +export UV_FROZEN := "1" # Commonly used command segments. -uv_run := "uv run --isolated --frozen " -typing_run := uv_run + "--group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd" -docs_run := uv_run + "--extra docs" +typing_run := "uv run --group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd" +docs_run := "uv run --extra docs" doc_build := "./doc/_build" mypy_args := "--install-types --non-interactive" @@ -13,51 +14,55 @@ mypy_args := "--install-types --non-interactive" default: @just --list +[private] +resync: + @uv sync --quiet --frozen + install: bash .evergreen/scripts/setup-dev-env.sh [group('docs')] -docs: +docs: && resync {{docs_run}} sphinx-build -W -b html doc {{doc_build}}/html [group('docs')] -docs-serve: +docs-serve: && resync {{docs_run}} sphinx-autobuild -W -b html doc --watch ./pymongo --watch ./bson --watch ./gridfs {{doc_build}}/serve [group('docs')] -docs-linkcheck: +docs-linkcheck: && resync {{docs_run}} sphinx-build -E -b linkcheck doc {{doc_build}}/linkcheck [group('typing')] -typing: +typing: && resync just typing-mypy just typing-pyright [group('typing')] -typing-mypy: +typing-mypy: && resync {{typing_run}} mypy {{mypy_args}} bson gridfs tools pymongo {{typing_run}} mypy {{mypy_args}} --config-file mypy_test.ini test {{typing_run}} mypy {{mypy_args}} test/test_typing.py test/test_typing_strict.py [group('typing')] -typing-pyright: +typing-pyright: && resync {{typing_run}} pyright test/test_typing.py test/test_typing_strict.py {{typing_run}} pyright -p strict_pyrightconfig.json test/test_typing_strict.py [group('lint')] -lint: - {{uv_run}} pre-commit run --all-files +lint: && resync + uv run pre-commit run --all-files [group('lint')] -lint-manual: - {{uv_run}} pre-commit run --all-files --hook-stage manual +lint-manual: && resync + uv run pre-commit run --all-files --hook-stage manual [group('test')] -test *args="-v --durations=5 --maxfail=10": - {{uv_run}} --extra test pytest {{args}} +test *args="-v --durations=5 --maxfail=10": && resync + uv run --extra test pytest {{args}} [group('test')] -run-tests *args: +run-tests *args: && resync bash ./.evergreen/run-tests.sh {{args}} [group('test')] diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 8c169b4c52..065686f43a 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -37,7 +37,7 @@ from bson import DEFAULT_CODEC_OPTIONS from pymongo import _csot, helpers_shared from pymongo.asynchronous.client_session import _validate_session_write_concern -from pymongo.asynchronous.helpers import _handle_reauth +from pymongo.asynchronous.helpers import _backoff, _handle_reauth from pymongo.asynchronous.network import command from pymongo.common import ( MAX_BSON_SIZE, @@ -788,9 +788,9 @@ 
def __init__( # Enforces: maxConnecting # Also used for: clearing the wait queue self._max_connecting_cond = _async_create_condition(self.lock) - self._max_connecting = self.opts.max_connecting self._pending = 0 self._client_id = client_id + self._backoff = 0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -846,6 +846,8 @@ async def _reset( async with self.size_cond: if self.closed: return + # Clear the backoff state. + self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) @@ -928,6 +930,11 @@ async def _reset( for conn in sockets: await conn.close_conn(ConnectionClosedReason.STALE) + @property + def max_connecting(self) -> int: + """The current max connecting limit for the pool.""" + return 1 if self._backoff else self.opts.max_connecting + async def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the Pool. @@ -994,7 +1001,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: async with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. - if self._pending >= self._max_connecting: + if self._pending >= self.max_connecting: return self._pending += 1 incremented = True @@ -1022,6 +1029,30 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() + def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + # Handle the system overload condition for non-SDAM pools. + # Look for an AutoReconnect error raised from a ConnectionResetError with + # errno == errno.ECONNRESET, or raised from an OSError that we've created due to + # a closed connection. + # If found, enter backoff and add error labels. + if self.is_sdam or type(error) is not AutoReconnect: + return + self._backoff += 1 + error._add_error_label("SystemOverloadedError") + error._add_error_label("RetryableError") + # Log the pool backoff message. + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.POOL_BACKOFF, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), + error=ConnectionClosedReason.POOL_BACKOFF, + ) + async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection: """Connect to Mongo and return a new AsyncConnection. @@ -1051,8 +1082,17 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A driverConnectionId=conn_id, ) + # Apply backoff if applicable. + if self._backoff: + await asyncio.sleep(_backoff(self._backoff)) + + # Pass a context dict to record whether we successfully created a configured socket. + context = dict(has_created_socket=False) + try: - networking_interface = await _configured_protocol_interface(self.address, self.opts) + networking_interface = await _configured_protocol_interface( + self.address, self.opts, context=context + ) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
except BaseException as error: async with self.lock: @@ -1073,10 +1113,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) + if context["has_created_socket"]: + self._handle_connection_error(error, "handshake", conn_id) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) - raise conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] @@ -1094,15 +1135,18 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A await conn.authenticate() # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException: + except BaseException as e: async with self.lock: self.active_contexts.discard(conn.cancel_context) + self._handle_connection_error(e, "hello", conn_id) await conn.close_conn(ConnectionClosedReason.ERROR) raise if handler: await handler.client._topology.receive_cluster_time(conn._cluster_time) + # Clear the backoff state. + self._backoff = 0 return conn @contextlib.asynccontextmanager @@ -1279,12 +1323,12 @@ async def _get_conn( # to be checked back into the pool. async with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self._max_connecting): + while not (self.conns or self._pending < self.max_connecting): timeout = deadline - time.monotonic() if deadline else None if not await _async_cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. - if self.conns or self._pending < self._max_connecting: + if self.conns or self._pending < self.max_connecting: self._max_connecting_cond.notify() emitted_event = True self._raise_wait_queue_timeout(checkout_started_time) @@ -1425,8 +1469,8 @@ async def _perished(self, conn: AsyncConnection) -> bool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, to keep performance reasonable - we can't avoid AutoReconnects - completely anyway. + pool, or we are in backoff mode, to keep performance reasonable - + we can't avoid AutoReconnects completely anyway. """ idle_time_seconds = conn.idle_time_seconds() # If socket is idle, open a new one. @@ -1437,8 +1481,11 @@ async def _perished(self, conn: AsyncConnection) -> bool: await conn.close_conn(ConnectionClosedReason.IDLE) return True - if self._check_interval_seconds is not None and ( - self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds + check_interval_seconds = self._check_interval_seconds + if self._backoff: + check_interval_seconds = 0 + if check_interval_seconds is not None and ( + check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): if conn.conn_closed(): await conn.close_conn(ConnectionClosedReason.ERROR) diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 283aabc690..1e91bbe79b 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -890,7 +890,9 @@ async def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None # Clear the pool. 
await server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError): + if isinstance(error, WaitQueueTimeoutError) or error.has_error_label( + "SystemOverloadedError" + ): return # "Client MUST replace the server's description with type Unknown # ... MUST NOT request an immediate check of the server." diff --git a/pymongo/logger.py b/pymongo/logger.py index 1b3fe43b86..ccfc45ed88 100644 --- a/pymongo/logger.py +++ b/pymongo/logger.py @@ -42,6 +42,7 @@ class _ConnectionStatusMessage(str, enum.Enum): POOL_READY = "Connection pool ready" POOL_CLOSED = "Connection pool closed" POOL_CLEARED = "Connection pool cleared" + POOL_BACKOFF = "Connection pool backoff" CONN_CREATED = "Connection created" CONN_READY = "Connection ready" @@ -88,6 +89,7 @@ class _SDAMStatusMessage(str, enum.Enum): _VERBOSE_CONNECTION_ERROR_REASONS = { ConnectionClosedReason.POOL_CLOSED: "Connection pool was closed", ConnectionCheckOutFailedReason.POOL_CLOSED: "Connection pool was closed", + ConnectionClosedReason.POOL_BACKOFF: "Connection pool is in backoff", ConnectionClosedReason.STALE: "Connection pool was stale", ConnectionClosedReason.ERROR: "An error occurred while using the connection", ConnectionCheckOutFailedReason.CONN_ERROR: "An error occurred while trying to establish a new connection", diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 46a78aea0b..0dfbbb915a 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -934,6 +934,9 @@ class ConnectionClosedReason: POOL_CLOSED = "poolClosed" """The pool was closed, making the connection no longer valid.""" + POOL_BACKOFF = "poolBackoff" + """The pool is in backoff mode.""" + class ConnectionCheckOutFailedReason: """An enum that defines values for `reason` on a diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 605b8dde9b..9bf46cbc3d 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -256,6 +256,7 @@ def __init__(self, timeout: Optional[float] = None): self._timeout = timeout self._closed = asyncio.get_running_loop().create_future() self._connection_lost = False + self._closing_exception = None def settimeout(self, timeout: float | None) -> None: self._timeout = timeout @@ -269,9 +270,11 @@ def close(self, exc: Optional[Exception] = None) -> None: self.transport.abort() self._resolve_pending(exc) self._connection_lost = True + self._closing_exception = exc # type:ignore[assignment] def connection_lost(self, exc: Optional[Exception] = None) -> None: self._resolve_pending(exc) + self._closing_exception = exc # type:ignore[assignment] if not self._closed.done(): self._closed.set_result(None) @@ -335,8 +338,11 @@ async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[ if self._done_messages: message = await self._done_messages.popleft() else: - if self.transport and self.transport.is_closing(): - raise OSError("connection is already closed") + if self._closed.done(): + if self._closing_exception: + raise self._closing_exception + else: + raise OSError("connection closed") read_waiter = asyncio.get_running_loop().create_future() self._pending_messages.append(read_waiter) try: @@ -474,6 +480,7 @@ def _resolve_pending(self, exc: Optional[Exception] = None) -> None: else: msg.set_exception(exc) self._done_messages.append(msg) + self._pending_messages.clear() class PyMongoKMSProtocol(PyMongoBaseProtocol): diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index 0536dc3835..c555b125df 100644 --- a/pymongo/pool_shared.py +++ 
b/pymongo/pool_shared.py @@ -250,6 +250,7 @@ async def _configured_protocol_interface( address: _Address, options: PoolOptions, protocol_kls: type[PyMongoBaseProtocol] = PyMongoProtocol, + context: dict[str, bool] | None = None, ) -> AsyncNetworkingInterface: """Given (host, port) and PoolOptions, return a configured AsyncNetworkingInterface. @@ -261,6 +262,10 @@ async def _configured_protocol_interface( ssl_context = options._ssl_context timeout = options.socket_timeout + # Signal that we have created the socket successfully. + if context: + context["has_created_socket"] = True + if ssl_context is None: return AsyncNetworkingInterface( await asyncio.get_running_loop().create_connection( @@ -374,7 +379,7 @@ def _create_connection(address: _Address, options: PoolOptions) -> socket.socket def _configured_socket_interface( - address: _Address, options: PoolOptions, *args: Any + address: _Address, options: PoolOptions, *args: Any, context: dict[str, bool] | None = None ) -> NetworkingInterface: """Given (host, port) and PoolOptions, return a NetworkingInterface wrapping a configured socket. @@ -385,6 +390,10 @@ def _configured_socket_interface( sock = _create_connection(address, options) ssl_context = options._ssl_context + # Signal that we have created the socket successfully. + if context: + context["has_created_socket"] = True + if ssl_context is None: sock.settimeout(options.socket_timeout) return NetworkingInterface(sock) diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index f35ca4d0fd..d0c517f186 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -84,7 +84,7 @@ from pymongo.server_type import SERVER_TYPE from pymongo.socket_checker import SocketChecker from pymongo.synchronous.client_session import _validate_session_write_concern -from pymongo.synchronous.helpers import _handle_reauth +from pymongo.synchronous.helpers import _backoff, _handle_reauth from pymongo.synchronous.network import command if TYPE_CHECKING: @@ -786,9 +786,9 @@ def __init__( # Enforces: maxConnecting # Also used for: clearing the wait queue self._max_connecting_cond = _create_condition(self.lock) - self._max_connecting = self.opts.max_connecting self._pending = 0 self._client_id = client_id + self._backoff = 0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -844,6 +844,8 @@ def _reset( with self.size_cond: if self.closed: return + # Clear the backoff state. + self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) @@ -926,6 +928,11 @@ def _reset( for conn in sockets: conn.close_conn(ConnectionClosedReason.STALE) + @property + def max_connecting(self) -> int: + """The current max connecting limit for the pool.""" + return 1 if self._backoff else self.opts.max_connecting + def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the Pool. @@ -990,7 +997,7 @@ def remove_stale_sockets(self, reference_generation: int) -> None: with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. 
- if self._pending >= self._max_connecting: + if self._pending >= self.max_connecting: return self._pending += 1 incremented = True @@ -1018,6 +1025,30 @@ def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() + def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + # Handle the system overload condition for non-SDAM pools. + # Look for an AutoReconnect error raised from a ConnectionResetError with + # errno == errno.ECONNRESET, or raised from an OSError that we've created due to + # a closed connection. + # If found, enter backoff and add error labels. + if self.is_sdam or type(error) is not AutoReconnect: + return + self._backoff += 1 + error._add_error_label("SystemOverloadedError") + error._add_error_label("RetryableError") + # Log the pool backoff message. + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.POOL_BACKOFF, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), + error=ConnectionClosedReason.POOL_BACKOFF, + ) + def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection: """Connect to Mongo and return a new Connection. @@ -1047,8 +1078,17 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect driverConnectionId=conn_id, ) + # Apply backoff if applicable. + if self._backoff: + time.sleep(_backoff(self._backoff)) + + # Pass a context dict to record whether we successfully created a configured socket. + context = dict(has_created_socket=False) + try: - networking_interface = _configured_socket_interface(self.address, self.opts) + networking_interface = _configured_socket_interface( + self.address, self.opts, context=context + ) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as error: with self.lock: @@ -1069,10 +1109,11 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) + if context["has_created_socket"]: + self._handle_connection_error(error, "handshake", conn_id) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) - raise conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] @@ -1090,15 +1131,18 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect conn.authenticate() # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException: + except BaseException as e: with self.lock: self.active_contexts.discard(conn.cancel_context) + self._handle_connection_error(e, "hello", conn_id) conn.close_conn(ConnectionClosedReason.ERROR) raise if handler: handler.client._topology.receive_cluster_time(conn._cluster_time) + # Clear the backoff state. + self._backoff = 0 return conn @contextlib.contextmanager @@ -1275,12 +1319,12 @@ def _get_conn( # to be checked back into the pool.
with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self._max_connecting): + while not (self.conns or self._pending < self.max_connecting): timeout = deadline - time.monotonic() if deadline else None if not _cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. - if self.conns or self._pending < self._max_connecting: + if self.conns or self._pending < self.max_connecting: self._max_connecting_cond.notify() emitted_event = True self._raise_wait_queue_timeout(checkout_started_time) @@ -1421,8 +1465,8 @@ def _perished(self, conn: Connection) -> bool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, to keep performance reasonable - we can't avoid AutoReconnects - completely anyway. + pool, or we are in backoff mode, to keep performance reasonable - + we can't avoid AutoReconnects completely anyway. """ idle_time_seconds = conn.idle_time_seconds() # If socket is idle, open a new one. @@ -1433,8 +1477,11 @@ def _perished(self, conn: Connection) -> bool: conn.close_conn(ConnectionClosedReason.IDLE) return True - if self._check_interval_seconds is not None and ( - self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds + check_interval_seconds = self._check_interval_seconds + if self._backoff: + check_interval_seconds = 0 + if check_interval_seconds is not None and ( + check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): if conn.conn_closed(): conn.close_conn(ConnectionClosedReason.ERROR) diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index a4ca0e6e0f..0f6592dfc0 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -888,7 +888,9 @@ def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None: # Clear the pool. server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError): + if isinstance(error, WaitQueueTimeoutError) or error.has_error_label( + "SystemOverloadedError" + ): return # "Client MUST replace the server's description with type Unknown # ... MUST NOT request an immediate check of the server." 
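A note on the `_backoff` helper imported into both pool implementations above: its definition (in `pymongo/asynchronous/helpers.py` and `pymongo/synchronous/helpers.py`) is not part of this diff. A minimal sketch of what such a helper conventionally looks like, assuming capped exponential backoff with full jitter — the cap and jitter strategy here are assumptions, not the shipped implementation:

```python
import random


def _backoff(attempt: int, cap: float = 10.0) -> float:
    """Hypothetical sketch of the _backoff helper imported above.

    Returns a sleep time in seconds using capped exponential backoff
    with full jitter; the exact cap and jitter strategy are assumptions.
    """
    exp = min(attempt, 10)  # Clamp the exponent so 2**exp stays bounded.
    return min(random.random() * (2**exp), cap)  # Sleep in [0, cap).
```

Since `Pool._backoff` counts consecutive handshake/hello failures and resets to zero on a successful connect or a pool reset, the sleep in `connect()` grows roughly exponentially, the jitter spreads reconnect attempts out over time, and the `max_connecting` property dropping to 1 keeps an overloaded server from being hit with parallel handshakes.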
diff --git a/test/asynchronous/test_encryption.py b/test/asynchronous/test_encryption.py index f6afa4b2a3..adba824143 100644 --- a/test/asynchronous/test_encryption.py +++ b/test/asynchronous/test_encryption.py @@ -1276,7 +1276,7 @@ async def test_06_insert_fails_over_16MiB(self): with self.assertRaises(BulkWriteError) as ctx: await self.coll_encrypted.bulk_write([InsertOne(doc)]) err = ctx.exception.details["writeErrors"][0] - self.assertEqual(2, err["code"]) + self.assertIn(err["code"], [2, 10334]) self.assertIn("object to insert too large", err["errmsg"]) diff --git a/test/asynchronous/test_pooling.py b/test/asynchronous/test_pooling.py index 3193d9e3d5..6cbdf7a65c 100644 --- a/test/asynchronous/test_pooling.py +++ b/test/asynchronous/test_pooling.py @@ -29,6 +29,7 @@ from pymongo.errors import AutoReconnect, ConnectionFailure, DuplicateKeyError from pymongo.hello import HelloCompat from pymongo.lock import _async_create_lock +from pymongo.read_preferences import ReadPreference sys.path[0:0] = [""] @@ -513,6 +514,77 @@ async def test_connection_timeout_message(self): str(error.exception), ) + async def test_pool_check_backoff(self): + # Test that the pool recovers from a connection failure while in backoff mode. + # This exercises the backoff check at the end of Pool._perished(). + cx_pool = await self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1) + self.addAsyncCleanup(cx_pool.close) + + async with cx_pool.checkout() as conn: + # Simulate a closed socket without telling the Connection it's + # closed. + await conn.conn.close() + + # Enable backoff. + cx_pool._backoff = 1 + + # Swap pool's address with a bad one. + address, cx_pool.address = cx_pool.address, ("foo.com", 1234) + with self.assertRaises(AutoReconnect): + async with cx_pool.checkout(): + pass + + # Back to normal, semaphore was correctly released. + cx_pool.address = address + async with cx_pool.checkout(): + pass + + @async_client_context.require_failCommand_appName + async def test_pool_backoff_preserves_existing_connections(self): + client = await self.async_rs_or_single_client() + coll = self.db.t + pool = await async_get_pool(client) + await coll.insert_many([{"x": 1} for _ in range(10)]) + t = SocketGetter(self.c, pool) + await t.start() + while t.state != "connection": + await asyncio.sleep(0.1) + + assert not t.sock.conn_closed() + + # Mock a session establishment overload. + mock_connection_fail = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "closeConnection": True, + }, + } + + async with self.fail_point(mock_connection_fail): + await coll.find_one({}) + + # Make sure the pool is out of backoff state. + assert pool._backoff == 0 + + # Make sure the existing socket was not affected.
+ assert not t.sock.conn_closed() + + # Cleanup + await t.release_conn() + await t.join() + await pool.close() + + async def test_pool_backoff_limits_maxConnecting(self): + client = await self.async_rs_or_single_client(maxConnecting=10) + pool = await async_get_pool(client) + assert pool.max_connecting == 10 + pool._backoff = 1 + assert pool.max_connecting == 1 + pool._backoff = 0 + assert pool.max_connecting == 10 + await client.close() + class TestPoolMaxSize(_TestPoolingBase): async def test_max_pool_size(self): diff --git a/test/connection_logging/connection-logging.json b/test/connection_logging/connection-logging.json index 5799e834d7..60190c7dc0 100644 --- a/test/connection_logging/connection-logging.json +++ b/test/connection_logging/connection-logging.json @@ -331,7 +331,9 @@ "uriOptions": { "retryReads": false, "appname": "clientAppName", - "heartbeatFrequencyMS": 10000 + "heartbeatFrequencyMS": 10000, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500 }, "observeLogMessages": { "connection": "debug" @@ -355,7 +357,9 @@ "failCommands": [ "saslContinue" ], - "closeConnection": true, + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 1000, "appName": "clientAppName" } } diff --git a/test/connection_monitoring/pool-create-min-size-error.json b/test/connection_monitoring/pool-create-min-size-error.json index 1c744b850c..8ec958780d 100644 --- a/test/connection_monitoring/pool-create-min-size-error.json +++ b/test/connection_monitoring/pool-create-min-size-error.json @@ -9,21 +9,23 @@ ], "failPoint": { "configureFailPoint": "failCommand", - "mode": { - "times": 50 - }, + "mode": "alwaysOn", "data": { "failCommands": [ "isMaster", "hello" ], - "closeConnection": true, + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 1000, "appName": "poolCreateMinSizeErrorTest" } }, "poolOptions": { "minPoolSize": 1, "backgroundThreadIntervalMS": 50, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500, "appName": "poolCreateMinSizeErrorTest" }, "operations": [ diff --git a/test/discovery_and_monitoring/unified/auth-network-error.json b/test/discovery_and_monitoring/unified/auth-network-error.json index 84763af32e..656b291366 100644 --- a/test/discovery_and_monitoring/unified/auth-network-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-error.json @@ -53,7 +53,9 @@ "failCommands": [ "saslContinue" ], - "closeConnection": true, + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 1000, "appName": "authNetworkErrorTest" } } @@ -75,6 +77,8 @@ ], "uriOptions": { "retryWrites": false, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500, "appname": "authNetworkErrorTest" } } diff --git a/test/load_balancer/sdam-error-handling.json b/test/load_balancer/sdam-error-handling.json index 5892dcacd6..b9842b8017 100644 --- a/test/load_balancer/sdam-error-handling.json +++ b/test/load_balancer/sdam-error-handling.json @@ -32,6 +32,8 @@ "useMultipleMongoses": false, "uriOptions": { "appname": "lbSDAMErrorTestClient", + "socketTimeoutMS": 500, + "connectTimeoutMS": 500, "retryWrites": false }, "observeEvents": [ @@ -64,7 +66,9 @@ "id": "multiClient", "useMultipleMongoses": true, "uriOptions": { - "retryWrites": false + "retryWrites": false, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500 }, "observeEvents": [ "connectionCreatedEvent", @@ -282,7 +286,8 @@ "isMaster", "hello" ], - "closeConnection": true, + "blockConnection": true, + "blockTimeMS": 1000, "appName": "lbSDAMErrorTestClient" } } @@ -345,7 +350,8 @@ "failCommands": [ 
"saslContinue" ], - "closeConnection": true, + "blockConnection": true, + "blockTimeMS": 1000, "appName": "lbSDAMErrorTestClient" } } @@ -406,7 +412,8 @@ "failCommands": [ "getMore" ], - "closeConnection": true + "closeConnection": true, + "appName": "lbSDAMErrorTestClient" } } } diff --git a/test/test_encryption.py b/test/test_encryption.py index 5c8813203d..1a307f56ee 100644 --- a/test/test_encryption.py +++ b/test/test_encryption.py @@ -1272,7 +1272,7 @@ def test_06_insert_fails_over_16MiB(self): with self.assertRaises(BulkWriteError) as ctx: self.coll_encrypted.bulk_write([InsertOne(doc)]) err = ctx.exception.details["writeErrors"][0] - self.assertEqual(2, err["code"]) + self.assertIn(err["code"], [2, 10334]) self.assertIn("object to insert too large", err["errmsg"]) diff --git a/test/test_pooling.py b/test/test_pooling.py index cb5b206996..f3bfcf4ba2 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -29,6 +29,7 @@ from pymongo.errors import AutoReconnect, ConnectionFailure, DuplicateKeyError from pymongo.hello import HelloCompat from pymongo.lock import _create_lock +from pymongo.read_preferences import ReadPreference sys.path[0:0] = [""] @@ -511,6 +512,77 @@ def test_connection_timeout_message(self): str(error.exception), ) + def test_pool_check_backoff(self): + # Test that Pool recovers from two connection failures in a row. + # This exercises code at the end of Pool._check(). + cx_pool = self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1) + self.addCleanup(cx_pool.close) + + with cx_pool.checkout() as conn: + # Simulate a closed socket without telling the Connection it's + # closed. + conn.conn.close() + + # Enable backoff. + cx_pool._backoff = 1 + + # Swap pool's address with a bad one. + address, cx_pool.address = cx_pool.address, ("foo.com", 1234) + with self.assertRaises(AutoReconnect): + with cx_pool.checkout(): + pass + + # Back to normal, semaphore was correctly released. + cx_pool.address = address + with cx_pool.checkout(): + pass + + @client_context.require_failCommand_appName + def test_pool_backoff_preserves_existing_connections(self): + client = self.rs_or_single_client() + coll = self.db.t + pool = get_pool(client) + coll.insert_many([{"x": 1} for _ in range(10)]) + t = SocketGetter(self.c, pool) + t.start() + while t.state != "connection": + time.sleep(0.1) + + assert not t.sock.conn_closed() + + # Mock a session establishment overload. + mock_connection_fail = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "closeConnection": True, + }, + } + + with self.fail_point(mock_connection_fail): + coll.find_one({}) + + # Make sure the pool is out of backoff state. + assert pool._backoff == 0 + + # Make sure the existing socket was not affected. 
+ assert not t.sock.conn_closed() + + # Cleanup + t.release_conn() + t.join() + pool.close() + + def test_pool_backoff_limits_maxConnecting(self): + client = self.rs_or_single_client(maxConnecting=10) + pool = get_pool(client) + assert pool.max_connecting == 10 + pool._backoff = 1 + assert pool.max_connecting == 1 + pool._backoff = 0 + assert pool.max_connecting == 10 + client.close() + class TestPoolMaxSize(_TestPoolingBase): def test_max_pool_size(self): diff --git a/tools/synchro.py b/tools/synchro.py index 44698134cd..968d0e362f 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -341,7 +341,7 @@ def translate_async_sleeps(lines: list[str]) -> list[str]: sleeps = [line for line in lines if "asyncio.sleep" in line] for line in sleeps: - res = re.search(r"asyncio.sleep\(([^()]*)\)", line) + res = re.search(r"asyncio\.sleep\(\s*(.*?)\)", line) if res: old = res[0] index = lines.index(line)
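One last note on the `tools/synchro.py` change above: the old pattern's `[^()]*` group could not match an `asyncio.sleep` call whose argument itself contains parentheses, which is exactly what the new `asyncio.sleep(_backoff(self._backoff))` call introduces; the new pattern also escapes the `.`. A standalone illustration (the input line is hypothetical):

```python
import re

# The kind of line the synchro tool must now rewrite to time.sleep(...).
line = "        await asyncio.sleep(_backoff(self._backoff))"

old_pat = r"asyncio.sleep\(([^()]*)\)"   # [^()]* rejects nested parentheses
new_pat = r"asyncio\.sleep\(\s*(.*?)\)"  # non-greedy, stops at the first ')'

print(re.search(old_pat, line))      # None: the call would go untranslated
print(re.search(new_pat, line)[0])   # asyncio.sleep(_backoff(self._backoff)
```

The non-greedy group stops at the first closing parenthesis, so for a nested call `res[0]` covers only a prefix of the full expression; that is presumably sufficient here because the tool substitutes within the matched prefix and leaves the rest of the line intact.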