Merge branch 'main' of github.com:NousResearch/hermes-agent into feat/ink-refactor

This commit is contained in:
Brooklyn Nicholson
2026-04-16 18:25:49 -05:00
18 changed files with 1723 additions and 377 deletions

View File

@@ -1291,7 +1291,7 @@ class BasePlatformAdapter(ABC):
path = path[1:-1].strip()
path = path.lstrip("`\"'").rstrip("`\"',.;:)}]")
if path:
media.append((path, has_voice_tag))
media.append((os.path.expanduser(path), has_voice_tag))
# Remove MEDIA tags from content (including surrounding quote/backtick wrappers)
if media:

File diff suppressed because it is too large Load Diff

View File

@@ -6986,7 +6986,7 @@ class GatewayRunner:
except Exception as exc:
return f"✗ Failed to upload debug report: {exc}"
# Schedule auto-deletion after 1 hour
# Schedule auto-deletion after 6 hours
_schedule_auto_delete(list(urls.values()))
lines = [_GATEWAY_PRIVACY_NOTICE, "", "**Debug report uploaded:**", ""]
@@ -6995,7 +6995,7 @@ class GatewayRunner:
lines.append(f"`{label:<{label_width}}` {url}")
lines.append("")
lines.append("⏱ Pastes will auto-delete in 1 hour.")
lines.append("⏱ Pastes will auto-delete in 6 hours.")
lines.append("For full log uploads, use `hermes debug share` from the CLI.")
lines.append("Share these links with the Hermes team for support.")
return "\n".join(lines)
@@ -8079,12 +8079,15 @@ class GatewayRunner:
if _adapter:
_adapter_supports_edit = getattr(_adapter, "SUPPORTS_MESSAGE_EDITING", True)
_effective_cursor = _scfg.cursor if _adapter_supports_edit else ""
_buffer_only = False
if source.platform == Platform.MATRIX:
_effective_cursor = ""
_buffer_only = True
_consumer_cfg = StreamConsumerConfig(
edit_interval=_scfg.edit_interval,
buffer_threshold=_scfg.buffer_threshold,
cursor=_effective_cursor,
buffer_only=_buffer_only,
)
_stream_consumer = GatewayStreamConsumer(
adapter=_adapter,
@@ -8650,12 +8653,15 @@ class GatewayRunner:
# Some Matrix clients render the streaming cursor
# as a visible tofu/white-box artifact. Keep
# streaming text on Matrix, but suppress the cursor.
_buffer_only = False
if source.platform == Platform.MATRIX:
_effective_cursor = ""
_buffer_only = True
_consumer_cfg = StreamConsumerConfig(
edit_interval=_scfg.edit_interval,
buffer_threshold=_scfg.buffer_threshold,
cursor=_effective_cursor,
buffer_only=_buffer_only,
)
_stream_consumer = GatewayStreamConsumer(
adapter=_adapter,

View File

@@ -43,6 +43,7 @@ class StreamConsumerConfig:
edit_interval: float = 1.0
buffer_threshold: int = 40
cursor: str = ""
buffer_only: bool = False
class GatewayStreamConsumer:
@@ -295,10 +296,13 @@ class GatewayStreamConsumer:
got_done
or got_segment_break
or commentary_text is not None
or (elapsed >= self._current_edit_interval
and self._accumulated)
or len(self._accumulated) >= self.cfg.buffer_threshold
)
if not self.cfg.buffer_only:
should_edit = should_edit or (
(elapsed >= self._current_edit_interval
and self._accumulated)
or len(self._accumulated) >= self.cfg.buffer_threshold
)
current_update_visible = False
if should_edit and self._accumulated:

View File

@@ -27,8 +27,8 @@ _DPASTE_COM_URL = "https://dpaste.com/api/"
# paste.rs caps at ~1 MB; we stay under that with headroom.
_MAX_LOG_BYTES = 512_000
# Auto-delete pastes after this many seconds (1 hour).
_AUTO_DELETE_SECONDS = 3600
# Auto-delete pastes after this many seconds (6 hours).
_AUTO_DELETE_SECONDS = 21600
# ---------------------------------------------------------------------------
@@ -44,7 +44,7 @@ _PRIVACY_NOTICE = """\
• Full agent.log and gateway.log (up to 512 KB each — likely contains
conversation content, tool outputs, and file paths)
Pastes auto-delete after 1 hour.
Pastes auto-delete after 6 hours.
"""
_GATEWAY_PRIVACY_NOTICE = (
@@ -52,7 +52,7 @@ _GATEWAY_PRIVACY_NOTICE = (
"(may contain conversation fragments) to a public paste service. "
"Full logs are NOT included from the gateway — use `hermes debug share` "
"from the CLI for full log uploads.\n"
"Pastes auto-delete after 1 hour."
"Pastes auto-delete after 6 hours."
)
@@ -422,9 +422,9 @@ def run_debug_share(args):
if failures:
print(f"\n (failed to upload: {', '.join(failures)})")
# Schedule auto-deletion after 1 hour
# Schedule auto-deletion after 6 hours
_schedule_auto_delete(list(urls.values()))
print(f"\n⏱ Pastes will auto-delete in 1 hour.")
print(f"\n⏱ Pastes will auto-delete in 6 hours.")
# Manual delete fallback
print(f"To delete now: hermes debug delete <url>")

View File

@@ -430,6 +430,8 @@ def _print_setup_summary(config: dict, hermes_home):
tool_status.append(("Text-to-Speech (MiniMax)", True, None))
elif tts_provider == "mistral" and get_env_value("MISTRAL_API_KEY"):
tool_status.append(("Text-to-Speech (Mistral Voxtral)", True, None))
elif tts_provider == "gemini" and (get_env_value("GEMINI_API_KEY") or get_env_value("GOOGLE_API_KEY")):
tool_status.append(("Text-to-Speech (Google Gemini)", True, None))
elif tts_provider == "neutts":
try:
import importlib.util
@@ -913,6 +915,7 @@ def _setup_tts_provider(config: dict):
"xai": "xAI TTS",
"minimax": "MiniMax TTS",
"mistral": "Mistral Voxtral TTS",
"gemini": "Google Gemini TTS",
"neutts": "NeuTTS",
}
current_label = provider_labels.get(current_provider, current_provider)
@@ -935,10 +938,11 @@ def _setup_tts_provider(config: dict):
"xAI TTS (Grok voices, needs API key)",
"MiniMax TTS (high quality with voice cloning, needs API key)",
"Mistral Voxtral TTS (multilingual, native Opus, needs API key)",
"Google Gemini TTS (30 prebuilt voices, prompt-controllable, needs API key)",
"NeuTTS (local on-device, free, ~300MB model download)",
]
)
providers.extend(["edge", "elevenlabs", "openai", "xai", "minimax", "mistral", "neutts"])
providers.extend(["edge", "elevenlabs", "openai", "xai", "minimax", "mistral", "gemini", "neutts"])
choices.append(f"Keep current ({current_label})")
keep_current_idx = len(choices) - 1
idx = prompt_choice("Select TTS provider:", choices, keep_current_idx)
@@ -1045,6 +1049,19 @@ def _setup_tts_provider(config: dict):
print_warning("No API key provided. Falling back to Edge TTS.")
selected = "edge"
elif selected == "gemini":
existing = get_env_value("GEMINI_API_KEY") or get_env_value("GOOGLE_API_KEY")
if not existing:
print()
print_info("Get a free API key at https://aistudio.google.com/app/apikey")
api_key = prompt("Gemini API key for TTS", password=True)
if api_key:
save_env_value("GEMINI_API_KEY", api_key)
print_success("Gemini TTS API key saved")
else:
print_warning("No API key provided. Falling back to Edge TTS.")
selected = "edge"
# Save the selection
if "tts" not in config:
config["tts"] = {}

View File

