refactor(memory): remove flush_memories entirely (#15696)

The AIAgent.flush_memories pre-compression save, the gateway
_flush_memories_for_session, and everything feeding them are
obsolete now that the background memory/skill review handles
persistent memory extraction.

Problems with flush_memories:

- Pre-dates the background review loop.  It was the only memory-save
  path when introduced; the background review now fires every 10 user
  turns on CLI and gateway alike, which is far more frequent than
  compression or session reset ever triggered flush.
- Blocking and synchronous.  Pre-compression flush ran on the live agent
  before compression, blocking the user-visible response.
- Cache-breaking.  Flush built a temporary conversation prefix
  (system prompt + memory-only tool list) that diverged from the live
  conversation's cached prefix, invalidating prompt caching.  The
  gateway variant spawned a fresh AIAgent with its own clean prompt
  for each finalized session — still cache-breaking, just in a
  different process.
- Redundant.  Background review runs in the live conversation's
  session context, gets the same content, writes to the same memory
  store, and doesn't break the cache.  Everything flush_memories
  claimed to preserve is already covered.

What this removes:

- AIAgent.flush_memories() method (~248 LOC in run_agent.py)
- Pre-compression flush call in _compress_context
- flush_memories call sites in cli.py (/new + exit)
- GatewayRunner._flush_memories_for_session + _async_flush_memories
  (and the 3 call sites: session expiry watcher, /new, /resume)
- 'flush_memories' entry from DEFAULT_CONFIG auxiliary tasks,
  hermes tools UI task list, auxiliary_client docstrings
- _memory_flush_min_turns config + init
- #15631's headroom-deduction math in
  _check_compression_model_feasibility (headroom was only needed
  because flush dragged the full main-agent system prompt along;
  the compression summariser sends a single user-role prompt so
  new_threshold = aux_context is safe again)
- The dedicated test files and assertions that exercised
  flush-specific paths

What this renames (with read-time backcompat on sessions.json):

- SessionEntry.memory_flushed -> SessionEntry.expiry_finalized.
  The session-expiry watcher still uses the flag to avoid re-running
  finalize/eviction on the same expired session; the new name
  reflects what it now actually gates.  from_dict() reads
  'expiry_finalized' first, falls back to the legacy 'memory_flushed'
  key so existing sessions.json files upgrade seamlessly.

Supersedes #15631 and #15638.

Tested: 383 targeted tests pass across run_agent/, agent/, cli/,
and gateway/ session-boundary suites.  No behavior regressions —
background memory review continues to handle persistent memory
extraction on both CLI and gateway.
This commit is contained in:
Teknium
2026-04-25 08:21:14 -07:00
committed by GitHub
parent d635e2df3f
commit ea01bdcebe
23 changed files with 78 additions and 1567 deletions

View File

@@ -386,7 +386,7 @@ class TestProvidersDictApiModeAnthropicMessages:
},
},
"auxiliary": {
"flush_memories": {
"compression": {
"provider": "myrelay",
"model": "claude-sonnet-4.6",
},
@@ -399,11 +399,11 @@ class TestProvidersDictApiModeAnthropicMessages:
AnthropicAuxiliaryClient,
AsyncAnthropicAuxiliaryClient,
)
async_client, async_model = get_async_text_auxiliary_client("flush_memories")
async_client, async_model = get_async_text_auxiliary_client("compression")
assert isinstance(async_client, AsyncAnthropicAuxiliaryClient)
assert async_model == "claude-sonnet-4.6"
sync_client, sync_model = get_text_auxiliary_client("flush_memories")
sync_client, sync_model = get_text_auxiliary_client("compression")
assert isinstance(sync_client, AnthropicAuxiliaryClient)
assert sync_model == "claude-sonnet-4.6"

View File

@@ -1,7 +1,7 @@
"""Regression tests for the universal "unsupported temperature" retry in
``agent.auxiliary_client``.
Auxiliary callers (``flush_memories``, context compression, session search,
Auxiliary callers (context compression, session search,
web extract summarisation, etc.) hardcode ``temperature=0.3`` for historical
reasons. Several provider/model combinations reject ``temperature`` with a
400:
@@ -100,7 +100,7 @@ class TestCallLlmUnsupportedTemperatureRetry:
side_effect=lambda resp, _task: resp),
):
result = call_llm(
task="flush_memories",
task="compression",
messages=[{"role": "user", "content": "remember this"}],
temperature=0.3,
max_tokens=500,
@@ -136,7 +136,7 @@ class TestCallLlmUnsupportedTemperatureRetry:
):
with pytest.raises(RuntimeError, match="Invalid value"):
call_llm(
task="flush_memories",
task="compression",
messages=[{"role": "user", "content": "x"}],
temperature=0.3,
max_tokens=500,
@@ -166,7 +166,7 @@ class TestCallLlmUnsupportedTemperatureRetry:
):
with pytest.raises(RuntimeError):
call_llm(
task="flush_memories",
task="compression",
messages=[{"role": "user", "content": "x"}],
temperature=None, # explicit: no temperature sent
max_tokens=500,

View File

@@ -33,7 +33,6 @@ class _FakeAgent:
self._todo_store.write(
[{"id": "t1", "content": "unfinished task", "status": "in_progress"}]
)
self.flush_memories = MagicMock()
self.commit_memory_session = MagicMock()
self._invalidate_system_prompt = MagicMock()
@@ -157,7 +156,6 @@ def test_new_command_creates_real_fresh_session_and_resets_agent_state(tmp_path)
assert cli.agent._todo_store.read() == []
assert cli.session_start > old_session_start
assert cli.agent.session_start == cli.session_start
cli.agent.flush_memories.assert_called_once_with([{"role": "user", "content": "hello"}])
cli.agent._invalidate_system_prompt.assert_called_once()

View File

@@ -1,249 +0,0 @@
"""Tests for proactive memory flush on session expiry.
Verifies that:
1. _is_session_expired() works from a SessionEntry alone (no source needed)
2. The sync callback is no longer called in get_or_create_session
3. memory_flushed flag persists across save/load cycles (prevents restart re-flush)
4. The background watcher can detect expired sessions
"""
import pytest
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import patch, MagicMock
from gateway.config import Platform, GatewayConfig, SessionResetPolicy
from gateway.session import SessionSource, SessionStore, SessionEntry
@pytest.fixture()
def idle_store(tmp_path):
"""SessionStore with a 60-minute idle reset policy."""
config = GatewayConfig(
default_reset_policy=SessionResetPolicy(mode="idle", idle_minutes=60),
)
with patch("gateway.session.SessionStore._ensure_loaded"):
s = SessionStore(sessions_dir=tmp_path, config=config)
s._db = None
s._loaded = True
return s
@pytest.fixture()
def no_reset_store(tmp_path):
"""SessionStore with no reset policy (mode=none)."""
config = GatewayConfig(
default_reset_policy=SessionResetPolicy(mode="none"),
)
with patch("gateway.session.SessionStore._ensure_loaded"):
s = SessionStore(sessions_dir=tmp_path, config=config)
s._db = None
s._loaded = True
return s
class TestIsSessionExpired:
"""_is_session_expired should detect expiry from entry alone."""
def test_idle_session_expired(self, idle_store):
entry = SessionEntry(
session_key="agent:main:telegram:dm",
session_id="sid_1",
created_at=datetime.now() - timedelta(hours=3),
updated_at=datetime.now() - timedelta(minutes=120),
platform=Platform.TELEGRAM,
chat_type="dm",
)
assert idle_store._is_session_expired(entry) is True
def test_active_session_not_expired(self, idle_store):
entry = SessionEntry(
session_key="agent:main:telegram:dm",
session_id="sid_2",
created_at=datetime.now() - timedelta(hours=1),
updated_at=datetime.now() - timedelta(minutes=10),
platform=Platform.TELEGRAM,
chat_type="dm",
)
assert idle_store._is_session_expired(entry) is False
def test_none_mode_never_expires(self, no_reset_store):
entry = SessionEntry(
session_key="agent:main:telegram:dm",
session_id="sid_3",
created_at=datetime.now() - timedelta(days=30),
updated_at=datetime.now() - timedelta(days=30),
platform=Platform.TELEGRAM,
chat_type="dm",
)
assert no_reset_store._is_session_expired(entry) is False
def test_active_processes_prevent_expiry(self, idle_store):
"""Sessions with active background processes should never expire."""
idle_store._has_active_processes_fn = lambda key: True
entry = SessionEntry(
session_key="agent:main:telegram:dm",
session_id="sid_4",
created_at=datetime.now() - timedelta(hours=5),
updated_at=datetime.now() - timedelta(hours=5),
platform=Platform.TELEGRAM,
chat_type="dm",
)
assert idle_store._is_session_expired(entry) is False
def test_daily_mode_expired(self, tmp_path):
"""Daily mode should expire sessions from before today's reset hour."""
config = GatewayConfig(
default_reset_policy=SessionResetPolicy(mode="daily", at_hour=4),
)
with patch("gateway.session.SessionStore._ensure_loaded"):
store = SessionStore(sessions_dir=tmp_path, config=config)
store._db = None
store._loaded = True
entry = SessionEntry(
session_key="agent:main:telegram:dm",
session_id="sid_5",
created_at=datetime.now() - timedelta(days=2),
updated_at=datetime.now() - timedelta(days=2),
platform=Platform.TELEGRAM,
chat_type="dm",
)
assert store._is_session_expired(entry) is True
class TestGetOrCreateSessionNoCallback:
"""get_or_create_session should NOT call a sync flush callback."""
def test_auto_reset_creates_new_session_after_flush(self, idle_store):
"""When a flushed session auto-resets, a new session_id is created."""
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="123",
chat_type="dm",
)
# Create initial session
entry1 = idle_store.get_or_create_session(source)
old_sid = entry1.session_id
# Simulate the watcher having flushed it
entry1.memory_flushed = True
# Simulate the session going idle
entry1.updated_at = datetime.now() - timedelta(minutes=120)
idle_store._save()
# Next call should auto-reset
entry2 = idle_store.get_or_create_session(source)
assert entry2.session_id != old_sid
assert entry2.was_auto_reset is True
# New session starts with memory_flushed=False
assert entry2.memory_flushed is False
def test_no_sync_callback_invoked(self, idle_store):
"""No synchronous callback should block during auto-reset."""
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id="123",
chat_type="dm",
)
entry1 = idle_store.get_or_create_session(source)
entry1.updated_at = datetime.now() - timedelta(minutes=120)
idle_store._save()
# Verify no _on_auto_reset attribute
assert not hasattr(idle_store, '_on_auto_reset')
# This should NOT block (no sync LLM call)
entry2 = idle_store.get_or_create_session(source)
assert entry2.was_auto_reset is True
class TestMemoryFlushedFlag:
"""The memory_flushed flag on SessionEntry prevents double-flushing."""
def test_defaults_to_false(self):
entry = SessionEntry(
session_key="agent:main:telegram:dm:123",
session_id="sid_new",
created_at=datetime.now(),
updated_at=datetime.now(),
platform=Platform.TELEGRAM,
chat_type="dm",
)
assert entry.memory_flushed is False
def test_persists_through_save_load(self, idle_store):
"""memory_flushed=True must survive a save/load cycle (simulates restart)."""
key = "agent:main:discord:thread:789"
entry = SessionEntry(
session_key=key,
session_id="sid_flushed",
created_at=datetime.now() - timedelta(hours=5),
updated_at=datetime.now() - timedelta(hours=5),
platform=Platform.DISCORD,
chat_type="thread",
memory_flushed=True,
)
idle_store._entries[key] = entry
idle_store._save()
# Simulate restart: clear in-memory state, reload from disk
idle_store._entries.clear()
idle_store._loaded = False
idle_store._ensure_loaded()
reloaded = idle_store._entries[key]
assert reloaded.memory_flushed is True
def test_unflushed_entry_survives_restart_as_unflushed(self, idle_store):
"""An entry without memory_flushed stays False after reload."""
key = "agent:main:telegram:dm:456"
entry = SessionEntry(
session_key=key,
session_id="sid_not_flushed",
created_at=datetime.now() - timedelta(hours=2),
updated_at=datetime.now() - timedelta(hours=2),
platform=Platform.TELEGRAM,
chat_type="dm",
)
idle_store._entries[key] = entry
idle_store._save()
idle_store._entries.clear()
idle_store._loaded = False
idle_store._ensure_loaded()
reloaded = idle_store._entries[key]
assert reloaded.memory_flushed is False
def test_roundtrip_to_dict_from_dict(self):
"""to_dict/from_dict must preserve memory_flushed."""
entry = SessionEntry(
session_key="agent:main:telegram:dm:999",
session_id="sid_rt",
created_at=datetime.now(),
updated_at=datetime.now(),
platform=Platform.TELEGRAM,
chat_type="dm",
memory_flushed=True,
)
d = entry.to_dict()
assert d["memory_flushed"] is True
restored = SessionEntry.from_dict(d)
assert restored.memory_flushed is True
def test_legacy_entry_without_field_defaults_false(self):
"""Old sessions.json entries missing memory_flushed should default to False."""
data = {
"session_key": "agent:main:telegram:dm:legacy",
"session_id": "sid_legacy",
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"platform": "telegram",
"chat_type": "dm",
# no memory_flushed key
}
entry = SessionEntry.from_dict(data)
assert entry.memory_flushed is False

