Files
growqr-backend/src/actors/grow-agent.ts
NinjasPyajamas 2d471c61b4 feat: introduce workflow job management and agent orchestration
- Added workflow job actor to manage job application workflows.
- Implemented agent catalog for various workflow agents.
- Created service agents for interview, roleplay, and Q-Score functionalities.
- Enhanced user authentication to automatically create users if they do not exist.
- Updated configuration to support new LLM provider and API keys.
- Introduced new routes for agent and workflow management.
- Refactored Docker management to improve Gitea admin user creation and token generation.
- Removed deprecated Anthropic SDK integration.
2026-05-21 23:17:26 +05:30

339 lines
9.9 KiB
TypeScript

import { actor } from "rivetkit";
import { log } from "../log.js";
import { config } from "../config.js";
import {
createChatCompletion,
GROW_AGENT_SYSTEM,
growAgentTools,
type LlmMessage,
type LlmToolCall,
} from "../lib/llm.js";
import {
provisionUserStack,
getUserStack,
stopUserStack,
giteaClientFor,
} from "../docker/manager.js";
import { runSubAgentTask } from "./sub-agent-runner.js";
import { db } from "../db/client.js";
import { actors as actorsTable, events as eventsTable } from "../db/schema.js";
// One turn of the Grow Agent conversation as kept in the actor's history.
type ChatTurn = {
// Who produced the turn; "tool" turns hold the outcome of a tool call.
role: "user" | "assistant" | "tool";
// Message text. For "tool" turns this is the tool result (a string, or the
// JSON-stringified value) or an "Error: ..." message when dispatch failed.
content: string;
// Set on "tool" turns: id of the tool call this turn answers.
toolCallId?: string;
// Set on "assistant" turns: the tool calls the LLM returned with that turn.
toolCalls?: LlmToolCall[];
};
// Persistent state of one Grow Agent actor.
type GrowAgentState = {
// User this actor is bound to; empty string until `init` has been called.
userId: string;
// Exposed through `getGoals`; nothing in this file writes to it.
goals: string[];
// Conversation turns (user, assistant and tool) in chronological order.
history: ChatTurn[];
// Trimmed once it grows past N turns; long history is delegated to memory repo.
// `receiveMessage` drops the oldest turns while history is longer than this.
maxHistory: number;
};
// Starting state for a new actor: not yet bound to a user, no goals,
// empty history, and a history cap of 40 turns.
const initialState: GrowAgentState = {
userId: "",
goals: [],
history: [],
maxHistory: 40,
};
// Maximum number of characters kept from the `path` argument of the
// `commit_memory` tool; longer paths are truncated to this length.
const MEMORY_REPO_PATH_LIMIT = 1024;
// One Grow Agent actor instance per user (key the actor by userId).
// Owns the user's Docker stack + LLM conversation loop.
//
// Actions:
//   init           - bind the actor to a user and provision their stack
//   receiveMessage - main chat entry point (runs the agentic loop)
//   subAgentEvent  - fan-in for sub-agent status updates
//   getHistory     - current conversation history
//   getGoals       - current goals list
//   shutdown       - stop the user's stack
export const growAgent = actor({
  state: initialState,
  actions: {
    // Intended to be idempotent. Binds the actor to `input.userId`, provisions
    // the per-user OpenCode + Gitea stack, registers the actor row (ignored if
    // it already exists) and broadcasts "stack-ready".
    // Throws if the actor is already bound to a different user.
    // Returns the stack descriptor from `provisionUserStack`.
    init: async (c, input: { userId: string }) => {
      if (c.state.userId && c.state.userId !== input.userId) {
        throw new Error("Grow Agent already bound to a different user");
      }
      c.state.userId = input.userId;

      const stack = await provisionUserStack(input.userId);

      await db
        .insert(actorsTable)
        .values({
          actorId: `grow-${input.userId}`,
          userId: input.userId,
          kind: "grow",
          status: "idle",
          lastActivityAt: new Date(),
        })
        .onConflictDoNothing();

      c.broadcast("stack-ready", {
        userId: input.userId,
        opencode: `${stack.opencodeHost}:${stack.opencodePort}`,
        gitea: `${stack.giteaHost}:${stack.giteaHttpPort}`,
        memoryRepo: stack.giteaMemoryRepo,
      });
      return stack;
    },

    // Main chat entry point. Runs the full agentic loop through the configured LLM.
    // Appends the user turn, broadcasts it, runs the loop, trims the history,
    // records a "grow.message" event and returns `{ reply }`.
    // Throws if `init` has not bound the actor to a user yet.
    receiveMessage: async (c, msg: { text: string }) => {
      if (!c.state.userId) {
        throw new Error("Grow Agent not initialized");
      }
      const userTurn: ChatTurn = { role: "user", content: msg.text };
      c.state.history.push(userTurn);
      c.broadcast("message", { role: "user", text: msg.text });

      const assistantText = await runAgentLoop(c, c.state.userId);

      // Trim history to maxHistory turns; long-term context lives in Gitea.
      // Also drop any "tool" turns left at the front: the assistant turn that
      // issued their tool calls has just been trimmed away, and chat-completion
      // APIs reject tool results that do not follow an assistant message
      // carrying the matching tool calls.
      while (
        c.state.history.length > c.state.maxHistory ||
        c.state.history[0]?.role === "tool"
      ) {
        c.state.history.shift();
      }

      await db
        .insert(eventsTable)
        .values({
          userId: c.state.userId,
          actorId: `grow-${c.state.userId}`,
          type: "grow.message",
          payload: { userText: msg.text, assistantText },
        });
      return { reply: assistantText };
    },

    // Sub-agent status updates fan back in via this action; the Grow Agent
    // broadcasts them so the frontend's sidebar can render them under the
    // right channel.
    subAgentEvent: async (
      c,
      input: {
        subAgentId: string;
        type: "started" | "progress" | "done" | "error";
        message?: string;
        result?: unknown;
      },
    ) => {
      c.broadcast("sub-agent-event", input);
    },

    // Returns the conversation history currently held in actor state.
    getHistory: async (c) => c.state.history,

    // Returns the goals list currently held in actor state.
    getGoals: async (c) => c.state.goals,

    // Stops the user's stack; a no-op when the actor was never initialized.
    shutdown: async (c) => {
      if (c.state.userId) await stopUserStack(c.state.userId);
    },
  },
});
// The agentic loop. Keeps calling the configured LLM with tools until the model
// returns a normal assistant turn (one without tool calls) or MAX_ITERATIONS
// round-trips have been made.
//
// c      - actor context: `state.history` is appended to in place and
//          `broadcast` is used for "message" / "agent-thinking" events.
// userId - user on whose behalf tools are dispatched.
//
// Returns the assistant text of all iterations joined by blank lines, or
// "(no response)" when the model produced no text. Errors from
// `createChatCompletion` propagate to the caller; tool dispatch errors are
// logged and fed back to the model as "Error: ..." tool results.
async function runAgentLoop(
  c: {
    state: GrowAgentState;
    broadcast: (event: string, data: unknown) => void;
  },
  userId: string,
): Promise<string> {
  if (!config.llmApiKey) {
    const reply =
      "LLM_API_KEY or OPENCODE_API_KEY is not configured on the backend - set it to enable the Grow Agent.";
    c.state.history.push({ role: "assistant", content: reply });
    c.broadcast("message", { role: "agent", text: reply });
    return reply;
  }

  const MAX_ITERATIONS = 8;
  let assistantTextOut = "";

  c.broadcast("agent-thinking", { state: "running" });
  try {
    for (let i = 0; i < MAX_ITERATIONS; i++) {
      const response = await createChatCompletion({
        model: config.growAgentModel,
        maxTokens: config.maxAgentTokens,
        tools: growAgentTools,
        messages: messagesForApi(c.state.history),
      });

      // Capture assistant text for streaming-style broadcast.
      if (response.content) {
        assistantTextOut += (assistantTextOut ? "\n\n" : "") + response.content;
        c.broadcast("message", { role: "agent", text: response.content });
      }

      // Persist the assistant turn, including tool calls for the next tool result turn.
      c.state.history.push({
        role: "assistant",
        content: response.content,
        toolCalls: response.toolCalls,
      });

      if (response.toolCalls.length === 0) {
        break;
      }

      // Run the requested tools one after another; every call gets exactly one
      // "tool" turn, carrying either its result or the error message.
      for (const call of response.toolCalls) {
        try {
          const result = await dispatchTool(c, userId, call);
          c.state.history.push({
            role: "tool",
            toolCallId: call.id,
            content: typeof result === "string" ? result : JSON.stringify(result),
          });
        } catch (err) {
          log.error({ err, tool: call.name }, "tool dispatch failed");
          c.state.history.push({
            role: "tool",
            toolCallId: call.id,
            content: `Error: ${err instanceof Error ? err.message : String(err)}`,
          });
        }
      }
    }
  } finally {
    // Always leave the "running" state, even when the LLM call throws;
    // otherwise listeners would stay stuck on "running".
    c.broadcast("agent-thinking", { state: "idle" });
  }

  return assistantTextOut || "(no response)";
}
// Converts the stored conversation history into the message list sent to the
// LLM: the Grow Agent system prompt first, then one message per turn.
//
// - "tool" turns become tool messages carrying `tool_call_id`.
// - Other turns keep their role and content; stored tool calls are mapped to
//   the `{ id, type: "function", function: { name, arguments } }` wire shape,
//   with `arguments` JSON-stringified.
//
// Returns a new array; `history` is not modified.
function messagesForApi(history: ChatTurn[]): LlmMessage[] {
  const messages: LlmMessage[] = [
    { role: "system", content: GROW_AGENT_SYSTEM },
  ];

  for (const turn of history) {
    if (turn.role === "tool") {
      messages.push({
        role: "tool",
        content: turn.content,
        tool_call_id: turn.toolCallId,
      });
      continue;
    }

    messages.push({
      role: turn.role,
      content: turn.content,
      // Final assistant turns are stored with an empty toolCalls array. Leave
      // `tool_calls` unset in that case: OpenAI-compatible endpoints reject an
      // assistant message whose `tool_calls` is an empty array.
      tool_calls: turn.toolCalls?.length
        ? turn.toolCalls.map((call) => ({
            id: call.id,
            type: "function",
            function: {
              name: call.name,
              arguments: JSON.stringify(call.arguments),
            },
          }))
        : undefined,
    });
  }

  return messages;
}
// Turns a repo-relative path into a string that is safe to append to a Gitea
// "contents" URL: empty and "." segments are dropped and every remaining
// segment is percent-encoded on its own, so "?", "#" and similar characters
// cannot alter the request. Returns null when the path contains a ".."
// segment, because URL parsing would resolve it and move the request outside
// the repository's contents endpoint.
function encodeRepoContentsPath(rawPath: string): string | null {
  const segments = rawPath
    .split("/")
    .filter((segment) => segment !== "" && segment !== ".");
  if (segments.includes("..")) return null;
  return segments.map((segment) => encodeURIComponent(segment)).join("/");
}