@@ -172,6 +172,15 @@ TOOL_CATEGORIES = {
],
"tts_provider": "mistral",
},
{
"name": "Google Gemini TTS",
"badge": "preview",
"tag": "30 prebuilt voices, controllable via prompts",
"env_vars": [
{"key": "GEMINI_API_KEY", "prompt": "Gemini API key", "url": "https://aistudio.google.com/app/apikey"},
],
"tts_provider": "gemini",
},
],
},
"web": {

View File

@@ -108,6 +108,9 @@ def _make_fake_mautrix():
def add_event_handler(self, event_type, handler):
self._event_handlers.setdefault(event_type, []).append(handler)
def add_dispatcher(self, dispatcher_type):
pass
class InternalEventType:
INVITE = "internal.invite"
@@ -115,6 +118,14 @@ def _make_fake_mautrix():
mautrix_client.InternalEventType = InternalEventType
mautrix.client = mautrix_client
# --- mautrix.client.dispatcher ---
mautrix_client_dispatcher = types.ModuleType("mautrix.client.dispatcher")
class MembershipEventDispatcher:
pass
mautrix_client_dispatcher.MembershipEventDispatcher = MembershipEventDispatcher
# --- mautrix.client.state_store ---
mautrix_client_state_store = types.ModuleType("mautrix.client.state_store")
@@ -163,6 +174,19 @@ def _make_fake_mautrix():
mautrix_crypto_store.MemoryCryptoStore = MemoryCryptoStore
# --- mautrix.crypto.attachments ---
mautrix_crypto_attachments = types.ModuleType("mautrix.crypto.attachments")
def encrypt_attachment(data):
encrypted_file = MagicMock()
encrypted_file.serialize.return_value = {
"key": {"k": "testkey"}, "iv": "testiv",
"hashes": {"sha256": "testhash"}, "v": "v2",
}
return (b"ciphertext_" + data, encrypted_file)
mautrix_crypto_attachments.encrypt_attachment = encrypt_attachment
# --- mautrix.crypto.store.asyncpg ---
mautrix_crypto_store_asyncpg = types.ModuleType("mautrix.crypto.store.asyncpg")
@@ -200,8 +224,10 @@ def _make_fake_mautrix():
"mautrix.api": mautrix_api,
"mautrix.types": mautrix_types,
"mautrix.client": mautrix_client,
"mautrix.client.dispatcher": mautrix_client_dispatcher,
"mautrix.client.state_store": mautrix_client_state_store,
"mautrix.crypto": mautrix_crypto,
"mautrix.crypto.attachments": mautrix_crypto_attachments,
"mautrix.crypto.store": mautrix_crypto_store,
"mautrix.crypto.store.asyncpg": mautrix_crypto_store_asyncpg,
"mautrix.util": mautrix_util,
@@ -357,6 +383,16 @@ class TestMatrixTypingIndicator:
timeout=0,
)
@pytest.mark.asyncio
async def test_stop_typing_no_client_is_noop(self):
self.adapter._client = None
await self.adapter.stop_typing("!room:example.org") # should not raise
@pytest.mark.asyncio
async def test_stop_typing_suppresses_exceptions(self):
self.adapter._client.set_typing = AsyncMock(side_effect=Exception("network"))
await self.adapter.stop_typing("!room:example.org") # should not raise
# ---------------------------------------------------------------------------
# mxc:// URL conversion
@@ -835,6 +871,41 @@ class TestMatrixAccessTokenAuth:
await adapter.disconnect()
class TestDeviceKeyReVerification:
@pytest.mark.asyncio
async def test_verify_fails_when_server_keys_mismatch_after_upload(self):
"""share_keys() succeeds but server still has old keys -> should return False."""
adapter = _make_adapter()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = "TESTDEVICE"
# First query: keys missing -> triggers share_keys
# Second query: keys still don't match -> should fail
mock_keys_missing = MagicMock()
mock_keys_missing.device_keys = {"@bot:example.org": {}}
mock_keys_mismatch = MagicMock()
mock_device = MagicMock()
mock_device.keys = {"ed25519:TESTDEVICE": "server_old_key"}
mock_keys_mismatch.device_keys = {"@bot:example.org": {"TESTDEVICE": mock_device}}
mock_client.query_keys = AsyncMock(side_effect=[mock_keys_missing, mock_keys_mismatch])
mock_olm = MagicMock()
mock_olm.account = MagicMock()
mock_olm.account.shared = False
mock_olm.account.identity_keys = {"ed25519": "local_new_key"}
mock_olm.share_keys = AsyncMock()
from gateway.platforms.matrix import MatrixAdapter
result = await adapter._verify_device_keys_on_server(mock_client, mock_olm)
assert result is False
mock_olm.share_keys.assert_awaited_once()
class TestMatrixE2EEHardFail:
"""connect() must refuse to start when E2EE is requested but deps are missing."""
@@ -1139,6 +1210,56 @@ class TestMatrixSyncLoop:
mock_sync_store.put_next_batch.assert_awaited_once_with("s1234")
class TestMatrixUploadAndSend:
@pytest.mark.asyncio
async def test_upload_unencrypted_room_uses_plain_url(self):
"""Unencrypted rooms should use plain 'url' key."""
adapter = _make_adapter()
adapter._encryption = True
mock_client = MagicMock()
mock_client.crypto = object()
mock_client.state_store = MagicMock()
mock_client.state_store.is_encrypted = AsyncMock(return_value=False)
mock_client.upload_media = AsyncMock(return_value="mxc://example.org/plain")
mock_client.send_message_event = AsyncMock(return_value="$event")
adapter._client = mock_client
result = await adapter._upload_and_send(
"!room:example.org", b"hello", "test.txt", "text/plain", "m.file",
)
assert result.success is True
sent = mock_client.send_message_event.await_args.args[2]
assert sent["url"] == "mxc://example.org/plain"
assert "file" not in sent
@pytest.mark.asyncio
async def test_upload_encrypted_room_uses_file_payload(self):
"""Encrypted rooms should use 'file' key with crypto metadata."""
adapter = _make_adapter()
adapter._encryption = True
mock_client = MagicMock()
mock_client.crypto = object()
mock_client.state_store = MagicMock()
mock_client.state_store.is_encrypted = AsyncMock(return_value=True)
mock_client.upload_media = AsyncMock(return_value="mxc://example.org/enc")
mock_client.send_message_event = AsyncMock(return_value="$event")
adapter._client = mock_client
result = await adapter._upload_and_send(
"!room:example.org", b"secret", "secret.txt", "text/plain", "m.file",
)
assert result.success is True
# Should have uploaded ciphertext, not plaintext
uploaded_data = mock_client.upload_media.await_args.args[0]
assert uploaded_data != b"secret"
sent = mock_client.send_message_event.await_args.args[2]
assert "url" not in sent
assert "file" in sent
assert sent["file"]["url"] == "mxc://example.org/enc"
class TestMatrixEncryptedSendFallback:
@pytest.mark.asyncio
async def test_send_retries_after_e2ee_error(self):
@@ -1165,128 +1286,24 @@ class TestMatrixEncryptedSendFallback:
# ---------------------------------------------------------------------------
# E2EE: MegolmEvent key request + buffering via _on_encrypted_event
# E2EE: _joined_rooms reference preservation for CryptoStateStore
# ---------------------------------------------------------------------------
class TestMatrixMegolmEventHandling:
@pytest.mark.asyncio
async def test_encrypted_event_buffers_for_retry(self):
"""_on_encrypted_event should buffer undecrypted events for retry."""
adapter = _make_adapter()
adapter._user_id = "@bot:example.org"
adapter._startup_ts = 0.0
adapter._dm_rooms = {}
class TestJoinedRoomsReference:
def test_joined_rooms_reference_preserved_after_reassignment(self):
"""_CryptoStateStore must see updates after initial sync populates rooms."""
from gateway.platforms.matrix import _CryptoStateStore
fake_event = MagicMock()
fake_event.room_id = "!room:example.org"
fake_event.event_id = "$encrypted_event"
fake_event.sender = "@alice:example.org"
joined = set()
store = _CryptoStateStore(MagicMock(), joined)
await adapter._on_encrypted_event(fake_event)
# Simulate what connect() should do: mutate in place, not reassign.
joined.clear()
joined.update(["!room1:example.org", "!room2:example.org"])
# Should have buffered the event
assert len(adapter._pending_megolm) == 1
room_id, event, ts = adapter._pending_megolm[0]
assert room_id == "!room:example.org"
assert event is fake_event
@pytest.mark.asyncio
async def test_encrypted_event_buffer_capped(self):
"""Buffer should not grow past _MAX_PENDING_EVENTS."""
adapter = _make_adapter()
adapter._user_id = "@bot:example.org"
adapter._startup_ts = 0.0
adapter._dm_rooms = {}
from gateway.platforms.matrix import _MAX_PENDING_EVENTS
for i in range(_MAX_PENDING_EVENTS + 10):
evt = MagicMock()
evt.room_id = "!room:example.org"
evt.event_id = f"$event_{i}"
evt.sender = "@alice:example.org"
await adapter._on_encrypted_event(evt)
assert len(adapter._pending_megolm) == _MAX_PENDING_EVENTS
# ---------------------------------------------------------------------------
# E2EE: Retry pending decryptions
# ---------------------------------------------------------------------------
class TestMatrixRetryPendingDecryptions:
@pytest.mark.asyncio
async def test_successful_decryption_routes_to_handler(self):
adapter = _make_adapter()
adapter._user_id = "@bot:example.org"
adapter._startup_ts = 0.0
adapter._dm_rooms = {}
fake_encrypted = MagicMock()
fake_encrypted.event_id = "$encrypted"
decrypted_event = MagicMock()
mock_crypto = MagicMock()
mock_crypto.decrypt_megolm_event = AsyncMock(return_value=decrypted_event)
fake_client = MagicMock()
fake_client.crypto = mock_crypto
adapter._client = fake_client
now = time.time()
adapter._pending_megolm = [("!room:ex.org", fake_encrypted, now)]
with patch.object(adapter, "_on_room_message", AsyncMock()) as mock_handler:
await adapter._retry_pending_decryptions()
mock_handler.assert_awaited_once_with(decrypted_event)
# Buffer should be empty now
assert len(adapter._pending_megolm) == 0
@pytest.mark.asyncio
async def test_still_undecryptable_stays_in_buffer(self):
adapter = _make_adapter()
fake_encrypted = MagicMock()
fake_encrypted.event_id = "$still_encrypted"
mock_crypto = MagicMock()
mock_crypto.decrypt_megolm_event = AsyncMock(side_effect=Exception("missing key"))
fake_client = MagicMock()
fake_client.crypto = mock_crypto
adapter._client = fake_client
now = time.time()
adapter._pending_megolm = [("!room:ex.org", fake_encrypted, now)]
await adapter._retry_pending_decryptions()
assert len(adapter._pending_megolm) == 1
@pytest.mark.asyncio
async def test_expired_events_dropped(self):
adapter = _make_adapter()
from gateway.platforms.matrix import _PENDING_EVENT_TTL
fake_event = MagicMock()
fake_event.event_id = "$old_event"
mock_crypto = MagicMock()
fake_client = MagicMock()
fake_client.crypto = mock_crypto
adapter._client = fake_client
# Timestamp well past TTL
old_ts = time.time() - _PENDING_EVENT_TTL - 60
adapter._pending_megolm = [("!room:ex.org", fake_event, old_ts)]
await adapter._retry_pending_decryptions()
# Should have been dropped
assert len(adapter._pending_megolm) == 0
import asyncio
rooms = asyncio.get_event_loop().run_until_complete(store.find_shared_rooms("@user:ex"))
assert set(rooms) == {"!room1:example.org", "!room2:example.org"}
# ---------------------------------------------------------------------------
@@ -1354,11 +1371,70 @@ class TestMatrixEncryptedEventHandler:
handler_calls = mock_client.add_event_handler.call_args_list
registered_types = [call.args[0] for call in handler_calls]
# Should have registered handlers for ROOM_MESSAGE, REACTION, INVITE, and ROOM_ENCRYPTED
assert len(handler_calls) >= 4 # At minimum these four
# Should have registered handlers for ROOM_MESSAGE, REACTION, INVITE
assert len(handler_calls) >= 3
await adapter.disconnect()
@pytest.mark.asyncio
async def test_connect_fails_on_stale_otk_conflict(self):
"""connect() must refuse E2EE when OTK upload hits 'already exists'."""
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@bot:example.org",
"encryption": True,
},
)
adapter = MatrixAdapter(config)
fake_mautrix_mods = _make_fake_mautrix()
mock_client = MagicMock()
mock_client.mxid = "@bot:example.org"
mock_client.device_id = None
mock_client.state_store = MagicMock()
mock_client.sync_store = MagicMock()
mock_client.crypto = None
mock_client.whoami = AsyncMock(return_value=MagicMock(user_id="@bot:example.org", device_id="DEV123"))
mock_client.add_event_handler = MagicMock()
mock_client.add_dispatcher = MagicMock()
mock_client.query_keys = AsyncMock(return_value={
"device_keys": {"@bot:example.org": {"DEV123": {
"keys": {"ed25519:DEV123": "fake_ed25519_key"},
}}},
})
mock_client.api = MagicMock()
mock_client.api.token = "syt_test_token"
mock_client.api.session = MagicMock()
mock_client.api.session.close = AsyncMock()
# share_keys succeeds on first call (from _verify_device_keys_on_server),
# then raises "already exists" on the proactive OTK flush in connect().
mock_olm = MagicMock()
mock_olm.load = AsyncMock()
mock_olm.share_keys = AsyncMock(
side_effect=[None, Exception("One time key signed_curve25519:AAAAAQ already exists")]
)
mock_olm.share_keys_min_trust = None
mock_olm.send_keys_min_trust = None
mock_olm.account = MagicMock()
mock_olm.account.identity_keys = {"ed25519": "fake_ed25519_key"}
fake_mautrix_mods["mautrix.client"].Client = MagicMock(return_value=mock_client)
fake_mautrix_mods["mautrix.crypto"].OlmMachine = MagicMock(return_value=mock_olm)
from gateway.platforms import matrix as matrix_mod
with patch.object(matrix_mod, "_check_e2ee_deps", return_value=True):
with patch.dict("sys.modules", fake_mautrix_mods):
result = await adapter.connect()
assert result is False
# ---------------------------------------------------------------------------
# Disconnect