View File

@@ -1,240 +0,0 @@
"""Tests for memory flush stale-overwrite prevention (#2670).
Verifies that:
1. Cron sessions are skipped (no flush for headless cron runs)
2. Current memory state is injected into the flush prompt so the
flush agent can see what's already saved and avoid overwrites
3. The flush still works normally when memory files don't exist
"""
import sys
import types
import pytest
from pathlib import Path
from unittest.mock import MagicMock, patch, call
@pytest.fixture(autouse=True)
def _mock_dotenv(monkeypatch):
"""gateway.run imports dotenv at module level; stub it so tests run without the package."""
fake = types.ModuleType("dotenv")
fake.load_dotenv = lambda *a, **kw: None
monkeypatch.setitem(sys.modules, "dotenv", fake)
def _make_runner():
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
runner._honcho_managers = {}
runner._honcho_configs = {}
runner._running_agents = {}
runner._pending_messages = {}
runner._pending_approvals = {}
runner.adapters = {}
runner.hooks = MagicMock()
runner.session_store = MagicMock()
return runner
_TRANSCRIPT_4_MSGS = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi there"},
{"role": "user", "content": "remember my name is Alice"},
{"role": "assistant", "content": "Got it, Alice!"},
]
class TestCronSessionBypass:
"""Cron sessions should never trigger a memory flush."""
def test_cron_session_skipped(self):
runner = _make_runner()
runner._flush_memories_for_session("cron_job123_20260323_120000")
# session_store.load_transcript should never be called
runner.session_store.load_transcript.assert_not_called()
def test_cron_session_with_prefix_skipped(self):
"""Cron sessions with different prefixes are still skipped."""
runner = _make_runner()
runner._flush_memories_for_session("cron_daily_20260323")
runner.session_store.load_transcript.assert_not_called()
def test_non_cron_session_proceeds(self):
"""Non-cron sessions should still attempt the flush."""
runner = _make_runner()
runner.session_store.load_transcript.return_value = []
runner._flush_memories_for_session("session_abc123")
runner.session_store.load_transcript.assert_called_once_with("session_abc123")
def _make_flush_context(monkeypatch, memory_dir=None):
"""Return (runner, tmp_agent, fake_run_agent) with run_agent mocked in sys.modules."""
tmp_agent = MagicMock()
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = MagicMock(return_value=tmp_agent)
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
runner = _make_runner()
runner.session_store.load_transcript.return_value = _TRANSCRIPT_4_MSGS
return runner, tmp_agent, memory_dir
class TestMemoryInjection:
"""The flush prompt should include current memory state from disk."""
def test_memory_content_injected_into_flush_prompt(self, tmp_path, monkeypatch):
"""When memory files exist, their content appears in the flush prompt."""
memory_dir = tmp_path / "memories"
memory_dir.mkdir()
(memory_dir / "MEMORY.md").write_text("Agent knows Python\n§\nUser prefers dark mode")
(memory_dir / "USER.md").write_text("Name: Alice\n§\nTimezone: PST")
runner, tmp_agent, _ = _make_flush_context(monkeypatch, memory_dir)
with (
patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}),
patch("gateway.run._resolve_gateway_model", return_value="test-model"),
patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: memory_dir)}),
):
runner._flush_memories_for_session("session_123")
tmp_agent.run_conversation.assert_called_once()
flush_prompt = tmp_agent.run_conversation.call_args.kwargs.get("user_message", "")
assert "Agent knows Python" in flush_prompt
assert "User prefers dark mode" in flush_prompt
assert "Name: Alice" in flush_prompt
assert "Timezone: PST" in flush_prompt
assert "Do NOT overwrite or remove entries" in flush_prompt
assert "current live state of memory" in flush_prompt
def test_flush_works_without_memory_files(self, tmp_path, monkeypatch):
"""When no memory files exist, flush still runs without the guard."""
empty_dir = tmp_path / "no_memories"
empty_dir.mkdir()
runner, tmp_agent, _ = _make_flush_context(monkeypatch)
with (
patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}),
patch("gateway.run._resolve_gateway_model", return_value="test-model"),
patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: empty_dir)}),
):
runner._flush_memories_for_session("session_456")
tmp_agent.run_conversation.assert_called_once()
flush_prompt = tmp_agent.run_conversation.call_args.kwargs.get("user_message", "")
assert "Do NOT overwrite or remove entries" not in flush_prompt
assert "Review the conversation above" in flush_prompt
def test_empty_memory_files_no_injection(self, tmp_path, monkeypatch):
"""Empty memory files should not trigger the guard section."""
memory_dir = tmp_path / "memories"
memory_dir.mkdir()
(memory_dir / "MEMORY.md").write_text("")
(memory_dir / "USER.md").write_text(" \n ") # whitespace only
runner, tmp_agent, _ = _make_flush_context(monkeypatch)
with (
patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}),
patch("gateway.run._resolve_gateway_model", return_value="test-model"),
patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: memory_dir)}),
):
runner._flush_memories_for_session("session_789")
tmp_agent.run_conversation.assert_called_once()
flush_prompt = tmp_agent.run_conversation.call_args.kwargs.get("user_message", "")
assert "current live state of memory" not in flush_prompt
class TestFlushAgentSilenced:
"""The flush agent must not produce any terminal output."""
def test_print_fn_set_to_noop(self, tmp_path, monkeypatch):
"""_print_fn on the flush agent must be a no-op so tool output never leaks."""
runner = _make_runner()
runner.session_store.load_transcript.return_value = _TRANSCRIPT_4_MSGS
captured_agent = {}
def _fake_ai_agent(*args, **kwargs):
agent = MagicMock()
captured_agent["instance"] = agent
return agent
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = _fake_ai_agent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
with (
patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}),
patch("gateway.run._resolve_gateway_model", return_value="test-model"),
patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: tmp_path)}),
):
runner._flush_memories_for_session("session_silent")
agent = captured_agent["instance"]
assert agent._print_fn is not None, "_print_fn should be overridden to suppress output"
# Confirm it is callable and produces no output (no exception)
agent._print_fn("should be silenced")
def test_kawaii_spinner_respects_print_fn(self):
"""KawaiiSpinner must route all output through print_fn when supplied."""
from agent.display import KawaiiSpinner
written = []
spinner = KawaiiSpinner("test", print_fn=lambda *a, **kw: written.append(a))
spinner._write("hello")
assert written == [("hello",)], "spinner should route through print_fn"
# A no-op print_fn must produce no output to stdout
import io, sys
buf = io.StringIO()
old_stdout = sys.stdout
sys.stdout = buf
try:
silent_spinner = KawaiiSpinner("silent", print_fn=lambda *a, **kw: None)
silent_spinner._write("should not appear")
silent_spinner.stop("done")
finally:
sys.stdout = old_stdout
assert buf.getvalue() == "", "no-op print_fn spinner must not write to stdout"
def test_flush_agent_closes_resources_after_run(self, monkeypatch):
"""Memory flush should close temporary agent resources after the turn."""
runner, tmp_agent, _ = _make_flush_context(monkeypatch)
tmp_agent.shutdown_memory_provider = MagicMock()
tmp_agent.close = MagicMock()
with (
patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}),
patch("gateway.run._resolve_gateway_model", return_value="test-model"),
patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: Path("/nonexistent"))}),
):
runner._flush_memories_for_session("session_cleanup")
tmp_agent.shutdown_memory_provider.assert_called_once()
tmp_agent.close.assert_called_once()
class TestFlushPromptStructure:
"""Verify the flush prompt retains its core instructions."""
def test_core_instructions_present(self, monkeypatch):
"""The flush prompt should still contain the original guidance."""
runner, tmp_agent, _ = _make_flush_context(monkeypatch)
with (
patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "k"}),
patch("gateway.run._resolve_gateway_model", return_value="test-model"),
patch.dict("sys.modules", {"tools.memory_tool": MagicMock(get_memory_dir=lambda: Path("/nonexistent"))}),
):
runner._flush_memories_for_session("session_struct")
flush_prompt = tmp_agent.run_conversation.call_args.kwargs.get("user_message", "")
assert "automatically reset" in flush_prompt
assert "Save any important facts" in flush_prompt
assert "consider saving it as a skill" in flush_prompt
assert "Do NOT respond to the user" in flush_prompt