// Executes one tool call requested by the LLM and returns its result, which
// the caller stores as the content of the matching "tool" turn.
//
// c      - actor context, used here for broadcasting tool-related events.
// userId - user whose stack / memory repo the tool operates on.
// call   - tool call from the LLM; `call.arguments` is untrusted model output
//          and every field is coerced with String(...) before use.
//
// Supported tools:
//   spawn_sub_agent - records a sub-agent row, broadcasts "sub-agent-spawned",
//                     starts the task in the background and returns
//                     { id, type, channelId, status: "running" }.
//   commit_memory   - writes a file to the user's memory repo and returns
//                     { ok: true, path, commitSha }, or { ok: false, error }
//                     when the repo is not provisioned.
//   read_memory     - returns the file text, or null when not provisioned.
//   list_memory     - returns [{ name, path, type }] for a directory; returns
//                     [] on any failure (best effort).
//
// Throws Error("unknown tool: <name>") for any other tool name; errors from
// the database and Gitea client calls propagate as well.
async function dispatchTool(
  c: {
    broadcast: (event: string, data: unknown) => void;
    state: GrowAgentState;
  },
  userId: string,
  call: LlmToolCall,
): Promise<unknown> {
  const input = call.arguments;

  switch (call.name) {
    case "spawn_sub_agent": {
      const type = String(input.type ?? "generic");
      const prompt = String(input.prompt ?? "");
      const channelId =
        typeof input.channelId === "string"
          ? input.channelId
          : `${type}-${Date.now()}`;
      const id = `sub-${type}-${Date.now()}`;

      await db
        .insert(actorsTable)
        .values({
          actorId: id,
          userId,
          kind: "sub",
          subType: type,
          status: "running",
          channelId,
          parentActorId: `grow-${userId}`,
          lastActivityAt: new Date(),
        });
      c.broadcast("sub-agent-spawned", { id, type, channelId, prompt });

      // Fire-and-forget; the runner updates DB + broadcasts via the actor.
      // The rejection handler keeps a failed background task from surfacing as
      // an unhandled promise rejection.
      void runSubAgentTask({
        userId,
        subAgentId: id,
        type,
        prompt,
        channelId,
        onEvent: (event, data) => c.broadcast(event, data),
      }).catch((err: unknown) => {
        log.error({ err, subAgentId: id }, "sub-agent task failed");
      });

      return { id, type, channelId, status: "running" };
    }

    case "commit_memory": {
      // Over-long paths are truncated rather than rejected.
      const path = String(input.path ?? "").slice(0, MEMORY_REPO_PATH_LIMIT);
      const content = String(input.content ?? "");
      const message = String(input.message ?? "memory update");
      const client = await giteaClientFor(userId);
      const stack = await getUserStack(userId);
      if (!client || !stack?.giteaMemoryRepo) {
        return { ok: false, error: "memory repo not provisioned" };
      }
      // giteaMemoryRepo is split on "/" into owner and repository name.
      const [owner, repo] = stack.giteaMemoryRepo.split("/") as [string, string];
      const result = await client.putFile({
        owner,
        repo,
        path,
        contentUtf8: content,
        message,
      });
      c.broadcast("memory-committed", { path, message });
      return { ok: true, path, commitSha: result.commitSha };
    }

    case "read_memory": {
      const path = String(input.path ?? "");
      const client = await giteaClientFor(userId);
      const stack = await getUserStack(userId);
      if (!client || !stack?.giteaMemoryRepo) return null;
      const [owner, repo] = stack.giteaMemoryRepo.split("/") as [string, string];
      const text = await client.readFile({ owner, repo, path });
      return text;
    }

    case "list_memory": {
      const pathPrefix = String(input.pathPrefix ?? "");
      const client = await giteaClientFor(userId);
      const stack = await getUserStack(userId);
      if (!client || !stack?.giteaMemoryRepo) return [];
      const [owner, repo] = stack.giteaMemoryRepo.split("/") as [string, string];

      // The request below carries the admin token, so the model-supplied path
      // must not be able to steer it to another endpoint.
      const encodedPrefix = encodeRepoContentsPath(pathPrefix);
      if (encodedPrefix === null) return [];

      // Gitea contents API on a directory returns an array of entries.
      try {
        const res = await fetch(
          `http://${stack.giteaHost}:${stack.giteaHttpPort}/api/v1/repos/${encodeURIComponent(owner)}/${encodeURIComponent(repo)}/contents/${encodedPrefix}`,
          {
            headers: {
              authorization: `token ${stack.giteaAdminToken}`,
              accept: "application/json",
            },
          },
        );
        if (!res.ok) return [];
        const entries: unknown = await res.json();
        // A non-array body (e.g. the path points at a file) is not a listing.
        if (!Array.isArray(entries)) return [];
        return entries.map((e: { name: string; path: string; type: string }) => ({
          name: e.name,
          path: e.path,
          type: e.type,
        }));
      } catch (err) {
        // Best effort: listing failures yield an empty list, but are logged.
        log.error({ err, pathPrefix }, "list_memory failed");
        return [];
      }
    }

    default:
      throw new Error(`unknown tool: ${call.name}`);
  }
}