325 lines
13 KiB
TypeScript
325 lines
13 KiB
TypeScript
import { randomUUID } from "node:crypto";
|
|
import { config } from "../config.js";
|
|
import { log } from "../log.js";
|
|
import { recordGrowEvent } from "./record-grow-event.js";
|
|
import { routeGrowEventToUserActor } from "./route-to-user-actor.js";
|
|
|
|
// This file has two Redis ingestion modes:
// 1. Canonical GrowEvent stream (`grow.events.raw`): the future service event bus.
// 2. Legacy A2A observer: watches the existing `tasks:{service}` streams and the
//    `responses:*` pub/sub channels, so the backend can capture what the services
//    currently emit without requiring any service-side changes.
|
|
|
|
/** A single stream entry: its entry id plus the flat string-to-string field map stored under it. */
type RedisMessage = {
  id: string;
  message: Record<string, string>;
};
|
|
/** Reply shape of `xReadGroup`: one element per stream, each carrying that stream's batch of entries. */
type RedisStreamResponse = { name: string; messages: RedisMessage[] }[];
|
|
|
|
/**
 * Structural subset of a Redis client that this file relies on.
 *
 * `duplicate` and `pSubscribe` are optional; the code that uses them checks for
 * their presence before calling.
 */
type RedisClientLike = {
  /** Opens the connection. */
  connect: () => Promise<void>;
  /** Optional: returns another client instance (used here for the pub/sub subscriber). */
  duplicate?: () => RedisClientLike;
  /** Registers an event listener; this file only subscribes to "error". */
  on: (event: string, listener: (err: unknown) => void) => void;
  /** Creates a consumer group on a stream. */
  xGroupCreate: (
    stream: string,
    group: string,
    id: string,
    opts?: Record<string, unknown>,
  ) => Promise<unknown>;
  /** Reads entries for a consumer in a group; resolves to `null` when nothing was returned. */
  xReadGroup: (
    group: string,
    consumer: string,
    streams: { key: string; id: string },
    opts?: Record<string, unknown>,
  ) => Promise<RedisStreamResponse | null>;
  /** Acknowledges one entry for a group. */
  xAck: (stream: string, group: string, id: string) => Promise<unknown>;
  /** Optional: pattern subscription; the listener receives `(message, channel)`. */
  pSubscribe?: (
    pattern: string,
    listener: (message: string, channel: string) => void | Promise<void>,
  ) => Promise<unknown>;
};
|
|
|
|
/** Describes one legacy service whose Redis instance is observed. */
type ServiceRedisSpec = {
  /** Short service key; selects the event-type prefix in `actionToEventType`. */
  serviceId: "interview" | "roleplay" | "resume";
  /** Agent name; used to build the `tasks:{agentName}` stream key and the event source. */
  agentName: "interview-service" | "roleplay-service" | "resume-builder";
  /** Connection URL of that service's Redis. */
  redisUrl: string;
};
|
|
|
|
/**
 * What was learned about a legacy task from its `tasks:{service}` stream entry.
 * Kept so that later `responses:{taskId}` messages can be attributed to a user and action.
 */
type LegacyTaskContext = {
  /** Task id taken from the stream entry's `task_id` field. */
  taskId: string;
  /** User the task was issued for, when present. */
  userId?: string;
  /** Action named in the task payload, when present. */
  action?: string;
  /** `params` object from the task payload. */
  params?: Record<string, unknown>;
  /** `user_context` object from the task payload. */
  userContext?: Record<string, unknown>;
  /** Truthiness of the payload's `session_start` flag. */
  sessionStart?: boolean;
  /** Id of the stream entry the task was read from. */
  streamEntryId: string;
  /** ISO timestamp of when this process parsed the entry. */
  seenAt: string;
};
|
|
|
|
/**
 * Task contexts captured from the legacy task streams, keyed by task id.
 * Written by the task-stream observers and looked up when a `responses:{taskId}`
 * pub/sub message is handled.
 */
const legacyTaskContexts: Map<string, LegacyTaskContext> = new Map();
|
|
|
|
/**
 * Loads the `redis` package at runtime and returns its `createClient` factory.
 *
 * @returns the package's `createClient` function.
 * @throws Error when the loaded module has no `createClient` export; the import
 *   itself rejects if the package cannot be resolved.
 */
