fix(gateway): drain stale httpx polling connections on Telegram reconnect
Network errors through proxies (e.g. sing-box) can leave httpx connections in a half-closed state in which they still occupy pool slots. After enough reconnect cycles the default 256-connection pool fills up entirely, causing `Pool timeout: All connections in the connection pool are occupied.` Fix: cycle only the getUpdates request object (`_request[0]`) via shutdown + re-initialize before restarting polling. This drains stale connections without touching the general request (`_request[1]`) that concurrent send_message / edit_message calls rely on. The drain is applied to both the `_handle_polling_network_error` and `_handle_polling_conflict` reconnect paths via a shared `_drain_polling_connections()` helper. Failures in the drain are swallowed so that reconnect always proceeds. Based on #16466 by @Mirac1eSky.
This commit is contained in:
@@ -369,6 +369,49 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
return {"link_preview_options": LinkPreviewOptions(is_disabled=True)}
|
||||
return {"disable_web_page_preview": True}
|
||||
|
||||
async def _drain_polling_connections(self) -> None:
    """Reset the httpx connection pool used for getUpdates polling.

    Network errors (especially through proxies like sing-box) can leave
    httpx connections in a half-closed state that still occupy pool slots.
    After enough reconnect cycles the pool fills up entirely, causing
    ``Pool timeout: All connections in the connection pool are occupied.``

    Only ``_request[0]`` (the getUpdates request) is cycled, by calling
    ``shutdown()`` followed by ``initialize()`` on it. The general request
    (``_request[1]``) is left untouched so concurrent ``send_message`` /
    ``edit_message`` calls are never interrupted.

    The drain is best-effort: a failure while locating, shutting down or
    re-initializing the polling request is logged at debug level and
    swallowed so the caller's reconnect can still proceed. ``initialize()``
    is attempted even when ``shutdown()`` raised. Does nothing when there
    is no application or bot.

    Implementation note: accesses ``Bot._request[0]`` which is the
    get-updates ``BaseRequest`` in the PTB 22.x internal tuple
    ``(get_updates_request, general_request)``. There is no public
    accessor for the polling request; review if upgrading to PTB 23+.
    """
    if not (self._app and self._app.bot):
        return

    try:
        # PTB 22.x: _request is a (get_updates, general) tuple;
        # no public accessor exists for the polling request.
        polling_req = self._app.bot._request[0]  # noqa: SLF001
    except Exception:
        # The private layout is not what we expect; skip the drain rather
        # than block the reconnect, but leave a trace for debugging.
        logger.debug(
            "[%s] Polling request lookup failed; skipping pool drain (non-fatal)",
            self.name, exc_info=True,
        )
        return

    try:
        await polling_req.shutdown()
    except Exception:
        logger.debug(
            "[%s] Polling request shutdown failed (non-fatal)",
            self.name, exc_info=True,
        )

    # Always try to re-initialize, even after a failed shutdown, so the
    # polling request is not left closed.
    try:
        await polling_req.initialize()
    except Exception:
        logger.debug(
            "[%s] Polling request re-initialize failed (non-fatal)",
            self.name, exc_info=True,
        )
    else:
        logger.debug(
            "[%s] Polling request pool drained before reconnect", self.name
        )
