diff --git a/cron/scheduler.py b/cron/scheduler.py index 12dae811f..21ec8dbde 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -822,6 +822,8 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: logger.info("Running job '%s' (ID: %s)", job_name, job_id) logger.info("Prompt: %s", prompt[:100]) + agent = None + # Mark this as a cron session so the approval system can apply cron_mode. # This env var is process-wide and persists for the lifetime of the # scheduler process — every job this process runs is a cron job. @@ -1170,6 +1172,24 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: _session_db.close() except (Exception, KeyboardInterrupt) as e: logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e) + # Release subprocesses, terminal sandboxes, browser daemons, and the + # main OpenAI/httpx client held by this ephemeral cron agent. Without + # this, a gateway that ticks cron every N minutes leaks fds per job + # until it hits EMFILE (#10200 / "too many open files"). + try: + if agent is not None: + agent.close() + except (Exception, KeyboardInterrupt) as e: + logger.debug("Job '%s': failed to close agent resources: %s", job_id, e) + # Each cron run spins up a short-lived worker thread whose event loop + # dies as soon as the ``ThreadPoolExecutor`` shuts down. Any async + # httpx clients cached under that loop are now unusable — reap them + # so their transports don't accumulate in the process-global cache. + try: + from agent.auxiliary_client import cleanup_stale_async_clients + cleanup_stale_async_clients() + except Exception as e: + logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e) def tick(verbose: bool = True, adapters=None, loop=None) -> int: diff --git a/gateway/run.py b/gateway/run.py index 8bdd6c416..b50bbc585 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1968,6 +1968,15 @@ class GatewayRunner: agent.close() except Exception: pass + # Auxiliary async clients (session_search/web/vision/etc.) live in a + # process-global cache and are created inside worker threads. Clean up + # any entries whose event loop is now dead so their httpx transports do + # not accumulate across gateway turns. + try: + from agent.auxiliary_client import cleanup_stale_async_clients + cleanup_stale_async_clients() + except Exception: + pass _STUCK_LOOP_THRESHOLD = 3 # restarts while active before auto-suspend _STUCK_LOOP_FILE = ".restart_failure_counts" @@ -2931,6 +2940,19 @@ class GatewayRunner: # disconnect (defense in depth; safe to call repeatedly). _kill_tool_subprocesses("final-cleanup") + # Reap the process-global auxiliary-client cache once at the very + # end of teardown. Per-turn cleanup runs in _cleanup_agent_resources + # for each active agent, but clients bound to worker-thread loops + # that died with their ThreadPoolExecutor (notably cron ticks) only + # get swept here. Without this, long-running gateways accumulate + # async httpx transports until they hit EMFILE on macOS's default + # RLIMIT_NOFILE=256. See #14210. + try: + from agent.auxiliary_client import shutdown_cached_clients + shutdown_cached_clients() + except Exception as _e: + logger.debug("shutdown_cached_clients error: %s", _e) + # Close SQLite session DBs so the WAL write lock is released. # Without this, --replace and similar restart flows leave the # old gateway's connection holding the WAL lock until Python diff --git a/scripts/release.py b/scripts/release.py index f85e51ad1..c1dac5bb2 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -63,6 +63,7 @@ AUTHOR_MAP = { "yoimexex@gmail.com": "Yoimex", "6548898+romanornr@users.noreply.github.com": "romanornr", "foxion37@gmail.com": "foxion37", + "bloodcarter@gmail.com": "bloodcarter", # contributors (from noreply pattern) "david.vv@icloud.com": "davidvv", "wangqiang@wangqiangdeMac-mini.local": "xiaoqiang243", diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index 4cd4b7cd7..25f707efd 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -672,6 +672,79 @@ class TestRunJobSessionPersistence: assert call_args[0][0].startswith("cron_test-job_") assert call_args[0][1] == "cron_complete" fake_db.close.assert_called_once() + mock_agent.close.assert_called_once() + + def test_run_job_closes_agent_on_failure_to_prevent_fd_leak(self, tmp_path): + # Regression: if ``run_conversation`` raises, the ephemeral cron + # agent was previously leaked — over days of ticks this accumulated + # httpx transports and hit EMFILE / "too many open files". + job = { + "id": "failing-job", + "name": "failing", + "prompt": "hello", + } + fake_db = MagicMock() + + with patch("cron.scheduler._hermes_home", tmp_path), \ + patch("cron.scheduler._resolve_origin", return_value=None), \ + patch("dotenv.load_dotenv"), \ + patch("hermes_state.SessionDB", return_value=fake_db), \ + patch( + "hermes_cli.runtime_provider.resolve_runtime_provider", + return_value={ + "api_key": "***", + "base_url": "https://example.invalid/v1", + "provider": "openrouter", + "api_mode": "chat_completions", + }, + ), \ + patch("run_agent.AIAgent") as mock_agent_cls: + mock_agent = MagicMock() + mock_agent.run_conversation.side_effect = RuntimeError("boom") + mock_agent_cls.return_value = mock_agent + + success, output, final_response, error = run_job(job) + + assert success is False + assert final_response == "" + assert "RuntimeError: boom" in error + mock_agent.close.assert_called_once() + + def test_run_job_reaps_stale_auxiliary_clients_per_tick(self, tmp_path): + # Regression: auxiliary clients bound to the cron worker's dead + # event loop must be reaped each tick. Without this, ``_client_cache`` + # holds onto transports whose underlying sockets can no longer be + # closed (their loop is gone), leaking one fd batch per cron run. + job = { + "id": "aux-clean-job", + "name": "aux-clean", + "prompt": "hello", + } + fake_db = MagicMock() + + with patch("cron.scheduler._hermes_home", tmp_path), \ + patch("cron.scheduler._resolve_origin", return_value=None), \ + patch("dotenv.load_dotenv"), \ + patch("hermes_state.SessionDB", return_value=fake_db), \ + patch( + "hermes_cli.runtime_provider.resolve_runtime_provider", + return_value={ + "api_key": "***", + "base_url": "https://example.invalid/v1", + "provider": "openrouter", + "api_mode": "chat_completions", + }, + ), \ + patch("run_agent.AIAgent") as mock_agent_cls, \ + patch("agent.auxiliary_client.cleanup_stale_async_clients") as cleanup_mock: + mock_agent = MagicMock() + mock_agent.run_conversation.return_value = {"final_response": "ok"} + mock_agent_cls.return_value = mock_agent + + success, _output, _final_response, _error = run_job(job) + + assert success is True + cleanup_mock.assert_called_once() def _make_run_job_patches(self, tmp_path): """Common patches for run_job tests.""" diff --git a/tests/gateway/test_gateway_shutdown.py b/tests/gateway/test_gateway_shutdown.py index 137ddfd03..d12fac14b 100644 --- a/tests/gateway/test_gateway_shutdown.py +++ b/tests/gateway/test_gateway_shutdown.py @@ -35,6 +35,18 @@ async def test_cancel_background_tasks_cancels_inflight_message_processing(): assert adapter._pending_messages == {} +def test_cleanup_agent_resources_reaps_stale_aux_clients(): + runner, _adapter = make_restart_runner() + agent = MagicMock() + + with patch("agent.auxiliary_client.cleanup_stale_async_clients") as cleanup_mock: + runner._cleanup_agent_resources(agent) + + agent.shutdown_memory_provider.assert_called_once() + agent.close.assert_called_once() + cleanup_mock.assert_called_once() + + @pytest.mark.asyncio async def test_gateway_stop_interrupts_running_agents_and_cancels_adapter_tasks(): runner, adapter = make_restart_runner() @@ -60,11 +72,16 @@ async def test_gateway_stop_interrupts_running_agents_and_cancels_adapter_tasks( running_agent = MagicMock() runner._running_agents = {session_key: running_agent} - with patch("gateway.status.remove_pid_file"), patch("gateway.status.write_runtime_status"): + with ( + patch("gateway.status.remove_pid_file"), + patch("gateway.status.write_runtime_status"), + patch("agent.auxiliary_client.shutdown_cached_clients") as shutdown_cached_clients, + ): await runner.stop() running_agent.interrupt.assert_called_once_with("Gateway shutting down") disconnect_mock.assert_awaited_once() + shutdown_cached_clients.assert_called_once() assert runner.adapters == {} assert runner._running_agents == {} assert runner._pending_messages == {} diff --git a/tests/gateway/test_status.py b/tests/gateway/test_status.py index e91bb6e41..e56b2107e 100644 --- a/tests/gateway/test_status.py +++ b/tests/gateway/test_status.py @@ -51,6 +51,29 @@ class TestGatewayPidState: assert status.get_running_pid() is None assert not pid_path.exists() + def test_get_running_pid_cleans_stale_record_from_dead_process(self, tmp_path, monkeypatch): + # Simulates the aftermath of a crash: the PID file still points at a + # process that no longer exists. The next gateway startup must be + # able to unlink it so ``write_pid_file``'s O_EXCL create succeeds — + # otherwise systemd's restart loop hits "PID file race lost" forever. + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + pid_path = tmp_path / "gateway.pid" + dead_pid = 999999 # not our pid, and below we simulate it's dead + pid_path.write_text(json.dumps({ + "pid": dead_pid, + "kind": "hermes-gateway", + "argv": ["python", "-m", "hermes_cli.main", "gateway", "run"], + "start_time": 111, + })) + + def _dead_process(pid, sig): + raise ProcessLookupError + + monkeypatch.setattr(status.os, "kill", _dead_process) + + assert status.get_running_pid() is None + assert not pid_path.exists() + def test_get_running_pid_accepts_gateway_metadata_when_cmdline_unavailable(self, tmp_path, monkeypatch): monkeypatch.setenv("HERMES_HOME", str(tmp_path)) pid_path = tmp_path / "gateway.pid"