View File

@@ -4,7 +4,7 @@ Tests the _handle_resume_command handler (switch to a previously-named session)
across gateway messenger platforms.
"""
from unittest.mock import MagicMock, AsyncMock
from unittest.mock import MagicMock
import pytest
@@ -53,9 +53,6 @@ def _make_runner(session_db=None, current_session_id="current_session_001",
mock_store.switch_session.return_value = mock_session_entry
runner.session_store = mock_store
# Stub out memory flushing
runner._async_flush_memories = AsyncMock()
return runner
@@ -233,28 +230,3 @@ class TestHandleResumeCommand:
assert real_key not in runner._running_agents
db.close()
@pytest.mark.asyncio
async def test_resume_flushes_memories(self, tmp_path):
"""Resume should flush memories from the current session before switching."""
from hermes_state import SessionDB
db = SessionDB(db_path=tmp_path / "state.db")
db.create_session("old_session", "telegram")
db.set_session_title("old_session", "Old Work")
db.create_session("current_session_001", "telegram")
event = _make_event(text="/resume Old Work")
runner = _make_runner(
session_db=db,
current_session_id="current_session_001",
event=event,
)
await runner._handle_resume_command(event)
runner._async_flush_memories.assert_called_once_with(
"current_session_001",
"agent:main:telegram:dm:67890",
)
db.close()

View File

@@ -177,8 +177,8 @@ async def test_idle_expiry_fires_finalize_hook(mock_invoke_hook):
its reset policy (idle timeout, scheduled reset), it must fire
``on_session_finalize`` so plugin providers get the same final-pass
extraction opportunity they'd get from /new or CLI shutdown. Before
the fix, the expiry path flushed memories and evicted the agent but
silently skipped the hook.
the fix, the expiry path evicted the agent but silently skipped the
hook.
"""
from datetime import datetime, timedelta
@@ -200,7 +200,7 @@ async def test_idle_expiry_fires_finalize_hook(mock_invoke_hook):
platform=Platform.TELEGRAM,
chat_type="dm",
)
expired_entry.memory_flushed = False
expired_entry.expiry_finalized = False
runner.session_store = MagicMock()
runner.session_store._ensure_loaded = MagicMock()
@@ -211,24 +211,24 @@ async def test_idle_expiry_fires_finalize_hook(mock_invoke_hook):
runner.session_store._lock.__exit__ = MagicMock(return_value=None)
runner.session_store._save = MagicMock()
runner._async_flush_memories = AsyncMock()
runner._evict_cached_agent = MagicMock()
runner._cleanup_agent_resources = MagicMock()
runner._sweep_idle_cached_agents = MagicMock(return_value=0)
# The watcher starts with `await asyncio.sleep(60)` and loops while
# `self._running`. Patch sleep so the 60s initial delay is instant, then
# flip `_running` false inside the flush call so the loop exits cleanly
# after one pass.
# `self._running`. Patch sleep so the 60s initial delay is instant, and
# make the expiry hook invocation flip `_running` false so the loop
# exits cleanly after one pass.
_orig_sleep = __import__("asyncio").sleep
async def _fast_sleep(_):
await _orig_sleep(0)
async def _flush_and_stop(session_id, key):
runner._running = False # terminate the loop after this iteration
def _hook_and_stop(*a, **kw):
runner._running = False
return None
runner._async_flush_memories = AsyncMock(side_effect=_flush_and_stop)
mock_invoke_hook.side_effect = _hook_and_stop
with patch("gateway.run.asyncio.sleep", side_effect=_fast_sleep):
await runner._session_expiry_watcher(interval=0)

