gateway: debounce queued text follow-ups

This commit is contained in:
Paulo Nascimento
2026-05-23 21:32:19 -04:00
committed by Teknium
parent 21db250034
commit 7abd62719b
10 changed files with 607 additions and 89 deletions

View File

@@ -15,6 +15,7 @@ import re
import socket as _socket import socket as _socket
import subprocess import subprocess
import sys import sys
import time
import uuid import uuid
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from urllib.parse import urlsplit from urllib.parse import urlsplit
@@ -40,6 +41,16 @@ def _platform_name(platform) -> str:
return str(value or "").lower() return str(value or "").lower()
def _float_env(name: str, default: float) -> float:
raw = os.environ.get(name, "").strip()
if not raw:
return default
try:
return float(raw)
except (TypeError, ValueError):
return default
def _thread_metadata_for_source(source, reply_to_message_id: str | None = None) -> dict | None: def _thread_metadata_for_source(source, reply_to_message_id: str | None = None) -> dict | None:
"""Build platform-aware thread metadata for adapter sends. """Build platform-aware thread metadata for adapter sends.
@@ -1103,6 +1114,14 @@ class MessageEvent:
return args return args
@dataclass
class TextDebounceState:
    """Buffered busy-text burst for one session together with its flush timer."""

    # Buffered event for the burst.
    event: MessageEvent
    # Timer task scheduled to flush the buffer, or None when none is set.
    task: asyncio.Task | None
    # Timestamp recorded when the burst was created.
    first_ts: float
    # Timestamp of the most recent message added to the burst.
    last_ts: float
