fix(gateway): separate observed Telegram group context

This commit is contained in:
Markus
2026-05-21 19:07:40 -04:00
committed by Teknium
parent 729a778af0
commit 4a91e36495
6 changed files with 251 additions and 50 deletions

View File

@@ -4573,10 +4573,10 @@ class TelegramAdapter(BasePlatformAdapter):
return ( return (
"You are handling a Telegram group chat message.\n" "You are handling a Telegram group chat message.\n"
f"- Your identity: user_id={bot_id}, @-mention name in this group=@{username}\n" f"- Your identity: user_id={bot_id}, @-mention name in this group=@{username}\n"
"- Lines in history prefixed with `[nickname|user_id]` are observed Telegram group context " "- observed Telegram group context may be provided in a separate context-only block "
"and are not necessarily addressed to you.\n" "before the current message; it is not necessarily addressed to you.\n"
"- Treat only the current new message as a request explicitly directed at you, " "- Treat only the current new message as a request explicitly directed at you, "
"and answer it directly." "and use observed context only when the current message asks for it."
) )
def _apply_telegram_group_observe_attribution(self, event: MessageEvent) -> MessageEvent: def _apply_telegram_group_observe_attribution(self, event: MessageEvent) -> MessageEvent:

View File

@@ -447,6 +447,109 @@ def _build_replay_entry(role: str, content: Any, msg: Dict[str, Any]) -> Dict[st
return entry return entry
_TELEGRAM_OBSERVED_CONTEXT_PROMPT_MARKER = "observed Telegram group context"
_OBSERVED_GROUP_CONTEXT_HEADER = "[Observed Telegram group context - context only, not requests]"
_CURRENT_ADDRESSED_MESSAGE_HEADER = "[Current addressed message - answer only this unless it explicitly asks you to use the observed context]"
def _uses_telegram_observed_group_context(channel_prompt: Optional[str]) -> bool:
"""Return True for Telegram group turns that may include observed chatter.
Telegram's observe-unmentioned mode persists skipped group chatter so a
later @mention can see it. Those rows must not replay as ordinary user
turns: a weak wake word like ``@bot cambio`` should not make the model treat
old unmentioned chatter as pending work. The Telegram adapter marks these
turns with a channel prompt; this helper keeps the run-path check explicit
and unit-testable.
"""
return bool(channel_prompt and _TELEGRAM_OBSERVED_CONTEXT_PROMPT_MARKER in channel_prompt)
def _build_gateway_agent_history(
history: List[Dict[str, Any]],
*,
channel_prompt: Optional[str] = None,
) -> tuple[List[Dict[str, Any]], Optional[str]]:
"""Convert stored gateway transcript rows into agent replay messages.
Observed Telegram group rows are returned as API-only context for the
current addressed message instead of being replayed as normal prior user
turns. Keeping that context out of ``conversation_history`` avoids
consecutive-user repair merging it with the live user turn and then hiding
the current message behind ``history_offset`` during persistence.
"""
agent_history: List[Dict[str, Any]] = []
observed_group_context: List[str] = []
separate_observed_context = _uses_telegram_observed_group_context(channel_prompt)
for msg in history or []:
role = msg.get("role")
if not role:
continue
# Skip metadata entries (tool definitions, session info) -- these are
# for transcript logging, not for the LLM.
if role in {"session_meta",}:
continue
# Skip system messages -- the agent rebuilds its own system prompt.
if role == "system":
continue
content = msg.get("content")
if separate_observed_context and msg.get("observed") and role == "user" and content:
observed_group_context.append(str(content).strip())
continue
# Rich agent messages (tool_calls, tool results) must be passed through
# intact so the API sees valid assistant→tool sequences.
has_tool_calls = "tool_calls" in msg
has_tool_call_id = "tool_call_id" in msg
is_tool_message = role == "tool"
if has_tool_calls or has_tool_call_id or is_tool_message:
clean_msg = {k: v for k, v in msg.items() if k not in {"timestamp", "observed"}}
agent_history.append(clean_msg)
elif content:
# Simple text message - just need role and content.
if msg.get("mirror"):
mirror_src = msg.get("mirror_source", "another session")
content = f"[Delivered from {mirror_src}] {content}"
entry = _build_replay_entry(role, content, msg)
agent_history.append(entry)
observed_context = "\n".join(observed_group_context).strip() or None
return agent_history, observed_context
def _wrap_current_message_with_observed_context(message: Any, observed_context: Optional[str]) -> Any:
"""Prepend observed Telegram context to the API-only current user turn."""
if not observed_context:
return message
prefix = (
f"{_OBSERVED_GROUP_CONTEXT_HEADER}\n"
f"{observed_context}\n\n"
f"{_CURRENT_ADDRESSED_MESSAGE_HEADER}\n"
)
if isinstance(message, str):
return f"{prefix}{message}"
if isinstance(message, list):
wrapped = [dict(part) if isinstance(part, dict) else part for part in message]
for part in wrapped:
if isinstance(part, dict) and part.get("type") == "text":
part["text"] = f"{prefix}{part.get('text', '')}"
return wrapped
return [{"type": "text", "text": prefix.rstrip()}] + wrapped
return message
def _last_transcript_timestamp(history: Optional[List[Dict[str, Any]]]) -> Any: def _last_transcript_timestamp(history: Optional[List[Dict[str, Any]]]) -> Any:
"""Return the ``timestamp`` of the last usable transcript row, if any. """Return the ``timestamp`` of the last usable transcript row, if any.
@@ -16467,45 +16570,16 @@ class GatewayRunner:
# that may include tool_calls, tool_call_id, reasoning, etc. # that may include tool_calls, tool_call_id, reasoning, etc.
# - These must be passed through intact so the API sees valid # - These must be passed through intact so the API sees valid
# assistant→tool sequences (dropping tool_calls causes 500 errors) # assistant→tool sequences (dropping tool_calls causes 500 errors)
agent_history = [] #
for msg in history: # Telegram observed group context is handled structurally here:
role = msg.get("role") # observed=True transcript rows are withheld from replayable
if not role: # history and attached to the current addressed message as
continue # API-only context, so persisted history stores only the real
# addressed user turn.
# Skip metadata entries (tool definitions, session info) agent_history, observed_group_context = _build_gateway_agent_history(
# -- these are for transcript logging, not for the LLM history,
if role in {"session_meta",}: channel_prompt=channel_prompt,
continue )
# Skip system messages -- the agent rebuilds its own system prompt
if role == "system":
continue
# Rich agent messages (tool_calls, tool results) must be passed
# through intact so the API sees valid assistant→tool sequences
has_tool_calls = "tool_calls" in msg
has_tool_call_id = "tool_call_id" in msg
is_tool_message = role == "tool"
if has_tool_calls or has_tool_call_id or is_tool_message:
clean_msg = {k: v for k, v in msg.items() if k != "timestamp"}
agent_history.append(clean_msg)
else:
# Simple text message - just need role and content
content = msg.get("content")
if content:
# Tag cross-platform mirror messages so the agent knows their origin
if msg.get("mirror"):
mirror_src = msg.get("mirror_source", "another session")
content = f"[Delivered from {mirror_src}] {content}"
# Preserve assistant reasoning + Codex replay fields so
# multi-turn reasoning context, prefix-cache hits, and
# provider-specific echo requirements survive session
# reload. See ``_ASSISTANT_REPLAY_FIELDS`` for the full
# whitelist and rationale.
entry = _build_replay_entry(role, content, msg)
agent_history.append(entry)
# Collect MEDIA paths already in history so we can exclude them # Collect MEDIA paths already in history so we can exclude them
# from the current turn's extraction. This is compression-safe: # from the current turn's extraction. This is compression-safe:
@@ -16738,7 +16812,17 @@ class GatewayRunner:
else: else:
_run_message = message _run_message = message
result = agent.run_conversation(_run_message, conversation_history=agent_history, task_id=session_id) _api_run_message = _wrap_current_message_with_observed_context(
_run_message,
observed_group_context,
)
_conversation_kwargs = {
"conversation_history": agent_history,
"task_id": session_id,
}
if observed_group_context:
_conversation_kwargs["persist_user_message"] = message
result = agent.run_conversation(_api_run_message, **_conversation_kwargs)
finally: finally:
unregister_gateway_notify(_approval_session_key) unregister_gateway_notify(_approval_session_key)
# Cancel any pending clarify entries so blocked agent # Cancel any pending clarify entries so blocked agent

