From fa8c448f7dbd3f18639b65f51d21114069ca0537 Mon Sep 17 00:00:00 2001 From: Teknium Date: Tue, 14 Apr 2026 12:44:46 -0700 Subject: [PATCH] fix: notify active sessions on gateway shutdown + update health check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three fixes for gateway lifecycle stability: 1. Notify active sessions before shutdown (#new) When the gateway receives SIGTERM or /restart, it now sends a notification to every chat with an active agent BEFORE starting the drain. Users see: - Shutdown: '⚠️ Gateway shutting down — Your current task will be interrupted.' - Restart: '⚠️ Gateway restarting — Your current task will be interrupted. Use /retry after restart to continue.' Deduplicates per-chat so group sessions with multiple users get one notification. Best-effort: send failures are logged at debug level and swallowed. 2. Skip .clean_shutdown marker when drain timed out Previously, a graceful SIGTERM always wrote .clean_shutdown, even if agents were force-interrupted when the drain timed out. This meant the next startup skipped session suspension, leaving interrupted sessions in a broken state (trailing tool response, no final message). Now the marker is only written if the drain completed without timeout, so interrupted sessions get properly suspended on next startup. 3. Post-restart health check for hermes update (#6631) cmd_update() now verifies the gateway actually survived after systemctl restart (sleep 3s + is-active check). If the service crashed immediately, it retries once. If still dead, prints actionable diagnostics (journalctl command, manual restart hint). Also closes #8104 — already fixed on main (the /restart handler correctly detects systemd via INVOCATION_ID and uses via_service=True). 
Test plan: - 6 new tests for shutdown notifications (dedup, restart vs shutdown messaging, sentinel filtering, send failure resilience) - Existing restart drain + update tests pass (47 total) --- gateway/run.py | 86 +++++++++++++++++++++++++-- hermes_cli/main.py | 35 ++++++++++- tests/gateway/restart_test_helpers.py | 6 ++ tests/gateway/test_restart_drain.py | 81 +++++++++++++++++++++++++ 4 files changed, 201 insertions(+), 7 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 222e28c3e..0cdfb7146 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1391,6 +1391,65 @@ class GatewayRunner: except Exception as e: logger.debug("Failed interrupting agent during shutdown: %s", e) + async def _notify_active_sessions_of_shutdown(self) -> None: + """Send a notification to every chat with an active agent. + + Called at the very start of stop() — adapters are still connected so + messages can be delivered. Best-effort: individual send failures are + logged and swallowed so they never block the shutdown sequence. + """ + active = self._snapshot_running_agents() + if not active: + return + + action = "restarting" if self._restart_requested else "shutting down" + hint = ( + "Your current task will be interrupted. " + "Use /retry after restart to continue." + if self._restart_requested + else "Your current task will be interrupted." + ) + msg = f"⚠️ Gateway {action} — {hint}" + + notified: set = set() + for session_key in active: + # Parse platform + chat_id from the session key. + # Format: agent:main:{platform}:{chat_type}:{chat_id}[:{extra}...] + parts = session_key.split(":") + if len(parts) < 5: + continue + platform_str = parts[2] + chat_id = parts[4] + + # Deduplicate: one notification per chat, even if multiple + # sessions (different users/threads) share the same chat. 
+ dedup_key = (platform_str, chat_id) + if dedup_key in notified: + continue + + try: + platform = Platform(platform_str) + adapter = self.adapters.get(platform) + if not adapter: + continue + + # Include thread_id if present so the message lands in the + # correct forum topic / thread. + thread_id = parts[5] if len(parts) > 5 else None + metadata = {"thread_id": thread_id} if thread_id else None + + await adapter.send(chat_id, msg, metadata=metadata) + notified.add(dedup_key) + logger.info( + "Sent shutdown notification to %s:%s", + platform_str, chat_id, + ) + except Exception as e: + logger.debug( + "Failed to send shutdown notification to %s:%s: %s", + platform_str, chat_id, e, + ) + def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None: for agent in active_agents.values(): try: @@ -2018,6 +2077,10 @@ class GatewayRunner: self._running = False self._draining = True + # Notify all chats with active agents BEFORE draining. + # Adapters are still connected here, so messages can be sent. + await self._notify_active_sessions_of_shutdown() + timeout = self._restart_drain_timeout active_agents, timed_out = await self._drain_active_agents(timeout) if timed_out: @@ -2088,12 +2151,23 @@ class GatewayRunner: # Write a clean-shutdown marker so the next startup knows this # wasn't a crash. suspend_recently_active() only needs to run - # after unexpected exits — graceful shutdowns already drain - # active agents, so there's no stuck-session risk. - try: - (_hermes_home / ".clean_shutdown").touch() - except Exception: - pass + # after unexpected exits. However, if the drain timed out and + # agents were force-interrupted, their sessions may be in an + # incomplete state (trailing tool response, no final assistant + # message). Skip the marker in that case so the next startup + # suspends those sessions — giving users a clean slate instead + # of resuming a half-finished tool loop. 
+ if not timed_out: + try: + (_hermes_home / ".clean_shutdown").touch() + except Exception: + logger.debug("Failed to write .clean_shutdown marker", exc_info=True) + else: + logger.info( + "Skipping .clean_shutdown marker — drain timed out with " + "interrupted agents; next startup will suspend recently " + "active sessions." + ) if self._restart_requested and self._restart_via_service: self._exit_code = GATEWAY_SERVICE_RESTART_EXIT_CODE diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 721e68143..c73344be4 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -4036,7 +4036,40 @@ def cmd_update(args): capture_output=True, text=True, timeout=15, ) if restart.returncode == 0: - restarted_services.append(svc_name) + # Verify the service actually survived the + # restart. systemctl restart returns 0 even + # if the new process crashes immediately. + import time as _time + _time.sleep(3) + verify = subprocess.run( + scope_cmd + ["is-active", svc_name], + capture_output=True, text=True, timeout=5, + ) + if verify.stdout.strip() == "active": + restarted_services.append(svc_name) + else: + # Retry once — transient startup failures + # (stale module cache, import race) often + # resolve on the second attempt. 
+ print(f" ⚠ {svc_name} died after restart, retrying...") + retry = subprocess.run( + scope_cmd + ["restart", svc_name], + capture_output=True, text=True, timeout=15, + ) + _time.sleep(3) + verify2 = subprocess.run( + scope_cmd + ["is-active", svc_name], + capture_output=True, text=True, timeout=5, + ) + if verify2.stdout.strip() == "active": + restarted_services.append(svc_name) + print(f" ✓ {svc_name} recovered on retry") + else: + print( + f" ✗ {svc_name} failed to stay running after restart.\n" + f" Check logs: journalctl {'--user ' if scope == 'user' else ''}-u {svc_name} --since '2 min ago'\n" + f" Restart manually: systemctl {'--user ' if scope == 'user' else ''}restart {svc_name}" + ) else: print(f" ⚠ Failed to restart {svc_name}: {restart.stderr.strip()}") except (FileNotFoundError, subprocess.TimeoutExpired): diff --git a/tests/gateway/restart_test_helpers.py b/tests/gateway/restart_test_helpers.py index 8b4897467..75665325b 100644 --- a/tests/gateway/restart_test_helpers.py +++ b/tests/gateway/restart_test_helpers.py @@ -93,6 +93,12 @@ def make_restart_runner( runner._running_agent_count = GatewayRunner._running_agent_count.__get__( runner, GatewayRunner ) + runner._snapshot_running_agents = GatewayRunner._snapshot_running_agents.__get__( + runner, GatewayRunner + ) + runner._notify_active_sessions_of_shutdown = ( + GatewayRunner._notify_active_sessions_of_shutdown.__get__(runner, GatewayRunner) + ) runner._launch_detached_restart_command = GatewayRunner._launch_detached_restart_command.__get__( runner, GatewayRunner ) diff --git a/tests/gateway/test_restart_drain.py b/tests/gateway/test_restart_drain.py index cfc2c364c..732470c12 100644 --- a/tests/gateway/test_restart_drain.py +++ b/tests/gateway/test_restart_drain.py @@ -161,3 +161,84 @@ async def test_launch_detached_restart_command_uses_setsid(monkeypatch): assert kwargs["start_new_session"] is True assert kwargs["stdout"] is subprocess.DEVNULL assert kwargs["stderr"] is subprocess.DEVNULL + + +# ── Shutdown notification tests 
────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_shutdown_notification_sent_to_active_sessions(): + """Active sessions receive a notification when the gateway starts shutting down.""" + runner, adapter = make_restart_runner() + source = make_restart_source(chat_id="999", chat_type="dm") + session_key = "agent:main:telegram:dm:999" + runner._running_agents[session_key] = MagicMock() + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 1 + assert "shutting down" in adapter.sent[0] + assert "interrupted" in adapter.sent[0] + + +@pytest.mark.asyncio +async def test_shutdown_notification_says_restarting_when_restart_requested(): + """When _restart_requested is True, the message says 'restarting' and mentions /retry.""" + runner, adapter = make_restart_runner() + runner._restart_requested = True + session_key = "agent:main:telegram:dm:999" + runner._running_agents[session_key] = MagicMock() + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 1 + assert "restarting" in adapter.sent[0] + assert "/retry" in adapter.sent[0] + + +@pytest.mark.asyncio +async def test_shutdown_notification_deduplicates_per_chat(): + """Multiple sessions in the same chat only get one notification.""" + runner, adapter = make_restart_runner() + # Two sessions (different users) in the same chat + runner._running_agents["agent:main:telegram:group:chat1:u1"] = MagicMock() + runner._running_agents["agent:main:telegram:group:chat1:u2"] = MagicMock() + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 1 + + +@pytest.mark.asyncio +async def test_shutdown_notification_skipped_when_no_active_agents(): + """No notification is sent when there are no active agents.""" + runner, adapter = make_restart_runner() + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 0 + + +@pytest.mark.asyncio +async def 
test_shutdown_notification_ignores_pending_sentinels(): + """Pending sentinels (not-yet-started agents) don't trigger notifications.""" + from gateway.run import _AGENT_PENDING_SENTINEL + + runner, adapter = make_restart_runner() + runner._running_agents["agent:main:telegram:dm:999"] = _AGENT_PENDING_SENTINEL + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 0 + + +@pytest.mark.asyncio +async def test_shutdown_notification_send_failure_does_not_block(): + """If sending a notification fails, the method still completes.""" + runner, adapter = make_restart_runner() + adapter.send = AsyncMock(side_effect=Exception("network error")) + session_key = "agent:main:telegram:dm:999" + runner._running_agents[session_key] = MagicMock() + + # Should not raise + await runner._notify_active_sessions_of_shutdown()