diff --git a/examples/worker/src/handler.ts b/examples/worker/src/handler.ts index f8d480d..1d49320 100644 --- a/examples/worker/src/handler.ts +++ b/examples/worker/src/handler.ts @@ -15,6 +15,10 @@ const handleDO = async (request: Request, env: Env): Promise => { return await stub.fetch('https://does-not-exist.com/blah') } +const handleNoAsync = (request: Request): Response => { + return new Response('Very quick hello!') +} + const handleRest = async (request: Request, env: Env, ctx: ExecutionContext): Promise => { trace.getActiveSpan()?.setAttribute('http.route', '/*') withNextSpan({ destination: 'cloudflare' }) @@ -38,6 +42,8 @@ export default { return handleDO(request, env) } else if (pathname === '/error') { throw new Error('You asked for it!') + } else if (pathname === '/noasync') { + return handleNoAsync(request) } else { return handleRest(request, env, ctx) } diff --git a/src/instrumentation/common.ts b/src/instrumentation/common.ts index 2837b2c..2f28c87 100644 --- a/src/instrumentation/common.ts +++ b/src/instrumentation/common.ts @@ -50,13 +50,8 @@ export async function exportSpans(tracker?: PromiseTracker) { const tracer = trace.getTracer('export') if (tracer instanceof WorkerTracer) { await scheduler.wait(1) - if (tracker) { - await tracker.wait() - } - const promises = tracer.spanProcessors.map(async (spanProcessor) => { - await spanProcessor.forceFlush() - }) - await Promise.allSettled(promises) + await tracker?.wait() + await tracer.forceFlush() } else { console.error('The global tracer is not of type WorkerTracer and can not export spans') } diff --git a/src/instrumentation/email.ts b/src/instrumentation/email.ts index 646d838..c16a7ae 100644 --- a/src/instrumentation/email.ts +++ b/src/instrumentation/email.ts @@ -1,39 +1,10 @@ -import { setConfig, type Initialiser } from '../config' -import { wrap } from '../wrap' -import { exportSpans, proxyExecutionContext } from './common' -import { context as api_context, Exception, SpanKind, type SpanOptions, trace } from '@opentelemetry/api' -import { instrumentEnv } from './env' -import { versionAttributes } from './version' +import { SpanKind, type SpanOptions } from '@opentelemetry/api' import { ATTR_FAAS_TRIGGER, ATTR_MESSAGING_DESTINATION_NAME, ATTR_RPC_MESSAGE_ID, } from '@opentelemetry/semantic-conventions/incubating' - -type EmailHandler = EmailExportedHandler -export type EmailHandlerArgs = Parameters - -export function createEmailHandler(emailFn: EmailHandler, initialiser: Initialiser): EmailHandler { - const emailHandler: ProxyHandler = { - async apply(target, _thisArg, argArray: Parameters): Promise { - const [message, orig_env, orig_ctx] = argArray - const config = initialiser(orig_env as Record, message) - const env = instrumentEnv(orig_env as Record) - const { ctx, tracker } = proxyExecutionContext(orig_ctx) - const context = setConfig(config) - - try { - const args: EmailHandlerArgs = [message, env, ctx] - return await api_context.with(context, executeEmailHandler, undefined, target, args) - } catch (error) { - throw error - } finally { - orig_ctx.waitUntil(exportSpans(tracker)) - } - }, - } - return wrap(emailFn, emailHandler) -} +import { HandlerInstrumentation, OrPromise } from '../types' /** * Converts the message headers into a record ready to be injected @@ -50,27 +21,22 @@ function headerAttributes(message: { headers: Headers }): Record [`email.header.${key}`, value] as const)) } -async function executeEmailHandler(emailFn: EmailHandler, [message, env, ctx]: EmailHandlerArgs): Promise { - const tracer = trace.getTracer('emailHandler') - const options = { - attributes: { +export const emailInstrumentation: HandlerInstrumentation> = { + getInitialSpanInfo: (message) => { + const attributes = { [ATTR_FAAS_TRIGGER]: 'other', [ATTR_RPC_MESSAGE_ID]: message.headers.get('Message-Id') ?? undefined, [ATTR_MESSAGING_DESTINATION_NAME]: message.to, - }, - kind: SpanKind.CONSUMER, - } satisfies SpanOptions - Object.assign(options.attributes!, headerAttributes(message), versionAttributes(env)) - const promise = tracer.startActiveSpan(`emailHandler ${message.to}`, options, async (span) => { - try { - const result = await emailFn(message, env, ctx) - span.end() - return result - } catch (error) { - span.recordException(error as Exception) - span.end() - throw error } - }) - return promise + Object.assign(attributes, headerAttributes(message)) + const options = { + attributes, + kind: SpanKind.CONSUMER, + } satisfies SpanOptions + + return { + name: `emailHandler ${message.to}`, + options, + } + }, } diff --git a/src/instrumentation/env.ts b/src/instrumentation/env.ts index b84a025..51b6fc9 100644 --- a/src/instrumentation/env.ts +++ b/src/instrumentation/env.ts @@ -39,7 +39,7 @@ const isD1Database = (item?: unknown): item is D1Database => { return !!(item as D1Database)?.exec && !!(item as D1Database)?.prepare } -const instrumentEnv = (env: Record): Record => { +const instrumentEnv = >(env: E): E => { const envHandler: ProxyHandler> = { get: (target, prop, receiver) => { const item = Reflect.get(target, prop, receiver) @@ -66,7 +66,7 @@ const instrumentEnv = (env: Record): Record => } }, } - return wrap(env, envHandler) + return wrap(env, envHandler) as E } export { instrumentEnv } diff --git a/src/instrumentation/fetch.ts b/src/instrumentation/fetch.ts index 4e578f8..4711d4b 100644 --- a/src/instrumentation/fetch.ts +++ b/src/instrumentation/fetch.ts @@ -5,17 +5,15 @@ import { propagation, context as api_context, Attributes, - Exception, Context, - SpanStatusCode, + Span, } from '@opentelemetry/api' -import { Initialiser, getActiveConfig, setConfig } from '../config.js' +import { getActiveConfig } from '../config.js' import { wrap } from '../wrap.js' -import { instrumentEnv } from './env.js' -import { exportSpans, proxyExecutionContext } from './common.js' -import { ResolvedTraceConfig } from '../types.js' +import { HandlerInstrumentation, OrPromise, ResolvedTraceConfig } from '../types.js' import { ReadableSpan } from '@opentelemetry/sdk-trace-base' -import { versionAttributes } from './version.js' + +type IncomingRequest = Parameters[0] export type IncludeTraceContextFn = (request: Request) => boolean export interface FetcherConfig { @@ -32,9 +30,6 @@ export interface FetchHandlerConfig { acceptTraceContext?: boolean | AcceptTraceContextFn } -type FetchHandler = ExportedHandlerFetchHandler -type FetchHandlerArgs = Parameters - const netKeysFromCF = new Set(['colo', 'country', 'request_priority', 'tls_cipher', 'tls_version', 'asn', 'tcp_rtt']) const camelToSnakeCase = (s: string): string => { @@ -125,75 +120,38 @@ function getParentContextFromRequest(request: Request) { return acceptTraceContext ? getParentContextFromHeaders(request.headers) : api_context.active() } -export function waitUntilTrace(fn: () => Promise): Promise { - const tracer = trace.getTracer('waitUntil') - return tracer.startActiveSpan('waitUntil', async (span) => { - await fn() - span.end() - }) -} - -let cold_start = true -export function executeFetchHandler(fetchFn: FetchHandler, [request, env, ctx]: FetchHandlerArgs): Promise { - const spanContext = getParentContextFromRequest(request) - - const tracer = trace.getTracer('fetchHandler') - const attributes = { - ['faas.trigger']: 'http', - ['faas.coldstart']: cold_start, - ['faas.invocation_id']: request.headers.get('cf-ray') ?? undefined, +function updateSpanNameOnRoute(span: Span, request: IncomingRequest) { + const readable = span as unknown as ReadableSpan + if (readable.attributes['http.route']) { + const method = request.method.toUpperCase() + span.updateName(`${method} ${readable.attributes['http.route']}`) } - cold_start = false - Object.assign(attributes, gatherRequestAttributes(request)) - Object.assign(attributes, gatherIncomingCfAttributes(request)) - Object.assign(attributes, versionAttributes(env)) - const options: SpanOptions = { - attributes, - kind: SpanKind.SERVER, - } - - const method = request.method.toUpperCase() - const promise = tracer.startActiveSpan(`fetchHandler ${method}`, options, spanContext, async (span) => { - const readable = span as unknown as ReadableSpan - try { - const response = await fetchFn(request, env, ctx) - span.setAttributes(gatherResponseAttributes(response)) - - return response - } catch (error) { - span.recordException(error as Exception) - span.setStatus({ code: SpanStatusCode.ERROR }) - throw error - } finally { - if (readable.attributes['http.route']) { - span.updateName(`fetchHandler ${method} ${readable.attributes['http.route']}`) - } - span.end() - } - }) - return promise } -export function createFetchHandler(fetchFn: FetchHandler, initialiser: Initialiser) { - const fetchHandler: ProxyHandler = { - apply: async (target, _thisArg, argArray: Parameters): Promise => { - const [request, orig_env, orig_ctx] = argArray - const config = initialiser(orig_env as Record, request) - const env = instrumentEnv(orig_env as Record) - const { ctx, tracker } = proxyExecutionContext(orig_ctx) - const context = setConfig(config) - - try { - const args: FetchHandlerArgs = [request, env, ctx] - return await api_context.with(context, executeFetchHandler, undefined, target, args) - } catch (error) { - throw error - } finally { - orig_ctx.waitUntil(exportSpans(tracker)) - } - }, - } - return wrap(fetchFn, fetchHandler) +export const fetchInstrumentation: HandlerInstrumentation> = { + getInitialSpanInfo: (request) => { + const spanContext = getParentContextFromRequest(request) + const attributes = { + ['faas.trigger']: 'http', + ['faas.invocation_id']: request.headers.get('cf-ray') ?? undefined, + } + Object.assign(attributes, gatherRequestAttributes(request)) + Object.assign(attributes, gatherIncomingCfAttributes(request)) + const method = request.method.toUpperCase() + return { + name: `fetchHandler ${method}`, + options: { + attributes, + kind: SpanKind.SERVER, + }, + context: spanContext, + } + }, + getAttributesFromResult: (response) => { + return gatherResponseAttributes(response) + }, + executionSucces: updateSpanNameOnRoute, + executionFailed: updateSpanNameOnRoute, } type getFetchConfig = (config: ResolvedTraceConfig) => FetcherConfig diff --git a/src/instrumentation/queue.ts b/src/instrumentation/queue.ts index 585e46c..28f09f6 100644 --- a/src/instrumentation/queue.ts +++ b/src/instrumentation/queue.ts @@ -1,20 +1,18 @@ -import { trace, SpanOptions, SpanKind, Attributes, Exception, context as api_context } from '@opentelemetry/api' -import { SemanticAttributes } from '@opentelemetry/semantic-conventions' -import { Initialiser, setConfig } from '../config.js' -import { exportSpans, proxyExecutionContext } from './common.js' -import { instrumentEnv } from './env.js' +import { trace, SpanKind, Attributes, Span } from '@opentelemetry/api' import { unwrap, wrap } from '../wrap.js' -import { versionAttributes } from './version.js' +import { HandlerInstrumentation, InitialSpanInfo, OrPromise } from '../types.js' +import { ATTR_FAAS_TRIGGER, FAAS_TRIGGER_VALUE_PUBSUB } from '@opentelemetry/semantic-conventions/incubating' type QueueHandler = ExportedHandlerQueueHandler export type QueueHandlerArgs = Parameters -const traceIdSymbol = Symbol('traceId') - class MessageStatusCount { succeeded = 0 failed = 0 + implicitly_acked = 0 + implicitly_retried = 0 readonly total: number + constructor(total: number) { this.total = total } @@ -24,6 +22,7 @@ class MessageStatusCount { } ackRemaining() { + this.implicitly_acked = this.total - this.succeeded - this.failed this.succeeded = this.total - this.failed } @@ -32,6 +31,7 @@ class MessageStatusCount { } retryRemaining() { + this.implicitly_retried = this.total - this.succeeded - this.failed this.failed = this.total - this.succeeded } @@ -41,6 +41,8 @@ class MessageStatusCount { 'queue.messages_success': this.succeeded, 'queue.messages_failed': this.failed, 'queue.batch_success': this.succeeded === this.total, + 'queue.implicitly_acked': this.implicitly_acked, + 'queue.implicitly_retried': this.implicitly_retried, } } } @@ -131,60 +133,40 @@ const proxyMessageBatch = (batch: MessageBatch, count: MessageStatusCount) => { return wrap(batch, batchHandler) } -export function executeQueueHandler(queueFn: QueueHandler, [batch, env, ctx]: QueueHandlerArgs): Promise { - const count = new MessageStatusCount(batch.messages.length) - batch = proxyMessageBatch(batch, count) - const tracer = trace.getTracer('queueHandler') - const options: SpanOptions = { - attributes: { - [SemanticAttributes.FAAS_TRIGGER]: 'pubsub', - 'queue.name': batch.queue, - }, - kind: SpanKind.CONSUMER, - } - Object.assign(options.attributes!, versionAttributes(env)) - const promise = tracer.startActiveSpan(`queueHandler ${batch.queue}`, options, async (span) => { - const traceId = span.spanContext().traceId - api_context.active().setValue(traceIdSymbol, traceId) - try { - const result = await queueFn(batch, env, ctx) - span.setAttribute('queue.implicitly_acked', count.total - count.succeeded - count.failed) - count.ackRemaining() - span.setAttributes(count.toAttributes()) - span.end() - return result - } catch (error) { - span.recordException(error as Exception) - span.setAttribute('queue.implicitly_retried', count.total - count.succeeded - count.failed) - count.retryRemaining() - span.end() - throw error +export class QueueInstrumentation implements HandlerInstrumentation> { + private count?: MessageStatusCount + + getInitialSpanInfo(batch: MessageBatch): InitialSpanInfo { + return { + name: `queueHandler ${batch.queue}`, + options: { + attributes: { + [ATTR_FAAS_TRIGGER]: FAAS_TRIGGER_VALUE_PUBSUB, + 'queue.name': batch.queue, + }, + kind: SpanKind.CONSUMER, + }, } - }) - return promise -} + } -export function createQueueHandler(queueFn: QueueHandler, initialiser: Initialiser) { - const queueHandler: ProxyHandler = { - async apply(target, _thisArg, argArray: Parameters): Promise { - const [batch, orig_env, orig_ctx] = argArray - const config = initialiser(orig_env as Record, batch) - const env = instrumentEnv(orig_env as Record) - const { ctx, tracker } = proxyExecutionContext(orig_ctx) - const context = setConfig(config) - - try { - const args: QueueHandlerArgs = [batch, env, ctx] - - return await api_context.with(context, executeQueueHandler, undefined, target, args) - } catch (error) { - throw error - } finally { - orig_ctx.waitUntil(exportSpans(tracker)) - } - }, + instrumentTrigger(batch: MessageBatch): MessageBatch { + this.count = new MessageStatusCount(batch.messages.length) + return proxyMessageBatch(batch, this.count) + } + + executionSucces(span: Span) { + if (this.count) { + this.count.ackRemaining() + span.setAttributes(this.count.toAttributes()) + } + } + + executionFailed(span: Span) { + if (this.count) { + this.count.retryRemaining() + span.setAttributes(this.count.toAttributes()) + } } - return wrap(queueFn, queueHandler) } function instrumentQueueSend(fn: Queue['send'], name: string): Queue['send'] { diff --git a/src/instrumentation/scheduled.ts b/src/instrumentation/scheduled.ts index 0dcbe37..8fbd0ae 100644 --- a/src/instrumentation/scheduled.ts +++ b/src/instrumentation/scheduled.ts @@ -1,70 +1,24 @@ -import { trace, SpanOptions, SpanKind, Exception, context as api_context, SpanStatusCode } from '@opentelemetry/api' -import { SemanticAttributes } from '@opentelemetry/semantic-conventions' -import { Initialiser, setConfig } from '../config.js' -import { exportSpans, proxyExecutionContext } from './common.js' -import { instrumentEnv } from './env.js' -import { wrap } from '../wrap.js' -import { versionAttributes } from './version.js' +import { SpanKind } from '@opentelemetry/api' +import { HandlerInstrumentation, InitialSpanInfo, OrPromise } from '../types.js' +import { + ATTR_FAAS_CRON, + ATTR_FAAS_TIME, + ATTR_FAAS_TRIGGER, + FAAS_TRIGGER_VALUE_TIMER, +} from '@opentelemetry/semantic-conventions/incubating' -type ScheduledHandler = ExportedHandlerScheduledHandler -export type ScheduledHandlerArgs = Parameters - -const traceIdSymbol = Symbol('traceId') - -let cold_start = true -export function executeScheduledHandler( - scheduledFn: ScheduledHandler, - [controller, env, ctx]: ScheduledHandlerArgs, -): Promise { - const tracer = trace.getTracer('scheduledHandler') - const attributes = { - [SemanticAttributes.FAAS_TRIGGER]: 'timer', - [SemanticAttributes.FAAS_COLDSTART]: cold_start, - [SemanticAttributes.FAAS_CRON]: controller.cron, - [SemanticAttributes.FAAS_TIME]: new Date(controller.scheduledTime).toISOString(), - } - cold_start = false - Object.assign(attributes, versionAttributes(env)) - const options: SpanOptions = { - attributes, - kind: SpanKind.SERVER, - } - - const promise = tracer.startActiveSpan(`scheduledHandler ${controller.cron}`, options, async (span) => { - const traceId = span.spanContext().traceId - api_context.active().setValue(traceIdSymbol, traceId) - try { - await scheduledFn(controller, env, ctx) - } catch (error) { - span.recordException(error as Exception) - span.setStatus({ code: SpanStatusCode.ERROR }) - throw error - } finally { - span.end() +export const scheduledInstrumentation: HandlerInstrumentation> = { + getInitialSpanInfo: function (controller: ScheduledController): InitialSpanInfo { + return { + name: `scheduledHandler ${controller.cron}`, + options: { + attributes: { + [ATTR_FAAS_TRIGGER]: FAAS_TRIGGER_VALUE_TIMER, + [ATTR_FAAS_CRON]: controller.cron, + [ATTR_FAAS_TIME]: new Date(controller.scheduledTime).toISOString(), + }, + kind: SpanKind.INTERNAL, + }, } - }) - return promise -} - -export function createScheduledHandler(scheduledFn: ScheduledHandler, initialiser: Initialiser) { - const scheduledHandler: ProxyHandler = { - async apply(target, _thisArg, argArray: Parameters): Promise { - const [controller, orig_env, orig_ctx] = argArray - const config = initialiser(orig_env as Record, controller) - const env = instrumentEnv(orig_env as Record) - const { ctx, tracker } = proxyExecutionContext(orig_ctx) - const context = setConfig(config) - - try { - const args: ScheduledHandlerArgs = [controller, env, ctx] - - return await api_context.with(context, executeScheduledHandler, undefined, target, args) - } catch (error) { - throw error - } finally { - orig_ctx.waitUntil(exportSpans(tracker)) - } - }, - } - return wrap(scheduledFn, scheduledHandler) + }, } diff --git a/src/sdk.ts b/src/sdk.ts index a015c63..c916c4b 100644 --- a/src/sdk.ts +++ b/src/sdk.ts @@ -1,26 +1,37 @@ -import { propagation } from '@opentelemetry/api' +import { context as api_context, Exception, propagation, SpanStatusCode, trace } from '@opentelemetry/api' import { Resource } from '@opentelemetry/resources' -import { Initialiser, parseConfig } from './config.js' +import { Initialiser, parseConfig, setConfig } from './config.js' import { WorkerTracerProvider } from './provider.js' -import { Trigger, TraceConfig, ResolvedTraceConfig } from './types.js' +import { Trigger, TraceConfig, ResolvedTraceConfig, OrPromise, HandlerInstrumentation } from './types.js' import { unwrap } from './wrap.js' -import { createFetchHandler, instrumentGlobalFetch } from './instrumentation/fetch.js' +import { fetchInstrumentation, instrumentGlobalFetch } from './instrumentation/fetch.js' import { instrumentGlobalCache } from './instrumentation/cache.js' -import { createQueueHandler } from './instrumentation/queue.js' +import { QueueInstrumentation } from './instrumentation/queue.js' import { DOClass, instrumentDOClass } from './instrumentation/do.js' -import { createScheduledHandler } from './instrumentation/scheduled.js' +import { scheduledInstrumentation } from './instrumentation/scheduled.js' //@ts-ignore import * as versions from '../versions.json' -import { createEmailHandler } from './instrumentation/email.js' +import { instrumentEnv } from './instrumentation/env.js' +import { versionAttributes } from './instrumentation/version.js' +import { WorkerTracer } from './tracer.js' +import { PromiseTracker, proxyExecutionContext } from './instrumentation/common.js' +import { emailInstrumentation } from './instrumentation/email.js' type FetchHandler = ExportedHandlerFetchHandler type ScheduledHandler = ExportedHandlerScheduledHandler type QueueHandler = ExportedHandlerQueueHandler type EmailHandler = EmailExportedHandler -export type ResolveConfigFn = (env: Env, trigger: Trigger) => TraceConfig -export type ConfigurationOption = TraceConfig | ResolveConfigFn +type Env = Record +type HandlerFn = ( + trigger: T, + env: E, + ctx: ExecutionContext, +) => R | Promise + +type ResolveConfigFn = (env: Env, trigger: Trigger) => TraceConfig +type ConfigurationOption = TraceConfig | ResolveConfigFn export function isRequest(trigger: Trigger): trigger is Request { return trigger instanceof Request @@ -88,7 +99,87 @@ function createInitialiser(config: ConfigurationOption): Initialiser { } } -export function instrument( +export async function exportSpans(traceId: string, tracker?: PromiseTracker) { + const tracer = trace.getTracer('export') + if (tracer instanceof WorkerTracer) { + await scheduler.wait(1) + await tracker?.wait() + await tracer.forceFlush(traceId) + } else { + console.error('The global tracer is not of type WorkerTracer and can not export spans') + } +} + +type HandlerFnArgs = (T | E | ExecutionContext)[] +type OrderedHandlerFnArgs = [trigger: T, env: E, ctx: ExecutionContext] + +let cold_start = true +function createHandlerFlowFn( + instrumentation: HandlerInstrumentation, +): (handlerFn: HandlerFn, [trigger, env, context]: HandlerFnArgs) => ReturnType> { + return (handlerFn, args) => { + const [trigger, env, context] = args as OrderedHandlerFnArgs + const proxiedEnv = instrumentEnv(env) + const { ctx: proxiedCtx, tracker } = proxyExecutionContext(context) + + const instrumentedTrigger = instrumentation.instrumentTrigger ? instrumentation.instrumentTrigger(trigger) : trigger + + const tracer = trace.getTracer('handler') as WorkerTracer + + const { name, options, context: spanContext } = instrumentation.getInitialSpanInfo(trigger) + const attrs = options.attributes || {} + attrs['faas.coldstart'] = cold_start + options.attributes = attrs + Object.assign(attrs, versionAttributes(env)) + cold_start = false + + const parentContext = spanContext || api_context.active() + const result = tracer.startActiveSpan(name, options, parentContext, async (span) => { + try { + const result = await handlerFn(instrumentedTrigger, proxiedEnv, proxiedCtx) + + if (instrumentation.getAttributesFromResult) { + const attributes = instrumentation.getAttributesFromResult(result) + span.setAttributes(attributes) + } + + if (instrumentation.executionSucces) { + instrumentation.executionSucces(span, trigger, result) + } + return result + } catch (error) { + span.recordException(error as Exception) + span.setStatus({ code: SpanStatusCode.ERROR }) + if (instrumentation.executionFailed) { + instrumentation.executionFailed(span, trigger, error) + } + throw error + } finally { + span.end() + context.waitUntil(exportSpans(span.spanContext().traceId, tracker)) + } + }) + + return result + } +} + +function createHandlerProxy>( + handler: unknown, + handlerFn: HandlerFn, + initialiser: Initialiser, + instrumentation: HandlerInstrumentation, +): HandlerFn { + return (trigger: T, env: E, ctx: ExecutionContext): ReturnType> => { + const config = initialiser(env, trigger) + const context = setConfig(config) + + const flowFn = createHandlerFlowFn(instrumentation) + return api_context.with(context, flowFn, handler, handlerFn, [trigger, env, ctx]) as R + } +} + +export function instrument( handler: ExportedHandler, config: ConfigurationOption, ): ExportedHandler { @@ -96,22 +187,22 @@ export function instrument( if (handler.fetch) { const fetcher = unwrap(handler.fetch) as FetchHandler - handler.fetch = createFetchHandler(fetcher, initialiser) + handler.fetch = createHandlerProxy(handler, fetcher, initialiser, fetchInstrumentation) } if (handler.scheduled) { const scheduler = unwrap(handler.scheduled) as ScheduledHandler - handler.scheduled = createScheduledHandler(scheduler, initialiser) + handler.scheduled = createHandlerProxy(handler, scheduler, initialiser, scheduledInstrumentation) } if (handler.queue) { const queuer = unwrap(handler.queue) as QueueHandler - handler.queue = createQueueHandler(queuer, initialiser) + handler.queue = createHandlerProxy(handler, queuer, initialiser, new QueueInstrumentation()) } if (handler.email) { const emailer = unwrap(handler.email) as EmailHandler - handler.email = createEmailHandler(emailer, initialiser) + handler.email = createHandlerProxy(handler, emailer, initialiser, emailInstrumentation) } return handler @@ -123,6 +214,4 @@ export function instrumentDO(doClass: DOClass, config: ConfigurationOption) { return instrumentDOClass(doClass, initialiser) } -export { waitUntilTrace } from './instrumentation/fetch.js' - export const __unwrappedFetch = unwrap(fetch) diff --git a/src/spanprocessor.ts b/src/spanprocessor.ts index 55779e7..2b7e596 100644 --- a/src/spanprocessor.ts +++ b/src/spanprocessor.ts @@ -1,181 +1,47 @@ -import { Context, Span, trace } from '@opentelemetry/api' -import { ReadableSpan, SpanExporter, SpanProcessor } from '@opentelemetry/sdk-trace-base' -import { ExportResult, ExportResultCode } from '@opentelemetry/core' -import { Action, State, stateMachine } from './vendor/ts-checked-fsm/StateMachine.js' +import { Context, Span } from '@opentelemetry/api' +import { ReadableSpan, SpanExporter } from '@opentelemetry/sdk-trace-base' +import { ExportResultCode } from '@opentelemetry/core' +import { getActiveConfig } from './config' +import { TraceFlushableSpanProcessor } from './types' -import { getActiveConfig } from './config.js' -import { TailSampleFn } from './sampling.js' -import { PostProcessorFn } from './types.js' - -type CompletedTrace = { - traceId: string - localRootSpan: ReadableSpan - completedSpans: ReadableSpan[] -} - -type InProgressTrace = { - inProgressSpanIds: Set -} & CompletedTrace - -type InitialState = State<'not_started'> -type InProgressTraceState = State<'in_progress', InProgressTrace> -type TraceCompleteState = State<'trace_complete', CompletedTrace> -type ExportingState = State<'exporting', { promise: Promise }> -type DoneState = State<'done'> - -type StartExportArguments = { - exporter: SpanExporter - tailSampler: TailSampleFn - postProcessor: PostProcessorFn -} - -type StartSpanAction = Action<'startSpan', { span: Span }> -type EndSpanAction = Action<'endSpan', { span: ReadableSpan }> -type StartExportAction = Action<'startExport', { args: StartExportArguments }> - -function newTrace(currentState: InitialState, { span }: StartSpanAction): InProgressTraceState { - const spanId = span.spanContext().spanId - return { - ...currentState, - stateName: 'in_progress', - traceId: span.spanContext().traceId, - localRootSpan: span as unknown as ReadableSpan, - completedSpans: [] as ReadableSpan[], - inProgressSpanIds: new Set([spanId]), - } as const -} - -function newSpan(currentState: InProgressTraceState, { span }: StartSpanAction): InProgressTraceState { - const spanId = span.spanContext().spanId - currentState.inProgressSpanIds.add(spanId) - return { ...currentState } -} - -function endSpan( - currentState: InProgressTraceState, - { span }: EndSpanAction, -): InProgressTraceState | TraceCompleteState { - currentState.completedSpans.push(span) - currentState.inProgressSpanIds.delete(span.spanContext().spanId) - if (currentState.inProgressSpanIds.size === 0) { - return { - stateName: 'trace_complete', - traceId: currentState.traceId, - localRootSpan: currentState.localRootSpan, - completedSpans: currentState.completedSpans, - } as const - } else { - return { ...currentState } - } -} - -function startExport(currentState: TraceCompleteState, { args }: StartExportAction): ExportingState | DoneState { - const { exporter, tailSampler, postProcessor } = args - const { traceId, localRootSpan, completedSpans: spans } = currentState - const shouldExport = tailSampler({ traceId, localRootSpan, spans }) - if (shouldExport) { - const exportSpans = postProcessor(spans) - const promise = new Promise((resolve) => { - exporter.export(exportSpans, resolve) - }) - return { stateName: 'exporting', promise } - } else { - return { stateName: 'done' } - } -} - -const { nextState } = stateMachine() - .state('not_started') - .state<'in_progress', InProgressTraceState>('in_progress') - .state<'trace_complete', TraceCompleteState>('trace_complete') - .state<'exporting', ExportingState>('exporting') - .state('done') - .transition('not_started', 'in_progress') - .transition('in_progress', 'in_progress') - .transition('in_progress', 'trace_complete') - .transition('trace_complete', 'exporting') - .transition('trace_complete', 'done') - .transition('exporting', 'done') - .action<'startSpan', StartSpanAction>('startSpan') - .action<'endSpan', EndSpanAction>('endSpan') - .action<'startExport', StartExportAction>('startExport') - .action('exportDone') - .actionHandler('not_started', 'startSpan', newTrace) - .actionHandler('in_progress', 'startSpan', newSpan) - .actionHandler('in_progress', 'endSpan', endSpan) - .actionHandler('trace_complete', 'startExport', startExport) - .actionHandler('exporting', 'exportDone', (_c, _a) => { - return { stateName: 'done' } as const - }) - .done() - -type AnyTraceState = Parameters[0] -type AnyTraceAction = Parameters[1] - -export class BatchTraceSpanProcessor implements SpanProcessor { - private traceLookup: Map = new Map() - private localRootSpanLookup: Map = new Map() - private inprogressExports: Map> = new Map() +export class BatchTraceSpanProcessor implements TraceFlushableSpanProcessor { + private traces: Record = {} constructor(private exporter: SpanExporter) {} - private action(localRootSpanId: string, action: AnyTraceAction): AnyTraceState { - const state = this.traceLookup.get(localRootSpanId) || { stateName: 'not_started' } - const newState = nextState(state, action) - if (newState.stateName === 'done') { - this.traceLookup.delete(localRootSpanId) - } else { - this.traceLookup.set(localRootSpanId, newState) - } - return newState - } - - private export(localRootSpanId: string) { - const config = getActiveConfig() - if (!config) throw new Error('Config is undefined. This is a bug in the instrumentation logic') - - const { sampling, postProcessor } = config - const exportArgs = { exporter: this.exporter, tailSampler: sampling.tailSampler, postProcessor } - const newState = this.action(localRootSpanId, { actionName: 'startExport', args: exportArgs }) - if (newState.stateName === 'exporting') { - const promise = newState.promise - this.inprogressExports.set(localRootSpanId, promise) - promise.then((result) => { - if (result.code === ExportResultCode.FAILED) { - console.log('Error sending spans to exporter:', result.error) - } - this.action(localRootSpanId, { actionName: 'exportDone' }) - this.inprogressExports.delete(localRootSpanId) - }) - } + private export(traceId: string): Promise { + return new Promise((resolve, reject) => { + const config = getActiveConfig() + if (!config) throw new Error('Config is undefined. This is a bug in the instrumentation logic') + const spans = this.traces[traceId] as unknown[] as ReadableSpan[] | undefined + if (spans) { + this.exporter.export(spans, (result) => { + if (result.code === ExportResultCode.SUCCESS) { + resolve() + } else { + console.log('exporting spans failed! ' + result.error) + reject(result.error) + } + }) + } else { + resolve() + } + }) } - onStart(span: Span, parentContext: Context): void { - const spanId = span.spanContext().spanId - const parentSpanId = trace.getSpan(parentContext)?.spanContext()?.spanId - const parentRootSpanId = parentSpanId ? this.localRootSpanLookup.get(parentSpanId) : undefined - const localRootSpanId = parentRootSpanId || spanId - this.localRootSpanLookup.set(spanId, localRootSpanId) - - this.action(localRootSpanId, { actionName: 'startSpan', span }) + onStart(span: Span, _parentContext: Context): void { + const traceId = span.spanContext().traceId + const spans = this.traces[traceId] || [] + spans.push(span) + this.traces[traceId] = spans } - onEnd(span: ReadableSpan): void { - const spanId = span.spanContext().spanId - const localRootSpanId = this.localRootSpanLookup.get(spanId) - if (localRootSpanId) { - const state = this.action(localRootSpanId, { actionName: 'endSpan', span }) - if (state.stateName === 'trace_complete') { - state.completedSpans.forEach((span) => { - this.localRootSpanLookup.delete(span.spanContext().spanId) - }) - this.export(localRootSpanId) - } - } + onEnd(_span: ReadableSpan): void { + //this space intentionally left blank } - async forceFlush(): Promise { - await Promise.allSettled(this.inprogressExports.values()) + forceFlush(traceId: string = ''): Promise { + return this.export(traceId) } async shutdown(): Promise {} diff --git a/src/tracer.ts b/src/tracer.ts index f643079..01ce6cc 100644 --- a/src/tracer.ts +++ b/src/tracer.ts @@ -8,6 +8,7 @@ import { Context, context as api_context, trace, + SpanContext, } from '@opentelemetry/api' import { sanitizeAttributes } from '@opentelemetry/core' import { Resource } from '@opentelemetry/resources' @@ -15,20 +16,36 @@ import { SpanProcessor, RandomIdGenerator, ReadableSpan, SamplingDecision } from import { SpanImpl } from './span.js' import { getActiveConfig } from './config.js' +import { TraceFlushableSpanProcessor } from './types.js' + +enum NewTraceFlags { + RANDOM_TRACE_ID_SET = 2, + RANDOM_TRACE_ID_UNSET = 0, +} + +type NewTraceFlagValues = NewTraceFlags.RANDOM_TRACE_ID_SET | NewTraceFlags.RANDOM_TRACE_ID_UNSET + +const idGenerator: RandomIdGenerator = new RandomIdGenerator() let withNextSpanAttributes: Attributes +function getFlagAt(flagSequence: number, position: number): number { + return ((flagSequence >> (position - 1)) & 1) * position +} + export class WorkerTracer implements Tracer { - private readonly _spanProcessors: SpanProcessor[] + private readonly spanProcessors: TraceFlushableSpanProcessor[] private readonly resource: Resource - private readonly idGenerator: RandomIdGenerator = new RandomIdGenerator() constructor(spanProcessors: SpanProcessor[], resource: Resource) { - this._spanProcessors = spanProcessors + this.spanProcessors = spanProcessors this.resource = resource } - get spanProcessors() { - return this._spanProcessors + async forceFlush(traceId?: string) { + const promises = this.spanProcessors.map(async (spanProcessor) => { + await spanProcessor.forceFlush(traceId) + }) + await Promise.allSettled(promises) } addToResource(extra: Resource) { @@ -39,31 +56,31 @@ export class WorkerTracer implements Tracer { if (options.root) { context = trace.deleteSpan(context) } - const parentSpan = trace.getSpan(context) - const parentSpanContext = parentSpan?.spanContext() - const hasParentContext = parentSpanContext && trace.isSpanContextValid(parentSpanContext) - - const traceId = hasParentContext ? parentSpanContext.traceId : this.idGenerator.generateTraceId() - const spanKind = options.kind || SpanKind.INTERNAL - const sanitisedAttrs = sanitizeAttributes(options.attributes) const config = getActiveConfig() if (!config) throw new Error('Config is undefined. This is a bug in the instrumentation logic') + const parentSpanContext = trace.getSpan(context)?.spanContext() + const { traceId, randomTraceFlag } = getTraceInfo(parentSpanContext) + + const spanKind = options.kind || SpanKind.INTERNAL + const sanitisedAttrs = sanitizeAttributes(options.attributes) const sampler = config.sampling.headSampler const samplingDecision = sampler.shouldSample(context, traceId, name, spanKind, sanitisedAttrs, []) const { decision, traceState, attributes: attrs } = samplingDecision - const attributes = Object.assign({}, sanitisedAttrs, attrs, withNextSpanAttributes) + const attributes = Object.assign({}, options.attributes, attrs, withNextSpanAttributes) withNextSpanAttributes = {} - const spanId = this.idGenerator.generateSpanId() - const parentSpanId = hasParentContext ? parentSpanContext.spanId : undefined - const traceFlags = decision === SamplingDecision.RECORD_AND_SAMPLED ? TraceFlags.SAMPLED : TraceFlags.NONE + const spanId = idGenerator.generateSpanId() + const parentSpanId = parentSpanContext?.spanId + + const sampleFlag = decision === SamplingDecision.RECORD_AND_SAMPLED ? TraceFlags.SAMPLED : TraceFlags.NONE + const traceFlags = sampleFlag + randomTraceFlag const spanContext = { traceId, spanId, traceFlags, traceState } const span = new SpanImpl({ - attributes, + attributes: sanitizeAttributes(attributes), name, onEnd: (span) => { this.spanProcessors.forEach((sp) => { @@ -107,3 +124,12 @@ export class WorkerTracer implements Tracer { export function withNextSpan(attrs: Attributes) { withNextSpanAttributes = Object.assign({}, withNextSpanAttributes, attrs) } + +function getTraceInfo(parentSpanContext?: SpanContext): { traceId: string; randomTraceFlag: NewTraceFlagValues } { + if (parentSpanContext && trace.isSpanContextValid(parentSpanContext)) { + const { traceId, traceFlags } = parentSpanContext + return { traceId, randomTraceFlag: getFlagAt(traceFlags, 2) } + } else { + return { traceId: idGenerator.generateTraceId(), randomTraceFlag: NewTraceFlags.RANDOM_TRACE_ID_SET } + } +} diff --git a/src/types.ts b/src/types.ts index ebd61e3..8326278 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,13 +1,31 @@ -import { TextMapPropagator } from '@opentelemetry/api' +import { Attributes, Context, SpanOptions, TextMapPropagator, Span } from '@opentelemetry/api' import { ReadableSpan, Sampler, SpanExporter, SpanProcessor } from '@opentelemetry/sdk-trace-base' import { OTLPExporterConfig } from './exporter.js' import { FetchHandlerConfig, FetcherConfig } from './instrumentation/fetch.js' import { TailSampleFn } from './sampling.js' +export type OrPromise = T | Promise + export type PostProcessorFn = (spans: ReadableSpan[]) => ReadableSpan[] export type ExporterConfig = OTLPExporterConfig | SpanExporter +export interface InitialSpanInfo { + name: string + options: SpanOptions + context?: Context +} + +export interface HandlerInstrumentation { + getInitialSpanInfo: (trigger: T) => InitialSpanInfo + getAttributesFromResult?: (result: Awaited) => Attributes + instrumentTrigger?: (trigger: T) => T + executionSucces?: (span: Span, trigger: T, result: Awaited) => void + executionFailed?: (span: Span, trigger: T, error?: any) => void +} + +export type TraceFlushableSpanProcessor = SpanProcessor & { forceFlush: (traceId?: string) => Promise } + export interface HandlerConfig { fetch?: FetchHandlerConfig }