async function loadRedisCreateClient(): Promise<(opts: { url: string }) => RedisClientLike> {
  type CreateClient = (opts: { url: string }) => RedisClientLike;

  // import() is built through the Function constructor rather than written literally.
  // NOTE(review): presumably so the compiler/bundler leaves the dynamic import untouched — confirm.
  const importAtRuntime = Function("specifier", "return import(specifier)") as (specifier: string) => Promise<unknown>;

  const redisModule = (await importAtRuntime("redis")) as { createClient?: CreateClient };
  const { createClient } = redisModule;
  if (!createClient) {
    throw new Error("redis package did not expose createClient");
  }
  return createClient;
}
|
|
|
|
/**
 * Decodes a stream field value that may contain JSON.
 *
 * Only strings that are wrapped in `{…}` or `[…]` are handed to `JSON.parse`;
 * everything else (including `undefined` and the empty string) is returned unchanged.
 * A string that looks like JSON but fails to parse is also returned unchanged.
 *
 * @param value raw field value.
 * @returns the parsed object/array, or the input as-is.
 */
function parseFieldValue(value: string | undefined): unknown {
  if (!value) return value;

  const looksLikeObject = value.startsWith("{") && value.endsWith("}");
  const looksLikeArray = value.startsWith("[") && value.endsWith("]");
  if (!looksLikeObject && !looksLikeArray) return value;

  try {
    return JSON.parse(value);
  } catch {
    // Not valid JSON after all; fall back to the raw text.
    return value;
  }
}
|
|
|
|
/**
 * Views a value as a plain key/value record.
 *
 * @param value anything.
 * @returns the same object when `value` is a non-null, non-array object; otherwise a fresh `{}`.
 */
function asRecord(value: unknown): Record<string, unknown> {
  if (!value || typeof value !== "object" || Array.isArray(value)) {
    return {};
  }
  return value as Record<string, unknown>;
}
|
|
|
|
/**
 * Extracts a usable string.
 *
 * @param value anything.
 * @returns the trimmed text when `value` is a string with non-whitespace content; otherwise `undefined`.
 */
function getString(value: unknown): string | undefined {
  if (typeof value !== "string") return undefined;
  const trimmed = value.trim();
  return trimmed ? trimmed : undefined;
}
|
|
|
|
/**
 * Maps the flat field map of a stream entry onto the event input object that is
 * passed to `recordAndRoute`.
 *
 * Both camelCase and snake_case field names are accepted for user, org, timestamp
 * and dedupe key. The payload is read from `payload`, falling back to `data`, then `"{}"`;
 * anything that is not a plain object is wrapped as `{ value }`.
 *
 * @param fields raw entry fields.
 * @param stream name of the stream the entry came from; used as `source` when the entry has none.
 */
function fieldsToEvent(fields: Record<string, string>, stream: string) {
  // Optional JSON-ish fields: absent/empty stays undefined, otherwise decode.
  const decodeOptional = (raw: string | undefined): unknown => (raw ? parseFieldValue(raw) : undefined);

  const decodedPayload = fields.payload
    ? parseFieldValue(fields.payload)
    : parseFieldValue(fields.data ?? "{}");

  let payload: object;
  if (decodedPayload && typeof decodedPayload === "object" && !Array.isArray(decodedPayload)) {
    payload = decodedPayload;
  } else {
    payload = { value: decodedPayload };
  }

  const correlation = decodeOptional(fields.correlation);
  const subject = decodeOptional(fields.subject);
  const mission = decodeOptional(fields.mission);

  return {
    id: fields.id,
    source: fields.source ?? stream,
    type: fields.type ?? fields.event_type ?? "service.event",
    category: fields.category ?? "service",
    userId: fields.userId ?? fields.user_id,
    orgId: fields.orgId ?? fields.org_id,
    occurredAt: fields.occurredAt ?? fields.occurred_at ?? fields.timestamp,
    mission,
    subject,
    correlation,
    payload,
    raw: fields,
    dedupeKey: fields.dedupeKey ?? fields.dedupe_key,
  };
}
|
|
|
|
/**
 * Lists the legacy services that have a Redis URL configured.
 *
 * @returns one spec per service (interview, roleplay, resume — in that order) whose
 *   configured URL is truthy.
 */