View File

@@ -10,7 +10,6 @@ import pytest
from gateway.config import PlatformConfig
# The matrix adapter module is importable without mautrix installed
# (module-level imports use try/except with stubs). No need for
# module-level mock installation — tests that call adapter methods
@@ -159,9 +158,15 @@ class TestStripMention:
result = self.adapter._strip_mention("@hermes:example.org help me")
assert result == "help me"
def test_strip_localpart(self):
def test_localpart_preserved(self):
"""Localpart-only text is no longer stripped — avoids false positives in paths."""
result = self.adapter._strip_mention("hermes help me")
assert result == "help me"
assert result == "hermes help me"
def test_localpart_in_path_preserved(self):
"""Localpart inside a file path must not be damaged."""
result = self.adapter._strip_mention("read /home/hermes/config.yaml")
assert result == "read /home/hermes/config.yaml"
def test_strip_returns_empty_for_mention_only(self):
result = self.adapter._strip_mention("@hermes:example.org")
@@ -273,8 +278,8 @@ async def test_require_mention_dm_always_responds(monkeypatch):
@pytest.mark.asyncio
async def test_dm_strips_mention(monkeypatch):
"""DMs strip mention from body, matching Discord behavior."""
async def test_dm_strips_full_mxid(monkeypatch):
"""DMs strip the full MXID from body when require_mention is on (default)."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
@@ -289,6 +294,23 @@ async def test_dm_strips_mention(monkeypatch):
assert msg.text == "help me"
@pytest.mark.asyncio
async def test_dm_preserves_localpart_in_body(monkeypatch):
"""DMs no longer strip bare localpart — only the full MXID is removed."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
_set_dm(adapter)
event = _make_event("hermes help me")
await adapter._on_room_message(event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.text == "hermes help me"
@pytest.mark.asyncio
async def test_bare_mention_passes_empty_string(monkeypatch):
"""A message that is only a mention should pass through as empty, not be dropped."""
@@ -309,7 +331,9 @@ async def test_bare_mention_passes_empty_string(monkeypatch):
async def test_require_mention_free_response_room(monkeypatch):
"""Free-response rooms bypass mention requirement."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.setenv("MATRIX_FREE_RESPONSE_ROOMS", "!room1:example.org,!room2:example.org")
monkeypatch.setenv(
"MATRIX_FREE_RESPONSE_ROOMS", "!room1:example.org,!room2:example.org"
)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
@@ -351,6 +375,22 @@ async def test_require_mention_disabled(monkeypatch):
assert msg.text == "hello without mention"
@pytest.mark.asyncio
async def test_require_mention_disabled_skips_stripping(monkeypatch):
"""MATRIX_REQUIRE_MENTION=false: mention text is NOT stripped from body."""
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
event = _make_event("@hermes:example.org help me")
await adapter._on_room_message(event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.text == "@hermes:example.org help me"
# ---------------------------------------------------------------------------
# Auto-thread in _on_room_message
# ---------------------------------------------------------------------------
@@ -442,8 +482,10 @@ class TestThreadPersistence:
def test_empty_state_file(self, tmp_path, monkeypatch):
"""No state file → empty set."""
from gateway.platforms.helpers import ThreadParticipationTracker
monkeypatch.setattr(
ThreadParticipationTracker, "_state_path",
ThreadParticipationTracker,
"_state_path",
lambda self: tmp_path / "matrix_threads.json",
)
adapter = _make_adapter()
@@ -452,9 +494,11 @@ class TestThreadPersistence:
def test_track_thread_persists(self, tmp_path, monkeypatch):
"""mark() writes to disk."""
from gateway.platforms.helpers import ThreadParticipationTracker
state_path = tmp_path / "matrix_threads.json"
monkeypatch.setattr(
ThreadParticipationTracker, "_state_path",
ThreadParticipationTracker,
"_state_path",
lambda self: state_path,
)
adapter = _make_adapter()
@@ -466,10 +510,12 @@ class TestThreadPersistence:
def test_threads_survive_reload(self, tmp_path, monkeypatch):
"""Persisted threads are loaded by a new adapter instance."""
from gateway.platforms.helpers import ThreadParticipationTracker
state_path = tmp_path / "matrix_threads.json"
state_path.write_text(json.dumps(["$t1", "$t2"]))
monkeypatch.setattr(
ThreadParticipationTracker, "_state_path",
ThreadParticipationTracker,
"_state_path",
lambda self: state_path,
)
adapter = _make_adapter()
@@ -479,9 +525,11 @@ class TestThreadPersistence:
def test_cap_max_tracked_threads(self, tmp_path, monkeypatch):
"""Thread set is trimmed to max_tracked."""
from gateway.platforms.helpers import ThreadParticipationTracker
state_path = tmp_path / "matrix_threads.json"
monkeypatch.setattr(
ThreadParticipationTracker, "_state_path",
ThreadParticipationTracker,
"_state_path",
lambda self: state_path,
)
adapter = _make_adapter()
@@ -604,6 +652,7 @@ class TestMatrixConfigBridge:
}
import os
import yaml
config_file = tmp_path / "config.yaml"
@@ -613,18 +662,27 @@ class TestMatrixConfigBridge:
yaml_cfg = yaml.safe_load(config_file.read_text())
matrix_cfg = yaml_cfg.get("matrix", {})
if isinstance(matrix_cfg, dict):
if "require_mention" in matrix_cfg and not os.getenv("MATRIX_REQUIRE_MENTION"):
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower())
if "require_mention" in matrix_cfg and not os.getenv(
"MATRIX_REQUIRE_MENTION"
):
monkeypatch.setenv(
"MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower()
)
frc = matrix_cfg.get("free_response_rooms")
if frc is not None and not os.getenv("MATRIX_FREE_RESPONSE_ROOMS"):
if isinstance(frc, list):
frc = ",".join(str(v) for v in frc)
monkeypatch.setenv("MATRIX_FREE_RESPONSE_ROOMS", str(frc))
if "auto_thread" in matrix_cfg and not os.getenv("MATRIX_AUTO_THREAD"):
monkeypatch.setenv("MATRIX_AUTO_THREAD", str(matrix_cfg["auto_thread"]).lower())
monkeypatch.setenv(
"MATRIX_AUTO_THREAD", str(matrix_cfg["auto_thread"]).lower()
)
assert os.getenv("MATRIX_REQUIRE_MENTION") == "false"
assert os.getenv("MATRIX_FREE_RESPONSE_ROOMS") == "!room1:example.org,!room2:example.org"
assert (
os.getenv("MATRIX_FREE_RESPONSE_ROOMS")
== "!room1:example.org,!room2:example.org"
)
assert os.getenv("MATRIX_AUTO_THREAD") == "false"
def test_yaml_bridge_sets_dm_mention_threads(self, monkeypatch, tmp_path):
@@ -632,6 +690,7 @@ class TestMatrixConfigBridge:
monkeypatch.delenv("MATRIX_DM_MENTION_THREADS", raising=False)
import os
import yaml
yaml_content = {"matrix": {"dm_mention_threads": True}}
@@ -641,8 +700,13 @@ class TestMatrixConfigBridge:
yaml_cfg = yaml.safe_load(config_file.read_text())
matrix_cfg = yaml_cfg.get("matrix", {})
if isinstance(matrix_cfg, dict):
if "dm_mention_threads" in matrix_cfg and not os.getenv("MATRIX_DM_MENTION_THREADS"):
monkeypatch.setenv("MATRIX_DM_MENTION_THREADS", str(matrix_cfg["dm_mention_threads"]).lower())
if "dm_mention_threads" in matrix_cfg and not os.getenv(
"MATRIX_DM_MENTION_THREADS"
):
monkeypatch.setenv(
"MATRIX_DM_MENTION_THREADS",
str(matrix_cfg["dm_mention_threads"]).lower(),
)
assert os.getenv("MATRIX_DM_MENTION_THREADS") == "true"
@@ -651,9 +715,12 @@ class TestMatrixConfigBridge:
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "true")
import os
yaml_cfg = {"matrix": {"require_mention": False}}
matrix_cfg = yaml_cfg.get("matrix", {})
if "require_mention" in matrix_cfg and not os.getenv("MATRIX_REQUIRE_MENTION"):
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower())
monkeypatch.setenv(
"MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower()
)
assert os.getenv("MATRIX_REQUIRE_MENTION") == "true"

View File

@@ -1013,3 +1013,106 @@ class TestFilterAndAccumulateIntegration:
await task
except asyncio.CancelledError:
pass
# ── buffer_only mode tests ─────────────────────────────────────────────
class TestBufferOnlyMode:
"""Verify buffer_only mode suppresses intermediate edits and only
flushes on structural boundaries (done, segment break, commentary)."""
@pytest.mark.asyncio
async def test_suppresses_intermediate_edits(self):
"""Time-based and size-based edits are skipped; only got_done flushes."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True)
consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg)
for word in ["Hello", " world", ", this", " is", " a", " test"]:
consumer.on_delta(word)
consumer.finish()
await consumer.run()
adapter.send.assert_called_once()
adapter.edit_message.assert_not_called()
assert "Hello world, this is a test" in adapter.send.call_args_list[0][1]["content"]
@pytest.mark.asyncio
async def test_flushes_on_segment_break(self):
"""A segment break (tool call boundary) flushes accumulated text."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.send = AsyncMock(side_effect=[
SimpleNamespace(success=True, message_id="msg1"),
SimpleNamespace(success=True, message_id="msg2"),
])
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True)
consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg)
consumer.on_delta("Before tool call")
consumer.on_delta(None)
consumer.on_delta("After tool call")
consumer.finish()
await consumer.run()
assert adapter.send.call_count == 2
assert "Before tool call" in adapter.send.call_args_list[0][1]["content"]
assert "After tool call" in adapter.send.call_args_list[1][1]["content"]
adapter.edit_message.assert_not_called()
@pytest.mark.asyncio
async def test_flushes_on_commentary(self):
"""An interim commentary message flushes in buffer_only mode."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.send = AsyncMock(side_effect=[
SimpleNamespace(success=True, message_id="msg1"),
SimpleNamespace(success=True, message_id="msg2"),
SimpleNamespace(success=True, message_id="msg3"),
])
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="", buffer_only=True)
consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg)
consumer.on_delta("Working on it...")
consumer.on_commentary("I'll search for that first.")
consumer.on_delta("Here are the results.")
consumer.finish()
await consumer.run()
# Three sends: accumulated text, commentary, final text
assert adapter.send.call_count >= 2
adapter.edit_message.assert_not_called()
@pytest.mark.asyncio
async def test_default_mode_still_triggers_intermediate_edits(self):
"""Regression: buffer_only=False (default) still does progressive edits."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
# buffer_threshold=5 means any 5+ chars triggers an early edit
cfg = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5, cursor="")
consumer = GatewayStreamConsumer(adapter, "!room:server", config=cfg)
consumer.on_delta("Hello world, this is long enough to trigger edits")
consumer.finish()
await consumer.run()
# Should have at least one send. With buffer_threshold=5 and this much
# text, the consumer may send then edit, or just send once at got_done.
# The key assertion: this doesn't break.
assert adapter.send.call_count >= 1

View File

@@ -0,0 +1,200 @@
"""Tests for the activity-heartbeat behavior of the blocking gateway approval wait.
Regression test for false gateway inactivity timeouts firing while the agent
is legitimately blocked waiting for a user to respond to a dangerous-command
approval prompt. Before the fix, ``entry.event.wait(timeout=...)`` blocked
silently — no ``_touch_activity()`` calls — and the gateway's inactivity
watchdog (``agent.gateway_timeout``, default 1800s) would kill the agent
while the user was still choosing whether to approve.
The fix polls the event in short slices and fires ``touch_activity_if_due``
between slices, mirroring ``_wait_for_process`` in ``tools/environments/base.py``.
"""
import os
import threading
import time
from unittest.mock import patch
def _clear_approval_state():
"""Reset all module-level approval state between tests."""
from tools import approval as mod
mod._gateway_queues.clear()
mod._gateway_notify_cbs.clear()
mod._session_approved.clear()
mod._permanent_approved.clear()
mod._pending.clear()
class TestApprovalHeartbeat:
"""The blocking gateway approval wait must fire activity heartbeats.
Without heartbeats, the gateway's inactivity watchdog kills the agent
thread while it's legitimately waiting for a slow user to respond to
an approval prompt (observed in real user logs: MRB, April 2026).
"""
SESSION_KEY = "heartbeat-test-session"
def setup_method(self):
_clear_approval_state()
self._saved_env = {
k: os.environ.get(k)
for k in ("HERMES_GATEWAY_SESSION", "HERMES_YOLO_MODE",
"HERMES_SESSION_KEY")
}
os.environ.pop("HERMES_YOLO_MODE", None)
os.environ["HERMES_GATEWAY_SESSION"] = "1"
# The blocking wait path reads the session key via contextvar OR
# os.environ fallback. Contextvars don't propagate across threads
# by default, so env var is the portable way to drive this in tests.
os.environ["HERMES_SESSION_KEY"] = self.SESSION_KEY
def teardown_method(self):
for k, v in self._saved_env.items():
if v is None:
os.environ.pop(k, None)
else:
os.environ[k] = v
_clear_approval_state()
def test_heartbeat_fires_while_waiting_for_approval(self):
"""touch_activity_if_due is called repeatedly during the wait."""
from tools.approval import (
check_all_command_guards,
register_gateway_notify,
resolve_gateway_approval,
)
register_gateway_notify(self.SESSION_KEY, lambda _payload: None)
# Use an Event to signal from _fake_touch back to the main thread
# so we can resolve as soon as the first heartbeat fires — avoids
# flakiness from fixed sleeps racing against thread startup.
first_heartbeat = threading.Event()
heartbeat_calls: list[str] = []
def _fake_touch(state, label):
# Bypass the 10s throttle so the heartbeat fires every loop
# iteration; we're measuring whether the call happens at all.
heartbeat_calls.append(label)
state["last_touch"] = 0.0
first_heartbeat.set()
result_holder: dict = {}
def _run_check():
try:
with patch(
"tools.environments.base.touch_activity_if_due",
side_effect=_fake_touch,
):
result_holder["result"] = check_all_command_guards(
"rm -rf /tmp/nonexistent-heartbeat-target", "local"
)
except Exception as exc: # pragma: no cover
result_holder["exc"] = exc
thread = threading.Thread(target=_run_check, daemon=True)
thread.start()
# Wait for at least one heartbeat to fire — bounded at 10s to catch
# a genuinely hung worker thread without making a green run slow.
assert first_heartbeat.wait(timeout=10.0), (
"no heartbeat fired within 10s — the approval wait is blocking "
"without firing activity pings, which is the exact bug this "
"test exists to catch"
)
# Resolve the approval so the thread exits cleanly.
resolve_gateway_approval(self.SESSION_KEY, "once")
thread.join(timeout=5)
assert not thread.is_alive(), "approval wait did not exit after resolve"
assert "exc" not in result_holder, (
f"check_all_command_guards raised: {result_holder.get('exc')!r}"
)
# The fix: heartbeats fire while waiting. Before the fix this list
# was empty because event.wait() blocked for the full timeout with
# no activity pings.
assert heartbeat_calls, "expected at least one heartbeat"
assert all(
call == "waiting for user approval" for call in heartbeat_calls
), f"unexpected heartbeat labels: {set(heartbeat_calls)}"
# Sanity: the approval was resolved with "once" → command approved.
assert result_holder["result"]["approved"] is True
def test_wait_returns_immediately_on_user_response(self):
"""Polling slices don't delay responsiveness — resolve is near-instant."""
from tools.approval import (
check_all_command_guards,
register_gateway_notify,
resolve_gateway_approval,
)
register_gateway_notify(self.SESSION_KEY, lambda _payload: None)
start_time = time.monotonic()
result_holder: dict = {}
def _run_check():
result_holder["result"] = check_all_command_guards(
"rm -rf /tmp/nonexistent-fast-target", "local"
)
thread = threading.Thread(target=_run_check, daemon=True)
thread.start()
# Resolve almost immediately — the wait loop should return within
# its current 1s poll slice.
time.sleep(0.1)
resolve_gateway_approval(self.SESSION_KEY, "once")
thread.join(timeout=5)
elapsed = time.monotonic() - start_time
assert not thread.is_alive()
assert result_holder["result"]["approved"] is True
# Generous bound to tolerate CI load; the previous single-wait
# impl returned in <10ms, the polling impl is bounded by the 1s
# slice length.
assert elapsed < 3.0, f"resolution took {elapsed:.2f}s, expected <3s"
def test_heartbeat_import_failure_does_not_break_wait(self):
"""If tools.environments.base can't be imported, the wait still works."""
from tools.approval import (
check_all_command_guards,
register_gateway_notify,
resolve_gateway_approval,
)
register_gateway_notify(self.SESSION_KEY, lambda _payload: None)
result_holder: dict = {}
import builtins
real_import = builtins.__import__
def _fail_environments_base(name, *args, **kwargs):
if name == "tools.environments.base":
raise ImportError("simulated")
return real_import(name, *args, **kwargs)
def _run_check():
with patch.object(builtins, "__import__",
side_effect=_fail_environments_base):
result_holder["result"] = check_all_command_guards(
"rm -rf /tmp/nonexistent-import-fail-target", "local"
)
thread = threading.Thread(target=_run_check, daemon=True)
thread.start()
time.sleep(0.2)
resolve_gateway_approval(self.SESSION_KEY, "once")
thread.join(timeout=5)
assert not thread.is_alive()
# Even when heartbeat import fails, the approval flow completes.
assert result_holder["result"]["approved"] is True

