Files
growqr-backend/src/grow/persistence.ts
2026-06-05 00:40:28 +05:30

269 lines
11 KiB
TypeScript

import { asc, desc, eq, and } from "drizzle-orm";
import { db } from "../db/client.js";
import { growActiveMissions, growConversationMessages, growConversations, missionCoachRuns, missionSuggestions } from "../db/schema.js";
import type { GrowActiveMission, MissionSnapshot } from "../actors/missions/types.js";
import type { MissionSuggestion } from "../missions/suggestions.js";
import type { ConversationMessage } from "../actors/conversation/types.js";
import type { GrowConversation } from "../actors/grow/types.js";
/**
 * Builds an identifier shaped like `<prefix>-<epoch ms>-<hex digits>`.
 *
 * The trailing hex digits are derived from `Math.random()`, so the result is
 * not cryptographically strong.
 */
const buildId = (prefix: string) => {
  const timestamp = Date.now();
  const randomSuffix = Math.random().toString(16).slice(2);
  return [prefix, timestamp, randomSuffix].join("-");
};
/**
 * Converts an epoch-millisecond value into a `Date`.
 *
 * Falls back to the current time when `ms` is missing, `0`, `NaN`, or
 * infinite. It also falls back when `ms` is finite but outside the range a
 * `Date` can represent, so callers never receive an Invalid Date.
 *
 * @param ms Epoch milliseconds, if known.
 * @returns A valid `Date`.
 */
const dateFromMs = (ms?: number): Date => {
  if (ms && Number.isFinite(ms)) {
    const parsed = new Date(ms);
    // Finite values beyond the representable Date range yield an Invalid Date (NaN time).
    if (!Number.isNaN(parsed.getTime())) return parsed;
  }
  return new Date();
};
/**
 * Maps a conversation row to the `GrowConversation` shape, converting the
 * `createdAt` / `updatedAt` dates to epoch milliseconds.
 */
function toConversation(row: typeof growConversations.$inferSelect): GrowConversation {
  const { id, title, createdAt, updatedAt } = row;
  return {
    id,
    title,
    createdAt: createdAt.getTime(),
    updatedAt: updatedAt.getTime(),
  };
}
/**
 * Maps a conversation-message row to the `ConversationMessage` shape,
 * converting `createdAt` to epoch milliseconds. The row's `userId` is not
 * carried over.
 */
function toMessage(row: typeof growConversationMessages.$inferSelect): ConversationMessage {
  const { id, conversationId, role, sender, content } = row;
  const createdAt = row.createdAt.getTime();
  return { id, conversationId, role, sender, content, createdAt };
}
/**
 * Returns the user's most recently updated conversation, creating one with
 * `title` when the user has none.
 *
 * Only the latest row is fetched (`LIMIT 1`) rather than the full list.
 *
 * NOTE(review): the lookup and the insert are separate statements, so two
 * concurrent calls for a user with no conversations may each create one.
 *
 * @param userId Owner of the conversation.
 * @param title Title used only when a new conversation has to be created.
 * @returns The existing or newly created conversation.
 * @throws Error if a new conversation is needed and the insert returns no row.
 */
export async function ensureConversation(userId: string, title = "Talk to Me") {
  const [latest] = await db
    .select()
    .from(growConversations)
    .where(eq(growConversations.userId, userId))
    .orderBy(desc(growConversations.updatedAt))
    .limit(1);
  if (latest) return toConversation(latest);
  return createConversationPg(userId, title);
}
/**
 * Lists every conversation owned by `userId`, most recently updated first.
 */
export async function listConversationsPg(userId: string): Promise<GrowConversation[]> {
  const ownedByUser = eq(growConversations.userId, userId);
  const records = await db
    .select()
    .from(growConversations)
    .where(ownedByUser)
    .orderBy(desc(growConversations.updatedAt));
  return records.map((record) => toConversation(record));
}
/**
 * Inserts a new conversation for `userId` with a generated id and identical
 * `createdAt` / `updatedAt` timestamps.
 *
 * @throws Error when the insert returns no row.
 */