function serviceSpecs(): ServiceRedisSpec[] {
  const candidates: ServiceRedisSpec[] = [
    { serviceId: "interview", agentName: "interview-service", redisUrl: config.interviewRedisUrl },
    { serviceId: "roleplay", agentName: "roleplay-service", redisUrl: config.roleplayRedisUrl },
    { serviceId: "resume", agentName: "resume-builder", redisUrl: config.resumeRedisUrl },
  ];

  const enabled: ServiceRedisSpec[] = [];
  for (const candidate of candidates) {
    // A service without a URL is simply not observed.
    if (candidate.redisUrl) enabled.push(candidate);
  }
  return enabled;
}
|
|
|
|
/**
 * Derives the event type for a legacy response message.
 *
 * The "effective" action is the message's own `action` if it is a non-empty string,
 * else the task's action, else `"event"`. A few known actions map to fixed names;
 * anything else becomes `<service>.<effective with "_" replaced by ".">`.
 * Any service id other than "interview"/"roleplay" is handled by the resume rules.
 *
 * @param serviceId service the message came from.
 * @param action action recorded for the originating task, if known.
 * @param message decoded response message.
 * @returns dotted event type string.
 */
function actionToEventType(serviceId: ServiceRedisSpec["serviceId"], action: string | undefined, message: Record<string, unknown>) {
  const effective = getString(message.action) || action || "event";

  // Generic fallback name: every underscore in the action becomes a dot.
  const dotted = (prefix: string): string => `${prefix}.${effective.split("_").join(".")}`;

  // Review messages are "completed" only when data.status says so; otherwise still processing.
  const reviewType = (prefix: string): string => {
    const status = asRecord(message.data).status;
    return status === "completed" ? `${prefix}.review_completed` : `${prefix}.review_processing`;
  };

  switch (serviceId) {
    case "interview": {
      if (effective === "interview_configured" || action === "configure_interview") {
        return "interview.configured";
      }
      if (effective === "review_loaded") return reviewType("interview");
      if (effective === "interview_page_loaded") return "interview.page_state_loaded";
      return dotted("interview");
    }
    case "roleplay": {
      if (effective === "roleplay_configured" || action === "configure_roleplay") {
        return "roleplay.configured";
      }
      if (effective === "roleplay_review_loaded" || effective === "review_loaded") {
        return reviewType("roleplay");
      }
      if (effective === "roleplay_page_loaded") return "roleplay.page_state_loaded";
      return dotted("roleplay");
    }
    default: {
      if (effective === "ai_analysis_complete" || action === "ai_analyze") {
        return "resume.analysis_completed";
      }
      if (effective === "resume_loaded") return "resume.loaded";
      if (effective === "resume_parsed") return "resume.parsed";
      return dotted("resume");
    }
  }
}
|
|
|
|
/**
 * Finds the session id associated with a legacy response message.
 *
 * Candidates are checked in priority order — `data.session_id`, `data.sessionId`,
 * `data.id`, then the task's `params.session_id` / `params.sessionId`, then
 * `data.review_session_id` — and the first one that is a non-empty string wins.
 * Each candidate is validated individually, so an empty or non-string value in an
 * earlier slot no longer hides a usable id in a later one.
 *
 * @param message decoded response message; its `data` member is inspected.
 * @param ctx context of the originating task, if one was captured.
 * @returns the trimmed session id, or `undefined` when no candidate is a usable string.
 */
function extractSessionId(message: Record<string, unknown>, ctx?: LegacyTaskContext): string | undefined {
  const data = asRecord(message.data);
  const params = ctx?.params ?? {};

  const candidates: unknown[] = [
    data.session_id,
    data.sessionId,
    data.id,
    params.session_id,
    params.sessionId,
    data.review_session_id,
  ];

  for (const candidate of candidates) {
    const id = getString(candidate);
    if (id) return id;
  }
  return undefined;
}
|
|
|
|
/**
 * Finds the resume id associated with a legacy response message.
 *
 * Candidates are checked in priority order — `data.resume_id`, `data.resumeId`,
 * `data.resume.id`, then the task's `params.resume_id` / `params.resumeId` — and the
 * first one that is a non-empty string wins. Each candidate is validated individually,
 * so an empty or non-string value in an earlier slot no longer hides a usable id later.
 *
 * @param message decoded response message; its `data` member is inspected.
 * @param ctx context of the originating task, if one was captured.
 * @returns the trimmed resume id, or `undefined` when no candidate is a usable string.
 */
