Merge remote-tracking branch 'origin/main' into bb/tui-long-session-perf

# Conflicts:
#	ui-tui/src/app/interfaces.ts
This commit is contained in:
Brooklyn Nicholson
2026-04-26 13:39:57 -05:00
82 changed files with 6072 additions and 712 deletions

View File

@@ -192,6 +192,43 @@ class TestDefaultContextLengths:
f"{model_id}: expected {expected_ctx}, got {actual}"
)
def test_deepseek_v4_models_1m_context(self):
from agent.model_metadata import get_model_context_length
from unittest.mock import patch as mock_patch
expected_keys = {
"deepseek-v4-pro": 1_000_000,
"deepseek-v4-flash": 1_000_000,
"deepseek-chat": 1_000_000,
"deepseek-reasoner": 1_000_000,
}
for key, value in expected_keys.items():
assert key in DEFAULT_CONTEXT_LENGTHS, f"{key} missing"
assert DEFAULT_CONTEXT_LENGTHS[key] == value, (
f"{key} should be {value}, got {DEFAULT_CONTEXT_LENGTHS[key]}"
)
# Longest-first substring matching must resolve both the bare V4
# ids (native DeepSeek) and the vendor-prefixed forms (OpenRouter
# / Nous Portal) to 1M without probing down to the legacy 128K
# ``deepseek`` substring fallback.
with mock_patch("agent.model_metadata.fetch_model_metadata", return_value={}), \
mock_patch("agent.model_metadata.fetch_endpoint_model_metadata", return_value={}), \
mock_patch("agent.model_metadata.get_cached_context_length", return_value=None):
cases = [
("deepseek-v4-pro", 1_000_000),
("deepseek-v4-flash", 1_000_000),
("deepseek/deepseek-v4-pro", 1_000_000),
("deepseek/deepseek-v4-flash", 1_000_000),
("deepseek-chat", 1_000_000),
("deepseek-reasoner", 1_000_000),
]
for model_id, expected_ctx in cases:
actual = get_model_context_length(model_id)
assert actual == expected_ctx, (
f"{model_id}: expected {expected_ctx}, got {actual}"
)
def test_all_values_positive(self):
for key, value in DEFAULT_CONTEXT_LENGTHS.items():
assert value > 0, f"{key} has non-positive context length"
@@ -303,7 +340,9 @@ class TestCodexOAuthContextLength:
from agent.model_metadata import get_model_context_length
# OpenRouter — should hit its own catalog path first; when mocked
# empty, falls through to hardcoded DEFAULT_CONTEXT_LENGTHS (400k).
# empty, falls through to hardcoded DEFAULT_CONTEXT_LENGTHS (1.05M,
# matching the real direct-API value — Codex OAuth's 272k cap is
# provider-specific and must not leak here).
with patch("agent.model_metadata.fetch_model_metadata", return_value={}), \
patch("agent.model_metadata.fetch_endpoint_model_metadata", return_value={}), \
patch("agent.model_metadata.get_cached_context_length", return_value=None), \
@@ -314,7 +353,7 @@ class TestCodexOAuthContextLength:
api_key="",
provider="openrouter",
)
assert ctx == 400_000, (
assert ctx == 1_050_000, (
f"Non-Codex gpt-5.5 resolved to {ctx}; Codex 272k override "
"leaked outside openai-codex provider"
)

View File

@@ -251,3 +251,141 @@ class TestAuxiliaryClientIntegration:
monkeypatch.setattr(aux, "_read_nous_auth", lambda: None)
result = aux._try_nous()
assert result == (None, None)
class TestIsGenuineNousRateLimit:
"""Tell a real account-level 429 apart from an upstream-capacity 429.
Nous Portal multiplexes upstreams (DeepSeek, Kimi, MiMo, Hermes).
A 429 from an upstream out of capacity should NOT trip the
cross-session breaker; a real user-quota 429 should.
"""
def test_exhausted_hourly_bucket_in_429_headers_is_genuine(self):
from agent.nous_rate_guard import is_genuine_nous_rate_limit
headers = {
"x-ratelimit-limit-requests-1h": "800",
"x-ratelimit-remaining-requests-1h": "0",
"x-ratelimit-reset-requests-1h": "3100",
"x-ratelimit-limit-requests": "200",
"x-ratelimit-remaining-requests": "198",
"x-ratelimit-reset-requests": "40",
}
assert is_genuine_nous_rate_limit(headers=headers) is True
def test_exhausted_tokens_bucket_is_genuine(self):
from agent.nous_rate_guard import is_genuine_nous_rate_limit
headers = {
"x-ratelimit-limit-tokens": "800000",
"x-ratelimit-remaining-tokens": "0",
"x-ratelimit-reset-tokens": "45", # < 60s threshold -> not genuine
"x-ratelimit-limit-tokens-1h": "8000000",
"x-ratelimit-remaining-tokens-1h": "0",
"x-ratelimit-reset-tokens-1h": "1800", # >= 60s threshold -> genuine
}
assert is_genuine_nous_rate_limit(headers=headers) is True
def test_healthy_headers_on_429_are_upstream_capacity(self):
# Classic upstream-capacity symptom: Nous edge reports plenty of
# headroom on every bucket, but returns 429 anyway because
# upstream (DeepSeek / Kimi / ...) is out of capacity.
from agent.nous_rate_guard import is_genuine_nous_rate_limit
headers = {
"x-ratelimit-limit-requests": "200",
"x-ratelimit-remaining-requests": "198",
"x-ratelimit-reset-requests": "40",
"x-ratelimit-limit-requests-1h": "800",
"x-ratelimit-remaining-requests-1h": "750",
"x-ratelimit-reset-requests-1h": "3100",
"x-ratelimit-limit-tokens": "800000",
"x-ratelimit-remaining-tokens": "790000",
"x-ratelimit-reset-tokens": "40",
"x-ratelimit-limit-tokens-1h": "8000000",
"x-ratelimit-remaining-tokens-1h": "7800000",
"x-ratelimit-reset-tokens-1h": "3100",
}
assert is_genuine_nous_rate_limit(headers=headers) is False
def test_bare_429_with_no_headers_is_upstream(self):
from agent.nous_rate_guard import is_genuine_nous_rate_limit
assert is_genuine_nous_rate_limit(headers=None) is False
assert is_genuine_nous_rate_limit(headers={}) is False
assert is_genuine_nous_rate_limit(
headers={"content-type": "application/json"}
) is False
def test_exhausted_bucket_with_short_reset_is_not_genuine(self):
# remaining == 0 but reset in < 60s: almost certainly a
# secondary per-minute throttle that will clear immediately --
# not worth tripping the cross-session breaker.
from agent.nous_rate_guard import is_genuine_nous_rate_limit
headers = {
"x-ratelimit-limit-requests": "200",
"x-ratelimit-remaining-requests": "0",
"x-ratelimit-reset-requests": "30",
}
assert is_genuine_nous_rate_limit(headers=headers) is False
def test_last_known_state_with_exhausted_bucket_triggers_genuine(self):
# Headers on the 429 lack rate-limit info, but the previous
# successful response already showed the hourly bucket
# exhausted -- the 429 is almost certainly that limit
# continuing.
from agent.nous_rate_guard import is_genuine_nous_rate_limit
from agent.rate_limit_tracker import parse_rate_limit_headers
prior_headers = {
"x-ratelimit-limit-requests-1h": "800",
"x-ratelimit-remaining-requests-1h": "0",
"x-ratelimit-reset-requests-1h": "2000",
"x-ratelimit-limit-requests": "200",
"x-ratelimit-remaining-requests": "100",
"x-ratelimit-reset-requests": "30",
"x-ratelimit-limit-tokens": "800000",
"x-ratelimit-remaining-tokens": "700000",
"x-ratelimit-reset-tokens": "30",
"x-ratelimit-limit-tokens-1h": "8000000",
"x-ratelimit-remaining-tokens-1h": "7000000",
"x-ratelimit-reset-tokens-1h": "2000",
}
last_state = parse_rate_limit_headers(prior_headers, provider="nous")
assert is_genuine_nous_rate_limit(
headers=None, last_known_state=last_state
) is True
def test_last_known_state_all_healthy_stays_upstream(self):
# Prior state was healthy; bare 429 arrives; should be treated
# as upstream capacity.
from agent.nous_rate_guard import is_genuine_nous_rate_limit
from agent.rate_limit_tracker import parse_rate_limit_headers
prior_headers = {
"x-ratelimit-limit-requests-1h": "800",
"x-ratelimit-remaining-requests-1h": "750",
"x-ratelimit-reset-requests-1h": "2000",
"x-ratelimit-limit-requests": "200",
"x-ratelimit-remaining-requests": "180",
"x-ratelimit-reset-requests": "30",
"x-ratelimit-limit-tokens": "800000",
"x-ratelimit-remaining-tokens": "790000",
"x-ratelimit-reset-tokens": "30",
"x-ratelimit-limit-tokens-1h": "8000000",
"x-ratelimit-remaining-tokens-1h": "7900000",
"x-ratelimit-reset-tokens-1h": "2000",
}
last_state = parse_rate_limit_headers(prior_headers, provider="nous")
assert is_genuine_nous_rate_limit(
headers=None, last_known_state=last_state
) is False
def test_none_last_state_and_no_headers_is_upstream(self):
from agent.nous_rate_guard import is_genuine_nous_rate_limit
assert is_genuine_nous_rate_limit(
headers=None, last_known_state=None
) is False

View File

@@ -0,0 +1,164 @@
"""Tests for agent/onboarding.py — contextual first-touch hint helpers."""
from __future__ import annotations
import yaml
import pytest
from agent.onboarding import (
BUSY_INPUT_FLAG,
TOOL_PROGRESS_FLAG,
busy_input_hint_cli,
busy_input_hint_gateway,
is_seen,
mark_seen,
tool_progress_hint_cli,
tool_progress_hint_gateway,
)
class TestIsSeen:
def test_empty_config_unseen(self):
assert is_seen({}, BUSY_INPUT_FLAG) is False
def test_missing_onboarding_unseen(self):
assert is_seen({"display": {}}, BUSY_INPUT_FLAG) is False
def test_onboarding_not_dict_unseen(self):
assert is_seen({"onboarding": "nope"}, BUSY_INPUT_FLAG) is False
def test_seen_dict_missing_flag(self):
assert is_seen({"onboarding": {"seen": {}}}, BUSY_INPUT_FLAG) is False
def test_seen_flag_true(self):
cfg = {"onboarding": {"seen": {BUSY_INPUT_FLAG: True}}}
assert is_seen(cfg, BUSY_INPUT_FLAG) is True
def test_seen_flag_falsy(self):
cfg = {"onboarding": {"seen": {BUSY_INPUT_FLAG: False}}}
assert is_seen(cfg, BUSY_INPUT_FLAG) is False
def test_other_flags_isolated(self):
cfg = {"onboarding": {"seen": {BUSY_INPUT_FLAG: True}}}
assert is_seen(cfg, TOOL_PROGRESS_FLAG) is False
class TestMarkSeen:
def test_creates_missing_file_and_sets_flag(self, tmp_path):
cfg_path = tmp_path / "config.yaml"
assert mark_seen(cfg_path, BUSY_INPUT_FLAG) is True
loaded = yaml.safe_load(cfg_path.read_text())
assert loaded["onboarding"]["seen"][BUSY_INPUT_FLAG] is True
def test_preserves_other_config(self, tmp_path):
cfg_path = tmp_path / "config.yaml"
cfg_path.write_text(yaml.safe_dump({
"model": {"default": "claude-sonnet-4.6"},
"display": {"skin": "default"},
}))
assert mark_seen(cfg_path, BUSY_INPUT_FLAG) is True
loaded = yaml.safe_load(cfg_path.read_text())
assert loaded["model"]["default"] == "claude-sonnet-4.6"
assert loaded["display"]["skin"] == "default"
assert loaded["onboarding"]["seen"][BUSY_INPUT_FLAG] is True
def test_preserves_other_seen_flags(self, tmp_path):
cfg_path = tmp_path / "config.yaml"
cfg_path.write_text(yaml.safe_dump({
"onboarding": {"seen": {TOOL_PROGRESS_FLAG: True}},
}))
assert mark_seen(cfg_path, BUSY_INPUT_FLAG) is True
loaded = yaml.safe_load(cfg_path.read_text())
assert loaded["onboarding"]["seen"][TOOL_PROGRESS_FLAG] is True
assert loaded["onboarding"]["seen"][BUSY_INPUT_FLAG] is True
def test_idempotent(self, tmp_path):
cfg_path = tmp_path / "config.yaml"
mark_seen(cfg_path, BUSY_INPUT_FLAG)
first = cfg_path.read_text()
# Second call must be a no-op on-disk content (file may be touched,
# but the YAML contents should be identical).
mark_seen(cfg_path, BUSY_INPUT_FLAG)
second = cfg_path.read_text()
assert yaml.safe_load(first) == yaml.safe_load(second)
def test_handles_non_dict_onboarding(self, tmp_path):
cfg_path = tmp_path / "config.yaml"
cfg_path.write_text(yaml.safe_dump({"onboarding": "corrupted"}))
assert mark_seen(cfg_path, BUSY_INPUT_FLAG) is True
loaded = yaml.safe_load(cfg_path.read_text())
assert loaded["onboarding"]["seen"][BUSY_INPUT_FLAG] is True
def test_handles_non_dict_seen(self, tmp_path):
cfg_path = tmp_path / "config.yaml"
cfg_path.write_text(yaml.safe_dump({"onboarding": {"seen": "corrupted"}}))
assert mark_seen(cfg_path, BUSY_INPUT_FLAG) is True
loaded = yaml.safe_load(cfg_path.read_text())
assert loaded["onboarding"]["seen"][BUSY_INPUT_FLAG] is True
class TestHintMessages:
def test_busy_input_hint_gateway_interrupt(self):
msg = busy_input_hint_gateway("interrupt")
assert "/busy queue" in msg
assert "interrupted" in msg.lower()
def test_busy_input_hint_gateway_queue(self):
msg = busy_input_hint_gateway("queue")
assert "/busy interrupt" in msg
assert "queued" in msg.lower()
def test_busy_input_hint_cli_interrupt(self):
msg = busy_input_hint_cli("interrupt")
assert "/busy queue" in msg
def test_busy_input_hint_cli_queue(self):
msg = busy_input_hint_cli("queue")
assert "/busy interrupt" in msg
def test_tool_progress_hints_mention_verbose(self):
assert "/verbose" in tool_progress_hint_gateway()
assert "/verbose" in tool_progress_hint_cli()
def test_hints_are_not_empty(self):
for hint in (
busy_input_hint_gateway("queue"),
busy_input_hint_gateway("interrupt"),
busy_input_hint_cli("queue"),
busy_input_hint_cli("interrupt"),
tool_progress_hint_gateway(),
tool_progress_hint_cli(),
):
assert hint.strip()
class TestRoundTrip:
"""After mark_seen, is_seen on the re-loaded config must return True."""
def test_mark_then_is_seen(self, tmp_path):
cfg_path = tmp_path / "config.yaml"
assert mark_seen(cfg_path, BUSY_INPUT_FLAG) is True
loaded = yaml.safe_load(cfg_path.read_text())
assert is_seen(loaded, BUSY_INPUT_FLAG) is True
assert is_seen(loaded, TOOL_PROGRESS_FLAG) is False
def test_mark_both_flags_independently(self, tmp_path):
cfg_path = tmp_path / "config.yaml"
mark_seen(cfg_path, BUSY_INPUT_FLAG)
mark_seen(cfg_path, TOOL_PROGRESS_FLAG)
loaded = yaml.safe_load(cfg_path.read_text())
assert is_seen(loaded, BUSY_INPUT_FLAG) is True
assert is_seen(loaded, TOOL_PROGRESS_FLAG) is True

View File

@@ -160,6 +160,30 @@ class TestBranchCommandCLI:
assert agent.reset_session_state.called
assert agent._last_flushed_db_idx == 4 # len(conversation_history)
def test_branch_updates_agent_session_log_file(self, cli_instance, session_db, tmp_path):
"""Branching must redirect the agent's session_log_file to the new session's path."""
from cli import HermesCLI
from pathlib import Path
logs_dir = tmp_path / "sessions"
logs_dir.mkdir()
agent = MagicMock()
agent._last_flushed_db_idx = 0
agent.logs_dir = logs_dir
agent.session_log_file = logs_dir / f"session_{cli_instance.session_id}.json"
cli_instance.agent = agent
old_log_file = agent.session_log_file
HermesCLI._handle_branch_command(cli_instance, "/branch")
new_session_id = cli_instance.session_id
expected_log = logs_dir / f"session_{new_session_id}.json"
assert agent.session_log_file == expected_log, (
"session_log_file must point to the branch session, not the original"
)
assert agent.session_log_file != old_log_file
def test_branch_sets_resumed_flag(self, cli_instance, session_db):
"""Branch should set _resumed=True to prevent auto-title generation."""
from cli import HermesCLI

View File

@@ -1043,3 +1043,132 @@ class TestAgentCacheIdleResume:
new_agent.close()
except Exception:
pass
_FAKE_NOW = 10_000.0 # Fixed epoch for deterministic time assertions
class TestCachedAgentInactivityReset:
"""Inactivity-clock reset must be gated on _interrupt_depth == 0.
On interrupt-recursive turns (_interrupt_depth > 0) the clock must
keep accumulating so the inactivity watchdog can fire when a turn is
stuck in an interrupt loop. Resetting unconditionally prevented the
30-min timeout from triggering (#15654). The depth-0 reset is still
needed: a session idle for 29 min must not trip the watchdog before
the new turn makes its first API call (#9051).
"""
def _fake_agent(self, stale_seconds: float = 1800.0):
m = MagicMock()
m._last_activity_ts = _FAKE_NOW - stale_seconds
m._api_call_count = 10
m._last_activity_desc = "previous turn activity"
return m
def test_fresh_turn_resets_idle_clock(self):
"""interrupt_depth=0: clock resets so a post-idle turn gets a
fresh 30-min inactivity window (guard for #9051)."""
from gateway.run import GatewayRunner
agent = self._fake_agent(stale_seconds=1800.0)
old_ts = agent._last_activity_ts
with patch("gateway.run.time") as mock_time:
mock_time.time.return_value = _FAKE_NOW
GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=0)
assert agent._last_activity_ts == _FAKE_NOW, (
"_last_activity_ts was not reset on a fresh turn (interrupt_depth=0)"
)
assert agent._last_activity_ts > old_ts, (
"Stale idle time should be cleared so the new turn gets a fresh window"
)
def test_fresh_turn_resets_desc(self):
"""interrupt_depth=0: description is updated to reflect the new turn."""
from gateway.run import GatewayRunner
agent = self._fake_agent()
with patch("gateway.run.time") as mock_time:
mock_time.time.return_value = _FAKE_NOW
GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=0)
assert agent._last_activity_desc == "starting new turn (cached)"
def test_interrupt_turn_preserves_idle_clock(self):
"""interrupt_depth=1: clock preserved so accumulated stuck-turn
idle time is not discarded by an interrupt-recursive re-entry (#15654)."""
from gateway.run import GatewayRunner
agent = self._fake_agent(stale_seconds=1200.0)
old_ts = agent._last_activity_ts
GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=1)
assert agent._last_activity_ts == old_ts, (
"_last_activity_ts must not be reset on interrupt-recursive turns "
"(interrupt_depth>0) — the watchdog needs the accumulated idle time"
)
def test_interrupt_turn_preserves_desc(self):
"""interrupt_depth=1: desc preserved — it is semantically paired with ts."""
from gateway.run import GatewayRunner
agent = self._fake_agent(stale_seconds=1200.0)
GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=1)
assert agent._last_activity_desc == "previous turn activity", (
"_last_activity_desc must not change on interrupt-recursive turns; "
"it describes the activity *at* _last_activity_ts"
)
def test_deep_interrupt_recursion_preserves_idle_clock(self):
"""interrupt_depth=MAX-1: clock still preserved at any non-zero depth."""
from gateway.run import GatewayRunner
agent = self._fake_agent(stale_seconds=600.0)
old_ts = agent._last_activity_ts
GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=4)
assert agent._last_activity_ts == old_ts
def test_api_call_count_reset_regardless_of_depth(self):
"""_api_call_count is always reset to 0 for the new turn, at any depth."""
from gateway.run import GatewayRunner
agent_fresh = self._fake_agent()
agent_interrupted = self._fake_agent()
with patch("gateway.run.time") as mock_time:
mock_time.time.return_value = _FAKE_NOW
GatewayRunner._init_cached_agent_for_turn(agent_fresh, interrupt_depth=0)
GatewayRunner._init_cached_agent_for_turn(agent_interrupted, interrupt_depth=1)
assert agent_fresh._api_call_count == 0
assert agent_interrupted._api_call_count == 0
def test_watchdog_accumulation_across_recursive_turns(self):
"""Scenario: stuck turn + user interrupt → recursive turn.
The idle time seen by the watchdog must reflect the full stuck
duration, not restart from zero on the recursive re-entry.
"""
from gateway.run import GatewayRunner
STUCK_FOR = 1750.0
agent = self._fake_agent(stale_seconds=STUCK_FOR)
# Simulate: user sees "Still working..." and sends another message.
# That triggers an interrupt → _run_agent recurses at depth=1.
GatewayRunner._init_cached_agent_for_turn(agent, interrupt_depth=1)
# Watchdog sees time.time() - _last_activity_ts ≥ STUCK_FOR.
idle_secs = _FAKE_NOW - agent._last_activity_ts
assert idle_secs >= STUCK_FOR - 1.0, (
f"Watchdog would see {idle_secs:.0f}s idle, expected ~{STUCK_FOR}s. "
"Inactivity timeout could not fire for a stuck interrupted turn."
)

