fix: Signal duplicate replies with streaming + per-platform tool_progress (#6348)
Fixes #4647 — Signal replies duplicated when gateway streaming is enabled. Root cause: stream_consumer.py did not handle the case where send() returns success=True but no message_id (Signal behavior). Every stream delta produced a separate send() call (7+ messages instead of 2), plus the gateway sent another full duplicate since already_sent was never set. Changes: - stream_consumer.py: Add elif branch for success-without-message_id — enters fallback mode (sets already_sent, disables editing, sends only continuation) - signal.py send(): Extract timestamp from signal-cli RPC result as message_id so stream consumer follows normal edit→fallback path - signal.py: Add public stop_typing() delegating to _stop_typing_indicator() so base adapter's _keep_typing finally block can clean up typing tasks - gateway/run.py: Per-platform tool_progress_overrides (#6164) — lets users set e.g. signal: off while keeping telegram: all - hermes_cli/config.py: Add tool_progress_overrides to DEFAULT_CONFIG Refs: #4647, #6164
This commit is contained in:
@@ -647,7 +647,11 @@ class SignalAdapter(BasePlatformAdapter):
|
||||
|
||||
if result is not None:
|
||||
self._track_sent_timestamp(result)
|
||||
return SendResult(success=True)
|
||||
# Use the timestamp from the RPC result as a pseudo message_id.
|
||||
# Signal doesn't have real message IDs, but the stream consumer
|
||||
# needs a truthy value to follow its edit→fallback path correctly.
|
||||
_msg_id = str(result.get("timestamp", "")) if isinstance(result, dict) else None
|
||||
return SendResult(success=True, message_id=_msg_id or None)
|
||||
return SendResult(success=False, error="RPC send failed")
|
||||
|
||||
def _track_sent_timestamp(self, rpc_result) -> None:
|
||||
@@ -837,6 +841,11 @@ class SignalAdapter(BasePlatformAdapter):
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
async def stop_typing(self, chat_id: str) -> None:
|
||||
"""Public interface for stopping typing — called by base adapter's
|
||||
_keep_typing finally block to clean up platform-level typing tasks."""
|
||||
await self._stop_typing_indicator(chat_id)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Chat Info
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@@ -6308,7 +6308,15 @@ class GatewayRunner:
|
||||
# Falls back to env vars for backward compatibility.
|
||||
# YAML 1.1 parses bare `off` as boolean False — normalise before
|
||||
# the `or` chain so it doesn't silently fall through to "all".
|
||||
_raw_tp = user_config.get("display", {}).get("tool_progress")
|
||||
#
|
||||
# Per-platform overrides (display.tool_progress_overrides) take
|
||||
# priority over the global setting — e.g. Signal users can set
|
||||
# tool_progress to "off" while keeping Telegram on "all".
|
||||
_display_cfg = user_config.get("display", {})
|
||||
_overrides = _display_cfg.get("tool_progress_overrides", {})
|
||||
_raw_tp = _overrides.get(platform_key)
|
||||
if _raw_tp is None:
|
||||
_raw_tp = _display_cfg.get("tool_progress")
|
||||
if _raw_tp is False:
|
||||
_raw_tp = "off"
|
||||
progress_mode = (
|
||||
|
||||
@@ -353,6 +353,17 @@ class GatewayStreamConsumer:
|
||||
self._message_id = result.message_id
|
||||
self._already_sent = True
|
||||
self._last_sent_text = text
|
||||
elif result.success:
|
||||
# Platform accepted the message but returned no message_id
|
||||
# (e.g. Signal). Can't edit without an ID — switch to
|
||||
# fallback mode: suppress intermediate deltas, send only
|
||||
# the missing tail once the final response is ready.
|
||||
self._already_sent = True
|
||||
self._edit_supported = False
|
||||
self._fallback_prefix = self._clean_for_display(text)
|
||||
self._fallback_final_send = True
|
||||
# Sentinel prevents re-entering this branch on every delta
|
||||
self._message_id = "__no_edit__"
|
||||
else:
|
||||
# Initial send failed — disable streaming for this session
|
||||
self._edit_supported = False
|
||||
|
||||
@@ -392,6 +392,7 @@ DEFAULT_CONFIG = {
|
||||
"show_cost": False, # Show $ cost in the status bar (off by default)
|
||||
"skin": "default",
|
||||
"tool_progress_command": False, # Enable /verbose command in messaging gateway
|
||||
"tool_progress_overrides": {}, # Per-platform overrides: {"signal": "off", "telegram": "all"}
|
||||
"tool_preview_length": 0, # Max chars for tool call previews (0 = no limit, show full paths/commands)
|
||||
},
|
||||
|
||||
|
||||
@@ -707,3 +707,66 @@ class TestSignalSendDocumentViaHelper:
|
||||
|
||||
assert result.success is False
|
||||
assert "/nonexistent.pdf" in result.error
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# send() returns message_id from timestamp (#4647)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSignalSendReturnsMessageId:
|
||||
"""Signal send() must return a timestamp-based message_id so the stream
|
||||
consumer can follow its edit→fallback path correctly."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_returns_timestamp_as_message_id(self, monkeypatch):
|
||||
adapter = _make_signal_adapter(monkeypatch)
|
||||
mock_rpc, _ = _stub_rpc({"timestamp": 1712345678000})
|
||||
adapter._rpc = mock_rpc
|
||||
adapter._stop_typing_indicator = AsyncMock()
|
||||
|
||||
result = await adapter.send(chat_id="+155****4567", content="hello")
|
||||
|
||||
assert result.success is True
|
||||
assert result.message_id == "1712345678000"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_returns_none_message_id_when_no_timestamp(self, monkeypatch):
|
||||
adapter = _make_signal_adapter(monkeypatch)
|
||||
mock_rpc, _ = _stub_rpc({}) # No timestamp key
|
||||
adapter._rpc = mock_rpc
|
||||
adapter._stop_typing_indicator = AsyncMock()
|
||||
|
||||
result = await adapter.send(chat_id="+155****4567", content="hello")
|
||||
|
||||
assert result.success is True
|
||||
assert result.message_id is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_returns_none_message_id_for_non_dict(self, monkeypatch):
|
||||
adapter = _make_signal_adapter(monkeypatch)
|
||||
mock_rpc, _ = _stub_rpc("ok") # Non-dict result
|
||||
adapter._rpc = mock_rpc
|
||||
adapter._stop_typing_indicator = AsyncMock()
|
||||
|
||||
result = await adapter.send(chat_id="+155****4567", content="hello")
|
||||
|
||||
assert result.success is True
|
||||
assert result.message_id is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# stop_typing() delegates to _stop_typing_indicator (#4647)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSignalStopTyping:
|
||||
"""Signal must expose a public stop_typing() so base adapter's
|
||||
_keep_typing finally block can clean up platform-level typing tasks."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_typing_calls_private_method(self, monkeypatch):
|
||||
adapter = _make_signal_adapter(monkeypatch)
|
||||
adapter._stop_typing_indicator = AsyncMock()
|
||||
|
||||
await adapter.stop_typing("+155****4567")
|
||||
|
||||
adapter._stop_typing_indicator.assert_awaited_once_with("+155****4567")
|
||||
|
||||
@@ -383,6 +383,60 @@ class TestSegmentBreakOnToolBoundary:
|
||||
sent_texts = [call[1]["content"] for call in adapter.send.call_args_list]
|
||||
assert sent_texts == ["Hello ▉", "Next segment"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_message_id_enters_fallback_mode(self):
|
||||
"""Platform returns success but no message_id (Signal) — must not
|
||||
re-send on every delta. Should enter fallback mode and send only
|
||||
the continuation at finish."""
|
||||
adapter = MagicMock()
|
||||
# First send succeeds but returns no message_id (Signal behavior)
|
||||
send_result_no_id = SimpleNamespace(success=True, message_id=None)
|
||||
# Fallback final send succeeds
|
||||
send_result_final = SimpleNamespace(success=True, message_id="msg_final")
|
||||
adapter.send = AsyncMock(side_effect=[send_result_no_id, send_result_final])
|
||||
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
|
||||
adapter.MAX_MESSAGE_LENGTH = 4096
|
||||
|
||||
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
||||
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
||||
|
||||
consumer.on_delta("Hello")
|
||||
task = asyncio.create_task(consumer.run())
|
||||
await asyncio.sleep(0.08)
|
||||
consumer.on_delta(" world, this is a longer response.")
|
||||
await asyncio.sleep(0.08)
|
||||
consumer.finish()
|
||||
await task
|
||||
|
||||
# Should send exactly 2 messages: initial chunk + fallback continuation
|
||||
# NOT one message per delta
|
||||
assert adapter.send.call_count == 2
|
||||
assert consumer.already_sent
|
||||
# edit_message should NOT have been called (no valid message_id to edit)
|
||||
adapter.edit_message.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_message_id_single_delta_marks_already_sent(self):
|
||||
"""When the entire response fits in one delta and platform returns no
|
||||
message_id, already_sent must still be True to prevent the gateway
|
||||
from re-sending the full response."""
|
||||
adapter = MagicMock()
|
||||
send_result = SimpleNamespace(success=True, message_id=None)
|
||||
adapter.send = AsyncMock(return_value=send_result)
|
||||
adapter.MAX_MESSAGE_LENGTH = 4096
|
||||
|
||||
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5)
|
||||
consumer = GatewayStreamConsumer(adapter, "chat_123", config)
|
||||
|
||||
consumer.on_delta("Short response.")
|
||||
consumer.finish()
|
||||
|
||||
await consumer.run()
|
||||
|
||||
assert consumer.already_sent
|
||||
# Only one send call (the initial message)
|
||||
assert adapter.send.call_count == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fallback_final_splits_long_continuation_without_dropping_text(self):
|
||||
"""Long continuation tails should be chunked when fallback final-send runs."""
|
||||
|
||||
Reference in New Issue
Block a user