From d6137453ac221da5ec0402a858582a165f6177e2 Mon Sep 17 00:00:00 2001 From: Siwen Wang <27719690+Mirac1eSky@users.noreply.github.com> Date: Tue, 28 Apr 2026 18:50:26 +0530 Subject: [PATCH] fix(gateway): drain stale httpx polling connections on Telegram reconnect Network errors through proxies (e.g. sing-box) can leave httpx connections in a half-closed state occupying pool slots. After enough reconnect cycles the 256-connection default fills up entirely, causing Pool timeout: All connections in the connection pool are occupied. Fix: cycle only the getUpdates request object (_request[0]) via shut-down + re-initialize before restarting polling. This drains stale connections without touching the general request (_request[1]) that concurrent send_message / edit_message calls rely on. The drain is applied to both _handle_polling_network_error and _handle_polling_conflict reconnect paths via a shared _drain_polling_connections() helper. Failures in the drain are swallowed so reconnect always proceeds. Based on #16466 by @Mirac1eSky. --- gateway/platforms/telegram.py | 46 +++++++ .../test_telegram_network_reconnect.py | 124 ++++++++++++++++++ 2 files changed, 170 insertions(+) diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index b08971469..09a70ccf5 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -369,6 +369,49 @@ class TelegramAdapter(BasePlatformAdapter): return {"link_preview_options": LinkPreviewOptions(is_disabled=True)} return {"disable_web_page_preview": True} + async def _drain_polling_connections(self) -> None: + """Reset the httpx connection pool used for getUpdates polling. + + Network errors (especially through proxies like sing-box) can leave + httpx connections in a half-closed state that still occupy pool slots. + After enough reconnect cycles the pool fills up entirely, causing + ``Pool timeout: All connections in the connection pool are occupied.`` + + We reset ONLY ``_request[0]`` (the getUpdates request) — the general + request (``_request[1]``) is left untouched so concurrent + ``send_message`` / ``edit_message`` calls are never interrupted. + + Implementation note: accesses ``Bot._request[0]`` which is the + get-updates ``BaseRequest`` in the PTB 22.x internal tuple + ``(get_updates_request, general_request)``. There is no public + accessor for the polling request; review if upgrading to PTB 23+. + """ + if not (self._app and self._app.bot): + return + try: + # PTB 22.x: _request is a (get_updates, general) tuple; + # no public accessor exists for the polling request. + polling_req = self._app.bot._request[0] # noqa: SLF001 + except Exception: + return + try: + await polling_req.shutdown() + except Exception: + logger.debug( + "[%s] Polling request shutdown failed (non-fatal)", + self.name, exc_info=True, + ) + try: + await polling_req.initialize() + logger.debug( + "[%s] Polling request pool drained before reconnect", self.name + ) + except Exception: + logger.debug( + "[%s] Polling request re-initialize failed (non-fatal)", + self.name, exc_info=True, + ) + async def _handle_polling_network_error(self, error: Exception) -> None: """Reconnect polling after a transient network interruption. @@ -414,6 +457,8 @@ class TelegramAdapter(BasePlatformAdapter): except Exception: pass + await self._drain_polling_connections() + try: await self._app.updater.start_polling( allowed_updates=Update.ALL_TYPES, @@ -461,6 +506,7 @@ class TelegramAdapter(BasePlatformAdapter): except Exception: pass await asyncio.sleep(RETRY_DELAY) + await self._drain_polling_connections() try: await self._app.updater.start_polling( allowed_updates=Update.ALL_TYPES, diff --git a/tests/gateway/test_telegram_network_reconnect.py b/tests/gateway/test_telegram_network_reconnect.py index f78a7f208..532639b2d 100644 --- a/tests/gateway/test_telegram_network_reconnect.py +++ b/tests/gateway/test_telegram_network_reconnect.py @@ -160,3 +160,127 @@ async def test_reconnect_triggers_fatal_after_max_retries(): assert adapter.has_fatal_error assert adapter.fatal_error_code == "telegram_network_error" fatal_handler.assert_called_once() + + +# --------------------------------------------------------------------------- +# Connection pool drain tests (PR #16466 salvage) +# --------------------------------------------------------------------------- + +def _make_mock_app(): + """Build a mock Application with an explicit polling request object.""" + mock_polling_req = AsyncMock() + mock_polling_req.shutdown = AsyncMock() + mock_polling_req.initialize = AsyncMock() + + mock_bot = MagicMock() + mock_bot._request = (mock_polling_req, MagicMock()) # (getUpdates, general) + + mock_updater = MagicMock() + mock_updater.running = True + mock_updater.stop = AsyncMock() + mock_updater.start_polling = AsyncMock() + + mock_app = MagicMock() + mock_app.updater = mock_updater + mock_app.bot = mock_bot + return mock_app, mock_polling_req + + +@pytest.mark.asyncio +async def test_reconnect_drains_polling_request_only(): + """During reconnect, only the polling request (_request[0]) must be cycled. + + The general request (_request[1]) must NOT be touched — doing so would + break concurrent send_message / edit_message calls. + """ + adapter = _make_adapter() + adapter._polling_network_error_count = 1 + + mock_app, mock_polling_req = _make_mock_app() + adapter._app = mock_app + + general_req = mock_app.bot._request[1] + + with patch("asyncio.sleep", new_callable=AsyncMock): + await adapter._handle_polling_network_error(Exception("Bad Gateway")) + + # Polling request must be shut down and re-initialized + mock_polling_req.shutdown.assert_called_once() + mock_polling_req.initialize.assert_called_once() + + # General request must NOT be touched + general_req.shutdown.assert_not_called() + general_req.initialize.assert_not_called() + + # Reconnect must still succeed + mock_app.updater.start_polling.assert_called_once() + assert adapter._polling_network_error_count == 0 + + +@pytest.mark.asyncio +async def test_reconnect_continues_if_drain_fails(): + """If the polling request drain raises, start_polling must still proceed.""" + adapter = _make_adapter() + adapter._polling_network_error_count = 1 + + mock_app, mock_polling_req = _make_mock_app() + # Both shutdown and initialize fail + mock_polling_req.shutdown = AsyncMock(side_effect=Exception("shutdown boom")) + mock_polling_req.initialize = AsyncMock(side_effect=Exception("init boom")) + adapter._app = mock_app + + with patch("asyncio.sleep", new_callable=AsyncMock): + await adapter._handle_polling_network_error(Exception("Bad Gateway")) + + # start_polling must still be called despite drain failure + mock_app.updater.start_polling.assert_called_once() + assert adapter._polling_network_error_count == 0 + + +@pytest.mark.asyncio +async def test_initialize_still_runs_when_shutdown_fails(): + """If shutdown() raises, initialize() must still be attempted. + + This prevents a failed shutdown from leaving the request pool in a + permanently closed state. + """ + adapter = _make_adapter() + adapter._polling_network_error_count = 1 + + mock_app, mock_polling_req = _make_mock_app() + mock_polling_req.shutdown = AsyncMock(side_effect=Exception("shutdown boom")) + adapter._app = mock_app + + with patch("asyncio.sleep", new_callable=AsyncMock): + await adapter._handle_polling_network_error(Exception("Bad Gateway")) + + # initialize MUST be called even though shutdown raised + mock_polling_req.initialize.assert_called_once() + mock_app.updater.start_polling.assert_called_once() + + +@pytest.mark.asyncio +async def test_conflict_retry_also_drains_polling_connections(): + """_handle_polling_conflict must also drain the polling pool on retry.""" + adapter = _make_adapter() + adapter._polling_conflict_count = 0 + + mock_app, mock_polling_req = _make_mock_app() + adapter._app = mock_app + + with patch("asyncio.sleep", new_callable=AsyncMock): + await adapter._handle_polling_conflict(Exception("Conflict: terminated by other getUpdates")) + + # Polling request must be drained during conflict retry too + mock_polling_req.shutdown.assert_called_once() + mock_polling_req.initialize.assert_called_once() + mock_app.updater.start_polling.assert_called_once() + + +@pytest.mark.asyncio +async def test_drain_helper_noop_without_app(): + """_drain_polling_connections must be a no-op when _app is None.""" + adapter = _make_adapter() + adapter._app = None + # Should not raise + await adapter._drain_polling_connections()