_PLAINTEXT_GATEWAY_RESTART_PATTERNS: tuple[re.Pattern[str], ...] = ( _PLAINTEXT_GATEWAY_RESTART_PATTERNS: tuple[re.Pattern[str], ...] = (
re.compile(r"^(?:please\s+)?restart\s+(?:the\s+)?gateway[.!?\s]*$", re.IGNORECASE), re.compile(r"^(?:please\s+)?restart\s+(?:the\s+)?gateway[.!?\s]*$", re.IGNORECASE),
re.compile(r"^(?:please\s+)?restart\s+(?:the\s+)?hermes\s+gateway[.!?\s]*$", re.IGNORECASE), re.compile(r"^(?:please\s+)?restart\s+(?:the\s+)?hermes\s+gateway[.!?\s]*$", re.IGNORECASE),
@@ -1398,6 +1417,17 @@ class BasePlatformAdapter(ABC):
self._active_sessions: Dict[str, asyncio.Event] = {} self._active_sessions: Dict[str, asyncio.Event] = {}
self._pending_messages: Dict[str, MessageEvent] = {} self._pending_messages: Dict[str, MessageEvent] = {}
self._session_tasks: Dict[str, asyncio.Task] = {} self._session_tasks: Dict[str, asyncio.Task] = {}
self._busy_text_mode: str = (
os.environ.get("HERMES_GATEWAY_BUSY_TEXT_MODE", "queue").strip().lower()
or "queue"
)
self._busy_text_debounce_seconds: float = _float_env(
"HERMES_GATEWAY_BUSY_TEXT_DEBOUNCE_SECONDS", 0.35
)
self._busy_text_hard_cap_seconds: float = _float_env(
"HERMES_GATEWAY_BUSY_TEXT_HARD_CAP_SECONDS", 1.0
)
self._text_debounce: dict[str, TextDebounceState] = {}
# Background message-processing tasks spawned by handle_message(). # Background message-processing tasks spawned by handle_message().
# Gateway shutdown cancels these so an old gateway instance doesn't keep # Gateway shutdown cancels these so an old gateway instance doesn't keep
# working on a task after --replace or manual restarts. # working on a task after --replace or manual restarts.
@@ -2725,6 +2755,161 @@ class BasePlatformAdapter(ABC):
return f"{existing_text}\n\n{new_text}".strip() return f"{existing_text}\n\n{new_text}".strip()
return existing_text return existing_text
def _text_debounce_store(self) -> dict[str, TextDebounceState]:
store = getattr(self, "_text_debounce", None)
if store is None:
store = {}
self._text_debounce = store
return store
def _is_queue_text_debounce_candidate(self, event: MessageEvent) -> bool:
"""Return True for normal text eligible for queue-mode debounce."""
result = (
getattr(self, "_busy_text_mode", "queue") == "queue"
and event.message_type == MessageType.TEXT
and not getattr(event, "internal", False)
and not event.is_command()
and bool((event.text or "").strip())
)
if result:
logger.debug(
"[%s] Queue-text debounce candidate accepted: session=%s text=%.60s",
self.name,
getattr(event, "session_key", "?"),
(event.text or "")[:60],
)
return result
def _can_merge_text_debounce_events(self, existing: MessageEvent, event: MessageEvent) -> bool:
"""Return True when two text debounce events came from the same sender."""
def _identity(candidate: MessageEvent) -> tuple[str, ...] | None:
source = getattr(candidate, "source", None)
if source is None:
return None
platform = _platform_name(getattr(source, "platform", None))
sender = getattr(source, "user_id_alt", None) or getattr(source, "user_id", None)
if sender:
return (platform, str(sender))
if getattr(source, "chat_type", None) in {"dm", "private"} and getattr(source, "chat_id", None):
return (platform, "dm", str(source.chat_id))
return None
existing_sender = _identity(existing)
incoming_sender = _identity(event)
return existing_sender is not None and existing_sender == incoming_sender
def _text_debounce_delay(self, session_key: str) -> float:
"""Return bounded busy-text debounce delay for ``session_key``."""
state = self._text_debounce_store().get(session_key)
if state is None:
return 0.0
now = time.monotonic()
window_deadline = state.last_ts + self._busy_text_debounce_seconds
hard_cap_deadline = state.first_ts + self._busy_text_hard_cap_seconds
return max(0.0, min(window_deadline, hard_cap_deadline) - now)
async def _queue_text_debounce(self, session_key: str, event: MessageEvent) -> None:
"""Buffer normal queue-mode busy text and schedule a bounded flush."""
# Reads the per-session buffer; a buffered event from a different sender is
# flushed toward _pending_messages before this event is handled.
store = self._text_debounce_store()
state = store.get(session_key)
if state is not None and not self._can_merge_text_debounce_events(state.event, event):
# Preserve sender attribution in shared sessions. The current
# buffer becomes the next pending turn; the new sender starts a
# fresh debounce burst when the pending slot allows it.
await self._flush_text_debounce_now(session_key)
state = store.get(session_key)
# The flush can leave the buffer in place (it returns False when the pending
# event cannot merge with the buffered one). If the pending event is
# mergeable with this event, merge there and stop.
if state is not None and not self._can_merge_text_debounce_events(state.event, event):
existing_pending = self._pending_messages.get(session_key)
if existing_pending is not None and self._can_merge_text_debounce_events(existing_pending, event):
merge_pending_message_event(
self._pending_messages,
session_key,
event,
merge_text=True,
)
return
# Monotonic clock: these timestamps only feed the delay computation.
now = time.monotonic()
if state is None:
# New burst: the incoming event object itself is stored (not a copy), so
# later text merges mutate it in place.
state = TextDebounceState(
event=event,
task=None,
first_ts=now,
last_ts=now,
)
store[session_key] = state
else:
# Existing burst: append the new text on its own line.
if event.text:
state.event.text = (
f"{state.event.text}\n{event.text}"
if state.event.text
else event.text
)
# Track the newest message id; the reply anchor becomes that id, or the
# incoming event's own reply target when it carries no (truthy) id.
latest_message_id = getattr(event, "message_id", None)
latest_anchor = latest_message_id or getattr(event, "reply_to_message_id", None)
if latest_message_id is not None:
state.event.message_id = str(latest_message_id)
if latest_anchor is not None and hasattr(state.event, "reply_to_message_id"):
state.event.reply_to_message_id = str(latest_anchor)
state.last_ts = now
# Cancel any still-running timer and schedule a new one so the delay is
# recomputed from the updated timestamps.
if state.task is not None and not state.task.done():
state.task.cancel()
delay = self._text_debounce_delay(session_key)
state.task = asyncio.create_task(self._flush_text_debounce(session_key, delay))
async def _flush_text_debounce(self, session_key: str, delay: float) -> None:
"""Timer task that flushes the debounced text buffer."""
try:
await asyncio.sleep(delay)
await self._flush_text_debounce_now(session_key)
except asyncio.CancelledError:
return
finally:
current = asyncio.current_task()
state = self._text_debounce_store().get(session_key)
if state is not None and state.task is current:
state.task = None
async def _flush_text_debounce_now(self, session_key: str) -> bool:
"""Force-flush one debounced busy-text burst into the pending slot."""
# Returns True when the buffered event was merged into _pending_messages;
# False when nothing is buffered or the pending event cannot merge with it.
store = self._text_debounce_store()
state = store.get(session_key)
if state is None:
return False
# Cancel a still-running timer, but never the task executing this call
# (the timer task itself flushes through this method).
current = asyncio.current_task()
if state.task is not None and state.task is not current and not state.task.done():
state.task.cancel()
state.task = None
# A pending event that cannot merge with the buffered one blocks the flush;
# the buffered state is left in the store in that case.
existing_pending = self._pending_messages.get(session_key)
if (
existing_pending is not None
and not self._can_merge_text_debounce_events(existing_pending, state.event)
):
return False
# Remove the state from the store only once the merge is known to proceed.
state = store.pop(session_key, None)
if state is None:
return False
merge_pending_message_event(
self._pending_messages,
session_key,
state.event,
merge_text=True,
)
return True
def _discard_text_debounce(self, session_key: str) -> None:
"""Cancel and drop pending text debounce state for control commands."""
state = self._text_debounce_store().pop(session_key, None)
if state is not None and state.task is not None and not state.task.done():
state.task.cancel()
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Session task + guard ownership helpers # Session task + guard ownership helpers
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -2794,6 +2979,7 @@ class BasePlatformAdapter(ABC):
self._active_sessions.pop(session_key, None) self._active_sessions.pop(session_key, None)
self._pending_messages.pop(session_key, None) self._pending_messages.pop(session_key, None)
self._session_tasks.pop(session_key, None) self._session_tasks.pop(session_key, None)
self._discard_text_debounce(session_key)
return True return True
def _start_session_processing( def _start_session_processing(
@@ -2875,6 +3061,7 @@ class BasePlatformAdapter(ABC):
) )
if discard_pending: if discard_pending:
self._pending_messages.pop(session_key, None) self._pending_messages.pop(session_key, None)
self._discard_text_debounce(session_key)
if release_guard: if release_guard:
self._release_session_guard(session_key) self._release_session_guard(session_key)
@@ -2889,6 +3076,7 @@ class BasePlatformAdapter(ABC):
command-scoped guard, then — if a follow-up message landed while the command-scoped guard, then — if a follow-up message landed while the
command was running — spawns a fresh processing task for it. command was running — spawns a fresh processing task for it.
""" """
await self._flush_text_debounce_now(session_key)
pending_event = self._pending_messages.pop(session_key, None) pending_event = self._pending_messages.pop(session_key, None)
self._release_session_guard(session_key, guard=command_guard) self._release_session_guard(session_key, guard=command_guard)
if pending_event is None: if pending_event is None:
@@ -3020,6 +3208,7 @@ class BasePlatformAdapter(ABC):
# through the dedicated handoff path that serializes # through the dedicated handoff path that serializes
# cancellation + runner response + pending drain. # cancellation + runner response + pending drain.
if cmd in {"stop", "new", "reset"}: if cmd in {"stop", "new", "reset"}:
self._discard_text_debounce(session_key)
try: try:
await self._dispatch_active_session_command(event, session_key, cmd) await self._dispatch_active_session_command(event, session_key, cmd)
except Exception as e: except Exception as e:
@@ -3064,8 +3253,9 @@ class BasePlatformAdapter(ABC):
# clarify-intercept can resolve it and unblock the agent. # clarify-intercept can resolve it and unblock the agent.
# #
# Without this bypass: the message gets queued in # Without this bypass: the message gets queued in
# _pending_messages AND triggers an interrupt, killing the # _pending_messages as a follow-up turn instead of reaching the
# agent run mid-clarify and discarding the user's answer. # clarify resolver, leaving the agent blocked and discarding the
# user's answer.
# Same shape as the /approve deadlock fix (PR #4926) — both # Same shape as the /approve deadlock fix (PR #4926) — both
# cases are "agent thread blocked on Event.wait, message must # cases are "agent thread blocked on Event.wait, message must
# reach the resolver before being treated as a new turn." # reach the resolver before being treated as a new turn."
@@ -3124,27 +3314,28 @@ class BasePlatformAdapter(ABC):
merge_pending_message_event(self._pending_messages, session_key, event) merge_pending_message_event(self._pending_messages, session_key, event)
return # Don't interrupt now - will run after current task completes return # Don't interrupt now - will run after current task completes
# Default behavior for non-photo follow-ups: interrupt the running agent. if self._is_queue_text_debounce_candidate(event):
# logger.debug(
# Use merge_text=True so rapid TEXT follow-ups (#4469) accumulate "[%s] New text message while session %s is active — "
# into the single pending slot instead of clobbering each other. "debouncing follow-up (busy_text_mode=queue, window=%.2fs)",
# Without merging, three rapid messages "A", "B", "C" land like: self.name,
# _pending_messages[k] = A (interrupts) session_key,
# _pending_messages[k] = B (replaces A before consumer reads) self._busy_text_debounce_seconds,
# _pending_messages[k] = C (replaces B) )
# ...and only "C" reaches the next turn. merge_pending_message_event await self._queue_text_debounce(session_key, event)
# already does the right thing for photo/media bursts; the else:
# ``merge_text=True`` flag extends that to plain TEXT events. logger.debug(
# Same shape as the Telegram bursty-grace path in gateway/run.py. "[%s] New message while session %s is active — queuing follow-up "
logger.debug("[%s] New message while session %s is active — triggering interrupt", self.name, session_key) "(no interrupt, will cascade after current turn)",
merge_pending_message_event( self.name,
self._pending_messages, session_key,
session_key, )
event, merge_pending_message_event(
merge_text=True, self._pending_messages,
) session_key,
# Signal the interrupt (the processing task checks this) event,
self._active_sessions[session_key].set() merge_text=event.message_type == MessageType.TEXT,
)
return # Don't process now - will be handled after current task finishes return # Don't process now - will be handled after current task finishes
# Mark session as active BEFORE spawning background task to close # Mark session as active BEFORE spawning background task to close
@@ -3498,10 +3689,15 @@ class BasePlatformAdapter(ABC):
ProcessingOutcome.SUCCESS if processing_ok else ProcessingOutcome.FAILURE, ProcessingOutcome.SUCCESS if processing_ok else ProcessingOutcome.FAILURE,
) )
# The active drain owns debounce state. If a queue-mode timer has
# not fired yet, force-flush into _pending_messages here and let
# this task hand off the follow-up.
await self._flush_text_debounce_now(session_key)
# Check if there's a pending message that was queued during our processing # Check if there's a pending message that was queued during our processing
if session_key in self._pending_messages: if session_key in self._pending_messages:
pending_event = self._pending_messages.pop(session_key) pending_event = self._pending_messages.pop(session_key)
logger.debug("[%s] Processing queued message from interrupt", self.name) logger.debug("[%s] Processing queued follow-up message", self.name)
# Keep the _active_sessions entry live across the turn chain # Keep the _active_sessions entry live across the turn chain
# and only CLEAR the interrupt Event — do NOT delete the entry. # and only CLEAR the interrupt Event — do NOT delete the entry.
# If we deleted here, a concurrent inbound message arriving # If we deleted here, a concurrent inbound message arriving
@@ -3510,7 +3706,7 @@ class BasePlatformAdapter(ABC):
# with the recursive drain below. Two agents on one # with the recursive drain below. Two agents on one
# session_key = duplicate responses, duplicate tool calls. # session_key = duplicate responses, duplicate tool calls.
# Clearing the Event keeps the guard live so follow-ups take # Clearing the Event keeps the guard live so follow-ups take
# the busy-handler path (queue + interrupt) as intended. # the busy-handler path as intended.
_active = self._active_sessions.get(session_key) _active = self._active_sessions.get(session_key)
if _active is not None: if _active is not None:
_active.clear() _active.clear()
@@ -3603,6 +3799,9 @@ class BasePlatformAdapter(ABC):
await self.stop_typing(event.source.chat_id) await self.stop_typing(event.source.chat_id)
except Exception: except Exception:
pass pass
# Final drain/release boundary: force-flush any timer that missed
# the in-band drain before deciding whether the guard can clear.
await self._flush_text_debounce_now(session_key)
# Late-arrival drain: a message may have arrived during the # Late-arrival drain: a message may have arrived during the
# cleanup awaits above (typing_task cancel, stop_typing). Such # cleanup awaits above (typing_task cancel, stop_typing). Such
# messages passed the Level-1 guard (entry still live, Event # messages passed the Level-1 guard (entry still live, Event
@@ -3722,6 +3921,10 @@ class BasePlatformAdapter(ABC):
self._session_tasks.clear() self._session_tasks.clear()
self._pending_messages.clear() self._pending_messages.clear()
self._active_sessions.clear() self._active_sessions.clear()
for state in list(self._text_debounce_store().values()):
if state.task is not None and not state.task.done():
state.task.cancel()
self._text_debounce_store().clear()
def has_pending_interrupt(self, session_key: str) -> bool: def has_pending_interrupt(self, session_key: str) -> bool:
"""Check if there's a pending interrupt for a session.""" """Check if there's a pending interrupt for a session."""