function extractResumeId(message: Record<string, unknown>, ctx?: LegacyTaskContext): string | undefined {
  const data = asRecord(message.data);
  const params = ctx?.params ?? {};

  const candidates: unknown[] = [
    data.resume_id,
    data.resumeId,
    asRecord(data.resume).id,
    params.resume_id,
    params.resumeId,
  ];

  for (const candidate of candidates) {
    const id = getString(candidate);
    if (id) return id;
  }
  return undefined;
}
|
|
|
|
/**
 * Persists an event and then forwards it to the owning user's actor.
 *
 * A failure while recording rejects the returned promise. A rejected routing promise
 * is only logged as a warning, so the recorded event is still returned.
 *
 * @param input event input handed to `recordGrowEvent` unchanged.
 * @returns the event as returned by `recordGrowEvent`.
 */
async function recordAndRoute(input: unknown) {
  const recorded = await recordGrowEvent(input);

  const routing = routeGrowEventToUserActor(recorded);
  await routing.catch((routeErr) => {
    log.warn(
      { err: routeErr, eventId: recorded.id, userId: recorded.userId },
      "failed to route grow event to user actor",
    );
  });

  return recorded;
}
|
|
|
|
/**
 * Turns one message published on a legacy `responses:{taskId}` channel into a
 * recorded (and routed) grow event.
 *
 * The task id is the channel name without its `responses:` prefix; the matching task
 * context (if the task stream entry was seen by this process) supplies user, action
 * and params. Messages without a string `type` are ignored.
 *
 * A `__task_complete__` message produces no event. It is treated as the end of the
 * task, so the cached task context is released here — otherwise `legacyTaskContexts`
 * would grow for the lifetime of the process.
 * NOTE(review): this assumes no further responses are published for a task after
 * `__task_complete__` — confirm against the services' protocol.
 *
 * @param spec service the subscriber belongs to.
 * @param channel pub/sub channel the message arrived on.
 * @param raw message text as received.
 */
async function handleLegacyResponseMessage(spec: ServiceRedisSpec, channel: string, raw: string) {
  const taskId = channel.replace(/^responses:/, "");
  const ctx = legacyTaskContexts.get(taskId);
  const message = asRecord(parseFieldValue(raw));
  const type = getString(message.type);

  if (type === "__task_complete__") {
    // Completion marker: nothing to record, and the context is no longer needed.
    legacyTaskContexts.delete(taskId);
    return;
  }
  if (!type) return;

  const data = asRecord(message.data);
  const eventType = actionToEventType(spec.serviceId, ctx?.action, message);
  const sessionId = extractSessionId(message, ctx);
  const resumeId = extractResumeId(message, ctx);

  await recordAndRoute({
    id: randomUUID(),
    source: `${spec.agentName}:legacy-a2a`,
    type: eventType,
    category: type === "agent_error" ? "system" : "service",
    userId: ctx?.userId,
    occurredAt: new Date().toISOString(),
    correlation: {
      taskId,
      action: ctx?.action,
      sessionId,
      resumeId,
      externalId: sessionId ?? resumeId,
    },
    payload: {
      action: ctx?.action,
      params: ctx?.params,
      message,
      data,
    },
    raw: { channel, message },
    // NOTE(review): only the first 512 characters of the serialized message take part in
    // the key, so two different messages of the same task/type that share that prefix get
    // the same dedupe key. Key format intentionally left unchanged here.
    dedupeKey: `legacy-a2a:${spec.agentName}:${taskId}:${eventType}:${JSON.stringify(message).slice(0, 512)}`,
  });
}
|
|
|
|
/**
 * Builds a task context from one entry of a legacy `tasks:{service}` stream.
 *
 * The entry must carry a non-empty `task_id` field; its `payload` field is decoded
 * (JSON object expected, anything else is treated as empty) for the remaining details.
 * The user id is taken from the payload's `user_id` when that is a non-empty string,
 * and otherwise from the entry's own `user_id` field — each source is validated on its
 * own, so an empty or non-string payload value does not suppress the field-level value.
 *
 * @param entryId id of the stream entry.
 * @param fields flat field map of the entry.
 * @returns the context, or `null` when the entry has no usable `task_id`.
 */