export async function createConversationPg(userId: string, title = "Talk to Me"): Promise<GrowConversation> {
  const timestamp = new Date();
  const inserted = await db
    .insert(growConversations)
    .values({ id: buildId("conversation"), userId, title, createdAt: timestamp, updatedAt: timestamp })
    .returning();
  const [created] = inserted;
  if (!created) {
    throw new Error("Failed to create conversation");
  }
  return toConversation(created);
}
/**
 * Looks up a single conversation by id, scoped to `userId`.
 *
 * @returns The conversation, or `null` when no row matches both the user and the id.
 */
export async function getConversationPg(userId: string, conversationId: string): Promise<GrowConversation | null> {
  const ownedConversation = and(eq(growConversations.userId, userId), eq(growConversations.id, conversationId));
  const [found] = await db.select().from(growConversations).where(ownedConversation).limit(1);
  if (!found) return null;
  return toConversation(found);
}
/**
 * Sets the conversation's `updatedAt` to now and, when `title` contains
 * non-whitespace text, also stores the trimmed title.
 *
 * @returns The updated conversation, or `null` when no row matches both the user and the id.
 */
export async function touchConversationPg(userId: string, conversationId: string, title?: string) {
  const changes: Partial<typeof growConversations.$inferInsert> = { updatedAt: new Date() };
  const trimmedTitle = title?.trim();
  if (trimmedTitle) {
    changes.title = trimmedTitle;
  }
  const ownedConversation = and(eq(growConversations.userId, userId), eq(growConversations.id, conversationId));
  const [updated] = await db.update(growConversations).set(changes).where(ownedConversation).returning();
  if (!updated) return null;
  return toConversation(updated);
}
/**
 * Deletes every message stored for the given user and conversation, then
 * bumps the conversation's `updatedAt`.
 *
 * The delete and the touch are issued as two separate statements.
 *
 * @returns The touched conversation, or `null` when no conversation row matches.
 */
export async function resetConversationPg(userId: string, conversationId: string) {
  const messagesOfConversation = and(
    eq(growConversationMessages.userId, userId),
    eq(growConversationMessages.conversationId, conversationId),
  );
  await db.delete(growConversationMessages).where(messagesOfConversation);
  return touchConversationPg(userId, conversationId);
}
/**
 * Lists the messages of a conversation for `userId`, oldest first
 * (ordered by `createdAt` ascending).
 */
export async function listMessagesPg(userId: string, conversationId: string): Promise<ConversationMessage[]> {
  const messagesOfConversation = and(
    eq(growConversationMessages.userId, userId),
    eq(growConversationMessages.conversationId, conversationId),
  );
  const records = await db
    .select()
    .from(growConversationMessages)
    .where(messagesOfConversation)
    .orderBy(asc(growConversationMessages.createdAt));
  return records.map((record) => toMessage(record));
}
/**
 * Inserts a message, or updates `content` and `sender` when a row with the
 * same message id already exists, then bumps the conversation's `updatedAt`.
 *
 * `input.createdAt` is interpreted as epoch milliseconds; when absent the
 * current time is used.
 *
 * NOTE(review): the conflict target is the message id alone, so the update
 * branch does not check that the existing row belongs to `userId` — confirm
 * that message ids cannot collide across users.
 *
 * @throws Error when the upsert returns no row.
 */