View File

@@ -839,6 +839,8 @@ if _config_path.exists():
if _display_cfg and isinstance(_display_cfg, dict): if _display_cfg and isinstance(_display_cfg, dict):
if "busy_input_mode" in _display_cfg: if "busy_input_mode" in _display_cfg:
os.environ["HERMES_GATEWAY_BUSY_INPUT_MODE"] = str(_display_cfg["busy_input_mode"]) os.environ["HERMES_GATEWAY_BUSY_INPUT_MODE"] = str(_display_cfg["busy_input_mode"])
if "busy_text_mode" in _display_cfg:
os.environ["HERMES_GATEWAY_BUSY_TEXT_MODE"] = str(_display_cfg["busy_text_mode"])
if "busy_ack_enabled" in _display_cfg: if "busy_ack_enabled" in _display_cfg:
os.environ["HERMES_GATEWAY_BUSY_ACK_ENABLED"] = str(_display_cfg["busy_ack_enabled"]) os.environ["HERMES_GATEWAY_BUSY_ACK_ENABLED"] = str(_display_cfg["busy_ack_enabled"])
# Timezone: bridge config.yaml → HERMES_TIMEZONE env var. # Timezone: bridge config.yaml → HERMES_TIMEZONE env var.
@@ -1554,6 +1556,7 @@ class GatewayRunner:
# blow up on attribute access. # blow up on attribute access.
_running_agents_ts: Dict[str, float] = {} _running_agents_ts: Dict[str, float] = {}
_busy_input_mode: str = "interrupt" _busy_input_mode: str = "interrupt"
_busy_text_mode: str = "interrupt"
_restart_drain_timeout: float = DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT _restart_drain_timeout: float = DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT
_exit_code: Optional[int] = None _exit_code: Optional[int] = None
_draining: bool = False _draining: bool = False
@@ -1580,6 +1583,7 @@ class GatewayRunner:
self._service_tier = self._load_service_tier() self._service_tier = self._load_service_tier()
self._show_reasoning = self._load_show_reasoning() self._show_reasoning = self._load_show_reasoning()
self._busy_input_mode = self._load_busy_input_mode() self._busy_input_mode = self._load_busy_input_mode()
self._busy_text_mode = self._load_busy_text_mode()
self._restart_drain_timeout = self._load_restart_drain_timeout() self._restart_drain_timeout = self._load_restart_drain_timeout()
self._provider_routing = self._load_provider_routing() self._provider_routing = self._load_provider_routing()
self._fallback_model = self._load_fallback_model() self._fallback_model = self._load_fallback_model()
@@ -2826,6 +2830,17 @@ class GatewayRunner:
return "steer" return "steer"
return "interrupt" return "interrupt"
@staticmethod
def _load_busy_text_mode() -> str:
"""Load normal busy TEXT follow-up behavior from config/env."""
mode = os.getenv("HERMES_GATEWAY_BUSY_TEXT_MODE", "").strip().lower()
if not mode:
cfg = _load_gateway_runtime_config()
mode = str(cfg_get(cfg, "display", "busy_text_mode", default="") or "").strip().lower()
if mode == "interrupt":
return "interrupt"
return "queue"
@staticmethod @staticmethod
def _load_restart_drain_timeout() -> float: def _load_restart_drain_timeout() -> float:
"""Load graceful gateway restart/stop drain timeout in seconds.""" """Load graceful gateway restart/stop drain timeout in seconds."""
@@ -2973,11 +2988,19 @@ class GatewayRunner:
running_agent = self._running_agents.get(session_key) running_agent = self._running_agents.get(session_key)
effective_mode = self._busy_input_mode
busy_text_mode = getattr(self, "_busy_text_mode", "queue")
if (
event.message_type == MessageType.TEXT
and busy_text_mode == "queue"
and effective_mode != "steer"
):
return False
# Steer mode: inject mid-run via running_agent.steer() instead of # Steer mode: inject mid-run via running_agent.steer() instead of
# queueing + interrupting. If the agent isn't running yet # queueing + interrupting. If the agent isn't running yet
# (sentinel) or lacks steer(), or the payload is empty, fall back # (sentinel) or lacks steer(), or the payload is empty, fall back
# to queue semantics so nothing is lost. # to queue semantics so nothing is lost.
effective_mode = self._busy_input_mode
steered = False steered = False
if effective_mode == "steer": if effective_mode == "steer":
steer_text = (event.text or "").strip() steer_text = (event.text or "").strip()
@@ -3002,7 +3025,12 @@ class GatewayRunner:
# successful steer — the text already landed inside the run and # successful steer — the text already landed inside the run and
# must NOT also be replayed as a next-turn user message. # must NOT also be replayed as a next-turn user message.
if not steered: if not steered:
merge_pending_message_event(adapter._pending_messages, session_key, event) merge_pending_message_event(
adapter._pending_messages,
session_key,
event,
merge_text=event.message_type == MessageType.TEXT,
)
is_queue_mode = effective_mode == "queue" is_queue_mode = effective_mode == "queue"
is_steer_mode = effective_mode == "steer" is_steer_mode = effective_mode == "steer"
@@ -3934,6 +3962,7 @@ class GatewayRunner:
adapter.set_fatal_error_handler(self._handle_adapter_fatal_error) adapter.set_fatal_error_handler(self._handle_adapter_fatal_error)
adapter.set_session_store(self.session_store) adapter.set_session_store(self.session_store)
adapter.set_busy_session_handler(self._handle_active_session_busy_message) adapter.set_busy_session_handler(self._handle_active_session_busy_message)
adapter._busy_text_mode = self._busy_text_mode
# Try to connect # Try to connect
logger.info("Connecting to %s...", platform.value) logger.info("Connecting to %s...", platform.value)
@@ -5546,6 +5575,7 @@ class GatewayRunner:
adapter.set_fatal_error_handler(self._handle_adapter_fatal_error) adapter.set_fatal_error_handler(self._handle_adapter_fatal_error)
adapter.set_session_store(self.session_store) adapter.set_session_store(self.session_store)
adapter.set_busy_session_handler(self._handle_active_session_busy_message) adapter.set_busy_session_handler(self._handle_active_session_busy_message)
adapter._busy_text_mode = self._busy_text_mode
success = await self._connect_adapter_with_timeout(adapter, platform) success = await self._connect_adapter_with_timeout(adapter, platform)
if success: if success:

