converstaion actor

This commit is contained in:
-Puter
2026-06-02 19:08:31 +05:30
parent a937bcf09e
commit c4217eb18c
7 changed files with 477 additions and 0 deletions

View File

@@ -0,0 +1,39 @@
# Conversation Actor Prototype
Standalone Rivet actor prototype based on Rivet's `examples/ai-agent` shape, but with actor-local SQLite + Drizzle.
This folder is intentionally **not wired into `src/actors/registry.ts` yet**.
## Files
- `conversation-actor.ts` — Rivet actor with queue-driven message processing and streaming AI SDK response events.
- `schema.ts` — Drizzle SQLite schema for messages, tool calls, and summaries.
- `migrations.ts` — tiny inline SQLite migration for this isolated prototype.
- `agent.ts` — AI SDK v6 `streamText` wrapper and stub memory tools.
- `types.ts` — public event/message/status types.
## Actor key
Use a compound actor key when it is eventually wired:
```ts
client.conversationActor.getOrCreate([userId, conversationId])
```
## Runtime env
The prototype expects:
```txt
OPENAI_API_KEY=...
CONVERSATION_ACTOR_MODEL=...
```
No default model is hardcoded so we do not accidentally freeze this prototype to a stale model id.
## Next steps when wiring later
1. Add `conversationActor` to `src/actors/registry.ts`.
2. Decide whether `userActor` creates conversation ids or frontend supplies them.
3. Replace stub memory tools in `agent.ts` with actor-to-actor calls to `memoryActor[userId]`.
4. Move inline migrations to generated Drizzle migrations if/when this becomes production code.

View File

@@ -0,0 +1,64 @@
import { openai } from "@ai-sdk/openai";
import { streamText, tool } from "ai";
import { z } from "zod";
import type { ConversationMessage } from "./types.js";
const SYSTEM_PROMPT = `You are the GrowQR conversation agent.
Keep answers concise, practical, and focused on the user's goals.
When you learn durable information, call the memory tools. For now these tools
are intentionally stubbed so this actor can stay isolated and unwired.`;
export function getConversationModel() {
const modelId = process.env.CONVERSATION_ACTOR_MODEL;
if (!modelId) {
throw new Error(
"Missing CONVERSATION_ACTOR_MODEL. Set it to an OpenAI model id before using the conversation actor.",
);
}
return openai(modelId);
}
export function buildModelMessages(messages: ConversationMessage[]) {
return messages.map((message) => ({
role: message.role,
content: message.content,
}));
}
export function streamConversationResponse(messages: ConversationMessage[]) {
return streamText({
model: getConversationModel(),
system: SYSTEM_PROMPT,
messages: buildModelMessages(messages),
tools: {
readMemory: tool({
description: "Read a markdown memory file. Stubbed until memoryActor is wired.",
inputSchema: z.object({
path: z.string().describe("Memory path, e.g. /profile.md"),
}),
execute: async ({ path }) => ({
path,
found: false,
content: "",
note: "memoryActor is not wired yet",
}),
}),
writeMemory: tool({
description: "Write a markdown memory file. Stubbed until memoryActor is wired.",
inputSchema: z.object({
path: z.string(),
contentMd: z.string(),
reason: z.string().optional(),
}),
execute: async ({ path, contentMd, reason }) => ({
path,
bytes: contentMd.length,
reason,
saved: false,
note: "memoryActor is not wired yet",
}),
}),
},
});
}

View File

