From 2546b7acea9b294429396e9196374127acd71024 Mon Sep 17 00:00:00 2001 From: Teknium Date: Wed, 15 Apr 2026 03:31:08 -0700 Subject: [PATCH] fix(gateway): suppress duplicate replies on interrupt and streaming flood control MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three fixes for the duplicate reply bug affecting all gateway platforms: 1. base.py: Suppress stale response when the session was interrupted by a new message that hasn't been consumed yet. Checks both interrupt_event and _pending_messages to avoid false positives. (#8221, #2483) 2. run.py (return path): Remove response_previewed guard from already_sent check. Stream consumer's already_sent alone is authoritative — if content was delivered via streaming, the duplicate send must be suppressed regardless of the agent's response_previewed flag. (#8375) 3. run.py (queued-message path): Same fix — already_sent without response_previewed now correctly marks the first response as already streamed, preventing re-send before processing the queued message. The response_previewed field is still produced by the agent (run_agent.py) but is no longer required as a gate for duplicate suppression. The stream consumer's already_sent flag is the delivery-level truth about what the user actually saw. Concepts from PR #8380 (konsisumer). Closes #8375, #8221, #2483. --- gateway/platforms/base.py | 15 + gateway/run.py | 12 +- .../test_duplicate_reply_suppression.py | 291 ++++++++++++++++++ 3 files changed, 308 insertions(+), 10 deletions(-) create mode 100644 tests/gateway/test_duplicate_reply_suppression.py diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index f7943da47..1561cd526 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1624,6 +1624,21 @@ class BasePlatformAdapter(ABC): # streaming already delivered the text (already_sent=True) or # when the message was queued behind an active agent. Log at # DEBUG to avoid noisy warnings for expected behavior. + # + # Suppress stale response when the session was interrupted by a + # new message that hasn't been consumed yet. The pending message + # is processed by the pending-message handler below (#8221/#2483). + if ( + response + and interrupt_event.is_set() + and session_key in self._pending_messages + ): + logger.info( + "[%s] Suppressing stale response for interrupted session %s", + self.name, + session_key, + ) + response = None if not response: logger.debug("[%s] Handler returned empty/None response for %s", self.name, event.source.chat_id) if response: diff --git a/gateway/run.py b/gateway/run.py index d360d453f..2eb745f92 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -9231,15 +9231,11 @@ class GatewayRunner: pass except Exception as e: logger.debug("Stream consumer wait before queued message failed: %s", e) - _response_previewed = bool(result.get("response_previewed")) _already_streamed = bool( _sc and ( getattr(_sc, "final_response_sent", False) - or ( - _response_previewed - and getattr(_sc, "already_sent", False) - ) + or getattr(_sc, "already_sent", False) ) ) first_response = result.get("final_response", "") @@ -9323,13 +9319,9 @@ class GatewayRunner: # them even if streaming had sent earlier partial output. _sc = stream_consumer_holder[0] if _sc and isinstance(response, dict) and not response.get("failed"): - _response_previewed = bool(response.get("response_previewed")) if ( getattr(_sc, "final_response_sent", False) - or ( - _response_previewed - and getattr(_sc, "already_sent", False) - ) + or getattr(_sc, "already_sent", False) ): response["already_sent"] = True diff --git a/tests/gateway/test_duplicate_reply_suppression.py b/tests/gateway/test_duplicate_reply_suppression.py new file mode 100644 index 000000000..5a0ea02f3 --- /dev/null +++ b/tests/gateway/test_duplicate_reply_suppression.py @@ -0,0 +1,291 @@ +"""Tests for duplicate reply suppression across the gateway stack. + +Covers three fix paths: + 1. base.py: stale response suppressed when interrupt_event is set and a + pending message exists (#8221 / #2483) + 2. run.py return path: already_sent propagated from stream consumer's + already_sent flag without requiring response_previewed (#8375) + 3. run.py queued-message path: first response correctly detected as + already-streamed when already_sent is True without response_previewed +""" + +import asyncio +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import ( + BasePlatformAdapter, + MessageEvent, + MessageType, + ProcessingOutcome, + SendResult, +) +from gateway.session import SessionSource, build_session_key + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +class StubAdapter(BasePlatformAdapter): + """Minimal concrete adapter for testing.""" + + def __init__(self): + super().__init__(PlatformConfig(enabled=True, token="fake"), Platform.DISCORD) + self.sent = [] + + async def connect(self): + return True + + async def disconnect(self): + pass + + async def send(self, chat_id, content, reply_to=None, metadata=None): + self.sent.append({"chat_id": chat_id, "content": content}) + return SendResult(success=True, message_id="msg1") + + async def send_typing(self, chat_id, metadata=None): + pass + + async def get_chat_info(self, chat_id): + return {"id": chat_id} + + +def _make_event(text="hello", chat_id="c1", user_id="u1"): + return MessageEvent( + text=text, + source=SessionSource( + platform=Platform.DISCORD, + chat_id=chat_id, + chat_type="dm", + user_id=user_id, + ), + message_id="m1", + ) + + +# =================================================================== +# Test 1: base.py — stale response suppressed on interrupt (#8221) +# =================================================================== + +class TestBaseInterruptSuppression: + @pytest.mark.asyncio + async def test_stale_response_suppressed_when_interrupted(self): + """When interrupt_event is set AND a pending message exists, + base.py should suppress the stale response instead of sending it.""" + adapter = StubAdapter() + + stale_response = "This is the stale answer to the first question." + pending_response = "This is the answer to the second question." + call_count = 0 + + async def fake_handler(event): + nonlocal call_count + call_count += 1 + if call_count == 1: + return stale_response + return pending_response + + adapter.set_message_handler(fake_handler) + + event_a = _make_event(text="first question") + session_key = build_session_key(event_a.source) + + # Simulate: message A is being processed, message B arrives + # The interrupt event is set and B is in pending_messages + interrupt_event = asyncio.Event() + interrupt_event.set() + adapter._active_sessions[session_key] = interrupt_event + + event_b = _make_event(text="second question") + adapter._pending_messages[session_key] = event_b + + await adapter._process_message_background(event_a, session_key) + + # The stale response should NOT have been sent. + stale_sends = [s for s in adapter.sent if s["content"] == stale_response] + assert len(stale_sends) == 0, ( + f"Stale response was sent {len(stale_sends)} time(s) — should be suppressed" + ) + # The pending message's response SHOULD have been sent. + pending_sends = [s for s in adapter.sent if s["content"] == pending_response] + assert len(pending_sends) == 1, "Pending message response should be sent" + + @pytest.mark.asyncio + async def test_response_not_suppressed_without_interrupt(self): + """Normal case: no interrupt, response should be sent.""" + adapter = StubAdapter() + + async def fake_handler(event): + return "Normal response" + + adapter.set_message_handler(fake_handler) + event = _make_event() + session_key = build_session_key(event.source) + + await adapter._process_message_background(event, session_key) + + assert any(s["content"] == "Normal response" for s in adapter.sent) + + @pytest.mark.asyncio + async def test_response_not_suppressed_with_interrupt_but_no_pending(self): + """Interrupt event set but no pending message (race already resolved) — + response should still be sent.""" + adapter = StubAdapter() + + async def fake_handler(event): + return "Valid response" + + adapter.set_message_handler(fake_handler) + event = _make_event() + session_key = build_session_key(event.source) + + # Set interrupt but no pending message + interrupt_event = asyncio.Event() + interrupt_event.set() + adapter._active_sessions[session_key] = interrupt_event + + await adapter._process_message_background(event, session_key) + + assert any(s["content"] == "Valid response" for s in adapter.sent) + + +# =================================================================== +# Test 2: run.py — already_sent without response_previewed (#8375) +# =================================================================== + +class TestAlreadySentWithoutResponsePreviewed: + """The already_sent flag on the response dict should be set when the + stream consumer's already_sent is True, even if response_previewed is + False. This prevents duplicate sends when streaming was interrupted + by flood control.""" + + def _make_mock_stream_consumer(self, already_sent=False, final_response_sent=False): + sc = SimpleNamespace( + already_sent=already_sent, + final_response_sent=final_response_sent, + ) + return sc + + def test_already_sent_set_without_response_previewed(self): + """Stream consumer already_sent=True should propagate to response + dict even when response_previewed is False.""" + sc = self._make_mock_stream_consumer(already_sent=True, final_response_sent=False) + response = {"final_response": "text", "response_previewed": False} + + # Reproduce the logic from run.py return path (post-fix) + if sc and isinstance(response, dict) and not response.get("failed"): + if ( + getattr(sc, "final_response_sent", False) + or getattr(sc, "already_sent", False) + ): + response["already_sent"] = True + + assert response.get("already_sent") is True + + def test_already_sent_not_set_when_nothing_sent(self): + """When stream consumer hasn't sent anything, already_sent should + not be set on the response.""" + sc = self._make_mock_stream_consumer(already_sent=False, final_response_sent=False) + response = {"final_response": "text", "response_previewed": False} + + if sc and isinstance(response, dict) and not response.get("failed"): + if ( + getattr(sc, "final_response_sent", False) + or getattr(sc, "already_sent", False) + ): + response["already_sent"] = True + + assert "already_sent" not in response + + def test_already_sent_set_on_final_response_sent(self): + """final_response_sent=True should still work as before.""" + sc = self._make_mock_stream_consumer(already_sent=False, final_response_sent=True) + response = {"final_response": "text"} + + if sc and isinstance(response, dict) and not response.get("failed"): + if ( + getattr(sc, "final_response_sent", False) + or getattr(sc, "already_sent", False) + ): + response["already_sent"] = True + + assert response.get("already_sent") is True + + def test_already_sent_not_set_on_failed_response(self): + """Failed responses should never be suppressed — user needs to see + the error message even if streaming sent earlier partial output.""" + sc = self._make_mock_stream_consumer(already_sent=True, final_response_sent=False) + response = {"final_response": "Error: something broke", "failed": True} + + if sc and isinstance(response, dict) and not response.get("failed"): + if ( + getattr(sc, "final_response_sent", False) + or getattr(sc, "already_sent", False) + ): + response["already_sent"] = True + + assert "already_sent" not in response + + +# =================================================================== +# Test 3: run.py queued-message path — _already_streamed detection +# =================================================================== + +class TestQueuedMessageAlreadyStreamed: + """The queued-message path should detect that the first response was + already streamed (already_sent=True) even without response_previewed.""" + + def _make_mock_sc(self, already_sent=False, final_response_sent=False): + return SimpleNamespace( + already_sent=already_sent, + final_response_sent=final_response_sent, + ) + + def test_queued_path_detects_already_streamed(self): + """already_sent=True on stream consumer means first response was + streamed — skip re-sending before processing queued message.""" + _sc = self._make_mock_sc(already_sent=True) + + # Reproduce the queued-message logic from run.py (post-fix) + _already_streamed = bool( + _sc + and ( + getattr(_sc, "final_response_sent", False) + or getattr(_sc, "already_sent", False) + ) + ) + + assert _already_streamed is True + + def test_queued_path_sends_when_not_streamed(self): + """Nothing was streamed — first response should be sent before + processing the queued message.""" + _sc = self._make_mock_sc(already_sent=False) + + _already_streamed = bool( + _sc + and ( + getattr(_sc, "final_response_sent", False) + or getattr(_sc, "already_sent", False) + ) + ) + + assert _already_streamed is False + + def test_queued_path_with_no_stream_consumer(self): + """No stream consumer at all (streaming disabled) — not streamed.""" + _sc = None + + _already_streamed = bool( + _sc + and ( + getattr(_sc, "final_response_sent", False) + or getattr(_sc, "already_sent", False) + ) + ) + + assert _already_streamed is False