function parseLegacyTask(entryId: string, fields: Record<string, string>): LegacyTaskContext | null {
  const taskId = getString(fields.task_id);
  // Check the id first so entries we cannot key are skipped without decoding the payload.
  if (!taskId) return null;

  const payload = asRecord(parseFieldValue(fields.payload));

  return {
    taskId,
    userId: getString(payload.user_id) ?? getString(fields.user_id),
    action: getString(payload.action),
    params: asRecord(payload.params),
    userContext: asRecord(payload.user_context),
    sessionStart: Boolean(payload.session_start),
    streamEntryId: entryId,
    seenAt: new Date().toISOString(),
  };
}
|
|
|
|
/**
 * Starts observing one legacy service's Redis.
 *
 * Two things are set up:
 *  - a consumer-group reader on `tasks:{agentName}` (using this backend's own group,
 *    created from id "0" if missing) that caches a task context per entry in
 *    `legacyTaskContexts` and acknowledges the entry;
 *  - a pattern subscription on `responses:*` that feeds every message to
 *    `handleLegacyResponseMessage` (skipped with a warning when the client has no `pSubscribe`).
 *
 * The function resolves once both connections are up; the read loop keeps running in
 * the background and never settles.
 *
 * @param spec service to observe.
 * @param createClient Redis client factory.
 * @throws whatever `connect` rejects with, or a group-creation error other than BUSYGROUP.
 */
async function startLegacyServiceObserver(spec: ServiceRedisSpec, createClient: (opts: { url: string }) => RedisClientLike): Promise<void> {
  const taskStream = `tasks:${spec.agentName}`;
  const consumer = `${config.growEventsConsumerName}-${spec.agentName}`;
  const group = `${config.legacyServiceTaskObserverGroup}:${spec.agentName}`;

  const redis = createClient({ url: spec.redisUrl });
  redis.on("error", (err) => log.warn({ err, service: spec.agentName }, "legacy service task observer redis error"));
  await redis.connect();

  try {
    await redis.xGroupCreate(taskStream, group, "0", { MKSTREAM: true });
    log.info({ taskStream, group }, "created legacy service task observer group");
  } catch (err) {
    // BUSYGROUP means the group already exists, which is the normal case after the first start.
    if (!String(err).includes("BUSYGROUP")) throw err;
  }

  // Pub/sub gets its own connection.
  const subscriber = redis.duplicate ? redis.duplicate() : createClient({ url: spec.redisUrl });
  subscriber.on("error", (err) => log.warn({ err, service: spec.agentName }, "legacy service response subscriber redis error"));
  await subscriber.connect();
  if (!subscriber.pSubscribe) {
    log.warn({ service: spec.agentName }, "redis client does not support pSubscribe; legacy response observation disabled");
  } else {
    await subscriber.pSubscribe("responses:*", async (message, channel) => {
      await handleLegacyResponseMessage(spec, channel, message).catch((err) => {
        log.error({ err, service: spec.agentName, channel }, "failed to ingest legacy service response");
      });
    });
  }

  log.info({ service: spec.agentName, taskStream, group }, "legacy service Redis observer started");

  // Background read loop; intentionally not awaited.
  void (async () => {
    while (true) {
      try {
        const response = await redis.xReadGroup(group, consumer, { key: taskStream, id: ">" }, { COUNT: 100, BLOCK: 5000 });
        if (!response) continue;
        for (const stream of response) {
          for (const message of stream.messages) {
            const ctx = parseLegacyTask(message.id, message.message);
            if (ctx) legacyTaskContexts.set(ctx.taskId, ctx);
            try {
              await redis.xAck(stream.name, group, message.id);
            } catch (err) {
              // Entries already handed to this consumer are not returned again by a ">" read,
              // so a failed ack must not abort the batch: keep registering the remaining
              // contexts and only report the entry that could not be acknowledged.
              log.warn({ err, service: spec.agentName, messageId: message.id }, "failed to ack legacy service task entry");
            }
          }
        }
      } catch (err) {
        if (String(err).includes("NOGROUP")) {
          // Stream or group disappeared (e.g. the key was deleted); recreate and retry.
          try {
            await redis.xGroupCreate(taskStream, group, "0", { MKSTREAM: true });
          } catch (recreateErr) {
            // Another instance may have recreated it first; anything else is worth surfacing.
            if (!String(recreateErr).includes("BUSYGROUP")) {
              log.warn({ err: recreateErr, service: spec.agentName, taskStream, group }, "failed to recreate legacy service task observer group");
            }
          }
        } else {
          log.error({ err, service: spec.agentName }, "legacy service task observer loop error");
        }
        // Back off before the next read attempt.
        await new Promise((resolve) => setTimeout(resolve, 2000));
      }
    }
  })();
}
|
|
|
|
/**
 * Starts the consumer for the canonical grow-events stream.
 *
 * Does nothing (besides an info log) when `config.growEventsRedisUrl` is not set.
 * Otherwise connects, makes sure the consumer group exists (created from id "0"),
 * and launches a background loop that reads new entries, converts each with
 * `fieldsToEvent`, records/routes it, and acknowledges it.
 *
 * Delivery is at-most-once by design of the loop: an entry is acknowledged whether or
 * not ingesting it succeeded, and ingest failures are only logged.
 *
 * The function resolves once the connection and group are ready; the loop itself
 * never settles.
 *
 * @param createClient Redis client factory.
 * @throws whatever `connect` rejects with, or a group-creation error other than BUSYGROUP.
 */