@@ -0,0 +1,201 @@
import { asc, eq } from "drizzle-orm";
import { actor, event, queue } from "rivetkit";
import { db as drizzleDb } from "rivetkit/db/drizzle";
import { streamConversationResponse } from "./agent.js";
import { migrateConversationDb } from "./migrations.js";
import {
conversationMessages,
conversationSchema,
} from "./schema.js";
import type {
ConversationMessage,
ConversationQueueMessage,
ConversationResponseEvent,
ConversationStatus,
} from "./types.js";
const buildId = (prefix: string) =>
`${prefix}-${Date.now()}-${Math.random().toString(16).slice(2)}`;
function now() {
return Date.now();
}
function conversationIdFromKey(key: unknown[]) {
return String(key[1] ?? key[0] ?? "default");
}
function toPublicMessage(row: typeof conversationMessages.$inferSelect): ConversationMessage {
return {
id: row.id,
conversationId: row.conversationId,
role: row.role,
sender: row.sender,
content: row.content,
createdAt:
row.createdAt instanceof Date ? row.createdAt.getTime() : Number(row.createdAt),
};
}
export const conversationActor = actor({
// Keep only small runtime state here. Message history lives in actor-local
// SQLite via Drizzle so this actor can grow without bloating c.state.
state: {
status: {
state: "idle",
updatedAt: Date.now(),
} as ConversationStatus,
},
db: drizzleDb({
schema: conversationSchema,
}),
queues: {
message: queue<ConversationQueueMessage>(),
},
events: {
messageAdded: event<ConversationMessage>(),
status: event<ConversationStatus>(),
response: event<ConversationResponseEvent>(),
},
onCreate: async (c) => {
await migrateConversationDb(c.db);
},
onWake: async (c) => {
await migrateConversationDb(c.db);
},
run: async (c) => {
for await (const queued of c.queue.iter()) {
const { body } = queued;
if (!body?.text || typeof body.text !== "string") continue;
const conversationId = conversationIdFromKey(c.key);
const sender = body.sender?.trim() || "User";
const userMessage: ConversationMessage = {
id: buildId("user"),
conversationId,
role: "user",
sender,
content: body.text.trim(),
createdAt: now(),
};
await c.db.insert(conversationMessages).values({
...userMessage,
createdAt: new Date(userMessage.createdAt),
});
c.broadcast("messageAdded", userMessage);
const historyRows = await c.db
.select()
.from(conversationMessages)
.where(eq(conversationMessages.conversationId, conversationId))
.orderBy(asc(conversationMessages.createdAt));
const history = historyRows.map(toPublicMessage);
const assistantMessage: ConversationMessage = {
id: buildId("assistant"),
conversationId,
role: "assistant",
sender: "GrowQR",
content: "",
createdAt: now(),
};
await c.db.insert(conversationMessages).values({
...assistantMessage,
createdAt: new Date(assistantMessage.createdAt),
});
c.broadcast("messageAdded", assistantMessage);
c.state.status = { state: "thinking", updatedAt: now() };
c.broadcast("status", c.state.status);
try {
const result = streamConversationResponse(history);
let content = "";
for await (const delta of result.textStream) {
if (c.aborted) break;
content += delta;
c.broadcast("response", {
messageId: assistantMessage.id,
delta,
content,
done: false,
});
}
assistantMessage.content = content || assistantMessage.content;
await c.db
.update(conversationMessages)
.set({ content: assistantMessage.content })
.where(eq(conversationMessages.id, assistantMessage.id));
c.broadcast("response", {
messageId: assistantMessage.id,
delta: "",
content: assistantMessage.content,
done: true,
});
c.state.status = { state: "idle", updatedAt: now() };
c.broadcast("status", c.state.status);
} catch (error) {
const errorMessage =
error instanceof Error ? error.message : "Unknown conversation error";
assistantMessage.content =
assistantMessage.content ||
"I hit a snag while responding. Please try again.";
await c.db
.update(conversationMessages)
.set({ content: assistantMessage.content })
.where(eq(conversationMessages.id, assistantMessage.id));
c.state.status = {
state: "error",
updatedAt: now(),
error: errorMessage,
};
c.broadcast("response", {
messageId: assistantMessage.id,
delta: "",
content: assistantMessage.content,
done: true,
error: errorMessage,
});
c.broadcast("status", c.state.status);
}
}
},
actions: {
sendMessage: async (c, input: ConversationQueueMessage) => {
await c.queue.send("message", input);
return { queued: true };
},
getHistory: async (c): Promise<ConversationMessage[]> => {
const conversationId = conversationIdFromKey(c.key);
const rows = await c.db
.select()
.from(conversationMessages)
.where(eq(conversationMessages.conversationId, conversationId))
.orderBy(asc(conversationMessages.createdAt));
return rows.map(toPublicMessage);
},
getStatus: (c): ConversationStatus => c.state.status,
},
});

View File

@@ -0,0 +1,3 @@
export { conversationActor } from "./conversation-actor.js";
export * from "./schema.js";
export * from "./types.js";

View File