View File

@@ -587,3 +587,112 @@ class TestSecurity:
result = mgr.restore(str(work_dir), target_hash, file_path="subdir/test.txt")
assert result["success"] is True
# =========================================================================
# GPG / global git config isolation
# =========================================================================
# Regression tests for the bug where users with ``commit.gpgsign = true``
# in their global git config got a pinentry popup (or a failed commit)
# every time the agent took a background snapshot.
import os as _os
class TestGpgAndGlobalConfigIsolation:
def test_git_env_isolates_global_and_system_config(self, tmp_path):
"""_git_env must null out GIT_CONFIG_GLOBAL / GIT_CONFIG_SYSTEM so the
shadow repo does not inherit user-level gpgsign, hooks, aliases, etc."""
env = _git_env(tmp_path / "shadow", str(tmp_path))
assert env["GIT_CONFIG_GLOBAL"] == _os.devnull
assert env["GIT_CONFIG_SYSTEM"] == _os.devnull
assert env["GIT_CONFIG_NOSYSTEM"] == "1"
def test_init_sets_commit_gpgsign_false(self, work_dir, checkpoint_base, monkeypatch):
monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
shadow = _shadow_repo_path(str(work_dir))
_init_shadow_repo(shadow, str(work_dir))
# Inspect the shadow's own config directly — the settings must be
# written into the repo, not just inherited via env vars.
result = subprocess.run(
["git", "config", "--file", str(shadow / "config"), "--get", "commit.gpgsign"],
capture_output=True, text=True,
)
assert result.stdout.strip() == "false"
def test_init_sets_tag_gpgsign_false(self, work_dir, checkpoint_base, monkeypatch):
monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
shadow = _shadow_repo_path(str(work_dir))
_init_shadow_repo(shadow, str(work_dir))
result = subprocess.run(
["git", "config", "--file", str(shadow / "config"), "--get", "tag.gpgSign"],
capture_output=True, text=True,
)
assert result.stdout.strip() == "false"
def test_checkpoint_works_with_global_gpgsign_and_broken_gpg(
self, work_dir, checkpoint_base, monkeypatch, tmp_path
):
"""The real bug scenario: user has global commit.gpgsign=true but GPG
is broken or pinentry is unavailable. Before the fix, every snapshot
either failed or spawned a pinentry window. After the fix, snapshots
succeed without ever invoking GPG."""
monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
# Fake HOME with global gpgsign=true and a deliberately broken GPG
# binary. If isolation fails, the commit will try to exec this
# nonexistent path and the checkpoint will fail.
fake_home = tmp_path / "fake_home"
fake_home.mkdir()
(fake_home / ".gitconfig").write_text(
"[user]\n email = real@user.com\n name = Real User\n"
"[commit]\n gpgsign = true\n"
"[tag]\n gpgSign = true\n"
"[gpg]\n program = /nonexistent/fake-gpg-binary\n"
)
monkeypatch.setenv("HOME", str(fake_home))
monkeypatch.delenv("GPG_TTY", raising=False)
monkeypatch.delenv("DISPLAY", raising=False) # block GUI pinentry
mgr = CheckpointManager(enabled=True)
assert mgr.ensure_checkpoint(str(work_dir), reason="with-global-gpgsign") is True
assert len(mgr.list_checkpoints(str(work_dir))) == 1
def test_checkpoint_works_on_prefix_shadow_without_local_gpgsign(
self, work_dir, checkpoint_base, monkeypatch, tmp_path
):
"""Users with shadow repos created before the fix will not have
commit.gpgsign=false in their shadow's own config. The inline
``--no-gpg-sign`` flag on the commit call must cover them."""
monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
# Simulate a pre-fix shadow repo: init without commit.gpgsign=false
# in its own config. _init_shadow_repo now writes it, so we must
# manually remove it to mimic the pre-fix state.
shadow = _shadow_repo_path(str(work_dir))
_init_shadow_repo(shadow, str(work_dir))
subprocess.run(
["git", "config", "--file", str(shadow / "config"),
"--unset", "commit.gpgsign"],
capture_output=True, text=True, check=False,
)
subprocess.run(
["git", "config", "--file", str(shadow / "config"),
"--unset", "tag.gpgSign"],
capture_output=True, text=True, check=False,
)
# And simulate hostile global config
fake_home = tmp_path / "fake_home"
fake_home.mkdir()
(fake_home / ".gitconfig").write_text(
"[commit]\n gpgsign = true\n"
"[gpg]\n program = /nonexistent/fake-gpg-binary\n"
)
monkeypatch.setenv("HOME", str(fake_home))
monkeypatch.delenv("GPG_TTY", raising=False)
monkeypatch.delenv("DISPLAY", raising=False)
mgr = CheckpointManager(enabled=True)
assert mgr.ensure_checkpoint(str(work_dir), reason="prefix-shadow") is True
assert len(mgr.list_checkpoints(str(work_dir))) == 1

