refactor(run_agent): extract OpenAI proxy, safe stdio, IterationBudget
Three small extractions into focused modules:
* agent/process_bootstrap.py — \_OpenAIProxy (lazy openai.OpenAI import),
\_SafeWriter (broken-pipe-resistant stdio wrapper), \_install_safe_stdio,
\_get_proxy_from_env, \_get_proxy_for_base_url. All process / IO bootstrap.
* agent/iteration_budget.py — IterationBudget class (thread-safe consume/
refund counter shared by parent agent and subagents).
run_agent re-exports every name so existing test patches like
patch('run_agent.OpenAI', ...) and 'from run_agent import IterationBudget'
keep working unchanged. Verified the patch-rebinding contract for OpenAI
explicitly.
tests/run_agent/ + tests/agent/test_gemini_fast_fallback.py:
1347 passed, 3 skipped.
run_agent.py: 15427 -> 15261 lines (-166).
This commit is contained in:
62
agent/iteration_budget.py
Normal file
62
agent/iteration_budget.py
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
"""Per-agent iteration budget — thread-safe consume/refund counter.
|
||||||
|
|
||||||
|
Extracted from ``run_agent.py``. Each ``AIAgent`` instance (parent or
|
||||||
|
subagent) holds an :class:`IterationBudget`; the parent's cap comes from
|
||||||
|
``max_iterations`` (default 90), each subagent's cap comes from
|
||||||
|
``delegation.max_iterations`` (default 50).
|
||||||
|
|
||||||
|
``run_agent`` re-exports ``IterationBudget`` so existing
|
||||||
|
``from run_agent import IterationBudget`` imports keep working unchanged.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
|
class IterationBudget:
|
||||||
|
"""Thread-safe iteration counter for an agent.
|
||||||
|
|
||||||
|
Each agent (parent or subagent) gets its own ``IterationBudget``.
|
||||||
|
The parent's budget is capped at ``max_iterations`` (default 90).
|
||||||
|
Each subagent gets an independent budget capped at
|
||||||
|
``delegation.max_iterations`` (default 50) — this means total
|
||||||
|
iterations across parent + subagents can exceed the parent's cap.
|
||||||
|
Users control the per-subagent limit via ``delegation.max_iterations``
|
||||||
|
in config.yaml.
|
||||||
|
|
||||||
|
``execute_code`` (programmatic tool calling) iterations are refunded via
|
||||||
|
:meth:`refund` so they don't eat into the budget.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, max_total: int):
|
||||||
|
self.max_total = max_total
|
||||||
|
self._used = 0
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
def consume(self) -> bool:
|
||||||
|
"""Try to consume one iteration. Returns True if allowed."""
|
||||||
|
with self._lock:
|
||||||
|
if self._used >= self.max_total:
|
||||||
|
return False
|
||||||
|
self._used += 1
|
||||||
|
return True
|
||||||
|
|
||||||
|
def refund(self) -> None:
|
||||||
|
"""Give back one iteration (e.g. for execute_code turns)."""
|
||||||
|
with self._lock:
|
||||||
|
if self._used > 0:
|
||||||
|
self._used -= 1
|
||||||
|
|
||||||
|
@property
|
||||||
|
def used(self) -> int:
|
||||||
|
with self._lock:
|
||||||
|
return self._used
|
||||||
|
|
||||||
|
@property
|
||||||
|
def remaining(self) -> int:
|
||||||
|
with self._lock:
|
||||||
|
return max(0, self.max_total - self._used)
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["IterationBudget"]
|
||||||
167
agent/process_bootstrap.py
Normal file
167
agent/process_bootstrap.py
Normal file
@@ -0,0 +1,167 @@
|
|||||||
|
"""Process-level bootstrap helpers for ``run_agent``.
|
||||||
|
|
||||||
|
Three concerns, all tied to ``AIAgent`` boot-time / runtime IO setup:
|
||||||
|
|
||||||
|
1. **Lazy OpenAI SDK import** — ``_load_openai_cls`` + ``_OpenAIProxy``
|
||||||
|
defer the 240ms-ish ``from openai import OpenAI`` cost until first use,
|
||||||
|
while preserving ``isinstance(client, OpenAI)`` checks and
|
||||||
|
``patch("run_agent.OpenAI", ...)`` test patterns.
|
||||||
|
|
||||||
|
2. **Crash-resistant stdio** — ``_SafeWriter`` wraps stdout/stderr so
|
||||||
|
``OSError: Input/output error`` from broken pipes (systemd, Docker,
|
||||||
|
thread teardown races) cannot crash the agent. ``_install_safe_stdio``
|
||||||
|
applies the wrapper.
|
||||||
|
|
||||||
|
3. **HTTP proxy resolution** — ``_get_proxy_from_env`` reads
|
||||||
|
``HTTPS_PROXY`` / ``HTTP_PROXY`` / ``ALL_PROXY``;
|
||||||
|
``_get_proxy_for_base_url`` respects ``NO_PROXY`` for the given base URL.
|
||||||
|
|
||||||
|
``run_agent`` re-exports every name so existing
|
||||||
|
``from run_agent import _get_proxy_from_env`` imports keep working
|
||||||
|
unchanged.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import urllib.request
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from utils import base_url_hostname, normalize_proxy_url
|
||||||
|
|
||||||
|
|
||||||
|
# Cached at module level so we only pay the OpenAI SDK import cost once
|
||||||
|
# per process (after the first lazy load).
|
||||||
|
_OPENAI_CLS_CACHE = None
|
||||||
|
|
||||||
|
|
||||||
|
def _load_openai_cls() -> type:
|
||||||
|
"""Import and cache ``openai.OpenAI``."""
|
||||||
|
global _OPENAI_CLS_CACHE
|
||||||
|
if _OPENAI_CLS_CACHE is None:
|
||||||
|
from openai import OpenAI as _cls
|
||||||
|
_OPENAI_CLS_CACHE = _cls
|
||||||
|
return _OPENAI_CLS_CACHE
|
||||||
|
|
||||||
|
|
||||||
|
class _OpenAIProxy:
|
||||||
|
"""Module-level proxy that looks like ``openai.OpenAI`` but imports lazily."""
|
||||||
|
|
||||||
|
__slots__ = ()
|
||||||
|
|
||||||
|
def __call__(self, *args, **kwargs):
|
||||||
|
return _load_openai_cls()(*args, **kwargs)
|
||||||
|
|
||||||
|
def __instancecheck__(self, obj):
|
||||||
|
return isinstance(obj, _load_openai_cls())
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return "<lazy openai.OpenAI proxy>"
|
||||||
|
|
||||||
|
|
||||||
|
class _SafeWriter:
|
||||||
|
"""Transparent stdio wrapper that catches OSError/ValueError from broken pipes.
|
||||||
|
|
||||||
|
When hermes-agent runs as a systemd service, Docker container, or headless
|
||||||
|
daemon, the stdout/stderr pipe can become unavailable (idle timeout, buffer
|
||||||
|
exhaustion, socket reset). Any print() call then raises
|
||||||
|
``OSError: [Errno 5] Input/output error``, which can crash agent setup or
|
||||||
|
run_conversation() — especially via double-fault when an except handler
|
||||||
|
also tries to print.
|
||||||
|
|
||||||
|
Additionally, when subagents run in ThreadPoolExecutor threads, the shared
|
||||||
|
stdout handle can close between thread teardown and cleanup, raising
|
||||||
|
``ValueError: I/O operation on closed file`` instead of OSError.
|
||||||
|
|
||||||
|
This wrapper delegates all writes to the underlying stream and silently
|
||||||
|
catches both OSError and ValueError. It is transparent when the wrapped
|
||||||
|
stream is healthy.
|
||||||
|
"""
|
||||||
|
|
||||||
|
__slots__ = ("_inner",)
|
||||||
|
|
||||||
|
def __init__(self, inner):
|
||||||
|
object.__setattr__(self, "_inner", inner)
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
try:
|
||||||
|
return self._inner.write(data)
|
||||||
|
except (OSError, ValueError):
|
||||||
|
return len(data) if isinstance(data, str) else 0
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
try:
|
||||||
|
self._inner.flush()
|
||||||
|
except (OSError, ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def fileno(self):
|
||||||
|
return self._inner.fileno()
|
||||||
|
|
||||||
|
def isatty(self):
|
||||||
|
try:
|
||||||
|
return self._inner.isatty()
|
||||||
|
except (OSError, ValueError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def __getattr__(self, name):
|
||||||
|
return getattr(self._inner, name)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_proxy_from_env() -> Optional[str]:
|
||||||
|
"""Read proxy URL from environment variables.
|
||||||
|
|
||||||
|
Checks HTTPS_PROXY, HTTP_PROXY, ALL_PROXY (and lowercase variants) in order.
|
||||||
|
Returns the first valid proxy URL found, or None if no proxy is configured.
|
||||||
|
"""
|
||||||
|
for key in ("HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY",
|
||||||
|
"https_proxy", "http_proxy", "all_proxy"):
|
||||||
|
value = os.environ.get(key, "").strip()
|
||||||
|
if value:
|
||||||
|
return normalize_proxy_url(value)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_proxy_for_base_url(base_url: Optional[str]) -> Optional[str]:
|
||||||
|
"""Return an env-configured proxy unless NO_PROXY excludes this base URL."""
|
||||||
|
proxy = _get_proxy_from_env()
|
||||||
|
if not proxy or not base_url:
|
||||||
|
return proxy
|
||||||
|
|
||||||
|
host = base_url_hostname(base_url)
|
||||||
|
if not host:
|
||||||
|
return proxy
|
||||||
|
|
||||||
|
try:
|
||||||
|
if urllib.request.proxy_bypass_environment(host):
|
||||||
|
return None
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return proxy
|
||||||
|
|
||||||
|
|
||||||
|
def _install_safe_stdio() -> None:
|
||||||
|
"""Wrap stdout/stderr so best-effort console output cannot crash the agent."""
|
||||||
|
for stream_name in ("stdout", "stderr"):
|
||||||
|
stream = getattr(sys, stream_name, None)
|
||||||
|
if stream is not None and not isinstance(stream, _SafeWriter):
|
||||||
|
setattr(sys, stream_name, _SafeWriter(stream))
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level proxy instance — drops in for ``openai.OpenAI``. Imported as
|
||||||
|
# ``from agent.process_bootstrap import OpenAI`` (or re-exported via
|
||||||
|
# ``run_agent`` for legacy tests).
|
||||||
|
OpenAI = _OpenAIProxy()
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"OpenAI",
|
||||||
|
"_OpenAIProxy",
|
||||||
|
"_load_openai_cls",
|
||||||
|
"_SafeWriter",
|
||||||
|
"_install_safe_stdio",
|
||||||
|
"_get_proxy_from_env",
|
||||||
|
"_get_proxy_for_base_url",
|
||||||
|
]
|
||||||
179
run_agent.py
179
run_agent.py
@@ -70,38 +70,20 @@ from pathlib import Path
|
|||||||
|
|
||||||
from hermes_constants import get_hermes_home
|
from hermes_constants import get_hermes_home
|
||||||
|
|
||||||
|
# OpenAI lazy proxy + safe stdio + proxy URL helpers — see agent/process_bootstrap.py.
|
||||||
_OPENAI_CLS_CACHE: Optional[type] = None
|
# `OpenAI` is re-exported here so `patch("run_agent.OpenAI", ...)` in tests works.
|
||||||
|
from agent.process_bootstrap import (
|
||||||
|
OpenAI,
|
||||||
|
_OpenAIProxy,
|
||||||
|
_load_openai_cls,
|
||||||
|
_SafeWriter,
|
||||||
|
_install_safe_stdio,
|
||||||
|
_get_proxy_from_env,
|
||||||
|
_get_proxy_for_base_url,
|
||||||
|
)
|
||||||
|
from agent.iteration_budget import IterationBudget
|
||||||
|
|
||||||
|
|
||||||
def _load_openai_cls() -> type:
|
|
||||||
"""Import and cache ``openai.OpenAI``."""
|
|
||||||
global _OPENAI_CLS_CACHE
|
|
||||||
if _OPENAI_CLS_CACHE is None:
|
|
||||||
from openai import OpenAI as _cls
|
|
||||||
_OPENAI_CLS_CACHE = _cls
|
|
||||||
return _OPENAI_CLS_CACHE
|
|
||||||
|
|
||||||
|
|
||||||
class _OpenAIProxy:
|
|
||||||
"""Module-level proxy that looks like ``openai.OpenAI`` but imports lazily."""
|
|
||||||
|
|
||||||
__slots__ = ()
|
|
||||||
|
|
||||||
def __call__(self, *args, **kwargs):
|
|
||||||
return _load_openai_cls()(*args, **kwargs)
|
|
||||||
|
|
||||||
def __instancecheck__(self, obj):
|
|
||||||
return isinstance(obj, _load_openai_cls())
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return "<lazy openai.OpenAI proxy>"
|
|
||||||
|
|
||||||
|
|
||||||
OpenAI = _OpenAIProxy()
|
|
||||||
|
|
||||||
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
|
|
||||||
# User-managed env files should override stale shell exports on restart.
|
|
||||||
from hermes_cli.env_loader import load_hermes_dotenv
|
from hermes_cli.env_loader import load_hermes_dotenv
|
||||||
from hermes_cli.timeouts import (
|
from hermes_cli.timeouts import (
|
||||||
get_provider_request_timeout,
|
get_provider_request_timeout,
|
||||||
@@ -224,143 +206,6 @@ from hermes_cli.config import cfg_get
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
class _SafeWriter:
|
|
||||||
"""Transparent stdio wrapper that catches OSError/ValueError from broken pipes.
|
|
||||||
|
|
||||||
When hermes-agent runs as a systemd service, Docker container, or headless
|
|
||||||
daemon, the stdout/stderr pipe can become unavailable (idle timeout, buffer
|
|
||||||
exhaustion, socket reset). Any print() call then raises
|
|
||||||
``OSError: [Errno 5] Input/output error``, which can crash agent setup or
|
|
||||||
run_conversation() — especially via double-fault when an except handler
|
|
||||||
also tries to print.
|
|
||||||
|
|
||||||
Additionally, when subagents run in ThreadPoolExecutor threads, the shared
|
|
||||||
stdout handle can close between thread teardown and cleanup, raising
|
|
||||||
``ValueError: I/O operation on closed file`` instead of OSError.
|
|
||||||
|
|
||||||
This wrapper delegates all writes to the underlying stream and silently
|
|
||||||
catches both OSError and ValueError. It is transparent when the wrapped
|
|
||||||
stream is healthy.
|
|
||||||
"""
|
|
||||||
|
|
||||||
__slots__ = ("_inner",)
|
|
||||||
|
|
||||||
def __init__(self, inner):
|
|
||||||
object.__setattr__(self, "_inner", inner)
|
|
||||||
|
|
||||||
def write(self, data):
|
|
||||||
try:
|
|
||||||
return self._inner.write(data)
|
|
||||||
except (OSError, ValueError):
|
|
||||||
return len(data) if isinstance(data, str) else 0
|
|
||||||
|
|
||||||
def flush(self):
|
|
||||||
try:
|
|
||||||
self._inner.flush()
|
|
||||||
except (OSError, ValueError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def fileno(self):
|
|
||||||
return self._inner.fileno()
|
|
||||||
|
|
||||||
def isatty(self):
|
|
||||||
try:
|
|
||||||
return self._inner.isatty()
|
|
||||||
except (OSError, ValueError):
|
|
||||||
return False
|
|
||||||
|
|
||||||
def __getattr__(self, name):
|
|
||||||
return getattr(self._inner, name)
|
|
||||||
|
|
||||||
|
|
||||||
def _get_proxy_from_env() -> Optional[str]:
|
|
||||||
"""Read proxy URL from environment variables.
|
|
||||||
|
|
||||||
Checks HTTPS_PROXY, HTTP_PROXY, ALL_PROXY (and lowercase variants) in order.
|
|
||||||
Returns the first valid proxy URL found, or None if no proxy is configured.
|
|
||||||
"""
|
|
||||||
for key in ("HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY",
|
|
||||||
"https_proxy", "http_proxy", "all_proxy"):
|
|
||||||
value = os.environ.get(key, "").strip()
|
|
||||||
if value:
|
|
||||||
return normalize_proxy_url(value)
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _get_proxy_for_base_url(base_url: Optional[str]) -> Optional[str]:
|
|
||||||
"""Return an env-configured proxy unless NO_PROXY excludes this base URL."""
|
|
||||||
proxy = _get_proxy_from_env()
|
|
||||||
if not proxy or not base_url:
|
|
||||||
return proxy
|
|
||||||
|
|
||||||
host = base_url_hostname(base_url)
|
|
||||||
if not host:
|
|
||||||
return proxy
|
|
||||||
|
|
||||||
try:
|
|
||||||
if urllib.request.proxy_bypass_environment(host):
|
|
||||||
return None
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return proxy
|
|
||||||
|
|
||||||
|
|
||||||
def _install_safe_stdio() -> None:
|
|
||||||
"""Wrap stdout/stderr so best-effort console output cannot crash the agent."""
|
|
||||||
for stream_name in ("stdout", "stderr"):
|
|
||||||
stream = getattr(sys, stream_name, None)
|
|
||||||
if stream is not None and not isinstance(stream, _SafeWriter):
|
|
||||||
setattr(sys, stream_name, _SafeWriter(stream))
|
|
||||||
|
|
||||||
|
|
||||||
class IterationBudget:
|
|
||||||
"""Thread-safe iteration counter for an agent.
|
|
||||||
|
|
||||||
Each agent (parent or subagent) gets its own ``IterationBudget``.
|
|
||||||
The parent's budget is capped at ``max_iterations`` (default 90).
|
|
||||||
Each subagent gets an independent budget capped at
|
|
||||||
``delegation.max_iterations`` (default 50) — this means total
|
|
||||||
iterations across parent + subagents can exceed the parent's cap.
|
|
||||||
Users control the per-subagent limit via ``delegation.max_iterations``
|
|
||||||
in config.yaml.
|
|
||||||
|
|
||||||
``execute_code`` (programmatic tool calling) iterations are refunded via
|
|
||||||
:meth:`refund` so they don't eat into the budget.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, max_total: int):
|
|
||||||
self.max_total = max_total
|
|
||||||
self._used = 0
|
|
||||||
self._lock = threading.Lock()
|
|
||||||
|
|
||||||
def consume(self) -> bool:
|
|
||||||
"""Try to consume one iteration. Returns True if allowed."""
|
|
||||||
with self._lock:
|
|
||||||
if self._used >= self.max_total:
|
|
||||||
return False
|
|
||||||
self._used += 1
|
|
||||||
return True
|
|
||||||
|
|
||||||
def refund(self) -> None:
|
|
||||||
"""Give back one iteration (e.g. for execute_code turns)."""
|
|
||||||
with self._lock:
|
|
||||||
if self._used > 0:
|
|
||||||
self._used -= 1
|
|
||||||
|
|
||||||
@property
|
|
||||||
def used(self) -> int:
|
|
||||||
with self._lock:
|
|
||||||
return self._used
|
|
||||||
|
|
||||||
@property
|
|
||||||
def remaining(self) -> int:
|
|
||||||
with self._lock:
|
|
||||||
return max(0, self.max_total - self._used)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Maximum number of concurrent worker threads for parallel tool execution.
|
|
||||||
_MAX_TOOL_WORKERS = 8
|
_MAX_TOOL_WORKERS = 8
|
||||||
|
|
||||||
# Guard so the OpenRouter metadata pre-warm thread is only spawned once per
|
# Guard so the OpenRouter metadata pre-warm thread is only spawned once per
|
||||||
|
|||||||
Reference in New Issue
Block a user