@@ -0,0 +1,42 @@
import type { RawAccess } from "rivetkit/db";
// Tiny inline migration for the actor-local SQLite database. This keeps the
// example self-contained and avoids wiring drizzle-kit output into the app yet.
export async function migrateConversationDb(db: RawAccess) {
await db.execute(`
CREATE TABLE IF NOT EXISTS conversation_messages (
id TEXT PRIMARY KEY NOT NULL,
conversation_id TEXT NOT NULL,
role TEXT NOT NULL CHECK (role IN ('user', 'assistant')),
sender TEXT NOT NULL,
content TEXT NOT NULL,
created_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS conversation_messages_conversation_created_at_idx
ON conversation_messages (conversation_id, created_at);
CREATE TABLE IF NOT EXISTS conversation_tool_calls (
id TEXT PRIMARY KEY NOT NULL,
conversation_id TEXT NOT NULL,
message_id TEXT NOT NULL REFERENCES conversation_messages(id) ON DELETE CASCADE,
tool_name TEXT NOT NULL,
args_json TEXT,
result_json TEXT,
status TEXT NOT NULL DEFAULT 'running' CHECK (status IN ('running', 'done', 'error')),
created_at INTEGER NOT NULL,
finished_at INTEGER
);
CREATE INDEX IF NOT EXISTS conversation_tool_calls_conversation_idx
ON conversation_tool_calls (conversation_id, created_at);
CREATE TABLE IF NOT EXISTS conversation_summaries (
id TEXT PRIMARY KEY NOT NULL,
conversation_id TEXT NOT NULL,
content_md TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
`);
}

View File

@@ -0,0 +1,89 @@
import { relations, sql } from "drizzle-orm";
import {
index,
integer,
sqliteTable,
text,
} from "rivetkit/db/drizzle";
export const conversationMessages = sqliteTable(
"conversation_messages",
{
id: text("id").primaryKey(),
conversationId: text("conversation_id").notNull(),
role: text("role", { enum: ["user", "assistant"] }).notNull(),
sender: text("sender").notNull(),
content: text("content").notNull(),
createdAt: integer("created_at", { mode: "timestamp_ms" }).notNull(),
},
(table) => ({
conversationCreatedAtIdx: index("conversation_messages_conversation_created_at_idx").on(
table.conversationId,
table.createdAt,
),
}),
);
export const conversationToolCalls = sqliteTable(
"conversation_tool_calls",
{
id: text("id").primaryKey(),
conversationId: text("conversation_id").notNull(),
messageId: text("message_id")
.notNull()
.references(() => conversationMessages.id, { onDelete: "cascade" }),
toolName: text("tool_name").notNull(),
argsJson: text("args_json", { mode: "json" }).$type<Record<string, unknown>>(),
resultJson: text("result_json", { mode: "json" }).$type<Record<string, unknown>>(),
status: text("status", { enum: ["running", "done", "error"] })
.notNull()
.default("running"),
createdAt: integer("created_at", { mode: "timestamp_ms" }).notNull(),
finishedAt: integer("finished_at", { mode: "timestamp_ms" }),
},
(table) => ({
conversationIdx: index("conversation_tool_calls_conversation_idx").on(
table.conversationId,
table.createdAt,
),
}),
);
export const conversationSummaries = sqliteTable("conversation_summaries", {
id: text("id").primaryKey(),
conversationId: text("conversation_id").notNull(),
contentMd: text("content_md").notNull(),
createdAt: integer("created_at", { mode: "timestamp_ms" }).notNull(),
updatedAt: integer("updated_at", { mode: "timestamp_ms" })
.notNull()
.default(sql`(unixepoch('subsec') * 1000)`),
});
export const conversationMessagesRelations = relations(
conversationMessages,
({ many }) => ({
toolCalls: many(conversationToolCalls),
}),
);
export const conversationToolCallsRelations = relations(
conversationToolCalls,
({ one }) => ({
message: one(conversationMessages, {
fields: [conversationToolCalls.messageId],
references: [conversationMessages.id],
}),
}),
);
export const conversationSchema = {
conversationMessages,
conversationToolCalls,
conversationSummaries,
};
export type ConversationMessageRow = typeof conversationMessages.$inferSelect;
export type NewConversationMessageRow = typeof conversationMessages.$inferInsert;
export type ConversationToolCallRow = typeof conversationToolCalls.$inferSelect;
export type NewConversationToolCallRow = typeof conversationToolCalls.$inferInsert;
export type ConversationSummaryRow = typeof conversationSummaries.$inferSelect;

View File

@@ -0,0 +1,39 @@
export type ConversationRole = "user" | "assistant";
export type ConversationStatus = {
state: "idle" | "thinking" | "error";
updatedAt: number;
error?: string;
};
export type ConversationMessage = {
id: string;
conversationId: string;
role: ConversationRole;
sender: string;
content: string;
createdAt: number;
};
export type ConversationQueueMessage = {
text: string;
sender?: string;
};
export type ConversationResponseEvent = {
messageId: string;
delta: string;
content: string;
done: boolean;
error?: string;
};
export type ConversationToolEvent = {
id: string;
messageId: string;
toolName: string;
status: "running" | "done" | "error";
args?: Record<string, unknown>;
result?: Record<string, unknown>;
error?: string;
};