feat: /compress <focus> — guided compression with focus topic (#8017)

Adds an optional focus topic to /compress: `/compress database schema`
guides the summariser to preserve information related to the focus topic
(60-70% of summary budget) while compressing everything else more aggressively.
Inspired by Claude Code's /compact <focus>.

Changes:
- context_compressor.py: focus_topic parameter on _generate_summary() and
  compress(); appends FOCUS TOPIC guidance block to the LLM prompt
- run_agent.py: focus_topic parameter on _compress_context(), passed through
  to the compressor
- cli.py: _manual_compress() extracts focus topic from command string,
  preserves existing manual_compression_feedback integration (no regression)
- gateway/run.py: _handle_compress_command() extracts focus from event args
  and passes through — full gateway parity
- commands.py: args_hint="[focus topic]" on /compress CommandDef

Salvaged from PR #7459 (CLI /compress focus only — /context command deferred).
15 new tests across CLI, compressor, and gateway.
This commit is contained in:
Teknium
2026-04-11 19:23:29 -07:00
committed by GitHub
parent cfbfc4c3f1
commit a0a02c1bc0
8 changed files with 445 additions and 14 deletions

View File

@@ -0,0 +1,139 @@
"""Tests for focus_topic flowing through the compressor.
Verifies that _generate_summary and compress accept and use the focus_topic
parameter correctly. Inspired by Claude Code's /compact <focus>.
"""
from unittest.mock import MagicMock, patch
from agent.context_compressor import ContextCompressor
def _make_compressor():
"""Create a ContextCompressor with minimal state for testing."""
compressor = ContextCompressor.__new__(ContextCompressor)
compressor.protect_first_n = 2
compressor.protect_last_n = 5
compressor.tail_token_budget = 20000
compressor.context_length = 200000
compressor.threshold_percent = 0.80
compressor.threshold_tokens = 160000
compressor.max_summary_tokens = 10000
compressor.quiet_mode = True
compressor.compression_count = 0
compressor.last_prompt_tokens = 0
compressor._previous_summary = None
compressor._summary_failure_cooldown_until = 0.0
compressor.summary_model = None
return compressor
def test_focus_topic_injected_into_summary_prompt():
"""When focus_topic is provided, the LLM prompt includes focus guidance."""
compressor = _make_compressor()
turns = [
{"role": "user", "content": "Tell me about the database schema"},
{"role": "assistant", "content": "The schema has tables: users, orders, products."},
]
captured_prompt = {}
def mock_call_llm(**kwargs):
captured_prompt["messages"] = kwargs["messages"]
resp = MagicMock()
resp.choices = [MagicMock()]
resp.choices[0].message.content = "## Goal\nUnderstand DB schema."
return resp
with patch("agent.context_compressor.call_llm", mock_call_llm):
result = compressor._generate_summary(turns, focus_topic="database schema")
assert result is not None
prompt_text = captured_prompt["messages"][0]["content"]
assert 'FOCUS TOPIC: "database schema"' in prompt_text
assert "PRIORITISE" in prompt_text
assert "60-70%" in prompt_text
def test_no_focus_topic_no_injection():
"""Without focus_topic, the prompt doesn't contain focus guidance."""
compressor = _make_compressor()
turns = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi"},
]
captured_prompt = {}
def mock_call_llm(**kwargs):
captured_prompt["messages"] = kwargs["messages"]
resp = MagicMock()
resp.choices = [MagicMock()]
resp.choices[0].message.content = "## Goal\nGreeting."
return resp
with patch("agent.context_compressor.call_llm", mock_call_llm):
result = compressor._generate_summary(turns)
prompt_text = captured_prompt["messages"][0]["content"]
assert "FOCUS TOPIC" not in prompt_text
def test_compress_passes_focus_to_generate_summary():
"""compress() passes focus_topic through to _generate_summary."""
compressor = _make_compressor()
# Track what _generate_summary receives
received_kwargs = {}
original_generate = compressor._generate_summary
def tracking_generate(turns, **kwargs):
received_kwargs.update(kwargs)
return "## Goal\nTest."
compressor._generate_summary = tracking_generate
messages = [
{"role": "system", "content": "System prompt"},
{"role": "user", "content": "first"},
{"role": "assistant", "content": "reply1"},
{"role": "user", "content": "second"},
{"role": "assistant", "content": "reply2"},
{"role": "user", "content": "third"},
{"role": "assistant", "content": "reply3"},
{"role": "user", "content": "fourth"},
{"role": "assistant", "content": "reply4"},
]
compressor.compress(messages, current_tokens=100000, focus_topic="authentication flow")
assert received_kwargs.get("focus_topic") == "authentication flow"
def test_compress_none_focus_by_default():
"""compress() passes None focus_topic by default."""
compressor = _make_compressor()
received_kwargs = {}
def tracking_generate(turns, **kwargs):
received_kwargs.update(kwargs)
return "## Goal\nTest."
compressor._generate_summary = tracking_generate
messages = [
{"role": "system", "content": "System prompt"},
{"role": "user", "content": "first"},
{"role": "assistant", "content": "reply1"},
{"role": "user", "content": "second"},
{"role": "assistant", "content": "reply2"},
{"role": "user", "content": "third"},
{"role": "assistant", "content": "reply3"},
{"role": "user", "content": "fourth"},
{"role": "assistant", "content": "reply4"},
]
compressor.compress(messages, current_tokens=100000)
assert received_kwargs.get("focus_topic") is None