View File

@@ -0,0 +1,287 @@
"""Tests for the Google Gemini TTS provider in tools/tts_tool.py."""
import base64
import struct
from unittest.mock import MagicMock, patch
import pytest
@pytest.fixture(autouse=True)
def clean_env(monkeypatch):
for key in (
"GEMINI_API_KEY",
"GOOGLE_API_KEY",
"GEMINI_BASE_URL",
"HERMES_SESSION_PLATFORM",
):
monkeypatch.delenv(key, raising=False)
@pytest.fixture
def fake_pcm_bytes():
# 0.1s of silence at 24kHz mono 16-bit = 4800 bytes
return b"\x00" * 4800
@pytest.fixture
def mock_gemini_response(fake_pcm_bytes):
"""A successful Gemini generateContent response."""
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {
"candidates": [
{
"content": {
"parts": [
{
"inlineData": {
"mimeType": "audio/L16;codec=pcm;rate=24000",
"data": base64.b64encode(fake_pcm_bytes).decode(),
}
}
]
}
}
]
}
return resp
class TestWrapPcmAsWav:
def test_riff_header_structure(self):
from tools.tts_tool import _wrap_pcm_as_wav
pcm = b"\x01\x02\x03\x04" * 10
wav = _wrap_pcm_as_wav(pcm, sample_rate=24000, channels=1, sample_width=2)
assert wav[:4] == b"RIFF"
assert wav[8:12] == b"WAVE"
assert wav[12:16] == b"fmt "
# Audio format (PCM=1)
assert struct.unpack("<H", wav[20:22])[0] == 1
# Channels
assert struct.unpack("<H", wav[22:24])[0] == 1
# Sample rate
assert struct.unpack("<I", wav[24:28])[0] == 24000
# Bits per sample
assert struct.unpack("<H", wav[34:36])[0] == 16
assert wav[36:40] == b"data"
assert wav[44:] == pcm
def test_header_size_is_44(self):
from tools.tts_tool import _wrap_pcm_as_wav
pcm = b"\xff" * 100
wav = _wrap_pcm_as_wav(pcm)
assert len(wav) == 44 + len(pcm)
class TestGenerateGeminiTts:
def test_missing_api_key_raises_value_error(self, tmp_path):
from tools.tts_tool import _generate_gemini_tts
output_path = str(tmp_path / "test.wav")
with pytest.raises(ValueError, match="GEMINI_API_KEY"):
_generate_gemini_tts("Hello", output_path, {})
def test_google_api_key_fallback(self, tmp_path, monkeypatch, mock_gemini_response):
from tools.tts_tool import _generate_gemini_tts
monkeypatch.setenv("GOOGLE_API_KEY", "from-google-env")
output_path = str(tmp_path / "test.wav")
with patch("requests.post", return_value=mock_gemini_response) as mock_post:
_generate_gemini_tts("Hi", output_path, {})
# Confirm it used the GOOGLE_API_KEY as the query parameter
_, kwargs = mock_post.call_args
assert kwargs["params"]["key"] == "from-google-env"
def test_wav_output_fast_path(self, tmp_path, monkeypatch, mock_gemini_response, fake_pcm_bytes):
from tools.tts_tool import _generate_gemini_tts
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
output_path = str(tmp_path / "test.wav")
with patch("requests.post", return_value=mock_gemini_response):
result = _generate_gemini_tts("Hi", output_path, {})
assert result == output_path
data = (tmp_path / "test.wav").read_bytes()
assert data[:4] == b"RIFF"
assert data[8:12] == b"WAVE"
# Audio payload should match the PCM we put in
assert data[44:] == fake_pcm_bytes
def test_default_voice_and_model(self, tmp_path, monkeypatch, mock_gemini_response):
from tools.tts_tool import (
DEFAULT_GEMINI_TTS_MODEL,
DEFAULT_GEMINI_TTS_VOICE,
_generate_gemini_tts,
)
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
with patch("requests.post", return_value=mock_gemini_response) as mock_post:
_generate_gemini_tts("Hi", str(tmp_path / "test.wav"), {})
args, kwargs = mock_post.call_args
assert DEFAULT_GEMINI_TTS_MODEL in args[0]
payload = kwargs["json"]
voice = (
payload["generationConfig"]["speechConfig"]["voiceConfig"]
["prebuiltVoiceConfig"]["voiceName"]
)
assert voice == DEFAULT_GEMINI_TTS_VOICE
def test_custom_voice(self, tmp_path, monkeypatch, mock_gemini_response):
from tools.tts_tool import _generate_gemini_tts
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
config = {"gemini": {"voice": "Puck"}}
with patch("requests.post", return_value=mock_gemini_response) as mock_post:
_generate_gemini_tts("Hi", str(tmp_path / "test.wav"), config)
payload = mock_post.call_args[1]["json"]
voice = (
payload["generationConfig"]["speechConfig"]["voiceConfig"]
["prebuiltVoiceConfig"]["voiceName"]
)
assert voice == "Puck"
def test_custom_model(self, tmp_path, monkeypatch, mock_gemini_response):
from tools.tts_tool import _generate_gemini_tts
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
config = {"gemini": {"model": "gemini-2.5-pro-preview-tts"}}
with patch("requests.post", return_value=mock_gemini_response) as mock_post:
_generate_gemini_tts("Hi", str(tmp_path / "test.wav"), config)
endpoint = mock_post.call_args[0][0]
assert "gemini-2.5-pro-preview-tts" in endpoint
def test_response_modality_is_audio(self, tmp_path, monkeypatch, mock_gemini_response):
from tools.tts_tool import _generate_gemini_tts
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
with patch("requests.post", return_value=mock_gemini_response) as mock_post:
_generate_gemini_tts("Hi", str(tmp_path / "test.wav"), {})
payload = mock_post.call_args[1]["json"]
assert payload["generationConfig"]["responseModalities"] == ["AUDIO"]
def test_http_error_raises_runtime_error(self, tmp_path, monkeypatch):
from tools.tts_tool import _generate_gemini_tts
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
err_resp = MagicMock()
err_resp.status_code = 400
err_resp.json.return_value = {"error": {"message": "Invalid voice"}}
with patch("requests.post", return_value=err_resp):
with pytest.raises(RuntimeError, match="HTTP 400.*Invalid voice"):
_generate_gemini_tts("Hi", str(tmp_path / "test.wav"), {})
def test_empty_audio_raises(self, tmp_path, monkeypatch):
from tools.tts_tool import _generate_gemini_tts
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {
"candidates": [
{"content": {"parts": [{"inlineData": {"data": ""}}]}}
]
}
with patch("requests.post", return_value=resp):
with pytest.raises(RuntimeError, match="empty audio"):
_generate_gemini_tts("Hi", str(tmp_path / "test.wav"), {})
def test_malformed_response_raises(self, tmp_path, monkeypatch):
from tools.tts_tool import _generate_gemini_tts
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {"candidates": []} # no content
with patch("requests.post", return_value=resp):
with pytest.raises(RuntimeError, match="malformed"):
_generate_gemini_tts("Hi", str(tmp_path / "test.wav"), {})
def test_snake_case_inline_data_accepted(self, tmp_path, monkeypatch, fake_pcm_bytes):
"""Some Gemini SDK versions return inline_data instead of inlineData."""
from tools.tts_tool import _generate_gemini_tts
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
resp = MagicMock()
resp.status_code = 200
resp.json.return_value = {
"candidates": [
{
"content": {
"parts": [
{
"inline_data": {
"data": base64.b64encode(fake_pcm_bytes).decode()
}
}
]
}
}
]
}
output_path = str(tmp_path / "test.wav")
with patch("requests.post", return_value=resp):
_generate_gemini_tts("Hi", output_path, {})
data = (tmp_path / "test.wav").read_bytes()
assert data[:4] == b"RIFF"
def test_custom_base_url_env(self, tmp_path, monkeypatch, mock_gemini_response):
from tools.tts_tool import _generate_gemini_tts
monkeypatch.setenv("GEMINI_API_KEY", "test-key")
monkeypatch.setenv("GEMINI_BASE_URL", "https://custom-gemini.example.com/v1beta")
with patch("requests.post", return_value=mock_gemini_response) as mock_post:
_generate_gemini_tts("Hi", str(tmp_path / "test.wav"), {})
assert mock_post.call_args[0][0].startswith("https://custom-gemini.example.com/v1beta/")
class TestGeminiInCheckRequirements:
def test_gemini_api_key_satisfies_requirements(self, monkeypatch):
from tools.tts_tool import check_tts_requirements
# Strip everything else
for key in (
"ELEVENLABS_API_KEY",
"OPENAI_API_KEY",
"VOICE_TOOLS_OPENAI_KEY",
"MINIMAX_API_KEY",
"XAI_API_KEY",
"MISTRAL_API_KEY",
"GOOGLE_API_KEY",
):
monkeypatch.delenv(key, raising=False)
monkeypatch.setenv("GEMINI_API_KEY", "k")
# Force edge_tts import to fail so we actually hit the gemini check
import builtins
real_import = builtins.__import__
def fake_import(name, *args, **kwargs):
if name == "edge_tts":
raise ImportError("simulated")
return real_import(name, *args, **kwargs)
with patch("builtins.__import__", side_effect=fake_import):
assert check_tts_requirements() is True

