refactor(run_agent): extract stream diagnostics to agent/stream_diag.py

Move the five stream-drop diagnostic helpers + the headers tuple:

* STREAM_DIAG_HEADERS — cf-ray, x-openrouter-provider, x-request-id, etc.
* stream_diag_init — fresh per-attempt diagnostic dict
* stream_diag_capture_response — snapshot upstream headers + HTTP status
* flatten_exception_chain — compact Outer(msg) <- Inner(msg) rendering
* log_stream_retry — structured WARNING with provider/bytes/elapsed/ttfb
* emit_stream_drop — user-facing status line + activity touch

AIAgent keeps thin forwarder methods (and exposes the headers tuple as
_STREAM_DIAG_HEADERS for back-compat).  All test patches and call sites
unchanged.

tests/run_agent/ + tests/agent/: 4313 passed (same pre-existing
test_auxiliary_client failure).

run_agent.py: 13470 -> 13227 lines (-243).
This commit is contained in:
teknium1
2026-05-16 18:28:17 -07:00
parent 79559214a6
commit 57f6762ca0
2 changed files with 303 additions and 211 deletions

View File

@@ -2110,99 +2110,28 @@ class AIAgent:
except Exception:
logger.debug("status_callback error in _emit_warning", exc_info=True)
# Headers we capture from the dying stream's HTTP response so post-mortem
# diagnosis can answer "which CF edge / which OpenRouter downstream
# provider / which request id". Lowercased; httpx returns CIMultiDict.
_STREAM_DIAG_HEADERS = (
"cf-ray",
"cf-cache-status",
"x-openrouter-provider",
"x-openrouter-model",
"x-openrouter-id",
"x-request-id",
"x-vercel-id",
"via",
"server",
"x-forwarded-for",
)
# Stream-diagnostic class header preserved for backward compat —
# actual list lives in ``agent.stream_diag.STREAM_DIAG_HEADERS``.
from agent.stream_diag import STREAM_DIAG_HEADERS as _STREAM_DIAG_HEADERS # noqa: E402
@staticmethod
def _stream_diag_init() -> Dict[str, Any]:
"""Return a fresh per-attempt diagnostic dict.
Mutated in-place by the streaming functions and read from the retry
block when a stream dies. Lives on ``request_client_holder`` so it
survives across the closure boundary.
"""
return {
"started_at": time.time(),
"first_chunk_at": None,
"chunks": 0,
"bytes": 0,
"headers": {},
"http_status": None,
}
"""Forwarder — see ``agent.stream_diag.stream_diag_init``."""
from agent.stream_diag import stream_diag_init
return stream_diag_init()
def _stream_diag_capture_response(
self, diag: Dict[str, Any], http_response: Any
) -> None:
"""Snapshot interesting headers + HTTP status from the live stream.
Called once at stream open (before iterating chunks) so the metadata
survives even if the stream dies before any chunk arrives. Failures
are swallowed — diag is best-effort.
"""
if http_response is None or not isinstance(diag, dict):
return
try:
diag["http_status"] = getattr(http_response, "status_code", None)
except Exception:
pass
try:
headers = getattr(http_response, "headers", None) or {}
captured: Dict[str, str] = {}
for name in self._STREAM_DIAG_HEADERS:
try:
val = headers.get(name)
if val:
# Truncate single-value to keep log lines bounded.
captured[name] = str(val)[:120]
except Exception:
continue
diag["headers"] = captured
except Exception:
pass
"""Forwarder — see ``agent.stream_diag.stream_diag_capture_response``."""
from agent.stream_diag import stream_diag_capture_response
stream_diag_capture_response(self, diag, http_response)
@staticmethod
def _flatten_exception_chain(error: BaseException) -> str:
"""Return a compact ``Outer(msg) <- Inner(msg) <- ...`` rendering.
OpenAI SDK wraps httpx errors as ``APIConnectionError`` /
``APIError`` and only the wrapper's class is visible at the catch
site — but the underlying ``RemoteProtocolError`` /
``ConnectError`` / ``ReadError`` is what tells us WHY the stream
died. Walks ``__cause__`` then ``__context__`` (deduped, max 4
deep) to surface the chain in one line.
"""
seen: List[BaseException] = []
link: Optional[BaseException] = error
while link is not None and len(seen) < 4:
if link in seen:
break
seen.append(link)
nxt = getattr(link, "__cause__", None) or getattr(
link, "__context__", None
)
if nxt is None or nxt is link:
break
link = nxt
parts: List[str] = []
for e in seen:
msg = str(e).strip().replace("\n", " ")
if len(msg) > 140:
msg = msg[:140] + ""
parts.append(f"{type(e).__name__}({msg})" if msg else type(e).__name__)
return " <- ".join(parts) if parts else type(error).__name__
"""Forwarder — see ``agent.stream_diag.flatten_exception_chain``."""
from agent.stream_diag import flatten_exception_chain
return flatten_exception_chain(error)
def _log_stream_retry(
self,
@@ -2214,88 +2143,12 @@ class AIAgent:
mid_tool_call: bool,
diag: Optional[Dict[str, Any]] = None,
) -> None:
"""Record a transient stream-drop and retry to ``agent.log``.
Always logs a structured WARNING so users have a breadcrumb regardless
of UI verbosity. Subagents in particular benefit because their
retries no longer spam the parent's terminal — but the file log keeps
full detail (provider, error class, attempt, base_url, subagent_id).
When *diag* is provided (the per-attempt stream-diagnostic dict from
``_stream_diag_init``), the WARNING also captures upstream headers
(cf-ray, x-openrouter-provider, x-openrouter-id), HTTP status, bytes
streamed before the drop, and elapsed time on the dying attempt.
These are the breadcrumbs needed to answer "is one CF edge / one
downstream provider responsible, or is it random across runs?"
"""
try:
try:
_summary = self._summarize_api_error(error)
except Exception:
_summary = str(error)
if _summary and len(_summary) > 240:
_summary = _summary[:240] + ""
# Inner-cause chain (httpx errors hide under openai.APIError).
try:
_chain = self._flatten_exception_chain(error)
except Exception:
_chain = type(error).__name__
# Per-attempt counters and upstream headers.
_now = time.time()
_bytes = 0
_chunks = 0
_elapsed = 0.0
_ttfb = None
_headers_repr = "-"
_http_status = "-"
if isinstance(diag, dict):
try:
_bytes = int(diag.get("bytes") or 0)
_chunks = int(diag.get("chunks") or 0)
_started = float(diag.get("started_at") or _now)
_elapsed = max(0.0, _now - _started)
_first = diag.get("first_chunk_at")
if _first is not None:
_ttfb = max(0.0, float(_first) - _started)
headers = diag.get("headers") or {}
if isinstance(headers, dict) and headers:
_headers_repr = " ".join(
f"{k}={v}" for k, v in headers.items()
)
if diag.get("http_status") is not None:
_http_status = str(diag.get("http_status"))
except Exception:
pass
logger.warning(
"Stream %s on attempt %s/%s — retrying. "
"subagent_id=%s depth=%s provider=%s base_url=%s "
"error_type=%s error=%s "
"chain=%s "
"http_status=%s bytes=%d chunks=%d elapsed=%.2fs ttfb=%s "
"upstream=[%s]",
kind,
attempt,
max_attempts,
getattr(self, "_subagent_id", None) or "-",
getattr(self, "_delegate_depth", 0),
self.provider or "-",
self.base_url or "-",
type(error).__name__,
_summary,
_chain,
_http_status,
_bytes,
_chunks,
_elapsed,
f"{_ttfb:.2f}s" if _ttfb is not None else "-",
_headers_repr,
extra={"mid_tool_call": mid_tool_call},
)
except Exception:
logger.debug("stream-retry log emit failed", exc_info=True)
"""Forwarder — see ``agent.stream_diag.log_stream_retry``."""
from agent.stream_diag import log_stream_retry
log_stream_retry(
self, kind=kind, error=error, attempt=attempt,
max_attempts=max_attempts, mid_tool_call=mid_tool_call, diag=diag,
)
def _emit_stream_drop(
self,
@@ -2306,53 +2159,12 @@ class AIAgent:
mid_tool_call: bool,
diag: Optional[Dict[str, Any]] = None,
) -> None:
"""Emit a single user-visible line for a stream drop+retry.
Both top-level agents and subagents announce drops in the UI — the
parent prefixes subagent lines with ``[subagent-N]`` via ``log_prefix``
so they're easy to attribute. All cases also write a structured
WARNING to ``agent.log`` via :meth:`_log_stream_retry` with the full
diagnostic detail (subagent_id, provider, base_url, error_type,
cf-ray, x-openrouter-provider, bytes/chunks, elapsed) for post-hoc
analysis.
The user-visible status line is intentionally compact: provider,
error class, attempt N/M, plus ``after Xs`` when the stream dropped
mid-flight. Full diagnostic detail goes to ``agent.log`` only —
``hermes logs --level WARNING | grep "Stream drop"`` to inspect.
"""
kind = "drop mid tool-call" if mid_tool_call else "drop"
self._log_stream_retry(
kind=kind,
error=error,
attempt=attempt,
max_attempts=max_attempts,
mid_tool_call=mid_tool_call,
diag=diag,
"""Forwarder — see ``agent.stream_diag.emit_stream_drop``."""
from agent.stream_diag import emit_stream_drop
emit_stream_drop(
self, error=error, attempt=attempt, max_attempts=max_attempts,
mid_tool_call=mid_tool_call, diag=diag,
)
provider = self.provider or "provider"
# Compose a brief "after Xs" suffix when we have timing data — helps
# the user distinguish "couldn't connect" (0s) from "died after 30s
# of streaming" (likely upstream idle-kill or proxy timeout).
_suffix = ""
if isinstance(diag, dict):
try:
started = diag.get("started_at")
if started is not None:
_suffix = f" after {max(0.0, time.time() - float(started)):.1f}s"
except Exception:
pass
try:
self._emit_status(
f"⚠️ {provider} stream {kind} ({type(error).__name__}){_suffix} "
f"— reconnecting, retry {attempt}/{max_attempts}"
)
self._touch_activity(
f"stream retry {attempt}/{max_attempts} "
f"after {type(error).__name__}"
)
except Exception:
pass
def _emit_auxiliary_failure(self, task: str, exc: BaseException) -> None:
"""Surface a compact warning for failed auxiliary work."""