View File

@@ -1,20 +1,10 @@
"""Regression test for #4469. """Regression tests for active-session TEXT follow-up queueing.
When the agent is actively running (session present in When the agent is actively running, rapid text follow-ups should survive as
``adapter._active_sessions``) and the user fires off multiple TEXT one next-turn pending message instead of clobbering each other. In
follow-ups in rapid succession, the previous behaviour was a single-slot ``busy_text_mode=queue`` those active follow-ups first pass through a short
replacement at ``gateway/platforms/base.py``: debounce so bursty multi-message thoughts are merged before the active drain
hands off the next turn.
self._pending_messages[session_key] = event
So three rapid messages ``A``, ``B``, ``C`` arriving while the agent was
still working on the initial turn produced a pending slot containing only
``C``; ``A`` and ``B`` were silently dropped.
The fix routes the follow-up through ``merge_pending_message_event(...,
merge_text=True)`` so TEXT events accumulate into the existing pending
event's text instead of clobbering it. Photo / media bursts continue to
merge through the same helper (they always did).
""" """
from __future__ import annotations from __future__ import annotations
@@ -22,7 +12,7 @@ from __future__ import annotations
import asyncio import asyncio
import sys import sys
import types import types
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
@@ -44,16 +34,27 @@ from gateway.platforms.base import (
BasePlatformAdapter, BasePlatformAdapter,
MessageEvent, MessageEvent,
MessageType, MessageType,
SendResult,
) )
from gateway.session import SessionSource, build_session_key from gateway.session import SessionSource, build_session_key
def _make_event(text: str, chat_id: str = "12345") -> MessageEvent: def _make_event(
text: str,
chat_id: str = "12345",
*,
chat_type: str = "dm",
user_id: str = "u1",
user_name: str | None = None,
thread_id: str | None = None,
) -> MessageEvent:
source = SessionSource( source = SessionSource(
platform=Platform.TELEGRAM, platform=Platform.TELEGRAM,
chat_id=chat_id, chat_id=chat_id,
chat_type="dm", chat_type=chat_type,
user_id="u1", user_id=user_id,
user_name=user_name,
thread_id=thread_id,
) )
return MessageEvent( return MessageEvent(
text=text, text=text,
@@ -63,27 +64,26 @@ def _make_event(text: str, chat_id: str = "12345") -> MessageEvent:
) )
class _DummyAdapter(BasePlatformAdapter):  # type: ignore[misc]
    """Concrete adapter whose transport hooks do nothing, for use in tests."""

    async def connect(self):
        return None

    async def disconnect(self):
        return None

    async def get_chat_info(self, chat_id):
        return None

    async def send(self, *args, **kwargs):
        # Every send reports success with a fixed message id.
        return SendResult(success=True, message_id="x")