View File

@@ -349,3 +349,121 @@ class TestBusySessionAck:
result = await runner._handle_active_session_busy_message(event, sk)
assert result is False # not handled, let default path try
class TestBusySessionOnboardingHint:
"""First-touch hint appended to the busy-ack the first time it fires."""
@pytest.mark.asyncio
async def test_first_busy_ack_appends_interrupt_hint(self, tmp_path, monkeypatch):
"""First busy-while-running message gets an extra hint about /busy."""
import gateway.run as _gr
monkeypatch.setattr(_gr, "_hermes_home", tmp_path)
# mark_seen imports utils.atomic_yaml_write; make sure it resolves
# against a writable dir by pointing _hermes_home at tmp_path.
monkeypatch.setattr(_gr, "_load_gateway_config", lambda: {})
runner, _sentinel = _make_runner()
runner._busy_input_mode = "interrupt"
adapter = _make_adapter()
event = _make_event(text="ping")
sk = build_session_key(event.source)
agent = MagicMock()
agent.get_activity_summary.return_value = {
"api_call_count": 3, "max_iterations": 60,
"current_tool": None, "last_activity_ts": time.time(),
"last_activity_desc": "api", "seconds_since_activity": 0.1,
}
runner._running_agents[sk] = agent
runner._running_agents_ts[sk] = time.time() - 5
runner.adapters[event.source.platform] = adapter
await runner._handle_active_session_busy_message(event, sk)
call_kwargs = adapter._send_with_retry.call_args
content = call_kwargs.kwargs.get("content", "")
# Normal ack body
assert "Interrupting" in content
# First-touch hint appended
assert "First-time tip" in content
assert "/busy queue" in content
# The flag is now persisted to tmp_path/config.yaml
import yaml
cfg = yaml.safe_load((tmp_path / "config.yaml").read_text())
assert cfg["onboarding"]["seen"]["busy_input_prompt"] is True
@pytest.mark.asyncio
async def test_second_busy_ack_omits_hint(self, tmp_path, monkeypatch):
"""Once the flag is marked, the hint never appears again."""
import gateway.run as _gr
import yaml
monkeypatch.setattr(_gr, "_hermes_home", tmp_path)
# Pre-populate the config so is_seen() returns True from the start.
(tmp_path / "config.yaml").write_text(yaml.safe_dump({
"onboarding": {"seen": {"busy_input_prompt": True}},
}))
monkeypatch.setattr(
_gr, "_load_gateway_config",
lambda: yaml.safe_load((tmp_path / "config.yaml").read_text()),
)
runner, _sentinel = _make_runner()
runner._busy_input_mode = "interrupt"
adapter = _make_adapter()
event = _make_event(text="ping again")
sk = build_session_key(event.source)
agent = MagicMock()
agent.get_activity_summary.return_value = {
"api_call_count": 3, "max_iterations": 60,
"current_tool": None, "last_activity_ts": time.time(),
"last_activity_desc": "api", "seconds_since_activity": 0.1,
}
runner._running_agents[sk] = agent
runner._running_agents_ts[sk] = time.time() - 5
runner.adapters[event.source.platform] = adapter
await runner._handle_active_session_busy_message(event, sk)
call_kwargs = adapter._send_with_retry.call_args
content = call_kwargs.kwargs.get("content", "")
assert "Interrupting" in content
assert "First-time tip" not in content
assert "/busy queue" not in content
@pytest.mark.asyncio
async def test_queue_mode_hint_points_to_interrupt(self, tmp_path, monkeypatch):
"""In queue mode the hint should suggest /busy interrupt, not /busy queue."""
import gateway.run as _gr
monkeypatch.setattr(_gr, "_hermes_home", tmp_path)
monkeypatch.setattr(_gr, "_load_gateway_config", lambda: {})
runner, _sentinel = _make_runner()
runner._busy_input_mode = "queue"
adapter = _make_adapter()
event = _make_event(text="queue me")
sk = build_session_key(event.source)
runner.adapters[event.source.platform] = adapter
agent = MagicMock()
runner._running_agents[sk] = agent
with patch("gateway.run.merge_pending_message_event"):
await runner._handle_active_session_busy_message(event, sk)
content = adapter._send_with_retry.call_args.kwargs.get("content", "")
assert "Queued for the next turn" in content
assert "First-time tip" in content
assert "/busy interrupt" in content
# Must NOT tell the user to /busy queue when they're already on queue.
assert "/busy queue" not in content

View File