View File

@@ -1,7 +1,7 @@
"""Regression tests for approval-state cleanup on session boundaries."""
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock
from unittest.mock import MagicMock
import pytest
@@ -72,7 +72,6 @@ def _make_resume_runner():
runner = object.__new__(GatewayRunner)
runner.adapters = {}
runner._background_tasks = set()
runner._async_flush_memories = AsyncMock()
runner._running_agents = {}
runner._running_agents_ts = {}
runner._busy_ack_ts = {}

View File

@@ -31,7 +31,6 @@ def _make_agent_with_engine(engine):
agent._vprint = lambda *a, **kw: None
agent._last_flushed_db_idx = 0
# Stub the few AIAgent methods _compress_context uses.
agent.flush_memories = lambda *a, **kw: None
agent._invalidate_system_prompt = lambda *a, **kw: None
agent._build_system_prompt = lambda *a, **kw: "new-system-prompt"
agent.commit_memory_session = lambda *a, **kw: None

View File

@@ -41,8 +41,6 @@ def _make_agent(
agent.tool_progress_callback = None
agent._compression_warning = None
agent._aux_compression_context_length_config = None
# Tools feed into the headroom calculation in _check_compression_model_feasibility.
# Tests that want to assert specific threshold values can override this.
agent.tools = []
compressor = MagicMock(spec=ContextCompressor)
@@ -85,9 +83,8 @@ def test_auto_corrects_threshold_when_aux_context_below_threshold(mock_get_clien
assert "threshold:" in messages[0]
# Warning stored for gateway replay
assert agent._compression_warning is not None
# Threshold on the live compressor was actually lowered, accounting for
# the request-overhead headroom (empty tools list → ~12K headroom only).
assert agent.context_compressor.threshold_tokens == 68_000
# Threshold on the live compressor was actually lowered to aux_context.
assert agent.context_compressor.threshold_tokens == 80_000
@patch("agent.model_metadata.get_model_context_length", return_value=32_768)
@@ -346,93 +343,7 @@ def test_just_below_threshold_auto_corrects(mock_get_client, mock_ctx_len):
assert len(messages) == 1
assert "small-model" in messages[0]
assert "Auto-lowered" in messages[0]
assert agent.context_compressor.threshold_tokens == 87_999
# ── Headroom for system prompt + tool schemas ────────────────────────
@patch("agent.model_metadata.get_model_context_length", return_value=128_000)
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_auto_lowered_threshold_reserves_headroom_for_tools_and_system(mock_get_client, mock_ctx_len):
"""When aux context binds the threshold, new_threshold must leave room
for the system prompt and tool schemas that auxiliary callers
(compression summariser, flush_memories) prepend to the message list.
Without headroom, a full-budget message window + ~25K system/tool
overhead overflows the aux model with HTTP 400. Regression guard for
the flush_memories-on-busy-toolset overflow path.
"""
# Main context 200K, threshold 70% = 140K. Aux pins at 128K (below
# threshold → triggers auto-correct).
agent = _make_agent(main_context=200_000, threshold_percent=0.70)
# Build a realistic tool schema load.
agent.tools = [
{
"type": "function",
"function": {
"name": f"tool_{i}",
"description": "x" * 200,
"parameters": {"type": "object", "properties": {"arg": {"type": "string", "description": "y" * 120}}},
},
}
for i in range(50)
]
mock_client = MagicMock()
mock_client.base_url = "https://openrouter.ai/api/v1"
mock_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_client, "model-with-128k")
agent._emit_status = lambda msg: None
agent._check_compression_model_feasibility()
new_threshold = agent.context_compressor.threshold_tokens
# Must have strictly reserved headroom: new_threshold < aux_context.
assert new_threshold < 128_000, (
f"threshold {new_threshold} did not reserve headroom below aux=128,000 "
f"— system prompt + tools would overflow the aux model"
)
# Must respect the 64K hard floor.
from agent.model_metadata import MINIMUM_CONTEXT_LENGTH
assert new_threshold >= MINIMUM_CONTEXT_LENGTH
@patch("agent.model_metadata.get_model_context_length", return_value=80_000)
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_headroom_floors_at_minimum_context(mock_get_client, mock_ctx_len):
"""If headroom subtraction would push below 64K floor, clamp to 64K
rather than refusing the session — the aux is still workable for a
smaller message window.
"""
# Aux at 80K, with enough tools to push headroom > 16K → naive subtract
# would land at < 64K. The max(..., MINIMUM_CONTEXT_LENGTH) clamp must
# keep the session running.
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
agent.tools = [
{
"type": "function",
"function": {
"name": f"tool_{i}",
"description": "z" * 2_000, # fat descriptions
"parameters": {},
},
}
for i in range(30)
]
mock_client = MagicMock()
mock_client.base_url = "https://openrouter.ai/api/v1"
mock_client.api_key = "sk-aux"
mock_get_client.return_value = (mock_client, "small-aux-model")
agent._emit_status = lambda msg: None
agent._check_compression_model_feasibility()
from agent.model_metadata import MINIMUM_CONTEXT_LENGTH
assert agent.context_compressor.threshold_tokens == MINIMUM_CONTEXT_LENGTH
assert agent.context_compressor.threshold_tokens == 99_999
# ── Two-phase: __init__ + run_conversation replay ───────────────────

View File

@@ -1,398 +0,0 @@
"""Tests for flush_memories() working correctly across all provider modes.
Catches the bug where Codex mode called chat.completions.create on a
Responses-only client, which would fail silently or with a 404.
"""
import json
import os
import sys
import types
from types import SimpleNamespace
from unittest.mock import patch, MagicMock, call
import pytest
sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None))
sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object))
sys.modules.setdefault("fal_client", types.SimpleNamespace())
import run_agent
class _FakeOpenAI:
def __init__(self, **kwargs):
self.kwargs = kwargs
self.api_key = kwargs.get("api_key", "test")
self.base_url = kwargs.get("base_url", "http://test")
def close(self):
pass
def _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter"):
"""Build an AIAgent with mocked internals, ready for flush_memories testing."""
monkeypatch.setattr(run_agent, "get_tool_definitions", lambda **kw: [
{
"type": "function",
"function": {
"name": "memory",
"description": "Manage memories.",
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string"},
"target": {"type": "string"},
"content": {"type": "string"},
},
},
},
},
])
monkeypatch.setattr(run_agent, "check_toolset_requirements", lambda: {})
monkeypatch.setattr(run_agent, "OpenAI", _FakeOpenAI)
agent = run_agent.AIAgent(
api_key="test-key",
base_url="https://test.example.com/v1",
provider=provider,
api_mode=api_mode,
max_iterations=4,
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
# Give it a valid memory store
agent._memory_store = MagicMock()
agent._memory_flush_min_turns = 1
agent._user_turn_count = 5
return agent
def _chat_response_with_memory_call():
"""Simulated chat completions response with a memory tool call."""
return SimpleNamespace(
choices=[SimpleNamespace(
finish_reason="tool_calls",
message=SimpleNamespace(
content=None,
tool_calls=[SimpleNamespace(
id="call_mem_0",
type="function",
function=SimpleNamespace(
name="memory",
arguments=json.dumps({
"action": "add",
"target": "notes",
"content": "User prefers dark mode.",
}),
),
)],
),
)],
usage=SimpleNamespace(prompt_tokens=100, completion_tokens=20, total_tokens=120),
)
class TestFlushMemoriesRespectsConfigTimeout:
"""flush_memories() must NOT hardcode timeout=30.0 — it should defer
to the config value via auxiliary.flush_memories.timeout."""
def test_auxiliary_path_omits_explicit_timeout(self, monkeypatch):
"""When calling _call_llm, timeout should NOT be passed so that
_get_task_timeout('flush_memories') reads from config."""
agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter")
mock_response = _chat_response_with_memory_call()
with patch("agent.auxiliary_client.call_llm", return_value=mock_response) as mock_call:
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
{"role": "user", "content": "Note this"},
]
with patch("tools.memory_tool.memory_tool", return_value="Saved."):
agent.flush_memories(messages)
mock_call.assert_called_once()
call_kwargs = mock_call.call_args
# timeout must NOT be explicitly passed (so _get_task_timeout resolves it)
assert "timeout" not in call_kwargs.kwargs, (
"flush_memories should not pass explicit timeout to _call_llm; "
"let _get_task_timeout('flush_memories') resolve from config"
)
def test_fallback_path_uses_config_timeout(self, monkeypatch):
"""When auxiliary client is unavailable and we fall back to direct
OpenAI client, timeout should come from _get_task_timeout, not hardcoded."""
agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter")
agent.client = MagicMock()
agent.client.chat.completions.create.return_value = _chat_response_with_memory_call()
custom_timeout = 180.0
with patch("agent.auxiliary_client.call_llm", side_effect=RuntimeError("no provider")), \
patch("agent.auxiliary_client._get_task_timeout", return_value=custom_timeout) as mock_gtt, \
patch("tools.memory_tool.memory_tool", return_value="Saved."):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
{"role": "user", "content": "Save this"},
]
agent.flush_memories(messages)
mock_gtt.assert_called_once_with("flush_memories")
agent.client.chat.completions.create.assert_called_once()
call_kwargs = agent.client.chat.completions.create.call_args
assert call_kwargs.kwargs.get("timeout") == custom_timeout, (
f"Expected timeout={custom_timeout} from config, got {call_kwargs.kwargs.get('timeout')}"
)
class TestFlushMemoriesUsesAuxiliaryClient:
"""When an auxiliary client is available, flush_memories should use it
instead of self.client -- especially critical in Codex mode."""
def test_flush_uses_auxiliary_when_available(self, monkeypatch):
agent = _make_agent(monkeypatch, api_mode="codex_responses", provider="openai-codex")
mock_response = _chat_response_with_memory_call()
with patch("agent.auxiliary_client.call_llm", return_value=mock_response) as mock_call:
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there"},
{"role": "user", "content": "Remember this"},
]
with patch("tools.memory_tool.memory_tool", return_value="Saved.") as mock_memory:
agent.flush_memories(messages)
mock_call.assert_called_once()
call_kwargs = mock_call.call_args
assert call_kwargs.kwargs.get("task") == "flush_memories"
def test_flush_uses_main_client_when_no_auxiliary(self, monkeypatch):
"""Non-Codex mode with no auxiliary falls back to self.client."""
agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter")
agent.client = MagicMock()
agent.client.chat.completions.create.return_value = _chat_response_with_memory_call()
with patch("agent.auxiliary_client.call_llm", side_effect=RuntimeError("no provider")):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there"},
{"role": "user", "content": "Save this"},
]
with patch("tools.memory_tool.memory_tool", return_value="Saved."):
agent.flush_memories(messages)
agent.client.chat.completions.create.assert_called_once()
def test_auxiliary_provider_failure_surfaces_warning_and_falls_back(self, monkeypatch):
"""Provider/API failures from auxiliary flush must be visible.
Exhausted keys and rate limits are not always RuntimeError. They used
to fall into the broad outer handler and disappear into debug logs.
"""
agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter")
agent.client = MagicMock()
agent.client.chat.completions.create.return_value = _chat_response_with_memory_call()
events = []
agent.status_callback = lambda kind, text=None: events.append((kind, text))
with patch("agent.auxiliary_client.call_llm", side_effect=Exception("opencode-go key exhausted")), \
patch("tools.memory_tool.memory_tool", return_value="Saved."):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there"},
{"role": "user", "content": "Save this"},
]
agent.flush_memories(messages)
agent.client.chat.completions.create.assert_called_once()
assert any(kind == "warn" and "Auxiliary memory flush failed" in text for kind, text in events)
def test_flush_executes_memory_tool_calls(self, monkeypatch):
"""Verify that memory tool calls from the flush response actually get executed."""
agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter")
mock_response = _chat_response_with_memory_call()
with patch("agent.auxiliary_client.call_llm", return_value=mock_response):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
{"role": "user", "content": "Note this"},
]
with patch("tools.memory_tool.memory_tool", return_value="Saved.") as mock_memory:
agent.flush_memories(messages)
mock_memory.assert_called_once()
call_kwargs = mock_memory.call_args
assert call_kwargs.kwargs["action"] == "add"
assert call_kwargs.kwargs["target"] == "notes"
assert "dark mode" in call_kwargs.kwargs["content"]
def test_flush_bridges_memory_write_metadata(self, monkeypatch):
"""Flush memory writes notify external providers with flush provenance."""
agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter")
agent._memory_manager = MagicMock()
agent.session_id = "sess-flush"
agent.platform = "cli"
mock_response = _chat_response_with_memory_call()
with patch("agent.auxiliary_client.call_llm", return_value=mock_response):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
{"role": "user", "content": "Note this"},
]
with patch("tools.memory_tool.memory_tool", return_value="Saved."):
agent.flush_memories(messages)
agent._memory_manager.on_memory_write.assert_called_once()
call_kwargs = agent._memory_manager.on_memory_write.call_args
assert call_kwargs.args[:3] == ("add", "notes", "User prefers dark mode.")
assert call_kwargs.kwargs["metadata"]["write_origin"] == "memory_flush"
assert call_kwargs.kwargs["metadata"]["execution_context"] == "flush_memories"
assert call_kwargs.kwargs["metadata"]["session_id"] == "sess-flush"
def test_flush_strips_artifacts_from_messages(self, monkeypatch):
"""After flush, the flush prompt and any response should be removed from messages."""
agent = _make_agent(monkeypatch, api_mode="chat_completions", provider="openrouter")
mock_response = _chat_response_with_memory_call()
with patch("agent.auxiliary_client.call_llm", return_value=mock_response):
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
{"role": "user", "content": "Remember X"},
]
original_len = len(messages)
with patch("tools.memory_tool.memory_tool", return_value="Saved."):
agent.flush_memories(messages)
# Messages should not grow from the flush
assert len(messages) <= original_len
# No flush sentinel should remain
for msg in messages:
assert "_flush_sentinel" not in msg
class TestFlushMemoriesCodexFallback:
"""When no auxiliary client exists and we're in Codex mode, flush should
use the Codex Responses API path instead of chat.completions."""
def test_codex_mode_no_aux_uses_responses_api(self, monkeypatch):
agent = _make_agent(monkeypatch, api_mode="codex_responses", provider="openai-codex")
codex_response = SimpleNamespace(
output=[
SimpleNamespace(
type="function_call",
call_id="call_1",
name="memory",
arguments=json.dumps({
"action": "add",
"target": "notes",
"content": "Codex flush test",
}),
),
],
usage=SimpleNamespace(input_tokens=50, output_tokens=10, total_tokens=60),
status="completed",
model="gpt-5-codex",
)
with patch("agent.auxiliary_client.call_llm", side_effect=RuntimeError("no provider")), \
patch.object(agent, "_run_codex_stream", return_value=codex_response) as mock_stream, \
patch.object(agent, "_build_api_kwargs") as mock_build, \
patch("tools.memory_tool.memory_tool", return_value="Saved.") as mock_memory:
mock_build.return_value = {
"model": "gpt-5-codex",
"instructions": "test",
"input": [],
"tools": [],
"max_output_tokens": 4096,
}
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
{"role": "user", "content": "Save this"},
]
agent.flush_memories(messages)
mock_stream.assert_called_once()
mock_memory.assert_called_once()
assert mock_memory.call_args.kwargs["content"] == "Codex flush test"
@pytest.mark.parametrize(
"provider,base_url",
[
# chatgpt.com/backend-api/codex — rejects temperature unconditionally
("openai-codex", "https://chatgpt.com/backend-api/codex"),
# Native OpenAI Responses — rejects temperature on gpt-5/o-series reasoning models
("openai", "https://api.openai.com/v1"),
# Copilot Responses — rejects temperature on reasoning models
("copilot", "https://api.githubcopilot.com"),
],
)
def test_codex_fallback_never_sends_temperature(self, monkeypatch, provider, base_url):
"""Regression for the ``⚠ Auxiliary memory flush failed: HTTP 400:
Unsupported parameter: temperature`` error.
The codex_responses fallback must strip temperature before calling
_run_codex_stream — the Responses API does not accept it on any
supported backend, matching the transport's behavior."""
agent = _make_agent(monkeypatch, api_mode="codex_responses", provider=provider)
agent.base_url = base_url
codex_response = SimpleNamespace(
output=[
SimpleNamespace(
type="function_call",
call_id="call_1",
name="memory",
arguments=json.dumps({
"action": "add",
"target": "notes",
"content": "no-temp test",
}),
),
],
usage=SimpleNamespace(input_tokens=50, output_tokens=10, total_tokens=60),
status="completed",
model="gpt-5.5",
)
with patch("agent.auxiliary_client.call_llm", side_effect=RuntimeError("no provider")), \
patch.object(agent, "_run_codex_stream", return_value=codex_response) as mock_stream, \
patch.object(agent, "_build_api_kwargs") as mock_build, \
patch("tools.memory_tool.memory_tool", return_value="Saved."):
# Simulate a transport that (correctly) never includes temperature,
# but also verify we strip any stray temperature the fallback used
# to inject before the fix.
mock_build.return_value = {
"model": "gpt-5.5",
"instructions": "test",
"input": [],
"tools": [],
"max_output_tokens": 4096,
# Intentionally poison the dict to prove we pop it:
"temperature": 0.3,
}
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
{"role": "user", "content": "Save this"},
]
agent.flush_memories(messages)
mock_stream.assert_called_once()
sent_kwargs = mock_stream.call_args.args[0]
assert "temperature" not in sent_kwargs, (
f"codex_responses fallback must strip temperature before calling "
f"_run_codex_stream, got: {sent_kwargs.get('temperature')!r}"
)

