fix(gateway): wait for reconnection before dropping WebSocket sends
When a WebSocket-based platform adapter (e.g. QQ Bot) temporarily loses its connection, send() now polls is_connected for up to 15s instead of immediately returning a non-retryable failure. If the auto-reconnect completes within the window, the message is delivered normally. On timeout, the SendResult is marked retryable=True so the base class retry mechanism can attempt re-delivery. The same treatment is applied to _send_media().

Adds 4 async tests covering:
- Successful send after simulated reconnection
- Retryable failure on timeout
- Immediate success when already connected
- _send_media reconnection wait

Fixes #11163
This commit is contained in:
@@ -1535,6 +1535,33 @@ class QQAdapter(BasePlatformAdapter):
|
||||
|
||||
raise last_exc # type: ignore[misc]
|
||||
|
||||
# Upper bound (seconds) on how long a send waits for the listener to
# reconnect before giving up.
_RECONNECT_WAIT_SECONDS = 15.0
# Longest pause (seconds) between two consecutive is_connected checks
# while waiting for the reconnect.
_RECONNECT_POLL_INTERVAL = 0.5

async def _wait_for_reconnection(self) -> bool:
    """Wait for the WebSocket listener to reconnect.

    The listener loop (_listen_loop) auto-reconnects on disconnect, but
    there is a race window where send() is called right after a disconnect
    and before the reconnect completes. This method polls is_connected
    every _RECONNECT_POLL_INTERVAL seconds until it becomes true or
    _RECONNECT_WAIT_SECONDS of real (monotonic) time have elapsed.

    The first check happens after the first sleep; callers are expected to
    have tested is_connected themselves before calling.

    Returns:
        True if is_connected became true within the window, False if the
        window expired while still disconnected.
    """
    max_wait = self._RECONNECT_WAIT_SECONDS
    poll_every = self._RECONNECT_POLL_INTERVAL
    logger.info("[%s] Not connected — waiting for reconnection (up to %.0fs)",
                self.name, max_wait)

    # Measure against the loop's monotonic clock rather than summing nominal
    # sleep durations: sleeps can overrun, float accumulation drifts, and a
    # zero poll interval would otherwise never advance the counter.
    loop = asyncio.get_running_loop()
    started_at = loop.time()
    deadline = started_at + max_wait

    while True:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        # Never sleep past the deadline, even when the poll interval does
        # not evenly divide the wait window.
        await asyncio.sleep(max(0.0, min(poll_every, remaining)))
        if self.is_connected:
            logger.info("[%s] Reconnected after %.1fs",
                        self.name, loop.time() - started_at)
            return True

    logger.warning("[%s] Still not connected after %.0fs", self.name, max_wait)
    return False