@@ -0,0 +1,215 @@
"""Tests for interrupt-aware tool-progress suppression in gateway.
When a user sends `stop` while the agent is executing a batch of parallel
tool calls, the gateway's progress_callback should stop queuing 🔍 bubbles
and the drain loop should drop any already-queued events. Without this
guard, the stop acknowledgement appears first but is followed by a trail
of tool-progress bubbles for calls that were already parsed from the LLM
response — making the interrupt feel ignored.
"""
import asyncio
import importlib
import sys
import time
import types
from types import SimpleNamespace
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, SendResult
from gateway.session import SessionSource
class ProgressCaptureAdapter(BasePlatformAdapter):
def __init__(self, platform=Platform.TELEGRAM):
super().__init__(PlatformConfig(enabled=True, token="***"), platform)
self.sent = []
self.edits = []
self.typing = []
async def connect(self) -> bool:
return True
async def disconnect(self) -> None:
return None
async def send(self, chat_id, content, reply_to=None, metadata=None) -> SendResult:
self.sent.append({"chat_id": chat_id, "content": content})
return SendResult(success=True, message_id="progress-1")
async def edit_message(self, chat_id, message_id, content) -> SendResult:
self.edits.append({"message_id": message_id, "content": content})
return SendResult(success=True, message_id=message_id)
async def send_typing(self, chat_id, metadata=None) -> None:
self.typing.append(chat_id)
async def stop_typing(self, chat_id) -> None:
return None
async def get_chat_info(self, chat_id: str):
return {"id": chat_id}
class PreInterruptAgent:
"""Fires tool-progress events BEFORE the interrupt lands.
These should render normally. Baseline for comparison with the
interrupted case — proves the harness renders events when no
interrupt is active.
"""
def __init__(self, **kwargs):
self.tool_progress_callback = kwargs.get("tool_progress_callback")
self.tools = []
self._interrupt_requested = False
@property
def is_interrupted(self) -> bool:
return self._interrupt_requested
def run_conversation(self, message, conversation_history=None, task_id=None):
self.tool_progress_callback("tool.started", "web_search", "first search", {})
time.sleep(0.35) # let the drain loop process
return {"final_response": "done", "messages": [], "api_calls": 1}
class InterruptedAgent:
"""Fires tool.started events AFTER interrupt — all should be suppressed.
Mirrors the failure mode in the bug report: LLM returned N parallel
web_search calls, interrupt flag flipped, remaining events still
rendered as bubbles. With the fix, none of these should appear.
"""
def __init__(self, **kwargs):
self.tool_progress_callback = kwargs.get("tool_progress_callback")
self.tools = []
# Start already interrupted — simulates stop having already landed
# by the time the agent batch starts firing tool.started events.
self._interrupt_requested = True
@property
def is_interrupted(self) -> bool:
return self._interrupt_requested
def run_conversation(self, message, conversation_history=None, task_id=None):
# Parallel tool batch — in production these come from one LLM
# response with 5 tool_calls. All are post-interrupt.
self.tool_progress_callback("tool.started", "web_search", "cognee hermes", {})
self.tool_progress_callback("tool.started", "web_search", "McBee deer hunting", {})
self.tool_progress_callback("tool.started", "web_search", "kuzu graph db", {})
self.tool_progress_callback("tool.started", "web_search", "moonshot kimi api", {})
self.tool_progress_callback("tool.started", "web_search", "platform.moonshot.cn", {})
time.sleep(0.35) # let the drain loop attempt to process the queue
return {"final_response": "interrupted", "messages": [], "api_calls": 1}
def _make_runner(adapter):
gateway_run = importlib.import_module("gateway.run")
GatewayRunner = gateway_run.GatewayRunner
runner = object.__new__(GatewayRunner)
runner.adapters = {adapter.platform: adapter}
runner._voice_mode = {}
runner._prefill_messages = []
runner._ephemeral_system_prompt = ""
runner._reasoning_config = None
runner._provider_routing = {}
runner._fallback_model = None
runner._session_db = None
runner._running_agents = {}
runner._session_run_generation = {}
runner.hooks = SimpleNamespace(loaded_hooks=False)
runner.config = SimpleNamespace(
thread_sessions_per_user=False,
group_sessions_per_user=False,
stt_enabled=False,
)
return runner
async def _run_once(monkeypatch, tmp_path, agent_cls, session_id):
monkeypatch.setenv("HERMES_TOOL_PROGRESS_MODE", "all")
fake_dotenv = types.ModuleType("dotenv")
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = agent_cls
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
adapter = ProgressCaptureAdapter()
runner = _make_runner(adapter)
gateway_run = importlib.import_module("gateway.run")
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
monkeypatch.setattr(
gateway_run,
"_resolve_runtime_agent_kwargs",
lambda: {"api_key": "fake"},
)
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="-1001",
chat_type="group",
thread_id="17585",
)
result = await runner._run_agent(
message="hi",
context_prompt="",
history=[],
source=source,
session_id=session_id,
session_key="agent:main:telegram:group:-1001:17585",
)
return adapter, result
@pytest.mark.asyncio
async def test_baseline_non_interrupted_agent_renders_progress(monkeypatch, tmp_path):
"""Sanity check: when is_interrupted is False, tool-progress renders normally."""
adapter, result = await _run_once(monkeypatch, tmp_path, PreInterruptAgent, "sess-baseline")
assert result["final_response"] == "done"
rendered = " ".join(c["content"] for c in adapter.sent) + " " + " ".join(
c["content"] for c in adapter.edits
)
assert "first search" in rendered, (
"baseline agent should render its tool-progress event — "
"if this fails the test harness is broken, not the fix"
)
@pytest.mark.asyncio
async def test_progress_suppressed_when_agent_is_interrupted(monkeypatch, tmp_path):
"""Post-interrupt tool.started events must not render as bubbles.
This is Bug B from the screenshot: user sends `stop`, agent acks with
⚡ Interrupting, but 5 more 🔍 web_search bubbles still render because
their tool.started events were already parsed from the LLM response.
With the fix, progress_callback and the drain loop both check
is_interrupted and skip these events.
"""
adapter, result = await _run_once(
monkeypatch, tmp_path, InterruptedAgent, "sess-interrupted"
)
assert result["final_response"] == "interrupted"
rendered = " ".join(c["content"] for c in adapter.sent) + " " + " ".join(
c["content"] for c in adapter.edits
)
# None of the post-interrupt queries should appear.
for leaked_query in (
"cognee hermes",
"McBee deer hunting",
"kuzu graph db",
"moonshot kimi api",
"platform.moonshot.cn",
):
assert leaked_query not in rendered, (
f"event '{leaked_query}' leaked into the UI after interrupt — "
f"progress_callback / drain loop is not checking is_interrupted"
)

View File

@@ -165,3 +165,26 @@ async def test_reasoning_rejected_mid_run():
assert result is not None
assert "can't run mid-turn" in result
assert "/reasoning" in result
@pytest.mark.asyncio
async def test_btw_dispatches_mid_run():
"""/btw mid-run must dispatch to /background's handler, not hit the catch-all.
/btw is an alias of /background (see hermes_cli/commands.py). Typing
/btw mid-turn must spawn a parallel background task — that's the whole
point of the command. Before the mid-turn bypass was added for
/background, /btw fell through to the "Agent is running — wait or
/stop first" catch-all, making it useless in exactly the scenario it
was designed for. The alias and the bypass together make it work.
"""
runner = _make_runner()
runner._handle_background_command = AsyncMock(
return_value='🚀 Background task started: "what module owns titles?"'
)
result = await runner._handle_message(_make_event("/btw what module owns titles?"))
runner._handle_background_command.assert_awaited_once()
assert result is not None
assert "can't run mid-turn" not in result

View File

@@ -147,7 +147,20 @@ class TestAppMentionHandler:
assert "app_mention" in registered_events
assert "assistant_thread_started" in registered_events
assert "assistant_thread_context_changed" in registered_events
assert "/hermes" in registered_commands
# Slack slash commands are registered via a single regex matcher
# covering every COMMAND_REGISTRY entry (e.g. /hermes, /btw, /stop,
# /model, ...) so users get native-slash parity with Discord and
# Telegram. Verify the regex matches the key expected slashes.
assert len(registered_commands) == 1, (
f"expected 1 combined slash matcher, got {registered_commands!r}"
)
slash_matcher = registered_commands[0]
import re as _re
assert isinstance(slash_matcher, _re.Pattern)
for expected in ("/hermes", "/btw", "/stop", "/model", "/help"):
assert slash_matcher.match(expected), (
f"Slack slash regex does not match {expected}"
)
class TestSlackConnectCleanup:
@@ -1544,6 +1557,83 @@ class TestSlashCommands:
msg = adapter.handle_message.call_args[0][0]
assert msg.text == "/reasoning"
# ------------------------------------------------------------------
# Native slash commands — /btw, /stop, /model, ... dispatched directly
# instead of as /hermes subcommands. This is the Discord/Telegram parity
# fix: the slash name itself becomes the command.
# ------------------------------------------------------------------
@pytest.mark.asyncio
async def test_native_btw_slash(self, adapter):
"""/btw with args must dispatch to /background, not /hermes btw."""
command = {
"command": "/btw",
"text": "fix the failing test",
"user_id": "U1",
"channel_id": "C1",
}
await adapter._handle_slash_command(command)
msg = adapter.handle_message.call_args[0][0]
# The gateway command dispatcher resolves /btw -> background via
# resolve_command() — our handler's job is just to deliver
# "/btw <args>" to the gateway runner, which is what this asserts.
assert msg.text == "/btw fix the failing test"
@pytest.mark.asyncio
async def test_native_stop_slash_no_args(self, adapter):
command = {
"command": "/stop",
"text": "",
"user_id": "U1",
"channel_id": "C1",
}
await adapter._handle_slash_command(command)
msg = adapter.handle_message.call_args[0][0]
assert msg.text == "/stop"
@pytest.mark.asyncio
async def test_native_model_slash_with_args(self, adapter):
command = {
"command": "/model",
"text": "anthropic/claude-sonnet-4",
"user_id": "U1",
"channel_id": "C1",
}
await adapter._handle_slash_command(command)
msg = adapter.handle_message.call_args[0][0]
assert msg.text == "/model anthropic/claude-sonnet-4"
@pytest.mark.asyncio
async def test_legacy_hermes_prefix_still_works(self, adapter):
"""Backward compat: /hermes btw foo must still route to /btw foo.
Old workspace manifests only declared /hermes as the single slash.
After users refresh their manifest they get /btw natively, but the
legacy form must keep working during the transition.
"""
command = {
"command": "/hermes",
"text": "btw run the tests",
"user_id": "U1",
"channel_id": "C1",
}
await adapter._handle_slash_command(command)
msg = adapter.handle_message.call_args[0][0]
assert msg.text == "/btw run the tests"
@pytest.mark.asyncio
async def test_legacy_hermes_freeform_question(self, adapter):
"""/hermes <free-form text> must stay as the raw text (non-command)."""
command = {
"command": "/hermes",
"text": "what's the weather today?",
"user_id": "U1",
"channel_id": "C1",
}
await adapter._handle_slash_command(command)
msg = adapter.handle_message.call_args[0][0]
assert msg.text == "what's the weather today?"
# ---------------------------------------------------------------------------
# TestMessageSplitting

View File

@@ -177,6 +177,53 @@ class TestHandleVoiceCommand:
assert adapter._auto_tts_disabled_chats == {"123"}
def test_sync_populates_enabled_chats_from_voice_modes(self, runner):
"""Issue #16007: sync also restores per-chat /voice on|tts opt-ins.
The adapter's ``_auto_tts_enabled_chats`` must mirror chats whose
persisted voice_mode is ``voice_only`` or ``all`` — without this,
``/voice on`` was relying on a "not in disabled set" default that
silently enabled auto-TTS for every chat.
"""
from gateway.config import Platform
runner._voice_mode = {
"telegram:off_chat": "off",
"telegram:on_chat": "voice_only",
"telegram:tts_chat": "all",
"slack:999": "voice_only", # wrong platform, must be ignored
}
adapter = SimpleNamespace(
_auto_tts_default=False,
_auto_tts_disabled_chats=set(),
_auto_tts_enabled_chats=set(),
platform=Platform.TELEGRAM,
)
runner._sync_voice_mode_state_to_adapter(adapter)
assert adapter._auto_tts_disabled_chats == {"off_chat"}
assert adapter._auto_tts_enabled_chats == {"on_chat", "tts_chat"}
def test_sync_pushes_config_default_onto_adapter(self, runner, monkeypatch):
"""Issue #16007: ``voice.auto_tts`` must propagate to ``_auto_tts_default``."""
from gateway.config import Platform
fake_cfg = {"voice": {"auto_tts": True}}
monkeypatch.setattr(
"hermes_cli.config.load_config",
lambda: fake_cfg,
)
adapter = SimpleNamespace(
_auto_tts_default=False,
_auto_tts_disabled_chats=set(),
_auto_tts_enabled_chats=set(),
platform=Platform.TELEGRAM,
)
runner._sync_voice_mode_state_to_adapter(adapter)
assert adapter._auto_tts_default is True
def test_restart_restores_voice_off_state(self, runner, tmp_path):
from gateway.config import Platform
runner._VOICE_MODE_PATH.write_text(json.dumps({"telegram:123": "off"}))
@@ -2706,3 +2753,56 @@ class TestUDPKeepalive:
mock_conn.send_packet.assert_called_with(b'\xf8\xff\xfe')
finally:
DiscordAdapter._KEEPALIVE_INTERVAL = original_interval
# =====================================================================
# BasePlatformAdapter._should_auto_tts_for_chat — gate for auto-TTS
# on voice input. Regression test for Issue #16007.
# =====================================================================
class TestShouldAutoTtsForChat:
"""Three-layer gate: per-chat enable > per-chat disable > config default."""
def _make_adapter(self, *, default: bool, enabled=(), disabled=()):
"""Build a bare adapter with only the attrs the gate reads."""
adapter = SimpleNamespace(
_auto_tts_default=default,
_auto_tts_enabled_chats=set(enabled),
_auto_tts_disabled_chats=set(disabled),
)
# Bind the unbound method — _should_auto_tts_for_chat only reads the
# three attrs above via ``self.``, so an unbound call works.
from gateway.platforms.base import BasePlatformAdapter
return BasePlatformAdapter._should_auto_tts_for_chat, adapter
def test_default_false_no_override_suppresses(self):
"""Issue #16007: voice.auto_tts=False and no per-chat state → no TTS."""
fn, adapter = self._make_adapter(default=False)
assert fn(adapter, "chat1") is False
def test_default_true_no_override_fires(self):
fn, adapter = self._make_adapter(default=True)
assert fn(adapter, "chat1") is True
def test_explicit_enable_overrides_false_default(self):
"""``/voice on`` with config auto_tts=False still fires."""
fn, adapter = self._make_adapter(default=False, enabled={"chat1"})
assert fn(adapter, "chat1") is True
def test_explicit_disable_overrides_true_default(self):
"""``/voice off`` with config auto_tts=True still suppresses."""
fn, adapter = self._make_adapter(default=True, disabled={"chat1"})
assert fn(adapter, "chat1") is False
def test_enabled_wins_over_disabled(self):
"""An explicit enable beats an explicit disable (enable takes priority)."""
fn, adapter = self._make_adapter(
default=False, enabled={"chat1"}, disabled={"chat1"}
)
assert fn(adapter, "chat1") is True
def test_per_chat_isolation(self):
"""Enable for chat1 doesn't leak to chat2."""
fn, adapter = self._make_adapter(default=False, enabled={"chat1"})
assert fn(adapter, "chat1") is True
assert fn(adapter, "chat2") is False