View File

@@ -3078,48 +3078,6 @@ class TestRetryExhaustion:
assert "bad messages" in result["error"]
# ---------------------------------------------------------------------------
# Flush sentinel leak
# ---------------------------------------------------------------------------
class TestFlushSentinelNotLeaked:
"""_flush_sentinel must be stripped before sending messages to the API."""
def test_flush_sentinel_stripped_from_api_messages(self, agent_with_memory_tool):
"""Verify _flush_sentinel is not sent to the API provider."""
agent = agent_with_memory_tool
agent._memory_store = MagicMock()
agent._memory_flush_min_turns = 1
agent._user_turn_count = 10
agent._cached_system_prompt = "system"
messages = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi"},
{"role": "user", "content": "remember this"},
]
# Mock the API to return a simple response (no tool calls)
mock_msg = SimpleNamespace(content="OK", tool_calls=None)
mock_choice = SimpleNamespace(message=mock_msg)
mock_response = SimpleNamespace(choices=[mock_choice])
agent.client.chat.completions.create.return_value = mock_response
# Bypass auxiliary client so flush uses agent.client directly
with patch("agent.auxiliary_client.call_llm", side_effect=RuntimeError("no provider")):
agent.flush_memories(messages, min_turns=0)
# Check what was actually sent to the API
call_args = agent.client.chat.completions.create.call_args
assert call_args is not None, "flush_memories never called the API"
api_messages = call_args.kwargs.get("messages") or call_args[1].get("messages")
for msg in api_messages:
assert "_flush_sentinel" not in msg, (
f"_flush_sentinel leaked to API in message: {msg}"
)
# ---------------------------------------------------------------------------
# Conversation history mutation
# ---------------------------------------------------------------------------