From af9a9f773ce78c0083e896ed7582915430718bbe Mon Sep 17 00:00:00 2001 From: "Mariano A. Nicolini" Date: Mon, 6 Apr 2026 13:16:25 -0700 Subject: [PATCH] fix(security): sanitize workdir parameter in terminal tool backends Shell injection via unquoted workdir interpolation in docker, singularity, and SSH backends. When workdir contained shell metacharacters (e.g. ~/;id), arbitrary commands could execute. Changes: - Add shlex.quote() at each interpolation point in docker.py, singularity.py, and ssh.py with tilde-aware quoting (keep ~ unquoted for shell expansion, quote only the subpath) - Add _validate_workdir() allowlist in terminal_tool.py as defense-in-depth before workdir reaches any backend Original work by Mariano A. Nicolini (PR #5620). Salvaged with fixes for tilde expansion (shlex.quote breaks cd ~/path) and replaced incomplete deny-list with strict character allowlist. Co-authored-by: Mariano A. Nicolini --- tools/environments/docker.py | 11 ++++++--- tools/environments/singularity.py | 11 ++++++--- tools/environments/ssh.py | 9 ++++++- tools/terminal_tool.py | 41 +++++++++++++++++++++++++++++++ 4 files changed, 65 insertions(+), 7 deletions(-) diff --git a/tools/environments/docker.py b/tools/environments/docker.py index ea553a7b6..1d2d325cb 100644 --- a/tools/environments/docker.py +++ b/tools/environments/docker.py @@ -8,6 +8,7 @@ persistence via bind mounts. import logging import os import re +import shlex import shutil import subprocess import sys @@ -484,9 +485,13 @@ class DockerEnvironment(BaseEnvironment): else: effective_stdin = stdin_data - # docker exec -w doesn't expand ~, so prepend a cd into the command - if work_dir == "~" or work_dir.startswith("~/"): - exec_command = f"cd {work_dir} && {exec_command}" + # docker exec -w doesn't expand ~, so prepend a cd into the command. + # Keep ~ unquoted (for shell expansion) and quote only the subpath. + if work_dir == "~": + exec_command = f"cd ~ && {exec_command}" + work_dir = "/" + elif work_dir.startswith("~/"): + exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}" work_dir = "/" assert self._container_id, "Container not started" diff --git a/tools/environments/singularity.py b/tools/environments/singularity.py index 89d9ffb04..6643ea1b3 100644 --- a/tools/environments/singularity.py +++ b/tools/environments/singularity.py @@ -8,6 +8,7 @@ via writable overlay directories that survive across sessions. import json import logging import os +import shlex import shutil import subprocess import tempfile @@ -311,9 +312,13 @@ class SingularityEnvironment(BaseEnvironment): else: effective_stdin = stdin_data - # apptainer exec --pwd doesn't expand ~, so prepend a cd into the command - if work_dir == "~" or work_dir.startswith("~/"): - exec_command = f"cd {work_dir} && {exec_command}" + # apptainer exec --pwd doesn't expand ~, so prepend a cd into the command. + # Keep ~ unquoted (for shell expansion) and quote only the subpath. + if work_dir == "~": + exec_command = f"cd ~ && {exec_command}" + work_dir = "/tmp" + elif work_dir.startswith("~/"): + exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}" work_dir = "/tmp" cmd = [self.executable, "exec", "--pwd", work_dir, diff --git a/tools/environments/ssh.py b/tools/environments/ssh.py index 387dea34e..afd28c4af 100644 --- a/tools/environments/ssh.py +++ b/tools/environments/ssh.py @@ -1,6 +1,7 @@ """SSH remote execution environment with ControlMaster connection persistence.""" import logging +import shlex import shutil import subprocess import tempfile @@ -228,7 +229,13 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment): stdin_data: str | None = None) -> dict: work_dir = cwd or self.cwd exec_command, sudo_stdin = self._prepare_command(command) - wrapped = f'cd {work_dir} && {exec_command}' + # Keep ~ unquoted (for shell expansion) and quote only the subpath. + if work_dir == "~": + wrapped = f'cd ~ && {exec_command}' + elif work_dir.startswith("~/"): + wrapped = f'cd ~/{shlex.quote(work_dir[2:])} && {exec_command}' + else: + wrapped = f'cd {shlex.quote(work_dir)} && {exec_command}' effective_timeout = timeout or self.timeout if sudo_stdin is not None and stdin_data is not None: diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 26591ceed..be565f196 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -154,6 +154,34 @@ def _check_all_guards(command: str, env_type: str) -> dict: approval_callback=_approval_callback) +# Allowlist: characters that can legitimately appear in directory paths. +# Covers alphanumeric, path separators, tilde, dot, hyphen, underscore, space, +# plus, at, equals, and comma. Everything else is rejected. +_WORKDIR_SAFE_RE = re.compile(r'^[A-Za-z0-9/_\-.~ +@=,]+$') + + +def _validate_workdir(workdir: str) -> str | None: + """Reject workdir values that don't look like a filesystem path. + + Uses an allowlist of safe characters rather than a deny-list, so novel + shell metacharacters can't slip through. + + Returns None if safe, or an error message string if dangerous. + """ + if not workdir: + return None + if not _WORKDIR_SAFE_RE.match(workdir): + # Find the first offending character for a helpful message. + for ch in workdir: + if not _WORKDIR_SAFE_RE.match(ch): + return ( + f"Blocked: workdir contains disallowed character {repr(ch)}. " + "Use a simple filesystem path without shell metacharacters." + ) + return "Blocked: workdir contains disallowed characters." + return None + + def _handle_sudo_failure(output: str, env_type: str) -> str: """ Check for sudo failure and add helpful message for messaging contexts. @@ -1166,6 +1194,19 @@ def terminal_tool( desc = approval.get("description", "flagged as dangerous") approval_note = f"Command was flagged ({desc}) and auto-approved by smart approval." + # Validate workdir against shell injection + if workdir: + workdir_error = _validate_workdir(workdir) + if workdir_error: + logger.warning("Blocked dangerous workdir: %s (command: %s)", + workdir[:200], command[:200]) + return json.dumps({ + "output": "", + "exit_code": -1, + "error": workdir_error, + "status": "blocked" + }, ensure_ascii=False) + # Prepare command for execution if background: # Spawn a tracked background process via the process registry.