diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py
index 4d2331548..479776428 100644
--- a/agent/auxiliary_client.py
+++ b/agent/auxiliary_client.py
@@ -1835,9 +1835,15 @@ def auxiliary_max_tokens_param(value: int) -> dict:
 # Every auxiliary LLM consumer should use these instead of manually
 # constructing clients and calling .chat.completions.create().
 
-# Client cache: (provider, async_mode, base_url, api_key) -> (client, default_model)
+# Client cache: (provider, async_mode, base_url, api_key, api_mode, runtime_key) -> (client, default_model, loop)
+# NOTE: loop identity is NOT part of the key. On async cache hits we check
+# whether the cached loop is the *current* loop; if not, the stale entry is
+# replaced in-place. This bounds cache growth to one entry per unique
+# provider config rather than one per (config × event-loop), which previously
+# caused unbounded fd accumulation in long-running gateway processes (#10200).
 _client_cache: Dict[tuple, tuple] = {}
 _client_cache_lock = threading.Lock()
+_CLIENT_CACHE_MAX_SIZE = 64  # safety belt — evict oldest once the cache is full
 
 
 def neuter_async_httpx_del() -> None:
@@ -1970,39 +1976,49 @@ def _get_cached_client(
     Async clients (AsyncOpenAI) use httpx.AsyncClient internally, which
     binds to the event loop that was current when the client was created.
     Using such a client on a *different* loop causes deadlocks or
-    RuntimeError. To prevent cross-loop issues (especially in gateway
-    mode where _run_async() may spawn fresh loops in worker threads), the
-    cache key for async clients includes the current event loop's identity
-    so each loop gets its own client instance.
+    RuntimeError. To prevent cross-loop issues, the cache validates on
+    every async hit that the cached loop is the *current, open* loop.
+    If the loop changed (e.g. a new gateway worker-thread loop), the stale
+    entry is replaced in-place rather than creating an additional entry.
+
+    This keeps cache size bounded to one entry per unique provider config,
+    preventing the fd-exhaustion that previously occurred in long-running
+    gateways where recycled worker threads created unbounded entries (#10200).
     """
-    # Include loop identity for async clients to prevent cross-loop reuse.
-    # httpx.AsyncClient (inside AsyncOpenAI) is bound to the loop where it
-    # was created — reusing it on a different loop causes deadlocks (#2681).
-    loop_id = 0
+    # Resolve the current event loop for async clients so we can validate
+    # cached entries. Loop identity is NOT in the cache key — instead we
+    # check at hit time whether the cached loop is still current and open.
+    # This prevents unbounded cache growth from recycled worker-thread loops
+    # while still guaranteeing we never reuse a client on the wrong loop
+    # (which causes deadlocks, see #2681).
     current_loop = None
     if async_mode:
         try:
             import asyncio as _aio
             current_loop = _aio.get_event_loop()
-            loop_id = id(current_loop)
         except RuntimeError:
             pass
     runtime = _normalize_main_runtime(main_runtime)
     runtime_key = tuple(runtime.get(field, "") for field in _MAIN_RUNTIME_FIELDS) if provider == "auto" else ()
-    cache_key = (provider, async_mode, base_url or "", api_key or "", api_mode or "", loop_id, runtime_key)
+    cache_key = (provider, async_mode, base_url or "", api_key or "", api_mode or "", runtime_key)
     with _client_cache_lock:
         if cache_key in _client_cache:
             cached_client, cached_default, cached_loop = _client_cache[cache_key]
             if async_mode:
-                # A cached async client whose loop has been closed will raise
-                # "Event loop is closed" when httpx tries to clean up its
-                # transport. Discard the stale client and create a fresh one.
-                if cached_loop is not None and cached_loop.is_closed():
-                    _force_close_async_httpx(cached_client)
-                    del _client_cache[cache_key]
-                else:
+                # Validate: the cached client must be bound to the CURRENT,
+                # OPEN loop. If the loop changed or was closed, the httpx
+                # transport inside is dead — force-close and replace.
+                loop_ok = (
+                    cached_loop is not None
+                    and cached_loop is current_loop
+                    and not cached_loop.is_closed()
+                )
+                if loop_ok:
                     effective = _compat_model(cached_client, model, cached_default)
                     return cached_client, effective
+                # Stale — evict and fall through to create a new client.
+                _force_close_async_httpx(cached_client)
+                del _client_cache[cache_key]
             else:
                 effective = _compat_model(cached_client, model, cached_default)
                 return cached_client, effective
@@ -2022,6 +2038,20 @@ def _get_cached_client(
         bound_loop = current_loop
         with _client_cache_lock:
+            # Loop identity is no longer part of the key, so an entry that a
+            # caller on another loop inserted while we were building ours
+            # must not be adopted by the "else" branch below. Apply the same
+            # rule as the cache-hit path: force-close it and replace it.
+            racing_entry = _client_cache.get(cache_key) if async_mode else None
+            if racing_entry is not None and racing_entry[2] is not current_loop:
+                _force_close_async_httpx(racing_entry[0])
+                del _client_cache[cache_key]
             if cache_key not in _client_cache:
+                # Safety belt: if the cache has grown beyond the max, evict
+                # the oldest entries (FIFO — dict preserves insertion order).
+                while len(_client_cache) >= _CLIENT_CACHE_MAX_SIZE:
+                    evict_key, evict_entry = next(iter(_client_cache.items()))
+                    _force_close_async_httpx(evict_entry[0])
+                    del _client_cache[evict_key]
                 _client_cache[cache_key] = (client, default_model, bound_loop)
             else:
                 client, default_model, _ = _client_cache[cache_key]
diff --git a/tests/run_agent/test_async_httpx_del_neuter.py b/tests/run_agent/test_async_httpx_del_neuter.py
index ce8e20e70..960df7084 100644
--- a/tests/run_agent/test_async_httpx_del_neuter.py
+++ b/tests/run_agent/test_async_httpx_del_neuter.py
@@ -103,7 +103,7 @@ class TestCleanupStaleAsyncClients:
         mock_client._client = MagicMock()
         mock_client._client.is_closed = False
 
-        key = ("test_stale", True, "", "", id(loop))
+        key = ("test_stale", True, "", "", "", ())
         with _client_cache_lock:
             _client_cache[key] = (mock_client, "test-model", loop)
 
@@ -127,7 +127,7 @@ class TestCleanupStaleAsyncClients:
         loop = asyncio.new_event_loop()  # NOT closed
         mock_client = MagicMock()
 
-        key = ("test_live", True, "", "", id(loop))
+        key = ("test_live", True, "", "", "", ())
         with _client_cache_lock:
             _client_cache[key] = (mock_client, "test-model", loop)
 
@@ -149,7 +149,7 @@ class TestCleanupStaleAsyncClients:
         )
         mock_client = MagicMock()
 
-        key = ("test_sync", False, "", "", 0)
+        key = ("test_sync", False, "", "", "", ())
         with _client_cache_lock:
             _client_cache[key] = (mock_client, "test-model", None)
 
@@ -160,3 +160,135 @@ class TestCleanupStaleAsyncClients:
         finally:
             with _client_cache_lock:
                 _client_cache.pop(key, None)
+
+
+# ---------------------------------------------------------------------------
+# Cache bounded growth (#10200)
+# ---------------------------------------------------------------------------
+
+class TestClientCacheBoundedGrowth:
+    """Verify the cache stays bounded when loops change (fix for #10200).
+
+    Previously, loop_id was part of the cache key, so every new event loop
+    created a new entry for the same provider config. Now loop identity is
+    validated at hit time and stale entries are replaced in-place.
+
+    Every test drives the real ``_get_cached_client`` (with provider
+    resolution patched out), so a regression in the cache logic itself
+    fails the suite instead of being masked by an inline copy of it.
+    """
+
+    def test_same_key_replaces_stale_loop_entry(self):
+        """When the loop changes, the old entry should be replaced, not duplicated."""
+        from agent.auxiliary_client import (
+            _client_cache,
+            _client_cache_lock,
+            _get_cached_client,
+        )
+
+        key = ("test_replace", True, "", "", "", ())
+
+        # Simulate a stale entry from a closed loop
+        old_loop = asyncio.new_event_loop()
+        old_loop.close()
+        old_client = MagicMock()
+        old_client._client = MagicMock()
+        old_client._client.is_closed = False
+
+        with _client_cache_lock:
+            _client_cache[key] = (old_client, "old-model", old_loop)
+
+        try:
+            # Now call _get_cached_client — should detect stale loop and evict
+            with patch("agent.auxiliary_client.resolve_provider_client") as mock_resolve:
+                mock_resolve.return_value = (MagicMock(), "new-model")
+                _get_cached_client("test_replace", async_mode=True)
+            # The old entry should have been replaced
+            with _client_cache_lock:
+                assert key in _client_cache, "Key should still exist (replaced)"
+                entry = _client_cache[key]
+                assert entry[0] is not old_client, "Stale client should be gone"
+                assert entry[1] == "new-model", "Should have the new model"
+        finally:
+            with _client_cache_lock:
+                _client_cache.pop(key, None)
+
+    def test_different_loops_do_not_grow_cache(self):
+        """Multiple event loops for the same provider should NOT create multiple entries."""
+        from agent.auxiliary_client import (
+            _client_cache,
+            _client_cache_lock,
+            _get_cached_client,
+        )
+
+        key = ("test_no_grow", True, "", "", "", ())
+
+        async def _lookup():
+            # Runs inside the loop, so that loop is the "current" one the
+            # cache sees; no global event-loop state has to be touched.
+            return _get_cached_client("test_no_grow", async_mode=True)
+
+        loops = []
+        try:
+            with patch("agent.auxiliary_client.resolve_provider_client") as mock_resolve:
+                for i in range(5):
+                    # Close the previous loop (simulating worker thread recycling)
+                    if loops:
+                        loops[-1].close()
+                    loop = asyncio.new_event_loop()
+                    loops.append(loop)
+                    new_client = MagicMock()
+                    new_client._client.is_closed = False
+                    mock_resolve.return_value = (new_client, f"model-{i}")
+                    loop.run_until_complete(_lookup())
+
+                    # Only one entry may exist for this provider, and it
+                    # must be the one created for the newest loop.
+                    with _client_cache_lock:
+                        matching = [k for k in _client_cache if k[0] == "test_no_grow"]
+                        assert matching == [key], f"Expected 1 entry, got {matching}"
+                        assert _client_cache[key][1] == f"model-{i}"
+        finally:
+            for loop in loops:
+                if not loop.is_closed():
+                    loop.close()
+            with _client_cache_lock:
+                _client_cache.pop(key, None)
+
+    def test_max_cache_size_eviction(self):
+        """Cache should not exceed _CLIENT_CACHE_MAX_SIZE."""
+        from agent.auxiliary_client import (
+            _client_cache,
+            _client_cache_lock,
+            _CLIENT_CACHE_MAX_SIZE,
+            _get_cached_client,
+        )
+
+        # Save existing cache state
+        with _client_cache_lock:
+            saved = dict(_client_cache)
+            _client_cache.clear()
+
+        total = _CLIENT_CACHE_MAX_SIZE + 5
+        try:
+            # Request max + 5 distinct sync providers through the real cache
+            with patch("agent.auxiliary_client.resolve_provider_client") as mock_resolve:
+                for i in range(total):
+                    new_client = MagicMock()
+                    new_client._client.is_closed = False
+                    mock_resolve.return_value = (new_client, f"model-{i}")
+                    _get_cached_client(f"evict_test_{i}", async_mode=False)
+                    with _client_cache_lock:
+                        assert len(_client_cache) <= _CLIENT_CACHE_MAX_SIZE, (
+                            f"Cache size {len(_client_cache)} exceeds max {_CLIENT_CACHE_MAX_SIZE}"
+                        )
+
+            with _client_cache_lock:
+                # The earliest entries should have been evicted
+                assert ("evict_test_0", False, "", "", "", ()) not in _client_cache
+                # The latest entries should be present
+                assert (f"evict_test_{total - 1}", False, "", "", "", ()) in _client_cache
+        finally:
+            with _client_cache_lock:
+                _client_cache.clear()
+                _client_cache.update(saved)