|
||||
|
||||
async def send(
|
||||
self,
|
||||
chat_id: str,
|
||||
@@ -1550,7 +1577,8 @@ class QQAdapter(BasePlatformAdapter):
|
||||
del metadata
|
||||
|
||||
if not self.is_connected:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
if not await self._wait_for_reconnection():
|
||||
return SendResult(success=False, error="Not connected", retryable=True)
|
||||
|
||||
if not content or not content.strip():
|
||||
return SendResult(success=True)
|
||||
@@ -1751,7 +1779,8 @@ class QQAdapter(BasePlatformAdapter):
|
||||
) -> SendResult:
|
||||
"""Upload media and send as a native message."""
|
||||
if not self.is_connected:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
if not await self._wait_for_reconnection():
|
||||
return SendResult(success=False, error="Not connected", retryable=True)
|
||||
|
||||
try:
|
||||
# Resolve media source
|
||||
|
||||
@@ -500,3 +500,85 @@ class TestBuildTextBody:
|
||||
adapter = self._make_adapter(app_id="a", client_secret="b", markdown_support=False)
|
||||
body = adapter._build_text_body("reply text", reply_to="msg_123")
|
||||
assert body.get("message_reference", {}).get("message_id") == "msg_123"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _wait_for_reconnection / send reconnection wait
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestWaitForReconnection:
    """Test that send() waits for reconnection instead of silently dropping.

    The tests toggle ``adapter._running`` to simulate the connection state
    (presumably ``is_connected`` is derived from it — confirm against
    QQAdapter) and shrink the reconnect timing constants on the instance so
    each case finishes in a fraction of a second.
    """

    def _make_adapter(self, **extra):
        """Build a QQAdapter from the shared test config, overridden by *extra*."""
        from gateway.platforms.qqbot import QQAdapter

        return QQAdapter(_make_config(**extra))

    @pytest.mark.asyncio
    async def test_send_waits_and_succeeds_on_reconnect(self):
        """send() should wait for reconnection and then deliver the message."""
        adapter = self._make_adapter(app_id="a", client_secret="b")
        # Initially disconnected
        adapter._running = False
        adapter._http_client = mock.MagicMock()

        # Stub the outbound API call so no network traffic is attempted.
        async def fake_api_request(*args, **kwargs):
            return {"id": "msg_123"}

        adapter._api_request = fake_api_request
        adapter._ensure_token = mock.AsyncMock()
        # Poll faster than the production interval to keep the test quick.
        adapter._RECONNECT_POLL_INTERVAL = 0.1
        adapter._RECONNECT_WAIT_SECONDS = 5.0

        # Simulate the listener reconnecting 0.3s after send() starts waiting.
        async def reconnect_after_delay():
            await asyncio.sleep(0.3)
            adapter._running = True

        # Keep a reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected
        # before it runs to completion.
        reconnect_task = asyncio.create_task(reconnect_after_delay())
        try:
            result = await adapter.send("test_openid", "Hello, world!")
        finally:
            # Awaiting surfaces any exception raised inside the helper and
            # avoids leaving a pending task behind when the test ends.
            await reconnect_task

        assert result.success
        assert result.message_id == "msg_123"

    @pytest.mark.asyncio
    async def test_send_returns_retryable_after_timeout(self):
        """send() should return retryable=True if reconnection takes too long."""
        adapter = self._make_adapter(app_id="a", client_secret="b")
        adapter._running = False
        adapter._RECONNECT_POLL_INTERVAL = 0.05
        adapter._RECONNECT_WAIT_SECONDS = 0.2

        result = await adapter.send("test_openid", "Hello, world!")

        assert not result.success
        assert result.retryable is True
        assert "Not connected" in result.error

    @pytest.mark.asyncio
    async def test_send_succeeds_immediately_when_connected(self):
        """send() should not wait when already connected."""
        adapter = self._make_adapter(app_id="a", client_secret="b")
        adapter._running = True
        adapter._http_client = mock.MagicMock()

        async def fake_api_request(*args, **kwargs):
            return {"id": "msg_immediate"}

        adapter._api_request = fake_api_request
        # Same token stub as the reconnect test, so both send paths are set
        # up identically.
        adapter._ensure_token = mock.AsyncMock()
        # Replace the wait helper with a spy so the test can prove it was
        # never entered on the already-connected path.
        adapter._wait_for_reconnection = mock.AsyncMock(return_value=True)

        result = await adapter.send("test_openid", "Hello!")

        assert result.success
        assert result.message_id == "msg_immediate"
        adapter._wait_for_reconnection.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_send_media_waits_for_reconnect(self):
        """_send_media should also wait for reconnection."""
        adapter = self._make_adapter(app_id="a", client_secret="b")
        adapter._running = False
        adapter._RECONNECT_POLL_INTERVAL = 0.05
        adapter._RECONNECT_WAIT_SECONDS = 0.2

        result = await adapter._send_media(
            "test_openid", "http://example.com/img.jpg", 1, "image"
        )

        assert not result.success
        assert result.retryable is True
        assert "Not connected" in result.error
|
||||
|
||||
Reference in New Issue
Block a user