feat(openai): support streamed responses for completions and chat completions #4308

Merged · 9 commits · May 24, 2024
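For orientation, this is the call shape the PR instruments: with the v4 `openai` client, passing `stream: true` makes `create()` resolve to an async iterable of partial chunks rather than one finished response. A minimal sketch, assuming the v4 client; the model, prompt, and API-key handling are illustrative and not taken from this PR:

const OpenAI = require('openai') // assumes openai >= 4

const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY })

async function main () {
  // `stream: true` on the options argument is what the new shims detect
  const stream = await client.chat.completions.create({
    model: 'gpt-3.5-turbo', // illustrative
    messages: [{ role: 'user', content: 'Hello!' }],
    stream: true
  })

  // each chunk carries a partial `delta`; the instrumentation below
  // accumulates these so the finished span looks like a non-streamed response
  for await (const chunk of stream) {
    process.stdout.write(chunk.choices[0]?.delta?.content || '')
  }
}

main()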
158 changes: 148 additions & 10 deletions packages/datadog-instrumentations/src/openai.js
@@ -15,13 +15,15 @@ const V4_PACKAGE_SHIMS = [
     file: 'resources/chat/completions.js',
     targetClass: 'Completions',
     baseResource: 'chat.completions',
-    methods: ['create']
+    methods: ['create'],
+    streamedResponse: true
   },
   {
     file: 'resources/completions.js',
     targetClass: 'Completions',
     baseResource: 'completions',
-    methods: ['create']
+    methods: ['create'],
+    streamedResponse: true
   },
   {
     file: 'resources/embeddings.js',
@@ -141,9 +143,130 @@ addHook({ name: 'openai', file: 'dist/api.js', versions: ['>=3.0.0 <4'] }, expor
   return exports
 })
 
+function addStreamedChunk (content, chunk) {
+  return content.choices.map((oldChoice, choiceIdx) => {
+    const newChoice = oldChoice
+    const chunkChoice = chunk.choices[choiceIdx]
+    if (!oldChoice.finish_reason) {
+      newChoice.finish_reason = chunkChoice.finish_reason
+    }
+
+    // delta exists on chat completions
+    const delta = chunkChoice.delta
+
+    if (delta) {
+      const content = delta.content
+      if (content) {
+        if (newChoice.delta.content) { // we don't want to append to undefined
+          newChoice.delta.content += content
+        } else {
+          newChoice.delta.content = content
+        }
+      }
+    } else {
+      const text = chunkChoice.text
+      if (text) {
+        if (newChoice.text) {
+          newChoice.text += text
+        } else {
+          newChoice.text = text
+        }
+      }
+    }
+
+    // tools only exist on chat completions
+    const tools = delta && chunkChoice.delta.tool_calls
+
+    if (tools) {
+      newChoice.delta.tool_calls = tools.map((newTool, toolIdx) => {
+        const oldTool = oldChoice.delta.tool_calls[toolIdx]
+
+        if (oldTool) {
+          oldTool.function.arguments += newTool.function.arguments
+        }
+
+        return oldTool
+      })
+    }
+
+    return newChoice
+  })
+}
+
+function buffersToJSON (chunks = []) {
+  return Buffer
+    .concat(chunks) // combine the buffers
+    .toString() // stringify
+    .split(/(?=data:)/) // split on "data:"
+    .map(chunk => chunk.split('\n').join('')) // remove newlines
+    .map(chunk => chunk.substring(6)) // remove 'data: ' from the front
+    .slice(0, -1) // remove the last [DONE] message
+    .map(JSON.parse) // parse all of the returned objects
+}
+
+/**
+ * For streamed responses, we need to accumulate all of the content in
+ * the chunks, and let the combined content be the final response.
+ * This way, spans look the same as when not streamed.
+ */
+function wrapStreamIterator (response, options) {
+  let processChunksAsBuffers = false
+  const chunks = []
+  return function (itr) {
+    return function () {
+      const iterator = itr.apply(this, arguments)
+      shimmer.wrap(iterator, 'next', next => function () {
+        return next.apply(this, arguments)
+          .then(res => {
+            const { done, value: chunk } = res
+
+            if (chunk) {
+              chunks.push(chunk)
+              if (chunk instanceof Buffer) {
+                // this operation should be safe
+                // if one chunk is a buffer (versus a plain object), the rest should be as well
+                processChunksAsBuffers = true
+              }
+            }
+
+            if (done) {
+              let content = chunks.filter(chunk => chunk != null) // filter null or undefined values
+
+              if (content.length) { // guard the empty case so the reduce below has something to fold
+                if (processChunksAsBuffers) {
+                  content = buffersToJSON(content)
+                }
+
+                content = content.reduce((content, chunk) => {
+                  content.choices = addStreamedChunk(content, chunk)
+                  return content
+                })
+              }
+
+              finishCh.publish({
+                headers: response.headers,
+                body: content,
+                path: response.url,
+                method: options.method
+              })
+            }
+
+            return res
+          })
+          .catch(err => {
+            errorCh.publish({ err })
+
+            throw err
+          })
+      })
+      return iterator
+    }
+  }
+}

 for (const shim of V4_PACKAGE_SHIMS) {
-  const { file, targetClass, baseResource, methods } = shim
-  addHook({ name: 'openai', file, versions: shim.versions || ['>=4'] }, exports => {
+  const { file, targetClass, baseResource, methods, versions, streamedResponse } = shim
+  addHook({ name: 'openai', file, versions: versions || ['>=4'] }, exports => {
     const targetPrototype = exports[targetClass].prototype
 
     for (const methodName of methods) {
@@ -152,6 +275,11 @@ for (const shim of V4_PACKAGE_SHIMS) {
          return methodFn.apply(this, arguments)
        }
 
+        // The OpenAI library lets you set `stream: true` on the options arg to any method.
+        // However, we only want to handle streamed responses in specific cases:
+        // chat.completions and completions.
+        const stream = streamedResponse && arguments[arguments.length - 1]?.stream
+
        const client = this._client || this.client
 
        startCh.publish({
@@ -170,12 +298,22 @@
           // the original response is wrapped in a promise, so we need to unwrap it
           .then(body => Promise.all([this.responsePromise, body]))
           .then(([{ response, options }, body]) => {
-            finishCh.publish({
-              headers: response.headers,
-              body,
-              path: response.url,
-              method: options.method
-            })
+            if (stream) {
+              if (body.iterator) {
+                shimmer.wrap(body, 'iterator', wrapStreamIterator(response, options))
+              } else {
+                shimmer.wrap(
+                  body.response.body, Symbol.asyncIterator, wrapStreamIterator(response, options)
+                )
+              }
+            } else {
+              finishCh.publish({
+                headers: response.headers,
+                body,
+                path: response.url,
+                method: options.method
+              })
+            }
 
             return body
           })
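To make the stream handling above concrete: raw chunks arrive as Buffers of server-sent events, which the buffersToJSON pipeline reassembles, and the reduce over addStreamedChunk folds the parsed deltas into one response. A self-contained sketch with invented SSE payloads (not captured from a real response), mirroring the two steps rather than calling the module:

// invented raw SSE bytes, shaped like the chunks the wrapped iterator collects
const sseChunks = [
  Buffer.from('data: {"choices":[{"delta":{"content":"Hel"}}]}\n\n'),
  Buffer.from('data: {"choices":[{"delta":{"content":"lo"}}]}\n\n'),
  Buffer.from('data: [DONE]\n\n')
]

// step 1: the buffersToJSON pipeline (concat, split on "data:", strip
// newlines and the 6-character "data: " prefix, drop [DONE], JSON-parse)
const events = Buffer.concat(sseChunks)
  .toString()
  .split(/(?=data:)/)
  .map(chunk => chunk.split('\n').join(''))
  .map(chunk => chunk.substring(6))
  .slice(0, -1)
  .map(JSON.parse)

// step 2: fold the chunks, simplified here to the delta-content
// case of addStreamedChunk
const merged = events.reduce((acc, chunk) => {
  acc.choices = acc.choices.map((oldChoice, idx) => {
    const added = chunk.choices[idx]?.delta?.content
    if (added) oldChoice.delta.content = (oldChoice.delta.content || '') + added
    return oldChoice
  })
  return acc
})

console.log(merged.choices[0].delta.content) // 'Hello'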
17 changes: 12 additions & 5 deletions packages/datadog-plugin-openai/src/index.js
@@ -112,6 +112,10 @@ class OpenApiPlugin extends TracingPlugin {
       }
     }
 
+    if (payload.stream) {
+      tags['openai.request.stream'] = payload.stream
+    }
+
     switch (methodName) {
       case 'createFineTune':
       case 'fine_tuning.jobs.create':
@@ -298,7 +302,8 @@ function retrieveModelRequestExtraction (tags, payload) {
 }
 
 function createChatCompletionRequestExtraction (tags, payload, store) {
-  if (!defensiveArrayLength(payload.messages)) return
+  const messages = payload.messages
+  if (!defensiveArrayLength(messages)) return
 
   store.messages = payload.messages
   for (let i = 0; i < payload.messages.length; i++) {
@@ -600,18 +605,20 @@ function commonCreateResponseExtraction (tags, body, store) {
     tags[`openai.response.choices.${choiceIdx}.text`] = truncateText(choice.text)
 
     // createChatCompletion only
-    if (choice.message) {
-      const message = choice.message
+    const message = choice.message || choice.delta // delta for streamed responses
+    if (message) {
       tags[`openai.response.choices.${choiceIdx}.message.role`] = message.role
       tags[`openai.response.choices.${choiceIdx}.message.content`] = truncateText(message.content)
       tags[`openai.response.choices.${choiceIdx}.message.name`] = truncateText(message.name)
       if (message.tool_calls) {
         const toolCalls = message.tool_calls
         for (let toolIdx = 0; toolIdx < toolCalls.length; toolIdx++) {
-          tags[`openai.response.choices.${choiceIdx}.message.tool_calls.${toolIdx}.name`] =
+          tags[`openai.response.choices.${choiceIdx}.message.tool_calls.${toolIdx}.function.name`] =
             toolCalls[toolIdx].function.name
-          tags[`openai.response.choices.${choiceIdx}.message.tool_calls.${toolIdx}.arguments`] =
+          tags[`openai.response.choices.${choiceIdx}.message.tool_calls.${toolIdx}.function.arguments`] =
             toolCalls[toolIdx].function.arguments
+          tags[`openai.response.choices.${choiceIdx}.message.tool_calls.${toolIdx}.id`] =
+            toolCalls[toolIdx].id
         }
       }
     }
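End to end, a streamed request should now yield the same span shape as its non-streamed equivalent, plus the new openai.request.stream tag. A hypothetical tag set for the streamed 'Hello' exchange sketched earlier; tag names follow the patterns in this diff, the finish_reason tag is assumed from the plugin's existing choice-tag pattern, and all values are illustrative:

// hypothetical resulting span tags, not output captured from the tracer
const tags = {
  'openai.request.stream': true,
  'openai.response.choices.0.message.role': 'assistant',
  'openai.response.choices.0.message.content': 'Hello',
  'openai.response.choices.0.finish_reason': 'stop' // assumed existing tag
}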