export async function addMessagePg(userId: string, input: Omit<ConversationMessage, "createdAt"> & { createdAt?: number }) {
  const { id, conversationId, role, sender, content } = input;
  const createdAt = dateFromMs(input.createdAt);
  const [saved] = await db
    .insert(growConversationMessages)
    .values({ id, userId, conversationId, role, sender, content, createdAt })
    .onConflictDoUpdate({
      target: growConversationMessages.id,
      set: { content, sender },
    })
    .returning();
  // The conversation is touched before the result is validated, as in the original ordering.
  await touchConversationPg(userId, conversationId);
  if (!saved) {
    throw new Error("Failed to persist message");
  }
  return toMessage(saved);
}
/**
 * Inserts an active-mission row keyed by `mission.instanceId`, or updates the
 * existing row on conflict.
 *
 * On insert every column is written, with `createdAt` / `updatedAt` taken
 * from the mission's own millisecond timestamps. On conflict only status,
 * progress, current stage, goal, actor type, the mission and snapshot JSON,
 * and `updatedAt` (set to now) are written.
 *
 * @param userId Owner stored on a newly inserted row.
 * @param mission Mission state; also stored whole in the `mission` column.
 * @param snapshot Optional snapshot stored in the `snapshot` column.
 */
export async function upsertActiveMissionPg(userId: string, mission: GrowActiveMission, snapshot?: MissionSnapshot) {
  const now = new Date();
  const missionJson = mission as unknown as Record<string, unknown>;
  const snapshotJson = snapshot as unknown as Record<string, unknown> | undefined;
  const { status, progressPercent, currentStageId, goal, actorType } = mission;

  await db
    .insert(growActiveMissions)
    .values({
      instanceId: mission.instanceId,
      userId,
      missionId: mission.missionId,
      workflowId: mission.workflowId,
      actorType,
      title: mission.title,
      shortTitle: mission.shortTitle,
      status,
      progressPercent,
      currentStageId,
      goal,
      mission: missionJson,
      snapshot: snapshotJson,
      createdAt: dateFromMs(mission.createdAt),
      updatedAt: dateFromMs(mission.updatedAt),
    })
    .onConflictDoUpdate({
      target: growActiveMissions.instanceId,
      set: {
        status,
        progressPercent,
        currentStageId,
        goal,
        actorType,
        mission: missionJson,
        snapshot: snapshotJson,
        updatedAt: now,
      },
    });
}
/**
 * Rebuilds a `GrowActiveMission` from a stored row.
 *
 * Each field is read from the row's `mission` JSON first and falls back to
 * the matching column when the JSON value is null or undefined.
 * `progressPercent` finally defaults to `0`; the timestamps fall back to the
 * row dates as epoch milliseconds.
 */
function activeMissionFromRow(row: typeof growActiveMissions.$inferSelect): GrowActiveMission {
  const stored = (row.mission ?? {}) as Partial<GrowActiveMission>;

  const missionId = stored.missionId ?? (row.missionId as GrowActiveMission["missionId"]);
  const status = stored.status ?? (row.status as GrowActiveMission["status"]);
  const actorType = stored.actorType ?? (row.actorType as GrowActiveMission["actorType"] | undefined);
  const progressPercent = stored.progressPercent ?? row.progressPercent ?? 0;

  return {
    instanceId: stored.instanceId ?? row.instanceId,
    missionId,
    workflowId: stored.workflowId ?? row.workflowId,
    title: stored.title ?? row.title,
    shortTitle: stored.shortTitle ?? row.shortTitle,
    status,
    progressPercent,
    currentStageId: stored.currentStageId ?? row.currentStageId ?? undefined,
    goal: stored.goal ?? row.goal ?? undefined,
    actorType,
    createdAt: stored.createdAt ?? row.createdAt.getTime(),
    updatedAt: stored.updatedAt ?? row.updatedAt.getTime(),
  };
}
/**
 * Rebuilds a `MissionSnapshot` from a stored row.
 *
 * Returns `null` when the row has no snapshot. Otherwise all stored snapshot
 * fields are kept, and the listed fields fall back to the row's columns when
 * the snapshot value is null or undefined. `stages`, `artifacts` and `events`
 * default to empty arrays; the timestamps fall back to the row dates as ISO
 * strings.
 */
