From bd86bf024affca3e4f739ab9bb8cac1162e7698e Mon Sep 17 00:00:00 2001 From: Jacco Flenter Date: Tue, 15 Apr 2025 22:18:32 +0200 Subject: [PATCH 1/2] Avoid issues in the Observed decorator That are incompatible with private/protected types and the decorator pattern --- packages/agents/package.json | 1 + packages/agents/src/index.tsx | 101 ++++++++++++++++++++-------------- pnpm-lock.yaml | 8 +++ 3 files changed, 69 insertions(+), 41 deletions(-) diff --git a/packages/agents/package.json b/packages/agents/package.json index 75fa1668d..1a36ffd59 100644 --- a/packages/agents/package.json +++ b/packages/agents/package.json @@ -46,6 +46,7 @@ "hono": "^4.7.4", "jsonc-parser": "^3.3.1", "mime-types": "^2.1.35", + "reflect-metadata": "^0.2.2", "toml": "^3.0.0", "wrangler": "^4.1.0", "ws": "^8.18.1", diff --git a/packages/agents/src/index.tsx b/packages/agents/src/index.tsx index 3d6ea6e97..d1b1b873c 100644 --- a/packages/agents/src/index.tsx +++ b/packages/agents/src/index.tsx @@ -1,4 +1,5 @@ import type { Agent, Connection, ConnectionContext, WSMessage } from "agents"; +import type { AIChatAgent } from "agents/ai-chat-agent"; import { Hono } from "hono"; import { type SSEStreamingApi, streamSSE } from "hono/streaming"; import packageJson from "../package.json" assert { type: "json" }; @@ -7,7 +8,7 @@ import { registerAgent, registerAgentInstance, } from "./agentInstances"; -import { type AgentEvent, agentEventSchema } from "./types"; +import { type AgentEvent } from "./types"; import { createRequestPayload, createResponsePayload, @@ -16,7 +17,6 @@ import { toKebabCase, tryCatch, } from "./utils"; - // Define types for database schema type ColumnType = "string" | "number" | "boolean" | "null" | "object" | "array"; type TableSchema = { @@ -26,9 +26,28 @@ type TableSchema = { }; type DatabaseResult = Record; +interface DurableObjectState { + waitUntil(promise: Promise): void; + readonly id: DurableObjectId; + readonly storage: DurableObjectStorage; + container?: Container; + blockConcurrencyWhile(callback: () => Promise): Promise; + acceptWebSocket(ws: WebSocket, tags?: string[]): void; + getWebSockets(tag?: string): WebSocket[]; + setWebSocketAutoResponse(maybeReqResp?: WebSocketRequestResponsePair): void; + getWebSocketAutoResponse(): WebSocketRequestResponsePair | null; + getWebSocketAutoResponseTimestamp(ws: WebSocket): Date | null; + setHibernatableWebSocketEventTimeout(timeoutMs?: number): void; + getHibernatableWebSocketEventTimeout(): number | null; + getTags(ws: WebSocket): string[]; + abort(reason?: string): void; +} + type AgentConstructor = new ( // biome-ignore lint/suspicious/noExplicitAny: mixin pattern requires any[] - ...args: any[] + // ...args: any[] + ctx: DurableObjectState, + env: E, ) => Agent; const version = packageJson.version; @@ -219,7 +238,7 @@ interface FiberProperties { activeStreams: Set; } -type FiberDecoratedAgent = Agent & FiberProperties; +type FiberDecoratedAgent = Agent & FiberProperties; /** * Class decorator factory that adds Observed capabilities to Agent classes @@ -234,30 +253,30 @@ type FiberDecoratedAgent = Agent & FiberProperties; * ``` */ export function Observed() { - return >(BaseClass: T) => { - return class extends BaseClass { + return (BaseClass: AgentConstructor) => { + return class ObservedClass extends BaseClass { // Store the class name of the super class - private superClassName: string; + #superClassName: string; // Store whether we've registered the instance already - private instanceRegistered = false; + #instanceRegistered = false; // biome-ignore lint/complexity/noUselessConstructor: Required for TypeScript mixins // biome-ignore lint/suspicious/noExplicitAny: Required for TypeScript mixins - constructor(...args: any[]) { - super(...args); - this.superClassName = Object.getPrototypeOf(this.constructor).name; - registerAgent(this.superClassName); + constructor(readonly ctx: DurableObjectState, readonly env: E) { + super(ctx, env); + this.#superClassName = Object.getPrototypeOf(this.constructor).name; + registerAgent(this.#superClassName); } fiberRouter?: Hono; activeStreams = new Set(); - private recordEvent(event: AgentEvent) { - if (!this.instanceRegistered) { - this.instanceRegistered = true; - registerAgentInstance(this.superClassName, this.name); + #recordEvent(event: AgentEvent) { + if (!this.#instanceRegistered) { + this.#instanceRegistered = true; + registerAgentInstance(this.#superClassName, this.name); } const { type: eventName, payload } = event; @@ -271,7 +290,7 @@ export function Observed() { onStateUpdate(state: unknown, source: Connection | "server"): void { const sourceId = source === "server" ? "server" : source.id; - this.recordEvent({ + this.#recordEvent({ type: "state_change", payload: { state, @@ -282,20 +301,20 @@ export function Observed() { super.onStateUpdate(state as S, source); } - override broadcast( + broadcast( msg: string | ArrayBuffer | ArrayBufferView, without?: string[] | undefined, ): void { - this.recordEvent({ + this.#recordEvent({ type: "broadcast", payload: { message: typeof msg === "string" ? msg : { - type: "binary", - size: msg instanceof Blob ? msg.size : msg.byteLength, - }, + type: "binary", + size: msg instanceof Blob ? msg.size : msg.byteLength, + }, without, }, }); @@ -304,7 +323,7 @@ export function Observed() { } // Create a proxy for a WebSocket-like object to intercept send calls - private createWebSocketProxy(connection: Connection): Connection { + #createWebSocketProxy(connection: Connection): Connection { const self = this; return new Proxy(connection, { get(target, prop, receiver) { @@ -314,7 +333,7 @@ export function Observed() { this: Connection, message: string | ArrayBuffer | ArrayBufferView, ) { - self.recordEvent({ + self.#recordEvent({ type: "ws_send", payload: { connectionId: target.id, @@ -322,12 +341,12 @@ export function Observed() { typeof message === "string" ? message : { - type: "binary" as const, - size: - message instanceof Blob - ? message.size - : message.byteLength, - }, + type: "binary" as const, + size: + message instanceof Blob + ? message.size + : message.byteLength, + }, }, }); @@ -346,7 +365,7 @@ export function Observed() { } onMessage(connection: Connection, message: WSMessage) { - this.recordEvent({ + this.#recordEvent({ type: "ws_message", payload: { connectionId: connection.id, @@ -357,14 +376,14 @@ export function Observed() { }, }); - const connectionProxy = this.createWebSocketProxy(connection); + const connectionProxy = this.#createWebSocketProxy(connection); // Use the original connection for the parent class return super.onMessage(connectionProxy, message); } onConnect(connection: Connection, ctx: ConnectionContext) { - this.recordEvent({ + this.#recordEvent({ type: "ws_open", payload: { connectionId: connection.id, @@ -372,7 +391,7 @@ export function Observed() { }); // Create a proxied connection to intercept send calls - const proxiedConnection = this.createWebSocketProxy(connection); + const proxiedConnection = this.#createWebSocketProxy(connection); // Use the proxied connection for the parent class return super.onConnect(proxiedConnection, ctx); @@ -384,7 +403,7 @@ export function Observed() { reason: string, wasClean: boolean, ): void | Promise { - this.recordEvent({ + this.#recordEvent({ type: "ws_close", payload: { connectionId: connection.id, code, reason, wasClean }, }); @@ -404,7 +423,7 @@ export function Observed() { // Create a promise chain to ensure the event is recorded // since we may need to read the body of the request const eventPromise = Promise.resolve().then(async () => { - this.recordEvent({ + this.#recordEvent({ type: "http_request", // Clone the request to avoid consuming the body payload: await createRequestPayload( @@ -418,7 +437,7 @@ export function Observed() { if (isPromiseLike(result)) { return Promise.all([result, eventPromise]).then(async ([res]) => { const payload = await createResponsePayload(res.clone()); - this.recordEvent({ + this.#recordEvent({ type: "http_response", payload: { ...payload, @@ -435,7 +454,7 @@ export function Observed() { eventPromise.then(async () => { const payload = await createResponsePayload(capturedResponse); - this.recordEvent({ + this.#recordEvent({ type: "http_response", payload: { ...payload, @@ -450,7 +469,7 @@ export function Observed() { return this.fiberRouter.fetch(request); } - } as T; + }; }; } @@ -471,8 +490,8 @@ function createFpApp() { const durableObjects = c.env && typeof c.env === "object" ? (Object.entries(c.env as Record).filter( - ([key, value]) => isDurableObjectNamespace(value), - ) as Array<[string, DurableObjectNamespace]>) + ([key, value]) => isDurableObjectNamespace(value), + ) as Array<[string, DurableObjectNamespace]>) : []; for (const [name] of durableObjects) { // See if we're aware of an agent with the same id diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 14f37b2c7..7f368a6a3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -196,6 +196,9 @@ importers: mime-types: specifier: ^2.1.35 version: 2.1.35 + reflect-metadata: + specifier: ^0.2.2 + version: 0.2.2 toml: specifier: ^3.0.0 version: 3.0.0 @@ -7968,6 +7971,9 @@ packages: recma-stringify@1.0.0: resolution: {integrity: sha512-cjwII1MdIIVloKvC9ErQ+OgAtwHBmcZ0Bg4ciz78FtbT8In39aAYbaA7zvxQ61xVMSPE8WxhLwLbhif4Js2C+g==} + reflect-metadata@0.2.2: + resolution: {integrity: sha512-urBwgfrvVP/eAyXx4hluJivBKzuEbSQs9rKWCrCkbSxNv8mxPcUZKeuoF3Uy4mJl3Lwprp6yy5/39VWigZ4K6Q==} + refractor@3.6.0: resolution: {integrity: sha512-MY9W41IOWxxk31o+YvFCNyNzdkc9M20NoZK5vq6jkv4I/uh2zkWcfudj0Q1fovjUQJrNewS9NMzeTtqPf+n5EA==} @@ -17561,6 +17567,8 @@ snapshots: unified: 11.0.5 vfile: 6.0.3 + reflect-metadata@0.2.2: {} + refractor@3.6.0: dependencies: hastscript: 6.0.0 From 69c40ac3a3fc78f91870ace6741d05c8ad9962d7 Mon Sep 17 00:00:00 2001 From: Jacco Flenter Date: Wed, 16 Apr 2025 11:08:55 +0200 Subject: [PATCH 2/2] WIP - not valid code --- examples/simple-agent/src/server.ts | 2 +- packages/agents/src/agent-types.ts | 354 ++++++++++++++++++++++++++++ packages/agents/src/index.tsx | 55 +++-- 3 files changed, 391 insertions(+), 20 deletions(-) create mode 100644 packages/agents/src/agent-types.ts diff --git a/examples/simple-agent/src/server.ts b/examples/simple-agent/src/server.ts index ec49665cb..cf8327a49 100644 --- a/examples/simple-agent/src/server.ts +++ b/examples/simple-agent/src/server.ts @@ -36,7 +36,7 @@ export const agentContext = new AsyncLocalStorage(); /** * Chat Agent implementation that handles real-time AI chat interactions */ -@Observed() +@Observed() export class ChatClient extends AIChatAgent { initialState = { memories: {} }; diff --git a/packages/agents/src/agent-types.ts b/packages/agents/src/agent-types.ts new file mode 100644 index 000000000..1eb92b966 --- /dev/null +++ b/packages/agents/src/agent-types.ts @@ -0,0 +1,354 @@ +abstract class DurableObject { + protected ctx: DurableObjectState; + protected env: Env; + constructor(ctx: DurableObjectState, env: Env) { + this.ctx = ctx; + this.env = env; + } + fetch?(request: Request): Response | Promise; + alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; + webSocketMessage?( + ws: WebSocket, + message: string | ArrayBuffer, + ): void | Promise; + webSocketClose?( + ws: WebSocket, + code: number, + reason: string, + wasClean: boolean, + ): void | Promise; + webSocketError?(ws: WebSocket, error: unknown): void | Promise; +} + +abstract class Server extends DurableObject { + // constructor(ctx: DurableObjectState, env: Env); + /** + * Handle incoming requests to the server. + */ + abstract fetch(request: Request): Promise; + abstract webSocketMessage(ws: WebSocket, message: WSMessage): Promise; + abstract webSocketClose( + ws: WebSocket, + code: number, + reason: string, + wasClean: boolean + ): Promise; + abstract webSocketError(ws: WebSocket, error: unknown): Promise; + /** + * The name for this server. Write-once-only. + */ + abstract get name(): string; + abstract setName(name: string): Promise; + /** Send a message to all connected clients, except connection ids listed in `without` */ + abstract broadcast( + msg: string | ArrayBuffer | ArrayBufferView, + without?: string[] | undefined + ): void; + /** Get a connection by connection id */ + abstract getConnection(id: string): Connection | undefined; + /** + * Get all connections. Optionally, you can provide a tag to filter returned connections. + * Use `Server#getConnectionTags` to tag the connection on connect. + */ + abstract getConnections(tag?: string): Iterable>; + /** + * You can tag a connection to filter them in Server#getConnections. + * Each connection supports up to 9 tags, each tag max length is 256 characters. + */ + abstract getConnectionTags( + connection: Connection, + context: ConnectionContext + ): string[] | Promise; + /** + * Called when the server is started for the first time. + */ + abstract onStart(): void | Promise; + /** + * Called when a new connection is made to the server. + */ + abstract onConnect( + connection: Connection, + ctx: ConnectionContext + ): void | Promise; + /** + * Called when a message is received from a connection. + */ + abstract onMessage(connection: Connection, message: WSMessage): void | Promise; + /** + * Called when a connection is closed. + */ + abstract onClose( + connection: Connection, + code: number, + reason: string, + wasClean: boolean + ): void | Promise; + /** + * Called when an error occurs on a connection. + */ + abstract onError(connection: Connection, error: unknown): void | Promise; + /** + * Called when a request is made to the server. + */ + abstract onRequest(request: Request): Response | Promise; + abstract onAlarm(): void | Promise; + abstract alarm(): Promise; +} + + + +export abstract class IAgent extends Server { + /** + * Initial state for the Agent + * Override to provide default state values + */ + abstract initialState: State; + /** + * Current state of the Agent + */ + abstract get state(): State; + /** + * Agent configuration options + */ + // static options: { + // /** Whether the Agent should hibernate when inactive */ + // hibernate: boolean; + // }; + /** + * Execute SQL queries against the Agent's database + * @template T Type of the returned rows + * @param strings SQL query template strings + * @param values Values to be inserted into the query + * @returns Array of query results + */ + abstract sql>( + strings: TemplateStringsArray, + ...values: (string | number | boolean | null)[] + ): T[]; + // constructor(ctx: AgentContext, env: Env); + /** + * Update the Agent's state + * @param state New state to set + */ + abstract setState(state: State): void; + /** + * Called when the Agent's state is updated + * @param state Updated state + * @param source Source of the state update ("server" or a client connection) + */ + abstract onStateUpdate(state: State | undefined, source: Connection | "server"): void; + /** + * Called when the Agent receives an email + * @param email Email message to process + */ + abstract onEmail(email: ForwardableEmailMessage): Promise; + abstract onError(connection: Connection, error: unknown): void | Promise; + abstract onError(error: unknown): void | Promise; + /** + * Render content (not implemented in base class) + */ + abstract render(): void; + /** + * Schedule a task to be executed in the future + * @template T Type of the payload data + * @param when When to execute the task (Date, seconds delay, or cron expression) + * @param callback Name of the method to call + * @param payload Data to pass to the callback + * @returns Schedule object representing the scheduled task + */ + abstract schedule( + when: Date | string | number, + callback: keyof this, + payload?: T + ): Promise>; + /** + * Get a scheduled task by ID + * @template T Type of the payload data + * @param id ID of the scheduled task + * @returns The Schedule object or undefined if not found + */ + abstract getSchedule(id: string): Promise | undefined>; + /** + * Get scheduled tasks matching the given criteria + * @template T Type of the payload data + * @param criteria Criteria to filter schedules + * @returns Array of matching Schedule objects + */ + abstract getSchedules(criteria?: { + description?: string; + id?: string; + type?: "scheduled" | "delayed" | "cron"; + timeRange?: { + start?: Date; + end?: Date; + }; + }): Schedule[]; + /** + * Cancel a scheduled task + * @param id ID of the task to cancel + * @returns true if the task was cancelled, false otherwise + */ + abstract cancelSchedule(id: string): Promise; + /** + * Method called when an alarm fires + * Executes any scheduled tasks that are due + */ + abstract alarm(): Promise; + /** + * Destroy the Agent, removing all state and scheduled tasks + */ + abstract destroy(): Promise; +} +type ImmutablePrimitive = undefined | null | boolean | string | number; +type Immutable = T extends ImmutablePrimitive + ? T + : T extends Array + ? ImmutableArray + : T extends Map + ? ImmutableMap + : T extends Set + ? ImmutableSet + : ImmutableObject; +type ImmutableArray = ReadonlyArray>; +type ImmutableMap = ReadonlyMap, Immutable>; +type ImmutableSet = ReadonlySet>; +type ImmutableObject = { + readonly [K in keyof T]: Immutable; +}; + +type ConnectionState = ImmutableObject | null; +/** + * Connection object for WebSocket connections + */ +// export interface Connection { +// /** Send a message to the connection */ +// send(message: string): void; +// /** Close the connection */ +// close(code?: number, reason?: string): void; +// } + +export type Connection = WebSocket & { + /** Connection identifier */ + id: string; + /** + * Arbitrary state associated with this connection. + * Read-only, use Connection.setState to update the state. + */ + // state: ConnectionState; + // setState( + // state: TState | ConnectionSetStateFn | null + // ): ConnectionState; + // /** @deprecated use Connection.setState instead */ + // serializeAttachment(attachment: T): void; + // /** @deprecated use Connection.state instead */ + // deserializeAttachment(): T | null; + /** + * Server's name + */ + server: string; +}; + + +/** + * Context for WebSocket connections + */ +export interface ConnectionContext { + /** HTTP request that initiated the connection */ + request: Request; +} + +/** + * WebSocket message type + */ +export type WSMessage = string | ArrayBuffer | ArrayBufferView; + +export type Schedule = { + /** Unique identifier for the schedule */ + id: string; + /** Name of the method to be called */ + callback: string; + /** Data to be passed to the callback */ + payload: T; +} & ( + | { + /** Type of schedule for one-time execution at a specific time */ + type: "scheduled"; + /** Timestamp when the task should execute */ + time: number; + } + | { + /** Type of schedule for delayed execution */ + type: "delayed"; + /** Timestamp when the task should execute */ + time: number; + /** Number of seconds to delay execution */ + delayInSeconds: number; + } + | { + /** Type of schedule for recurring execution based on cron expression */ + type: "cron"; + /** Timestamp for the next execution */ + time: number; + /** Cron expression defining the schedule */ + cron: string; + } + ); + +// export interface ForwardableEmailMessage { +// from: string; +// to: string[]; +// subject?: string; +// text?: string; +// html?: string; +// headers?: Record; +// attachments?: Array<{ +// filename?: string; +// content: ArrayBuffer; +// contentType?: string; +// }>; +// } +interface EmailMessage { + /** + * Envelope From attribute of the email message. + */ + readonly from: string; + /** + * Envelope To attribute of the email message. + */ + readonly to: string; +} + + +export interface ForwardableEmailMessage extends EmailMessage { + /** + * Stream of the email message content. + */ + readonly raw: ReadableStream; + /** + * An [Headers object](https://developer.mozilla.org/en-US/docs/Web/API/Headers). + */ + readonly headers: Headers; + /** + * Size of the email message content. + */ + readonly rawSize: number; + /** + * Reject this email message by returning a permanent SMTP error back to the connecting client including the given reason. + * @param reason The reject reason. + * @returns void + */ + setReject(reason: string): void; + /** + * Forward this email message to a verified destination address of the account. + * @param rcptTo Verified destination address. + * @param headers A [Headers object](https://developer.mozilla.org/en-US/docs/Web/API/Headers). + * @returns A promise that resolves when the email message is forwarded. + */ + forward(rcptTo: string, headers?: Headers): Promise; + /** + * Reply to the sender of this email message with a new EmailMessage object. + * @param message The reply message. + * @returns A promise that resolves when the email message is replied. + */ + reply(message: EmailMessage): Promise; +} diff --git a/packages/agents/src/index.tsx b/packages/agents/src/index.tsx index d1b1b873c..41ebb1a1b 100644 --- a/packages/agents/src/index.tsx +++ b/packages/agents/src/index.tsx @@ -1,7 +1,8 @@ -import type { Agent, Connection, ConnectionContext, WSMessage } from "agents"; -import type { AIChatAgent } from "agents/ai-chat-agent"; +import type { Agent as IAgent, Connection, ConnectionContext, WSMessage } from "agents"; +// import type { AIChatAgent } from "agents/ai-chat-agent"; import { Hono } from "hono"; import { type SSEStreamingApi, streamSSE } from "hono/streaming"; +import "reflect-metadata"; // Import reflect-metadata import packageJson from "../package.json" assert { type: "json" }; import { getAgents, @@ -17,6 +18,11 @@ import { toKebabCase, tryCatch, } from "./utils"; +// import type { IAgent, ConnectionContext, Connection, WSMessage, } from "./agent-types"; +// import type { IAgent, ConnectionContext, Connection, WSMessage, } from "./agent-types"; + +// Metadata keys for the decorator +const METADATA_OBSERVED = Symbol("ob:observed"); // Define types for database schema type ColumnType = "string" | "number" | "boolean" | "null" | "object" | "array"; type TableSchema = { @@ -43,17 +49,19 @@ interface DurableObjectState { abort(reason?: string): void; } -type AgentConstructor = new ( + + +type AgentConstructor = new ( // biome-ignore lint/suspicious/noExplicitAny: mixin pattern requires any[] // ...args: any[] ctx: DurableObjectState, env: E, -) => Agent; +) => T extends IAgent ? T : never; const version = packageJson.version; const commitHash = import.meta.env.GIT_COMMIT_HASH ?? ""; -function createAgentAdminRouter(agent: FiberDecoratedAgent) { +function createAgentAdminRouter(agent: FiberDecoratedAgent) { const router = new Hono(); router.get("/agents/:namespace/:instance/admin/db", async (c) => { @@ -238,35 +246,34 @@ interface FiberProperties { activeStreams: Set; } -type FiberDecoratedAgent = Agent & FiberProperties; +type FiberDecoratedAgent = IAgent & FiberProperties; /** * Class decorator factory that adds Observed capabilities to Agent classes - * - * Usage: - * ```typescript - * - * @Observed() - * export class MyAgent extends Agent { - * // Your agent implementation - * } - * ``` */ export function Observed() { return (BaseClass: AgentConstructor) => { - return class ObservedClass extends BaseClass { + // Mark the class as observed with reflect-metadata + Reflect.defineMetadata(METADATA_OBSERVED, true, BaseClass); + + // Create the new class that extends the base class + const ObservedClass = class extends BaseClass { // Store the class name of the super class #superClassName: string; // Store whether we've registered the instance already #instanceRegistered = false; - // biome-ignore lint/complexity/noUselessConstructor: Required for TypeScript mixins - // biome-ignore lint/suspicious/noExplicitAny: Required for TypeScript mixins constructor(readonly ctx: DurableObjectState, readonly env: E) { super(ctx, env); this.#superClassName = Object.getPrototypeOf(this.constructor).name; registerAgent(this.#superClassName); + + // Store metadata about this instance + Reflect.defineMetadata(METADATA_OBSERVED, { + className: this.#superClassName, + instanceName: this.name + }, this); } fiberRouter?: Hono; @@ -413,7 +420,7 @@ export function Observed() { onRequest(request: Request): Response | Promise { if (!this.fiberRouter) { - this.fiberRouter = createAgentAdminRouter(this); + this.fiberRouter = createAgentAdminRouter(this); } this.fiberRouter.notFound(() => { @@ -470,6 +477,16 @@ export function Observed() { return this.fiberRouter.fetch(request); } }; + + // Copy all static properties from BaseClass to ObservedClass + Object.getOwnPropertyNames(BaseClass).forEach(prop => { + if (prop !== 'prototype' && prop !== 'name' && prop !== 'length') { + // @ts-ignore - We're copying all static properties + ObservedClass[prop] = BaseClass[prop]; + } + }); + + return ObservedClass; }; }