fix: observe legacy service redis events

This commit is contained in:
-Puter
2026-06-04 16:58:48 +05:30
parent f03de1ea58
commit 1d3cfbcff7
2 changed files with 234 additions and 5 deletions

View File

@@ -31,6 +31,13 @@ export const config = {
growEventsConsumerGroup: process.env.GROW_EVENTS_CONSUMER_GROUP ?? "growqr-backend",
growEventsConsumerName: process.env.GROW_EVENTS_CONSUMER_NAME ?? `backend-${process.pid}`,
// Legacy service Redis surfaces. These let backend observe existing service A2A traffic
// without changing the services. Defaults fall back to GROW_EVENTS_REDIS_URL/REDIS_URL.
interviewRedisUrl: process.env.INTERVIEW_REDIS_URL ?? process.env.GROW_EVENTS_REDIS_URL ?? process.env.REDIS_URL ?? "",
roleplayRedisUrl: process.env.ROLEPLAY_REDIS_URL ?? process.env.GROW_EVENTS_REDIS_URL ?? process.env.REDIS_URL ?? "",
resumeRedisUrl: process.env.RESUME_REDIS_URL ?? process.env.GROW_EVENTS_REDIS_URL ?? process.env.REDIS_URL ?? "",
legacyServiceTaskObserverGroup: process.env.LEGACY_SERVICE_TASK_OBSERVER_GROUP ?? "growqr-backend-observer",
// LLM gateway for the unified user agent.
llmProvider: process.env.LLM_PROVIDER ?? "opencode",
llmApiKey:

View File

@@ -1,19 +1,46 @@
import { randomUUID } from "node:crypto";
import { config } from "../config.js";
import { log } from "../log.js";
import { recordGrowEvent } from "./record-grow-event.js";
import { routeGrowEventToUserActor } from "./route-to-user-actor.js";
// This file has two Redis ingestion modes:
// 1. Canonical GrowEvent stream: grow.events.raw — future service event bus.
// 2. Legacy A2A observer: watches existing tasks:{service} streams + responses:* pub/sub
// so backend can capture current service emissions without service changes.
type RedisMessage = { id: string; message: Record<string, string> };
type RedisStreamResponse = Array<{ name: string; messages: RedisMessage[] }>;
type RedisClientLike = {
connect: () => Promise<void>;
duplicate?: () => RedisClientLike;
on: (event: string, listener: (err: unknown) => void) => void;
xGroupCreate: (stream: string, group: string, id: string, opts?: Record<string, unknown>) => Promise<unknown>;
xReadGroup: (group: string, consumer: string, streams: { key: string; id: string }, opts?: Record<string, unknown>) => Promise<RedisStreamResponse | null>;
xAck: (stream: string, group: string, id: string) => Promise<unknown>;
pSubscribe?: (pattern: string, listener: (message: string, channel: string) => void | Promise<void>) => Promise<unknown>;
};
type ServiceRedisSpec = {
serviceId: "interview" | "roleplay" | "resume";
agentName: "interview-service" | "roleplay-service" | "resume-builder";
redisUrl: string;
};
type LegacyTaskContext = {
taskId: string;
userId?: string;
action?: string;
params?: Record<string, unknown>;
userContext?: Record<string, unknown>;
sessionStart?: boolean;
streamEntryId: string;
seenAt: string;
};
const legacyTaskContexts = new Map<string, LegacyTaskContext>();
async function loadRedisCreateClient(): Promise<(opts: { url: string }) => RedisClientLike> {
const dynamicImport = Function("specifier", "return import(specifier)") as (specifier: string) => Promise<unknown>;
const mod = (await dynamicImport("redis")) as { createClient?: (opts: { url: string }) => RedisClientLike };
@@ -21,7 +48,7 @@ async function loadRedisCreateClient(): Promise<(opts: { url: string }) => Redis
return mod.createClient;
}
function parseFieldValue(value: string): unknown {
function parseFieldValue(value: string | undefined): unknown {
if (!value) return value;
if ((value.startsWith("{") && value.endsWith("}")) || (value.startsWith("[") && value.endsWith("]"))) {
try { return JSON.parse(value); } catch { return value; }
@@ -29,6 +56,14 @@ function parseFieldValue(value: string): unknown {
return value;
}
function asRecord(value: unknown): Record<string, unknown> {
return value && typeof value === "object" && !Array.isArray(value) ? value as Record<string, unknown> : {};
}
function getString(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}
function fieldsToEvent(fields: Record<string, string>, stream: string) {
const payload = fields.payload ? parseFieldValue(fields.payload) : parseFieldValue(fields.data ?? "{}");
const correlation = fields.correlation ? parseFieldValue(fields.correlation) : undefined;
@@ -51,13 +86,188 @@ function fieldsToEvent(fields: Record<string, string>, stream: string) {
};
}
export async function startGrowEventsRedisConsumer() {
function serviceSpecs(): ServiceRedisSpec[] {
const specs: ServiceRedisSpec[] = [
{ serviceId: "interview", agentName: "interview-service", redisUrl: config.interviewRedisUrl },
{ serviceId: "roleplay", agentName: "roleplay-service", redisUrl: config.roleplayRedisUrl },
{ serviceId: "resume", agentName: "resume-builder", redisUrl: config.resumeRedisUrl },
];
return specs.filter((spec) => Boolean(spec.redisUrl));
}
function actionToEventType(serviceId: ServiceRedisSpec["serviceId"], action: string | undefined, message: Record<string, unknown>) {
const msgAction = getString(message.action);
const effective = msgAction || action || "event";
if (serviceId === "interview") {
if (effective === "interview_configured" || action === "configure_interview") return "interview.configured";
if (effective === "review_loaded") {
const data = asRecord(message.data);
return data.status === "completed" ? "interview.review_completed" : "interview.review_processing";
}
if (effective === "interview_page_loaded") return "interview.page_state_loaded";
return `interview.${effective.replaceAll("_", ".")}`;
}
if (serviceId === "roleplay") {
if (effective === "roleplay_configured" || action === "configure_roleplay") return "roleplay.configured";
if (effective === "roleplay_review_loaded" || effective === "review_loaded") {
const data = asRecord(message.data);
return data.status === "completed" ? "roleplay.review_completed" : "roleplay.review_processing";
}
if (effective === "roleplay_page_loaded") return "roleplay.page_state_loaded";
return `roleplay.${effective.replaceAll("_", ".")}`;
}
if (effective === "ai_analysis_complete" || action === "ai_analyze") return "resume.analysis_completed";
if (effective === "resume_loaded") return "resume.loaded";
if (effective === "resume_parsed") return "resume.parsed";
return `resume.${effective.replaceAll("_", ".")}`;
}
function extractSessionId(message: Record<string, unknown>, ctx?: LegacyTaskContext): string | undefined {
const data = asRecord(message.data);
const params = ctx?.params ?? {};
return getString(
data.session_id ?? data.sessionId ?? data.id ??
params.session_id ?? params.sessionId ??
data.review_session_id,
);
}
function extractResumeId(message: Record<string, unknown>, ctx?: LegacyTaskContext): string | undefined {
const data = asRecord(message.data);
const params = ctx?.params ?? {};
return getString(
data.resume_id ?? data.resumeId ??
asRecord(data.resume).id ??
params.resume_id ?? params.resumeId,
);
}
async function recordAndRoute(input: unknown) {
const event = await recordGrowEvent(input);
await routeGrowEventToUserActor(event).catch((err) => {
log.warn({ err, eventId: event.id, userId: event.userId }, "failed to route grow event to user actor");
});
return event;
}
async function handleLegacyResponseMessage(spec: ServiceRedisSpec, channel: string, raw: string) {
const taskId = channel.replace(/^responses:/, "");
const ctx = legacyTaskContexts.get(taskId);
const parsed = parseFieldValue(raw);
const message = asRecord(parsed);
const type = getString(message.type);
if (!type || type === "__task_complete__") return;
const data = asRecord(message.data);
const eventType = actionToEventType(spec.serviceId, ctx?.action, message);
const sessionId = extractSessionId(message, ctx);
const resumeId = extractResumeId(message, ctx);
await recordAndRoute({
id: randomUUID(),
source: `${spec.agentName}:legacy-a2a`,
type: eventType,
category: type === "agent_error" ? "system" : "service",
userId: ctx?.userId,
occurredAt: new Date().toISOString(),
correlation: {
taskId,
action: ctx?.action,
sessionId,
resumeId,
externalId: sessionId ?? resumeId,
},
payload: {
action: ctx?.action,
params: ctx?.params,
message,
data,
},
raw: { channel, message },
dedupeKey: `legacy-a2a:${spec.agentName}:${taskId}:${eventType}:${JSON.stringify(message).slice(0, 512)}`,
});
}
function parseLegacyTask(entryId: string, fields: Record<string, string>): LegacyTaskContext | null {
const taskId = getString(fields.task_id);
const payload = asRecord(parseFieldValue(fields.payload));
if (!taskId) return null;
return {
taskId,
userId: getString(payload.user_id ?? fields.user_id),
action: getString(payload.action),
params: asRecord(payload.params),
userContext: asRecord(payload.user_context),
sessionStart: Boolean(payload.session_start),
streamEntryId: entryId,
seenAt: new Date().toISOString(),
};
}
async function startLegacyServiceObserver(spec: ServiceRedisSpec, createClient: (opts: { url: string }) => RedisClientLike) {
const taskStream = `tasks:${spec.agentName}`;
const consumer = `${config.growEventsConsumerName}-${spec.agentName}`;
const group = `${config.legacyServiceTaskObserverGroup}:${spec.agentName}`;
const redis = createClient({ url: spec.redisUrl });
redis.on("error", (err) => log.warn({ err, service: spec.agentName }, "legacy service task observer redis error"));
await redis.connect();
try {
await redis.xGroupCreate(taskStream, group, "0", { MKSTREAM: true });
log.info({ taskStream, group }, "created legacy service task observer group");
} catch (err) {
if (!String(err).includes("BUSYGROUP")) throw err;
}
const subscriber = redis.duplicate ? redis.duplicate() : createClient({ url: spec.redisUrl });
subscriber.on("error", (err) => log.warn({ err, service: spec.agentName }, "legacy service response subscriber redis error"));
await subscriber.connect();
if (!subscriber.pSubscribe) {
log.warn({ service: spec.agentName }, "redis client does not support pSubscribe; legacy response observation disabled");
} else {
await subscriber.pSubscribe("responses:*", async (message, channel) => {
await handleLegacyResponseMessage(spec, channel, message).catch((err) => {
log.error({ err, service: spec.agentName, channel }, "failed to ingest legacy service response");
});
});
}
log.info({ service: spec.agentName, taskStream, group }, "legacy service Redis observer started");
void (async () => {
while (true) {
try {
const response = await redis.xReadGroup(group, consumer, { key: taskStream, id: ">" }, { COUNT: 100, BLOCK: 5000 });
if (!response) continue;
for (const stream of response) {
for (const message of stream.messages) {
const ctx = parseLegacyTask(message.id, message.message);
if (ctx) legacyTaskContexts.set(ctx.taskId, ctx);
await redis.xAck(stream.name, group, message.id);
}
}
} catch (err) {
if (String(err).includes("NOGROUP")) {
try { await redis.xGroupCreate(taskStream, group, "0", { MKSTREAM: true }); } catch {}
} else {
log.error({ err, service: spec.agentName }, "legacy service task observer loop error");
}
await new Promise((resolve) => setTimeout(resolve, 2000));
}
}
})();
}
async function startCanonicalGrowEventStream(createClient: (opts: { url: string }) => RedisClientLike) {
if (!config.growEventsRedisUrl) {
log.info("grow events Redis consumer disabled (GROW_EVENTS_REDIS_URL/REDIS_URL not set)");
return;
}
const createClient = await loadRedisCreateClient();
const redis = createClient({ url: config.growEventsRedisUrl });
redis.on("error", (err) => log.warn({ err }, "grow events redis client error"));
await redis.connect();
@@ -84,8 +294,7 @@ export async function startGrowEventsRedisConsumer() {
for (const stream of response) {
for (const message of stream.messages) {
try {
const event = await recordGrowEvent(fieldsToEvent(message.message, stream.name));
await routeGrowEventToUserActor(event);
await recordAndRoute(fieldsToEvent(message.message, stream.name));
} catch (err) {
log.error({ err, messageId: message.id }, "failed to ingest grow event stream message");
} finally {
@@ -100,3 +309,16 @@ export async function startGrowEventsRedisConsumer() {
}
})();
}
export async function startGrowEventsRedisConsumer() {
const createClient = await loadRedisCreateClient();
await startCanonicalGrowEventStream(createClient);
const specs = serviceSpecs();
if (!specs.length) {
log.info("legacy service Redis observers disabled (INTERVIEW_REDIS_URL/ROLEPLAY_REDIS_URL/RESUME_REDIS_URL not set)");
return;
}
await Promise.all(specs.map((spec) => startLegacyServiceObserver(spec, createClient)));
}