def _make_initialized_adapter() -> BasePlatformAdapter:
    """Return a ``_DummyAdapter`` constructed through its normal constructor."""
    config = PlatformConfig(enabled=True, token="***")
    return _DummyAdapter(config, Platform.TELEGRAM)
def _make_adapter() -> BasePlatformAdapter: def _make_adapter() -> BasePlatformAdapter:
"""Build a BasePlatformAdapter without running its heavy __init__. """Build a BasePlatformAdapter without running its heavy __init__."""
We only need the bits ``handle_message`` touches on the active-session
path: ``_active_sessions``, ``_pending_messages``,
``_message_handler``, ``_busy_session_handler``, ``config``, ``platform``.
"""
class _DummyAdapter(BasePlatformAdapter): # type: ignore[misc]
async def connect(self):
pass
async def disconnect(self):
pass
async def get_chat_info(self, chat_id):
return None
async def send(self, *args, **kwargs):
return MagicMock(success=True, message_id="x", retryable=False)
adapter = object.__new__(_DummyAdapter) adapter = object.__new__(_DummyAdapter)
adapter.config = PlatformConfig(enabled=True, token="***") adapter.config = PlatformConfig(enabled=True, token="***")
adapter.platform = Platform.TELEGRAM adapter.platform = Platform.TELEGRAM
@@ -100,6 +100,10 @@ def _make_adapter() -> BasePlatformAdapter:
adapter._fatal_error_retryable = True adapter._fatal_error_retryable = True
adapter._fatal_error_handler = None adapter._fatal_error_handler = None
adapter._running = True adapter._running = True
adapter._busy_text_mode = "queue"
adapter._busy_text_debounce_seconds = 0.1
adapter._busy_text_hard_cap_seconds = 1.0
adapter._text_debounce = {}
adapter._auto_tts_default = False adapter._auto_tts_default = False
adapter._auto_tts_enabled_chats = set() adapter._auto_tts_enabled_chats = set()
adapter._auto_tts_disabled_chats = set() adapter._auto_tts_disabled_chats = set()
@@ -107,39 +111,235 @@ def _make_adapter() -> BasePlatformAdapter:
return adapter return adapter
def _debounced_event(adapter: BasePlatformAdapter, session_key: str) -> MessageEvent:
return adapter._text_debounce[session_key].event
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_rapid_text_followups_accumulate_instead_of_replacing(): async def test_rapid_text_followups_accumulate_instead_of_replacing():
"""Three rapid TEXT follow-ups during an active session must all """Rapid TEXT follow-ups must all survive in the pending event."""
survive in ``adapter._pending_messages[session_key].text``."""
adapter = _make_adapter() adapter = _make_adapter()
adapter._busy_text_mode = "" # direct-merge behavior, no debounce
first = _make_event("part one") first = _make_event("part one")
session_key = build_session_key(first.source) session_key = build_session_key(first.source)
# Mark the session as active so subsequent messages take the
# "already running" branch in handle_message.
adapter._active_sessions[session_key] = asyncio.Event() adapter._active_sessions[session_key] = asyncio.Event()
second = _make_event("part two") await adapter.handle_message(_make_event("part two"))
third = _make_event("part three") await adapter.handle_message(_make_event("part three"))
await adapter.handle_message(second)
await adapter.handle_message(third)
# Both rapid follow-ups must be preserved, not just the last one.
pending = adapter._pending_messages[session_key] pending = adapter._pending_messages[session_key]
assert pending.text == "part two\npart three", ( assert pending.text == "part two\npart three"
f"expected accumulated text, got {pending.text!r}" assert not adapter._active_sessions[session_key].is_set()
@pytest.mark.asyncio
async def test_debounce_buffers_rapid_text_then_flushes_to_pending():
    """Busy text stays in the debounce buffer, then lands in the pending slot."""
    adapter = _make_adapter()
    adapter._busy_text_debounce_seconds = 0.05
    key = build_session_key(_make_event("part one").source)
    adapter._active_sessions[key] = asyncio.Event()

    # First follow-up: buffered only, nothing pending yet.
    await adapter.handle_message(_make_event("part two"))
    assert key in adapter._text_debounce
    assert _debounced_event(adapter, key).text == "part two"
    assert key not in adapter._pending_messages

    # Second follow-up joins the same buffered event.
    await adapter.handle_message(_make_event("part three"))
    assert _debounced_event(adapter, key).text == "part two\npart three"

    # Sleep past the debounce window so the timer can fire.
    await asyncio.sleep(0.15)
    assert key not in adapter._text_debounce
    assert adapter._pending_messages[key].text == "part two\npart three"
@pytest.mark.asyncio
async def test_debounce_resets_timer_on_new_arrival():
    """Each new busy message must replace the previously scheduled flush timer."""
    adapter = _make_adapter()
    adapter._busy_text_debounce_seconds = 0.1
    opening = _make_event("one")
    key = build_session_key(opening.source)
    adapter._active_sessions[key] = asyncio.Event()

    def _timer():
        return adapter._text_debounce[key].task

    await adapter.handle_message(opening)
    timer_one = _timer()
    assert timer_one is not None
    assert not timer_one.done()

    await adapter.handle_message(_make_event("two"))
    timer_two = _timer()
    assert timer_two is not None
    assert timer_two is not timer_one
    # Yield once so the event loop can deliver the cancellation.
    await asyncio.sleep(0)
    assert timer_one.cancelled() or timer_one.done()
    assert _timer() is timer_two

    await adapter.handle_message(_make_event("three"))
    timer_three = _timer()
    assert timer_three is not None
    assert timer_three is not timer_two

    await asyncio.sleep(0.2)
    assert key not in adapter._text_debounce
    assert adapter._pending_messages[key].text == "one\ntwo\nthree"
@pytest.mark.asyncio
async def test_active_drain_force_flushes_debounce_before_release():
    """A follow-up queued mid-turn is processed and all session state is cleared."""
    adapter = _make_adapter()
    adapter._busy_text_debounce_seconds = 1.0
    seen: list[str] = []

    async def _record(event):
        seen.append(event.text)
        # While the first turn is running, inject a follow-up message.
        if event.text == "current":
            await adapter.handle_message(_make_event("follow up"))
        return None

    adapter._message_handler = _record
    current = _make_event("current")
    key = build_session_key(current.source)
    runner = asyncio.create_task(adapter._process_message_background(current, key))
    adapter._session_tasks[key] = runner
    await asyncio.wait_for(runner, timeout=1.0)

    # Poll briefly for the follow-up turn and the guard release.
    for _ in range(20):
        if seen == ["current", "follow up"] and key not in adapter._active_sessions:
            break
        await asyncio.sleep(0.05)

    assert seen == ["current", "follow up"]
    assert key not in adapter._text_debounce
    assert key not in adapter._pending_messages
    assert key not in adapter._active_sessions
@pytest.mark.asyncio
async def test_force_flush_cancels_timer_without_duplicate_processing():
    """A forced flush moves the text to pending once; the old timer adds nothing."""
    adapter = _make_adapter()
    adapter._busy_text_debounce_seconds = 0.2
    queued = _make_event("queued once")
    key = build_session_key(queued.source)
    adapter._active_sessions[key] = asyncio.Event()

    await adapter.handle_message(queued)
    pending_timer = adapter._text_debounce[key].task

    outcome = await adapter._flush_text_debounce_now(key)
    assert outcome is True
    assert key not in adapter._text_debounce
    assert adapter._pending_messages[key].text == "queued once"

    # Wait past the original window: the pending text must be unchanged.
    await asyncio.sleep(0.3)
    assert pending_timer is not None
    assert pending_timer.cancelled() or pending_timer.done()
    assert adapter._pending_messages[key].text == "queued once"
@pytest.mark.asyncio
async def test_text_debounce_does_not_merge_different_senders():
    """Check that follow-ups from two users in one thread stay separate.

    Both events are built for the same group thread (``topic-1``) but with
    different ``user_id`` values, and the test first confirms they map to
    the same session key.  After both are handled, Alice's text is expected
    in ``_pending_messages`` while Bob's text is expected to be the event
    still held by the debounce state (read through ``_debounced_event``).
    """
    adapter = _make_adapter()
    adapter._busy_text_debounce_seconds = 1.0
    first = _make_event(
        "from alice",
        chat_type="group",
        user_id="alice",
        user_name="Alice",
        thread_id="topic-1",
    )
    second = _make_event(
        "from bob",
        chat_type="group",
        user_id="bob",
        user_name="Bob",
        thread_id="topic-1",
    )
    session_key = build_session_key(first.source)
    # Precondition: both senders share one session, otherwise the test
    # would pass trivially.
    assert session_key == build_session_key(second.source)
    adapter._active_sessions[session_key] = asyncio.Event()

    await adapter.handle_message(first)
    await adapter.handle_message(second)

    assert adapter._pending_messages[session_key].text == "from alice"
    assert _debounced_event(adapter, session_key).text == "from bob"
@pytest.mark.asyncio
async def test_control_and_clarify_messages_bypass_text_debounce():
    """Check that a slash command and a clarify answer skip the debounce buffer.

    Part one replaces ``_start_session_processing`` with a recorder and
    sends ``/status``: the recorder must see it and ``_text_debounce`` must
    stay empty.  Part two patches
    ``tools.clarify_gateway.get_pending_for_session`` to return an object
    and sends plain text to a session with an ``_active_sessions`` entry:
    the message handler must be awaited with that event and nothing may be
    left in the debounce or pending maps.
    """
    adapter = _make_adapter()
    launched: list[str] = []

    def _record_start(event, session_key, *, interrupt_event=None):
        launched.append(event.text)
        return True

    adapter._start_session_processing = _record_start  # type: ignore[method-assign]

    await adapter.handle_message(_make_event("/status"))
    assert launched == ["/status"]
    assert adapter._text_debounce == {}

    answer = _make_event("clarify answer")
    session_key = build_session_key(answer.source)
    adapter._active_sessions[session_key] = asyncio.Event()
    adapter._message_handler = AsyncMock(return_value=None)
    clarify_lookup = patch(
        "tools.clarify_gateway.get_pending_for_session", return_value=object()
    )
    with clarify_lookup:
        await adapter.handle_message(answer)

    adapter._message_handler.assert_awaited_once_with(answer)
    assert session_key not in adapter._text_debounce
    assert session_key not in adapter._pending_messages
@pytest.mark.asyncio
async def test_debounce_skipped_when_busy_text_mode_not_queue():
    """Check that an empty ``_busy_text_mode`` stores follow-ups immediately.

    With the mode set to ``""`` the text must appear in
    ``_pending_messages`` right after ``handle_message`` returns, and no
    debounce entry may exist for the session.
    """
    adapter = _make_adapter()
    adapter._busy_text_mode = ""
    follow_up = _make_event("direct merge")
    session_key = build_session_key(follow_up.source)
    adapter._active_sessions[session_key] = asyncio.Event()

    await adapter.handle_message(follow_up)

    assert adapter._pending_messages[session_key].text == "direct merge"
    assert session_key not in adapter._text_debounce
def test_debounce_respects_env_var_override(monkeypatch):
    """Check that the debounce window is read from the environment.

    ``HERMES_GATEWAY_BUSY_TEXT_DEBOUNCE_SECONDS`` is set to ``"2.5"`` before
    an initialized adapter is built; the adapter must expose ``2.5`` as
    ``_busy_text_debounce_seconds``.
    """
    env_name = "HERMES_GATEWAY_BUSY_TEXT_DEBOUNCE_SECONDS"
    monkeypatch.setenv(env_name, "2.5")

    adapter = _make_initialized_adapter()

    assert adapter._busy_text_debounce_seconds == 2.5
@pytest.mark.asyncio
async def test_debounce_cleanup_in_cancel_background_tasks():
    """Check that ``cancel_background_tasks`` drops outstanding debounce state.

    One follow-up is queued with a 1.0 s window so the debounce entry still
    exists when ``cancel_background_tasks`` is awaited; afterwards the
    session key must be gone from ``_text_debounce``.
    """
    adapter = _make_adapter()
    adapter._busy_text_debounce_seconds = 1.0
    queued = _make_event("cleanup test")
    session_key = build_session_key(queued.source)
    adapter._active_sessions[session_key] = asyncio.Event()

    await adapter.handle_message(queued)
    # Precondition: the entry exists before cleanup runs.
    assert session_key in adapter._text_debounce

    await adapter.cancel_background_tasks()
    assert session_key not in adapter._text_debounce
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_single_followup_is_stored_as_is(): async def test_single_followup_is_stored_as_is():
"""One TEXT follow-up still lands as the event object itself
(no spurious wrapping / mutation) — guards against the merge path
breaking the simple case."""
adapter = _make_adapter() adapter = _make_adapter()
adapter._busy_text_mode = ""
first = _make_event("only one") first = _make_event("only one")
session_key = build_session_key(first.source) session_key = build_session_key(first.source)
@@ -149,4 +349,29 @@ async def test_single_followup_is_stored_as_is():
pending = adapter._pending_messages[session_key] pending = adapter._pending_messages[session_key]
assert pending is first assert pending is first
assert pending.text == "only one" assert pending.text == "only one"
assert adapter._active_sessions[session_key].is_set() assert not adapter._active_sessions[session_key].is_set()
def test_adapter_defaults_to_queue_mode(monkeypatch):
    """Check the default busy-text mode when the env var is absent.

    With ``HERMES_GATEWAY_BUSY_TEXT_MODE`` removed from the environment, an
    initialized adapter must report ``"queue"`` and must treat a plain
    ``"hello"`` event as a debounce candidate.
    """
    # raising=False: the variable may legitimately be unset already.
    monkeypatch.delenv("HERMES_GATEWAY_BUSY_TEXT_MODE", raising=False)

    adapter = _make_initialized_adapter()
    plain_text = _make_event("hello")

    assert adapter._busy_text_mode == "queue"
    assert adapter._is_queue_text_debounce_candidate(plain_text)
def test_adapter_is_queue_text_debounce_candidate_by_default():
    """Check that the stock test adapter accepts plain text for debouncing."""
    adapter = _make_adapter()
    plain_text = _make_event("hello world")

    assert adapter._is_queue_text_debounce_candidate(plain_text)
def test_command_messages_bypass_debounce_even_in_queue_mode():
    """Check that empty text and slash commands are never debounce candidates."""
    adapter = _make_adapter()
    empty_text = _make_event("")
    stop_command = _make_event("/stop")

    assert not adapter._is_queue_text_debounce_candidate(empty_text)
    assert not adapter._is_queue_text_debounce_candidate(stop_command)
def test_busy_text_mode_respects_env_var_override(monkeypatch):
    """Check that the env var can switch the adapter to interrupt mode.

    With ``HERMES_GATEWAY_BUSY_TEXT_MODE=interrupt`` an initialized adapter
    must report ``"interrupt"`` and must not treat plain text as a debounce
    candidate.
    """
    monkeypatch.setenv("HERMES_GATEWAY_BUSY_TEXT_MODE", "interrupt")

    adapter = _make_initialized_adapter()
    plain_text = _make_event("test")

    assert adapter._busy_text_mode == "interrupt"
    assert not adapter._is_queue_text_debounce_candidate(plain_text)

View File

@@ -15,6 +15,7 @@ from gateway.session import SessionSource, build_session_key
class DummyTelegramAdapter(BasePlatformAdapter): class DummyTelegramAdapter(BasePlatformAdapter):
def __init__(self): def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="fake-token"), Platform.TELEGRAM) super().__init__(PlatformConfig(enabled=True, token="fake-token"), Platform.TELEGRAM)
self._busy_text_mode = ""
self.sent = [] self.sent = []
self.typing = [] self.typing = []
self.processing_hooks = [] self.processing_hooks = []

