From 0cc7f79016cab874da869587db09f52f7330ce2d Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Mon, 13 Apr 2026 19:22:43 -0700 Subject: [PATCH] fix(streaming): prevent duplicate Telegram replies when stream task is cancelled (#9319) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the 5-second stream_task timeout in gateway/run.py expires (due to slow Telegram API calls from rate limiting after several messages), the stream consumer is cancelled via asyncio.CancelledError. The CancelledError handler did a best-effort final edit but never set final_response_sent, so the gateway fell through to the normal send path and delivered the full response again as a reply — causing a duplicate. The fix: in the CancelledError handler, set final_response_sent = True when already_sent is True (i.e., the stream consumer had already delivered content to the user). This tells the gateway's already_sent check that the response was delivered, preventing the duplicate send. Adds two tests verifying the cancellation behavior: - Cancelled with already_sent=True → final_response_sent=True (no dup) - Cancelled with already_sent=False → final_response_sent=False (normal send path proceeds) Reported by community user hume on Discord. --- gateway/stream_consumer.py | 8 +++ tests/gateway/test_stream_consumer.py | 81 +++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index e743df8d5..240084e9b 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -280,6 +280,14 @@ class GatewayStreamConsumer: await self._send_or_edit(self._accumulated) except Exception: pass + # If we delivered any content before being cancelled, mark the + # final response as sent so the gateway's already_sent check + # doesn't trigger a duplicate message. The 5-second + # stream_task timeout (gateway/run.py) can cancel us while + # waiting on a slow Telegram API call — without this flag the + # gateway falls through to the normal send path. + if self._already_sent: + self._final_response_sent = True except Exception as e: logger.error("Stream consumer error: %s", e) diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index d66306722..d8a1be2d2 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -599,3 +599,84 @@ class TestInterimCommentaryMessages: assert sent_texts == ["Hello ▉", "world"] assert consumer.already_sent is True assert consumer.final_response_sent is True + + +class TestCancelledConsumerSetsFlags: + """Cancellation must set final_response_sent when already_sent is True. + + The 5-second stream_task timeout in gateway/run.py can cancel the + consumer while it's still processing. If final_response_sent stays + False, the gateway falls through to the normal send path and the + user sees a duplicate message. + """ + + @pytest.mark.asyncio + async def test_cancelled_with_already_sent_marks_final_response_sent(self): + """Cancelling after content was sent should set final_response_sent.""" + adapter = MagicMock() + adapter.send = AsyncMock( + return_value=SimpleNamespace(success=True, message_id="msg_1") + ) + adapter.edit_message = AsyncMock( + return_value=SimpleNamespace(success=True) + ) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), + ) + + # Stream some text — the consumer sends it and sets already_sent + consumer.on_delta("Hello world") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.08) + + assert consumer.already_sent is True + + # Cancel the task (simulates the 5-second timeout in gateway) + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # The fix: final_response_sent should be True even though _DONE + # was never processed, preventing a duplicate message. + assert consumer.final_response_sent is True + + @pytest.mark.asyncio + async def test_cancelled_without_any_sends_does_not_mark_final(self): + """Cancelling before anything was sent should NOT set final_response_sent.""" + adapter = MagicMock() + adapter.send = AsyncMock( + return_value=SimpleNamespace(success=False, message_id=None) + ) + adapter.edit_message = AsyncMock( + return_value=SimpleNamespace(success=True) + ) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), + ) + + # Send fails — already_sent stays False + consumer.on_delta("x") + task = asyncio.create_task(consumer.run()) + await asyncio.sleep(0.08) + + assert consumer.already_sent is False + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Without a successful send, final_response_sent should stay False + # so the normal gateway send path can deliver the response. + assert consumer.final_response_sent is False