
chore: add some shared llm-type plugin utilities #5109

Merged · 10 commits · Jan 28, 2025
packages/datadog-plugin-langchain/src/handlers/default.js (6 additions, 30 deletions)
@@ -1,16 +1,13 @@
 'use strict'
 
-const Sampler = require('../../../dd-trace/src/sampler')
+const makeUtilities = require('../../../dd-trace/src/plugins/util/llm')
 
-const RE_NEWLINE = /\n/g
-const RE_TAB = /\t/g
-
-// TODO: should probably refactor the OpenAI integration to use a shared LLMTracingPlugin base class
-// This logic isn't particular to LangChain
 class LangChainHandler {
-  constructor (config) {
-    this.config = config
-    this.sampler = new Sampler(config.spanPromptCompletionSampleRate)
+  constructor (tracerConfig) {
+    const utilities = makeUtilities('langchain', tracerConfig)
+
+    this.normalize = utilities.normalize
+    this.isPromptCompletionSampled = utilities.isPromptCompletionSampled
   }
 
   // no-op for default handler
@@ -27,27 +24,6 @@ class LangChainHandler {
 
   // no-op for default handler
   extractModel (instance) {}
-
-  normalize (text) {
-    if (!text) return
-    if (typeof text !== 'string' || !text || (typeof text === 'string' && text.length === 0)) return
-
-    const max = this.config.spanCharLimit
-
-    text = text
-      .replace(RE_NEWLINE, '\\n')
-      .replace(RE_TAB, '\\t')
-
-    if (text.length > max) {
-      return text.substring(0, max) + '...'
-    }
-
-    return text
-  }
-
-  isPromptCompletionSampled () {
-    return this.sampler.isSampled()
-  }
 }
 
 module.exports = LangChainHandler
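
For context, a minimal sketch of how the slimmed-down handler behaves after this change (the config shape mirrors what tracing.js now passes in; the values here are illustrative):

```js
// Illustrative only: exercising the refactored handler directly.
const LangChainHandler = require('./packages/datadog-plugin-langchain/src/handlers/default')

// The handler now receives the full tracer config and pulls its integration
// block ('langchain' here) through the shared makeUtilities factory.
const handler = new LangChainHandler({ langchain: { spanCharLimit: 64 } })

handler.normalize('hello\nworld') // => 'hello\\nworld'
handler.isPromptCompletionSampled() // => true (sample rate defaults to 1.0)
```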
packages/datadog-plugin-langchain/src/tracing.js (5 additions, 6 deletions)
@@ -26,13 +26,12 @@ class LangChainTracingPlugin extends TracingPlugin {
   constructor () {
     super(...arguments)
 
-    const langchainConfig = this._tracerConfig.langchain || {}
     this.handlers = {
-      chain: new LangChainChainHandler(langchainConfig),
-      chat_model: new LangChainChatModelHandler(langchainConfig),
-      llm: new LangChainLLMHandler(langchainConfig),
-      embedding: new LangChainEmbeddingHandler(langchainConfig),
-      default: new LangChainHandler(langchainConfig)
+      chain: new LangChainChainHandler(this._tracerConfig),
+      chat_model: new LangChainChatModelHandler(this._tracerConfig),
+      llm: new LangChainLLMHandler(this._tracerConfig),
+      embedding: new LangChainEmbeddingHandler(this._tracerConfig),
+      default: new LangChainHandler(this._tracerConfig)
     }
   }
 
packages/datadog-plugin-openai/src/tracing.js (14 additions, 33 deletions)
@@ -9,12 +9,9 @@ const Sampler = require('../../dd-trace/src/sampler')
 const { MEASURED } = require('../../../ext/tags')
 const { estimateTokens } = require('./token-estimator')
 
-// String#replaceAll unavailable on Node.js@v14 (dd-trace@<=v3)
-const RE_NEWLINE = /\n/g
-const RE_TAB = /\t/g
+const makeUtilities = require('../../dd-trace/src/plugins/util/llm')
 
-// TODO: In the future we should refactor config.js to make it requirable
-let MAX_TEXT_LEN = 128
+let normalize
 
 function safeRequire (path) {
   try {
@@ -44,9 +41,11 @@ class OpenAiTracingPlugin extends TracingPlugin {
 
     this.sampler = new Sampler(0.1) // default 10% log sampling
 
-    // hoist the max length env var to avoid making all of these functions a class method
+    // hoist the normalize function to avoid making all of these functions a class method
     if (this._tracerConfig) {
-      MAX_TEXT_LEN = this._tracerConfig.openaiSpanCharLimit
+      const utilities = makeUtilities('openai', this._tracerConfig)
+
+      normalize = utilities.normalize
     }
   }
 
@@ -116,7 +115,7 @@
     // createEdit, createEmbedding, createModeration
     if (payload.input) {
       const normalized = normalizeStringOrTokenArray(payload.input, false)
-      tags['openai.request.input'] = truncateText(normalized)
+      tags['openai.request.input'] = normalize(normalized)
       openaiStore.input = normalized
     }
 
@@ -594,7 +593,7 @@ function commonImageResponseExtraction (tags, body) {
   for (let i = 0; i < body.data.length; i++) {
     const image = body.data[i]
     // exactly one of these two options is provided
-    tags[`openai.response.images.${i}.url`] = truncateText(image.url)
+    tags[`openai.response.images.${i}.url`] = normalize(image.url)
    tags[`openai.response.images.${i}.b64_json`] = image.b64_json && 'returned'
   }
 }
@@ -731,14 +730,14 @@ function commonCreateResponseExtraction (tags, body, openaiStore, methodName) {
 
     tags[`openai.response.choices.${choiceIdx}.finish_reason`] = choice.finish_reason
     tags[`openai.response.choices.${choiceIdx}.logprobs`] = specifiesLogProb ? 'returned' : undefined
-    tags[`openai.response.choices.${choiceIdx}.text`] = truncateText(choice.text)
+    tags[`openai.response.choices.${choiceIdx}.text`] = normalize(choice.text)
 
     // createChatCompletion only
     const message = choice.message || choice.delta // delta for streamed responses
     if (message) {
       tags[`openai.response.choices.${choiceIdx}.message.role`] = message.role
-      tags[`openai.response.choices.${choiceIdx}.message.content`] = truncateText(message.content)
-      tags[`openai.response.choices.${choiceIdx}.message.name`] = truncateText(message.name)
+      tags[`openai.response.choices.${choiceIdx}.message.content`] = normalize(message.content)
+      tags[`openai.response.choices.${choiceIdx}.message.name`] = normalize(message.name)
       if (message.tool_calls) {
         const toolCalls = message.tool_calls
         for (let toolIdx = 0; toolIdx < toolCalls.length; toolIdx++) {
@@ -795,24 +794,6 @@ function truncateApiKey (apiKey) {
   return apiKey && `sk-...${apiKey.substr(apiKey.length - 4)}`
 }
 
-/**
- * for cleaning up prompt and response
- */
-function truncateText (text) {
-  if (!text) return
-  if (typeof text !== 'string' || !text || (typeof text === 'string' && text.length === 0)) return
-
-  text = text
-    .replace(RE_NEWLINE, '\\n')
-    .replace(RE_TAB, '\\t')
-
-  if (text.length > MAX_TEXT_LEN) {
-    return text.substring(0, MAX_TEXT_LEN) + '...'
-  }
-
-  return text
-}
-
 function tagChatCompletionRequestContent (contents, messageIdx, tags) {
   if (typeof contents === 'string') {
     tags[`openai.request.messages.${messageIdx}.content`] = contents
@@ -824,10 +805,10 @@
     const type = content.type
     tags[`openai.request.messages.${messageIdx}.content.${contentIdx}.type`] = content.type
     if (type === 'text') {
-      tags[`openai.request.messages.${messageIdx}.content.${contentIdx}.text`] = truncateText(content.text)
+      tags[`openai.request.messages.${messageIdx}.content.${contentIdx}.text`] = normalize(content.text)
     } else if (type === 'image_url') {
       tags[`openai.request.messages.${messageIdx}.content.${contentIdx}.image_url.url`] =
-        truncateText(content.image_url.url)
+        normalize(content.image_url.url)
     }
     // unsupported type otherwise, won't be tagged
@@ -1004,7 +985,7 @@ function normalizeStringOrTokenArray (input, truncate) {
   const normalized = Array.isArray(input)
     ? `[${input.join(', ')}]` // "[1, 2, 999]"
     : input // "foo"
-  return truncate ? truncateText(normalized) : normalized
+  return truncate ? normalize(normalized) : normalized
 }
 
 function defensiveArrayLength (maybeArray) {
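One pattern worth calling out in the hunks above: `normalize` is a module-level binding assigned once in the constructor, so the file's many free tag-extraction helpers can call it without each becoming a class method. A distilled sketch of that wiring (names from this diff; the class body is abridged):

```js
// Abridged from the diff above: module-level binding, assigned at construction.
let normalize

class OpenAiTracingPlugin extends TracingPlugin {
  constructor () {
    super(...arguments)

    if (this._tracerConfig) {
      // makeUtilities reads the 'openai' block of the tracer config
      normalize = makeUtilities('openai', this._tracerConfig).normalize
    }
  }
}

// Free functions such as commonImageResponseExtraction can then call
// normalize(...) directly, with no plugin instance in scope.
```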
packages/dd-trace/src/config.js (2 additions, 2 deletions)
@@ -521,7 +521,7 @@ class Config {
     this._setValue(defaults, 'inferredProxyServicesEnabled', false)
     this._setValue(defaults, 'memcachedCommandEnabled', false)
     this._setValue(defaults, 'openAiLogsEnabled', false)
-    this._setValue(defaults, 'openaiSpanCharLimit', 128)
+    this._setValue(defaults, 'openai.spanCharLimit', 128)
     this._setValue(defaults, 'peerServiceMapping', {})
     this._setValue(defaults, 'plugins', true)
     this._setValue(defaults, 'port', '8126')
@@ -802,7 +802,7 @@
     // Requires an accompanying DD_APM_OBFUSCATION_MEMCACHED_KEEP_COMMAND=true in the agent
     this._setBoolean(env, 'memcachedCommandEnabled', DD_TRACE_MEMCACHED_COMMAND_ENABLED)
     this._setBoolean(env, 'openAiLogsEnabled', DD_OPENAI_LOGS_ENABLED)
-    this._setValue(env, 'openaiSpanCharLimit', maybeInt(DD_OPENAI_SPAN_CHAR_LIMIT))
+    this._setValue(env, 'openai.spanCharLimit', maybeInt(DD_OPENAI_SPAN_CHAR_LIMIT))
     this._envUnprocessed.openaiSpanCharLimit = DD_OPENAI_SPAN_CHAR_LIMIT
     if (DD_TRACE_PEER_SERVICE_MAPPING) {
       this._setValue(env, 'peerServiceMapping', fromEntries(
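A note on the rename above: dd-trace's `_setValue` treats a dotted name as a nested path (the convention existing keys such as `appsec.enabled` already follow), so the value becomes reachable as `config.openai.spanCharLimit`. That nested shape is exactly the per-integration lookup the new shared utility performs. A hypothetical illustration, assuming `makeUtilities` is in scope from the new module below:

```js
// Shape of the resolved config after the rename (values illustrative)
const tracerConfig = { openai: { spanCharLimit: 128 } }

// makeUtilities('openai', ...) reads tracerConfig.openai internally
const utils = makeUtilities('openai', tracerConfig)
utils.normalize('x'.repeat(300)) // => 'x'.repeat(128) + '...'
```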
packages/dd-trace/src/plugins/util/llm.js (new file, 35 additions)
@@ -0,0 +1,35 @@
+const Sampler = require('../../sampler')
+
+const RE_NEWLINE = /\n/g
+const RE_TAB = /\t/g
+
+function normalize (text, limit = 128) {
+  if (!text) return
+  if (typeof text !== 'string' || !text || (typeof text === 'string' && text.length === 0)) return
+
+  text = text
+    .replace(RE_NEWLINE, '\\n')
+    .replace(RE_TAB, '\\t')
+
+  if (text.length > limit) {
+    return text.substring(0, limit) + '...'
+  }
+
+  return text
+}
+
+function isPromptCompletionSampled (sampler) {
+  return sampler.isSampled()
+}
+
+module.exports = function (integrationName, tracerConfig) {
+  const integrationConfig = tracerConfig[integrationName] || {}
+  const { spanCharLimit, spanPromptCompletionSampleRate } = integrationConfig
+
+  const sampler = new Sampler(spanPromptCompletionSampleRate ?? 1.0)
+
+  return {
+    normalize: str => normalize(str, spanCharLimit),
+    isPromptCompletionSampled: () => isPromptCompletionSampled(sampler)
+  }
+}
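
To make the new module's surface concrete, a short usage sketch under the same config shape the tests below exercise (the require path and values are illustrative):

```js
'use strict'

const makeUtilities = require('./packages/dd-trace/src/plugins/util/llm')

const utils = makeUtilities('langchain', {
  langchain: {
    spanCharLimit: 100,
    spanPromptCompletionSampleRate: 0.5
  }
})

utils.normalize('a\nb\tc') // => 'a\\nb\\tc' (newlines and tabs escaped)
utils.normalize('a'.repeat(256)) // => 'a'.repeat(100) + '...' (truncated)
utils.normalize('') // => undefined (empty input is dropped)
utils.isPromptCompletionSampled() // => true roughly half the time at a 0.5 rate
```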
packages/dd-trace/test/config.spec.js (1 addition, 1 deletion)
@@ -349,7 +349,7 @@ describe('Config', () => {
       { name: 'logInjection', value: false, origin: 'default' },
       { name: 'lookup', value: undefined, origin: 'default' },
       { name: 'openAiLogsEnabled', value: false, origin: 'default' },
-      { name: 'openaiSpanCharLimit', value: 128, origin: 'default' },
+      { name: 'openai.spanCharLimit', value: 128, origin: 'default' },
       { name: 'peerServiceMapping', value: {}, origin: 'default' },
       { name: 'plugins', value: true, origin: 'default' },
       { name: 'port', value: '8126', origin: 'default' },
packages/dd-trace/test/plugins/util/llm.spec.js (new file, 64 additions)
@@ -0,0 +1,64 @@
+'use strict'
+
+require('../../setup/tap')
+
+const makeUtilities = require('../../../src/plugins/util/llm')
+
+describe('llm utils', () => {
+  let utils
+
+  describe('with default configuration', () => {
+    beforeEach(() => {
+      utils = makeUtilities('langchain', {})
+    })
+
+    it('should normalize text to 128 characters', () => {
+      const text = 'a'.repeat(256)
+      expect(utils.normalize(text)).to.equal('a'.repeat(128) + '...')
+    })
+
+    it('should return undefined for empty text', () => {
+      expect(utils.normalize('')).to.be.undefined
+    })
+
+    it('should return undefined for a non-string', () => {
+      expect(utils.normalize(42)).to.be.undefined
+    })
+
+    it('should replace special characters', () => {
+      expect(utils.normalize('a\nb\tc')).to.equal('a\\nb\\tc')
+    })
+
+    it('should always sample prompt completion', () => {
+      expect(utils.isPromptCompletionSampled()).to.be.true
+    })
+  })
+
+  describe('with custom configuration available', () => {
+    beforeEach(() => {
+      utils = makeUtilities('langchain', {
+        langchain: {
+          spanCharLimit: 100,
+          spanPromptCompletionSampleRate: 0.5
+        }
+      })
+    })
+
+    it('should normalize text to 100 characters', () => {
+      const text = 'a'.repeat(256)
+      expect(utils.normalize(text)).to.equal('a'.repeat(100) + '...')
+    })
+
+    it('should sample prompt completion at 50%', () => {
+      sinon.stub(Math, 'random').returns(0.4)
+      expect(utils.isPromptCompletionSampled()).to.be.true
+      Math.random.restore()
+    })
+
+    it('should not sample prompt completion at 50%', () => {
+      sinon.stub(Math, 'random').returns(0.6)
+      expect(utils.isPromptCompletionSampled()).to.be.false
+      Math.random.restore()
+    })
+  })
+})
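
The two 50% tests pin `Math.random` because `isPromptCompletionSampled` defers to dd-trace's `Sampler`. They assume a rate comparison roughly like the sketch below; this is an assumption about the sampler's internals, not code from this diff:

```js
// Assumed Sampler behavior (NOT part of this PR): a stubbed Math.random()
// of 0.4 falls under a 0.5 rate and samples; 0.6 does not.
class Sampler {
  constructor (rate) {
    this._rate = rate
  }

  isSampled () {
    return this._rate === 1 || Math.random() < this._rate
  }
}
```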