View File

@@ -65,6 +65,7 @@ def _make_runner():
runner._pending_messages = {} runner._pending_messages = {}
runner._busy_ack_ts = {} runner._busy_ack_ts = {}
runner._draining = False runner._draining = False
runner._busy_text_mode = "interrupt"
runner.adapters = {} runner.adapters = {}
runner.config = MagicMock() runner.config = MagicMock()
runner.session_store = None runner.session_store = None
@@ -84,6 +85,8 @@ def _make_adapter(platform_val="telegram"):
adapter.config = MagicMock() adapter.config = MagicMock()
adapter.config.extra = {} adapter.config.extra = {}
adapter.platform = MagicMock(value=platform_val) adapter.platform = MagicMock(value=platform_val)
adapter._text_debounce = {}
adapter._busy_text_debounce_seconds = 0.6
return adapter return adapter
@@ -186,6 +189,32 @@ class TestBusySessionAck:
assert "respond once the current task finishes" in content assert "respond once the current task finishes" in content
assert "Interrupting" not in content assert "Interrupting" not in content
@pytest.mark.asyncio
async def test_busy_text_mode_queue_delegates_to_adapter_handle_message(self):
    """Queue text mode: the runner declines busy text and stays silent.

    Two text events arrive for a session that has a running agent.  With
    ``_busy_text_mode`` set to ``"queue"`` the runner's busy handler must
    return ``False`` for both, leave the adapter's ``_pending_messages``
    untouched, and neither interrupt the agent nor send anything through
    ``_send_with_retry``.
    """
    runner, _unused = _make_runner()
    runner._busy_input_mode = "interrupt"
    runner._busy_text_mode = "queue"
    adapter = _make_adapter()
    part_one = _make_event(text="part one")
    part_two = _make_event(text="part two")
    sk = build_session_key(part_one.source)
    running_agent = MagicMock()
    runner._running_agents[sk] = running_agent
    for incoming in (part_one, part_two):
        runner.adapters[incoming.source.platform] = adapter

    outcome_one = await runner._handle_active_session_busy_message(part_one, sk)
    outcome_two = await runner._handle_active_session_busy_message(part_two, sk)

    assert outcome_one is False
    assert outcome_two is False
    assert sk not in adapter._pending_messages
    running_agent.interrupt.assert_not_called()
    adapter._send_with_retry.assert_not_called()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_steer_mode_calls_agent_steer_no_interrupt_no_queue(self): async def test_steer_mode_calls_agent_steer_no_interrupt_no_queue(self):
