diff --git a/.changeset/angry-llamas-move.md b/.changeset/angry-llamas-move.md new file mode 100644 index 00000000000..af688d49a2d --- /dev/null +++ b/.changeset/angry-llamas-move.md @@ -0,0 +1,52 @@ +--- +"@effect/ai": patch +--- + +Support non-identified schemas in `AiChat.structured` and `Completions.structured`. + +Instead of requiring a `Schema` with either an `identifier` or `_tag` property +for AI APIs that allow for returning structured outputs, you can now optionally +pass a `correlationId` to `AiChat.structured` and `Completions.structured` when +you want to either use a simple schema or inline the schema. + +Example: + +```ts +import { Completions } from "@effect/ai" +import { OpenAiClient, OpenAiCompletions } from "@effect/ai-openai" +import { NodeHttpClient } from "@effect/platform-node" +import { Config, Effect, Layer, Schema, String } from "effect" + +const OpenAi = OpenAiClient.layerConfig({ + apiKey: Config.redacted("OPENAI_API_KEY") +}).pipe(Layer.provide(NodeHttpClient.layerUndici)) + +const Gpt4oCompletions = OpenAiCompletions.layer({ + model: "gpt-4o" +}).pipe(Layer.provide(OpenAi)) + +const program = Effect.gen(function*() { + const completions = yield* Completions.Completions + + const CalendarEvent = Schema.Struct({ + name: Schema.String, + date: Schema.DateFromString, + participants: Schema.Array(Schema.String) + }) + + yield* completions.structured({ + correlationId: "CalendarEvent", + schema: CalendarEvent, + input: String.stripMargin(` + |Extract event information from the following prose: + | + |Alice and Bob are going to a science fair on Friday. + `) + }) +}) + +program.pipe( + Effect.provide(Gpt4oCompletions), + Effect.runPromise +) +``` diff --git a/packages/ai/ai/src/AiChat.ts b/packages/ai/ai/src/AiChat.ts index da90dfaabbc..a0b84f3179d 100644 --- a/packages/ai/ai/src/AiChat.ts +++ b/packages/ai/ai/src/AiChat.ts @@ -39,10 +39,17 @@ export declare namespace AiChat { readonly exportJson: Effect.Effect readonly send: (input: AiInput.Input) => Effect.Effect readonly stream: (input: AiInput.Input) => Stream.Stream - readonly structured: ( - tool: Completions.StructuredSchema, - input: AiInput.Input - ) => Effect.Effect + readonly structured: { + (options: { + readonly input: AiInput.Input + readonly schema: Completions.StructuredSchema + }): Effect.Effect + (options: { + readonly input: AiInput.Input + readonly schema: Schema.Schema + readonly correlationId: string + }): Effect.Effect + } readonly toolkit: ( options: { readonly input: AiInput.Input @@ -74,180 +81,164 @@ export declare namespace AiChat { * @since 1.0.0 * @category constructors */ -export const fromInput = (input: AiInput.Input): Effect.Effect => - Ref.make(AiInput.make(input)).pipe( - Effect.bindTo("historyRef"), - Effect.bind("completions", () => Completions), - Effect.map(({ completions, historyRef }) => new AiChatImpl(historyRef, completions)) - ) - -class AiChatImpl implements AiChat.Service { - readonly semaphore = Effect.unsafeMakeSemaphore(1) - - constructor( - readonly historyRef: Ref.Ref, - readonly completions: Completions.Service - ) {} - - get history() { - return Ref.get(this.historyRef) - } +export const fromInput = Effect.fnUntraced( + function*(input: AiInput.Input) { + const completions = yield* Completions + const history = yield* Ref.make(AiInput.make(input)) + const semaphore = yield* Effect.makeSemaphore(1) - get export() { - return Ref.get(this.historyRef).pipe( - Effect.flatMap(Schema.encode(AiInput.Schema)), - Effect.orDie - ) - } - - get exportJson() { - return Ref.get(this.historyRef).pipe( - Effect.flatMap(Schema.encode(AiInput.SchemaJson)), - Effect.orDie - ) - } - - send(input: AiInput.Input) { - const newParts = AiInput.make(input) - return Ref.get(this.historyRef).pipe( - Effect.flatMap((parts) => { - const allParts = Chunk.appendAll(parts, newParts) - return this.completions.create(allParts).pipe( - Effect.tap((response) => { - const responseParts = AiInput.make(response) - return Ref.set(this.historyRef, Chunk.appendAll(allParts, responseParts)) + return AiChat.of({ + history: Ref.get(history), + export: Ref.get(history).pipe( + Effect.flatMap(Schema.encode(AiInput.Schema)), + Effect.orDie + ), + exportJson: Ref.get(history).pipe( + Effect.flatMap(Schema.encode(AiInput.SchemaJson)), + Effect.orDie + ), + send(input) { + const newParts = AiInput.make(input) + return Ref.get(history).pipe( + Effect.flatMap((parts) => { + const allParts = Chunk.appendAll(parts, newParts) + return completions.create(allParts).pipe( + Effect.tap((response) => { + const responseParts = AiInput.make(response) + return Ref.set(history, Chunk.appendAll(allParts, responseParts)) + }) + ) + }), + semaphore.withPermits(1), + Effect.withSpan("AiChat.send", { + attributes: { input }, + captureStackTrace: false }) ) - }), - this.semaphore.withPermits(1), - Effect.withSpan("AiChat.send", { attributes: { input }, captureStackTrace: false }) - ) - } - - stream(input: AiInput.Input) { - return Stream.suspend(() => { - let combined = AiResponse.empty - return Stream.fromChannel(Channel.acquireUseRelease( - this.semaphore.take(1).pipe( - Effect.zipRight(Ref.get(this.historyRef)), - Effect.map(Chunk.appendAll(AiInput.make(input))) - ), - (parts) => - this.completions.stream(parts).pipe( - Stream.map((chunk) => { - combined = combined.concat(chunk) - return chunk - }), - Stream.toChannel - ), - (parts) => - Effect.zipRight( - Ref.set(this.historyRef, Chunk.appendAll(parts, AiInput.make(combined))), - this.semaphore.release(1) - ) - )) - }).pipe(Stream.withSpan("AiChat.stream", { attributes: { input }, captureStackTrace: false })) - } - - structured( - schema: Completions.StructuredSchema, - input: AiInput.Input - ): Effect.Effect { - const newParts = AiInput.make(input) - return Ref.get(this.historyRef).pipe( - Effect.flatMap((parts) => { - const allParts = Chunk.appendAll(parts, newParts) - return this.completions.structured({ - input: allParts, - schema - }).pipe( - Effect.flatMap((response) => { - const responseParts = AiInput.make(response) - return Effect.as( - Ref.set(this.historyRef, Chunk.appendAll(allParts, responseParts)), - response.unsafeValue + }, + stream(input) { + return Stream.suspend(() => { + let combined = AiResponse.empty + return Stream.fromChannel(Channel.acquireUseRelease( + semaphore.take(1).pipe( + Effect.zipRight(Ref.get(history)), + Effect.map(Chunk.appendAll(AiInput.make(input))) + ), + (parts) => + completions.stream(parts).pipe( + Stream.map((chunk) => { + combined = combined.concat(chunk) + return chunk + }), + Stream.toChannel + ), + (parts) => + Effect.zipRight( + Ref.set(history, Chunk.appendAll(parts, AiInput.make(combined))), + semaphore.release(1) + ) + )) + }).pipe(Stream.withSpan("AiChat.stream", { + attributes: { input }, + captureStackTrace: false + })) + }, + structured(options) { + const newParts = AiInput.make(options.input) + return Ref.get(history).pipe( + Effect.flatMap((parts) => { + const allParts = Chunk.appendAll(parts, newParts) + return completions.structured({ + ...options, + input: allParts + } as any).pipe( + Effect.flatMap((response) => { + const responseParts = AiInput.make(response) + return Effect.as( + Ref.set(history, Chunk.appendAll(allParts, responseParts)), + response.unsafeValue + ) + }) ) + }), + semaphore.withPermits(1), + Effect.withSpan("AiChat.structured", { + attributes: { + input: options.input, + schema: "correlationId" in options + ? options.correlationId + : "_tag" in options.schema + ? options.schema._tag + : options.schema.identifier + }, + captureStackTrace: false }) ) - }), - this.semaphore.withPermits(1), - Effect.withSpan("AiChat.structured", { - attributes: { input, schema: schema._tag ?? schema.identifier }, - captureStackTrace: false - }) - ) - } - - toolkit( - options: { - readonly input: AiInput.Input - readonly tools: AiToolkit.Handlers - readonly required?: Tools["_tag"] | boolean | undefined - readonly concurrency?: Concurrency | undefined - } - ): Effect.Effect< - WithResolved>, - AiError | AiToolkit.Tool.Failure, - AiToolkit.Tool.Context - > { - const newParts = AiInput.make(options.input) - return Ref.get(this.historyRef).pipe( - Effect.flatMap((parts) => { - const allParts = Chunk.appendAll(parts, newParts) - return this.completions.toolkit({ - ...options, - input: allParts - }).pipe( - Effect.tap((response) => { - const responseParts = AiInput.make(response) - return Ref.set(this.historyRef, Chunk.appendAll(allParts, responseParts)) + }, + toolkit(options) { + const newParts = AiInput.make(options.input) + return Ref.get(history).pipe( + Effect.flatMap((parts) => { + const allParts = Chunk.appendAll(parts, newParts) + return completions.toolkit({ + ...options, + input: allParts + }).pipe( + Effect.tap((response) => { + const responseParts = AiInput.make(response) + return Ref.set(history, Chunk.appendAll(allParts, responseParts)) + }) + ) + }), + semaphore.withPermits(1), + Effect.withSpan("AiChat.toolkit", { + attributes: { input: options.input }, + captureStackTrace: false }) ) - }), - this.semaphore.withPermits(1), - Effect.withSpan("AiChat.toolkit", { attributes: { input: options.input }, captureStackTrace: false }) - ) - } - - toolkitStream( - options: { - readonly input: AiInput.Input - readonly tools: AiToolkit.Handlers - readonly required?: Tools["_tag"] | boolean | undefined - readonly concurrency?: Concurrency | undefined - } - ): Stream.Stream< - WithResolved>, - AiError | AiToolkit.Tool.Failure, - AiToolkit.Tool.Context - > { - return Stream.suspend(() => { - let combined = WithResolved.empty as WithResolved> - return Stream.fromChannel(Channel.acquireUseRelease( - this.semaphore.take(1).pipe( - Effect.zipRight(Ref.get(this.historyRef)), - Effect.map(Chunk.appendAll(AiInput.make(options.input))) - ), - (parts) => - this.completions.toolkitStream({ - ...options, - input: parts - }).pipe( - Stream.map((chunk) => { - combined = combined.concat(chunk) - return chunk - }), - Stream.toChannel - ), - (parts) => - Effect.zipRight( - Ref.set(this.historyRef, Chunk.appendAll(parts, AiInput.make(combined))), - this.semaphore.release(1) - ) - )) - }).pipe(Stream.withSpan("AiChat.toolkitStream", { attributes: { input: options.input }, captureStackTrace: false })) + }, + toolkitStream(options: { + readonly input: AiInput.Input + readonly tools: AiToolkit.Handlers + readonly required?: Tools["_tag"] | boolean | undefined + readonly concurrency?: Concurrency | undefined + }): Stream.Stream< + WithResolved>, + AiError | AiToolkit.Tool.Failure, + AiToolkit.Tool.Context + > { + return Stream.suspend(() => { + let combined = WithResolved.empty as WithResolved> + return Stream.fromChannel(Channel.acquireUseRelease( + semaphore.take(1).pipe( + Effect.zipRight(Ref.get(history)), + Effect.map(Chunk.appendAll(AiInput.make(options.input))) + ), + (parts) => + completions.toolkitStream({ + ...options, + input: parts + }).pipe( + Stream.map((chunk) => { + combined = combined.concat(chunk) + return chunk + }), + Stream.toChannel + ), + (parts) => + Effect.zipRight( + Ref.set(history, Chunk.appendAll(parts, AiInput.make(combined))), + semaphore.release(1) + ) + )) + }).pipe(Stream.withSpan("AiChat.toolkitStream", { + attributes: { input: options.input }, + captureStackTrace: false + })) + } + }) } -} +) /** * @since 1.0.0 diff --git a/packages/ai/ai/src/Completions.ts b/packages/ai/ai/src/Completions.ts index eb791c8b2c6..b3a8fb4e8ae 100644 --- a/packages/ai/ai/src/Completions.ts +++ b/packages/ai/ai/src/Completions.ts @@ -36,8 +36,21 @@ export declare namespace Completions { * @since 1.0.0 * @models */ - export interface StructuredSchema extends Schema.Schema { - readonly _tag?: string + export type StructuredSchema = TaggedSchema | IdentifiedSchema + + /** + * @since 1.0.0 + * @category models + */ + export interface TaggedSchema extends Schema.Schema { + readonly _tag: string + } + + /** + * @since 1.0.0 + * @category models + */ + export interface IdentifiedSchema extends Schema.Schema { readonly identifier: string } @@ -48,12 +61,17 @@ export declare namespace Completions { export interface Service { readonly create: (input: AiInput.Input) => Effect.Effect readonly stream: (input: AiInput.Input) => Stream.Stream - readonly structured: ( - options: { + readonly structured: { + (options: { readonly input: AiInput.Input readonly schema: StructuredSchema - } - ) => Effect.Effect, AiError, R> + }): Effect.Effect, AiError, R> + (options: { + readonly input: AiInput.Input + readonly schema: Schema.Schema + readonly correlationId: string + }): Effect.Effect, AiError, R> + } readonly toolkit: ( options: { readonly input: AiInput.Input @@ -159,15 +177,18 @@ export const make = (options: { }, structured(opts) { const input = AiInput.make(opts.input) - const schema = opts.schema - const decode = Schema.decodeUnknown(schema) - const toolId = schema._tag ?? schema.identifier + const decode = Schema.decodeUnknown(opts.schema) + const toolId = "correlationId" in opts + ? opts.correlationId + : "_tag" in opts.schema + ? opts.schema._tag + : opts.schema.identifier return Effect.serviceOption(AiInput.SystemInstruction).pipe( Effect.flatMap((system) => options.create({ input: input as Chunk.NonEmptyChunk, system: Option.orElse(system, () => parentSystem), - tools: [convertTool(schema, true)], + tools: [convertTool(toolId, opts.schema, true)], required: true }) ), @@ -221,7 +242,7 @@ export const make = (options: { structured: boolean }> = [] for (const [, tool] of tools.toolkit.tools) { - toolArr.push(convertTool(tool as any)) + toolArr.push(convertTool(tool._tag, tool as any)) } return Effect.serviceOption(AiInput.SystemInstruction).pipe( Effect.flatMap((system) => @@ -250,7 +271,7 @@ export const make = (options: { structured: boolean }> = [] for (const [, tool] of tools.toolkit.tools) { - toolArr.push(convertTool(tool as any)) + toolArr.push(convertTool(tool._tag, tool as any)) } return Effect.serviceOption(AiInput.SystemInstruction).pipe( Effect.map((system) => @@ -278,10 +299,14 @@ export const make = (options: { }) }) -const convertTool = (tool: Completions.StructuredSchema, structured = false) => ({ - name: tool._tag ?? tool.identifier, - description: getDescription(tool.ast), - parameters: makeJsonSchema(tool.ast), +const convertTool = ( + name: string, + schema: Schema.Schema, + structured = false +) => ({ + name, + description: getDescription(schema.ast), + parameters: makeJsonSchema(schema.ast), structured })