View File

@@ -14,6 +14,7 @@ import os
import re
import sys
import threading
import time
import unicodedata
from typing import Optional
@@ -834,13 +835,43 @@ def check_all_command_guards(command: str, env_type: str,
"description": combined_desc,
}
# Block until the user responds or timeout (default 5 min)
# Block until the user responds or timeout (default 5 min).
# Poll in short slices so we can fire activity heartbeats every
# ~10s to the agent's inactivity tracker. Without this, the
# blocking event.wait() never touches activity, and the
# gateway's inactivity watchdog (agent.gateway_timeout, default
# 1800s) kills the agent while the user is still responding to
# the approval prompt. Mirrors the _wait_for_process() cadence
# in tools/environments/base.py.
timeout = _get_approval_config().get("gateway_timeout", 300)
try:
timeout = int(timeout)
except (ValueError, TypeError):
timeout = 300
resolved = entry.event.wait(timeout=timeout)
try:
from tools.environments.base import touch_activity_if_due
except Exception: # pragma: no cover
touch_activity_if_due = None
_now = time.monotonic()
_deadline = _now + max(timeout, 0)
_activity_state = {"last_touch": _now, "start": _now}
resolved = False
while True:
_remaining = _deadline - time.monotonic()
if _remaining <= 0:
break
# 1s poll slice — the event is set immediately when the
# user responds, so slice length only controls heartbeat
# cadence, not user-visible responsiveness.
if entry.event.wait(timeout=min(1.0, _remaining)):
resolved = True
break
if touch_activity_if_due is not None:
touch_activity_if_due(
_activity_state, "waiting for user approval"
)
# Clean up this entry from the queue
with _lock:

