fix(gateway): pre-mark sessions as resume_pending before drain to prevent data loss (#27856)
Pre-mark all running agent sessions as resume_pending BEFORE the drain wait begins. If the service manager kills the process during the drain window, the durable marker is already written, so the next gateway boot can recover in-flight sessions. On graceful drain completion, clear the early markers for sessions that finished successfully.
This commit is contained in:
@@ -5574,6 +5574,24 @@ class GatewayRunner:
|
||||
)
|
||||
|
||||
timeout = self._restart_drain_timeout
|
||||
|
||||
# Pre-mark sessions as resume_pending BEFORE the drain wait.
|
||||
# If the process is killed by the service manager during the
|
||||
# drain, the durable marker is already written so the next
|
||||
# gateway boot can recover in-flight sessions (#27856).
|
||||
_pre_drain_keys: list[str] = []
|
||||
for _sk, _agent in list(self._running_agents.items()):
|
||||
if _agent is _AGENT_PENDING_SENTINEL:
|
||||
continue
|
||||
try:
|
||||
self.session_store.mark_resume_pending(
|
||||
_sk,
|
||||
"restart_timeout" if self._restart_requested else "shutdown_timeout",
|
||||
)
|
||||
_pre_drain_keys.append(_sk)
|
||||
except Exception as _e:
|
||||
logger.debug("pre-drain mark_resume_pending failed for %s: %s", _sk, _e)
|
||||
|
||||
_drain_started_at = time.monotonic()
|
||||
active_agents, timed_out = await self._drain_active_agents(timeout)
|
||||
logger.info(
|
||||
@@ -5585,6 +5603,21 @@ class GatewayRunner:
|
||||
len(active_agents),
|
||||
self._running_agent_count(),
|
||||
)
|
||||
|
||||
if not timed_out:
|
||||
# Drain completed gracefully — all running sessions finished.
|
||||
# Clear the pre-drain resume_pending markers so sessions that
|
||||
# completed during the drain window don't carry a stale flag.
|
||||
for _sk in _pre_drain_keys:
|
||||
if _sk not in self._running_agents:
|
||||
try:
|
||||
self.session_store.clear_resume_pending(_sk)
|
||||
except Exception as _e:
|
||||
logger.debug(
|
||||
"clear_resume_pending after drain failed for %s: %s",
|
||||
_sk, _e,
|
||||
)
|
||||
|
||||
if timed_out:
|
||||
logger.warning(
|
||||
"Gateway drain timed out after %.1fs with %d active agent(s); interrupting remaining work.",
|
||||
|
||||
Reference in New Issue
Block a user