Skip to content

Commit

Permalink
improve implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
IMax153 committed Jan 31, 2025
1 parent 5507d53 commit 78f0a4b
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 178 deletions.
329 changes: 160 additions & 169 deletions packages/ai/ai/src/AiChat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,17 @@ export declare namespace AiChat {
readonly exportJson: Effect.Effect<string>
readonly send: (input: AiInput.Input) => Effect.Effect<AiResponse, AiError>
readonly stream: (input: AiInput.Input) => Stream.Stream<AiResponse, AiError>
readonly structured: <A, I, R>(
tool: Completions.StructuredSchema<A, I, R>,
input: AiInput.Input
) => Effect.Effect<A, AiError, R>
readonly structured: {
<A, I, R>(options: {
readonly input: AiInput.Input
readonly schema: Completions.StructuredSchema<A, I, R>
}): Effect.Effect<A, AiError, R>
<A, I, R>(options: {
readonly input: AiInput.Input
readonly schema: Schema.Schema<A, I, R>
readonly correlationId: string
}): Effect.Effect<A, AiError, R>
}
readonly toolkit: <Tools extends AiToolkit.Tool.AnySchema>(
options: {
readonly input: AiInput.Input
Expand Down Expand Up @@ -74,180 +81,164 @@ export declare namespace AiChat {
* @since 1.0.0
* @category constructors
*/
export const fromInput = (input: AiInput.Input): Effect.Effect<AiChat.Service, never, Completions> =>
Ref.make(AiInput.make(input)).pipe(
Effect.bindTo("historyRef"),
Effect.bind("completions", () => Completions),
Effect.map(({ completions, historyRef }) => new AiChatImpl(historyRef, completions))
)

class AiChatImpl implements AiChat.Service {
readonly semaphore = Effect.unsafeMakeSemaphore(1)

constructor(
readonly historyRef: Ref.Ref<AiInput.AiInput>,
readonly completions: Completions.Service
) {}

get history() {
return Ref.get(this.historyRef)
}
export const fromInput = Effect.fnUntraced(
function*(input: AiInput.Input) {
const completions = yield* Completions
const history = yield* Ref.make(AiInput.make(input))
const semaphore = yield* Effect.makeSemaphore(1)

get export() {
return Ref.get(this.historyRef).pipe(
Effect.flatMap(Schema.encode(AiInput.Schema)),
Effect.orDie
)
}

get exportJson() {
return Ref.get(this.historyRef).pipe(
Effect.flatMap(Schema.encode(AiInput.SchemaJson)),
Effect.orDie
)
}

send(input: AiInput.Input) {
const newParts = AiInput.make(input)
return Ref.get(this.historyRef).pipe(
Effect.flatMap((parts) => {
const allParts = Chunk.appendAll(parts, newParts)
return this.completions.create(allParts).pipe(
Effect.tap((response) => {
const responseParts = AiInput.make(response)
return Ref.set(this.historyRef, Chunk.appendAll(allParts, responseParts))
return AiChat.of({
history: Ref.get(history),
export: Ref.get(history).pipe(
Effect.flatMap(Schema.encode(AiInput.Schema)),
Effect.orDie
),
exportJson: Ref.get(history).pipe(
Effect.flatMap(Schema.encode(AiInput.SchemaJson)),
Effect.orDie
),
send(input) {
const newParts = AiInput.make(input)
return Ref.get(history).pipe(
Effect.flatMap((parts) => {
const allParts = Chunk.appendAll(parts, newParts)
return completions.create(allParts).pipe(
Effect.tap((response) => {
const responseParts = AiInput.make(response)
return Ref.set(history, Chunk.appendAll(allParts, responseParts))
})
)
}),
semaphore.withPermits(1),
Effect.withSpan("AiChat.send", {
attributes: { input },
captureStackTrace: false
})
)
}),
this.semaphore.withPermits(1),
Effect.withSpan("AiChat.send", { attributes: { input }, captureStackTrace: false })
)
}

stream(input: AiInput.Input) {
return Stream.suspend(() => {
let combined = AiResponse.empty
return Stream.fromChannel(Channel.acquireUseRelease(
this.semaphore.take(1).pipe(
Effect.zipRight(Ref.get(this.historyRef)),
Effect.map(Chunk.appendAll(AiInput.make(input)))
),
(parts) =>
this.completions.stream(parts).pipe(
Stream.map((chunk) => {
combined = combined.concat(chunk)
return chunk
}),
Stream.toChannel
),
(parts) =>
Effect.zipRight(
Ref.set(this.historyRef, Chunk.appendAll(parts, AiInput.make(combined))),
this.semaphore.release(1)
)
))
}).pipe(Stream.withSpan("AiChat.stream", { attributes: { input }, captureStackTrace: false }))
}

structured<A, I, R>(
schema: Completions.StructuredSchema<A, I, R>,
input: AiInput.Input
): Effect.Effect<A, AiError, R> {
const newParts = AiInput.make(input)
return Ref.get(this.historyRef).pipe(
Effect.flatMap((parts) => {
const allParts = Chunk.appendAll(parts, newParts)
return this.completions.structured({
input: allParts,
schema
}).pipe(
Effect.flatMap((response) => {
const responseParts = AiInput.make(response)
return Effect.as(
Ref.set(this.historyRef, Chunk.appendAll(allParts, responseParts)),
response.unsafeValue
},
stream(input) {
return Stream.suspend(() => {
let combined = AiResponse.empty
return Stream.fromChannel(Channel.acquireUseRelease(
semaphore.take(1).pipe(
Effect.zipRight(Ref.get(history)),
Effect.map(Chunk.appendAll(AiInput.make(input)))
),
(parts) =>
completions.stream(parts).pipe(
Stream.map((chunk) => {
combined = combined.concat(chunk)
return chunk
}),
Stream.toChannel
),
(parts) =>
Effect.zipRight(
Ref.set(history, Chunk.appendAll(parts, AiInput.make(combined))),
semaphore.release(1)
)
))
}).pipe(Stream.withSpan("AiChat.stream", {
attributes: { input },
captureStackTrace: false
}))
},
structured(options) {
const newParts = AiInput.make(options.input)
return Ref.get(history).pipe(
Effect.flatMap((parts) => {
const allParts = Chunk.appendAll(parts, newParts)
return completions.structured({
...options,
input: allParts
} as any).pipe(
Effect.flatMap((response) => {
const responseParts = AiInput.make(response)
return Effect.as(
Ref.set(history, Chunk.appendAll(allParts, responseParts)),
response.unsafeValue
)
})
)
}),
semaphore.withPermits(1),
Effect.withSpan("AiChat.structured", {
attributes: {
input: options.input,
schema: "correlationId" in options
? options.correlationId
: "_tag" in options.schema
? options.schema._tag
: options.schema.identifier
},
captureStackTrace: false
})
)
}),
this.semaphore.withPermits(1),
Effect.withSpan("AiChat.structured", {
attributes: { input, schema: schema._tag ?? schema.identifier },
captureStackTrace: false
})
)
}

toolkit<Tools extends AiToolkit.Tool.AnySchema>(
options: {
readonly input: AiInput.Input
readonly tools: AiToolkit.Handlers<Tools>
readonly required?: Tools["_tag"] | boolean | undefined
readonly concurrency?: Concurrency | undefined
}
): Effect.Effect<
WithResolved<AiToolkit.Tool.Success<Tools>>,
AiError | AiToolkit.Tool.Failure<Tools>,
AiToolkit.Tool.Context<Tools>
> {
const newParts = AiInput.make(options.input)
return Ref.get(this.historyRef).pipe(
Effect.flatMap((parts) => {
const allParts = Chunk.appendAll(parts, newParts)
return this.completions.toolkit({
...options,
input: allParts
}).pipe(
Effect.tap((response) => {
const responseParts = AiInput.make(response)
return Ref.set(this.historyRef, Chunk.appendAll(allParts, responseParts))
},
toolkit(options) {
const newParts = AiInput.make(options.input)
return Ref.get(history).pipe(
Effect.flatMap((parts) => {
const allParts = Chunk.appendAll(parts, newParts)
return completions.toolkit({
...options,
input: allParts
}).pipe(
Effect.tap((response) => {
const responseParts = AiInput.make(response)
return Ref.set(history, Chunk.appendAll(allParts, responseParts))
})
)
}),
semaphore.withPermits(1),
Effect.withSpan("AiChat.toolkit", {
attributes: { input: options.input },
captureStackTrace: false
})
)
}),
this.semaphore.withPermits(1),
Effect.withSpan("AiChat.toolkit", { attributes: { input: options.input }, captureStackTrace: false })
)
}

toolkitStream<Tools extends AiToolkit.Tool.AnySchema>(
options: {
readonly input: AiInput.Input
readonly tools: AiToolkit.Handlers<Tools>
readonly required?: Tools["_tag"] | boolean | undefined
readonly concurrency?: Concurrency | undefined
}
): Stream.Stream<
WithResolved<AiToolkit.Tool.Success<Tools>>,
AiError | AiToolkit.Tool.Failure<Tools>,
AiToolkit.Tool.Context<Tools>
> {
return Stream.suspend(() => {
let combined = WithResolved.empty as WithResolved<AiToolkit.Tool.Success<Tools>>
return Stream.fromChannel(Channel.acquireUseRelease(
this.semaphore.take(1).pipe(
Effect.zipRight(Ref.get(this.historyRef)),
Effect.map(Chunk.appendAll(AiInput.make(options.input)))
),
(parts) =>
this.completions.toolkitStream({
...options,
input: parts
}).pipe(
Stream.map((chunk) => {
combined = combined.concat(chunk)
return chunk
}),
Stream.toChannel
),
(parts) =>
Effect.zipRight(
Ref.set(this.historyRef, Chunk.appendAll(parts, AiInput.make(combined))),
this.semaphore.release(1)
)
))
}).pipe(Stream.withSpan("AiChat.toolkitStream", { attributes: { input: options.input }, captureStackTrace: false }))
},
toolkitStream<Tools extends AiToolkit.Tool.AnySchema>(options: {
readonly input: AiInput.Input
readonly tools: AiToolkit.Handlers<Tools>
readonly required?: Tools["_tag"] | boolean | undefined
readonly concurrency?: Concurrency | undefined
}): Stream.Stream<
WithResolved<AiToolkit.Tool.Success<Tools>>,
AiError | AiToolkit.Tool.Failure<Tools>,
AiToolkit.Tool.Context<Tools>
> {
return Stream.suspend(() => {
let combined = WithResolved.empty as WithResolved<AiToolkit.Tool.Success<Tools>>
return Stream.fromChannel(Channel.acquireUseRelease(
semaphore.take(1).pipe(
Effect.zipRight(Ref.get(history)),
Effect.map(Chunk.appendAll(AiInput.make(options.input)))
),
(parts) =>
completions.toolkitStream({
...options,
input: parts
}).pipe(
Stream.map((chunk) => {
combined = combined.concat(chunk)
return chunk
}),
Stream.toChannel
),
(parts) =>
Effect.zipRight(
Ref.set(history, Chunk.appendAll(parts, AiInput.make(combined))),
semaphore.release(1)
)
))
}).pipe(Stream.withSpan("AiChat.toolkitStream", {
attributes: { input: options.input },
captureStackTrace: false
}))
}
})
}
}
)

/**
* @since 1.0.0
Expand Down
Loading

0 comments on commit 78f0a4b

Please sign in to comment.