Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(openai): finish span on error #4352

Merged
merged 10 commits into from
May 29, 2024
28 changes: 18 additions & 10 deletions packages/datadog-instrumentations/src/openai.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ addHook({ name: 'openai', file: 'dist/api.js', versions: ['>=3.0.0 <4'] }, expor

return fn.apply(this, arguments)
.then((response) => {
finishCh.publish({
finish({
headers: response.headers,
body: response.data,
path: response.request.path,
Expand All @@ -132,10 +132,10 @@ addHook({ name: 'openai', file: 'dist/api.js', versions: ['>=3.0.0 <4'] }, expor

return response
})
.catch((err) => {
errorCh.publish({ err })
.catch(error => {
finish(undefined, error)

throw err
throw error
})
})
}
Expand Down Expand Up @@ -243,7 +243,7 @@ function wrapStreamIterator (response, options) {
})
}

finishCh.publish({
finish({
headers: response.headers,
body: content,
path: response.url,
Expand All @@ -254,7 +254,7 @@ function wrapStreamIterator (response, options) {
return res
})
.catch(err => {
errorCh.publish({ err })
finish(undefined, err)

throw err
})
Expand Down Expand Up @@ -307,7 +307,7 @@ for (const shim of V4_PACKAGE_SHIMS) {
)
}
} else {
finishCh.publish({
finish({
headers: response.headers,
body,
path: response.url,
Expand All @@ -317,10 +317,10 @@ for (const shim of V4_PACKAGE_SHIMS) {

return body
})
.catch(err => {
errorCh.publish({ err })
.catch(error => {
finish(undefined, error)

throw err
throw error
})
.finally(() => {
// maybe we don't want to unwrap here in case the promise is re-used?
Expand All @@ -335,3 +335,11 @@ for (const shim of V4_PACKAGE_SHIMS) {
return exports
})
}

/**
 * Publishes the request outcome on the diagnostics channels.
 *
 * When an error occurred it is published on the error channel first, then the
 * finish channel is always notified so the active span is closed either way
 * (on error, `response` is expected to be undefined).
 *
 * @param {object|undefined} response - payload for the finish channel
 *   (headers/body/method/path), or undefined when the request failed
 * @param {Error} [error] - error thrown by the wrapped OpenAI call, if any
 */
function finish (response, error) {
  if (error) errorCh.publish({ error })

  finishCh.publish(response)
}
111 changes: 55 additions & 56 deletions packages/datadog-plugin-openai/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,76 +179,71 @@ class OpenApiPlugin extends TracingPlugin {
span.addTags(tags)
}

finish ({ headers, body, method, path }) {
if (headers.constructor.name === 'Headers') {
headers = Object.fromEntries(headers)
finish (response) {
const span = this.activeSpan
const error = !!span.context()._tags.error

let headers, body, method, path
if (!error) {
headers = response.headers
body = response.body
method = response.method
path = response.path
}

const span = this.activeSpan
if (!error && headers?.constructor.name === 'Headers') {
headers = Object.fromEntries(headers)
}
const methodName = span._spanContext._tags['resource.name']

body = coerceResponseBody(body, methodName)

const fullStore = storage.getStore()
const store = fullStore.openai

if (path.startsWith('https://') || path.startsWith('http://')) {
if (!error && (path.startsWith('https://') || path.startsWith('http://'))) {
// basic checking for if the path was set as a full URL
// not using a full regex as it will likely be "https://api.openai.com/..."
path = new URL(path).pathname
}
const endpoint = lookupOperationEndpoint(methodName, path)

const tags = {
'openai.request.endpoint': endpoint,
'openai.request.method': method.toUpperCase(),
const tags = error
? {}
: {
'openai.request.endpoint': endpoint,
'openai.request.method': method.toUpperCase(),

'openai.organization.id': body.organization_id, // only available in fine-tunes endpoints
'openai.organization.name': headers['openai-organization'],
'openai.organization.id': body.organization_id, // only available in fine-tunes endpoints
'openai.organization.name': headers['openai-organization'],

'openai.response.model': headers['openai-model'] || body.model, // specific model, often undefined
'openai.response.id': body.id, // common creation value, numeric epoch
'openai.response.deleted': body.deleted, // common boolean field in delete responses
'openai.response.model': headers['openai-model'] || body.model, // specific model, often undefined
'openai.response.id': body.id, // common creation value, numeric epoch
'openai.response.deleted': body.deleted, // common boolean field in delete responses

// The OpenAI API appears to use both created and created_at in different places
// Here we're consciously choosing to surface this inconsistency instead of normalizing
'openai.response.created': body.created,
'openai.response.created_at': body.created_at
}
// The OpenAI API appears to use both created and created_at in different places
// Here we're consciously choosing to surface this inconsistency instead of normalizing
'openai.response.created': body.created,
'openai.response.created_at': body.created_at
}

responseDataExtractionByMethod(methodName, tags, body, store)
span.addTags(tags)

super.finish()
this.sendLog(methodName, span, tags, store, false)
this.sendMetrics(headers, body, endpoint, span._duration)
this.sendLog(methodName, span, tags, store, error)
this.sendMetrics(headers, body, endpoint, span._duration, error)
}

error (...args) {
super.error(...args)

const span = this.activeSpan
const methodName = span._spanContext._tags['resource.name']

const fullStore = storage.getStore()
const store = fullStore.openai

// We don't know most information about the request when it fails

const tags = ['error:1']
this.metrics.distribution('openai.request.duration', span._duration * 1000, tags)
this.metrics.increment('openai.request.error', 1, tags)

this.sendLog(methodName, span, {}, store, true)
}

sendMetrics (headers, body, endpoint, duration) {
const tags = [
`org:${headers['openai-organization']}`,
`endpoint:${endpoint}`, // just "/v1/models", no method
`model:${headers['openai-model']}`,
'error:0'
]
sendMetrics (headers, body, endpoint, duration, error) {
const tags = [`error:${Number(!!error)}`]
if (error) {
this.metrics.increment('openai.request.error', 1, tags)
} else {
tags.push(`org:${headers['openai-organization']}`)
tags.push(`endpoint:${endpoint}`) // just "/v1/models", no method
tags.push(`model:${headers['openai-model']}`)
}

this.metrics.distribution('openai.request.duration', duration * 1000, tags)

Expand All @@ -260,20 +255,24 @@ class OpenApiPlugin extends TracingPlugin {
this.metrics.distribution('openai.tokens.total', promptTokens + completionTokens, tags)
}

if (headers['x-ratelimit-limit-requests']) {
this.metrics.gauge('openai.ratelimit.requests', Number(headers['x-ratelimit-limit-requests']), tags)
}
if (headers) {
if (headers['x-ratelimit-limit-requests']) {
this.metrics.gauge('openai.ratelimit.requests', Number(headers['x-ratelimit-limit-requests']), tags)
}

if (headers['x-ratelimit-remaining-requests']) {
this.metrics.gauge('openai.ratelimit.remaining.requests', Number(headers['x-ratelimit-remaining-requests']), tags)
}
if (headers['x-ratelimit-remaining-requests']) {
this.metrics.gauge(
'openai.ratelimit.remaining.requests', Number(headers['x-ratelimit-remaining-requests']), tags
)
}

if (headers['x-ratelimit-limit-tokens']) {
this.metrics.gauge('openai.ratelimit.tokens', Number(headers['x-ratelimit-limit-tokens']), tags)
}
if (headers['x-ratelimit-limit-tokens']) {
this.metrics.gauge('openai.ratelimit.tokens', Number(headers['x-ratelimit-limit-tokens']), tags)
}

if (headers['x-ratelimit-remaining-tokens']) {
this.metrics.gauge('openai.ratelimit.remaining.tokens', Number(headers['x-ratelimit-remaining-tokens']), tags)
if (headers['x-ratelimit-remaining-tokens']) {
this.metrics.gauge('openai.ratelimit.remaining.tokens', Number(headers['x-ratelimit-remaining-tokens']), tags)
}
}
}

Expand Down
68 changes: 64 additions & 4 deletions packages/datadog-plugin-openai/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,66 @@ describe('Plugin', () => {
})
})

describe('with error', () => {
let scope

beforeEach(() => {
scope = nock('https://api.openai.com:443')
.get('/v1/models')
.reply(400, {
error: {
message: 'fake message',
type: 'fake type',
param: 'fake param',
code: null
}
})
})

afterEach(() => {
nock.removeInterceptor(scope)
scope.done()
})

it('should attach the error to the span', async () => {
const checkTraces = agent
.use(traces => {
expect(traces[0][0]).to.have.property('error', 1)
// the message content differs on OpenAI version, even between patches
expect(traces[0][0].meta['error.message']).to.exist
expect(traces[0][0].meta).to.have.property('error.type', 'Error')
expect(traces[0][0].meta['error.stack']).to.exist
})

try {
if (semver.satisfies(realVersion, '>=4.0.0')) {
await openai.models.list()
} else {
await openai.listModels()
}
} catch {
// ignore, we expect an error
}

await checkTraces

clock.tick(10 * 1000)

const expectedTags = ['error:1']

expect(metricStub).to.have.been.calledWith('openai.request.error', 1, 'c', expectedTags)
expect(metricStub).to.have.been.calledWith('openai.request.duration') // timing value not guaranteed

expect(metricStub).to.not.have.been.calledWith('openai.tokens.prompt')
expect(metricStub).to.not.have.been.calledWith('openai.tokens.completion')
expect(metricStub).to.not.have.been.calledWith('openai.tokens.total')
expect(metricStub).to.not.have.been.calledWith('openai.ratelimit.requests')
expect(metricStub).to.not.have.been.calledWith('openai.ratelimit.tokens')
expect(metricStub).to.not.have.been.calledWith('openai.ratelimit.remaining.requests')
expect(metricStub).to.not.have.been.calledWith('openai.ratelimit.remaining.tokens')
})
})

describe('create completion', () => {
afterEach(() => {
nock.cleanAll()
Expand Down Expand Up @@ -196,10 +256,10 @@ describe('Plugin', () => {
clock.tick(10 * 1000)

const expectedTags = [
'error:0',
'org:kill-9',
'endpoint:/v1/completions',
'model:text-davinci-002',
'error:0'
'model:text-davinci-002'
]

expect(metricStub).to.have.been.calledWith('openai.request.duration') // timing value not guaranteed
Expand Down Expand Up @@ -745,10 +805,10 @@ describe('Plugin', () => {
await checkTraces

const expectedTags = [
'error:0',
'org:kill-9',
'endpoint:/v1/edits',
'model:text-davinci-edit:001',
'error:0'
'model:text-davinci-edit:001'
]

expect(metricStub).to.be.calledWith('openai.ratelimit.requests', 20, 'g', expectedTags)
Expand Down
Loading