function missionSnapshotFromRow(row: typeof growActiveMissions.$inferSelect): MissionSnapshot | null {
  const { snapshot } = row;
  if (!snapshot) {
    return null;
  }
  const stored = snapshot as Partial<MissionSnapshot>;
  return {
    // Spread first so the explicit fallbacks below take precedence.
    ...(stored as MissionSnapshot),
    instanceId: stored.instanceId ?? row.instanceId,
    missionId: stored.missionId ?? (row.missionId as MissionSnapshot["missionId"]),
    workflowId: stored.workflowId ?? row.workflowId,
    userId: stored.userId ?? row.userId,
    title: stored.title ?? row.title,
    shortTitle: stored.shortTitle ?? row.shortTitle,
    status: stored.status ?? (row.status as MissionSnapshot["status"]),
    progressPercent: stored.progressPercent ?? row.progressPercent ?? 0,
    currentStageId: stored.currentStageId ?? row.currentStageId ?? undefined,
    goal: stored.goal ?? row.goal ?? undefined,
    stages: stored.stages ?? [],
    artifacts: stored.artifacts ?? [],
    events: stored.events ?? [],
    createdAt: stored.createdAt ?? row.createdAt.toISOString(),
    updatedAt: stored.updatedAt ?? row.updatedAt.toISOString(),
  };
}
/**
 * Lists every stored mission row for `userId`, most recently updated first,
 * each as a `{ mission, snapshot }` pair (`snapshot` is `null` when the row
 * has none).
 */
export async function listActiveMissionsPg(userId: string) {
  const records = await db
    .select()
    .from(growActiveMissions)
    .where(eq(growActiveMissions.userId, userId))
    .orderBy(desc(growActiveMissions.updatedAt));
  return records.map((record) => {
    const mission = activeMissionFromRow(record);
    const snapshot = missionSnapshotFromRow(record);
    return { mission, snapshot };
  });
}
/**
 * Loads one mission row by instance id, scoped to `userId`.
 *
 * @returns A `{ mission, snapshot }` pair, or `null` when no row matches both the user and the instance id.
 */
export async function getActiveMissionPg(userId: string, instanceId: string) {
  const ownedInstance = and(eq(growActiveMissions.userId, userId), eq(growActiveMissions.instanceId, instanceId));
  const [found] = await db.select().from(growActiveMissions).where(ownedInstance).limit(1);
  if (!found) return null;
  return { mission: activeMissionFromRow(found), snapshot: missionSnapshotFromRow(found) };
}
/**
 * Lists the suggestions with status `"active"` for one mission instance of
 * `userId`, ordered by priority descending and then by `updatedAt` descending.
 *
 * Returns an empty list without querying when `instanceId` is missing.
 * Nullable columns (`stageId`, `reason`, `expiresAt`) are surfaced as
 * `undefined`, `sourceRefs` defaults to `{}`, and dates are serialised as ISO
 * strings.
 */
export async function listMissionSuggestionsPg(userId: string, instanceId: string | undefined | null): Promise<MissionSuggestion[]> {
  if (!instanceId) {
    return [];
  }
  const activeForInstance = and(
    eq(missionSuggestions.userId, userId),
    eq(missionSuggestions.missionInstanceId, instanceId),
    eq(missionSuggestions.status, "active"),
  );
  const records = await db
    .select()
    .from(missionSuggestions)
    .where(activeForInstance)
    .orderBy(desc(missionSuggestions.priority), desc(missionSuggestions.updatedAt));
  return records.map((record) => ({
    id: record.id,
    userId: record.userId,
    missionInstanceId: record.missionInstanceId,
    missionId: record.missionId,
    stageId: record.stageId ?? undefined,
    role: record.role,
    type: record.type,
    title: record.title,
    body: record.body,
    reason: record.reason ?? undefined,
    priority: record.priority,
    urgency: record.urgency,
    status: record.status,
    ctaLabel: record.ctaLabel,
    ctaHref: record.ctaHref,
    sourceRefs: record.sourceRefs ?? {},
    generatedBy: record.generatedBy,
    createdAt: record.createdAt.toISOString(),
    updatedAt: record.updatedAt.toISOString(),
    expiresAt: record.expiresAt?.toISOString(),
  }));
}
/**
 * Replaces the active suggestions of a mission instance.
 *
 * First marks every currently `"active"` suggestion for the user and instance
 * as `"expired"`. When `input.suggestions` is empty it returns `[]` at that
 * point. Otherwise it inserts the new suggestions as `"active"` (recording
 * `coachRunId` inside `sourceRefs`, and defaulting `generatedBy` to
 * `"deterministic"`) and returns the freshly listed active suggestions.
 *
 * NOTE(review): the expire and insert steps are separate statements, so a
 * failed insert leaves the previous suggestions expired — consider a
 * transaction if the driver in use supports one.
 */