View File

@@ -0,0 +1,152 @@
"""Regression test for the `/model` picker confirmation display.
Bug (April 2026): after choosing a model from the interactive `/model` picker,
``HermesCLI._apply_model_switch_result()`` printed ``ModelInfo.context_window``
straight from models.dev, which always reports the vendor-wide value (e.g.
gpt-5.5 = 1,050,000 on ``openai``). That ignored provider-specific caps — in
particular, ChatGPT Codex OAuth enforces 272K on the same slug. The sibling
``_handle_model_switch()`` (typed ``/model <name>``) was already fixed to use
``resolve_display_context_length()``; the picker path was missed, causing
"sometimes 1M, sometimes 272K" for the same model across sibling UI paths.
Fix: both display paths now go through ``resolve_display_context_length()``.
"""
from __future__ import annotations
from unittest.mock import patch
from hermes_cli.model_switch import ModelSwitchResult
class _FakeModelInfo:
context_window = 1_050_000
max_output = 0
def has_cost_data(self):
return False
def format_capabilities(self):
return ""
class _StubCLI:
"""Minimum attrs ``_apply_model_switch_result`` reads on ``self``."""
agent = None
model = ""
provider = ""
requested_provider = ""
api_key = ""
_explicit_api_key = ""
base_url = ""
_explicit_base_url = ""
api_mode = ""
_pending_model_switch_note = ""
def _run_display(monkeypatch, result):
import cli as cli_mod
captured: list[str] = []
monkeypatch.setattr(cli_mod, "_cprint", lambda s, *a, **k: captured.append(str(s)))
# Avoid writing to ~/.hermes/config.yaml during the test.
monkeypatch.setattr(cli_mod, "save_config_value", lambda *a, **k: None)
cli_mod.HermesCLI._apply_model_switch_result(_StubCLI(), result, False)
return captured
def test_picker_path_uses_provider_aware_context_on_codex(monkeypatch):
"""``_apply_model_switch_result`` must prefer the provider-aware resolver
(272K on Codex) over the raw models.dev value (1.05M for gpt-5.5).
"""
result = ModelSwitchResult(
success=True,
new_model="gpt-5.5",
target_provider="openai-codex",
provider_changed=True,
api_key="",
base_url="https://chatgpt.com/backend-api/codex",
api_mode="codex_responses",
warning_message="",
provider_label="ChatGPT Codex",
resolved_via_alias=False,
capabilities=None,
model_info=_FakeModelInfo(), # models.dev says 1.05M
is_global=False,
)
with patch(
"agent.model_metadata.get_model_context_length",
return_value=272_000,
):
lines = _run_display(monkeypatch, result)
ctx_line = next((l for l in lines if "Context:" in l), "")
assert "272,000" in ctx_line, (
f"picker-path display must show Codex's 272K cap, got: {ctx_line!r}"
)
assert "1,050,000" not in ctx_line, (
f"picker-path display leaked models.dev's 1.05M for Codex: {ctx_line!r}"
)
def test_picker_path_shows_vendor_value_when_no_provider_cap(monkeypatch):
"""On providers with no enforced cap (e.g. OpenRouter), the picker path
should surface the real 1.05M context for gpt-5.5 — resolver and models.dev
agree here.
"""
result = ModelSwitchResult(
success=True,
new_model="openai/gpt-5.5",
target_provider="openrouter",
provider_changed=True,
api_key="",
base_url="https://openrouter.ai/api/v1",
api_mode="chat_completions",
warning_message="",
provider_label="OpenRouter",
resolved_via_alias=False,
capabilities=None,
model_info=_FakeModelInfo(),
is_global=False,
)
with patch(
"agent.model_metadata.get_model_context_length",
return_value=1_050_000,
):
lines = _run_display(monkeypatch, result)
ctx_line = next((l for l in lines if "Context:" in l), "")
assert "1,050,000" in ctx_line, (
f"OpenRouter gpt-5.5 should show 1.05M context, got: {ctx_line!r}"
)
def test_picker_path_falls_back_to_model_info_when_resolver_empty(monkeypatch):
"""If ``get_model_context_length`` returns nothing (rare — truly unknown
endpoint), the display still surfaces ``ModelInfo.context_window`` so the
user sees *something* rather than a silent blank.
"""
result = ModelSwitchResult(
success=True,
new_model="some-model",
target_provider="some-provider",
provider_changed=True,
api_key="",
base_url="",
api_mode="chat_completions",
warning_message="",
provider_label="Some Provider",
resolved_via_alias=False,
capabilities=None,
model_info=_FakeModelInfo(), # context_window = 1_050_000
is_global=False,
)
with patch(
"agent.model_metadata.get_model_context_length",
return_value=None,
):
lines = _run_display(monkeypatch, result)
ctx_line = next((l for l in lines if "Context:" in l), "")
assert "1,050,000" in ctx_line, (
f"resolver-empty path should fall back to ModelInfo, got: {ctx_line!r}"
)

View File

