diff --git a/gateway/platforms/webhook.py b/gateway/platforms/webhook.py index daaf4f5dc..6d4885d2b 100644 --- a/gateway/platforms/webhook.py +++ b/gateway/platforms/webhook.py @@ -76,8 +76,17 @@ class WebhookAdapter(BasePlatformAdapter): self._routes: Dict[str, dict] = dict(self._static_routes) self._runner = None - # Delivery info keyed by session chat_id — consumed by send() + # Delivery info keyed by session chat_id. + # + # Read by every send() invocation for the chat_id (status messages + # AND the final response). Cleaned up via TTL on each POST so the + # dict stays bounded — see _prune_delivery_info(). Do NOT pop on + # send(), or interim status messages (e.g. fallback notifications, + # context-pressure warnings) will consume the entry before the + # final response arrives, causing the response to silently fall + # back to the "log" deliver type. self._delivery_info: Dict[str, dict] = {} + self._delivery_info_created: Dict[str, float] = {} # Reference to gateway runner for cross-platform delivery (set externally) self.gateway_runner = None @@ -160,10 +169,14 @@ class WebhookAdapter(BasePlatformAdapter): ) -> SendResult: """Deliver the agent's response to the configured destination. - chat_id is ``webhook:{route}:{delivery_id}`` — we pop the delivery - info stored during webhook receipt so it doesn't leak memory. + chat_id is ``webhook:{route}:{delivery_id}``. The delivery info + stored during webhook receipt is read with ``.get()`` (not popped) + so that interim status messages emitted before the final response + — fallback-model notifications, context-pressure warnings, etc. — + do not consume the entry and silently downgrade the final response + to the ``log`` deliver type. TTL cleanup happens on POST. """ - delivery = self._delivery_info.pop(chat_id, {}) + delivery = self._delivery_info.get(chat_id, {}) deliver_type = delivery.get("deliver", "log") if deliver_type == "log": @@ -190,6 +203,23 @@ class WebhookAdapter(BasePlatformAdapter): success=False, error=f"Unknown deliver type: {deliver_type}" ) + def _prune_delivery_info(self, now: float) -> None: + """Drop delivery_info entries older than the idempotency TTL. + + Mirrors the cleanup pattern used for ``_seen_deliveries``. Called + on each POST so the dict size is bounded by ``rate_limit * TTL`` + even if many webhooks fire and never receive a final response. + """ + cutoff = now - self._idempotency_ttl + stale = [ + k + for k, t in self._delivery_info_created.items() + if t < cutoff + ] + for k in stale: + self._delivery_info.pop(k, None) + self._delivery_info_created.pop(k, None) + async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: return {"name": chat_id, "type": "webhook"} @@ -382,7 +412,9 @@ class WebhookAdapter(BasePlatformAdapter): # same route get independent agent runs (not queued/interrupted). session_chat_id = f"webhook:{route_name}:{delivery_id}" - # Store delivery info for send() — consumed (popped) on delivery + # Store delivery info for send(). Read by every send() invocation + # for this chat_id (interim status messages and the final response), + # so we do NOT pop on send. TTL-based cleanup keeps the dict bounded. deliver_config = { "deliver": route_config.get("deliver", "log"), "deliver_extra": self._render_delivery_extra( @@ -391,6 +423,8 @@ class WebhookAdapter(BasePlatformAdapter): "payload": payload, } self._delivery_info[session_chat_id] = deliver_config + self._delivery_info_created[session_chat_id] = now + self._prune_delivery_info(now) # Build source and event source = self.build_source( diff --git a/tests/gateway/test_webhook_adapter.py b/tests/gateway/test_webhook_adapter.py index f323b95af..bedf254a1 100644 --- a/tests/gateway/test_webhook_adapter.py +++ b/tests/gateway/test_webhook_adapter.py @@ -590,8 +590,15 @@ class TestSessionIsolation: class TestDeliveryCleanup: @pytest.mark.asyncio - async def test_delivery_info_cleaned_after_send(self): - """send() pops delivery_info so the entry doesn't leak memory.""" + async def test_delivery_info_survives_multiple_sends(self): + """send() must NOT pop delivery_info. + + Interim status messages (fallback notifications, context-pressure + warnings, etc.) flow through the same send() path as the final + response. If the entry were popped on the first send, the final + response would silently downgrade to the ``log`` deliver type. + Regression test for that bug. + """ adapter = _make_adapter() chat_id = "webhook:test:d-xyz" adapter._delivery_info[chat_id] = { @@ -599,10 +606,40 @@ class TestDeliveryCleanup: "deliver_extra": {}, "payload": {"x": 1}, } + adapter._delivery_info_created[chat_id] = time.time() - result = await adapter.send(chat_id, "Agent response here") - assert result.success is True - assert chat_id not in adapter._delivery_info + # First send (e.g. an interim status message) + result1 = await adapter.send(chat_id, "Status: switching to fallback") + assert result1.success is True + # Entry must still be present so the final send can read it + assert chat_id in adapter._delivery_info + + # Second send (the final agent response) + result2 = await adapter.send(chat_id, "Final agent response") + assert result2.success is True + assert chat_id in adapter._delivery_info + + @pytest.mark.asyncio + async def test_delivery_info_pruned_via_ttl(self): + """Stale delivery_info entries are dropped on the next POST.""" + adapter = _make_adapter() + adapter._idempotency_ttl = 60 # short TTL for the test + now = time.time() + + # Stale entry — older than TTL + adapter._delivery_info["webhook:test:old"] = {"deliver": "log"} + adapter._delivery_info_created["webhook:test:old"] = now - 120 + + # Fresh entry — should survive + adapter._delivery_info["webhook:test:new"] = {"deliver": "log"} + adapter._delivery_info_created["webhook:test:new"] = now - 5 + + adapter._prune_delivery_info(now) + + assert "webhook:test:old" not in adapter._delivery_info + assert "webhook:test:old" not in adapter._delivery_info_created + assert "webhook:test:new" in adapter._delivery_info + assert "webhook:test:new" in adapter._delivery_info_created # =================================================================== diff --git a/tests/gateway/test_webhook_integration.py b/tests/gateway/test_webhook_integration.py index 899989810..5c6fe0111 100644 --- a/tests/gateway/test_webhook_integration.py +++ b/tests/gateway/test_webhook_integration.py @@ -259,8 +259,9 @@ class TestCrossPlatformDelivery: mock_tg_adapter.send.assert_awaited_once_with( "12345", "I've acknowledged the alert.", metadata=None ) - # Delivery info should be cleaned up - assert chat_id not in adapter._delivery_info + # Delivery info is retained after send() so interim status messages + # don't strand the final response (TTL-based cleanup happens on POST). + assert chat_id in adapter._delivery_info # =================================================================== @@ -333,5 +334,6 @@ class TestGitHubCommentDelivery: text=True, timeout=30, ) - # Delivery info cleaned up - assert chat_id not in adapter._delivery_info + # Delivery info is retained after send() so interim status messages + # don't strand the final response (TTL-based cleanup happens on POST). + assert chat_id in adapter._delivery_info