|
||||
|
||||
async def _handle_polling_network_error(self, error: Exception) -> None:
|
||||
"""Reconnect polling after a transient network interruption.
|
||||
|
||||
@@ -414,6 +457,8 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
await self._drain_polling_connections()
|
||||
|
||||
try:
|
||||
await self._app.updater.start_polling(
|
||||
allowed_updates=Update.ALL_TYPES,
|
||||
@@ -461,6 +506,7 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
except Exception:
|
||||
pass
|
||||
await asyncio.sleep(RETRY_DELAY)
|
||||
await self._drain_polling_connections()
|
||||
try:
|
||||
await self._app.updater.start_polling(
|
||||
allowed_updates=Update.ALL_TYPES,
|
||||
|
||||
@@ -160,3 +160,127 @@ async def test_reconnect_triggers_fatal_after_max_retries():
|
||||
assert adapter.has_fatal_error
|
||||
assert adapter.fatal_error_code == "telegram_network_error"
|
||||
fatal_handler.assert_called_once()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Connection pool drain tests (PR #16466 salvage)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_mock_app():
    """Build a mock Application with an explicit polling request object.

    Returns:
        A ``(mock_app, mock_polling_req)`` pair. ``mock_app.bot._request``
        is a two-item tuple ``(mock_polling_req, general_request)``, and
        ``mock_app.updater`` reports ``running = True`` with awaitable
        ``stop`` and ``start_polling`` mocks.
    """
    # getUpdates request: the object the drain helper is expected to cycle.
    mock_polling_req = AsyncMock()
    mock_polling_req.shutdown = AsyncMock()
    mock_polling_req.initialize = AsyncMock()

    mock_bot = MagicMock()
    mock_bot._request = (mock_polling_req, MagicMock())  # (getUpdates, general)

    mock_updater = MagicMock()
    mock_updater.running = True
    mock_updater.stop = AsyncMock()
    mock_updater.start_polling = AsyncMock()

    mock_app = MagicMock()
    mock_app.updater = mock_updater
    mock_app.bot = mock_bot
    return mock_app, mock_polling_req
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_reconnect_drains_polling_request_only():
    """During reconnect, only the polling request (_request[0]) must be cycled.

    The general request (_request[1]) must NOT be touched — doing so would
    break concurrent send_message / edit_message calls.
    """
    adapter = _make_adapter()
    adapter._polling_network_error_count = 1

    mock_app, mock_polling_req = _make_mock_app()
    adapter._app = mock_app

    general_req = mock_app.bot._request[1]

    # Patch out the retry back-off so the test does not actually sleep.
    with patch("asyncio.sleep", new_callable=AsyncMock):
        await adapter._handle_polling_network_error(Exception("Bad Gateway"))

    # Polling request must be shut down and re-initialized. The *awaited*
    # assertions also fail if the coroutine was created but never awaited.
    mock_polling_req.shutdown.assert_awaited_once()
    mock_polling_req.initialize.assert_awaited_once()

    # General request must NOT be touched
    general_req.shutdown.assert_not_called()
    general_req.initialize.assert_not_called()

    # Reconnect must still succeed
    mock_app.updater.start_polling.assert_awaited_once()
    assert adapter._polling_network_error_count == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_reconnect_continues_if_drain_fails():
    """If the polling request drain raises, start_polling must still proceed."""
    adapter = _make_adapter()
    adapter._polling_network_error_count = 1

    mock_app, mock_polling_req = _make_mock_app()
    # Both shutdown and initialize fail
    mock_polling_req.shutdown = AsyncMock(side_effect=Exception("shutdown boom"))
    mock_polling_req.initialize = AsyncMock(side_effect=Exception("init boom"))
    adapter._app = mock_app

    # Patch out the retry back-off so the test does not actually sleep.
    with patch("asyncio.sleep", new_callable=AsyncMock):
        await adapter._handle_polling_network_error(Exception("Bad Gateway"))

    # start_polling must still be awaited despite drain failure
    mock_app.updater.start_polling.assert_awaited_once()
    assert adapter._polling_network_error_count == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_initialize_still_runs_when_shutdown_fails():
    """If shutdown() raises, initialize() must still be attempted.

    This prevents a failed shutdown from leaving the request pool in a
    permanently closed state.
    """
    adapter = _make_adapter()
    adapter._polling_network_error_count = 1

    mock_app, mock_polling_req = _make_mock_app()
    mock_polling_req.shutdown = AsyncMock(side_effect=Exception("shutdown boom"))
    adapter._app = mock_app

    # Patch out the retry back-off so the test does not actually sleep.
    with patch("asyncio.sleep", new_callable=AsyncMock):
        await adapter._handle_polling_network_error(Exception("Bad Gateway"))

    # initialize MUST be awaited even though shutdown raised
    mock_polling_req.initialize.assert_awaited_once()
    mock_app.updater.start_polling.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_conflict_retry_also_drains_polling_connections():
    """_handle_polling_conflict must also drain the polling pool on retry."""
    adapter = _make_adapter()
    adapter._polling_conflict_count = 0

    mock_app, mock_polling_req = _make_mock_app()
    adapter._app = mock_app

    # Patch out the retry delay so the test does not actually sleep.
    with patch("asyncio.sleep", new_callable=AsyncMock):
        await adapter._handle_polling_conflict(
            Exception("Conflict: terminated by other getUpdates")
        )

    # Polling request must be drained during conflict retry too. The
    # *awaited* assertions also fail if a coroutine was never awaited.
    mock_polling_req.shutdown.assert_awaited_once()
    mock_polling_req.initialize.assert_awaited_once()
    mock_app.updater.start_polling.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_drain_helper_noop_without_app():
    """_drain_polling_connections must be a no-op when _app is None."""
    adapter = _make_adapter()
    adapter._app = None
    # Should not raise, and has nothing to return.
    assert await adapter._drain_polling_connections() is None
|
||||
|
||||
Reference in New Issue
Block a user