@@ -20,6 +20,8 @@ from hermes_cli.commands import (
discord_skill_commands,
gateway_help_lines,
resolve_command,
slack_app_manifest,
slack_native_slashes,
slack_subcommand_map,
telegram_bot_commands,
telegram_menu_commands,
@@ -256,6 +258,115 @@ class TestSlackSubcommandMap:
assert cmd.name not in mapping
class TestSlackNativeSlashes:
"""Slack native slash command generation — used to register every
COMMAND_REGISTRY entry as a first-class Slack slash, matching Discord
and Telegram."""
def test_returns_triples(self):
slashes = slack_native_slashes()
assert len(slashes) >= 10
for entry in slashes:
assert isinstance(entry, tuple) and len(entry) == 3
name, desc, hint = entry
assert isinstance(name, str) and name
assert isinstance(desc, str)
assert isinstance(hint, str)
def test_hermes_catchall_is_first(self):
"""``/hermes`` must be reserved as the first slot so the legacy
``/hermes <subcommand>`` form keeps working after we add new
commands and hit the 50-slash cap."""
slashes = slack_native_slashes()
assert slashes[0][0] == "hermes"
def test_names_respect_slack_limits(self):
for name, _desc, _hint in slack_native_slashes():
# Slack: lowercase a-z, 0-9, hyphens, underscores; max 32 chars
assert len(name) <= 32, f"slash {name!r} exceeds 32 chars"
assert name == name.lower()
for ch in name:
assert ch.isalnum() or ch in "-_", f"invalid char {ch!r} in {name!r}"
def test_under_fifty_command_cap(self):
"""Slack allows at most 50 slash commands per app."""
assert len(slack_native_slashes()) <= 50
def test_unique_names(self):
names = [n for n, _d, _h in slack_native_slashes()]
assert len(names) == len(set(names)), "duplicate Slack slash names"
def test_includes_canonical_commands(self):
names = {n for n, _d, _h in slack_native_slashes()}
# Sample of gateway-available canonical commands
for expected in ("new", "stop", "background", "model", "help", "status"):
assert expected in names, f"missing canonical /{expected}"
def test_includes_aliases_as_first_class_slashes(self):
"""Aliases (/btw, /bg, /reset, /q) must be registered as standalone
slashes — this is the whole point of native-slashes parity."""
names = {n for n, _d, _h in slack_native_slashes()}
assert "btw" in names
assert "bg" in names
assert "reset" in names
assert "q" in names
def test_telegram_parity(self):
"""Every Telegram bot command must be registerable on Slack too.
This catches the old behavior where Slack users couldn't invoke
commands like /btw natively. If a future command surfaces on
Telegram but not Slack (because of Slack's 50-slash cap), this
test fails loudly so we can curate the list rather than silently
dropping parity.
"""
slack_names = {n for n, _d, _h in slack_native_slashes()}
tg_names = {n for n, _d in telegram_bot_commands()}
# Some Telegram names have underscores where Slack uses hyphens
# (e.g. set_home vs sethome). Normalize both sides for comparison.
def _norm(s: str) -> str:
return s.replace("-", "_").replace("__", "_").strip("_")
slack_norm = {_norm(n) for n in slack_names}
tg_norm = {_norm(n) for n in tg_names}
missing = tg_norm - slack_norm
assert not missing, (
f"commands on Telegram but missing from Slack native slashes: {sorted(missing)}"
)
class TestSlackAppManifest:
"""Generated Slack app manifest (used by `hermes slack manifest`)."""
def test_returns_dict(self):
m = slack_app_manifest()
assert isinstance(m, dict)
assert "features" in m
assert "slash_commands" in m["features"]
def test_each_slash_has_required_fields(self):
m = slack_app_manifest()
for entry in m["features"]["slash_commands"]:
assert entry["command"].startswith("/")
assert "description" in entry
assert "url" in entry
# should_escape must be present (Slack defaults to True which
# HTML-escapes args — we want the raw text)
assert "should_escape" in entry
def test_btw_is_in_manifest(self):
"""Regression: /btw must be a native Slack slash, not just a
/hermes subcommand."""
m = slack_app_manifest()
commands = [c["command"] for c in m["features"]["slash_commands"]]
assert "/btw" in commands
def test_custom_request_url(self):
m = slack_app_manifest(request_url="https://example.com/slack")
for entry in m["features"]["slash_commands"]:
assert entry["url"] == "https://example.com/slack"
# ---------------------------------------------------------------------------
# Config-gated gateway commands
# ---------------------------------------------------------------------------

View File

@@ -0,0 +1,486 @@
"""Tests for `hermes fallback` — chain reading, add/remove/clear, legacy migration."""
from __future__ import annotations
import io
import types
from pathlib import Path
from unittest.mock import patch
import pytest
import yaml
# ---------------------------------------------------------------------------
# Shared fixture — isolate HERMES_HOME so save_config writes to tmp_path
# ---------------------------------------------------------------------------
@pytest.fixture()
def isolated_home(tmp_path, monkeypatch):
monkeypatch.setattr(Path, "home", lambda: tmp_path)
home = tmp_path / ".hermes"
home.mkdir(exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(home))
return tmp_path
def _write_config(home: Path, data: dict) -> None:
config_path = home / ".hermes" / "config.yaml"
config_path.write_text(yaml.safe_dump(data), encoding="utf-8")
def _read_config(home: Path) -> dict:
config_path = home / ".hermes" / "config.yaml"
return yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
# ---------------------------------------------------------------------------
# _read_chain / _write_chain
# ---------------------------------------------------------------------------
class TestReadChain:
def test_returns_empty_list_when_unset(self):
from hermes_cli.fallback_cmd import _read_chain
assert _read_chain({}) == []
def test_reads_new_list_format(self):
from hermes_cli.fallback_cmd import _read_chain
cfg = {
"fallback_providers": [
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4.6"},
{"provider": "nous", "model": "Hermes-4-Llama-3.1-405B"},
]
}
assert _read_chain(cfg) == [
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4.6"},
{"provider": "nous", "model": "Hermes-4-Llama-3.1-405B"},
]
def test_migrates_legacy_single_dict(self):
from hermes_cli.fallback_cmd import _read_chain
cfg = {"fallback_model": {"provider": "openrouter", "model": "gpt-5.4"}}
assert _read_chain(cfg) == [{"provider": "openrouter", "model": "gpt-5.4"}]
def test_skips_incomplete_entries(self):
from hermes_cli.fallback_cmd import _read_chain
cfg = {
"fallback_providers": [
{"provider": "openrouter"}, # missing model
{"model": "gpt-5.4"}, # missing provider
{"provider": "nous", "model": "foo"}, # valid
"not-a-dict", # noise
]
}
assert _read_chain(cfg) == [{"provider": "nous", "model": "foo"}]
def test_returns_copies_not_aliases(self):
from hermes_cli.fallback_cmd import _read_chain
cfg = {"fallback_providers": [{"provider": "nous", "model": "foo"}]}
result = _read_chain(cfg)
result[0]["provider"] = "mutated"
assert cfg["fallback_providers"][0]["provider"] == "nous"
# ---------------------------------------------------------------------------
# _extract_fallback_from_model_cfg
# ---------------------------------------------------------------------------
class TestExtractFallback:
def test_extracts_from_default_field(self):
from hermes_cli.fallback_cmd import _extract_fallback_from_model_cfg
model_cfg = {"provider": "openrouter", "default": "anthropic/claude-sonnet-4.6"}
assert _extract_fallback_from_model_cfg(model_cfg) == {
"provider": "openrouter",
"model": "anthropic/claude-sonnet-4.6",
}
def test_extracts_optional_base_url_and_api_mode(self):
from hermes_cli.fallback_cmd import _extract_fallback_from_model_cfg
model_cfg = {
"provider": "custom",
"default": "local-model",
"base_url": "http://localhost:11434/v1",
"api_mode": "chat_completions",
}
assert _extract_fallback_from_model_cfg(model_cfg) == {
"provider": "custom",
"model": "local-model",
"base_url": "http://localhost:11434/v1",
"api_mode": "chat_completions",
}
def test_returns_none_without_provider(self):
from hermes_cli.fallback_cmd import _extract_fallback_from_model_cfg
assert _extract_fallback_from_model_cfg({"default": "foo"}) is None
def test_returns_none_without_model(self):
from hermes_cli.fallback_cmd import _extract_fallback_from_model_cfg
assert _extract_fallback_from_model_cfg({"provider": "openrouter"}) is None
def test_returns_none_for_non_dict(self):
from hermes_cli.fallback_cmd import _extract_fallback_from_model_cfg
assert _extract_fallback_from_model_cfg("plain-string") is None
assert _extract_fallback_from_model_cfg(None) is None
# ---------------------------------------------------------------------------
# cmd_fallback_list
# ---------------------------------------------------------------------------
class TestListCommand:
def test_list_empty(self, isolated_home, capsys):
_write_config(isolated_home, {})
from hermes_cli.fallback_cmd import cmd_fallback_list
cmd_fallback_list(types.SimpleNamespace())
out = capsys.readouterr().out
assert "No fallback providers configured" in out
assert "hermes fallback add" in out
def test_list_with_entries(self, isolated_home, capsys):
_write_config(isolated_home, {
"model": {"provider": "anthropic", "default": "claude-sonnet-4-6"},
"fallback_providers": [
{"provider": "openrouter", "model": "anthropic/claude-sonnet-4.6"},
{"provider": "nous", "model": "Hermes-4"},
],
})
from hermes_cli.fallback_cmd import cmd_fallback_list
cmd_fallback_list(types.SimpleNamespace())
out = capsys.readouterr().out
assert "Fallback chain (2 entries)" in out
assert "anthropic/claude-sonnet-4.6" in out
assert "Hermes-4" in out
# Primary should be shown too
assert "claude-sonnet-4-6" in out
def test_list_migrates_legacy_for_display(self, isolated_home, capsys):
_write_config(isolated_home, {
"fallback_model": {"provider": "openrouter", "model": "gpt-5.4"},
})
from hermes_cli.fallback_cmd import cmd_fallback_list
cmd_fallback_list(types.SimpleNamespace())
out = capsys.readouterr().out
assert "1 entry" in out
assert "gpt-5.4" in out
# ---------------------------------------------------------------------------
# cmd_fallback_add — mock select_provider_and_model
# ---------------------------------------------------------------------------
class TestAddCommand:
def test_add_appends_new_entry(self, isolated_home, capsys):
_write_config(isolated_home, {
"model": {"provider": "anthropic", "default": "claude-sonnet-4-6"},
})
def fake_picker(args=None):
# Simulate what the real picker does: writes the selection to config["model"]
from hermes_cli.config import load_config, save_config
cfg = load_config()
cfg["model"] = {
"provider": "openrouter",
"default": "anthropic/claude-sonnet-4.6",
"base_url": "https://openrouter.ai/api/v1",
"api_mode": "chat_completions",
}
save_config(cfg)
with patch("hermes_cli.main.select_provider_and_model", side_effect=fake_picker), \
patch("hermes_cli.main._require_tty"):
from hermes_cli.fallback_cmd import cmd_fallback_add
cmd_fallback_add(types.SimpleNamespace())
cfg = _read_config(isolated_home)
# Primary is preserved
assert cfg["model"]["provider"] == "anthropic"
assert cfg["model"]["default"] == "claude-sonnet-4-6"
# Fallback was appended
assert cfg["fallback_providers"] == [
{
"provider": "openrouter",
"model": "anthropic/claude-sonnet-4.6",
"base_url": "https://openrouter.ai/api/v1",
"api_mode": "chat_completions",
}
]
out = capsys.readouterr().out
assert "Added fallback" in out
def test_add_rejects_duplicate(self, isolated_home, capsys):
_write_config(isolated_home, {
"model": {"provider": "anthropic", "default": "claude-sonnet-4-6"},
"fallback_providers": [
{"provider": "openrouter", "model": "gpt-5.4"},
],
})
def fake_picker(args=None):
from hermes_cli.config import load_config, save_config
cfg = load_config()
cfg["model"] = {"provider": "openrouter", "default": "gpt-5.4"}
save_config(cfg)
with patch("hermes_cli.main.select_provider_and_model", side_effect=fake_picker), \
patch("hermes_cli.main._require_tty"):
from hermes_cli.fallback_cmd import cmd_fallback_add
cmd_fallback_add(types.SimpleNamespace())
cfg = _read_config(isolated_home)
# Should still have exactly one entry
assert len(cfg["fallback_providers"]) == 1
out = capsys.readouterr().out
assert "already in the fallback chain" in out
def test_add_rejects_same_as_primary(self, isolated_home, capsys):
_write_config(isolated_home, {
"model": {"provider": "openrouter", "default": "gpt-5.4"},
})
def fake_picker(args=None):
# User picks the same thing that's already the primary
from hermes_cli.config import load_config, save_config
cfg = load_config()
cfg["model"] = {"provider": "openrouter", "default": "gpt-5.4"}
save_config(cfg)
with patch("hermes_cli.main.select_provider_and_model", side_effect=fake_picker), \
patch("hermes_cli.main._require_tty"):
from hermes_cli.fallback_cmd import cmd_fallback_add
cmd_fallback_add(types.SimpleNamespace())
cfg = _read_config(isolated_home)
assert "fallback_providers" not in cfg or cfg["fallback_providers"] == []
out = capsys.readouterr().out
assert "matches the current primary" in out
def test_add_preserves_primary_when_picker_changes_it(self, isolated_home):
"""The picker mutates config["model"]; fallback_add must restore the primary."""
_write_config(isolated_home, {
"model": {
"provider": "anthropic",
"default": "claude-sonnet-4-6",
"base_url": "https://api.anthropic.com",
"api_mode": "anthropic_messages",
},
})
def fake_picker(args=None):
from hermes_cli.config import load_config, save_config
cfg = load_config()
cfg["model"] = {
"provider": "openrouter",
"default": "anthropic/claude-sonnet-4.6",
"base_url": "https://openrouter.ai/api/v1",
"api_mode": "chat_completions",
}
save_config(cfg)
with patch("hermes_cli.main.select_provider_and_model", side_effect=fake_picker), \
patch("hermes_cli.main._require_tty"):
from hermes_cli.fallback_cmd import cmd_fallback_add
cmd_fallback_add(types.SimpleNamespace())
cfg = _read_config(isolated_home)
# Primary exactly as it was
assert cfg["model"]["provider"] == "anthropic"
assert cfg["model"]["default"] == "claude-sonnet-4-6"
assert cfg["model"]["base_url"] == "https://api.anthropic.com"
assert cfg["model"]["api_mode"] == "anthropic_messages"
# Fallback added
assert len(cfg["fallback_providers"]) == 1
assert cfg["fallback_providers"][0]["provider"] == "openrouter"
def test_add_noop_when_picker_cancelled(self, isolated_home, capsys):
_write_config(isolated_home, {
"model": {"provider": "anthropic", "default": "claude-sonnet-4-6"},
})
def fake_picker(args=None):
# User cancelled — no change to config
pass
with patch("hermes_cli.main.select_provider_and_model", side_effect=fake_picker), \
patch("hermes_cli.main._require_tty"):
from hermes_cli.fallback_cmd import cmd_fallback_add
cmd_fallback_add(types.SimpleNamespace())
cfg = _read_config(isolated_home)
assert "fallback_providers" not in cfg or cfg["fallback_providers"] == []
out = capsys.readouterr().out
# Either "No fallback added" (picker fully cancelled) or "matches the current primary"
# (picker left config untouched) — both indicate a non-add outcome.
assert ("No fallback added" in out) or ("matches the current primary" in out)
def test_add_noop_when_picker_clears_model(self, isolated_home, capsys):
"""Simulate picker explicitly clearing model.default (unusual but possible)."""
_write_config(isolated_home, {
"model": {"provider": "anthropic", "default": "claude-sonnet-4-6"},
})
def fake_picker(args=None):
from hermes_cli.config import load_config, save_config
cfg = load_config()
cfg["model"] = {"provider": "", "default": ""}
save_config(cfg)
with patch("hermes_cli.main.select_provider_and_model", side_effect=fake_picker), \
patch("hermes_cli.main._require_tty"):
from hermes_cli.fallback_cmd import cmd_fallback_add
cmd_fallback_add(types.SimpleNamespace())
out = capsys.readouterr().out
assert "No fallback added" in out
# ---------------------------------------------------------------------------
# cmd_fallback_remove
# ---------------------------------------------------------------------------
class TestRemoveCommand:
def test_remove_empty_chain(self, isolated_home, capsys):
_write_config(isolated_home, {})
from hermes_cli.fallback_cmd import cmd_fallback_remove
cmd_fallback_remove(types.SimpleNamespace())
out = capsys.readouterr().out
assert "nothing to remove" in out
def test_remove_selected_entry(self, isolated_home, capsys):
_write_config(isolated_home, {
"fallback_providers": [
{"provider": "openrouter", "model": "gpt-5.4"},
{"provider": "nous", "model": "Hermes-4"},
{"provider": "anthropic", "model": "claude-sonnet-4-6"},
],
})
# Picker returns index 1 (the middle entry, "nous / Hermes-4")
with patch("hermes_cli.setup._curses_prompt_choice", return_value=1):
from hermes_cli.fallback_cmd import cmd_fallback_remove
cmd_fallback_remove(types.SimpleNamespace())
cfg = _read_config(isolated_home)
assert cfg["fallback_providers"] == [
{"provider": "openrouter", "model": "gpt-5.4"},
{"provider": "anthropic", "model": "claude-sonnet-4-6"},
]
out = capsys.readouterr().out
assert "Removed fallback" in out
assert "Hermes-4" in out
def test_remove_cancel_keeps_chain(self, isolated_home):
_write_config(isolated_home, {
"fallback_providers": [
{"provider": "openrouter", "model": "gpt-5.4"},
],
})
# Cancel = last item (index == len(chain) == 1 in our menu)
with patch("hermes_cli.setup._curses_prompt_choice", return_value=1):
from hermes_cli.fallback_cmd import cmd_fallback_remove
cmd_fallback_remove(types.SimpleNamespace())
cfg = _read_config(isolated_home)
assert len(cfg["fallback_providers"]) == 1
# ---------------------------------------------------------------------------
# cmd_fallback_clear
# ---------------------------------------------------------------------------
class TestClearCommand:
def test_clear_empty_chain(self, isolated_home, capsys):
_write_config(isolated_home, {})
from hermes_cli.fallback_cmd import cmd_fallback_clear
cmd_fallback_clear(types.SimpleNamespace())
out = capsys.readouterr().out
assert "nothing to clear" in out
def test_clear_with_confirmation(self, isolated_home, capsys, monkeypatch):
_write_config(isolated_home, {
"fallback_providers": [
{"provider": "openrouter", "model": "gpt-5.4"},
{"provider": "nous", "model": "Hermes-4"},
],
})
monkeypatch.setattr("builtins.input", lambda *a, **kw: "y")
from hermes_cli.fallback_cmd import cmd_fallback_clear
cmd_fallback_clear(types.SimpleNamespace())
cfg = _read_config(isolated_home)
assert cfg.get("fallback_providers") == []
out = capsys.readouterr().out
assert "Fallback chain cleared" in out
def test_clear_cancelled(self, isolated_home, monkeypatch):
_write_config(isolated_home, {
"fallback_providers": [{"provider": "openrouter", "model": "gpt-5.4"}],
})
monkeypatch.setattr("builtins.input", lambda *a, **kw: "n")
from hermes_cli.fallback_cmd import cmd_fallback_clear
cmd_fallback_clear(types.SimpleNamespace())
cfg = _read_config(isolated_home)
assert len(cfg["fallback_providers"]) == 1
# ---------------------------------------------------------------------------
# cmd_fallback dispatcher
# ---------------------------------------------------------------------------
class TestDispatcher:
def test_no_subcommand_lists(self, isolated_home, capsys):
_write_config(isolated_home, {})
from hermes_cli.fallback_cmd import cmd_fallback
cmd_fallback(types.SimpleNamespace(fallback_command=None))
out = capsys.readouterr().out
assert "No fallback providers configured" in out
def test_list_alias(self, isolated_home, capsys):
_write_config(isolated_home, {})
from hermes_cli.fallback_cmd import cmd_fallback
cmd_fallback(types.SimpleNamespace(fallback_command="ls"))
out = capsys.readouterr().out
assert "No fallback providers configured" in out
def test_remove_alias(self, isolated_home, capsys):
_write_config(isolated_home, {})
from hermes_cli.fallback_cmd import cmd_fallback
cmd_fallback(types.SimpleNamespace(fallback_command="rm"))
out = capsys.readouterr().out
assert "nothing to remove" in out
def test_unknown_subcommand_exits(self, isolated_home):
_write_config(isolated_home, {})
from hermes_cli.fallback_cmd import cmd_fallback
with pytest.raises(SystemExit):
cmd_fallback(types.SimpleNamespace(fallback_command="nope"))
# ---------------------------------------------------------------------------
# argparse wiring — verify the subparser is registered
# ---------------------------------------------------------------------------
class TestArgparseWiring:
"""Verify `hermes fallback` is wired into main.py's argparse tree.
main() builds the parser inline, so we invoke main([...]) via subprocess
with --help to introspect registered subcommands without side effects.
"""
def test_fallback_help_lists_subcommands(self):
import subprocess
import sys
result = subprocess.run(
[sys.executable, "-m", "hermes_cli.main", "fallback", "--help"],
capture_output=True,
text=True,
timeout=30,
)
# --help exits 0
assert result.returncode == 0, f"stderr: {result.stderr}"
out = result.stdout + result.stderr
# All four subcommands should appear in help
assert "list" in out
assert "add" in out
assert "remove" in out
assert "clear" in out

View File

@@ -0,0 +1,284 @@
"""Tests for hermes_cli.model_catalog — remote manifest fetch + cache + fallback."""
from __future__ import annotations
import json
import time
from pathlib import Path
from unittest.mock import patch
import pytest
@pytest.fixture
def isolated_home(tmp_path, monkeypatch):
"""Isolate HERMES_HOME + reset any module-level catalog cache per test."""
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(home))
# Force a fresh catalog module state for each test.
import importlib
from hermes_cli import model_catalog
importlib.reload(model_catalog)
yield home
model_catalog.reset_cache()
def _valid_manifest() -> dict:
return {
"version": 1,
"updated_at": "2026-04-25T22:00:00Z",
"metadata": {"source": "test"},
"providers": {
"openrouter": {
"metadata": {"display_name": "OpenRouter"},
"models": [
{"id": "anthropic/claude-opus-4.7", "description": "recommended"},
{"id": "openai/gpt-5.4", "description": ""},
{"id": "openrouter/elephant-alpha", "description": "free"},
],
},
"nous": {
"metadata": {"display_name": "Nous Portal"},
"models": [
{"id": "anthropic/claude-opus-4.7"},
{"id": "moonshotai/kimi-k2.6"},
],
},
},
}
class TestValidation:
def test_accepts_well_formed_manifest(self, isolated_home):
from hermes_cli.model_catalog import _validate_manifest
assert _validate_manifest(_valid_manifest()) is True
def test_rejects_non_dict(self, isolated_home):
from hermes_cli.model_catalog import _validate_manifest
assert _validate_manifest("string") is False
assert _validate_manifest([]) is False
assert _validate_manifest(None) is False
def test_rejects_missing_version(self, isolated_home):
from hermes_cli.model_catalog import _validate_manifest
m = _valid_manifest()
del m["version"]
assert _validate_manifest(m) is False
def test_rejects_future_version(self, isolated_home):
from hermes_cli.model_catalog import _validate_manifest
m = _valid_manifest()
m["version"] = 999
assert _validate_manifest(m) is False
def test_rejects_missing_providers(self, isolated_home):
from hermes_cli.model_catalog import _validate_manifest
m = _valid_manifest()
del m["providers"]
assert _validate_manifest(m) is False
def test_rejects_malformed_model_entry(self, isolated_home):
from hermes_cli.model_catalog import _validate_manifest
m = _valid_manifest()
m["providers"]["openrouter"]["models"][0] = {"id": ""} # empty id
assert _validate_manifest(m) is False
def test_rejects_non_string_model_id(self, isolated_home):
from hermes_cli.model_catalog import _validate_manifest
m = _valid_manifest()
m["providers"]["openrouter"]["models"][0] = {"id": 42}
assert _validate_manifest(m) is False
class TestFetchSuccess:
def test_fetch_and_cache_writes_disk(self, isolated_home):
from hermes_cli import model_catalog
manifest = _valid_manifest()
with patch.object(
model_catalog, "_fetch_manifest", return_value=manifest
) as fetch:
result = model_catalog.get_catalog(force_refresh=True)
assert result == manifest
assert fetch.called
cache_file = model_catalog._cache_path()
assert cache_file.exists()
with open(cache_file) as fh:
assert json.load(fh) == manifest
def test_second_call_uses_in_process_cache(self, isolated_home):
from hermes_cli import model_catalog
manifest = _valid_manifest()
with patch.object(
model_catalog, "_fetch_manifest", return_value=manifest
) as fetch:
model_catalog.get_catalog(force_refresh=True)
model_catalog.get_catalog() # should not hit network again
assert fetch.call_count == 1
def test_force_refresh_always_refetches(self, isolated_home):
from hermes_cli import model_catalog
manifest = _valid_manifest()
with patch.object(
model_catalog, "_fetch_manifest", return_value=manifest
) as fetch:
model_catalog.get_catalog(force_refresh=True)
model_catalog.get_catalog(force_refresh=True)
assert fetch.call_count == 2
class TestFetchFailure:
def test_network_failure_returns_empty_when_no_cache(self, isolated_home):
from hermes_cli import model_catalog
with patch.object(model_catalog, "_fetch_manifest", return_value=None):
result = model_catalog.get_catalog(force_refresh=True)
assert result == {}
def test_network_failure_falls_back_to_disk_cache(self, isolated_home):
from hermes_cli import model_catalog
# Prime disk cache with a fresh copy.
manifest = _valid_manifest()
with patch.object(model_catalog, "_fetch_manifest", return_value=manifest):
model_catalog.get_catalog(force_refresh=True)
# Now wipe in-process cache and simulate network failure on refetch.
model_catalog.reset_cache()
with patch.object(model_catalog, "_fetch_manifest", return_value=None):
result = model_catalog.get_catalog(force_refresh=True)
assert result == manifest
def test_fetch_failure_falls_back_to_stale_cache(self, isolated_home):
from hermes_cli import model_catalog
manifest = _valid_manifest()
# Write stale cache directly (mtime in the past).
cache = model_catalog._cache_path()
cache.parent.mkdir(parents=True, exist_ok=True)
with open(cache, "w") as fh:
json.dump(manifest, fh)
old = time.time() - 30 * 24 * 3600 # 30 days ago
import os as _os
_os.utime(cache, (old, old))
with patch.object(model_catalog, "_fetch_manifest", return_value=None):
result = model_catalog.get_catalog()
# Stale cache is better than nothing.
assert result == manifest
class TestCuratedAccessors:
def test_openrouter_returns_tuples(self, isolated_home):
from hermes_cli import model_catalog
with patch.object(
model_catalog, "_fetch_manifest", return_value=_valid_manifest()
):
result = model_catalog.get_curated_openrouter_models()
assert result == [
("anthropic/claude-opus-4.7", "recommended"),
("openai/gpt-5.4", ""),
("openrouter/elephant-alpha", "free"),
]
def test_nous_returns_ids(self, isolated_home):
from hermes_cli import model_catalog
with patch.object(
model_catalog, "_fetch_manifest", return_value=_valid_manifest()
):
result = model_catalog.get_curated_nous_models()
assert result == ["anthropic/claude-opus-4.7", "moonshotai/kimi-k2.6"]
def test_openrouter_returns_none_when_catalog_empty(self, isolated_home):
from hermes_cli import model_catalog
with patch.object(model_catalog, "_fetch_manifest", return_value=None):
assert model_catalog.get_curated_openrouter_models() is None
def test_nous_returns_none_when_catalog_empty(self, isolated_home):
from hermes_cli import model_catalog
with patch.object(model_catalog, "_fetch_manifest", return_value=None):
assert model_catalog.get_curated_nous_models() is None
class TestDisabled:
def test_disabled_config_short_circuits(self, isolated_home):
from hermes_cli import model_catalog
with patch.object(
model_catalog,
"_load_catalog_config",
return_value={
"enabled": False,
"url": "http://ignored",
"ttl_hours": 24.0,
"providers": {},
},
):
with patch.object(model_catalog, "_fetch_manifest") as fetch:
result = model_catalog.get_catalog()
assert result == {}
fetch.assert_not_called()
class TestProviderOverride:
def test_override_url_takes_precedence(self, isolated_home):
from hermes_cli import model_catalog
override_payload = {
"version": 1,
"providers": {
"openrouter": {
"models": [
{"id": "override/model", "description": "custom"},
]
}
},
}
def fake_fetch(url, timeout):
if "override" in url:
return override_payload
return _valid_manifest()
with patch.object(
model_catalog,
"_load_catalog_config",
return_value={
"enabled": True,
"url": "http://master",
"ttl_hours": 24.0,
"providers": {"openrouter": {"url": "http://override"}},
},
):
with patch.object(model_catalog, "_fetch_manifest", side_effect=fake_fetch):
result = model_catalog.get_curated_openrouter_models()
assert result == [("override/model", "custom")]
class TestIntegrationWithModelsModule:
"""Exercise the fallback paths via the real callers in hermes_cli.models."""
def test_curated_nous_ids_falls_back_to_hardcoded_on_empty_catalog(
self, isolated_home
):
from hermes_cli import model_catalog
from hermes_cli.models import get_curated_nous_model_ids, _PROVIDER_MODELS
with patch.object(model_catalog, "_fetch_manifest", return_value=None):
result = get_curated_nous_model_ids()
assert result == list(_PROVIDER_MODELS["nous"])
def test_curated_nous_ids_prefers_manifest(self, isolated_home):
from hermes_cli import model_catalog
from hermes_cli.models import get_curated_nous_model_ids
with patch.object(
model_catalog, "_fetch_manifest", return_value=_valid_manifest()
):
result = get_curated_nous_model_ids()
assert result == ["anthropic/claude-opus-4.7", "moonshotai/kimi-k2.6"]