export async function replaceMissionSuggestionsPg(input: {
  userId: string;
  missionInstanceId: string;
  missionId: string;
  coachRunId?: string;
  suggestions: Array<Omit<MissionSuggestion, "id" | "userId" | "missionInstanceId" | "missionId" | "status" | "createdAt" | "updatedAt"> & { id?: string }>;
}) {
  const { userId, missionInstanceId, missionId, coachRunId, suggestions } = input;
  const now = new Date();

  const currentlyActive = and(
    eq(missionSuggestions.userId, userId),
    eq(missionSuggestions.missionInstanceId, missionInstanceId),
    eq(missionSuggestions.status, "active"),
  );
  await db.update(missionSuggestions).set({ status: "expired", updatedAt: now }).where(currentlyActive);

  if (!suggestions.length) {
    return [];
  }

  await db.insert(missionSuggestions).values(suggestions.map((item) => ({
    id: item.id,
    userId,
    missionInstanceId,
    missionId,
    stageId: item.stageId,
    role: item.role,
    type: item.type,
    title: item.title,
    body: item.body,
    reason: item.reason,
    priority: item.priority,
    urgency: item.urgency,
    status: "active" as const,
    ctaLabel: item.ctaLabel,
    ctaHref: item.ctaHref,
    sourceRefs: { ...(item.sourceRefs ?? {}), coachRunId },
    generatedBy: item.generatedBy ?? "deterministic",
    expiresAt: item.expiresAt ? new Date(item.expiresAt) : undefined,
    createdAt: now,
    updatedAt: now,
  })));

  return listMissionSuggestionsPg(userId, missionInstanceId);
}
/**
 * Inserts a mission coach run record and returns the stored row.
 *
 * Only the fields supplied in `input` are written here; any other column is
 * left to whatever the table defines.
 *
 * @throws Error when the insert returns no row.
 */
export async function createMissionCoachRunPg(input: {
  userId: string;
  missionInstanceId: string;
  missionId: string;
  windowStart: Date;
  windowEnd: Date;
  inputDigest: Record<string, unknown>;
  skillVersion?: string;
}) {
  const { userId, missionInstanceId, missionId, windowStart, windowEnd, inputDigest, skillVersion } = input;
  const [created] = await db
    .insert(missionCoachRuns)
    .values({ userId, missionInstanceId, missionId, windowStart, windowEnd, inputDigest, skillVersion })
    .returning();
  if (!created) {
    throw new Error("Failed to create mission coach run");
  }
  return created;
}
/**
 * Marks the coach run with the given id as `"completed"`, storing its summary
 * and output and stamping `completedAt` with the current time.
 *
 * The update is matched by run id only; nothing is returned.
 */
export async function completeMissionCoachRunPg(input: { id: string; summary: string; output: Record<string, unknown> }) {
  const { id, summary, output } = input;
  await db
    .update(missionCoachRuns)
    .set({ status: "completed", summary, output, completedAt: new Date() })
    .where(eq(missionCoachRuns.id, id));
}