View File

@@ -1277,6 +1277,7 @@ class SessionStore:
platform_message_id=( platform_message_id=(
message.get("platform_message_id") or message.get("message_id") message.get("platform_message_id") or message.get("message_id")
), ),
observed=bool(message.get("observed")),
) )
except Exception as e: except Exception as e:
logger.debug("Session DB operation failed: %s", e) logger.debug("Session DB operation failed: %s", e)

View File

@@ -33,7 +33,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db" DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 12 SCHEMA_VERSION = 13
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# WAL-compatibility fallback # WAL-compatibility fallback
@@ -237,7 +237,8 @@ CREATE TABLE IF NOT EXISTS messages (
reasoning_details TEXT, reasoning_details TEXT,
codex_reasoning_items TEXT, codex_reasoning_items TEXT,
codex_message_items TEXT, codex_message_items TEXT,
platform_message_id TEXT platform_message_id TEXT,
observed INTEGER DEFAULT 0
); );
CREATE TABLE IF NOT EXISTS state_meta ( CREATE TABLE IF NOT EXISTS state_meta (
@@ -1460,6 +1461,7 @@ class SessionDB:
codex_reasoning_items: Any = None, codex_reasoning_items: Any = None,
codex_message_items: Any = None, codex_message_items: Any = None,
platform_message_id: str = None, platform_message_id: str = None,
observed: bool = False,
) -> int: ) -> int:
""" """
Append a message to a session. Returns the message row ID. Append a message to a session. Returns the message row ID.
@@ -1501,8 +1503,8 @@ class SessionDB:
"""INSERT INTO messages (session_id, role, content, tool_call_id, """INSERT INTO messages (session_id, role, content, tool_call_id,
tool_calls, tool_name, timestamp, token_count, finish_reason, tool_calls, tool_name, timestamp, token_count, finish_reason,
reasoning, reasoning_content, reasoning_details, codex_reasoning_items, reasoning, reasoning_content, reasoning_details, codex_reasoning_items,
codex_message_items, platform_message_id) codex_message_items, platform_message_id, observed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
( (
session_id, session_id,
role, role,
@@ -1519,6 +1521,7 @@ class SessionDB:
codex_items_json, codex_items_json,
codex_message_items_json, codex_message_items_json,
platform_message_id, platform_message_id,
1 if observed else 0,
), ),
) )
msg_id = cursor.lastrowid msg_id = cursor.lastrowid
@@ -1590,8 +1593,8 @@ class SessionDB:
"""INSERT INTO messages (session_id, role, content, tool_call_id, """INSERT INTO messages (session_id, role, content, tool_call_id,
tool_calls, tool_name, timestamp, token_count, finish_reason, tool_calls, tool_name, timestamp, token_count, finish_reason,
reasoning, reasoning_content, reasoning_details, codex_reasoning_items, reasoning, reasoning_content, reasoning_details, codex_reasoning_items,
codex_message_items, platform_message_id) codex_message_items, platform_message_id, observed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
( (
session_id, session_id,
role, role,
@@ -1608,6 +1611,7 @@ class SessionDB:
codex_items_json, codex_items_json,
codex_message_items_json, codex_message_items_json,
platform_msg_id, platform_msg_id,
1 if msg.get("observed") else 0,
), ),
) )
total_messages += 1 total_messages += 1
@@ -1925,7 +1929,7 @@ class SessionDB:
rows = self._conn.execute( rows = self._conn.execute(
"SELECT role, content, tool_call_id, tool_calls, tool_name, " "SELECT role, content, tool_call_id, tool_calls, tool_name, "
"finish_reason, reasoning, reasoning_content, reasoning_details, " "finish_reason, reasoning, reasoning_content, reasoning_details, "
"codex_reasoning_items, codex_message_items, platform_message_id " "codex_reasoning_items, codex_message_items, platform_message_id, observed "
f"FROM messages WHERE session_id IN ({placeholders}) ORDER BY id", f"FROM messages WHERE session_id IN ({placeholders}) ORDER BY id",
tuple(session_ids), tuple(session_ids),
).fetchall() ).fetchall()
@@ -1953,6 +1957,8 @@ class SessionDB:
# for backward compatibility with the JSONL transcript shape. # for backward compatibility with the JSONL transcript shape.
if row["platform_message_id"]: if row["platform_message_id"]:
msg["message_id"] = row["platform_message_id"] msg["message_id"] = row["platform_message_id"]
if row["observed"]:
msg["observed"] = True
# Restore reasoning fields on assistant messages so providers # Restore reasoning fields on assistant messages so providers
# that replay reasoning (OpenRouter, OpenAI, Nous) receive # that replay reasoning (OpenRouter, OpenAI, Nous) receive
# coherent multi-turn reasoning context. # coherent multi-turn reasoning context.