View File

@@ -126,7 +126,22 @@ def _shadow_repo_path(working_dir: str) -> Path:
def _git_env(shadow_repo: Path, working_dir: str) -> dict:
"""Build env dict that redirects git to the shadow repo."""
"""Build env dict that redirects git to the shadow repo.
The shadow repo is internal Hermes infrastructure — it must NOT inherit
the user's global or system git config. User-level settings like
``commit.gpgsign = true``, signing hooks, or credential helpers would
either break background snapshots or, worse, spawn interactive prompts
(pinentry GUI windows) mid-session every time a file is written.
Isolation strategy:
* ``GIT_CONFIG_GLOBAL=<os.devnull>`` — ignore ``~/.gitconfig`` (git 2.32+).
* ``GIT_CONFIG_SYSTEM=<os.devnull>`` — ignore ``/etc/gitconfig`` (git 2.32+).
* ``GIT_CONFIG_NOSYSTEM=1`` — legacy belt-and-suspenders for older git.
The shadow repo still has its own per-repo config (user.email, user.name,
commit.gpgsign=false) set in ``_init_shadow_repo``.
"""
normalized_working_dir = _normalize_path(working_dir)
env = os.environ.copy()
env["GIT_DIR"] = str(shadow_repo)
@@ -134,6 +149,13 @@ def _git_env(shadow_repo: Path, working_dir: str) -> dict:
env.pop("GIT_INDEX_FILE", None)
env.pop("GIT_NAMESPACE", None)
env.pop("GIT_ALTERNATE_OBJECT_DIRECTORIES", None)
# Isolate the shadow repo from the user's global/system git config.
# Prevents commit.gpgsign, hooks, aliases, credential helpers, etc. from
# leaking into background snapshots. Uses os.devnull for cross-platform
# support (``/dev/null`` on POSIX, ``nul`` on Windows).
env["GIT_CONFIG_GLOBAL"] = os.devnull
env["GIT_CONFIG_SYSTEM"] = os.devnull
env["GIT_CONFIG_NOSYSTEM"] = "1"
return env
@@ -211,6 +233,13 @@ def _init_shadow_repo(shadow_repo: Path, working_dir: str) -> Optional[str]:
_run_git(["config", "user.email", "hermes@local"], shadow_repo, working_dir)
_run_git(["config", "user.name", "Hermes Checkpoint"], shadow_repo, working_dir)
# Explicitly disable commit/tag signing in the shadow repo. _git_env
# already isolates from the user's global config, but writing these into
# the shadow's own config is belt-and-suspenders — it guarantees the
# shadow repo is correct even if someone inspects or runs git against it
# directly (without the GIT_CONFIG_* env vars).
_run_git(["config", "commit.gpgsign", "false"], shadow_repo, working_dir)
_run_git(["config", "tag.gpgSign", "false"], shadow_repo, working_dir)
info_dir = shadow_repo / "info"
info_dir.mkdir(exist_ok=True)
@@ -552,9 +581,11 @@ class CheckpointManager:
logger.debug("Checkpoint skipped: no changes in %s", working_dir)
return False
# Commit
# Commit. ``--no-gpg-sign`` inline covers shadow repos created before
# the commit.gpgsign=false config was added to _init_shadow_repo — so
# users with existing checkpoints never hit a GPG pinentry popup.
ok, _, err = _run_git(
["commit", "-m", reason, "--allow-empty-message"],
["commit", "-m", reason, "--allow-empty-message", "--no-gpg-sign"],
shadow, working_dir, timeout=_GIT_TIMEOUT * 2,
)
if not ok:

View File

