update source code (src) (14 files)

This commit is contained in:
-Puter
2026-06-01 20:58:55 +05:30
parent 4a4a03ebb9
commit f9f69653e3
20 changed files with 1011 additions and 165 deletions

View File

@@ -1,11 +1,13 @@
import { setup } from "rivetkit";
import { userActor } from "./user-actor.js";
import { workflowRunActor } from "./workflow-run-actor.js";
// Per changes.md §5: ONE unified actor per user.
// No separate growAgent, subAgent, or workflowJob actors.
export const registry = setup({
use: {
userActor,
workflowRunActor,
},
});

View File

@@ -23,8 +23,12 @@ import {
syncWorkspaceToGit,
} from "../docker/manager.js";
import { db } from "../db/client.js";
import { actors as actorsTable, events as eventsTable } from "../db/schema.js";
import { actors as actorsTable, events as eventsTable, type UserStack } from "../db/schema.js";
import { createChatCompletion, type LlmMessage, type LlmToolCall } from "../lib/llm.js";
import { getWorkflowDefinition } from "../workflows/registry.js";
import { workflowRunModules, workflowArtifacts, workflowEvents } from "../db/schema.js";
import { eq, and } from "drizzle-orm";
import { prepareOpenCodeWorkflowModule } from "../workflows/executors/opencode-executor.js";
// ── Types ──
@@ -35,8 +39,13 @@ type ChatTurn = {
toolCalls?: LlmToolCall[];
};
function publicStack(stack: UserStack) {
const { opencodePassword: _opencodePassword, ...safe } = stack;
return safe;
}
type WorkflowStatus = "draft" | "running" | "paused" | "completed";
type ModuleStatus = "idle" | "running" | "blocked" | "done";
type ModuleStatus = "idle" | "running" | "blocked" | "done" | "manual_required" | "waiting_for_input" | "opencode_required" | "coming_soon";
type Scorecard = {
id: string;
@@ -78,6 +87,7 @@ type UserActorState = {
workflowId: string;
workflowStatus: WorkflowStatus;
workflowGoal: string;
workflowRunId?: string;
modules: WorkflowModuleState[];
timeline: WorkflowEvent[];
createdAt: string;
@@ -318,19 +328,15 @@ function messagesForApi(history: ChatTurn[]): LlmMessage[] {
// ── Workflow helpers ──
function makeModules(): WorkflowModuleState[] {
function makeModules(workflowId = "interview-to-offer"): WorkflowModuleState[] {
const def = getWorkflowDefinition(workflowId);
if (def) {
return def.modules.map((m) => ({ id: m.id, name: m.title, role: m.role, service: m.service, status: "idle" as ModuleStatus, summary: m.description, scorecards: [] }));
}
return jobApplicationModuleIds()
.map((id) => getSubAgentModule(id))
.filter((m): m is SubAgentModule => Boolean(m))
.map((m) => ({
id: m.id,
name: m.name,
role: m.role,
service: m.service,
status: "idle" as ModuleStatus,
summary: m.description,
scorecards: [],
}));
.map((m) => ({ id: m.id, name: m.name, role: m.role, service: m.service, status: "idle" as ModuleStatus, summary: m.description, scorecards: [] }));
}
function appendTimelineEvent(
@@ -410,7 +416,7 @@ export const userActor = actor({
prompt: stack.promptVersion,
},
});
return stack;
return publicStack(stack);
},
shutdown: async (c) => {
@@ -533,9 +539,9 @@ export const userActor = actor({
status: m.status,
sessionId: detail?.session_id as string | undefined,
sessionUrl: m.service === "interview-service"
? `http://localhost:8007/api/v1/demo?session_id=${detail?.session_id ?? ""}`
? `${config.interviewPublicUrl.replace(/\/$/, "")}/api/v1/demo?session_id=${detail?.session_id ?? ""}`
: m.service === "roleplay-service"
? `http://localhost:8008/api/v1/demo?session_id=${detail?.session_id ?? ""}`
? `${config.roleplayPublicUrl.replace(/\/$/, "")}/api/v1/demo?session_id=${detail?.session_id ?? ""}`
: undefined,
summary: m.lastResult?.summary,
};
@@ -545,12 +551,14 @@ export const userActor = actor({
// ── Workflow (was workflowJob actor, now part of user actor — changes.md §5) ──
startWorkflow: async (c, input: { goal?: string }) => {
startWorkflow: async (c, input: { workflowId?: string; runId?: string; goal?: string; input?: Record<string, unknown> }) => {
const workflowId = input.workflowId ?? "interview-to-offer";
const goal = input.goal ?? "Find and apply to high-fit jobs";
c.state.workflowId = `job-application:${c.state.userId}`;
c.state.workflowId = `${workflowId}:${c.state.userId}`;
c.state.workflowRunId = input.runId;
c.state.workflowStatus = "running";
c.state.workflowGoal = goal;
c.state.modules = makeModules();
c.state.modules = makeModules(workflowId);
c.state.createdAt = now();
c.state.updatedAt = now();
@@ -558,7 +566,7 @@ export const userActor = actor({
c.state,
{ id: "grow", name: "Grow Agent" },
"workflow",
"Job application workflow started.",
`${getWorkflowDefinition(workflowId)?.title ?? "Workflow"} started.`,
);
c.broadcast("workflow.updated", {
workflowId: c.state.workflowId,
@@ -586,7 +594,7 @@ export const userActor = actor({
return c.state;
},
runWorkflowModule: async (c, input: { moduleId: string }) => {
runWorkflowModule: async (c, input: { moduleId: string; runId?: string }) => {
const mod = c.state.modules.find((m) => m.id === input.moduleId);
if (!mod) throw new Error(`Unknown workflow module: ${input.moduleId}`);
@@ -594,18 +602,25 @@ export const userActor = actor({
appendTimelineEvent(c.state, mod, "module", `${mod.name} started.`);
c.broadcast("workflow.updated", workflowSnapshot(c.state));
const workflowKey = c.state.workflowId.split(":")[0] || "interview-to-offer";
const defModule = getWorkflowDefinition(workflowKey)?.modules.find((m) => m.id === mod.id);
const subModule = getSubAgentModule(mod.id);
if (subModule?.service) {
const service = defModule?.service ?? subModule?.service;
if (service) {
const userId = c.state.userId;
const goal = c.state.workflowGoal;
c.waitUntil(
(async () => {
const result = await runServiceAgentProbe(
{ id: subModule.id, name: subModule.name, role: subModule.role, kind: "microservice", description: subModule.description, service: subModule.service },
{ id: mod.id, name: mod.name, role: mod.role, kind: "microservice", description: mod.summary, service },
{ userId, goal },
);
mod.lastResult = result;
mod.status = result.status === "unavailable" ? "blocked" : "done";
if (input.runId) {
await db.update(workflowRunModules).set({ status: mod.status, outputSummary: result.summary, output: result.detail as Record<string, unknown> | undefined, completedAt: new Date() }).where(and(eq(workflowRunModules.runId, input.runId), eq(workflowRunModules.moduleId, mod.id)));
await db.insert(workflowEvents).values({ runId: input.runId, userId, type: mod.status === "done" ? "module.completed" : "module.blocked", payload: { moduleId: mod.id, summary: result.summary, detail: result.detail } });
}
appendTimelineEvent(c.state, mod, "module", result.summary, result.detail);
c.broadcast("workflow.updated", workflowSnapshot(c.state));
await c.saveState({ immediate: true });
@@ -614,12 +629,28 @@ export const userActor = actor({
return c.state;
}
// Local workflow modules
mod.lastResult = {
status: "local",
summary: `${mod.name} completed a local workflow step for "${c.state.workflowGoal}".`,
};
mod.status = "done";
// Never fake success for non-service modules. They must execute via a real
// service/OpenCode path, be approved by a human, or report an honest blocked/manual status.
if (defModule?.execution === "opencode" && input.runId) {
const workflow = getWorkflowDefinition(workflowKey);
if (workflow) {
const prepared = await prepareOpenCodeWorkflowModule({ userId: c.state.userId, runId: input.runId, workflow, module: defModule, goal: c.state.workflowGoal });
mod.lastResult = { status: prepared.status === "blocked_service_unavailable" ? "unavailable" : prepared.status, summary: prepared.summary, detail: { artifacts: prepared.artifacts } };
mod.status = prepared.status === "ok" ? "done" : prepared.status === "blocked_service_unavailable" ? "blocked" : "opencode_required";
await db.insert(workflowArtifacts).values(prepared.artifacts.map((a) => ({ runId: input.runId!, moduleId: mod.id, type: a.type, title: a.title, repoPath: a.repoPath, metadata: a.metadata })));
await db.update(workflowRunModules).set({ status: mod.status, outputSummary: prepared.summary, output: { artifacts: prepared.artifacts }, completedAt: new Date() }).where(and(eq(workflowRunModules.runId, input.runId), eq(workflowRunModules.moduleId, mod.id)));
await db.insert(workflowEvents).values({ runId: input.runId, userId: c.state.userId, type: prepared.status === "ok" ? "artifact.generated" : "artifact.contract_created", payload: { moduleId: mod.id, artifacts: prepared.artifacts, status: prepared.status } });
appendTimelineEvent(c.state, mod, "module", prepared.summary, { artifacts: prepared.artifacts });
c.broadcast("workflow.updated", workflowSnapshot(c.state));
return c.state;
}
}
mod.lastResult = { status: defModule?.execution === "coming_soon" ? "coming_soon" : "manual_required", summary: defModule?.execution === "opencode" ? `${mod.name} requires OpenCode artifact execution.` : `${mod.name} requires manual input or is not available yet.` };
mod.status = defModule?.execution === "opencode" ? "opencode_required" : defModule?.execution === "coming_soon" ? "coming_soon" : "manual_required";
if (input.runId) {
await db.update(workflowRunModules).set({ status: mod.status, outputSummary: mod.lastResult.summary, completedAt: new Date() }).where(and(eq(workflowRunModules.runId, input.runId), eq(workflowRunModules.moduleId, mod.id)));
await db.insert(workflowEvents).values({ runId: input.runId, userId: c.state.userId, type: "module.blocked", payload: { moduleId: mod.id, status: mod.status, summary: mod.lastResult.summary } });
}
appendTimelineEvent(c.state, mod, "module", mod.lastResult.summary);
c.broadcast("workflow.updated", workflowSnapshot(c.state));
return c.state;
@@ -706,7 +737,7 @@ async function dispatchUnifiedTool(
if (!client || !stack?.giteaRepoOwner || !stack.giteaRepoName) return [];
try {
const res = await fetch(
`${config.giteaUrl}/api/v1/repos/${encodeURIComponent(stack.giteaRepoOwner)}/${encodeURIComponent(stack.giteaRepoName)}/contents/${encodeURI(pathPrefix)}`,
`${config.giteaInternalUrl}/api/v1/repos/${encodeURIComponent(stack.giteaRepoOwner)}/${encodeURIComponent(stack.giteaRepoName)}/contents/${encodeURI(pathPrefix)}`,
{ headers: { authorization: `token ${config.giteaAdminToken}`, accept: "application/json" } },
);
if (!res.ok) return [];
@@ -746,8 +777,8 @@ async function dispatchUnifiedTool(
mod.status = result.status === "unavailable" ? "blocked" : "done";
appendTimelineEvent(c.state, mod, "module", result.summary, result.detail);
} else {
mod.lastResult = { status: "local", summary: `${mod.name} completed a local workflow step.` };
mod.status = "done";
mod.lastResult = { status: "manual_required", summary: `${mod.name} requires manual input or a configured service before it can complete.` };
mod.status = "manual_required";
appendTimelineEvent(c.state, mod, "module", mod.lastResult.summary);
}
c.broadcast("workflow.updated", workflowSnapshot(c.state));

View File

@@ -0,0 +1,198 @@
import { actor, event, queue } from "rivetkit";
import { workflow } from "rivetkit/workflow";
import { db } from "../db/client.js";
import { workflowApprovals, workflowEvents, workflowRunModules, workflowRuns } from "../db/schema.js";
import { and, eq } from "drizzle-orm";
import { getWorkflowDefinition } from "../workflows/registry.js";
import { executeWorkflowModule } from "../workflows/module-runner.js";
type WorkflowCommand =
| { type: "run_module"; userId: string; runId: string; moduleId: string; idempotencyKey?: string }
| { type: "run_all"; userId: string; runId: string; idempotencyKey?: string }
| { type: "pause"; userId: string; runId: string }
| { type: "resume"; userId: string; runId: string };
type WorkflowRunActorState = {
userId: string;
runId: string;
phase: "idle" | "running" | "paused" | "error";
currentModuleId?: string;
processedCommands: number;
lastError?: string;
updatedAt?: string;
};
export const workflowRunActor = actor({
options: { name: "Workflow Run", icon: "diagram-project", noSleep: true, actionTimeout: 600_000 },
state: { userId: "", runId: "", phase: "idle", processedCommands: 0 } as WorkflowRunActorState,
events: {
updated: event<WorkflowRunActorState>(),
},
queues: {
commands: queue<WorkflowCommand>(),
},
actions: {
init: async (c, input: { userId: string; runId: string }) => {
if (c.state.userId && (c.state.userId !== input.userId || c.state.runId !== input.runId)) throw new Error("workflow actor already initialized for a different run");
c.state.userId = input.userId;
c.state.runId = input.runId;
c.state.updatedAt = new Date().toISOString();
c.broadcast("updated", c.state);
return c.state;
},
runModule: async (c, input: { userId: string; runId: string; moduleId: string; idempotencyKey?: string }) => {
await c.queue.send("commands", { type: "run_module", ...input });
return { queued: true };
},
runAll: async (c, input: { userId: string; runId: string; idempotencyKey?: string }) => {
await c.queue.send("commands", { type: "run_all", ...input });
return { queued: true };
},
pause: async (c, input: { userId: string; runId: string }) => {
await c.queue.send("commands", { type: "pause", ...input });
return { queued: true };
},
resume: async (c, input: { userId: string; runId: string }) => {
await c.queue.send("commands", { type: "resume", ...input });
return { queued: true };
},
getState: (c) => c.state,
},
run: workflow(async (ctx) => {
await ctx.loop("workflow-command-loop", async (loopCtx) => {
const message = await loopCtx.queue.next("wait-command", { names: ["commands"] });
const cmd = message.body as WorkflowCommand;
if (cmd.type === "pause") {
await loopCtx.step("pause-run", async () => {
loopCtx.state.phase = "paused";
loopCtx.state.updatedAt = new Date().toISOString();
loopCtx.broadcast("updated", loopCtx.state);
await db.insert(workflowEvents).values({ runId: cmd.runId, userId: cmd.userId, type: "workflow.pause.queued", payload: {} });
});
return;
}
if (cmd.type === "resume") {
await loopCtx.step("resume-run", async () => {
loopCtx.state.phase = "running";
loopCtx.state.updatedAt = new Date().toISOString();
loopCtx.broadcast("updated", loopCtx.state);
await db.insert(workflowEvents).values({ runId: cmd.runId, userId: cmd.userId, type: "workflow.resume.queued", payload: {} });
});
return;
}
if (loopCtx.state.phase === "paused") {
await loopCtx.step("skip-while-paused", async () => {
await db.insert(workflowEvents).values({ runId: cmd.runId, userId: cmd.userId, type: "module.deferred_paused", payload: { moduleId: cmd.type === "run_module" ? cmd.moduleId : "all" } });
});
return;
}
if (cmd.type === "run_all") {
const moduleIds = await loopCtx.step(`load-run-modules:${cmd.runId}`, async () => {
const [run] = await db.select().from(workflowRuns).where(and(eq(workflowRuns.id, cmd.runId), eq(workflowRuns.userId, cmd.userId))).limit(1);
if (!run) throw new Error(`run not found: ${cmd.runId}`);
const def = getWorkflowDefinition(run.workflowId);
if (!def) throw new Error(`workflow not found: ${run.workflowId}`);
return def.modules.map((m) => ({ id: m.id, approvalGateAfter: m.approvalGateAfter }));
});
for (const mod of moduleIds) {
const result = await runOneModule(loopCtx, cmd.userId, cmd.runId, mod.id, `${cmd.idempotencyKey ?? "all"}:${mod.id}`);
if (!result.ok) break;
if (mod.approvalGateAfter) {
const shouldPause = await loopCtx.step(`approval-gate:${cmd.runId}:${mod.approvalGateAfter}:${cmd.idempotencyKey ?? "all"}`, async () => {
const [existing] = await db.select().from(workflowApprovals).where(and(eq(workflowApprovals.runId, cmd.runId), eq(workflowApprovals.approvalId, mod.approvalGateAfter!))).limit(1);
if (existing?.status === "approved") return false;
await db.insert(workflowApprovals).values({ runId: cmd.runId, approvalId: mod.approvalGateAfter!, status: "pending", payload: { afterModuleId: mod.id } }).onConflictDoNothing();
await db.insert(workflowEvents).values({ runId: cmd.runId, userId: cmd.userId, type: "approval.required", payload: { approvalId: mod.approvalGateAfter, afterModuleId: mod.id } });
return true;
});
if (shouldPause) break;
}
}
return;
}
const result = await loopCtx.tryStep({
name: `run-module:${cmd.moduleId}:${cmd.idempotencyKey ?? "default"}`,
maxRetries: 3,
retryBackoffBase: 1_000,
retryBackoffMax: 30_000,
timeout: 300_000,
run: async () => {
loopCtx.state.phase = "running";
loopCtx.state.currentModuleId = cmd.moduleId;
loopCtx.state.updatedAt = new Date().toISOString();
loopCtx.broadcast("updated", loopCtx.state);
const result = await executeWorkflowModule({ userId: cmd.userId, runId: cmd.runId, moduleId: cmd.moduleId });
loopCtx.state.processedCommands += 1;
loopCtx.state.currentModuleId = undefined;
loopCtx.state.updatedAt = new Date().toISOString();
loopCtx.broadcast("updated", loopCtx.state);
return result;
},
catch: ["timeout", "exhausted"],
});
if (!result.ok) {
await loopCtx.step(`record-failure:${cmd.moduleId}:${cmd.idempotencyKey ?? "default"}`, async () => {
const error = JSON.stringify(result.failure.error);
loopCtx.state.phase = "error";
loopCtx.state.lastError = error;
loopCtx.state.currentModuleId = undefined;
loopCtx.state.updatedAt = new Date().toISOString();
loopCtx.broadcast("updated", loopCtx.state);
await db.update(workflowRunModules).set({ status: "blocked", error, completedAt: new Date() }).where(and(eq(workflowRunModules.runId, cmd.runId), eq(workflowRunModules.moduleId, cmd.moduleId)));
await db.update(workflowRuns).set({ status: "failed", updatedAt: new Date() }).where(eq(workflowRuns.id, cmd.runId));
await db.insert(workflowEvents).values({ runId: cmd.runId, userId: cmd.userId, type: "module.failed", payload: { moduleId: cmd.moduleId, failure: result.failure } });
});
}
});
}, {
onError: async (_ctx, event) => {
console.error("workflow-run-actor workflow error", event);
},
}),
});
async function runOneModule(loopCtx: any, userId: string, runId: string, moduleId: string, idempotencyKey: string): Promise<{ ok: boolean }> {
const shouldSkip = await loopCtx.step(`precheck-module:${moduleId}:${idempotencyKey}`, async () => {
const [row] = await db.select().from(workflowRunModules).where(and(eq(workflowRunModules.runId, runId), eq(workflowRunModules.moduleId, moduleId))).limit(1);
return row ? ["done", "blocked", "manual_required", "coming_soon"].includes(row.status) : false;
});
if (shouldSkip) return { ok: true };
const result = await loopCtx.tryStep({
name: `run-module:${moduleId}:${idempotencyKey}`,
maxRetries: 3,
retryBackoffBase: 1_000,
retryBackoffMax: 30_000,
timeout: 300_000,
run: async () => {
loopCtx.state.phase = "running";
loopCtx.state.currentModuleId = moduleId;
loopCtx.state.updatedAt = new Date().toISOString();
loopCtx.broadcast("updated", loopCtx.state);
const moduleResult = await executeWorkflowModule({ userId, runId, moduleId });
loopCtx.state.processedCommands += 1;
loopCtx.state.currentModuleId = undefined;
loopCtx.state.updatedAt = new Date().toISOString();
loopCtx.broadcast("updated", loopCtx.state);
return moduleResult;
},
catch: ["timeout", "exhausted"],
});
if (result.ok) return { ok: true };
await loopCtx.step(`record-failure:${moduleId}:${idempotencyKey}`, async () => {
const error = JSON.stringify(result.failure.error);
loopCtx.state.phase = "error";
loopCtx.state.lastError = error;
loopCtx.state.currentModuleId = undefined;
loopCtx.state.updatedAt = new Date().toISOString();
loopCtx.broadcast("updated", loopCtx.state);
await db.update(workflowRunModules).set({ status: "blocked", error, completedAt: new Date() }).where(and(eq(workflowRunModules.runId, runId), eq(workflowRunModules.moduleId, moduleId)));
await db.update(workflowRuns).set({ status: "failed", updatedAt: new Date() }).where(eq(workflowRuns.id, runId));
await db.insert(workflowEvents).values({ runId, userId, type: "module.failed", payload: { moduleId, failure: result.failure } });
});
return { ok: false };
}

View File

@@ -50,15 +50,31 @@ export const config = {
// Product microservices exposed as sub-agent surfaces.
interviewServiceUrl:
process.env.INTERVIEW_SERVICE_URL ?? "http://localhost:8007",
interviewPublicUrl:
process.env.INTERVIEW_PUBLIC_URL ?? process.env.INTERVIEW_SERVICE_URL ?? "http://localhost:8007",
roleplayServiceUrl:
process.env.ROLEPLAY_SERVICE_URL ?? "http://localhost:8008",
roleplayPublicUrl:
process.env.ROLEPLAY_PUBLIC_URL ?? process.env.ROLEPLAY_SERVICE_URL ?? "http://localhost:8008",
qscoreServiceUrl:
process.env.QSCORE_SERVICE_URL ?? "http://localhost:8000",
resumeServiceUrl:
process.env.RESUME_SERVICE_URL ?? "http://localhost:8002",
resumePublicUrl:
process.env.RESUME_PUBLIC_URL ?? process.env.RESUME_SERVICE_URL ?? "http://localhost:8002",
// ── Central Gitea (one org-wide instance, changes.md §2A) ──
giteaUrl: process.env.GITEA_URL ?? "http://127.0.0.1:3001",
// Public URL is what Git remotes should use and what OpenCode containers see.
// Internal URL is only for backend-to-Gitea API calls on a private network.
giteaPublicUrl:
process.env.GITEA_PUBLIC_URL ??
process.env.GITEA_URL ??
"http://127.0.0.1:3001",
giteaInternalUrl:
process.env.GITEA_INTERNAL_URL ??
process.env.GITEA_URL ??
process.env.GITEA_PUBLIC_URL ??
"http://127.0.0.1:3001",
giteaAdminUser: process.env.GITEA_ADMIN_USER ?? "growqr-admin",
giteaAdminPassword: process.env.GITEA_ADMIN_PASSWORD ?? "growqr-admin-dev",
giteaAdminToken: process.env.GITEA_ADMIN_TOKEN ?? "",
@@ -68,9 +84,9 @@ export const config = {
opencodeImage:
process.env.OPENCODE_IMAGE ?? "growqr/opencode:dev",
// Version tracking for rollout (changes.md §9)
opencodeImageVersion: process.env.OPENCODE_IMAGE_VERSION ?? "1.0.0",
opencodeImageVersion: process.env.OPENCODE_IMAGE_VERSION ?? "dev",
migrationVersion: process.env.MIGRATION_VERSION ?? "1",
promptVersion: process.env.PROMPT_VERSION ?? "1",
promptVersion: process.env.PROMPT_VERSION ?? "2",
// Host that user containers expose ports on (the host running Docker).
userContainerHost: process.env.USER_CONTAINER_HOST ?? "127.0.0.1",
@@ -82,6 +98,10 @@ export const config = {
// CORS for the Next.js frontend.
frontendOrigin:
process.env.FRONTEND_ORIGIN ?? "http://localhost:3000",
adminUserIds: (process.env.ADMIN_USER_IDS ?? "")
.split(",")
.map((s) => s.trim())
.filter(Boolean),
// Used by LLM requests.
maxAgentTokens: Number(process.env.MAX_AGENT_TOKENS ?? 4096),

View File

@@ -171,4 +171,84 @@ export type UserStack = typeof userStacks.$inferSelect;
export type NewUserStack = typeof userStacks.$inferInsert;
export type ActorRow = typeof actors.$inferSelect;
export type RepoRow = typeof repos.$inferSelect;
export const workflowRuns = pgTable(
"workflow_runs",
{
id: text("id").primaryKey().default(sql`gen_random_uuid()::text`),
userId: text("user_id").notNull().references(() => users.id, { onDelete: "cascade" }),
workflowId: text("workflow_id").notNull(),
workflowVersion: text("workflow_version").notNull(),
status: text("status", { enum: ["draft", "running", "paused", "completed", "failed"] }).notNull().default("running"),
goal: text("goal"),
input: jsonb("input").$type<Record<string, unknown>>(),
currentStepId: text("current_step_id"),
progressPercent: integer("progress_percent").notNull().default(0),
qscoreBefore: jsonb("qscore_before").$type<Record<string, unknown>>(),
qscoreAfter: jsonb("qscore_after").$type<Record<string, unknown>>(),
createdAt: timestamp("created_at", { withTimezone: true }).defaultNow().notNull(),
updatedAt: timestamp("updated_at", { withTimezone: true }).defaultNow().notNull(),
completedAt: timestamp("completed_at", { withTimezone: true }),
},
(t) => ({ userIdx: index("workflow_runs_user_idx").on(t.userId, t.createdAt), workflowIdx: index("workflow_runs_workflow_idx").on(t.workflowId) }),
);
export const workflowRunModules = pgTable("workflow_run_modules", {
id: text("id").primaryKey().default(sql`gen_random_uuid()::text`),
runId: text("run_id").notNull().references(() => workflowRuns.id, { onDelete: "cascade" }),
moduleId: text("module_id").notNull(),
title: text("title").notNull(),
status: text("status").notNull().default("idle"),
service: text("service"),
idempotencyKey: text("idempotency_key"),
retryCount: integer("retry_count").notNull().default(0),
maxRetries: integer("max_retries").notNull().default(2),
outputSummary: text("output_summary"),
output: jsonb("output").$type<Record<string, unknown>>(),
error: text("error"),
startedAt: timestamp("started_at", { withTimezone: true }),
completedAt: timestamp("completed_at", { withTimezone: true }),
});
export const workflowArtifacts = pgTable("workflow_artifacts", {
id: text("id").primaryKey().default(sql`gen_random_uuid()::text`),
runId: text("run_id").notNull().references(() => workflowRuns.id, { onDelete: "cascade" }),
moduleId: text("module_id"),
type: text("type").notNull(),
title: text("title").notNull(),
repoPath: text("repo_path"),
publicUrl: text("public_url"),
metadata: jsonb("metadata").$type<Record<string, unknown>>(),
createdAt: timestamp("created_at", { withTimezone: true }).defaultNow().notNull(),
});
export const workflowApprovals = pgTable("workflow_approvals", {
id: text("id").primaryKey().default(sql`gen_random_uuid()::text`),
runId: text("run_id").notNull().references(() => workflowRuns.id, { onDelete: "cascade" }),
approvalId: text("approval_id").notNull(),
status: text("status", { enum: ["pending", "approved", "rejected"] }).notNull().default("pending"),
payload: jsonb("payload").$type<Record<string, unknown>>(),
createdAt: timestamp("created_at", { withTimezone: true }).defaultNow().notNull(),
resolvedAt: timestamp("resolved_at", { withTimezone: true }),
});
export const qscoreSnapshots = pgTable("qscore_snapshots", {
id: text("id").primaryKey().default(sql`gen_random_uuid()::text`),
userId: text("user_id").notNull().references(() => users.id, { onDelete: "cascade" }),
runId: text("run_id").references(() => workflowRuns.id, { onDelete: "cascade" }),
snapshotType: text("snapshot_type", { enum: ["baseline", "module", "final"] }).notNull(),
score: integer("score"),
payload: jsonb("payload").$type<Record<string, unknown>>(),
createdAt: timestamp("created_at", { withTimezone: true }).defaultNow().notNull(),
});
export const workflowEvents = pgTable("workflow_events", {
id: text("id").primaryKey().default(sql`gen_random_uuid()::text`),
runId: text("run_id").notNull().references(() => workflowRuns.id, { onDelete: "cascade" }),
userId: text("user_id").notNull().references(() => users.id, { onDelete: "cascade" }),
type: text("type").notNull(),
payload: jsonb("payload").$type<Record<string, unknown>>(),
createdAt: timestamp("created_at", { withTimezone: true }).defaultNow().notNull(),
});
export type OpencodeSessionRow = typeof opencodeSessions.$inferSelect;
export type WorkflowRunRow = typeof workflowRuns.$inferSelect;

View File

@@ -86,9 +86,9 @@ async function getCentralGiteaClient(): Promise<GiteaClient> {
if (!centralGiteaClient) {
const token = config.giteaAdminToken;
if (token) {
centralGiteaClient = new GiteaClient(config.giteaUrl, { kind: "token", token });
centralGiteaClient = new GiteaClient(config.giteaInternalUrl, { kind: "token", token });
} else {
centralGiteaClient = new GiteaClient(config.giteaUrl, {
centralGiteaClient = new GiteaClient(config.giteaInternalUrl, {
kind: "basic",
username: config.giteaAdminUser,
password: config.giteaAdminPassword,
@@ -100,7 +100,7 @@ async function getCentralGiteaClient(): Promise<GiteaClient> {
export async function ensureCentralGiteaReady(): Promise<void> {
if (centralGiteaReady) return;
await waitForGitea(config.giteaUrl, 120_000);
await waitForGitea(config.giteaInternalUrl, 120_000);
const client = await getCentralGiteaClient();
// Ensure the org exists (changes.md §2A: single org manages all users).
@@ -111,7 +111,7 @@ export async function ensureCentralGiteaReady(): Promise<void> {
}
centralGiteaReady = true;
log.info({ url: config.giteaUrl, org: config.giteaOrgName }, "central Gitea ready");
log.info({ url: config.giteaInternalUrl, org: config.giteaOrgName }, "central Gitea ready");
}
// ── Git clone into OpenCode workspace (changes.md §4 step 3) ──
@@ -199,7 +199,7 @@ export async function syncWorkspaceToGit(userId: string, message?: string): Prom
const commitMsg = message ?? `growqr: workspace sync at ${new Date().toISOString()}`;
// Build authenticated remote URL for push.
let authUrl = `${config.giteaUrl}/${encodeURIComponent(stack.giteaRepoOwner)}/${encodeURIComponent(stack.giteaRepoName)}.git`;
let authUrl = `${config.giteaPublicUrl}/${encodeURIComponent(stack.giteaRepoOwner)}/${encodeURIComponent(stack.giteaRepoName)}.git`;
if (config.giteaAdminToken) {
authUrl = authUrl.replace("://", `://${encodeURIComponent(config.giteaAdminToken)}@`);
} else {
@@ -250,10 +250,23 @@ async function startOpencodeContainer(opts: {
const existing = await findExistingContainer(name);
if (existing) {
if (existing.State !== "running") {
await docker.getContainer(existing.Id).start().catch(() => undefined);
const existingContainer = docker.getContainer(existing.Id);
const info = await existingContainer.inspect().catch(() => null);
const labels = info?.Config?.Labels ?? {};
const current =
labels["growqr.imageVersion"] === config.opencodeImageVersion &&
labels["growqr.promptVersion"] === config.promptVersion &&
info?.Config?.Image === config.opencodeImage;
if (current) {
if (existing.State !== "running") {
await existingContainer.start().catch(() => undefined);
}
return { id: existing.Id, name };
}
return { id: existing.Id, name };
log.info({ userId: opts.userId, name }, "removing stale OpenCode container");
await existingContainer.remove({ force: true }).catch(() => undefined);
}
// Sub-agents are loaded as prompt modules at build time (changes.md §2D).
@@ -269,7 +282,7 @@ async function startOpencodeContainer(opts: {
`GROWQR_PROMPT_VERSION=${config.promptVersion}`,
`GROWQR_MIGRATION_VERSION=${config.migrationVersion}`,
`GROWQR_USER_ID=${opts.userId}`,
`GROWQR_GITEA_URL=${config.giteaUrl}`,
`GROWQR_GITEA_URL=${config.giteaPublicUrl}`,
],
WorkingDir: "/workspace",
HostConfig: {
@@ -320,7 +333,25 @@ export async function provisionUserStack(userId: string): Promise<UserStack> {
where: eq(userStacks.userId, userId),
});
if (existing && existing.status === "running") {
return existing;
const current =
existing.imageVersion === config.opencodeImageVersion &&
existing.migrationVersion === config.migrationVersion &&
existing.promptVersion === config.promptVersion;
if (current) return existing;
log.info(
{
userId,
currentImage: existing.imageVersion,
targetImage: config.opencodeImageVersion,
currentMigration: existing.migrationVersion,
targetMigration: config.migrationVersion,
currentPrompt: existing.promptVersion,
targetPrompt: config.promptVersion,
},
"recreating stale OpenCode stack before provisioning",
);
await stopUserStack(userId);
}
await ensureDir(userDataDir(userId));
@@ -417,7 +448,7 @@ export async function provisionUserStack(userId: string): Promise<UserStack> {
try {
await cloneRepoIntoContainer({
containerId: opencode.id,
repoUrl: `${config.giteaUrl}/${encodeURIComponent(repoOwner)}/${encodeURIComponent(repoName)}.git`,
repoUrl: `${config.giteaPublicUrl}/${encodeURIComponent(repoOwner)}/${encodeURIComponent(repoName)}.git`,
giteaToken: config.giteaAdminToken || undefined,
giteaUser: config.giteaAdminUser,
giteaPassword: !config.giteaAdminToken ? config.giteaAdminPassword : undefined,

View File

@@ -10,7 +10,7 @@ import { opencodeRoutes } from "./routes/opencode.js";
import { gitRoutes } from "./routes/git.js";
import { userRoutes } from "./routes/users.js";
import { agentRoutes } from "./routes/agents.js";
import { workflowRoutes } from "./routes/workflows.js";
import { workflowRoutes, workflowRunRoutes } from "./routes/workflows.js";
import { chatRoutes } from "./routes/chat.js";
import { db } from "./db/client.js";
import { hydratePortAllocator, reconcileOnBoot, ensureCentralGiteaReady } from "./docker/manager.js";
@@ -73,6 +73,7 @@ async function main() {
app.route("/users", userRoutes());
app.route("/agents", agentRoutes());
app.route("/workflows", workflowRoutes());
app.route("/workflow-runs", workflowRunRoutes());
app.route("/actors", actorRoutes());
app.route("/opencode", opencodeRoutes());
app.route("/git", gitRoutes());
@@ -82,7 +83,7 @@ async function main() {
// Self-hosted: embedded engine runs at localhost:6420.
// Proxy frontend Rivet traffic to the engine instead of using registry.handler()
// (handler conflicts with startRunner — they're mutually exclusive).
app.all("/api/rivet/*", async (c) => {
const proxyRivet = async (c: any) => {
const url = new URL(c.req.url);
const target = new URL(config.rivetEndpoint);
url.protocol = target.protocol;
@@ -92,7 +93,7 @@ async function main() {
// Forward headers, stripping hop-by-hop ones
const fwdHeaders = new Headers();
for (const [k, v] of Object.entries(c.req.raw.headers)) {
for (const [k, v] of c.req.raw.headers.entries()) {
if (k.toLowerCase() === "host") continue;
if (k.toLowerCase() === "transfer-encoding") continue;
fwdHeaders.set(k, v);
@@ -122,7 +123,9 @@ async function main() {
log.error({ err, url: url.toString() }, "rivet proxy error");
return c.json({ error: "proxy_error" }, 502);
}
});
};
app.all("/api/rivet", proxyRivet);
app.all("/api/rivet/*", proxyRivet);
registry.startRunner();
} else {
// Serverless: use registry.handler() for incoming actor traffic.
@@ -134,7 +137,8 @@ async function main() {
{
port: info.port,
rivet: config.rivetEndpoint,
gitea: config.giteaUrl,
giteaPublic: config.giteaPublicUrl,
giteaInternal: config.giteaInternalUrl,
env: config.nodeEnv,
},
"growqr-backend listening",

View File

@@ -9,6 +9,13 @@ import { requireUser, type AuthContext } from "../auth/clerk.js";
import { db } from "../db/client.js";
import { actors as actorsTable } from "../db/schema.js";
import { eq } from "drizzle-orm";
import type { UserStack } from "../db/schema.js";
function publicStack(stack: UserStack | null) {
if (!stack) return null;
const { opencodePassword: _opencodePassword, ...safe } = stack;
return safe;
}
// Per changes.md §5: ONE unified actor per user.
// Routes are user-scoped via Clerk auth; userId derived from session token.
@@ -19,7 +26,7 @@ export function actorRoutes() {
app.post("/provision", async (c) => {
const userId = c.get("userId");
const stack = await provisionUserStack(userId);
return c.json({ userId, stack });
return c.json({ userId, stack: publicStack(stack) });
});
app.get("/me", async (c) => {
@@ -29,13 +36,13 @@ export function actorRoutes() {
.select()
.from(actorsTable)
.where(eq(actorsTable.userId, userId));
return c.json({ userId, stack, actors: rows });
return c.json({ userId, stack: publicStack(stack), actors: rows });
});
app.get("/", async (c) => {
const userId = c.get("userId");
const all = await listStacks();
return c.json({ stacks: all.filter((s) => s.userId === userId) });
return c.json({ stacks: all.filter((s) => s.userId === userId).map(publicStack) });
});
app.post("/stop", async (c) => {

View File

@@ -13,6 +13,18 @@ import {
} from "../services/service-agents.js";
import { getSubAgentModules } from "../lib/prompt-loader.js";
const RIVET_CHAT_TIMEOUT_MS = 2500;
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => reject(new Error(`${label} timed out`)), ms);
promise.then(
(value) => { clearTimeout(timer); resolve(value); },
(error) => { clearTimeout(timer); reject(error); },
);
});
}
const chatSchema = z.object({
messages: z.array(
z.object({
@@ -96,44 +108,6 @@ export function chatRoutes() {
const app = new Hono<AuthContext>();
app.use("*", requireUser);
// Infer workflow step from which agents have been run
function inferWorkflowStep(sessions: Array<{ moduleId: string; status: string }>, messages: Array<{ role: string; content: string }>): { workflowActive: boolean; workflowStep: number; goal: string } {
const doneModules = new Set(sessions.filter(s => s.status === "done").map(s => s.moduleId));
let step = 0;
let goal = "";
// Extract goal from conversation (look for "I have an interview at..." or "prepare for...")
for (const m of messages) {
if (m.role === "user") {
const lower = m.content.toLowerCase();
if (lower.includes("interview at") || lower.includes("prepare for") || lower.includes("role at") || lower.includes("apply to")) {
goal = m.content;
break;
}
}
}
// Infer step from completed modules
// Step 1: Workflow started (user described goal)
if (goal) step = 1;
// Step 2: User shared JD/role info
if (messages.filter(m => m.role === "user" && m.content.length > 30).length >= 2) step = 2;
// Step 3: Resume agent done
if (doneModules.has("resume")) step = 3;
// Step 4: Interview session created
if (doneModules.has("sara")) step = 4;
// Step 5: Roleplay session created
if (doneModules.has("emily")) step = 5;
// Step 6: QScore computed
if (doneModules.has("qscore")) step = 6;
return {
workflowActive: step > 0,
workflowStep: step,
goal: goal || "Career preparation",
};
}
app.post("/", async (c) => {
const userId = c.get("userId");
const body = chatSchema.parse(await c.req.json());
@@ -143,14 +117,13 @@ export function chatRoutes() {
try {
const client = createClient<Registry>(config.rivetClientEndpoint);
const handle = client.userActor.getOrCreate([userId]);
await handle.init({ userId });
const result = await handle.receiveMessage({ text: userText });
await withTimeout(handle.init({ userId }), RIVET_CHAT_TIMEOUT_MS, "Rivet init");
const result = await withTimeout(handle.receiveMessage({ text: userText }), RIVET_CHAT_TIMEOUT_MS, "Rivet chat");
if (result?.reply) {
const reply = cleanWorkflowTag(String(result.reply));
const workflow = extractWorkflowTag(String(result.reply));
const sessions = (result as any).sessions ?? [];
const stepInfo = inferWorkflowStep(sessions, body.messages);
return c.json({ reply, workflow, sessions, ...stepInfo });
return c.json({ reply, workflow, sessions });
}
} catch (err) {
console.warn("Rivet chat unavailable, using direct LLM:", err instanceof Error ? err.message : String(err));
@@ -210,7 +183,7 @@ export function chatRoutes() {
moduleName: "Sara",
status: "done",
sessionId: detail.session_id as string,
sessionUrl: `http://localhost:8007/api/v1/demo?session_id=${detail.session_id ?? ""}`,
sessionUrl: `${config.interviewPublicUrl.replace(/\/$/, "")}/api/v1/demo?session_id=${detail.session_id ?? ""}`,
summary: toolResult.summary,
});
}
@@ -228,7 +201,7 @@ export function chatRoutes() {
moduleName: "Emily",
status: "done",
sessionId: detail.session_id as string,
sessionUrl: `http://localhost:8008/api/v1/demo?session_id=${detail.session_id ?? ""}`,
sessionUrl: `${config.roleplayPublicUrl.replace(/\/$/, "")}/api/v1/demo?session_id=${detail.session_id ?? ""}`,
summary: toolResult.summary,
});
}
@@ -279,7 +252,6 @@ export function chatRoutes() {
reply: cleanWorkflowTag(reply),
workflow: extractWorkflowTag(reply),
sessions,
...inferWorkflowStep(sessions, body.messages),
});
} catch (llmErr) {
console.error("Direct LLM chat error:", llmErr);

View File

@@ -7,6 +7,10 @@ import { opencodeSessions } from "../db/schema.js";
import { eq } from "drizzle-orm";
import { requireUser, type AuthContext } from "../auth/clerk.js";
function publicWorkspace(target: { baseUrl: string; password: string | undefined }) {
return { baseUrl: target.baseUrl, auth: target.password ? "basic" : "none" };
}
// PRD §5.3 — OpenCode Docker management API.
// Proxies into the user's OpenCode container's HTTP surface.
export function opencodeRoutes() {
@@ -31,7 +35,7 @@ export function opencodeRoutes() {
if (!target) return c.json({ error: "not provisioned" }, 404);
const client = new OpencodeClient(target.baseUrl, target.password);
const health = await client.health();
return c.json({ workspace: target, health });
return c.json({ workspace: publicWorkspace(target), health });
});
app.post("/sessions", async (c) => {

View File

@@ -1,11 +1,17 @@
import { Hono } from "hono";
import { requireUser, type AuthContext } from "../auth/clerk.js";
import { db } from "../db/client.js";
import { users, userStacks } from "../db/schema.js";
import { users, userStacks, type UserStack } from "../db/schema.js";
import { eq } from "drizzle-orm";
import { provisionUserStack } from "../docker/manager.js";
import { log } from "../log.js";
function publicStack(stack: UserStack | null | undefined) {
if (!stack) return stack;
const { opencodePassword: _opencodePassword, ...safe } = stack;
return safe;
}
export function userRoutes() {
const app = new Hono<AuthContext>();
app.use("*", requireUser);
@@ -31,7 +37,7 @@ export function userRoutes() {
}
return c.json({
user: userRow,
stack: stack ?? { status: "provisioning" },
stack: publicStack(stack) ?? { status: "provisioning" },
});
});
@@ -43,7 +49,7 @@ export function userRoutes() {
const stack = await db.query.userStacks.findFirst({
where: eq(userStacks.userId, userId),
});
return c.json({ user: userRow, stack });
return c.json({ user: userRow, stack: publicStack(stack) });
});
return app;

View File

@@ -1,84 +1,236 @@
import { Hono } from "hono";
import { z } from "zod";
import { createClient, type Client } from "rivetkit/client";
import { desc, eq, and } from "drizzle-orm";
import { readFile } from "node:fs/promises";
import { join, normalize } from "node:path";
import { config } from "../config.js";
import { requireUser, type AuthContext } from "../auth/clerk.js";
import type { Registry } from "../actors/registry.js";
import { db } from "../db/client.js";
import { qscoreSnapshots, workflowRuns, workflowRunModules, workflowEvents, workflowArtifacts, workflowApprovals } from "../db/schema.js";
import { getWorkflowDefinition, listWorkflowDefinitions, normalizeWorkflowId } from "../workflows/registry.js";
import { executeWorkflowModule } from "../workflows/module-runner.js";
import { validateWorkflowDefinition } from "../workflows/validation.js";
import { listServiceCapabilities } from "../workflows/service-capabilities.js";
import { getUserStack, giteaClientFor } from "../docker/manager.js";
import { runServiceAgentProbe } from "../services/service-agents.js";
// Lazy-load the Rivet client to avoid connecting at import time when the engine
// isn't running (avoids "failed to fetch metadata" spam on startup).
let _client: Client<Registry> | null = null;
function getClient(): Client<Registry> {
if (!_client) {
_client = createClient<Registry>(config.rivetEndpoint);
}
return _client;
}
function getClient(): Client<Registry> { return (_client ??= createClient<Registry>(config.rivetEndpoint)); }
function userActorFor(userId: string) { return getClient().userActor.getOrCreate([userId]); }
function workflowActorFor(userId: string, runId: string) { return getClient().workflowRunActor.getOrCreate([userId, runId]); }
// Per changes.md §5: one unified userActor per user.
function userActorFor(userId: string) {
return getClient().userActor.getOrCreate([userId]);
}
const startSchema = z.object({ goal: z.string().min(1).optional(), input: z.record(z.unknown()).optional() });
export function workflowRoutes() {
const app = new Hono<AuthContext>();
app.use("*", requireUser);
app.get("/", (c) => c.json({ workflows: listWorkflowDefinitions() }));
app.get("/capabilities/services", (c) => c.json({ services: listServiceCapabilities() }));
app.get("/capabilities/health", async (c) => c.json({ services: await Promise.all(listServiceCapabilities().map(async (s) => ({ ...s, healthy: s.internalUrl ? await serviceHealthy(s.internalUrl) : s.enabled }))) }));
app.get("/validate", (c) => c.json({ results: listWorkflowDefinitions().map((workflow) => ({ workflowId: workflow.id, ...validateWorkflowDefinition(workflow) })) }));
app.get("/admin/ops", async (c) => {
if (config.adminUserIds.length && !config.adminUserIds.includes(c.get("userId"))) return c.json({ error: "forbidden" }, 403);
const failedRuns = await db.select().from(workflowRuns).where(eq(workflowRuns.status, "failed")).orderBy(desc(workflowRuns.updatedAt)).limit(50);
const blockedModules = await db.select().from(workflowRunModules).where(eq(workflowRunModules.status, "blocked")).limit(100);
return c.json({ failedRuns, blockedModules, services: await Promise.all(listServiceCapabilities().map(async (s) => ({ id: s.id, enabled: s.enabled, healthy: s.internalUrl ? await serviceHealthy(s.internalUrl) : s.enabled }))) });
});
app.get("/runs/history", async (c) => {
const runs = await db.select().from(workflowRuns).where(eq(workflowRuns.userId, c.get("userId"))).orderBy(desc(workflowRuns.createdAt)).limit(50);
return c.json({ runs });
});
app.get("/:workflowId", async (c) => {
if (c.req.param("workflowId") === "job-application") {
const userId = c.get("userId");
const h = userActorFor(userId);
await h.init({ userId });
return c.json({ workflow: await h.getWorkflowStatus() });
}
const def = getWorkflowDefinition(c.req.param("workflowId"));
if (!def) return c.json({ error: "workflow_not_found" }, 404);
return c.json({ workflow: def });
});
app.post("/:workflowId/runs", async (c) => {
const userId = c.get("userId");
const workflowId = normalizeWorkflowId(c.req.param("workflowId"));
const def = getWorkflowDefinition(workflowId);
if (!def) return c.json({ error: "workflow_not_found" }, 404);
const body = startSchema.parse(await c.req.json().catch(() => ({})));
const [createdRun] = await db.insert(workflowRuns).values({ userId, workflowId: def.id, workflowVersion: def.version, status: "running", goal: body.goal, input: body.input ?? {} }).returning();
if (!createdRun) return c.json({ error: "run_create_failed" }, 500);
const run = createdRun;
await db.insert(workflowRunModules).values(def.modules.map((m) => ({ runId: run.id, moduleId: m.id, title: m.title, status: "idle", service: m.service })));
await db.insert(workflowEvents).values({ runId: run.id, userId, type: "workflow.started", payload: { workflowId: def.id, goal: body.goal } });
try {
const baseline = await runServiceAgentProbe({ id: "qscore", name: "Quinn", role: "Q Score", kind: "score", description: "Baseline Q Score", service: "qscore-service" }, { userId, goal: body.goal ?? "workflow baseline" });
if (baseline.status === "ok" && baseline.detail) {
const payload = baseline.detail as Record<string, unknown>;
await db.insert(qscoreSnapshots).values({ userId, runId: run.id, snapshotType: "baseline", score: extractQScore(payload), payload });
await db.update(workflowRuns).set({ qscoreBefore: payload, updatedAt: new Date() }).where(eq(workflowRuns.id, run.id));
}
} catch {
await db.insert(workflowEvents).values({ runId: run.id, userId, type: "qscore.baseline_unavailable", payload: {} });
}
const handle = userActorFor(userId);
await handle.init({ userId });
await handle.startWorkflow({ workflowId: def.id, runId: run.id, goal: body.goal, input: body.input ?? {} });
return c.json({ run, workflow: def }, 201);
});
app.get("/:workflowId/runs/current", async (c) => {
const userId = c.get("userId");
const workflowId = normalizeWorkflowId(c.req.param("workflowId"));
const [run] = await db.select().from(workflowRuns).where(and(eq(workflowRuns.userId, userId), eq(workflowRuns.workflowId, workflowId))).orderBy(desc(workflowRuns.createdAt)).limit(1);
if (!run) return c.json({ run: null });
const modules = await db.select().from(workflowRunModules).where(eq(workflowRunModules.runId, run.id));
return c.json({ run, modules });
});
// Compatibility aliases for existing frontend.
app.post("/job-application", async (c) => {
const userId = c.get("userId");
const body = z
.object({ goal: z.string().min(1).optional() })
.parse(await c.req.json().catch(() => ({})));
const body = startSchema.parse(await c.req.json().catch(() => ({})));
const handle = userActorFor(userId);
await handle.init({ userId });
const state = await handle.startWorkflow({ goal: body.goal });
const state = await handle.startWorkflow({ workflowId: "interview-to-offer", goal: body.goal, input: body.input ?? {} });
return c.json({ workflow: state });
});
app.get("/job-application", async (c) => {
const userId = c.get("userId");
const handle = userActorFor(userId);
await handle.init({ userId });
const state = await handle.getWorkflowStatus();
return c.json({ workflow: state });
});
app.post("/job-application/pause", async (c) => {
const userId = c.get("userId");
const workflow = await userActorFor(userId).pauseWorkflow();
return c.json({ workflow });
});
app.post("/job-application/resume", async (c) => {
const userId = c.get("userId");
const workflow = await userActorFor(userId).resumeWorkflow();
return c.json({ workflow });
});
app.post("/job-application/agents/:moduleId/run", async (c) => {
const userId = c.get("userId");
const moduleId = c.req.param("moduleId");
const workflow = await userActorFor(userId).runWorkflowModule({ moduleId });
return c.json({ workflow });
});
app.get("/job-application", async (c) => { const userId = c.get("userId"); const h = userActorFor(userId); await h.init({ userId }); return c.json({ workflow: await h.getWorkflowStatus() }); });
app.post("/job-application/pause", async (c) => c.json({ workflow: await userActorFor(c.get("userId")).pauseWorkflow() }));
app.post("/job-application/resume", async (c) => c.json({ workflow: await userActorFor(c.get("userId")).resumeWorkflow() }));
app.post("/job-application/agents/:moduleId/run", async (c) => c.json({ workflow: await userActorFor(c.get("userId")).runWorkflowModule({ moduleId: c.req.param("moduleId") }) }));
app.post("/job-application/agents/:moduleId/score", async (c) => {
const userId = c.get("userId");
const moduleId = c.req.param("moduleId");
const body = z
.object({
question: z.string().min(1),
answer: z.string().min(1),
score: z.number().min(0).max(100),
notes: z.string().optional(),
})
.parse(await c.req.json());
const workflow = await userActorFor(userId).recordQaScore({
moduleId,
...body,
});
return c.json({ workflow });
const body = z.object({ question: z.string().min(1), answer: z.string().min(1), score: z.number().min(0).max(100), notes: z.string().optional() }).parse(await c.req.json());
return c.json({ workflow: await userActorFor(c.get("userId")).recordQaScore({ moduleId: c.req.param("moduleId"), ...body }) });
});
return app;
}
export function workflowRunRoutes() {
const app = new Hono<AuthContext>();
app.use("*", requireUser);
app.get("/:runId", async (c) => {
const [run] = await db.select().from(workflowRuns).where(and(eq(workflowRuns.id, c.req.param("runId")), eq(workflowRuns.userId, c.get("userId")))).limit(1);
if (!run) return c.json({ error: "run_not_found" }, 404);
const modules = await db.select().from(workflowRunModules).where(eq(workflowRunModules.runId, run.id));
const artifacts = await db.select().from(workflowArtifacts).where(eq(workflowArtifacts.runId, run.id));
const events = await db.select().from(workflowEvents).where(eq(workflowEvents.runId, run.id)).orderBy(desc(workflowEvents.createdAt)).limit(100);
return c.json({ run, modules, artifacts, events });
});
app.post("/:runId/pause", async (c) => {
const run = await requireOwnedRun(c.get("userId"), c.req.param("runId"));
if (!run) return c.json({ error: "run_not_found" }, 404);
await db.update(workflowRuns).set({ status: "paused", updatedAt: new Date() }).where(eq(workflowRuns.id, run.id));
const actor = workflowActorFor(c.get("userId"), run.id);
await actor.init({ userId: c.get("userId"), runId: run.id });
await actor.pause({ userId: c.get("userId"), runId: run.id });
return c.json({ ok: true });
});
app.post("/:runId/resume", async (c) => {
const run = await requireOwnedRun(c.get("userId"), c.req.param("runId"));
if (!run) return c.json({ error: "run_not_found" }, 404);
await db.update(workflowRuns).set({ status: "running", updatedAt: new Date() }).where(eq(workflowRuns.id, run.id));
const actor = workflowActorFor(c.get("userId"), run.id);
await actor.init({ userId: c.get("userId"), runId: run.id });
await actor.resume({ userId: c.get("userId"), runId: run.id });
return c.json({ ok: true });
});
app.post("/:runId/run", async (c) => {
const run = await requireOwnedRun(c.get("userId"), c.req.param("runId"));
if (!run) return c.json({ error: "run_not_found" }, 404);
const idempotencyKey = c.req.header("idempotency-key") ?? `${run.id}:all:${Date.now()}`;
await db.insert(workflowEvents).values({ runId: run.id, userId: c.get("userId"), type: "workflow.run_all_queued", payload: { idempotencyKey } });
const def = getWorkflowDefinition(run.workflowId);
if (!def) return c.json({ error: "workflow_not_found" }, 404);
for (const mod of def.modules) {
const result = await executeWorkflowModule({ userId: c.get("userId"), runId: run.id, moduleId: mod.id });
if (mod.approvalGateAfter && result.status === "done") {
await db.insert(workflowApprovals).values({ runId: run.id, approvalId: mod.approvalGateAfter, status: "pending", payload: { afterModuleId: mod.id } }).onConflictDoNothing();
await db.insert(workflowEvents).values({ runId: run.id, userId: c.get("userId"), type: "approval.required", payload: { approvalId: mod.approvalGateAfter, afterModuleId: mod.id } });
break;
}
}
return c.json({ queued: true, runId: run.id, idempotencyKey });
});
app.post("/:runId/modules/:moduleId/run", async (c) => {
const runId = c.req.param("runId");
const moduleId = c.req.param("moduleId");
const [run] = await db.select().from(workflowRuns).where(and(eq(workflowRuns.id, runId), eq(workflowRuns.userId, c.get("userId")))).limit(1);
if (!run) return c.json({ error: "run_not_found" }, 404);
const idempotencyKey = c.req.header("idempotency-key") ?? `${runId}:${moduleId}`;
await db.update(workflowRunModules).set({ status: "running", startedAt: new Date(), idempotencyKey }).where(and(eq(workflowRunModules.runId, runId), eq(workflowRunModules.moduleId, moduleId)));
await db.insert(workflowEvents).values({ runId, userId: c.get("userId"), type: "module.queued", payload: { moduleId, idempotencyKey } });
await executeWorkflowModule({ userId: c.get("userId"), runId, moduleId });
return c.json({ queued: true, runId, moduleId, idempotencyKey });
});
app.get("/:runId/artifacts", async (c) => {
const run = await requireOwnedRun(c.get("userId"), c.req.param("runId"));
if (!run) return c.json({ error: "run_not_found" }, 404);
return c.json({ artifacts: await db.select().from(workflowArtifacts).where(eq(workflowArtifacts.runId, run.id)) });
});
app.get("/:runId/artifacts/:artifactId/content", async (c) => {
const run = await requireOwnedRun(c.get("userId"), c.req.param("runId"));
if (!run) return c.json({ error: "run_not_found" }, 404);
const [artifact] = await db.select().from(workflowArtifacts).where(and(eq(workflowArtifacts.runId, run.id), eq(workflowArtifacts.id, c.req.param("artifactId")))).limit(1);
if (!artifact?.repoPath) return c.json({ error: "artifact_not_found" }, 404);
const stack = await getUserStack(c.get("userId"));
const safePath = normalize(artifact.repoPath).replace(/^(\.\.\/)+/, "");
let content = stack?.workspacePath ? await readFile(join(stack.workspacePath, safePath), "utf8").catch(() => null) : null;
if (content == null && stack?.giteaRepoOwner && stack.giteaRepoName) {
const client = await giteaClientFor(c.get("userId"));
content = client ? await client.readFile({ owner: stack.giteaRepoOwner, repo: stack.giteaRepoName, path: safePath }) : null;
}
if (content == null) return c.json({ error: "artifact_content_not_found" }, 404);
return c.json({ artifact, content });
});
app.get("/:runId/events", async (c) => {
const run = await requireOwnedRun(c.get("userId"), c.req.param("runId"));
if (!run) return c.json({ error: "run_not_found" }, 404);
return c.json({ events: await db.select().from(workflowEvents).where(eq(workflowEvents.runId, run.id)).orderBy(desc(workflowEvents.createdAt)).limit(200) });
});
app.post("/:runId/approvals/:approvalId", async (c) => {
const run = await requireOwnedRun(c.get("userId"), c.req.param("runId"));
if (!run) return c.json({ error: "run_not_found" }, 404);
const body = await c.req.json().catch(() => ({}));
const status = body?.status === "rejected" ? "rejected" : "approved";
await db.insert(workflowApprovals).values({ runId: run.id, approvalId: c.req.param("approvalId"), status, payload: body, resolvedAt: new Date() });
await db.insert(workflowEvents).values({ runId: run.id, userId: c.get("userId"), type: "approval.recorded", payload: { approvalId: c.req.param("approvalId"), status, body } });
if (status === "approved" && body?.continue !== false) {
const actor = workflowActorFor(c.get("userId"), run.id);
await actor.init({ userId: c.get("userId"), runId: run.id });
await actor.runAll({ userId: c.get("userId"), runId: run.id, idempotencyKey: `${run.id}:approval:${c.req.param("approvalId")}:${Date.now()}` });
}
return c.json({ ok: true, status });
});
return app;
}
async function requireOwnedRun(userId: string, runId: string) {
const [run] = await db.select().from(workflowRuns).where(and(eq(workflowRuns.id, runId), eq(workflowRuns.userId, userId))).limit(1);
return run ?? null;
}
function extractQScore(output: Record<string, unknown>): number | undefined {
const direct = output.q_score ?? output.estimated_q_score;
if (typeof direct === "number") return Math.round(direct);
const compute = output.compute as Record<string, unknown> | undefined;
if (typeof compute?.q_score === "number") return Math.round(compute.q_score);
return undefined;
}
async function serviceHealthy(baseUrl: string): Promise<boolean> {
try {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), 1500);
const res = await fetch(`${baseUrl.replace(/\/$/, "")}/health`, { signal: controller.signal });
clearTimeout(timer);
return res.ok;
} catch {
return false;
}
}

View File

@@ -12,7 +12,7 @@ export type ServiceAgentRef = {
};
export type ServiceAgentResult = {
status: "ok" | "unavailable" | "local";
status: "ok" | "unavailable" | "manual_required" | "opencode_required" | "coming_soon";
summary: string;
detail?: unknown;
};
@@ -313,8 +313,8 @@ export async function runServiceAgentProbe(
: healthCheck(config.resumeServiceUrl, "Resume Agent / resume-service");
default:
return {
status: "local",
summary: `${agent.name} is a local workflow agent managed by Rivet.`,
status: "manual_required",
summary: `${agent.name} has no configured service adapter; manual workflow action is required.`,
};
}
} catch (err) {

View File

@@ -0,0 +1,87 @@
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { dirname, join, resolve } from "node:path";
import { getUserStack, opencodeUrlFor, provisionUserStack, syncWorkspaceToGit } from "../../docker/manager.js";
import { OpencodeClient } from "../../lib/opencode.js";
import { validateArtifactMarkdown } from "../validation.js";
import type { WorkflowDefinition, WorkflowModuleDefinition } from "../types.js";
export type OpenCodeWorkflowResult = {
status: "opencode_required" | "ok" | "blocked_service_unavailable";
summary: string;
artifacts: Array<{ type: string; title: string; repoPath: string; metadata?: Record<string, unknown> }>;
};
// Safe launch adapter: ensures the user's OpenCode stack/workspace exists,
// sends a module prompt to OpenCode when reachable, and validates the artifact
// contract before returning ok. If OpenCode is not reachable or validation fails,
// it returns an honest non-success status.
export async function prepareOpenCodeWorkflowModule(input: {
userId: string;
runId: string;
workflow: WorkflowDefinition;
module: WorkflowModuleDefinition;
goal?: string;
}): Promise<OpenCodeWorkflowResult> {
await provisionUserStack(input.userId);
const stack = await getUserStack(input.userId);
const artifactType = input.module.artifactTypes?.[0] ?? input.module.id;
const repoPath = `artifacts/${input.workflow.id}/${input.runId}/${input.module.id}.md`;
const artifacts = [{ type: artifactType, title: input.module.title, repoPath, metadata: { promptPath: input.module.promptPath, promptVersion: process.env.PROMPT_VERSION ?? "1" } }];
if (!stack?.workspacePath) {
return { status: "blocked_service_unavailable", summary: "OpenCode workspace is not provisioned.", artifacts };
}
const abs = join(stack.workspacePath, repoPath);
await mkdir(dirname(abs), { recursive: true });
await writeFile(abs, placeholder(input.module.title, input.goal, input.module.promptPath), "utf8");
const oc = await opencodeUrlFor(input.userId);
if (!oc) {
await syncWorkspaceToGit(input.userId, `workflow artifact contract: ${input.module.id}`).catch(() => {});
return { status: "opencode_required", summary: `${input.module.title} is ready for OpenCode execution; artifact contract reserved at ${repoPath}.`, artifacts };
}
const client = new OpencodeClient(oc.baseUrl, oc.password);
const health = await client.health();
if (!health) {
await syncWorkspaceToGit(input.userId, `workflow artifact contract: ${input.module.id}`).catch(() => {});
return { status: "opencode_required", summary: `OpenCode is not reachable yet; artifact contract reserved at ${repoPath}.`, artifacts };
}
const promptText = await loadPrompt(input.module.promptPath);
const session = await withTimeout(client.createSession({ title: `${input.workflow.shortTitle}: ${input.module.title}` }), 8_000, "OpenCode session creation timed out");
await withTimeout(client.sendMessage({ sessionId: session.id, text: `${promptText}\n\nWorkflow: ${input.workflow.id}\nRun ID: ${input.runId}\nModule: ${input.module.id}\nGoal: ${input.goal ?? ""}\nWrite the final markdown artifact exactly to /workspace/${repoPath}.` }), 30_000, "OpenCode message timed out");
const markdown = await readFile(abs, "utf8").catch(() => "");
const validation = validateArtifactMarkdown(markdown);
await syncWorkspaceToGit(input.userId, `workflow artifact: ${input.module.id}`).catch(() => {});
if (!validation.ok) {
return { status: "opencode_required", summary: `OpenCode session ${session.id} ran, but artifact validation is incomplete: ${validation.errors.join("; ")}.`, artifacts: artifacts.map((a) => ({ ...a, metadata: { ...a.metadata, sessionId: session.id, validationErrors: validation.errors } })) };
}
return { status: "ok", summary: `${input.module.title} generated and validated artifact at ${repoPath}.`, artifacts: artifacts.map((a) => ({ ...a, metadata: { ...a.metadata, sessionId: session.id, validated: true } })) };
}
function placeholder(title: string, goal?: string, promptPath?: string) {
return `# ${title}\n\n## Summary\nOpenCode execution pending.\n\n## User Inputs\nGoal: ${goal ?? ""}\n\n## Recommendations\nPending.\n\n## Next Actions\nRun the OpenCode artifact executor.\n\n## Memory To Save\nPrompt: ${promptPath ?? "not configured"}\n`;
}
async function loadPrompt(promptPath?: string) {
if (!promptPath) return "Generate the requested GrowQR workflow artifact.";
const abs = resolve(process.cwd(), promptPath);
return readFile(abs, "utf8").catch(() => "Generate the requested GrowQR workflow artifact.");
}
async function withTimeout<T>(promise: Promise<T>, ms: number, message: string): Promise<T> {
let timer: ReturnType<typeof setTimeout> | undefined;
try {
return await Promise.race([
promise,
new Promise<T>((_, reject) => { timer = setTimeout(() => reject(new Error(message)), ms); }),
]);
} finally {
if (timer) clearTimeout(timer);
}
}

View File

@@ -0,0 +1,89 @@
import { and, eq } from "drizzle-orm";
import { db } from "../db/client.js";
import { qscoreSnapshots, workflowArtifacts, workflowEvents, workflowRunModules, workflowRuns } from "../db/schema.js";
import { runServiceAgentProbe } from "../services/service-agents.js";
import { getWorkflowDefinition } from "./registry.js";
import { prepareOpenCodeWorkflowModule } from "./executors/opencode-executor.js";
export type ModuleRunResult = {
status: "done" | "blocked" | "manual_required" | "coming_soon" | "opencode_required";
summary: string;
output?: Record<string, unknown>;
};
export async function executeWorkflowModule(input: { userId: string; runId: string; moduleId: string }): Promise<ModuleRunResult> {
const [run] = await db.select().from(workflowRuns).where(and(eq(workflowRuns.id, input.runId), eq(workflowRuns.userId, input.userId))).limit(1);
if (!run) throw new Error(`workflow run not found: ${input.runId}`);
const workflow = getWorkflowDefinition(run.workflowId);
if (!workflow) throw new Error(`workflow definition not found: ${run.workflowId}`);
const mod = workflow.modules.find((m) => m.id === input.moduleId);
if (!mod) throw new Error(`workflow module not found: ${input.moduleId}`);
await db.update(workflowRunModules).set({ status: "running", startedAt: new Date() }).where(and(eq(workflowRunModules.runId, input.runId), eq(workflowRunModules.moduleId, input.moduleId)));
if (mod.execution === "service" && mod.service) {
const result = await runServiceAgentProbe({ id: mod.id, name: mod.title, role: mod.role, kind: "microservice", description: mod.description, service: mod.service }, { userId: input.userId, goal: run.goal ?? "" });
const status = result.status === "ok" ? "done" : "blocked";
const output = result.detail as Record<string, unknown> | undefined;
await db.update(workflowRunModules).set({ status, outputSummary: result.summary, output, completedAt: new Date() }).where(and(eq(workflowRunModules.runId, input.runId), eq(workflowRunModules.moduleId, input.moduleId)));
if (mod.service === "qscore-service" && output) {
const score = extractQScore(output);
await db.insert(qscoreSnapshots).values({ userId: input.userId, runId: input.runId, snapshotType: "module", score, payload: output });
await db.update(workflowRuns).set({ qscoreAfter: output, updatedAt: new Date() }).where(eq(workflowRuns.id, input.runId));
}
await db.insert(workflowEvents).values({ runId: input.runId, userId: input.userId, type: status === "done" ? "module.completed" : "module.blocked", payload: { moduleId: input.moduleId, summary: result.summary, output } });
await updateRunProgress(input.runId);
return { status, summary: result.summary, output };
}
if (mod.execution === "opencode") {
try {
const prepared = await prepareOpenCodeWorkflowModule({ userId: input.userId, runId: input.runId, workflow, module: mod, goal: run.goal ?? undefined });
const status = prepared.status === "ok" ? "done" : prepared.status === "blocked_service_unavailable" ? "blocked" : "opencode_required";
const output = { artifacts: prepared.artifacts };
if (prepared.artifacts.length) {
await db.insert(workflowArtifacts).values(prepared.artifacts.map((a) => ({ runId: input.runId, moduleId: input.moduleId, type: a.type, title: a.title, repoPath: a.repoPath, metadata: a.metadata }))).onConflictDoNothing();
}
await db.update(workflowRunModules).set({ status, outputSummary: prepared.summary, output, completedAt: new Date() }).where(and(eq(workflowRunModules.runId, input.runId), eq(workflowRunModules.moduleId, input.moduleId)));
await db.insert(workflowEvents).values({ runId: input.runId, userId: input.userId, type: status === "done" ? "artifact.generated" : "artifact.contract_created", payload: { moduleId: input.moduleId, status, artifacts: prepared.artifacts } });
await updateRunProgress(input.runId);
return { status, summary: prepared.summary, output };
} catch (err) {
const summary = `OpenCode execution unavailable: ${err instanceof Error ? err.message : String(err)}`;
await db.update(workflowRunModules).set({ status: "blocked", outputSummary: summary, error: summary, completedAt: new Date() }).where(and(eq(workflowRunModules.runId, input.runId), eq(workflowRunModules.moduleId, input.moduleId)));
await db.insert(workflowEvents).values({ runId: input.runId, userId: input.userId, type: "module.blocked", payload: { moduleId: input.moduleId, status: "blocked", summary } });
await updateRunProgress(input.runId);
return { status: "blocked", summary };
}
}
const status = mod.execution === "coming_soon" ? "coming_soon" : "manual_required";
const summary = mod.execution === "coming_soon" ? `${mod.title} is coming soon.` : `${mod.title} requires manual input or approval.`;
await db.update(workflowRunModules).set({ status, outputSummary: summary, completedAt: new Date() }).where(and(eq(workflowRunModules.runId, input.runId), eq(workflowRunModules.moduleId, input.moduleId)));
await db.insert(workflowEvents).values({ runId: input.runId, userId: input.userId, type: "module.blocked", payload: { moduleId: input.moduleId, status, summary } });
await updateRunProgress(input.runId);
return { status, summary };
}
export async function updateRunProgress(runId: string) {
const modules = await db.select().from(workflowRunModules).where(eq(workflowRunModules.runId, runId));
if (!modules.length) return;
const terminal = new Set(["done", "blocked", "manual_required", "coming_soon", "opencode_required"]);
const finished = modules.filter((m) => terminal.has(m.status)).length;
const done = modules.filter((m) => m.status === "done").length;
const progressPercent = Math.round((finished / modules.length) * 100);
const status = finished === modules.length ? (done === modules.length ? "completed" : "failed") : "running";
await db.update(workflowRuns).set({ progressPercent, status, completedAt: status === "completed" ? new Date() : null, updatedAt: new Date() }).where(eq(workflowRuns.id, runId));
if (status === "completed") {
const [run] = await db.select().from(workflowRuns).where(eq(workflowRuns.id, runId)).limit(1);
if (run) await db.insert(workflowEvents).values({ runId, userId: run.userId, type: "workflow.completed", payload: { progressPercent, modules: modules.map((m) => ({ moduleId: m.moduleId, status: m.status, summary: m.outputSummary })) } });
}
}
function extractQScore(output: Record<string, unknown>): number | undefined {
const direct = output.q_score ?? output.estimated_q_score;
if (typeof direct === "number") return Math.round(direct);
const compute = output.compute as Record<string, unknown> | undefined;
if (typeof compute?.q_score === "number") return Math.round(compute.q_score);
return undefined;
}

42
src/workflows/registry.ts Normal file
View File

@@ -0,0 +1,42 @@
import type { WorkflowDefinition } from "./types.js";
const commonInputs = [
{ id: "goal", label: "Target outcome", type: "text", required: true },
{ id: "context", label: "Background context", type: "textarea", required: false },
];
export const workflowDefinitions: WorkflowDefinition[] = [
{
id: "interview-to-offer", version: "1.0.0", title: "Interview-to-Offer Accelerator", shortTitle: "Interview to Offer",
promise: "Turn a scheduled interview into a focused prep plan, practice sessions, and a readiness report.", segment: ["job-seekers", "interviewing"], urgency: "high", estimatedDuration: "2-5 days", priceTier: "starter", sku: "workflow_interview_to_offer", isPurchasable: true, isFreePreview: true,
visual: { icon: "briefcase-business", color: "emerald", mascotAgentIds: ["sara", "emily", "qscore"] }, requiredInputs: commonInputs,
modules: [
{ id: "resume", title: "Resume fit scan", role: "Resume Agent", description: "Analyze resume readiness for the target role.", execution: "service", service: "resume-service" },
{ id: "interview-plan", title: "Interview prep plan", role: "OpenCode", description: "Generate a prep plan and likely questions artifact.", execution: "opencode", promptPath: "prompts/workflows/interview-to-offer/interview-plan.md", artifactTypes: ["interview_plan"], approvalGateAfter: "review-plan" },
{ id: "sara", title: "Mock interview", role: "Sara", description: "Create a real interview practice session.", execution: "service", service: "interview-service" },
{ id: "emily", title: "Communication roleplay", role: "Emily", description: "Create a realistic roleplay session.", execution: "service", service: "roleplay-service" },
{ id: "qscore", title: "Readiness Q Score", role: "Quinn", description: "Compute readiness score.", execution: "service", service: "qscore-service" },
],
outputs: [{ id: "interview_plan", type: "markdown", title: "Interview prep plan", path: "artifacts/interview-to-offer/interview-plan.md" }], qscoreDimensions: ["clarity", "communication", "role_fit"], approvalGates: [{ id: "review-plan", title: "Review prep plan", description: "User reviews generated plan before practice.", required: false }],
},
{
id: "career-transition", version: "1.0.0", title: "Career Transition Sprint", shortTitle: "Career Transition", promise: "Map transferable skills and produce a transition narrative.", segment: ["career-changers"], urgency: "medium", estimatedDuration: "1-2 weeks", priceTier: "starter", sku: "workflow_career_transition", isPurchasable: true, isFreePreview: true, visual: { icon: "route", color: "blue", mascotAgentIds: ["resume", "qscore"] }, requiredInputs: commonInputs,
modules: [{ id: "transition-map", title: "Transition map", role: "OpenCode", description: "Generate skills map and positioning narrative.", execution: "opencode", promptPath: "prompts/workflows/career-transition/orchestrator.md", artifactTypes: ["transition_map"] }, { id: "resume", title: "Resume fit scan", role: "Resume Agent", description: "Analyze resume for target path.", execution: "service", service: "resume-service" }], outputs: [{ id: "transition_map", type: "markdown", title: "Transition map" }], qscoreDimensions: ["positioning", "skills", "confidence"], approvalGates: [],
},
{
id: "salary-negotiation-war-room", version: "1.0.0", title: "Salary Negotiation War Room", shortTitle: "Negotiation", promise: "Prepare scripts, ranges, and roleplay for an offer conversation.", segment: ["offer-stage"], urgency: "high", estimatedDuration: "24-72 hours", priceTier: "premium", sku: "workflow_salary_negotiation", isPurchasable: true, isFreePreview: false, visual: { icon: "badge-dollar-sign", color: "amber", mascotAgentIds: ["emily"] }, requiredInputs: commonInputs,
modules: [{ id: "negotiation-script", title: "Negotiation script", role: "OpenCode", description: "Generate negotiation strategy and scripts.", execution: "opencode", promptPath: "prompts/workflows/salary-negotiation-war-room/orchestrator.md", artifactTypes: ["negotiation_script"] }, { id: "emily", title: "Negotiation roleplay", role: "Emily", description: "Create offer negotiation roleplay.", execution: "service", service: "roleplay-service" }], outputs: [{ id: "negotiation_script", type: "markdown", title: "Negotiation script" }], qscoreDimensions: ["voice", "confidence", "strategy"], approvalGates: [],
},
{
id: "promotion-readiness", version: "1.0.0", title: "Promotion Readiness Packet", shortTitle: "Promotion", promise: "Build an evidence packet and manager conversation plan.", segment: ["employed"], urgency: "medium", estimatedDuration: "1 week", priceTier: "starter", sku: "workflow_promotion_readiness", isPurchasable: true, isFreePreview: true, visual: { icon: "trending-up", color: "purple", mascotAgentIds: ["emily", "qscore"] }, requiredInputs: commonInputs,
modules: [{ id: "evidence-packet", title: "Evidence packet", role: "OpenCode", description: "Generate promotion evidence packet.", execution: "opencode", promptPath: "prompts/workflows/promotion-readiness/orchestrator.md", artifactTypes: ["promotion_packet"] }, { id: "emily", title: "Manager conversation roleplay", role: "Emily", description: "Practice the promotion conversation.", execution: "service", service: "roleplay-service" }], outputs: [{ id: "promotion_packet", type: "markdown", title: "Promotion evidence packet" }], qscoreDimensions: ["impact", "leadership", "communication"], approvalGates: [],
},
{
id: "personal-brand-opportunity-engine", version: "1.0.0", title: "Personal Brand Opportunity Engine", shortTitle: "Brand Engine", promise: "Draft profile positioning and a weekly opportunity/content plan.", segment: ["networking", "creators"], urgency: "low", estimatedDuration: "1 week", priceTier: "starter", sku: "workflow_brand_engine", isPurchasable: true, isFreePreview: true, visual: { icon: "sparkles", color: "pink", mascotAgentIds: ["qscore"] }, requiredInputs: commonInputs,
modules: [{ id: "profile-rewrite", title: "Profile rewrite", role: "OpenCode", description: "Generate LinkedIn/profile rewrite draft.", execution: "opencode", promptPath: "prompts/workflows/personal-brand-opportunity-engine/orchestrator.md", artifactTypes: ["profile_rewrite", "content_plan"] }], outputs: [{ id: "profile_rewrite", type: "markdown", title: "Profile rewrite" }, { id: "content_plan", type: "markdown", title: "Weekly content plan" }], qscoreDimensions: ["visibility", "network", "voice"], approvalGates: [],
},
];
export function listWorkflowDefinitions() { return workflowDefinitions; }
export function getWorkflowDefinition(id: string) { return workflowDefinitions.find((w) => w.id === normalizeWorkflowId(id)); }
export function normalizeWorkflowId(id: string) { return id === "job-application" ? "interview-to-offer" : id; }

View File

@@ -0,0 +1,20 @@
import { config } from "../config.js";
export type ServiceCapability = {
id: string;
name: string;
enabled: boolean;
internalUrl?: string;
publicUrl?: string;
operations: string[];
};
export function listServiceCapabilities(): ServiceCapability[] {
return [
{ id: "resume-service", name: "Resume Agent", enabled: Boolean(config.resumeServiceUrl), internalUrl: config.resumeServiceUrl, publicUrl: config.resumePublicUrl, operations: ["resume.analyze", "resume.tailor"] },
{ id: "interview-service", name: "Sara Interview", enabled: Boolean(config.interviewServiceUrl), internalUrl: config.interviewServiceUrl, publicUrl: config.interviewPublicUrl, operations: ["interview.configure", "interview.practice"] },
{ id: "roleplay-service", name: "Emily Roleplay", enabled: Boolean(config.roleplayServiceUrl), internalUrl: config.roleplayServiceUrl, publicUrl: config.roleplayPublicUrl, operations: ["roleplay.configure", "roleplay.practice"] },
{ id: "qscore-service", name: "Quinn Q Score", enabled: Boolean(config.qscoreServiceUrl), internalUrl: config.qscoreServiceUrl, operations: ["qscore.ingest", "qscore.compute"] },
{ id: "opencode", name: "OpenCode Artifact Executor", enabled: true, operations: ["artifact.prepare", "artifact.generate"] },
];
}

View File

@@ -0,0 +1,31 @@
import { access, readFile } from "node:fs/promises";
import { resolve } from "node:path";
import { listWorkflowDefinitions } from "./registry.js";
import { validateWorkflowDefinition } from "./validation.js";
let failed = false;
for (const workflow of listWorkflowDefinitions()) {
const result = validateWorkflowDefinition(workflow);
if (!result.ok) {
failed = true;
console.error(`[workflow:${workflow.id}] ${result.errors.join("; ")}`);
}
for (const mod of workflow.modules.filter((m) => m.execution === "opencode")) {
const path = resolve(process.cwd(), mod.promptPath ?? "");
try {
await access(path);
const text = await readFile(path, "utf8");
for (const required of ["PROMPT_VERSION", "Required sections", "Metadata JSON keys"]) {
if (!text.includes(required)) {
failed = true;
console.error(`[prompt:${mod.id}] ${mod.promptPath} missing ${required}`);
}
}
} catch {
failed = true;
console.error(`[prompt:${mod.id}] missing prompt ${mod.promptPath}`);
}
}
}
if (failed) process.exit(1);
console.log("workflow prompt smoke test passed");

40
src/workflows/types.ts Normal file
View File

@@ -0,0 +1,40 @@
export type WorkflowPriceTier = "free" | "starter" | "premium";
export type WorkflowUrgency = "low" | "medium" | "high";
export type WorkflowModuleExecution = "service" | "opencode" | "manual" | "coming_soon";
export type WorkflowInputDefinition = { id: string; label: string; type: string; required: boolean };
export type ArtifactDefinition = { id: string; type: string; title: string; description?: string; path?: string };
export type ApprovalDefinition = { id: string; title: string; description: string; required: boolean };
export type WorkflowModuleDefinition = {
id: string;
title: string;
role: string;
description: string;
execution: WorkflowModuleExecution;
service?: "resume-service" | "interview-service" | "roleplay-service" | "qscore-service";
promptPath?: string;
artifactTypes?: string[];
approvalGateAfter?: string;
};
export type WorkflowDefinition = {
id: string;
version: string;
title: string;
shortTitle: string;
promise: string;
segment: string[];
urgency: WorkflowUrgency;
estimatedDuration: string;
priceTier: WorkflowPriceTier;
sku: string;
isPurchasable: boolean;
isFreePreview: boolean;
visual: { icon: string; color: string; mascotAgentIds: string[] };
requiredInputs: WorkflowInputDefinition[];
modules: WorkflowModuleDefinition[];
outputs: ArtifactDefinition[];
qscoreDimensions: string[];
approvalGates: ApprovalDefinition[];
};

View File

@@ -0,0 +1,30 @@
import type { WorkflowDefinition, WorkflowModuleDefinition } from "./types.js";
export type ValidationResult = { ok: boolean; errors: string[] };
const REQUIRED_MARKDOWN_SECTIONS = ["Summary", "Next Actions"];
export function validateArtifactMarkdown(markdown: string): ValidationResult {
const errors: string[] = [];
if (!markdown.trim()) errors.push("artifact is empty");
for (const section of REQUIRED_MARKDOWN_SECTIONS) {
const re = new RegExp(`(^|\\n)#{1,3}\\s+${section}\\b`, "i");
if (!re.test(markdown)) errors.push(`missing required section: ${section}`);
}
return { ok: errors.length === 0, errors };
}
export function validateWorkflowDefinition(def: WorkflowDefinition): ValidationResult {
const errors: string[] = [];
if (!def.id || !def.version || !def.title) errors.push("workflow missing id/version/title");
if (!def.modules.length) errors.push("workflow has no modules");
for (const mod of def.modules) validateModule(def, mod, errors);
return { ok: errors.length === 0, errors };
}
function validateModule(def: WorkflowDefinition, mod: WorkflowModuleDefinition, errors: string[]) {
if (!mod.id || !mod.title) errors.push(`${def.id}: module missing id/title`);
if (mod.execution === "service" && !mod.service) errors.push(`${def.id}/${mod.id}: service execution missing service`);
if (mod.execution === "opencode" && !mod.promptPath) errors.push(`${def.id}/${mod.id}: opencode execution missing promptPath`);
if (mod.execution === "opencode" && !mod.artifactTypes?.length) errors.push(`${def.id}/${mod.id}: opencode execution missing artifactTypes`);
}