"""busy_input_mode='steer' injects via agent.steer() and skips queueing.""" """busy_input_mode='steer' injects via agent.steer() and skips queueing."""

View File

@@ -47,6 +47,7 @@ def _make_adapter():
"""Create a minimal adapter for testing the active-session guard.""" """Create a minimal adapter for testing the active-session guard."""
config = PlatformConfig(enabled=True, token="test-token") config = PlatformConfig(enabled=True, token="test-token")
adapter = _StubAdapter(config, Platform.TELEGRAM) adapter = _StubAdapter(config, Platform.TELEGRAM)
adapter._busy_text_mode = ""
adapter.sent_responses = [] adapter.sent_responses = []
async def _mock_handler(event): async def _mock_handler(event):

View File

@@ -45,6 +45,7 @@ def _run_gateway_import(hermes_home: Path, initial_env: dict[str, str]) -> dict[
"HERMES_AGENT_TIMEOUT", "HERMES_AGENT_TIMEOUT",
"HERMES_AGENT_TIMEOUT_WARNING", "HERMES_AGENT_TIMEOUT_WARNING",
"HERMES_GATEWAY_BUSY_INPUT_MODE", "HERMES_GATEWAY_BUSY_INPUT_MODE",
"HERMES_GATEWAY_BUSY_TEXT_MODE",
"HERMES_TIMEZONE", "HERMES_TIMEZONE",
): ):
v = os.environ.get(k) v = os.environ.get(k)
@@ -143,6 +144,15 @@ def test_config_display_busy_input_mode_wins_over_stale_env(hermes_home: Path) -
assert env.get("HERMES_GATEWAY_BUSY_INPUT_MODE") == "interrupt" assert env.get("HERMES_GATEWAY_BUSY_INPUT_MODE") == "interrupt"
def test_config_display_busy_text_mode_wins_over_stale_env(hermes_home: Path) -> None:
    """Check that ``display.busy_text_mode`` in config beats the ``.env`` value.

    The config file requests ``queue`` while the env file still carries
    ``interrupt``; after the gateway import helper runs, the reported
    ``HERMES_GATEWAY_BUSY_TEXT_MODE`` must be ``queue``.
    """
    _write_config(hermes_home, display_cfg={"busy_text_mode": "queue"})
    stale_env = {"HERMES_GATEWAY_BUSY_TEXT_MODE": "interrupt"}
    _write_env(hermes_home, stale_env)

    resulting_env = _run_gateway_import(hermes_home, initial_env={})

    assert resulting_env.get("HERMES_GATEWAY_BUSY_TEXT_MODE") == "queue"
