From de03a332f7b5dc9570e23926c31ce9f840888410 Mon Sep 17 00:00:00 2001 From: hharry11 Date: Tue, 28 Apr 2026 09:28:19 +0300 Subject: [PATCH] fix(security): isolate interactive sudo password cache per session --- tests/acp/test_approval_isolation.py | 23 ++++++++++ tests/tools/test_terminal_tool.py | 20 ++++++-- tools/terminal_tool.py | 68 +++++++++++++++++++++++++--- 3 files changed, 102 insertions(+), 9 deletions(-) diff --git a/tests/acp/test_approval_isolation.py b/tests/acp/test_approval_isolation.py index 90ea4e063..3099ef9e2 100644 --- a/tests/acp/test_approval_isolation.py +++ b/tests/acp/test_approval_isolation.py @@ -118,6 +118,29 @@ class TestThreadLocalApprovalCallback: assert worker_saw == [None] assert _get_sudo_password_callback() is cb_main + def test_sudo_password_cache_does_not_leak_across_threads(self): + """Interactive sudo cache must not bleed into another executor thread.""" + from tools.terminal_tool import ( + _get_cached_sudo_password, + _reset_cached_sudo_passwords, + _set_cached_sudo_password, + ) + + _reset_cached_sudo_passwords() + _set_cached_sudo_password("main-thread-password") + + worker_saw = [] + + def worker(): + worker_saw.append(_get_cached_sudo_password()) + + t = threading.Thread(target=worker) + t.start() + t.join() + + assert worker_saw == [""] + assert _get_cached_sudo_password() == "main-thread-password" + class TestAcpExecAskGate: """GHSA-96vc-wcxf-jjff: ACP's _run_agent must set HERMES_INTERACTIVE so diff --git a/tests/tools/test_terminal_tool.py b/tests/tools/test_terminal_tool.py index dd2a67418..9245d9c6b 100644 --- a/tests/tools/test_terminal_tool.py +++ b/tests/tools/test_terminal_tool.py @@ -4,11 +4,11 @@ import tools.terminal_tool as terminal_tool def setup_function(): - terminal_tool._cached_sudo_password = "" + terminal_tool._reset_cached_sudo_passwords() def teardown_function(): - terminal_tool._cached_sudo_password = "" + terminal_tool._reset_cached_sudo_passwords() def test_searching_for_sudo_does_not_trigger_rewrite(monkeypatch): @@ -82,7 +82,7 @@ def test_explicit_empty_sudo_password_tries_empty_without_prompt(monkeypatch): def test_cached_sudo_password_is_used_when_env_is_unset(monkeypatch): monkeypatch.delenv("SUDO_PASSWORD", raising=False) monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) - terminal_tool._cached_sudo_password = "cached-pass" + terminal_tool._set_cached_sudo_password("cached-pass") transformed, sudo_stdin = terminal_tool._transform_sudo_command("echo ok && sudo whoami") @@ -90,6 +90,20 @@ def test_cached_sudo_password_is_used_when_env_is_unset(monkeypatch): assert sudo_stdin == "cached-pass\n" +def test_cached_sudo_password_isolated_by_session_key(monkeypatch): + monkeypatch.delenv("SUDO_PASSWORD", raising=False) + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + + monkeypatch.setenv("HERMES_SESSION_KEY", "session-a") + terminal_tool._set_cached_sudo_password("alpha-pass") + + monkeypatch.setenv("HERMES_SESSION_KEY", "session-b") + assert terminal_tool._get_cached_sudo_password() == "" + + monkeypatch.setenv("HERMES_SESSION_KEY", "session-a") + assert terminal_tool._get_cached_sudo_password() == "alpha-pass" + + def test_validate_workdir_allows_windows_drive_paths(): assert terminal_tool._validate_workdir(r"C:\Users\Alice\project") is None assert terminal_tool._validate_workdir("C:/Users/Alice/project") is None diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 105c5aa85..24a6bff40 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -145,8 +145,14 @@ def _check_disk_usage_warning(): return False -# Session-cached sudo password (persists until CLI exits) -_cached_sudo_password: str = "" +# Interactive sudo password cache. +# +# Scope the cache to the active session when a session key is available, then +# fall back to callback identity (ACP / CLI interactive callbacks), then the +# current thread. This prevents one interactive session from reusing another +# session's cached sudo password inside the same long-lived process. +_sudo_password_cache: dict[str, str] = {} +_sudo_password_cache_lock = threading.Lock() # Optional UI callbacks for interactive prompts. When set, these are called # instead of the default /dev/tty or input() readers. The CLI registers these @@ -190,6 +196,54 @@ def set_approval_callback(cb): """ _callback_tls.approval = cb + +def _get_sudo_password_cache_scope() -> str: + """Return the cache scope for interactive sudo passwords.""" + try: + from gateway.session_context import get_session_env + + session_key = get_session_env("HERMES_SESSION_KEY", "") + except Exception: + session_key = os.getenv("HERMES_SESSION_KEY", "") + if session_key: + return f"session:{session_key}" + + callback = _get_sudo_password_callback() + if callback is not None: + owner = getattr(callback, "__self__", None) + func = getattr(callback, "__func__", None) + if owner is not None and func is not None: + return f"callback-owner:{id(owner)}:{id(func)}" + return f"callback:{id(callback)}" + + return f"thread:{threading.get_ident()}" + + +def _get_cached_sudo_password() -> str: + """Return the cached sudo password for the current scope.""" + scope = _get_sudo_password_cache_scope() + with _sudo_password_cache_lock: + return _sudo_password_cache.get(scope, "") + + +def _set_cached_sudo_password(password: str) -> None: + """Persist a sudo password for the current scope.""" + scope = _get_sudo_password_cache_scope() + with _sudo_password_cache_lock: + if password: + _sudo_password_cache[scope] = password + else: + _sudo_password_cache.pop(scope, None) + + +def _reset_cached_sudo_passwords() -> None: + """Clear all cached sudo passwords. + + Internal helper for tests and process teardown paths. + """ + with _sudo_password_cache_lock: + _sudo_password_cache.clear() + # ============================================================================= # Dangerous Command Approval System # ============================================================================= @@ -700,8 +754,6 @@ def _transform_sudo_command(command: str | None) -> tuple[str | None, str | None If SUDO_PASSWORD is not set and NOT interactive: Command runs as-is (fails gracefully with "sudo: a password is required"). """ - global _cached_sudo_password - if command is None: return None, None transformed, has_real_sudo = _rewrite_real_sudo_invocations(command) @@ -709,12 +761,16 @@ def _transform_sudo_command(command: str | None) -> tuple[str | None, str | None return command, None has_configured_password = "SUDO_PASSWORD" in os.environ - sudo_password = os.environ.get("SUDO_PASSWORD", "") if has_configured_password else _cached_sudo_password + sudo_password = ( + os.environ.get("SUDO_PASSWORD", "") + if has_configured_password + else _get_cached_sudo_password() + ) if not has_configured_password and not sudo_password and os.getenv("HERMES_INTERACTIVE"): sudo_password = _prompt_for_sudo_password(timeout_seconds=45) if sudo_password: - _cached_sudo_password = sudo_password + _set_cached_sudo_password(sudo_password) if has_configured_password or sudo_password: # Trailing newline is required: sudo -S reads one line for the password.