View File

@@ -0,0 +1,118 @@
"""Tests for /compress <focus> — guided compression with focus topic.
Inspired by Claude Code's /compact <focus> feature.
"""
from unittest.mock import MagicMock, patch
from tests.cli.test_cli_init import _make_cli
def _make_history() -> list[dict[str, str]]:
return [
{"role": "user", "content": "one"},
{"role": "assistant", "content": "two"},
{"role": "user", "content": "three"},
{"role": "assistant", "content": "four"},
]
def test_focus_topic_extracted_and_passed(capsys):
"""Focus topic is extracted from the command and passed to _compress_context."""
shell = _make_cli()
history = _make_history()
compressed = [history[0], history[-1]]
shell.conversation_history = history
shell.agent = MagicMock()
shell.agent.compression_enabled = True
shell.agent._cached_system_prompt = ""
shell.agent._compress_context.return_value = (compressed, "")
def _estimate(messages):
if messages is history:
return 100
return 50
with patch("agent.model_metadata.estimate_messages_tokens_rough", side_effect=_estimate):
shell._manual_compress("/compress database schema")
output = capsys.readouterr().out
assert 'focus: "database schema"' in output
# Verify focus_topic was passed through
shell.agent._compress_context.assert_called_once()
call_kwargs = shell.agent._compress_context.call_args
assert call_kwargs.kwargs.get("focus_topic") == "database schema"
def test_no_focus_topic_when_bare_command(capsys):
"""When no focus topic is provided, None is passed."""
shell = _make_cli()
history = _make_history()
shell.conversation_history = history
shell.agent = MagicMock()
shell.agent.compression_enabled = True
shell.agent._cached_system_prompt = ""
shell.agent._compress_context.return_value = (list(history), "")
with patch("agent.model_metadata.estimate_messages_tokens_rough", return_value=100):
shell._manual_compress("/compress")
shell.agent._compress_context.assert_called_once()
call_kwargs = shell.agent._compress_context.call_args
assert call_kwargs.kwargs.get("focus_topic") is None
def test_empty_focus_after_command_treated_as_none(capsys):
"""Trailing whitespace after /compress does not produce a focus topic."""
shell = _make_cli()
history = _make_history()
shell.conversation_history = history
shell.agent = MagicMock()
shell.agent.compression_enabled = True
shell.agent._cached_system_prompt = ""
shell.agent._compress_context.return_value = (list(history), "")
with patch("agent.model_metadata.estimate_messages_tokens_rough", return_value=100):
shell._manual_compress("/compress ")
shell.agent._compress_context.assert_called_once()
call_kwargs = shell.agent._compress_context.call_args
assert call_kwargs.kwargs.get("focus_topic") is None
def test_focus_topic_printed_in_compression_banner(capsys):
"""The focus topic shows in the compression progress banner."""
shell = _make_cli()
history = _make_history()
compressed = [history[0], history[-1]]
shell.conversation_history = history
shell.agent = MagicMock()
shell.agent.compression_enabled = True
shell.agent._cached_system_prompt = ""
shell.agent._compress_context.return_value = (compressed, "")
with patch("agent.model_metadata.estimate_messages_tokens_rough", return_value=100):
shell._manual_compress("/compress API endpoints")
output = capsys.readouterr().out
assert 'focus: "API endpoints"' in output
def test_no_focus_prints_standard_banner(capsys):
"""Without focus, the standard banner (no focus: line) is printed."""
shell = _make_cli()
history = _make_history()
compressed = [history[0], history[-1]]
shell.conversation_history = history
shell.agent = MagicMock()
shell.agent.compression_enabled = True
shell.agent._cached_system_prompt = ""
shell.agent._compress_context.return_value = (compressed, "")
with patch("agent.model_metadata.estimate_messages_tokens_rough", return_value=100):
shell._manual_compress("/compress")
output = capsys.readouterr().out
assert "focus:" not in output
assert "Compressing" in output

