From 970042deab633f3f10242d7624568f6c4061e294 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Thu, 2 Apr 2026 22:52:52 +0530 Subject: [PATCH] fix(gateway): prevent stuck sessions with agent timeout and staleness eviction Three changes to prevent sessions from getting permanently locked: 1. Agent execution timeout (HERMES_AGENT_TIMEOUT, default 10min): Wraps run_in_executor with asyncio.wait_for so a hung API call or runaway tool can't lock a session indefinitely. On timeout, the agent is interrupted and the user gets an actionable error message. 2. Staleness eviction for _running_agents: Tracks start timestamps for each session entry. When a new message arrives and the entry is older than timeout + 1min grace, it's evicted as a leaked lock. Safety net for any cleanup path that fails to remove the entry. 3. Cron job timeout (HERMES_CRON_TIMEOUT, default 10min): Wraps run_conversation in a ThreadPoolExecutor with timeout so a hung cron job doesn't block the ticker thread (and all subsequent cron jobs) indefinitely. Follows grammY runner's per-update timeout pattern and aiogram's asyncio.wait_for approach for handler deadlines. --- cron/scheduler.py | 24 +++++++++++++++++++-- gateway/run.py | 55 +++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index 6b65eff25..906953c0a 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -443,8 +443,28 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: session_db=_session_db, ) - result = agent.run_conversation(prompt) - + # Run the agent with a timeout so a hung API call or tool doesn't + # block the cron ticker thread indefinitely. Default 10 minutes; + # override via env var. Uses a separate thread because + # run_conversation is synchronous. + _cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600)) + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as _cron_pool: + _cron_future = _cron_pool.submit(agent.run_conversation, prompt) + try: + result = _cron_future.result(timeout=_cron_timeout) + except concurrent.futures.TimeoutError: + logger.error( + "Job '%s' timed out after %.0fs — interrupting agent", + job_name, _cron_timeout, + ) + if hasattr(agent, "interrupt"): + agent.interrupt("Cron job timed out") + raise TimeoutError( + f"Cron job '{job_name}' timed out after " + f"{int(_cron_timeout // 60)} minutes" + ) + final_response = result.get("final_response", "") or "" # Use a separate variable for log display; keep final_response clean # for delivery logic (empty response = no delivery). diff --git a/gateway/run.py b/gateway/run.py index 7c711d39e..593d00583 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -468,6 +468,7 @@ class GatewayRunner: # Track running agents per session for interrupt support # Key: session_key, Value: AIAgent instance self._running_agents: Dict[str, Any] = {} + self._running_agents_ts: Dict[str, float] = {} # start timestamp per session self._pending_messages: Dict[str, str] = {} # Queued messages during interrupt # Cache AIAgent instances per session to preserve prompt caching. @@ -1720,6 +1721,21 @@ class GatewayRunner: # simultaneous updates. Do NOT interrupt for photo-only follow-ups here; # let the adapter-level batching/queueing logic absorb them. _quick_key = self._session_key_for_source(source) + + # Staleness eviction: if an entry has been in _running_agents for + # longer than the agent timeout, it's a leaked lock from a hung or + # crashed handler. Evict it so the session isn't permanently stuck. + _STALE_TTL = float(os.getenv("HERMES_AGENT_TIMEOUT", 600)) + 60 # timeout + 1 min grace + _ts_dict = getattr(self, "_running_agents_ts", {}) + _stale_ts = _ts_dict.get(_quick_key, 0) + if _quick_key in self._running_agents and _stale_ts and (time.time() - _stale_ts) > _STALE_TTL: + logger.warning( + "Evicting stale _running_agents entry for %s (age: %.0fs)", + _quick_key[:30], time.time() - _stale_ts, + ) + del self._running_agents[_quick_key] + _ts_dict.pop(_quick_key, None) + if _quick_key in self._running_agents: if event.get_command() == "status": return await self._handle_status_command(event) @@ -2045,6 +2061,8 @@ class GatewayRunner: # "already running" guard and spin up a duplicate agent for the # same session — corrupting the transcript. self._running_agents[_quick_key] = _AGENT_PENDING_SENTINEL + if hasattr(self, "_running_agents_ts"): + self._running_agents_ts[_quick_key] = time.time() try: return await self._handle_message_with_agent(event, source, _quick_key) @@ -2055,6 +2073,8 @@ class GatewayRunner: # not linger or the session would be permanently locked out. if self._running_agents.get(_quick_key) is _AGENT_PENDING_SENTINEL: del self._running_agents[_quick_key] + if hasattr(self, "_running_agents_ts"): + self._running_agents_ts.pop(_quick_key, None) async def _handle_message_with_agent(self, event, source, _quick_key: str): """Inner handler that runs under the _running_agents sentinel guard.""" @@ -5985,9 +6005,38 @@ class GatewayRunner: interrupt_monitor = asyncio.create_task(monitor_for_interrupt()) try: - # Run in thread pool to not block + # Run in thread pool to not block. Cap total execution time + # so a hung API call or runaway tool doesn't permanently lock + # the session. Default 10 minutes; override with env var. + _agent_timeout = float(os.getenv("HERMES_AGENT_TIMEOUT", 600)) loop = asyncio.get_event_loop() - response = await loop.run_in_executor(None, run_sync) + try: + response = await asyncio.wait_for( + loop.run_in_executor(None, run_sync), + timeout=_agent_timeout, + ) + except asyncio.TimeoutError: + logger.error( + "Agent execution timed out after %.0fs for session %s", + _agent_timeout, session_key, + ) + # Interrupt the agent if it's still running so the thread + # pool worker is freed. + _timed_out_agent = agent_holder[0] + if _timed_out_agent and hasattr(_timed_out_agent, "interrupt"): + _timed_out_agent.interrupt("Execution timed out") + response = { + "final_response": ( + f"⏱️ Request timed out after {int(_agent_timeout // 60)} minutes. " + "The agent may have been stuck on a tool or API call.\n" + "Try again, or use /reset to start fresh." + ), + "messages": result_holder[0].get("messages", []) if result_holder[0] else [], + "api_calls": 0, + "tools": tools_holder[0] or [], + "history_offset": 0, + "failed": True, + } # Track fallback model state: if the agent switched to a # fallback model during this run, persist it so /model shows @@ -6110,6 +6159,8 @@ class GatewayRunner: tracking_task.cancel() if session_key and session_key in self._running_agents: del self._running_agents[session_key] + if session_key and hasattr(self, "_running_agents_ts"): + self._running_agents_ts.pop(session_key, None) # Wait for cancelled tasks for task in [progress_task, interrupt_monitor, tracking_task]: