feat(feishu): show processing state via reactions on user messages

Replaces the permanent "OK" receipt reaction with a visual lifecycle
that moves through three states (Typing, no reaction, CrossMark):

- Typing animation appears when the agent starts processing.
- Cleared when processing succeeds — the reply message is the signal.
- Replaced with CrossMark when processing fails.
- Cleared when processing is cancelled or interrupted.

When Feishu rejects the reaction-delete call, we keep Typing in place
and skip adding CrossMark. Showing both badges would tell the user
"still working" and "done/failed" at the same time, which is worse
than a stuck Typing.

The lifecycle is on by default; setting the FEISHU_REACTIONS env var to
false, 0, or no disables it entirely.
User-added reactions with the same emoji still route through to the
agent; only bot-origin reactions are filtered to break the feedback
loop.

Change-Id: I527081da31f0f9d59b451f45de59df4ddab522ba
Roy-oss1
2026-04-18 12:56:06 +08:00
committed by Teknium
parent 60236862ee
commit 520edd3499
3 changed files with 395 additions and 90 deletions


@@ -8,7 +8,8 @@ Supports:
- Gateway allowlist integration via FEISHU_ALLOWED_USERS
- Persistent dedup state across restarts
- Per-chat serial message processing (matches openclaw createChatQueue)
- Persistent ACK emoji reaction on inbound messages
- Processing status reactions: Typing while working, removed on success,
swapped for CrossMark on failure
- Reaction events routed as synthetic text events (matches openclaw)
- Interactive card button-click events routed as synthetic COMMAND events
- Webhook anomaly tracking (matches openclaw createWebhookAnomalyTracker)
@@ -29,6 +30,7 @@ import re
import threading
import time
import uuid
from collections import OrderedDict
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
@@ -98,6 +100,7 @@ from gateway.platforms.base import (
BasePlatformAdapter,
MessageEvent,
MessageType,
ProcessingOutcome,
SendResult,
SUPPORTED_DOCUMENT_TYPES,
cache_document_from_bytes,
@@ -190,7 +193,17 @@ _APPROVAL_LABEL_MAP: Dict[str, str] = {
}
_FEISHU_BOT_MSG_TRACK_SIZE = 512 # LRU size for tracking sent message IDs
_FEISHU_REPLY_FALLBACK_CODES = frozenset({230011, 231003}) # reply target withdrawn/missing → create fallback
_FEISHU_ACK_EMOJI = "OK"
# Feishu reactions render as prominent badges, unlike Discord/Telegram's
# small footer emoji — a success badge on every message would add noise, so
# we only mark start (Typing) and failure (CrossMark); the reply itself is
# the success signal.
_FEISHU_REACTION_IN_PROGRESS = "Typing"
_FEISHU_REACTION_FAILURE = "CrossMark"
# Bound on the (message_id → reaction_id) handle cache. Happy-path entries
# drain on completion; the cap is a safeguard against unbounded growth from
# delete-failures, not a capacity plan.
_FEISHU_PROCESSING_REACTION_CACHE_SIZE = 1024
# QR onboarding constants
_ONBOARD_ACCOUNTS_URLS = {
@@ -1141,6 +1154,9 @@ class FeishuAdapter(BasePlatformAdapter):
# Exec approval button state (approval_id → {session_key, message_id, chat_id})
self._approval_state: Dict[int, Dict[str, str]] = {}
self._approval_counter = itertools.count(1)
# Feishu reaction deletion requires the opaque reaction_id returned
# by create, so we cache it per message_id.
self._pending_processing_reactions: "OrderedDict[str, str]" = OrderedDict()
self._load_seen_message_ids()
@staticmethod
@@ -2050,12 +2066,12 @@ class FeishuAdapter(BasePlatformAdapter):
operator_type,
emoji_type,
)
# Only process reactions from real users. Ignore app/bot-generated reactions
# and Hermes' own ACK emoji to avoid feedback loops.
# Drop bot/app-origin reactions to break the feedback loop from our
# own lifecycle reactions. A human reacting with the same emoji (e.g.
# clicking Typing on a bot message) is still routed through.
loop = self._loop
if (
operator_type in {"bot", "app"}
or emoji_type == _FEISHU_ACK_EMOJI
or not message_id
or loop is None
or bool(getattr(loop, "is_closed", lambda: False)())
@@ -2279,33 +2295,35 @@ class FeishuAdapter(BasePlatformAdapter):
async def _handle_message_with_guards(self, event: MessageEvent) -> None:
"""Dispatch a single event through the agent pipeline with per-chat serialization
and a persistent ACK emoji reaction before processing starts.
before handing the event off to the agent.
- Per-chat lock: ensures messages in the same chat are processed one at a time
(matches openclaw's createChatQueue serial queue behaviour).
- ACK indicator: adds a CHECK reaction to the triggering message before handing
off to the agent and leaves it in place as a receipt marker.
Per-chat lock ensures messages in the same chat are processed one at a
time (matches openclaw's createChatQueue serial queue behaviour).
"""
chat_id = getattr(event.source, "chat_id", "") or "" if event.source else ""
chat_lock = self._get_chat_lock(chat_id)
async with chat_lock:
message_id = event.message_id
if message_id:
await self._add_ack_reaction(message_id)
await self.handle_message(event)
async def _add_ack_reaction(self, message_id: str) -> Optional[str]:
"""Add a persistent ACK emoji reaction to signal the message was received."""
if not self._client or not message_id:
# =========================================================================
# Processing status reactions
# =========================================================================
def _reactions_enabled(self) -> bool:
return os.getenv("FEISHU_REACTIONS", "true").strip().lower() not in ("false", "0", "no")
async def _add_reaction(self, message_id: str, emoji_type: str) -> Optional[str]:
"""Return the reaction_id on success, else None. The id is needed later for deletion."""
if not self._client or not message_id or not emoji_type:
return None
try:
from lark_oapi.api.im.v1 import ( # lazy import — keeps optional dep optional
from lark_oapi.api.im.v1 import (
CreateMessageReactionRequest,
CreateMessageReactionRequestBody,
)
body = (
CreateMessageReactionRequestBody.builder()
.reaction_type({"emoji_type": _FEISHU_ACK_EMOJI})
.reaction_type({"emoji_type": emoji_type})
.build()
)
request = (
@@ -2318,16 +2336,93 @@ class FeishuAdapter(BasePlatformAdapter):
if response and getattr(response, "success", lambda: False)():
data = getattr(response, "data", None)
return getattr(data, "reaction_id", None)
logger.warning(
"[Feishu] Failed to add ack reaction to %s: code=%s msg=%s",
logger.debug(
"[Feishu] Add reaction %s on %s rejected: code=%s msg=%s",
emoji_type,
message_id,
getattr(response, "code", None),
getattr(response, "msg", None),
)
except Exception:
logger.warning("[Feishu] Failed to add ack reaction to %s", message_id, exc_info=True)
logger.warning(
"[Feishu] Add reaction %s on %s raised",
emoji_type,
message_id,
exc_info=True,
)
return None
async def _remove_reaction(self, message_id: str, reaction_id: str) -> bool:
if not self._client or not message_id or not reaction_id:
return False
try:
from lark_oapi.api.im.v1 import DeleteMessageReactionRequest
request = (
DeleteMessageReactionRequest.builder()
.message_id(message_id)
.reaction_id(reaction_id)
.build()
)
response = await asyncio.to_thread(self._client.im.v1.message_reaction.delete, request)
if response and getattr(response, "success", lambda: False)():
return True
logger.debug(
"[Feishu] Remove reaction %s on %s rejected: code=%s msg=%s",
reaction_id,
message_id,
getattr(response, "code", None),
getattr(response, "msg", None),
)
except Exception:
logger.warning(
"[Feishu] Remove reaction %s on %s raised",
reaction_id,
message_id,
exc_info=True,
)
return False
def _remember_processing_reaction(self, message_id: str, reaction_id: str) -> None:
cache = self._pending_processing_reactions
cache[message_id] = reaction_id
cache.move_to_end(message_id)
while len(cache) > _FEISHU_PROCESSING_REACTION_CACHE_SIZE:
cache.popitem(last=False)
def _pop_processing_reaction(self, message_id: str) -> Optional[str]:
return self._pending_processing_reactions.pop(message_id, None)
async def on_processing_start(self, event: MessageEvent) -> None:
if not self._reactions_enabled():
return
message_id = event.message_id
if not message_id or message_id in self._pending_processing_reactions:
return
reaction_id = await self._add_reaction(message_id, _FEISHU_REACTION_IN_PROGRESS)
if reaction_id:
self._remember_processing_reaction(message_id, reaction_id)
async def on_processing_complete(
self, event: MessageEvent, outcome: ProcessingOutcome
) -> None:
if not self._reactions_enabled():
return
message_id = event.message_id
if not message_id:
return
start_reaction_id = self._pending_processing_reactions.get(message_id)
if start_reaction_id:
if not await self._remove_reaction(message_id, start_reaction_id):
# Don't stack a second badge on top of a Typing we couldn't
# remove — UI would read as both "working" and "done/failed"
# simultaneously. Keep the handle so LRU eventually evicts it.
return
self._pop_processing_reaction(message_id)
if outcome is ProcessingOutcome.FAILURE:
await self._add_reaction(message_id, _FEISHU_REACTION_FAILURE)
# =========================================================================
# Webhook server and security
# =========================================================================


@@ -10,6 +10,8 @@ from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock, patch
from gateway.platforms.base import ProcessingOutcome
try:
import lark_oapi
_HAS_LARK_OAPI = True
@@ -638,83 +640,54 @@ class TestAdapterBehavior(unittest.TestCase):
)
@patch.dict(os.environ, {}, clear=True)
@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
def test_add_ack_reaction_uses_ok_emoji(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _ReactionAPI:
def create(self, request):
captured["request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(reaction_id="r_typing"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(v1=SimpleNamespace(message_reaction=_ReactionAPI()))
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
reaction_id = asyncio.run(adapter._add_ack_reaction("om_msg"))
self.assertEqual(reaction_id, "r_typing")
self.assertEqual(captured["request"].request_body.reaction_type["emoji_type"], "OK")
@patch.dict(os.environ, {}, clear=True)
def test_add_ack_reaction_logs_warning_on_failure(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
class _ReactionAPI:
def create(self, request):
raise RuntimeError("boom")
adapter._client = SimpleNamespace(
im=SimpleNamespace(v1=SimpleNamespace(message_reaction=_ReactionAPI()))
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
with (
patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct),
self.assertLogs("gateway.platforms.feishu", level="WARNING") as logs,
):
reaction_id = asyncio.run(adapter._add_ack_reaction("om_msg"))
self.assertIsNone(reaction_id)
self.assertTrue(
any("Failed to add ack reaction to om_msg" in entry for entry in logs.output),
logs.output,
)
@patch.dict(os.environ, {}, clear=True)
def test_ack_reaction_events_are_ignored_to_avoid_feedback_loops(self):
def test_bot_origin_reactions_are_dropped_to_avoid_feedback_loops(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._loop = object()
for emoji in ("Typing", "CrossMark"):
event = SimpleNamespace(
message_id="om_msg",
operator_type="bot",
reaction_type=SimpleNamespace(emoji_type=emoji),
)
data = SimpleNamespace(event=event)
with patch(
"gateway.platforms.feishu.asyncio.run_coroutine_threadsafe"
) as run_threadsafe:
adapter._on_reaction_event("im.message.reaction.created_v1", data)
run_threadsafe.assert_not_called()
@patch.dict(os.environ, {}, clear=True)
def test_user_reaction_with_managed_emoji_is_still_routed(self):
# Operator-origin filter is enough to prevent feedback loops; we must
# not additionally swallow user-origin reactions just because their
# emoji happens to collide with a lifecycle emoji.
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
adapter._loop = SimpleNamespace(is_closed=lambda: False)
event = SimpleNamespace(
message_id="om_msg",
operator_type="user",
reaction_type=SimpleNamespace(emoji_type="OK"),
reaction_type=SimpleNamespace(emoji_type="Typing"),
)
data = SimpleNamespace(event=event)
with patch("gateway.platforms.feishu.asyncio.run_coroutine_threadsafe") as run_threadsafe:
adapter._on_reaction_event("im.message.reaction.created_v1", data)
def _close_coro_and_return_future(coro, _loop):
coro.close()
return SimpleNamespace(add_done_callback=lambda _: None)
run_threadsafe.assert_not_called()
with patch(
"gateway.platforms.feishu.asyncio.run_coroutine_threadsafe",
side_effect=_close_coro_and_return_future,
) as run_threadsafe:
adapter._on_reaction_event("im.message.reaction.created_v1", data)
run_threadsafe.assert_called_once()
@patch.dict(os.environ, {"FEISHU_GROUP_POLICY": "open"}, clear=True)
def test_group_message_requires_mentions_even_when_policy_open(self):
@@ -3278,3 +3251,231 @@ class TestSenderNameResolution(unittest.TestCase):
result = asyncio.run(adapter._resolve_sender_name_from_api("ou_broken"))
self.assertIsNone(result)
@unittest.skipUnless(_HAS_LARK_OAPI, "lark-oapi not installed")
class TestProcessingReactions(unittest.TestCase):
"""Typing on start → removed on SUCCESS, swapped for CrossMark on FAILURE,
removed (no replacement) on CANCELLED."""
@staticmethod
def _run(coro):
return asyncio.run(coro)
def _build_adapter(
self,
create_success: bool = True,
delete_success: bool = True,
next_reaction_id: str = "r1",
):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
tracker = SimpleNamespace(
create_calls=[],
delete_calls=[],
next_reaction_id=next_reaction_id,
create_success=create_success,
delete_success=delete_success,
)
def _create(request):
tracker.create_calls.append(
request.request_body.reaction_type["emoji_type"]
)
if tracker.create_success:
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(reaction_id=tracker.next_reaction_id),
)
return SimpleNamespace(
success=lambda: False, code=99, msg="rejected", data=None,
)
def _delete(request):
tracker.delete_calls.append(request.reaction_id)
return SimpleNamespace(
success=lambda: tracker.delete_success,
code=0 if tracker.delete_success else 99,
msg="success" if tracker.delete_success else "rejected",
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
message_reaction=SimpleNamespace(create=_create, delete=_delete),
),
),
)
return adapter, tracker
@staticmethod
def _event(message_id: str = "om_msg"):
return SimpleNamespace(message_id=message_id)
def _patch_to_thread(self):
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
return patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct)
# ------------------------------------------------------------------ start
@patch.dict(os.environ, {}, clear=True)
def test_start_adds_typing_and_caches_reaction_id(self):
adapter, tracker = self._build_adapter(next_reaction_id="r_typing")
with self._patch_to_thread():
self._run(adapter.on_processing_start(self._event()))
self.assertEqual(tracker.create_calls, ["Typing"])
self.assertEqual(adapter._pending_processing_reactions["om_msg"], "r_typing")
@patch.dict(os.environ, {}, clear=True)
def test_start_is_idempotent_for_same_message_id(self):
adapter, tracker = self._build_adapter(next_reaction_id="r_typing")
with self._patch_to_thread():
self._run(adapter.on_processing_start(self._event()))
self._run(adapter.on_processing_start(self._event()))
self.assertEqual(tracker.create_calls, ["Typing"])
@patch.dict(os.environ, {}, clear=True)
def test_start_does_not_cache_when_create_fails(self):
adapter, tracker = self._build_adapter(create_success=False)
with self._patch_to_thread():
self._run(adapter.on_processing_start(self._event()))
self.assertEqual(tracker.create_calls, ["Typing"])
self.assertNotIn("om_msg", adapter._pending_processing_reactions)
# --------------------------------------------------------------- complete
@patch.dict(os.environ, {}, clear=True)
def test_success_removes_typing_and_adds_nothing(self):
adapter, tracker = self._build_adapter(next_reaction_id="r_typing")
with self._patch_to_thread():
self._run(adapter.on_processing_start(self._event()))
self._run(
adapter.on_processing_complete(self._event(), ProcessingOutcome.SUCCESS)
)
self.assertEqual(tracker.create_calls, ["Typing"])
self.assertEqual(tracker.delete_calls, ["r_typing"])
self.assertNotIn("om_msg", adapter._pending_processing_reactions)
@patch.dict(os.environ, {}, clear=True)
def test_failure_removes_typing_then_adds_cross_mark(self):
adapter, tracker = self._build_adapter(next_reaction_id="r_typing")
with self._patch_to_thread():
self._run(adapter.on_processing_start(self._event()))
self._run(
adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE)
)
self.assertEqual(tracker.create_calls, ["Typing", "CrossMark"])
self.assertEqual(tracker.delete_calls, ["r_typing"])
@patch.dict(os.environ, {}, clear=True)
def test_cancelled_removes_typing_and_adds_nothing(self):
adapter, tracker = self._build_adapter(next_reaction_id="r_typing")
with self._patch_to_thread():
self._run(adapter.on_processing_start(self._event()))
self._run(
adapter.on_processing_complete(self._event(), ProcessingOutcome.CANCELLED)
)
self.assertEqual(tracker.create_calls, ["Typing"])
self.assertEqual(tracker.delete_calls, ["r_typing"])
self.assertNotIn("om_msg", adapter._pending_processing_reactions)
@patch.dict(os.environ, {}, clear=True)
def test_failure_without_preceding_start_still_adds_cross_mark(self):
adapter, tracker = self._build_adapter()
with self._patch_to_thread():
self._run(
adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE)
)
self.assertEqual(tracker.create_calls, ["CrossMark"])
self.assertEqual(tracker.delete_calls, [])
@patch.dict(os.environ, {}, clear=True)
def test_success_without_preceding_start_is_full_noop(self):
adapter, tracker = self._build_adapter()
with self._patch_to_thread():
self._run(
adapter.on_processing_complete(self._event(), ProcessingOutcome.SUCCESS)
)
self.assertEqual(tracker.create_calls, [])
self.assertEqual(tracker.delete_calls, [])
# ------------------------- delete failure: don't stack badges -----------
@patch.dict(os.environ, {}, clear=True)
def test_delete_failure_on_failure_outcome_skips_cross_mark(self):
# Removing Typing is best-effort — but if it fails, we must NOT
# additionally add CrossMark, or the UI would show two contradictory
# badges. The handle stays in the cache for LRU to clean up later.
adapter, tracker = self._build_adapter(
next_reaction_id="r_typing", delete_success=False,
)
with self._patch_to_thread():
self._run(adapter.on_processing_start(self._event()))
self._run(
adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE)
)
self.assertEqual(tracker.create_calls, ["Typing"]) # CrossMark NOT added
self.assertEqual(tracker.delete_calls, ["r_typing"]) # delete was attempted
self.assertEqual(
adapter._pending_processing_reactions["om_msg"], "r_typing",
) # handle retained
@patch.dict(os.environ, {}, clear=True)
def test_delete_failure_on_success_outcome_retains_handle(self):
adapter, tracker = self._build_adapter(
next_reaction_id="r_typing", delete_success=False,
)
with self._patch_to_thread():
self._run(adapter.on_processing_start(self._event()))
self._run(
adapter.on_processing_complete(self._event(), ProcessingOutcome.SUCCESS)
)
self.assertEqual(tracker.create_calls, ["Typing"])
self.assertEqual(tracker.delete_calls, ["r_typing"])
self.assertEqual(
adapter._pending_processing_reactions["om_msg"], "r_typing",
)
# ------------------------------------------------------------- env toggle
@patch.dict(os.environ, {"FEISHU_REACTIONS": "false"}, clear=True)
def test_env_disable_short_circuits_both_hooks(self):
adapter, tracker = self._build_adapter()
with self._patch_to_thread():
self._run(adapter.on_processing_start(self._event()))
self._run(
adapter.on_processing_complete(self._event(), ProcessingOutcome.FAILURE)
)
self.assertEqual(tracker.create_calls, [])
self.assertEqual(tracker.delete_calls, [])
# ------------------------------------------------------------- LRU bounds
@patch.dict(os.environ, {}, clear=True)
def test_cache_evicts_oldest_entry_beyond_size_limit(self):
from gateway.platforms.feishu import _FEISHU_PROCESSING_REACTION_CACHE_SIZE
adapter, _ = self._build_adapter()
counter = {"n": 0}
def _create(_request):
counter["n"] += 1
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(reaction_id=f"r{counter['n']}"),
)
adapter._client.im.v1.message_reaction.create = _create
with self._patch_to_thread():
for i in range(_FEISHU_PROCESSING_REACTION_CACHE_SIZE + 1):
self._run(adapter.on_processing_start(self._event(f"om_{i}")))
self.assertNotIn("om_0", adapter._pending_processing_reactions)
self.assertIn(
f"om_{_FEISHU_PROCESSING_REACTION_CACHE_SIZE}",
adapter._pending_processing_reactions,
)
self.assertEqual(
len(adapter._pending_processing_reactions),
_FEISHU_PROCESSING_REACTION_CACHE_SIZE,
)


@@ -335,13 +335,22 @@ If the Feishu API rejects the post payload (e.g., due to unsupported markdown co
Plain text messages (no markdown detected) are sent as the simple `text` message type.
## ACK Emoji Reactions
## Processing Status Reactions
When the adapter receives an inbound message, it immediately adds an ✅ (OK) emoji reaction to signal that the message was received and is being processed. This provides visual feedback before the agent completes its response.
The adapter cycles a reaction on the user's message to signal what the agent is doing:
The reaction is persistent — it remains on the message after the response is sent, serving as a receipt marker.
| Phase | Reaction |
|-------|----------|
| Agent begins processing | `Typing` added |
| Processing succeeds | `Typing` removed (the reply message itself is the success signal) |
| Processing fails | `Typing` removed, `CrossMark` added |
| Processing is cancelled or interrupted | `Typing` removed (task aborted; no replacement badge) |
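
The transitions in the table can be modelled with a small in-memory sketch. It makes no Feishu API calls; `Outcome`, `ReactionLifecycle`, and the `delete_ok` flag are illustrative names, not part of the adapter. The `delete_ok=False` branch mirrors the rule in the adapter code of this change: if the `Typing` reaction cannot be removed, it is kept and no `CrossMark` is stacked on top of it.

```python
from enum import Enum, auto
from typing import Dict, List


class Outcome(Enum):
    """Hypothetical stand-in for the gateway's ProcessingOutcome."""
    SUCCESS = auto()
    FAILURE = auto()
    CANCELLED = auto()


class ReactionLifecycle:
    """In-memory model of the reactions visible on each user message."""

    def __init__(self, delete_ok: bool = True) -> None:
        # delete_ok simulates Feishu accepting or rejecting reaction deletes.
        self.delete_ok = delete_ok
        self.badges: Dict[str, List[str]] = {}

    def start(self, message_id: str) -> None:
        badges = self.badges.setdefault(message_id, [])
        if "Typing" not in badges:  # repeated starts do not stack badges
            badges.append("Typing")

    def complete(self, message_id: str, outcome: Outcome) -> None:
        badges = self.badges.setdefault(message_id, [])
        if "Typing" in badges:
            if not self.delete_ok:
                return  # keep the stuck Typing, skip CrossMark
            badges.remove("Typing")
        if outcome is Outcome.FAILURE:
            badges.append("CrossMark")
```

With the default `delete_ok=True`, a start followed by a `FAILURE` completion leaves only `CrossMark` on the message, while `SUCCESS` and `CANCELLED` leave no reaction at all.
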
User reactions on bot messages are also tracked. If a user adds or removes an emoji reaction on a message sent by the bot, it is routed as a synthetic text event (`reaction:added:EMOJI_TYPE` or `reaction:removed:EMOJI_TYPE`) so the agent can respond to feedback.
Unlike Discord/Matrix, no positive badge is added on success — Feishu reactions render as prominent timeline badges and a per-message success marker would create visual noise. The absence of a badge, together with the reply message, is the success signal.
Set `FEISHU_REACTIONS=false` to disable this entirely (e.g., for tenants where the bot lacks reaction permission, or where the noise is unwanted).
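
As a rough sketch of how the toggle is interpreted, based on the `_reactions_enabled` check in this change: the value is trimmed and lower-cased, and only `false`, `0`, and `no` disable the feature. The standalone function name and the `environ` parameter below are illustrative, added so the behaviour can be tried without touching the real environment.

```python
import os
from typing import Mapping, Optional


def reactions_enabled(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Return False only for the explicit opt-out values; default is enabled."""
    env = os.environ if environ is None else environ
    value = env.get("FEISHU_REACTIONS", "true")
    return value.strip().lower() not in ("false", "0", "no")
```

Note that under this parsing any other value, including `off` or an empty string, leaves reactions enabled.
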
User reactions on bot messages are routed back to the agent as synthetic text events (`reaction:added:EMOJI_TYPE` or `reaction:removed:EMOJI_TYPE`). Only real user reactions are routed — bot/app-origin reactions (including the adapter's own `Typing`/`CrossMark`) are dropped to avoid feedback loops.
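
The routing rule above can be sketched as a pure function. This is a simplified illustration, not the adapter's actual handler: `synthetic_reaction_text` and its `added` flag are hypothetical, and the real code performs further checks (for example a missing message ID or a closed event loop) before routing.

```python
from typing import Optional


def synthetic_reaction_text(
    operator_type: str, emoji_type: str, added: bool
) -> Optional[str]:
    """Build the synthetic text for a reaction event, or None if it is dropped."""
    # Bot/app-origin reactions are dropped regardless of emoji, which is what
    # breaks the feedback loop from the adapter's own Typing/CrossMark.
    if operator_type in {"bot", "app"}:
        return None
    action = "added" if added else "removed"
    return f"reaction:{action}:{emoji_type}"
```

Filtering on the operator rather than on the emoji means a user who reacts with `Typing` is still routed to the agent, while the adapter's own `Typing` is not.
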
## Burst Protection and Batching