View File

@@ -0,0 +1,118 @@
"""Tests for gateway /compress <focus> — focus topic on the gateway side."""
from datetime import datetime
from unittest.mock import MagicMock, patch
import pytest
from gateway.config import GatewayConfig, Platform, PlatformConfig
from gateway.platforms.base import MessageEvent
from gateway.session import SessionEntry, SessionSource, build_session_key
def _make_source() -> SessionSource:
return SessionSource(
platform=Platform.TELEGRAM,
user_id="u1",
chat_id="c1",
user_name="tester",
chat_type="dm",
)
def _make_event(text: str = "/compress") -> MessageEvent:
return MessageEvent(text=text, source=_make_source(), message_id="m1")
def _make_history() -> list[dict[str, str]]:
return [
{"role": "user", "content": "one"},
{"role": "assistant", "content": "two"},
{"role": "user", "content": "three"},
{"role": "assistant", "content": "four"},
]
def _make_runner(history: list[dict[str, str]]):
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
runner.config = GatewayConfig(
platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")}
)
session_entry = SessionEntry(
session_key=build_session_key(_make_source()),
session_id="sess-1",
created_at=datetime.now(),
updated_at=datetime.now(),
platform=Platform.TELEGRAM,
chat_type="dm",
)
runner.session_store = MagicMock()
runner.session_store.get_or_create_session.return_value = session_entry
runner.session_store.load_transcript.return_value = history
runner.session_store.rewrite_transcript = MagicMock()
runner.session_store.update_session = MagicMock()
runner.session_store._save = MagicMock()
return runner
@pytest.mark.asyncio
async def test_compress_focus_topic_passed_to_agent():
"""Focus topic from /compress <focus> is passed through to _compress_context."""
history = _make_history()
compressed = [history[0], history[-1]]
runner = _make_runner(history)
agent_instance = MagicMock()
agent_instance.context_compressor.protect_first_n = 0
agent_instance.context_compressor._align_boundary_forward.return_value = 0
agent_instance.context_compressor._find_tail_cut_by_tokens.return_value = 2
agent_instance.session_id = "sess-1"
agent_instance._compress_context.return_value = (compressed, "")
def _estimate(messages):
return 100
with (
patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "***"}),
patch("gateway.run._resolve_gateway_model", return_value="test-model"),
patch("run_agent.AIAgent", return_value=agent_instance),
patch("agent.model_metadata.estimate_messages_tokens_rough", side_effect=_estimate),
):
result = await runner._handle_compress_command(_make_event("/compress database schema"))
# Verify focus_topic was passed
agent_instance._compress_context.assert_called_once()
call_kwargs = agent_instance._compress_context.call_args
assert call_kwargs.kwargs.get("focus_topic") == "database schema"
# Verify focus is mentioned in response
assert 'Focus: "database schema"' in result
@pytest.mark.asyncio
async def test_compress_no_focus_passes_none():
"""Bare /compress passes focus_topic=None."""
history = _make_history()
runner = _make_runner(history)
agent_instance = MagicMock()
agent_instance.context_compressor.protect_first_n = 0
agent_instance.context_compressor._align_boundary_forward.return_value = 0
agent_instance.context_compressor._find_tail_cut_by_tokens.return_value = 2
agent_instance.session_id = "sess-1"
agent_instance._compress_context.return_value = (list(history), "")
with (
patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "***"}),
patch("gateway.run._resolve_gateway_model", return_value="test-model"),
patch("run_agent.AIAgent", return_value=agent_instance),
patch("agent.model_metadata.estimate_messages_tokens_rough", return_value=100),
):
result = await runner._handle_compress_command(_make_event("/compress"))
agent_instance._compress_context.assert_called_once()
call_kwargs = agent_instance._compress_context.call_args
assert call_kwargs.kwargs.get("focus_topic") is None
# No focus line in response
assert "Focus:" not in result