async function startCanonicalGrowEventStream(createClient: (opts: { url: string }) => RedisClientLike): Promise<void> {
  if (!config.growEventsRedisUrl) {
    log.info("grow events Redis consumer disabled (GROW_EVENTS_REDIS_URL/REDIS_URL not set)");
    return;
  }

  const redis = createClient({ url: config.growEventsRedisUrl });
  redis.on("error", (err) => log.warn({ err }, "grow events redis client error"));
  await redis.connect();

  try {
    await redis.xGroupCreate(config.growEventsStream, config.growEventsConsumerGroup, "0", { MKSTREAM: true });
    log.info({ stream: config.growEventsStream, group: config.growEventsConsumerGroup }, "created grow events consumer group");
  } catch (err) {
    // BUSYGROUP means the group already exists.
    if (!String(err).includes("BUSYGROUP")) throw err;
  }

  log.info({ stream: config.growEventsStream, group: config.growEventsConsumerGroup }, "grow events redis consumer started");

  // Background read loop; intentionally not awaited.
  void (async () => {
    while (true) {
      try {
        const response = await redis.xReadGroup(
          config.growEventsConsumerGroup,
          config.growEventsConsumerName,
          { key: config.growEventsStream, id: ">" },
          { COUNT: 50, BLOCK: 5000 },
        );
        if (!response) continue;
        for (const stream of response) {
          for (const message of stream.messages) {
            try {
              await recordAndRoute(fieldsToEvent(message.message, stream.name));
            } catch (err) {
              log.error({ err, messageId: message.id }, "failed to ingest grow event stream message");
            }
            try {
              await redis.xAck(stream.name, config.growEventsConsumerGroup, message.id);
            } catch (err) {
              // The rest of this batch has already been delivered to this consumer and will
              // not come back on a ">" read, so an ack failure must not skip those entries.
              log.warn({ err, messageId: message.id }, "failed to ack grow event stream message");
            }
          }
        }
      } catch (err) {
        if (String(err).includes("NOGROUP")) {
          // Same recovery as the legacy task observer: the stream/group vanished, recreate it.
          log.warn({ err, stream: config.growEventsStream, group: config.growEventsConsumerGroup }, "grow events consumer group missing; recreating");
          try {
            await redis.xGroupCreate(config.growEventsStream, config.growEventsConsumerGroup, "0", { MKSTREAM: true });
          } catch (recreateErr) {
            if (!String(recreateErr).includes("BUSYGROUP")) {
              log.error({ err: recreateErr }, "failed to recreate grow events consumer group");
            }
          }
        } else {
          log.error({ err }, "grow events redis consumer loop error");
        }
        // Back off before the next read attempt.
        await new Promise((resolve) => setTimeout(resolve, 2000));
      }
    }
  })();
}
|
|
|
|
/**
 * Entry point: starts every Redis ingestion path this file provides.
 *
 * Loads the Redis client factory, starts the canonical grow-events stream consumer,
 * then starts one legacy observer per service that has a Redis URL configured
 * (all started concurrently). If no legacy service is configured, that is logged
 * and the function returns after the canonical consumer has been started.
 *
 * @throws if the Redis package cannot be loaded or any of the start-up steps rejects.
 */
export async function startGrowEventsRedisConsumer() {
  const createClient = await loadRedisCreateClient();
  await startCanonicalGrowEventStream(createClient);

  const legacySpecs = serviceSpecs();
  if (!legacySpecs.length) {
    log.info("legacy service Redis observers disabled (INTERVIEW_REDIS_URL/ROLEPLAY_REDIS_URL/RESUME_REDIS_URL not set)");
    return;
  }

  const startups = legacySpecs.map((legacySpec) => startLegacyServiceObserver(legacySpec, createClient));
  await Promise.all(startups);
}
|