View File

@@ -56,7 +56,7 @@ def three_source_env(monkeypatch, hub_env):
import tools.skills_tool as skills_tool
monkeypatch.setattr(hub, "HubLockFile", lambda: _DummyLockFile([_HUB_ENTRY]))
monkeypatch.setattr(skills_tool, "_find_all_skills", lambda: list(_ALL_THREE_SKILLS))
monkeypatch.setattr(skills_tool, "_find_all_skills", lambda **_kwargs: list(_ALL_THREE_SKILLS))
monkeypatch.setattr(skills_sync, "_read_manifest", lambda: dict(_BUILTIN_MANIFEST))
return hub_env
@@ -107,7 +107,7 @@ def test_do_list_initializes_hub_dir(monkeypatch, hub_env):
import tools.skills_sync as skills_sync
import tools.skills_tool as skills_tool
monkeypatch.setattr(skills_tool, "_find_all_skills", lambda: [])
monkeypatch.setattr(skills_tool, "_find_all_skills", lambda **_kwargs: [])
monkeypatch.setattr(skills_sync, "_read_manifest", lambda: {})
hub_dir = hub_env
@@ -154,6 +154,74 @@ def test_do_list_filter_builtin(three_source_env):
assert "local-skill" not in output
def test_do_list_renders_status_column(three_source_env, monkeypatch):
"""Every list row should carry an enabled/disabled status (new in PR that
answered Mr Mochizuki's 'I just want to see what's live' question)."""
from agent import skill_utils
monkeypatch.setattr(skill_utils, "get_disabled_skill_names", lambda platform=None: set())
output = _capture()
assert "Status" in output
assert "enabled" in output.lower()
# Summary counts enabled skills.
assert "3 enabled, 0 disabled" in output
def test_do_list_marks_disabled_skills(three_source_env, monkeypatch):
from agent import skill_utils
# Simulate `skills.disabled: [hub-skill]` in config.
monkeypatch.setattr(
skill_utils, "get_disabled_skill_names",
lambda platform=None: {"hub-skill"},
)
output = _capture()
# Row still appears (no --enabled-only), but marked disabled
assert "hub-skill" in output
assert "disabled" in output.lower()
assert "2 enabled, 1 disabled" in output
def test_do_list_enabled_only_hides_disabled(three_source_env, monkeypatch):
from agent import skill_utils
monkeypatch.setattr(
skill_utils, "get_disabled_skill_names",
lambda platform=None: {"hub-skill"},
)
sink = StringIO()
console = Console(file=sink, force_terminal=False, color_system=None)
do_list(enabled_only=True, console=console)
output = sink.getvalue()
assert "hub-skill" not in output
assert "builtin-skill" in output
assert "local-skill" in output
assert "enabled only" in output.lower()
assert "2 enabled shown" in output
def test_do_list_platform_env_is_ignored(three_source_env, monkeypatch):
"""`hermes skills list` reads the active profile's config via
HERMES_HOME (swapped by -p), so it must NOT pass a platform arg to
``get_disabled_skill_names`` — otherwise per-platform overrides
would silently leak in from HERMES_PLATFORM env."""
from agent import skill_utils
seen = {}
def _fake(platform=None):
seen["platform"] = platform
return set()
monkeypatch.setattr(skill_utils, "get_disabled_skill_names", _fake)
_capture()
assert seen["platform"] is None
def test_do_check_reports_available_updates(monkeypatch):
output = _capture_check(monkeypatch, [
{"name": "hub-skill", "source": "skills.sh", "status": "update_available"},

View File

@@ -0,0 +1,78 @@
"""Behavior tests for the class-first skill review prompts.
The skill review / combined review prompts steer the background review agent
toward generalizing existing skills rather than accumulating near-duplicates.
These tests assert the behavioral *instructions* are present — they do NOT
snapshot the full prompt text (change-detector).
"""
from run_agent import AIAgent
def test_skill_review_prompt_instructs_survey_first():
"""Prompt must tell the reviewer to list existing skills before deciding."""
prompt = AIAgent._SKILL_REVIEW_PROMPT
assert "skills_list" in prompt, "must instruct the reviewer to call skills_list"
assert "skill_view" in prompt, "must instruct the reviewer to skill_view candidates"
assert "SURVEY" in prompt, "must name the survey step explicitly"
def test_skill_review_prompt_is_class_first():
"""Prompt must steer toward the CLASS of task, not the specific task."""
prompt = AIAgent._SKILL_REVIEW_PROMPT
assert "CLASS" in prompt, "must tell the reviewer to think about the task class"
assert "class level" in prompt, "must anchor naming at the class level"
def test_skill_review_prompt_prefers_updating_existing():
"""Prompt must prefer generalizing an existing skill over creating a new one."""
prompt = AIAgent._SKILL_REVIEW_PROMPT
assert "PREFER GENERALIZING" in prompt or "PREFER UPDATING" in prompt, (
"must state the update-over-create preference"
)
assert "ONLY CREATE A NEW SKILL" in prompt, (
"must gate new-skill creation behind a last-resort clause"
)
def test_skill_review_prompt_flags_overlap_for_followup():
"""Prompt must ask the reviewer to note overlapping skills for future review."""
prompt = AIAgent._SKILL_REVIEW_PROMPT
assert "overlap" in prompt.lower(), "must mention the overlap-flagging protocol"
def test_skill_review_prompt_preserves_opt_out_clause():
"""The 'Nothing to save.' escape clause must remain."""
prompt = AIAgent._SKILL_REVIEW_PROMPT
assert "Nothing to save." in prompt
def test_combined_review_prompt_keeps_memory_section():
"""Combined prompt must still cover memory review."""
prompt = AIAgent._COMBINED_REVIEW_PROMPT
assert "**Memory**" in prompt
assert "memory tool" in prompt
def test_combined_review_prompt_skills_section_is_class_first():
"""The **Skills** half of the combined prompt must follow the same protocol."""
prompt = AIAgent._COMBINED_REVIEW_PROMPT
assert "**Skills**" in prompt
assert "SURVEY" in prompt
assert "CLASS" in prompt
assert "skills_list" in prompt
assert "ONLY CREATE A NEW SKILL" in prompt
def test_combined_review_prompt_preserves_opt_out_clause():
prompt = AIAgent._COMBINED_REVIEW_PROMPT
assert "Nothing to save." in prompt
def test_memory_review_prompt_unchanged_in_structure():
"""Memory-only review prompt stays focused on user facts — not touched by this change."""
prompt = AIAgent._MEMORY_REVIEW_PROMPT
# Guardrails: the memory-only prompt must NOT mention skills/surveys.
assert "skills_list" not in prompt
assert "SURVEY" not in prompt
assert "memory tool" in prompt

View File

@@ -1485,6 +1485,48 @@ class TestListSessionsRich:
assert "\n" not in sessions[0]["preview"]
assert "Line one Line two" in sessions[0]["preview"]
def test_branch_session_visible_in_list(self, db):
"""Branch sessions (parent ended with 'branched') must appear in list_sessions_rich."""
db.create_session("parent", "cli")
db.end_session("parent", "branched")
db.create_session("branch", "cli", parent_session_id="parent")
db.append_message("branch", "user", "Exploring the alternative approach")
sessions = db.list_sessions_rich()
ids = [s["id"] for s in sessions]
assert "branch" in ids, "Branch session should be visible in default list"
def test_subagent_session_still_hidden(self, db):
"""Sub-agent children (parent NOT ended with 'branched') remain hidden."""
db.create_session("root", "cli")
db.create_session("delegate", "cli", parent_session_id="root")
sessions = db.list_sessions_rich()
ids = [s["id"] for s in sessions]
assert "delegate" not in ids, "Delegate sub-agent should not appear in default list"
assert "root" in ids
def test_compression_child_still_hidden(self, db):
"""Compression continuation sessions remain hidden (parent ended with 'compression')."""
import time as _time
t0 = _time.time()
db.create_session("root", "cli")
db._conn.execute("UPDATE sessions SET started_at=? WHERE id=?", (t0, "root"))
db._conn.execute(
"UPDATE sessions SET ended_at=?, end_reason='compression' WHERE id=?",
(t0 + 1800, "root"),
)
db._conn.commit()
db.create_session("continuation", "cli", parent_session_id="root")
db._conn.execute(
"UPDATE sessions SET started_at=? WHERE id=?", (t0 + 1801, "continuation")
)
db._conn.commit()
sessions = db.list_sessions_rich(project_compression_tips=False)
ids = [s["id"] for s in sessions]
assert "continuation" not in ids, "Compression continuation should stay hidden"
class TestCompressionChainProjection:
"""Tests for lineage-aware list_sessions_rich — compressed conversations

View File

@@ -1835,3 +1835,112 @@ def test_model_options_propagates_list_exception(monkeypatch):
assert "error" in resp
assert resp["error"]["code"] == 5033
assert "catalog blew up" in resp["error"]["message"]
# ---------------------------------------------------------------------------
# prompt.submit — auto-title
# ---------------------------------------------------------------------------
class _ImmediateThread:
"""Runs the target callable synchronously so assertions can follow."""
def __init__(self, target=None, daemon=None):
self._target = target
def start(self):
self._target()
def test_prompt_submit_auto_titles_session_on_complete(monkeypatch):
"""maybe_auto_title is called after a successful (complete) prompt."""
class _Agent:
def run_conversation(self, prompt, conversation_history=None, stream_callback=None):
return {
"final_response": "Rome was founded in 753 BC.",
"messages": [
{"role": "user", "content": "Tell me about Rome"},
{"role": "assistant", "content": "Rome was founded in 753 BC."},
],
}
server._sessions["sid"] = _session(agent=_Agent())
monkeypatch.setattr(server.threading, "Thread", _ImmediateThread)
monkeypatch.setattr(server, "_emit", lambda *args, **kwargs: None)
monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None)
monkeypatch.setattr(server, "render_message", lambda raw, cols: None)
monkeypatch.setattr(server, "_get_db", lambda: None)
with patch("agent.title_generator.maybe_auto_title") as mock_title:
server.handle_request(
{
"id": "1",
"method": "prompt.submit",
"params": {"session_id": "sid", "text": "Tell me about Rome"},
}
)
mock_title.assert_called_once()
args = mock_title.call_args.args
assert args[1] == "session-key"
assert args[2] == "Tell me about Rome"
assert args[3] == "Rome was founded in 753 BC."
def test_prompt_submit_skips_auto_title_when_interrupted(monkeypatch):
"""maybe_auto_title must NOT be called when the agent was interrupted."""
class _Agent:
def run_conversation(self, prompt, conversation_history=None, stream_callback=None):
return {
"final_response": "partial answer",
"interrupted": True,
"messages": [],
}
server._sessions["sid"] = _session(agent=_Agent())
monkeypatch.setattr(server.threading, "Thread", _ImmediateThread)
monkeypatch.setattr(server, "_emit", lambda *args, **kwargs: None)
monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None)
monkeypatch.setattr(server, "render_message", lambda raw, cols: None)
monkeypatch.setattr(server, "_get_db", lambda: None)
with patch("agent.title_generator.maybe_auto_title") as mock_title:
server.handle_request(
{
"id": "1",
"method": "prompt.submit",
"params": {"session_id": "sid", "text": "Tell me about Rome"},
}
)
mock_title.assert_not_called()
def test_prompt_submit_skips_auto_title_when_response_empty(monkeypatch):
"""maybe_auto_title must NOT be called when the agent returns an empty reply."""
class _Agent:
def run_conversation(self, prompt, conversation_history=None, stream_callback=None):
return {
"final_response": "",
"messages": [],
}
server._sessions["sid"] = _session(agent=_Agent())
monkeypatch.setattr(server.threading, "Thread", _ImmediateThread)
monkeypatch.setattr(server, "_emit", lambda *args, **kwargs: None)
monkeypatch.setattr(server, "make_stream_renderer", lambda cols: None)
monkeypatch.setattr(server, "render_message", lambda raw, cols: None)
monkeypatch.setattr(server, "_get_db", lambda: None)
with patch("agent.title_generator.maybe_auto_title") as mock_title:
server.handle_request(
{
"id": "1",
"method": "prompt.submit",
"params": {"session_id": "sid", "text": "Tell me about Rome"},
}
)
mock_title.assert_not_called()

View File

@@ -0,0 +1,248 @@
"""Tests for hybrid browser-backend routing (LAN/localhost auto-local).
When a cloud browser provider (Browserbase / Browser-Use / Firecrawl) is
configured globally, ``browser.auto_local_for_private_urls`` (default True)
causes ``browser_navigate`` to transparently spawn a local Chromium sidecar
for URLs whose host resolves to a private/loopback/LAN address, while
public URLs continue to hit the cloud session in the same conversation.
These tests cover the routing decision layer — session_key selection,
sidecar detection, last-active-session tracking, and the config toggle.
The downstream session creation is covered by test_browser_cloud_fallback.py.
"""
from unittest.mock import Mock
import pytest
import tools.browser_tool as browser_tool
@pytest.fixture(autouse=True)
def _reset_routing_state(monkeypatch):
"""Clear module-level caches so each test starts clean."""
monkeypatch.setattr(browser_tool, "_active_sessions", {})
monkeypatch.setattr(browser_tool, "_last_active_session_key", {})
monkeypatch.setattr(browser_tool, "_cached_cloud_provider", None)
monkeypatch.setattr(browser_tool, "_cloud_provider_resolved", False)
monkeypatch.setattr(browser_tool, "_auto_local_for_private_urls_resolved", False)
monkeypatch.setattr(browser_tool, "_cached_auto_local_for_private_urls", True)
monkeypatch.setattr(browser_tool, "_start_browser_cleanup_thread", lambda: None)
monkeypatch.setattr(browser_tool, "_update_session_activity", lambda t: None)
# Default: no CDP override, no Camofox
monkeypatch.setattr(browser_tool, "_get_cdp_override", lambda: None)
monkeypatch.setattr(browser_tool, "_is_camofox_mode", lambda: False)
class TestNavigationSessionKey:
"""Tests for _navigation_session_key URL-based routing decisions."""
def test_public_url_uses_bare_task_id(self, monkeypatch):
"""Public URL with cloud provider configured → bare task_id (cloud)."""
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: Mock())
key = browser_tool._navigation_session_key("default", "https://github.com/x/y")
assert key == "default"
def test_localhost_routes_to_local_sidecar(self, monkeypatch):
"""``localhost`` URL → ``::local`` suffix when cloud configured + flag on."""
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: Mock())
key = browser_tool._navigation_session_key("default", "http://localhost:3000/")
assert key == "default::local"
def test_loopback_ipv4_routes_to_local_sidecar(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: Mock())
key = browser_tool._navigation_session_key("default", "http://127.0.0.1:8080/")
assert key == "default::local"
def test_rfc1918_lan_routes_to_local_sidecar(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: Mock())
key = browser_tool._navigation_session_key("default", "http://192.168.1.50:8000/")
assert key == "default::local"
def test_ipv6_loopback_routes_to_local_sidecar(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: Mock())
key = browser_tool._navigation_session_key("default", "http://[::1]:3000/")
assert key == "default::local"
def test_public_ip_literal_uses_bare_task_id(self, monkeypatch):
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: Mock())
key = browser_tool._navigation_session_key("default", "https://8.8.8.8/")
assert key == "default"
def test_mdns_local_hostname_routes_to_sidecar(self, monkeypatch):
"""``*.local`` mDNS / ``*.lan`` / ``*.internal`` hostnames route to sidecar."""
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: Mock())
for host in ("raspberrypi.local", "printer.lan", "db.internal"):
key = browser_tool._navigation_session_key("default", f"http://{host}/")
assert key == "default::local", f"host {host!r} did not route to sidecar"
def test_no_cloud_provider_stays_on_bare_task_id(self, monkeypatch):
"""When cloud provider is not configured, no hybrid routing happens."""
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: None)
key = browser_tool._navigation_session_key("default", "http://localhost:3000/")
assert key == "default"
def test_camofox_mode_stays_on_bare_task_id(self, monkeypatch):
"""Camofox is already local — no hybrid routing needed."""
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: Mock())
monkeypatch.setattr(browser_tool, "_is_camofox_mode", lambda: True)
key = browser_tool._navigation_session_key("default", "http://localhost:3000/")
assert key == "default"
def test_cdp_override_stays_on_bare_task_id(self, monkeypatch):
"""A user-supplied CDP endpoint owns the whole session — no hybrid."""
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: Mock())
monkeypatch.setattr(browser_tool, "_get_cdp_override", lambda: "ws://localhost:9222")
key = browser_tool._navigation_session_key("default", "http://localhost:3000/")
assert key == "default"
def test_feature_flag_off_disables_hybrid_routing(self, monkeypatch):
"""``auto_local_for_private_urls: false`` keeps private URLs on cloud."""
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: Mock())
monkeypatch.setattr(browser_tool, "_auto_local_for_private_urls", lambda: False)
key = browser_tool._navigation_session_key("default", "http://localhost:3000/")
assert key == "default"
def test_none_task_id_defaults(self, monkeypatch):
"""``None`` task_id resolves to 'default'."""
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: Mock())
key = browser_tool._navigation_session_key(None, "http://localhost:3000/")
assert key == "default::local"
class TestSessionKeyHelpers:
def test_is_local_sidecar_key(self):
assert browser_tool._is_local_sidecar_key("default::local")
assert browser_tool._is_local_sidecar_key("my_task::local")
assert not browser_tool._is_local_sidecar_key("default")
assert not browser_tool._is_local_sidecar_key("my_task")
def test_last_session_key_falls_back_to_task_id(self, monkeypatch):
"""Without a recorded last-active key, returns the bare task_id."""
monkeypatch.setattr(browser_tool, "_last_active_session_key", {})
assert browser_tool._last_session_key("default") == "default"
assert browser_tool._last_session_key("task-42") == "task-42"
assert browser_tool._last_session_key(None) == "default"
def test_last_session_key_returns_recorded_key(self, monkeypatch):
monkeypatch.setattr(
browser_tool,
"_last_active_session_key",
{"default": "default::local", "task-42": "task-42"},
)
assert browser_tool._last_session_key("default") == "default::local"
assert browser_tool._last_session_key("task-42") == "task-42"
# Unknown task_id still falls back
assert browser_tool._last_session_key("other") == "other"
class TestHybridRoutingSessionCreation:
"""_get_session_info must force a local session when the key carries ``::local``."""
def test_local_sidecar_key_skips_cloud_provider(self, monkeypatch):
"""A ``::local``-suffixed key creates a local session even when cloud is set."""
provider = Mock()
provider.create_session.return_value = {
"session_name": "should_not_be_used",
"bb_session_id": "bb_xxx",
"cdp_url": "wss://fake.browserbase.com/ws",
}
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: provider)
monkeypatch.setattr(browser_tool, "_ensure_cdp_supervisor", lambda t: None)
session = browser_tool._get_session_info("default::local")
assert provider.create_session.call_count == 0
assert session["bb_session_id"] is None
assert session["cdp_url"] is None
assert session["features"]["local"] is True
def test_bare_task_id_with_cloud_provider_uses_cloud(self, monkeypatch):
"""A bare task_id with cloud provider configured hits the cloud path."""
provider = Mock()
provider.create_session.return_value = {
"session_name": "cloud-sess",
"bb_session_id": "bb_123",
"cdp_url": "wss://real.browserbase.com/ws",
}
monkeypatch.setattr(browser_tool, "_get_cloud_provider", lambda: provider)
monkeypatch.setattr(browser_tool, "_ensure_cdp_supervisor", lambda t: None)
monkeypatch.setattr(browser_tool, "_resolve_cdp_override", lambda u: u)
session = browser_tool._get_session_info("default")
assert provider.create_session.call_count == 1
assert session["bb_session_id"] == "bb_123"
class TestCleanupHybridSessions:
"""cleanup_browser(bare_task_id) must reap both cloud + local sidecar sessions."""
def test_cleanup_reaps_both_primary_and_sidecar(self, monkeypatch):
"""Given a bare task_id with both sessions alive, both get cleaned."""
reaped = []
def _fake_cleanup_one(key):
reaped.append(key)
monkeypatch.setattr(browser_tool, "_cleanup_single_browser_session", _fake_cleanup_one)
monkeypatch.setattr(
browser_tool,
"_active_sessions",
{
"default": {"session_name": "cloud_sess"},
"default::local": {"session_name": "local_sess"},
},
)
monkeypatch.setattr(
browser_tool, "_last_active_session_key", {"default": "default::local"}
)
browser_tool.cleanup_browser("default")
assert set(reaped) == {"default", "default::local"}
# last-active pointer dropped
assert "default" not in browser_tool._last_active_session_key
def test_cleanup_reaps_only_primary_when_no_sidecar(self, monkeypatch):
"""When no sidecar exists, only the primary is reaped."""
reaped = []
def _fake_cleanup_one(key):
reaped.append(key)
monkeypatch.setattr(browser_tool, "_cleanup_single_browser_session", _fake_cleanup_one)
monkeypatch.setattr(
browser_tool,
"_active_sessions",
{"default": {"session_name": "cloud_sess"}},
)
browser_tool.cleanup_browser("default")
assert reaped == ["default"]
def test_cleanup_sidecar_directly_keeps_primary(self, monkeypatch):
"""Calling cleanup with a ``::local`` key reaps only the sidecar."""
reaped = []
def _fake_cleanup_one(key):
reaped.append(key)
monkeypatch.setattr(browser_tool, "_cleanup_single_browser_session", _fake_cleanup_one)
monkeypatch.setattr(
browser_tool,
"_active_sessions",
{
"default": {"session_name": "cloud_sess"},
"default::local": {"session_name": "local_sess"},
},
)
monkeypatch.setattr(
browser_tool, "_last_active_session_key", {"default": "default::local"}
)
browser_tool.cleanup_browser("default::local")
assert reaped == ["default::local"]
# Last-active pointer NOT dropped (primary task is still alive)
assert browser_tool._last_active_session_key.get("default") == "default::local"

View File

@@ -0,0 +1,210 @@
"""Tests for credential_pool .env fallback and auth credential_pool lookup.
Covers the fix from #15914 / PR #15920:
- _seed_from_env reads API keys from ~/.hermes/.env when not in os.environ
- _resolve_api_key_provider_secret falls back to credential_pool when env vars are empty
- env vars take priority over .env file (handled by get_env_value itself)
- env vars take priority over credential pool (fallback only kicks in when env is empty)
"""
import os
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
def _make_pconfig(provider_id="deepseek", env_vars=None):
"""Create a minimal ProviderConfig for testing.
Default provider_id is 'deepseek' because it's a real api_key provider
in PROVIDER_REGISTRY (needed for _seed_from_env's generic path).
"""
from hermes_cli.auth import ProviderConfig
return ProviderConfig(
id=provider_id,
name=provider_id.title(),
auth_type="api_key",
api_key_env_vars=tuple(env_vars or [f"{provider_id.upper()}_API_KEY"]),
)
@pytest.fixture
def isolated_hermes_home(tmp_path, monkeypatch):
"""Point HERMES_HOME at a temp dir and clear known API key env vars.
Also invalidates any cached get_env_value state by patching Path.home().
"""
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(home))
# Clear all known API key env vars so get_env_value falls through to .env
for key in [
"OPENAI_API_KEY", "ANTHROPIC_API_KEY", "OPENROUTER_API_KEY",
"ZAI_API_KEY", "DEEPSEEK_API_KEY", "ANTHROPIC_TOKEN",
"CLAUDE_CODE_OAUTH_TOKEN", "OPENAI_BASE_URL",
]:
monkeypatch.delenv(key, raising=False)
return home
def _write_env_file(home: Path, **kwargs) -> None:
"""Write key=value pairs to ~/.hermes/.env."""
lines = [f"{k}={v}" for k, v in kwargs.items()]
(home / ".env").write_text("\n".join(lines) + "\n")
class TestCredentialPoolSeedsFromDotEnv:
"""_seed_from_env must read keys from ~/.hermes/.env, not just os.environ.
This is the load-bearing behaviour for the fix: when a user adds a key to
.env mid-session or via a non-CLI entry point that doesn't run
load_hermes_dotenv, the credential pool must still discover it.
"""
def test_deepseek_key_from_dotenv_only(self, isolated_hermes_home):
"""Key in .env but not os.environ → _seed_from_env adds a pool entry."""
_write_env_file(isolated_hermes_home, DEEPSEEK_API_KEY="sk-dotenv-only-12345")
assert "DEEPSEEK_API_KEY" not in os.environ
from agent.credential_pool import _seed_from_env
entries = []
changed, active_sources = _seed_from_env("deepseek", entries)
assert changed is True
assert "env:DEEPSEEK_API_KEY" in active_sources
assert any(
e.access_token == "sk-dotenv-only-12345"
and e.source == "env:DEEPSEEK_API_KEY"
for e in entries
), f"Expected seeded entry with dotenv key, got: {[(e.source, e.access_token) for e in entries]}"
def test_openrouter_key_from_dotenv_only(self, isolated_hermes_home):
"""OpenRouter path has its own branch — verify it also reads .env."""
_write_env_file(isolated_hermes_home, OPENROUTER_API_KEY="sk-or-dotenv-abc")
assert "OPENROUTER_API_KEY" not in os.environ
from agent.credential_pool import _seed_from_env
entries = []
changed, active_sources = _seed_from_env("openrouter", entries)
assert changed is True
assert "env:OPENROUTER_API_KEY" in active_sources
assert any(
e.access_token == "sk-or-dotenv-abc" for e in entries
)
def test_empty_dotenv_no_entries(self, isolated_hermes_home):
"""No .env file, no env vars → no entries seeded (and no crash)."""
from agent.credential_pool import _seed_from_env
entries = []
changed, active_sources = _seed_from_env("deepseek", entries)
assert changed is False
assert active_sources == set()
assert entries == []
def test_os_environ_still_wins_over_dotenv(self, isolated_hermes_home, monkeypatch):
"""get_env_value checks os.environ first — verify seeding picks that up."""
_write_env_file(isolated_hermes_home, DEEPSEEK_API_KEY="sk-dotenv-stale")
monkeypatch.setenv("DEEPSEEK_API_KEY", "sk-env-fresh-xyz")
from agent.credential_pool import _seed_from_env
entries = []
changed, _ = _seed_from_env("deepseek", entries)
assert changed is True
seeded = [e for e in entries if e.source == "env:DEEPSEEK_API_KEY"]
assert len(seeded) == 1
assert seeded[0].access_token == "sk-env-fresh-xyz"
class TestAuthResolvesFromDotEnv:
"""_resolve_api_key_provider_secret must also read from ~/.hermes/.env."""
def test_key_from_dotenv_only(self, isolated_hermes_home):
"""Key in .env but not os.environ → _resolve returns it with the env var source."""
_write_env_file(isolated_hermes_home, DEEPSEEK_API_KEY="sk-dotenv-resolve-789")
assert "DEEPSEEK_API_KEY" not in os.environ
from hermes_cli.auth import _resolve_api_key_provider_secret
key, source = _resolve_api_key_provider_secret(
provider_id="deepseek",
pconfig=_make_pconfig(),
)
assert key == "sk-dotenv-resolve-789"
assert source == "DEEPSEEK_API_KEY"
class TestAuthCredentialPoolFallback:
"""_resolve_api_key_provider_secret falls back to credential pool when env + dotenv are empty."""
def test_credential_pool_fallback_structure(self, isolated_hermes_home):
"""Empty env + empty .env → auth falls back to credential pool."""
mock_entry = MagicMock()
mock_entry.access_token = "test-pool-key-12345"
mock_entry.runtime_api_key = ""
mock_pool = MagicMock()
mock_pool.has_credentials.return_value = True
mock_pool.peek.return_value = mock_entry
from hermes_cli.auth import _resolve_api_key_provider_secret
with patch("agent.credential_pool.load_pool", return_value=mock_pool):
key, source = _resolve_api_key_provider_secret(
provider_id="deepseek",
pconfig=_make_pconfig(),
)
assert "test-pool-key-12345" in key
assert "credential_pool" in source
def test_credential_pool_empty_returns_empty(self, isolated_hermes_home):
"""Empty env + empty .env + empty pool → empty string."""
mock_pool = MagicMock()
mock_pool.has_credentials.return_value = False
from hermes_cli.auth import _resolve_api_key_provider_secret
with patch("agent.credential_pool.load_pool", return_value=mock_pool):
key, source = _resolve_api_key_provider_secret(
provider_id="deepseek",
pconfig=_make_pconfig(),
)
assert key == ""
def test_env_var_takes_priority_over_pool(self, isolated_hermes_home, monkeypatch):
"""os.environ key wins — credential pool is NEVER consulted."""
monkeypatch.setenv("DEEPSEEK_API_KEY", "sk-env-key-first-abc123")
mock_pool = MagicMock()
mock_pool.has_credentials.return_value = True
from hermes_cli.auth import _resolve_api_key_provider_secret
with patch("agent.credential_pool.load_pool", return_value=mock_pool) as mp:
key, source = _resolve_api_key_provider_secret(
provider_id="deepseek",
pconfig=_make_pconfig(),
)
assert key == "sk-env-key-first-abc123"
assert source == "DEEPSEEK_API_KEY"
# Pool should not even have been loaded — env var satisfied the request first
mp.assert_not_called()
def test_dotenv_takes_priority_over_pool(self, isolated_hermes_home):
"""Key in .env beats credential pool — pool only fires when both env sources are empty."""
_write_env_file(isolated_hermes_home, DEEPSEEK_API_KEY="sk-dotenv-priority-xyz")
assert "DEEPSEEK_API_KEY" not in os.environ
mock_pool = MagicMock()
mock_pool.has_credentials.return_value = True
from hermes_cli.auth import _resolve_api_key_provider_secret
with patch("agent.credential_pool.load_pool", return_value=mock_pool) as mp:
key, source = _resolve_api_key_provider_secret(
provider_id="deepseek",
pconfig=_make_pconfig(),
)
assert key == "sk-dotenv-priority-xyz"
assert source == "DEEPSEEK_API_KEY"
mp.assert_not_called()

View File

@@ -491,11 +491,36 @@ def test_configure_callback_port_uses_explicit_port():
assert cfg["_resolved_port"] == 54321
def test_parse_base_url_strips_path():
"""_parse_base_url drops path components for OAuth discovery."""
from tools.mcp_oauth import _parse_base_url
def test_build_oauth_auth_preserves_server_url_path():
"""server_url with path is forwarded to OAuthClientProvider unmodified.
Regression for #16015: previously ``_parse_base_url`` stripped the path,
collapsing ``https://mcp.notion.com/mcp`` to ``https://mcp.notion.com`` and
breaking RFC 9728 protected-resource validation against servers whose PRM
advertises a path-scoped resource (Notion). The MCP SDK strips the path
itself for authorization-server discovery via
``OAuthContext.get_authorization_base_url``; Hermes must not pre-strip.
"""
from tools import mcp_oauth
captured: dict = {}
class _FakeProvider:
def __init__(self, **kwargs):
captured.update(kwargs)
with patch.object(mcp_oauth, "_OAUTH_AVAILABLE", True), \
patch.object(mcp_oauth, "OAuthClientProvider", _FakeProvider), \
patch.object(mcp_oauth, "_is_interactive", return_value=True), \
patch.object(mcp_oauth, "_maybe_preregister_client"), \
patch.object(mcp_oauth, "HermesTokenStorage") as mock_storage_cls:
mock_storage_cls.return_value = MagicMock(has_cached_tokens=lambda: True)
build_oauth_auth(
server_name="notion",
server_url="https://mcp.notion.com/mcp",
oauth_config={},
)
assert captured["server_url"] == "https://mcp.notion.com/mcp"
assert _parse_base_url("https://example.com/mcp/v1") == "https://example.com"
assert _parse_base_url("https://example.com") == "https://example.com"
assert _parse_base_url("https://host.example.com:8080/api") == "https://host.example.com:8080"