View File

@@ -225,6 +225,94 @@ def test_observed_group_context_uses_shared_source_and_prompt_for_later_mentions
asyncio.run(_run()) asyncio.run(_run())
def test_observed_group_context_replays_as_current_message_context_not_user_turns():
from gateway.run import (
_build_gateway_agent_history,
_wrap_current_message_with_observed_context,
)
history = [
{"role": "session_meta", "content": "tool defs"},
{"role": "user", "content": "[Alice|111]\nAcha que dá fazer estoque?", "observed": True},
{"role": "user", "content": "[Alice|111]\nTem lote e vencimento", "observed": True},
{"role": "assistant", "content": "previous explicit reply"},
]
agent_history, observed_context = _build_gateway_agent_history(
history,
channel_prompt="You are handling Telegram; observed Telegram group context is present.",
)
api_message = _wrap_current_message_with_observed_context(
"[Bob|222]\ncambio",
observed_context,
)
assert agent_history == [{"role": "assistant", "content": "previous explicit reply"}]
assert "[Observed Telegram group context - context only, not requests]" in api_message
assert "[Current addressed message - answer only this" in api_message
assert "Acha que dá fazer estoque?" in api_message
assert "Tem lote e vencimento" in api_message
assert api_message.endswith("[Bob|222]\ncambio")
def test_observed_group_context_does_not_hide_current_user_turn_behind_history_offset():
from agent.agent_runtime_helpers import repair_message_sequence
from gateway.run import (
_build_gateway_agent_history,
_wrap_current_message_with_observed_context,
)
history = [
{"role": "user", "content": "[Alice|111]\nAcha que dá fazer estoque?", "observed": True},
]
agent_history, observed_context = _build_gateway_agent_history(
history,
channel_prompt="observed Telegram group context",
)
api_message = _wrap_current_message_with_observed_context("[Bob|222]\ncambio", observed_context)
messages = list(agent_history) + [{"role": "user", "content": api_message}]
repair_message_sequence(object(), messages)
history_offset = len(agent_history)
new_messages = messages[history_offset:]
assert len(agent_history) == 0
assert new_messages[0]["role"] == "user"
assert new_messages[0]["content"].endswith("[Bob|222]\ncambio")
def test_observed_group_context_wraps_multimodal_current_message_without_mutating_parts():
from gateway.run import _wrap_current_message_with_observed_context
original = [
{"type": "text", "text": "[Bob|222]\nsee this image"},
{"type": "image_url", "image_url": {"url": "data:image/png;base64,abc"}},
]
wrapped = _wrap_current_message_with_observed_context(
original,
"[Alice|111]\nside chatter",
)
assert original[0]["text"] == "[Bob|222]\nsee this image"
assert wrapped[0]["text"].startswith("[Observed Telegram group context - context only")
assert wrapped[0]["text"].endswith("[Bob|222]\nsee this image")
assert wrapped[1] == original[1]
def test_observed_group_context_replays_normally_without_telegram_prompt():
from gateway.run import _build_gateway_agent_history
history = [
{"role": "user", "content": "[Alice|111]\nside chatter", "observed": True},
]
agent_history, observed_context = _build_gateway_agent_history(history, channel_prompt=None)
assert observed_context is None
assert agent_history == [{"role": "user", "content": "[Alice|111]\nside chatter"}]
def test_unmentioned_group_observe_requires_chat_allowlist_for_shared_context(): def test_unmentioned_group_observe_requires_chat_allowlist_for_shared_context():
async def _run(): async def _run():
adapter = _make_adapter( adapter = _make_adapter(

View File

@@ -161,6 +161,28 @@ class TestMessageStorage:
session = db.get_session("s1") session = db.get_session("s1")
assert session["message_count"] == 2 assert session["message_count"] == 2
def test_observed_flag_round_trips_for_gateway_replay(self, db):
db.create_session(session_id="s1", source="telegram:-100")
db.append_message(
"s1",
role="user",
content="[Alice|111]\nside chatter",
observed=True,
)
db.append_message("s1", role="assistant", content="ack")
messages = db.get_messages("s1")
assert messages[0]["observed"] == 1
assert messages[1]["observed"] == 0
conversation = db.get_messages_as_conversation("s1")
assert conversation[0] == {
"role": "user",
"content": "[Alice|111]\nside chatter",
"observed": True,
}
assert "observed" not in conversation[1]
def test_tool_response_does_not_increment_tool_count(self, db): def test_tool_response_does_not_increment_tool_count(self, db):
"""Tool responses (role=tool) should not increment tool_call_count. """Tool responses (role=tool) should not increment tool_call_count.