def test_config_timezone_wins_over_stale_env(hermes_home: Path) -> None: def test_config_timezone_wins_over_stale_env(hermes_home: Path) -> None:
_write_config(hermes_home, timezone="America/Los_Angeles") _write_config(hermes_home, timezone="America/Los_Angeles")
_write_env(hermes_home, {"HERMES_TIMEZONE": "UTC"}) _write_env(hermes_home, {"HERMES_TIMEZONE": "UTC"})

View File

@@ -103,6 +103,7 @@ class TestInterruptKeyConsistency:
async def test_handle_message_stores_under_session_key(self): async def test_handle_message_stores_under_session_key(self):
"""handle_message stores pending messages under session_key, not chat_id.""" """handle_message stores pending messages under session_key, not chat_id."""
adapter = StubAdapter() adapter = StubAdapter()
adapter._busy_text_mode = ""
adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None)) adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))
source = _source("-1001234", "group") source = _source("-1001234", "group")
@@ -120,8 +121,8 @@ class TestInterruptKeyConsistency:
# NOT stored under chat_id # NOT stored under chat_id
assert source.chat_id not in adapter._pending_messages assert source.chat_id not in adapter._pending_messages
# Interrupt event was set # Text follow-ups queue silently and do not interrupt the active turn.
assert adapter._active_sessions[session_key].is_set() assert adapter._active_sessions[session_key].is_set() is False
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_photo_followup_is_queued_without_interrupt(self): async def test_photo_followup_is_queued_without_interrupt(self):

View File

@@ -116,6 +116,24 @@ def test_load_busy_input_mode_prefers_env_then_config_then_default(tmp_path, mon
assert gateway_run.GatewayRunner._load_busy_input_mode() == "interrupt" assert gateway_run.GatewayRunner._load_busy_input_mode() == "interrupt"
def test_load_busy_text_mode_defaults_to_queue_and_allows_interrupt(tmp_path, monkeypatch):
    """Walk ``_load_busy_text_mode`` through default, config, and env inputs.

    Expected results, in order: ``queue`` with no config and no env var;
    ``interrupt`` once the config file asks for it; ``queue`` when the env
    var says ``queue`` despite that config; and ``queue`` again when the
    env var holds the unrecognized value ``bogus``.
    """
    env_name = "HERMES_GATEWAY_BUSY_TEXT_MODE"
    load_mode = gateway_run.GatewayRunner._load_busy_text_mode

    # Point the module at an empty home directory and clear the env var.
    monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
    monkeypatch.delenv(env_name, raising=False)
    assert load_mode() == "queue"

    config_file = tmp_path / "config.yaml"
    config_file.write_text("display:\n busy_text_mode: interrupt\n", encoding="utf-8")
    assert load_mode() == "interrupt"

    monkeypatch.setenv(env_name, "queue")
    assert load_mode() == "queue"

    monkeypatch.setenv(env_name, "bogus")
    assert load_mode() == "queue"
def test_load_restart_drain_timeout_prefers_env_then_config_then_default( def test_load_restart_drain_timeout_prefers_env_then_config_then_default(
tmp_path, monkeypatch, caplog tmp_path, monkeypatch, caplog
): ):

View File

@@ -53,6 +53,7 @@ class _StubAdapter(BasePlatformAdapter):
def _make_adapter(): def _make_adapter():
config = PlatformConfig(enabled=True, token="test-token") config = PlatformConfig(enabled=True, token="test-token")
adapter = _StubAdapter(config, Platform.TELEGRAM) adapter = _StubAdapter(config, Platform.TELEGRAM)
adapter._busy_text_mode = ""
adapter.sent_responses = [] adapter.sent_responses = []
async def _mock_send_retry(chat_id, content, **kwargs): async def _mock_send_retry(chat_id, content, **kwargs):
@@ -396,4 +397,3 @@ class TestOldTaskCannotClobberNewerGuard:
# default path) still work. # default path) still work.
adapter._release_session_guard(sk) adapter._release_session_guard(sk)
assert sk not in adapter._active_sessions assert sk not in adapter._active_sessions