Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Adds typing for model streamEvents events, makes v2 the default #6927

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions langchain-core/src/language_models/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ import {
StringPromptValue,
ChatPromptValue,
} from "../prompt_values.js";
import {
type BaseMessage,
type BaseMessageLike,
type MessageContent,
} from "../messages/base.js";
import { type BaseMessage, type MessageContent } from "../messages/base.js";
import type { BaseLanguageModelInput } from "./types.js";
import { coerceMessageLikeToMessage } from "../messages/utils.js";
import { type LLMResult } from "../outputs.js";
import { CallbackManager, Callbacks } from "../callbacks/manager.js";
Expand Down Expand Up @@ -249,10 +246,7 @@ export interface BaseFunctionCallOptions extends BaseLanguageModelCallOptions {
functions?: FunctionDefinition[];
}

export type BaseLanguageModelInput =
| BasePromptValueInterface
| string
| BaseMessageLike[];
export type { BaseLanguageModelInput };

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type StructuredOutputType = z.infer<z.ZodObject<any, any, any, any>>;
Expand Down
7 changes: 7 additions & 0 deletions langchain-core/src/language_models/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { BaseMessageLike } from "../messages/base.js";
import { type BasePromptValueInterface } from "../prompt_values.js";

export type BaseLanguageModelInput =
| BasePromptValueInterface
| string
| BaseMessageLike[];
94 changes: 67 additions & 27 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
EventStreamCallbackHandlerInput,
StreamEvent,
StreamEventData,
StreamEventV2,
isStreamEventsHandler,
} from "../tracers/event_stream.js";
import { Serializable } from "../load/serializable.js";
Expand Down Expand Up @@ -751,7 +752,8 @@ export abstract class Runnable<
* chains. Metadata fields have been omitted from the table for brevity.
* Chain definitions have been included after the table.
*
* **ATTENTION** This reference table is for the V2 version of the schema.
* **ATTENTION** This reference table is for the V2 version of the schema (the current default if no
* "version" option is passed).
*
* ```md
* +----------------------+-----------------------------+------------------------------------------+
Expand Down Expand Up @@ -826,9 +828,7 @@ export abstract class Runnable<
* return "Done";
* });
*
* const eventStream = await slowThing.streamEvents("hello world", {
* version: "v2",
* });
* const eventStream = await slowThing.streamEvents("hello world");
*
* for await (const event of eventStream) {
* if (event.event === "on_custom_event") {
Expand All @@ -839,38 +839,58 @@ export abstract class Runnable<
*/
streamEvents(
input: RunInput,
options: Partial<CallOptions> & { version: "v1" | "v2" },
options: Partial<CallOptions> & {
version?: "v1" | "v2";
encoding: "text/event-stream";
},
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<Uint8Array>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version?: "v2";
},
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEventV2>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version: "v1";
},
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEvent>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version: "v1" | "v2";
encoding: "text/event-stream";
version?: "v1" | "v2";
encoding?: "text/event-stream";
},
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<Uint8Array>;
): IterableReadableStream<StreamEventV2 | StreamEvent | Uint8Array>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version: "v1" | "v2";
encoding?: "text/event-stream" | undefined;
version?: "v1" | "v2";
encoding?: "text/event-stream";
},
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEvent | Uint8Array> {
): IterableReadableStream<StreamEventV2 | StreamEvent | Uint8Array> {
let stream;
if (options.version === "v1") {
const { version, encoding } = options;
if (version === "v1") {
stream = this._streamEventsV1(input, options, streamOptions);
} else if (options.version === "v2") {
} else if (version === "v2" || version === undefined) {
stream = this._streamEventsV2(input, options, streamOptions);
} else {
throw new Error(
`Only versions "v1" and "v2" of the schema are currently supported.`
);
}
if (options.encoding === "text/event-stream") {
if (encoding === "text/event-stream") {
return convertToHttpEventStream(stream);
} else {
return IterableReadableStream.fromAsyncGenerator(stream);
Expand All @@ -879,9 +899,9 @@ export abstract class Runnable<

private async *_streamEventsV2(
input: RunInput,
options: Partial<CallOptions> & { version: "v1" | "v2" },
options: Partial<CallOptions>,
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): AsyncGenerator<StreamEvent> {
): AsyncGenerator<StreamEventV2> {
const eventStreamer = new EventStreamCallbackHandler({
...streamOptions,
autoClose: false,
Expand Down Expand Up @@ -951,7 +971,7 @@ export abstract class Runnable<

private async *_streamEventsV1(
input: RunInput,
options: Partial<CallOptions> & { version: "v1" | "v2" },
options: Partial<CallOptions>,
streamOptions?: Omit<LogStreamCallbackHandlerInput, "autoClose">
): AsyncGenerator<StreamEvent> {
let runLog;
Expand Down Expand Up @@ -1332,29 +1352,49 @@ export class RunnableBinding<

streamEvents(
input: RunInput,
options: Partial<CallOptions> & { version: "v1" | "v2" },
streamOptions?: Omit<LogStreamCallbackHandlerInput, "autoClose">
options: Partial<CallOptions> & {
version?: "v1" | "v2";
encoding: "text/event-stream";
},
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<Uint8Array>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version?: "v2";
},
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEventV2>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version: "v1";
},
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEvent>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version: "v1" | "v2";
encoding: "text/event-stream";
version?: "v1" | "v2";
encoding?: "text/event-stream";
},
streamOptions?: Omit<LogStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<Uint8Array>;
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEventV2 | StreamEvent | Uint8Array>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version: "v1" | "v2";
encoding?: "text/event-stream" | undefined;
version?: "v1" | "v2";
encoding?: "text/event-stream";
},
streamOptions?: Omit<LogStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEvent | Uint8Array> {
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEventV2 | StreamEvent | Uint8Array> {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const outerThis = this;

const generator = async function* () {
yield* outerThis.bound.streamEvents(
input,
Expand Down
50 changes: 38 additions & 12 deletions langchain-core/src/runnables/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import {
import { GenerationChunk, ChatGenerationChunk, RUN_KEY } from "../outputs.js";
import { convertEventStreamToIterableReadableDataStream } from "../utils/event_source_parse.js";
import { IterableReadableStream, concat } from "../utils/stream.js";
import {
EventStreamCallbackHandlerInput,
StreamEventV2,
} from "../tracers/event_stream.js";

type RemoteRunnableOptions = {
timeout?: number;
Expand Down Expand Up @@ -597,11 +601,14 @@ export class RemoteRunnable<

_streamEvents(
input: RunInput,
options: Partial<CallOptions> & { version: "v1" | "v2" },
streamOptions?: Omit<LogStreamCallbackHandlerInput, "autoClose"> | undefined
config: Partial<CallOptions> & { version?: "v1" | "v2" },
streamOptions?:
| Omit<EventStreamCallbackHandlerInput, "autoClose">
| undefined
): AsyncGenerator<StreamEvent> {
// eslint-disable-next-line @typescript-eslint/no-this-alias
const outerThis = this;
const options = { version: "v2", ...config };
const generator = async function* () {
const [config, kwargs] =
outerThis._separateRunnableConfigFromCallOptions(options);
Expand Down Expand Up @@ -678,27 +685,46 @@ export class RemoteRunnable<

streamEvents(
input: RunInput,
options: Partial<CallOptions> & { version: "v1" | "v2" },
streamOptions?: Omit<LogStreamCallbackHandlerInput, "autoClose">
options: Partial<CallOptions> & {
version?: "v1" | "v2";
encoding: "text/event-stream";
},
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<Uint8Array>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version?: "v2";
},
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEventV2>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version: "v1";
},
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEvent>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version: "v1" | "v2";
encoding: "text/event-stream";
version?: "v1" | "v2";
encoding?: "text/event-stream";
},
streamOptions?: Omit<LogStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<Uint8Array>;
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEventV2 | StreamEvent | Uint8Array>;

streamEvents(
input: RunInput,
options: Partial<CallOptions> & {
version: "v1" | "v2";
encoding?: "text/event-stream" | undefined;
version?: "v1" | "v2";
encoding?: "text/event-stream";
},
streamOptions?: Omit<LogStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEvent | Uint8Array> {
streamOptions?: Omit<EventStreamCallbackHandlerInput, "autoClose">
): IterableReadableStream<StreamEventV2 | StreamEvent | Uint8Array> {
if (options.version !== "v1" && options.version !== "v2") {
throw new Error(
`Only versions "v1" and "v2" of the events schema is currently supported.`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1866,6 +1866,88 @@ test("LLM that doesn't support streaming, but is invoked, should emit one on_str
]);
});

test.skip("Test typing", async () => {
const tool = new DynamicTool({
func: async () => "hello",
name: "parameterless",
description: "A tool that does nothing",
});
const events = [];
const eventStream = await tool.streamEvents({}, { version: "v2" });
for await (const { event, data } of eventStream) {
events.push(event);
// These shouldn't run, we're just testing type checking
if (event === "on_llm_start") {
// @ts-expect-error Assert typing is not any
void data.input.foo;
}
if (event === "on_llm_stream") {
// @ts-expect-error Assert typing is not any
void data.chunk.foo;
}
if (event === "on_llm_end") {
// @ts-expect-error Assert typing is not any
void data.output.foo;
}
if (event === "on_chat_model_start") {
// @ts-expect-error Assert typing is not any
void data.input.foo;
}
if (event === "on_chat_model_stream") {
// @ts-expect-error Assert typing is not any
void data.chunk.foo;
}
if (event === "on_chat_model_end") {
// @ts-expect-error Assert typing is not any
void data.output.foo;
}
if (event === "on_chain_start") {
// Typing is any
void data.input.foo;
}
if (event === "on_chain_stream") {
// Typing is any
void data.chunk.foo;
}
if (event === "on_chain_end") {
// Typing is any
void data.output.foo;
}
if (event === "on_prompt_start") {
// Typing is any
void data.input.foo;
}
if (event === "on_prompt_end") {
// Typing is any
void data.output.foo;
}
if (event === "on_retriever_start") {
// Typing is any
void data.input.foo;
}
if (event === "on_retriever_end") {
// Typing is any
void data.output.foo;
}
if (event === "on_tool_start") {
// Typing is any
void data.input.foo;
}
if (event === "on_tool_end") {
// Typing is any
void data.output.foo;
}
if (event === "on_custom_event") {
// Typing is any
void data.output.foo;
}
// @ts-expect-error Invalid event name
if (event === "something else") {
void 0;
}
}
});

test("Runnable streamEvents method with simple tools", async () => {
const tool = new DynamicTool({
func: async () => "hello",
Expand Down
Loading
Loading