fix(compression): pass thread_id metadata + add gateway test for warning delivery

Address review feedback on PR #16333:

1. The hygiene-path warning send was missing metadata=_hyg_meta. On
   Telegram topics / Slack threads / Discord threads the warning would
   land in the main channel instead of the originating thread. Now
   reuses the same _hyg_meta dict already computed for the hygiene
   compaction itself.

2. New gateway-level test
   test_session_hygiene_warns_user_when_summary_generation_fails
   verifies end-to-end:
   - When the compressor's _last_summary_fallback_used flag is True,
     the gateway invokes adapter.send() exactly once.
   - The warning message includes the dropped count and the underlying
     error string.
   - metadata={'thread_id': ...} is propagated so the warning lands
     in the originating topic/thread.

Tests: 20 gateway hygiene + 54 context_compressor — all pass.
This commit is contained in:
iamagenius00
2026-04-27 11:25:14 +08:00
committed by Teknium
parent dfdc4276e8
commit c61bc3f72c
2 changed files with 117 additions and 1 deletions

View File

@@ -4822,7 +4822,7 @@ class GatewayRunner:
try:
_adapter = self.adapters.get(source.platform)
if _adapter and source.chat_id:
await _adapter.send(source.chat_id, _warn_msg)
await _adapter.send(source.chat_id, _warn_msg, metadata=_hyg_meta)
except Exception as _werr:
logger.warning(
"Failed to deliver compression-failure warning to user: %s",

View File

@@ -393,3 +393,119 @@ async def test_session_hygiene_messages_stay_in_originating_topic(monkeypatch, t
assert FakeCompressAgent.last_instance is not None
FakeCompressAgent.last_instance.shutdown_memory_provider.assert_called_once()
FakeCompressAgent.last_instance.close.assert_called_once()
@pytest.mark.asyncio
async def test_session_hygiene_warns_user_when_summary_generation_fails(monkeypatch, tmp_path):
"""When auxiliary compression's summary LLM call fails, the compressor
inserts a static fallback and the dropped turns are unrecoverable.
Gateway must surface a visible ⚠️ warning to the user, including
thread_id metadata so it lands in the originating topic/thread."""
fake_dotenv = types.ModuleType("dotenv")
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
class FakeCompressAgentWithSummaryFailure:
last_instance = None
def __init__(self, **kwargs):
self.model = kwargs.get("model")
self.session_id = kwargs.get("session_id", "fake-session")
self._print_fn = None
self.shutdown_memory_provider = MagicMock()
self.close = MagicMock()
# Simulate a compressor that hit summary-generation failure
# and inserted the static fallback placeholder.
self.context_compressor = SimpleNamespace(
_last_summary_fallback_used=True,
_last_summary_dropped_count=42,
_last_summary_error="404 model not found: gemini-3-flash-preview",
)
type(self).last_instance = self
def _compress_context(self, messages, *_args, **_kwargs):
self.session_id = f"{self.session_id}_compressed"
return ([{"role": "assistant", "content": "compressed"}], None)
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = FakeCompressAgentWithSummaryFailure
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
gateway_run = importlib.import_module("gateway.run")
GatewayRunner = gateway_run.GatewayRunner
adapter = HygieneCaptureAdapter()
runner = object.__new__(GatewayRunner)
runner.config = GatewayConfig(
platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake-token")}
)
runner.adapters = {Platform.TELEGRAM: adapter}
runner._voice_mode = {}
runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False)
runner.session_store = MagicMock()
runner.session_store.get_or_create_session.return_value = SessionEntry(
session_key="agent:main:telegram:group:-1001:17585",
session_id="sess-1",
created_at=datetime.now(),
updated_at=datetime.now(),
platform=Platform.TELEGRAM,
chat_type="group",
)
runner.session_store.load_transcript.return_value = _make_history(6, content_size=400)
runner.session_store.has_any_sessions.return_value = True
runner.session_store.rewrite_transcript = MagicMock()
runner.session_store.append_to_transcript = MagicMock()
runner._running_agents = {}
runner._pending_messages = {}
runner._pending_approvals = {}
runner._session_db = None
runner._is_user_authorized = lambda _source: True
runner._set_session_env = lambda _context: None
runner._run_agent = AsyncMock(
return_value={
"final_response": "ok",
"messages": [],
"tools": [],
"history_offset": 0,
"last_prompt_tokens": 0,
}
)
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
monkeypatch.setattr(
"agent.model_metadata.get_model_context_length",
lambda *_args, **_kwargs: 100,
)
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "795544298")
event = MessageEvent(
text="hello",
source=SessionSource(
platform=Platform.TELEGRAM,
chat_id="-1001",
chat_type="group",
thread_id="17585",
user_id="12345",
),
message_id="1",
)
result = await runner._handle_message(event)
assert result == "ok"
# The compressor reported summary-failure → exactly one warning
# message must have been delivered to the user.
warning_messages = [s for s in adapter.sent if "Context compression summary failed" in s["content"]]
assert len(warning_messages) == 1, (
f"Expected 1 compression-failure warning, got {len(warning_messages)}: {adapter.sent}"
)
warn = warning_messages[0]
# Warning must include the dropped count and the underlying error.
assert "42" in warn["content"]
assert "404" in warn["content"]
# Warning must land in the originating topic/thread, not the main channel.
assert warn["chat_id"] == "-1001"
assert warn["metadata"] == {"thread_id": "17585"}
FakeCompressAgentWithSummaryFailure.last_instance.close.assert_called_once()