@@ -2,12 +2,13 @@
"""
Text-to-Speech Tool Module
Supports six TTS providers:
Supports seven TTS providers:
- Edge TTS (default, free, no API key): Microsoft Edge neural voices
- ElevenLabs (premium): High-quality voices, needs ELEVENLABS_API_KEY
- OpenAI TTS: Good quality, needs OPENAI_API_KEY
- MiniMax TTS: High-quality with voice cloning, needs MINIMAX_API_KEY
- Mistral (Voxtral TTS): Multilingual, native Opus, needs MISTRAL_API_KEY
- Google Gemini TTS: Controllable, 30 prebuilt voices, needs GEMINI_API_KEY
- NeuTTS (local, free, no API key): On-device TTS via neutts_cli, needs neutts installed
Output formats:
@@ -99,6 +100,13 @@ DEFAULT_XAI_LANGUAGE = "en"
DEFAULT_XAI_SAMPLE_RATE = 24000
DEFAULT_XAI_BIT_RATE = 128000
DEFAULT_XAI_BASE_URL = "https://api.x.ai/v1"
DEFAULT_GEMINI_TTS_MODEL = "gemini-2.5-flash-preview-tts"
DEFAULT_GEMINI_TTS_VOICE = "Kore"
DEFAULT_GEMINI_TTS_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
# PCM output specs for Gemini TTS (fixed by the API)
GEMINI_TTS_SAMPLE_RATE = 24000
GEMINI_TTS_CHANNELS = 1
GEMINI_TTS_SAMPLE_WIDTH = 2 # 16-bit PCM (L16)
def _get_default_output_dir() -> str:
from hermes_constants import get_hermes_dir
@@ -506,6 +514,174 @@ def _generate_mistral_tts(text: str, output_path: str, tts_config: Dict[str, Any
return output_path
# ===========================================================================
# Provider: Google Gemini TTS
# ===========================================================================
def _wrap_pcm_as_wav(
pcm_bytes: bytes,
sample_rate: int = GEMINI_TTS_SAMPLE_RATE,
channels: int = GEMINI_TTS_CHANNELS,
sample_width: int = GEMINI_TTS_SAMPLE_WIDTH,
) -> bytes:
"""Wrap raw signed-little-endian PCM with a standard WAV RIFF header.
Gemini TTS returns audio/L16;codec=pcm;rate=24000 -- raw PCM samples with
no container. We add a minimal WAV header so the file is playable and
ffmpeg can re-encode it to MP3/Opus downstream.
"""
import struct
byte_rate = sample_rate * channels * sample_width
block_align = channels * sample_width
data_size = len(pcm_bytes)
fmt_chunk = struct.pack(
"<4sIHHIIHH",
b"fmt ",
16, # fmt chunk size (PCM)
1, # audio format (PCM)
channels,
sample_rate,
byte_rate,
block_align,
sample_width * 8,
)
data_chunk_header = struct.pack("<4sI", b"data", data_size)
riff_size = 4 + len(fmt_chunk) + len(data_chunk_header) + data_size
riff_header = struct.pack("<4sI4s", b"RIFF", riff_size, b"WAVE")
return riff_header + fmt_chunk + data_chunk_header + pcm_bytes
def _generate_gemini_tts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
"""Generate audio using Google Gemini TTS.
Gemini's generateContent endpoint with responseModalities=["AUDIO"] returns
raw 24kHz mono 16-bit PCM (L16) as base64. We wrap it with a WAV RIFF
header to produce a playable file, then ffmpeg-convert to MP3 / Opus if
the caller requested those formats (same pattern as NeuTTS).
Args:
text: Text to convert (prompt-style; supports inline direction like
"Say cheerfully:" and audio tags like [whispers]).
output_path: Where to save the audio file (.wav, .mp3, or .ogg).
tts_config: TTS config dict.
Returns:
Path to the saved audio file.
"""
import requests
api_key = (os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY") or "").strip()
if not api_key:
raise ValueError(
"GEMINI_API_KEY not set. Get one at https://aistudio.google.com/app/apikey"
)
gemini_config = tts_config.get("gemini", {})
model = str(gemini_config.get("model", DEFAULT_GEMINI_TTS_MODEL)).strip() or DEFAULT_GEMINI_TTS_MODEL
voice = str(gemini_config.get("voice", DEFAULT_GEMINI_TTS_VOICE)).strip() or DEFAULT_GEMINI_TTS_VOICE
base_url = str(
gemini_config.get("base_url")
or os.getenv("GEMINI_BASE_URL")
or DEFAULT_GEMINI_TTS_BASE_URL
).strip().rstrip("/")
payload: Dict[str, Any] = {
"contents": [{"parts": [{"text": text}]}],
"generationConfig": {
"responseModalities": ["AUDIO"],
"speechConfig": {
"voiceConfig": {
"prebuiltVoiceConfig": {"voiceName": voice},
},
},
},
}
endpoint = f"{base_url}/models/{model}:generateContent"
response = requests.post(
endpoint,
params={"key": api_key},
headers={"Content-Type": "application/json"},
json=payload,
timeout=60,
)
if response.status_code != 200:
# Surface the API error message when present
try:
err = response.json().get("error", {})
detail = err.get("message") or response.text[:300]
except Exception:
detail = response.text[:300]
raise RuntimeError(
f"Gemini TTS API error (HTTP {response.status_code}): {detail}"
)
try:
data = response.json()
parts = data["candidates"][0]["content"]["parts"]
audio_part = next((p for p in parts if "inlineData" in p or "inline_data" in p), None)
if audio_part is None:
raise RuntimeError("Gemini TTS response contained no audio data")
inline = audio_part.get("inlineData") or audio_part.get("inline_data") or {}
audio_b64 = inline.get("data", "")
except (KeyError, IndexError, TypeError) as e:
raise RuntimeError(f"Gemini TTS response was malformed: {e}") from e
if not audio_b64:
raise RuntimeError("Gemini TTS returned empty audio data")
pcm_bytes = base64.b64decode(audio_b64)
wav_bytes = _wrap_pcm_as_wav(pcm_bytes)
# Fast path: caller wants WAV directly, just write.
if output_path.lower().endswith(".wav"):
with open(output_path, "wb") as f:
f.write(wav_bytes)
return output_path
# Otherwise write WAV to a temp file and ffmpeg-convert to the target
# format (.mp3 or .ogg). If ffmpeg is missing, fall back to renaming the
# WAV -- this matches the NeuTTS behavior and keeps the tool usable on
# systems without ffmpeg (audio still plays, just with a misleading
# extension).
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
tmp.write(wav_bytes)
wav_path = tmp.name
try:
ffmpeg = shutil.which("ffmpeg")
if ffmpeg:
# For .ogg output, force libopus encoding (Telegram voice bubbles
# require Opus specifically; ffmpeg's default for .ogg is Vorbis).
if output_path.lower().endswith(".ogg"):
cmd = [
ffmpeg, "-i", wav_path,
"-acodec", "libopus", "-ac", "1",
"-b:a", "64k", "-vbr", "off",
"-y", "-loglevel", "error",
output_path,
]
else:
cmd = [ffmpeg, "-i", wav_path, "-y", "-loglevel", "error", output_path]
result = subprocess.run(cmd, capture_output=True, timeout=30)
if result.returncode != 0:
stderr = result.stderr.decode("utf-8", errors="ignore")[:300]
raise RuntimeError(f"ffmpeg conversion failed: {stderr}")
else:
logger.warning(
"ffmpeg not found; writing raw WAV to %s (extension may be misleading)",
output_path,
)
shutil.copyfile(wav_path, output_path)
finally:
try:
os.remove(wav_path)
except OSError:
pass
return output_path
# ===========================================================================
# NeuTTS (local, on-device TTS via neutts_cli)
# ===========================================================================
@@ -634,7 +810,7 @@ def text_to_speech_tool(
out_dir.mkdir(parents=True, exist_ok=True)
# Use .ogg for Telegram with providers that support native Opus output,
# otherwise fall back to .mp3 (Edge TTS will attempt ffmpeg conversion later).
if want_opus and provider in ("openai", "elevenlabs", "mistral"):
if want_opus and provider in ("openai", "elevenlabs", "mistral", "gemini"):
file_path = out_dir / f"tts_{timestamp}.ogg"
else:
file_path = out_dir / f"tts_{timestamp}.mp3"
@@ -687,6 +863,10 @@ def text_to_speech_tool(
logger.info("Generating speech with Mistral Voxtral TTS...")
_generate_mistral_tts(text, file_str, tts_config)
elif provider == "gemini":
logger.info("Generating speech with Google Gemini TTS...")
_generate_gemini_tts(text, file_str, tts_config)
elif provider == "neutts":
if not _check_neutts_available():
return json.dumps({
@@ -741,7 +921,7 @@ def text_to_speech_tool(
if opus_path:
file_str = opus_path
voice_compatible = True
elif provider in ("elevenlabs", "openai", "mistral"):
elif provider in ("elevenlabs", "openai", "mistral", "gemini"):
voice_compatible = file_str.endswith(".ogg")
file_size = os.path.getsize(file_str)
@@ -811,6 +991,8 @@ def check_tts_requirements() -> bool:
return True
if os.getenv("XAI_API_KEY"):
return True
if os.getenv("GEMINI_API_KEY") or os.getenv("GOOGLE_API_KEY"):
return True
try:
_import_mistral_client()
if os.getenv("MISTRAL_API_KEY"):

View File

@@ -14,7 +14,7 @@ If you have a paid [Nous Portal](https://portal.nousresearch.com) subscription,
## Text-to-Speech
Convert text to speech with six providers:
Convert text to speech with seven providers:
| Provider | Quality | Cost | API Key |
|----------|---------|------|---------|
@@ -23,6 +23,7 @@ Convert text to speech with six providers:
| **OpenAI TTS** | Good | Paid | `VOICE_TOOLS_OPENAI_KEY` |
| **MiniMax TTS** | Excellent | Paid | `MINIMAX_API_KEY` |
| **Mistral (Voxtral TTS)** | Excellent | Paid | `MISTRAL_API_KEY` |
| **Google Gemini TTS** | Excellent | Free tier | `GEMINI_API_KEY` |
| **NeuTTS** | Good | Free | None needed |
### Platform Delivery
@@ -39,7 +40,7 @@ Convert text to speech with six providers:
```yaml
# In ~/.hermes/config.yaml
tts:
provider: "edge" # "edge" | "elevenlabs" | "openai" | "minimax" | "mistral" | "neutts"
provider: "edge" # "edge" | "elevenlabs" | "openai" | "minimax" | "mistral" | "gemini" | "neutts"
speed: 1.0 # Global speed multiplier (provider-specific settings override this)
edge:
voice: "en-US-AriaNeural" # 322 voices, 74 languages
@@ -61,6 +62,9 @@ tts:
mistral:
model: "voxtral-mini-tts-2603"
voice_id: "c69964a6-ab8b-4f8a-9465-ec0925096ec8" # Paul - Neutral (default)
gemini:
model: "gemini-2.5-flash-preview-tts" # or gemini-2.5-pro-preview-tts
voice: "Kore" # 30 prebuilt voices: Zephyr, Puck, Kore, Enceladus, Gacrux, etc.
neutts:
ref_audio: ''
ref_text: ''
@@ -77,6 +81,7 @@ Telegram voice bubbles require Opus/OGG audio format:
- **OpenAI, ElevenLabs, and Mistral** produce Opus natively — no extra setup
- **Edge TTS** (default) outputs MP3 and needs **ffmpeg** to convert:
- **MiniMax TTS** outputs MP3 and needs **ffmpeg** to convert for Telegram voice bubbles
- **Google Gemini TTS** outputs raw PCM and uses **ffmpeg** to encode Opus directly for Telegram voice bubbles
- **NeuTTS** outputs WAV and also needs **ffmpeg** to convert for Telegram voice bubbles
```bash

View File

@@ -284,8 +284,40 @@ MATRIX_RECOVERY_KEY=EsT... your recovery key here
On each startup, if `MATRIX_RECOVERY_KEY` is set, Hermes imports cross-signing keys from the homeserver's secure secret storage and signs the current device. This is idempotent and safe to leave enabled permanently.
:::warning
If you delete the `~/.hermes/platforms/matrix/store/` directory, the bot loses its encryption keys. You'll need to verify the device again in your Matrix client. Back up this directory if you want to preserve encrypted sessions.
:::warning[Deleting the crypto store]
If you delete `~/.hermes/platforms/matrix/store/crypto.db`, the bot loses its encryption identity. Simply restarting with the same device ID will **not** fully recover — the homeserver still holds one-time keys signed with the old identity key, and peers cannot establish new Olm sessions.
Hermes detects this condition on startup and refuses to enable E2EE, logging: `device XXXX has stale one-time keys on the server signed with a previous identity key`.
**Easiest recovery: generate a new access token** (which gets a fresh device ID with no stale key history). See the "Upgrading from a previous version with E2EE" section below. This is the most reliable path and avoids touching the homeserver database.
**Manual recovery** (advanced — keeps the same device ID):
1. Stop Synapse and delete the old device from its database:
```bash
sudo systemctl stop matrix-synapse
sudo sqlite3 /var/lib/matrix-synapse/homeserver.db "
DELETE FROM e2e_device_keys_json WHERE device_id = 'DEVICE_ID' AND user_id = '@hermes:your-server';
DELETE FROM e2e_one_time_keys_json WHERE device_id = 'DEVICE_ID' AND user_id = '@hermes:your-server';
DELETE FROM e2e_fallback_keys_json WHERE device_id = 'DEVICE_ID' AND user_id = '@hermes:your-server';
DELETE FROM devices WHERE device_id = 'DEVICE_ID' AND user_id = '@hermes:your-server';
"
sudo systemctl start matrix-synapse
```
Or via the Synapse admin API (note the URL-encoded user ID):
```bash
curl -X DELETE -H "Authorization: Bearer ADMIN_TOKEN" \
'https://your-server/_synapse/admin/v2/users/%40hermes%3Ayour-server/devices/DEVICE_ID'
```
Note: deleting a device via the admin API may also invalidate the associated access token. You may need to generate a new token afterward.
2. Delete the local crypto store and restart Hermes:
```bash
rm -f ~/.hermes/platforms/matrix/store/crypto.db*
# restart hermes
```
Other Matrix clients (Element, matrix-commander) may cache the old device keys. After recovery, type `/discardsession` in Element to force a new encryption session with the bot.
:::
:::info
@@ -361,6 +393,10 @@ pip install 'hermes-agent[matrix]'
### Upgrading from a previous version with E2EE
:::tip
If you also manually deleted `crypto.db`, see the "Deleting the crypto store" warning in the E2EE section above — there are additional steps to clear stale one-time keys from the homeserver.
:::
If you previously used Hermes with `MATRIX_ENCRYPTION=true` and are upgrading to
a version that uses the new SQLite-based crypto store, the bot's encryption
identity has changed. Your Matrix client (Element) may cache the old device keys