feat(gateway): auto-delete slash-command system notices after TTL (#18266)

Adds opt-in auto-deletion for slash-command reply messages like
"New session started!", "Restarting gateway…", "Stopped.", and
YOLO toggles.  After the TTL elapses the gateway calls the adapter's
delete_message; on platforms without a delete API (everything except
Telegram today) the TTL is silently ignored and the message stays.

Requested on Twitter by @charlesmcdowell — tool-call bubbles are useful
real-time, but system notices clutter the thread once the agent finishes.

Implementation:

- EphemeralReply(str) sentinel in gateway/platforms/base.py.  Subclasses
  str so existing 'X' in response / response.startswith(...) checks in
  tests and call sites keep working unchanged; isinstance() still
  distinguishes it for the send path.
- _process_message_background and both busy-session bypass paths
  (in base.py) call _unwrap_ephemeral() on the handler return, send
  the unwrapped text, and schedule a detached delete task when the
  TTL > 0 AND the adapter class overrides delete_message.
- display.ephemeral_system_ttl (default 0 = disabled) in DEFAULT_CONFIG.
  Handler can pass ttl_seconds explicitly to override.
- Wrapped the highest-noise return sites: /new, /reset, /stop,
  /yolo on/off, /restart success + "already in progress".  Draining
  notices and /help output left as plain strings — those are
  informational and users want to read them.

Backward-compat: default TTL 0 → no scheduling, no behavior change
for existing users.  Platforms without delete_message silently no-op.
This commit is contained in:
Teknium
2026-04-30 23:05:48 -07:00
committed by GitHub
parent e2eb561e8e
commit 4caad285a6
4 changed files with 530 additions and 26 deletions

View File

@@ -416,7 +416,7 @@ def is_host_excluded_by_no_proxy(hostname: str, no_proxy_value: str | None = Non
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Callable, Awaitable, Tuple
from typing import Dict, List, Optional, Any, Callable, Awaitable, Tuple, Union
from enum import Enum
from pathlib import Path as _Path
@@ -981,7 +981,7 @@ def coerce_plaintext_gateway_command(event: "MessageEvent") -> None:
return
@dataclass
@dataclass
class SendResult:
"""Result of sending a message."""
success: bool
@@ -991,6 +991,45 @@ class SendResult:
retryable: bool = False # True for transient connection errors — base will retry automatically
class EphemeralReply(str):
"""System-notice reply that auto-deletes after a TTL.
Slash-command handlers in ``gateway/run.py`` can return this wrapper
instead of a plain string to request that the reply message be deleted
after ``ttl_seconds`` on platforms that support ``delete_message``.
Subclassing ``str`` keeps the wrapper transparent to anything that
treats handler return values as text (existing tests use ``in`` /
``startswith`` / equality; the ``_process_message_background`` pipeline
extracts attachments from the string content). ``isinstance(r,
EphemeralReply)`` still distinguishes ephemeral replies from plain
strings so the send path can schedule deletion.
Platforms that don't override :meth:`BasePlatformAdapter.delete_message`
silently ignore the TTL — the message is sent normally and left in
place. When ``ttl_seconds`` is ``None``, the pipeline uses the
configured ``display.ephemeral_system_ttl`` default. A default of ``0``
disables auto-deletion globally, preserving prior behavior.
"""
ttl_seconds: Optional[int]
def __new__(cls, text: str, ttl_seconds: Optional[int] = None):
instance = super().__new__(cls, text)
instance.ttl_seconds = ttl_seconds
return instance
@property
def text(self) -> str:
"""Return the underlying text.
Provided for call sites that want an explicit string conversion,
though ``str(reply)`` and using ``reply`` directly where a string
is expected both work identically.
"""
return str.__str__(self)
def merge_pending_message_event(
pending_messages: Dict[str, MessageEvent],
session_key: str,
@@ -1073,8 +1112,10 @@ _RETRYABLE_ERROR_PATTERNS = (
)
# Type for message handlers
MessageHandler = Callable[[MessageEvent], Awaitable[Optional[str]]]
# Type for message handlers. Handlers may return a plain string (normal
# reply), an ``EphemeralReply`` to opt the reply into auto-deletion, or
# ``None`` when the response was already delivered (e.g. via streaming).
MessageHandler = Callable[[MessageEvent], Awaitable[Optional[Union[str, "EphemeralReply"]]]]
def resolve_channel_prompt(
@@ -1459,6 +1500,64 @@ class BasePlatformAdapter(ABC):
"""
return False
def _get_ephemeral_system_ttl_default(self) -> int:
"""Read ``display.ephemeral_system_ttl`` from config.
Returns the TTL in seconds to use when an :class:`EphemeralReply`
does not specify one explicitly. ``0`` (the default) disables
auto-deletion. Non-fatal if config is unreadable.
"""
try:
from hermes_cli.config import load_config as _load_config
except Exception:
return 0
try:
cfg = _load_config()
except Exception:
return 0
display = cfg.get("display", {}) if isinstance(cfg, dict) else {}
if not isinstance(display, dict):
return 0
raw = display.get("ephemeral_system_ttl", 0)
try:
return int(raw)
except (TypeError, ValueError):
return 0
def _schedule_ephemeral_delete(
self,
chat_id: str,
message_id: str,
ttl_seconds: int,
) -> None:
"""Spawn a detached task that deletes ``message_id`` after ``ttl_seconds``.
Best-effort — failures (gateway restart, permission denied, message
too old for Telegram's 48h window) are swallowed at debug level.
Does not block the caller.
"""
async def _run_delete() -> None:
try:
await asyncio.sleep(max(1, int(ttl_seconds)))
await self.delete_message(chat_id=chat_id, message_id=message_id)
except asyncio.CancelledError:
raise
except Exception as e:
logger.debug(
"[%s] Ephemeral delete failed for %s/%s: %s",
self.name, chat_id, message_id, e,
)
coro = _run_delete()
try:
asyncio.create_task(coro)
except RuntimeError:
# No running loop (e.g. unit tests that never reach the async
# path). Close the coroutine cleanly so Python doesn't warn
# about it never being awaited, then drop silently.
coro.close()
async def send_slash_confirm(
self,
chat_id: str,
@@ -2048,6 +2147,28 @@ class BasePlatformAdapter(ABC):
lowered = error.lower()
return "timed out" in lowered or "readtimeout" in lowered or "writetimeout" in lowered
def _unwrap_ephemeral(self, response: Any) -> Tuple[Optional[str], int]:
"""Unwrap a handler response into (text, ttl_seconds).
Accepts a plain string, ``None``, or an :class:`EphemeralReply`.
Returns ``(text, ttl)`` where ``ttl > 0`` means the caller should
schedule a deletion via :meth:`_schedule_ephemeral_delete` after
the send succeeds. ``ttl`` is forced to 0 when the adapter
doesn't override :meth:`delete_message` so non-supporting
platforms silently degrade to normal sends.
"""
if isinstance(response, EphemeralReply):
ttl = response.ttl_seconds
if ttl is None:
try:
ttl = int(self._get_ephemeral_system_ttl_default())
except Exception:
ttl = 0
if ttl and ttl > 0 and type(self).delete_message is BasePlatformAdapter.delete_message:
ttl = 0
return response.text, int(ttl or 0)
return response, 0
async def _send_with_retry(
self,
chat_id: str,
@@ -2355,13 +2476,20 @@ class BasePlatformAdapter(ABC):
release_guard=False,
discard_pending=False,
)
if response:
await self._send_with_retry(
_text, _eph_ttl = self._unwrap_ephemeral(response)
if _text:
_r = await self._send_with_retry(
chat_id=event.source.chat_id,
content=response,
content=_text,
reply_to=event.message_id,
metadata=thread_meta,
)
if _eph_ttl > 0 and _r.success and _r.message_id:
self._schedule_ephemeral_delete(
chat_id=event.source.chat_id,
message_id=_r.message_id,
ttl_seconds=_eph_ttl,
)
except Exception:
# On failure, restore the original guard if one still exists so
# we don't leave the session in a half-reset state.
@@ -2441,13 +2569,20 @@ class BasePlatformAdapter(ABC):
try:
_thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None
response = await self._message_handler(event)
if response:
await self._send_with_retry(
_text, _eph_ttl = self._unwrap_ephemeral(response)
if _text:
_r = await self._send_with_retry(
chat_id=event.source.chat_id,
content=response,
content=_text,
reply_to=event.message_id,
metadata=_thread_meta,
)
if _eph_ttl > 0 and _r.success and _r.message_id:
self._schedule_ephemeral_delete(
chat_id=event.source.chat_id,
message_id=_r.message_id,
ttl_seconds=_eph_ttl,
)
except Exception as e:
logger.error("[%s] Command '/%s' dispatch failed: %s", self.name, cmd, e, exc_info=True)
return
@@ -2553,7 +2688,16 @@ class BasePlatformAdapter(ABC):
# Call the handler (this can take a while with tool calls)
response = await self._message_handler(event)
# Slash-command handlers may return an EphemeralReply sentinel to
# request that their reply message auto-delete after a TTL (used
# for system notices like "✨ New session started!" that the user
# doesn't need to keep in the thread). Unwrap here so all the
# downstream extract_media / text-processing logic sees a plain
# string, and remember the TTL + platform capability so the
# post-send block can schedule the deletion.
response, _ephemeral_ttl = self._unwrap_ephemeral(response)
# Send response if any. A None/empty response is normal when
# streaming already delivered the text (already_sent=True) or
# when the message was queued behind an active agent. Log at
@@ -2642,6 +2786,21 @@ class BasePlatformAdapter(ABC):
)
_record_delivery(result)
# Schedule auto-deletion of system-notice replies.
# Detached so the handler returns immediately; errors
# (permission denied, message too old) are swallowed.
if (
_ephemeral_ttl
and _ephemeral_ttl > 0
and result.success
and result.message_id
):
self._schedule_ephemeral_delete(
chat_id=event.source.chat_id,
message_id=result.message_id,
ttl_seconds=_ephemeral_ttl,
)
# Human-like pacing delay between text and media
human_delay = self._get_human_delay()

View File

@@ -29,7 +29,7 @@ from collections import OrderedDict
from contextvars import copy_context
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional, Any, List
from typing import Dict, Optional, Any, List, Union
# account_usage imports the OpenAI SDK chain (~230 ms). Only needed by
# /usage; we still import it at module top in the gateway because test
@@ -454,6 +454,7 @@ from gateway.session import (
from gateway.delivery import DeliveryRouter
from gateway.platforms.base import (
BasePlatformAdapter,
EphemeralReply,
MessageEvent,
MessageType,
merge_pending_message_event,
@@ -4472,7 +4473,7 @@ class GatewayRunner:
invalidation_reason="stop_command",
)
logger.info("STOP for session %s — agent interrupted, session lock released", _quick_key)
return "⚡ Stopped. You can continue this session."
return EphemeralReply("⚡ Stopped. You can continue this session.")
# /reset and /new must bypass the running-agent guard so they
# actually dispatch as commands instead of being queued as user
@@ -4677,7 +4678,7 @@ class GatewayRunner:
# Force-clean the sentinel so the session is unlocked.
self._release_running_agent_state(_quick_key)
logger.info("HARD STOP (pending) for session %s — sentinel cleared", _quick_key)
return "⚡ Force-stopped. The agent was still starting — session unlocked."
return EphemeralReply("⚡ Force-stopped. The agent was still starting — session unlocked.")
# Queue the message so it will be picked up after the
# agent starts.
adapter = self.adapters.get(source.platform)
@@ -6353,7 +6354,7 @@ class GatewayRunner:
return "\n".join(lines)
async def _handle_reset_command(self, event: MessageEvent) -> str:
async def _handle_reset_command(self, event: MessageEvent) -> Union[str, EphemeralReply]:
"""Handle /new or /reset command."""
source = event.source
@@ -6464,8 +6465,8 @@ class GatewayRunner:
_tip_line = ""
if session_info:
return f"{header}\n\n{session_info}{_tip_line}"
return f"{header}{_tip_line}"
return EphemeralReply(f"{header}\n\n{session_info}{_tip_line}")
return EphemeralReply(f"{header}{_tip_line}")
async def _handle_profile_command(self, event: MessageEvent) -> str:
"""Handle /profile — show active profile name and home directory."""
@@ -6713,7 +6714,7 @@ class GatewayRunner:
return "\n".join(lines)
async def _handle_stop_command(self, event: MessageEvent) -> str:
async def _handle_stop_command(self, event: MessageEvent) -> Union[str, EphemeralReply]:
"""Handle /stop command - interrupt a running agent.
When an agent is truly hung (blocked thread that never checks
@@ -6738,7 +6739,7 @@ class GatewayRunner:
invalidation_reason="stop_command_pending",
)
logger.info("STOP (pending) for session %s — sentinel cleared", session_key)
return "⚡ Stopped. The agent hadn't started yet — you can continue this session."
return EphemeralReply("⚡ Stopped. The agent hadn't started yet — you can continue this session.")
if agent:
# Force-clean the session lock so a truly hung agent doesn't
# keep it locked forever.
@@ -6748,11 +6749,11 @@ class GatewayRunner:
interrupt_reason=_INTERRUPT_REASON_STOP,
invalidation_reason="stop_command_handler",
)
return "⚡ Stopped. You can continue this session."
return EphemeralReply("⚡ Stopped. You can continue this session.")
else:
return "No active task to stop."
async def _handle_restart_command(self, event: MessageEvent) -> str:
async def _handle_restart_command(self, event: MessageEvent) -> Union[str, EphemeralReply]:
"""Handle /restart command - drain active work, then restart the gateway."""
# Defensive idempotency check: if the previous gateway process
# recorded this same /restart (same platform + update_id) and the new
@@ -6778,7 +6779,7 @@ class GatewayRunner:
count = self._running_agent_count()
if count:
return f"⏳ Draining {count} active agent(s) before restart..."
return "⏳ Gateway restart already in progress..."
return EphemeralReply("⏳ Gateway restart already in progress...")
# Save the requester's routing info so the new gateway process can
# notify them once it comes back online.
@@ -6830,7 +6831,7 @@ class GatewayRunner:
self.request_restart(detached=True, via_service=False)
if active_agents:
return f"⏳ Draining {active_agents} active agent(s) before restart..."
return "♻ Restarting gateway. If you aren't notified within 60 seconds, restart from the console with `hermes gateway restart`."
return EphemeralReply("♻ Restarting gateway. If you aren't notified within 60 seconds, restart from the console with `hermes gateway restart`.")
def _is_stale_restart_redelivery(self, event: MessageEvent) -> bool:
"""Return True if this /restart is a Telegram re-delivery we already handled.
@@ -8321,7 +8322,7 @@ class GatewayRunner:
return f"⚡ ✓ Priority Processing: **{label}** (saved to config)\n_(takes effect on next message)_"
return f"⚡ ✓ Priority Processing: **{label}** (this session only)"
async def _handle_yolo_command(self, event: MessageEvent) -> str:
async def _handle_yolo_command(self, event: MessageEvent) -> Union[str, EphemeralReply]:
"""Handle /yolo — toggle dangerous command approval bypass for this session only."""
from tools.approval import (
disable_session_yolo,
@@ -8333,10 +8334,10 @@ class GatewayRunner:
current = is_session_yolo_enabled(session_key)
if current:
disable_session_yolo(session_key)
return "⚠️ YOLO mode **OFF** for this session — dangerous commands will require approval."
return EphemeralReply("⚠️ YOLO mode **OFF** for this session — dangerous commands will require approval.")
else:
enable_session_yolo(session_key)
return "⚡ YOLO mode **ON** for this session — all commands auto-approved. Use with caution."
return EphemeralReply("⚡ YOLO mode **ON** for this session — all commands auto-approved. Use with caution.")
async def _handle_verbose_command(self, event: MessageEvent) -> str:
"""Handle /verbose command — cycle tool progress display mode.

View File

@@ -775,6 +775,14 @@ DEFAULT_CONFIG = {
"tool_progress_command": False, # Enable /verbose command in messaging gateway
"tool_progress_overrides": {}, # DEPRECATED — use display.platforms instead
"tool_preview_length": 0, # Max chars for tool call previews (0 = no limit, show full paths/commands)
# Auto-delete system-notice replies (e.g. "✨ New session started!",
# "♻ Restarting gateway…", "⚡ Stopped…") after N seconds on platforms
# that support message deletion (currently Telegram; other platforms
# ignore and leave the message in place). Only affects slash-command
# replies wrapped with gateway.platforms.base.EphemeralReply — agent
# responses and content messages are never touched. Default 0
# (disabled) preserves prior behavior.
"ephemeral_system_ttl": 0,
"platforms": {}, # Per-platform display overrides: {"telegram": {"tool_progress": "all"}, "slack": {"tool_progress": "off"}}
# Gateway runtime-metadata footer appended to the FINAL message of a turn
# (disabled by default to keep replies minimal). When enabled, renders

View File

@@ -0,0 +1,336 @@
"""Tests for EphemeralReply — system-notice auto-delete in gateway adapters.
Slash-command handlers in ``gateway/run.py`` can return an
``EphemeralReply`` wrapper to request auto-deletion of the reply message
after a TTL. The base adapter unwraps the sentinel before sending and
schedules a detached delete task when the platform supports
``delete_message``.
Covered:
1. ``_unwrap_ephemeral`` returns text + ttl for EphemeralReply, and
passes plain strings through unchanged.
2. TTL is zeroed on platforms that don't override ``delete_message``
(silent degrade — message stays in place).
3. TTL is honored on platforms that DO override ``delete_message``.
4. ``_schedule_ephemeral_delete`` invokes ``delete_message`` after the
configured delay with the correct chat_id / message_id.
5. ``_process_message_background`` sends the unwrapped text (not the
sentinel object) and schedules deletion when appropriate.
6. The two busy-session bypass paths also unwrap + schedule.
"""
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
EphemeralReply,
MessageEvent,
MessageType,
SendResult,
)
from gateway.session import SessionSource
class _NoDeleteAdapter(BasePlatformAdapter):
"""Adapter that does NOT override delete_message (silent degrade)."""
async def connect(self):
pass
async def disconnect(self):
pass
async def send(self, chat_id, content="", **kwargs):
return SendResult(success=True, message_id="m-1")
async def get_chat_info(self, chat_id):
return {}
class _DeleteCapableAdapter(BasePlatformAdapter):
"""Adapter that overrides delete_message (TTL honored)."""
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self.deleted: list[tuple[str, str]] = []
async def connect(self):
pass
async def disconnect(self):
pass
async def send(self, chat_id, content="", **kwargs):
return SendResult(success=True, message_id="m-2")
async def get_chat_info(self, chat_id):
return {}
async def delete_message(self, chat_id: str, message_id: str) -> bool:
self.deleted.append((chat_id, message_id))
return True
def _no_delete_adapter():
return _NoDeleteAdapter(
PlatformConfig(enabled=True, token="t"), Platform.TELEGRAM
)
def _delete_adapter():
return _DeleteCapableAdapter(
PlatformConfig(enabled=True, token="t"), Platform.TELEGRAM
)
def _make_event(text="/stop", chat_id="42"):
return MessageEvent(
text=text,
message_id="msg-1",
source=SessionSource(
platform=Platform.TELEGRAM,
chat_id=chat_id,
user_id="u-1",
),
message_type=MessageType.TEXT,
)
# ---------------------------------------------------------------------------
# _unwrap_ephemeral
# ---------------------------------------------------------------------------
def test_unwrap_plain_string_is_passthrough():
adapter = _delete_adapter()
text, ttl = adapter._unwrap_ephemeral("hello")
assert text == "hello"
assert ttl == 0
def test_unwrap_none_is_passthrough():
adapter = _delete_adapter()
text, ttl = adapter._unwrap_ephemeral(None)
assert text is None
assert ttl == 0
def test_unwrap_ephemeral_explicit_ttl_on_capable_adapter():
adapter = _delete_adapter()
text, ttl = adapter._unwrap_ephemeral(EphemeralReply("bye", ttl_seconds=60))
assert text == "bye"
assert ttl == 60
def test_unwrap_ephemeral_zeros_ttl_on_incapable_adapter():
"""Platforms without delete_message should silently degrade to normal send."""
adapter = _no_delete_adapter()
text, ttl = adapter._unwrap_ephemeral(EphemeralReply("bye", ttl_seconds=60))
assert text == "bye"
assert ttl == 0 # forced to 0 — message will stay in place
def test_unwrap_ephemeral_default_ttl_from_config():
adapter = _delete_adapter()
with patch.object(adapter, "_get_ephemeral_system_ttl_default", return_value=120):
text, ttl = adapter._unwrap_ephemeral(EphemeralReply("bye"))
assert text == "bye"
assert ttl == 120
def test_unwrap_ephemeral_default_ttl_zero_disables():
"""Config default of 0 (the shipped default) means the feature is off."""
adapter = _delete_adapter()
with patch.object(adapter, "_get_ephemeral_system_ttl_default", return_value=0):
text, ttl = adapter._unwrap_ephemeral(EphemeralReply("bye"))
assert text == "bye"
assert ttl == 0
def test_unwrap_ephemeral_handles_unreadable_config():
adapter = _delete_adapter()
with patch.object(
adapter,
"_get_ephemeral_system_ttl_default",
side_effect=RuntimeError("boom"),
):
text, ttl = adapter._unwrap_ephemeral(EphemeralReply("bye"))
# Fall back to 0 rather than crashing the handler pipeline.
assert text == "bye"
assert ttl == 0
# ---------------------------------------------------------------------------
# _schedule_ephemeral_delete
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_schedule_ephemeral_delete_calls_delete_after_ttl():
adapter = _delete_adapter()
# Use a very short TTL to keep the test fast — the implementation
# floors sleeps at 1s via ``max(1, int(ttl_seconds))``. Patch asyncio.sleep
# inside the module under test; the test body uses the real one for
# scheduler pumping.
import gateway.platforms.base as base_module
sleeps: list[float] = []
_real_sleep = base_module.asyncio.sleep
async def _fake_sleep(duration):
sleeps.append(duration)
# Yield control so the rest of the task body can run.
await _real_sleep(0)
with patch.object(base_module.asyncio, "sleep", _fake_sleep):
adapter._schedule_ephemeral_delete(
chat_id="42", message_id="m-2", ttl_seconds=5
)
# Let the spawned task run.
for _ in range(5):
await _real_sleep(0)
# Only the ttl sleep shows up — the test pump uses the real sleep.
assert 5 in sleeps
assert adapter.deleted == [("42", "m-2")]
@pytest.mark.asyncio
async def test_schedule_ephemeral_delete_swallows_errors():
adapter = _delete_adapter()
async def _boom(*a, **kw):
raise RuntimeError("permission denied")
adapter.delete_message = _boom # type: ignore[assignment]
with patch("gateway.platforms.base.asyncio.sleep", AsyncMock()):
adapter._schedule_ephemeral_delete(
chat_id="42", message_id="m-2", ttl_seconds=1
)
# No exception should propagate even though delete_message raised.
for _ in range(5):
await asyncio.sleep(0)
def test_schedule_ephemeral_delete_outside_event_loop_is_noop():
"""No running loop → no crash, silently drops the request."""
adapter = _delete_adapter()
# No pytest.mark.asyncio → no loop. Must not raise.
adapter._schedule_ephemeral_delete(
chat_id="42", message_id="m-2", ttl_seconds=1
)
assert adapter.deleted == []
# ---------------------------------------------------------------------------
# _process_message_background unwraps EphemeralReply before send
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_process_message_unwraps_ephemeral_before_send():
"""The adapter must send the wrapper's .text, never the wrapper object."""
adapter = _delete_adapter()
adapter._send_with_retry = AsyncMock(
return_value=SendResult(success=True, message_id="sent-1")
)
async def _handler(evt):
return EphemeralReply("⚡ Stopped.", ttl_seconds=5)
adapter.set_message_handler(_handler)
sleeps: list[float] = []
async def _fake_sleep(duration):
sleeps.append(duration)
event = _make_event()
session_key = "agent:main:telegram:private:42"
with patch("gateway.platforms.base.asyncio.sleep", _fake_sleep), patch.object(
adapter, "_keep_typing", new=AsyncMock()
):
await adapter._process_message_background(event, session_key)
# Pump until the detached delete task completes.
for _ in range(10):
await asyncio.sleep(0)
# Sent text is the unwrapped string, NOT repr(EphemeralReply(...))
adapter._send_with_retry.assert_called_once()
sent_text = adapter._send_with_retry.call_args.kwargs["content"]
assert sent_text == "⚡ Stopped."
# Auto-delete scheduled using the returned message_id
assert ("42", "sent-1") in adapter.deleted
@pytest.mark.asyncio
async def test_process_message_incapable_platform_does_not_schedule_delete():
adapter = _no_delete_adapter()
adapter._send_with_retry = AsyncMock(
return_value=SendResult(success=True, message_id="sent-1")
)
async def _handler(evt):
return EphemeralReply("⚡ Stopped.", ttl_seconds=5)
adapter.set_message_handler(_handler)
# Spy on delete_message to confirm it is NOT invoked.
delete_calls: list = []
async def _spy_delete(chat_id, message_id):
delete_calls.append((chat_id, message_id))
return False
adapter.delete_message = _spy_delete # type: ignore[assignment]
event = _make_event()
session_key = "agent:main:telegram:private:42"
with patch("gateway.platforms.base.asyncio.sleep", AsyncMock()), patch.object(
adapter, "_keep_typing", new=AsyncMock()
):
await adapter._process_message_background(event, session_key)
for _ in range(10):
await asyncio.sleep(0)
# Send happened with the unwrapped text...
adapter._send_with_retry.assert_called_once()
assert adapter._send_with_retry.call_args.kwargs["content"] == "⚡ Stopped."
# ...but delete was never scheduled because the capability check skipped
# the schedule call (TTL was zeroed in _unwrap_ephemeral).
# Note: the capability gate on _unwrap_ephemeral checks for
# ``type(adapter).delete_message is BasePlatformAdapter.delete_message``.
# Monkeypatching the instance does NOT change the class, so this test
# verifies the gate uses the class method to detect capability.
assert delete_calls == []
@pytest.mark.asyncio
async def test_process_message_plain_string_behaves_unchanged():
adapter = _delete_adapter()
adapter._send_with_retry = AsyncMock(
return_value=SendResult(success=True, message_id="sent-1")
)
async def _handler(evt):
return "plain reply"
adapter.set_message_handler(_handler)
event = _make_event()
session_key = "agent:main:telegram:private:42"
with patch("gateway.platforms.base.asyncio.sleep", AsyncMock()), patch.object(
adapter, "_keep_typing", new=AsyncMock()
):
await adapter._process_message_background(event, session_key)
for _ in range(5):
await asyncio.sleep(0)
adapter._send_with_retry.assert_called_once()
assert adapter._send_with_retry.call_args.kwargs["content"] == "plain reply"
assert adapter.deleted == [] # no auto-delete for plain replies