fix(security): isolate interactive sudo password cache per session
This commit is contained in:
@@ -118,6 +118,29 @@ class TestThreadLocalApprovalCallback:
|
||||
assert worker_saw == [None]
|
||||
assert _get_sudo_password_callback() is cb_main
|
||||
|
||||
def test_sudo_password_cache_does_not_leak_across_threads(self):
|
||||
"""Interactive sudo cache must not bleed into another executor thread."""
|
||||
from tools.terminal_tool import (
|
||||
_get_cached_sudo_password,
|
||||
_reset_cached_sudo_passwords,
|
||||
_set_cached_sudo_password,
|
||||
)
|
||||
|
||||
_reset_cached_sudo_passwords()
|
||||
_set_cached_sudo_password("main-thread-password")
|
||||
|
||||
worker_saw = []
|
||||
|
||||
def worker():
|
||||
worker_saw.append(_get_cached_sudo_password())
|
||||
|
||||
t = threading.Thread(target=worker)
|
||||
t.start()
|
||||
t.join()
|
||||
|
||||
assert worker_saw == [""]
|
||||
assert _get_cached_sudo_password() == "main-thread-password"
|
||||
|
||||
|
||||
class TestAcpExecAskGate:
|
||||
"""GHSA-96vc-wcxf-jjff: ACP's _run_agent must set HERMES_INTERACTIVE so
|
||||
|
||||
@@ -4,11 +4,11 @@ import tools.terminal_tool as terminal_tool
|
||||
|
||||
|
||||
def setup_function():
|
||||
terminal_tool._cached_sudo_password = ""
|
||||
terminal_tool._reset_cached_sudo_passwords()
|
||||
|
||||
|
||||
def teardown_function():
|
||||
terminal_tool._cached_sudo_password = ""
|
||||
terminal_tool._reset_cached_sudo_passwords()
|
||||
|
||||
|
||||
def test_searching_for_sudo_does_not_trigger_rewrite(monkeypatch):
|
||||
@@ -82,7 +82,7 @@ def test_explicit_empty_sudo_password_tries_empty_without_prompt(monkeypatch):
|
||||
def test_cached_sudo_password_is_used_when_env_is_unset(monkeypatch):
|
||||
monkeypatch.delenv("SUDO_PASSWORD", raising=False)
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
terminal_tool._cached_sudo_password = "cached-pass"
|
||||
terminal_tool._set_cached_sudo_password("cached-pass")
|
||||
|
||||
transformed, sudo_stdin = terminal_tool._transform_sudo_command("echo ok && sudo whoami")
|
||||
|
||||
@@ -90,6 +90,20 @@ def test_cached_sudo_password_is_used_when_env_is_unset(monkeypatch):
|
||||
assert sudo_stdin == "cached-pass\n"
|
||||
|
||||
|
||||
def test_cached_sudo_password_isolated_by_session_key(monkeypatch):
|
||||
monkeypatch.delenv("SUDO_PASSWORD", raising=False)
|
||||
monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
|
||||
|
||||
monkeypatch.setenv("HERMES_SESSION_KEY", "session-a")
|
||||
terminal_tool._set_cached_sudo_password("alpha-pass")
|
||||
|
||||
monkeypatch.setenv("HERMES_SESSION_KEY", "session-b")
|
||||
assert terminal_tool._get_cached_sudo_password() == ""
|
||||
|
||||
monkeypatch.setenv("HERMES_SESSION_KEY", "session-a")
|
||||
assert terminal_tool._get_cached_sudo_password() == "alpha-pass"
|
||||
|
||||
|
||||
def test_validate_workdir_allows_windows_drive_paths():
|
||||
assert terminal_tool._validate_workdir(r"C:\Users\Alice\project") is None
|
||||
assert terminal_tool._validate_workdir("C:/Users/Alice/project") is None
|
||||
|
||||
@@ -145,8 +145,14 @@ def _check_disk_usage_warning():
|
||||
return False
|
||||
|
||||
|
||||
# Session-cached sudo password (persists until CLI exits)
|
||||
_cached_sudo_password: str = ""
|
||||
# Interactive sudo password cache.
|
||||
#
|
||||
# Scope the cache to the active session when a session key is available, then
|
||||
# fall back to callback identity (ACP / CLI interactive callbacks), then the
|
||||
# current thread. This prevents one interactive session from reusing another
|
||||
# session's cached sudo password inside the same long-lived process.
|
||||
_sudo_password_cache: dict[str, str] = {}
|
||||
_sudo_password_cache_lock = threading.Lock()
|
||||
|
||||
# Optional UI callbacks for interactive prompts. When set, these are called
|
||||
# instead of the default /dev/tty or input() readers. The CLI registers these
|
||||
@@ -190,6 +196,54 @@ def set_approval_callback(cb):
|
||||
"""
|
||||
_callback_tls.approval = cb
|
||||
|
||||
|
||||
def _get_sudo_password_cache_scope() -> str:
|
||||
"""Return the cache scope for interactive sudo passwords."""
|
||||
try:
|
||||
from gateway.session_context import get_session_env
|
||||
|
||||
session_key = get_session_env("HERMES_SESSION_KEY", "")
|
||||
except Exception:
|
||||
session_key = os.getenv("HERMES_SESSION_KEY", "")
|
||||
if session_key:
|
||||
return f"session:{session_key}"
|
||||
|
||||
callback = _get_sudo_password_callback()
|
||||
if callback is not None:
|
||||
owner = getattr(callback, "__self__", None)
|
||||
func = getattr(callback, "__func__", None)
|
||||
if owner is not None and func is not None:
|
||||
return f"callback-owner:{id(owner)}:{id(func)}"
|
||||
return f"callback:{id(callback)}"
|
||||
|
||||
return f"thread:{threading.get_ident()}"
|
||||
|
||||
|
||||
def _get_cached_sudo_password() -> str:
|
||||
"""Return the cached sudo password for the current scope."""
|
||||
scope = _get_sudo_password_cache_scope()
|
||||
with _sudo_password_cache_lock:
|
||||
return _sudo_password_cache.get(scope, "")
|
||||
|
||||
|
||||
def _set_cached_sudo_password(password: str) -> None:
|
||||
"""Persist a sudo password for the current scope."""
|
||||
scope = _get_sudo_password_cache_scope()
|
||||
with _sudo_password_cache_lock:
|
||||
if password:
|
||||
_sudo_password_cache[scope] = password
|
||||
else:
|
||||
_sudo_password_cache.pop(scope, None)
|
||||
|
||||
|
||||
def _reset_cached_sudo_passwords() -> None:
|
||||
"""Clear all cached sudo passwords.
|
||||
|
||||
Internal helper for tests and process teardown paths.
|
||||
"""
|
||||
with _sudo_password_cache_lock:
|
||||
_sudo_password_cache.clear()
|
||||
|
||||
# =============================================================================
|
||||
# Dangerous Command Approval System
|
||||
# =============================================================================
|
||||
@@ -700,8 +754,6 @@ def _transform_sudo_command(command: str | None) -> tuple[str | None, str | None
|
||||
If SUDO_PASSWORD is not set and NOT interactive:
|
||||
Command runs as-is (fails gracefully with "sudo: a password is required").
|
||||
"""
|
||||
global _cached_sudo_password
|
||||
|
||||
if command is None:
|
||||
return None, None
|
||||
transformed, has_real_sudo = _rewrite_real_sudo_invocations(command)
|
||||
@@ -709,12 +761,16 @@ def _transform_sudo_command(command: str | None) -> tuple[str | None, str | None
|
||||
return command, None
|
||||
|
||||
has_configured_password = "SUDO_PASSWORD" in os.environ
|
||||
sudo_password = os.environ.get("SUDO_PASSWORD", "") if has_configured_password else _cached_sudo_password
|
||||
sudo_password = (
|
||||
os.environ.get("SUDO_PASSWORD", "")
|
||||
if has_configured_password
|
||||
else _get_cached_sudo_password()
|
||||
)
|
||||
|
||||
if not has_configured_password and not sudo_password and os.getenv("HERMES_INTERACTIVE"):
|
||||
sudo_password = _prompt_for_sudo_password(timeout_seconds=45)
|
||||
if sudo_password:
|
||||
_cached_sudo_password = sudo_password
|
||||
_set_cached_sudo_password(sudo_password)
|
||||
|
||||
if has_configured_password or sudo_password:
|
||||
# Trailing newline is required: sudo -S reads one line for the password.
|
||||
|
||||
Reference in New Issue
Block a user