Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Core logic refactor #182

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions examples/worker/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const handleDO = async (request: Request, env: Env): Promise<Response> => {
return await stub.fetch('https://does-not-exist.com/blah')
}

const handleNoAsync = (request: Request): Response => {
return new Response('Very quick hello!')
}

const handleRest = async (request: Request, env: Env, ctx: ExecutionContext): Promise<Response> => {
trace.getActiveSpan()?.setAttribute('http.route', '/*')
withNextSpan({ destination: 'cloudflare' })
Expand All @@ -38,6 +42,8 @@ export default {
return handleDO(request, env)
} else if (pathname === '/error') {
throw new Error('You asked for it!')
} else if (pathname === '/noasync') {
return handleNoAsync(request)
} else {
return handleRest(request, env, ctx)
}
Expand Down
9 changes: 2 additions & 7 deletions src/instrumentation/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,8 @@ export async function exportSpans(tracker?: PromiseTracker) {
const tracer = trace.getTracer('export')
if (tracer instanceof WorkerTracer) {
await scheduler.wait(1)
if (tracker) {
await tracker.wait()
}
const promises = tracer.spanProcessors.map(async (spanProcessor) => {
await spanProcessor.forceFlush()
})
await Promise.allSettled(promises)
await tracker?.wait()
await tracer.forceFlush()
} else {
console.error('The global tracer is not of type WorkerTracer and can not export spans')
}
Expand Down
66 changes: 16 additions & 50 deletions src/instrumentation/email.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,10 @@
import { setConfig, type Initialiser } from '../config'
import { wrap } from '../wrap'
import { exportSpans, proxyExecutionContext } from './common'
import { context as api_context, Exception, SpanKind, type SpanOptions, trace } from '@opentelemetry/api'
import { instrumentEnv } from './env'
import { versionAttributes } from './version'
import { SpanKind, type SpanOptions } from '@opentelemetry/api'
import {
ATTR_FAAS_TRIGGER,
ATTR_MESSAGING_DESTINATION_NAME,
ATTR_RPC_MESSAGE_ID,
} from '@opentelemetry/semantic-conventions/incubating'

type EmailHandler = EmailExportedHandler
export type EmailHandlerArgs = Parameters<EmailHandler>

export function createEmailHandler(emailFn: EmailHandler, initialiser: Initialiser): EmailHandler {
const emailHandler: ProxyHandler<EmailHandler> = {
async apply(target, _thisArg, argArray: Parameters<EmailHandler>): Promise<void> {
const [message, orig_env, orig_ctx] = argArray
const config = initialiser(orig_env as Record<string, unknown>, message)
const env = instrumentEnv(orig_env as Record<string, unknown>)
const { ctx, tracker } = proxyExecutionContext(orig_ctx)
const context = setConfig(config)

try {
const args: EmailHandlerArgs = [message, env, ctx]
return await api_context.with(context, executeEmailHandler, undefined, target, args)
} catch (error) {
throw error
} finally {
orig_ctx.waitUntil(exportSpans(tracker))
}
},
}
return wrap(emailFn, emailHandler)
}
import { HandlerInstrumentation, OrPromise } from '../types'

/**
* Converts the message headers into a record ready to be injected
Expand All @@ -50,27 +21,22 @@ function headerAttributes(message: { headers: Headers }): Record<string, unknown
return Object.fromEntries([...message.headers].map(([key, value]) => [`email.header.${key}`, value] as const))
}

async function executeEmailHandler(emailFn: EmailHandler, [message, env, ctx]: EmailHandlerArgs): Promise<void> {
const tracer = trace.getTracer('emailHandler')
const options = {
attributes: {
export const emailInstrumentation: HandlerInstrumentation<ForwardableEmailMessage, OrPromise<void>> = {
getInitialSpanInfo: (message) => {
const attributes = {
[ATTR_FAAS_TRIGGER]: 'other',
[ATTR_RPC_MESSAGE_ID]: message.headers.get('Message-Id') ?? undefined,
[ATTR_MESSAGING_DESTINATION_NAME]: message.to,
},
kind: SpanKind.CONSUMER,
} satisfies SpanOptions
Object.assign(options.attributes!, headerAttributes(message), versionAttributes(env))
const promise = tracer.startActiveSpan(`emailHandler ${message.to}`, options, async (span) => {
try {
const result = await emailFn(message, env, ctx)
span.end()
return result
} catch (error) {
span.recordException(error as Exception)
span.end()
throw error
}
})
return promise
Object.assign(attributes, headerAttributes(message))
const options = {
attributes,
kind: SpanKind.CONSUMER,
} satisfies SpanOptions

return {
name: `emailHandler ${message.to}`,
options,
}
},
}
4 changes: 2 additions & 2 deletions src/instrumentation/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const isD1Database = (item?: unknown): item is D1Database => {
return !!(item as D1Database)?.exec && !!(item as D1Database)?.prepare
}

const instrumentEnv = (env: Record<string, unknown>): Record<string, unknown> => {
const instrumentEnv = <E extends Record<string, unknown>>(env: E): E => {
const envHandler: ProxyHandler<Record<string, unknown>> = {
get: (target, prop, receiver) => {
const item = Reflect.get(target, prop, receiver)
Expand All @@ -66,7 +66,7 @@ const instrumentEnv = (env: Record<string, unknown>): Record<string, unknown> =>
}
},
}
return wrap(env, envHandler)
return wrap(env, envHandler) as E
}

export { instrumentEnv }
110 changes: 34 additions & 76 deletions src/instrumentation/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@ import {
propagation,
context as api_context,
Attributes,
Exception,
Context,
SpanStatusCode,
Span,
} from '@opentelemetry/api'
import { Initialiser, getActiveConfig, setConfig } from '../config.js'
import { getActiveConfig } from '../config.js'
import { wrap } from '../wrap.js'
import { instrumentEnv } from './env.js'
import { exportSpans, proxyExecutionContext } from './common.js'
import { ResolvedTraceConfig } from '../types.js'
import { HandlerInstrumentation, OrPromise, ResolvedTraceConfig } from '../types.js'
import { ReadableSpan } from '@opentelemetry/sdk-trace-base'
import { versionAttributes } from './version.js'

type IncomingRequest = Parameters<ExportedHandlerFetchHandler>[0]

export type IncludeTraceContextFn = (request: Request) => boolean
export interface FetcherConfig {
Expand All @@ -32,9 +30,6 @@ export interface FetchHandlerConfig {
acceptTraceContext?: boolean | AcceptTraceContextFn
}

type FetchHandler = ExportedHandlerFetchHandler
type FetchHandlerArgs = Parameters<FetchHandler>

const netKeysFromCF = new Set(['colo', 'country', 'request_priority', 'tls_cipher', 'tls_version', 'asn', 'tcp_rtt'])

const camelToSnakeCase = (s: string): string => {
Expand Down Expand Up @@ -125,75 +120,38 @@ function getParentContextFromRequest(request: Request) {
return acceptTraceContext ? getParentContextFromHeaders(request.headers) : api_context.active()
}

export function waitUntilTrace(fn: () => Promise<any>): Promise<void> {
const tracer = trace.getTracer('waitUntil')
return tracer.startActiveSpan('waitUntil', async (span) => {
await fn()
span.end()
})
}

let cold_start = true
export function executeFetchHandler(fetchFn: FetchHandler, [request, env, ctx]: FetchHandlerArgs): Promise<Response> {
const spanContext = getParentContextFromRequest(request)

const tracer = trace.getTracer('fetchHandler')
const attributes = {
['faas.trigger']: 'http',
['faas.coldstart']: cold_start,
['faas.invocation_id']: request.headers.get('cf-ray') ?? undefined,
function updateSpanNameOnRoute(span: Span, request: IncomingRequest) {
const readable = span as unknown as ReadableSpan
if (readable.attributes['http.route']) {
const method = request.method.toUpperCase()
span.updateName(`${method} ${readable.attributes['http.route']}`)
}
cold_start = false
Object.assign(attributes, gatherRequestAttributes(request))
Object.assign(attributes, gatherIncomingCfAttributes(request))
Object.assign(attributes, versionAttributes(env))
const options: SpanOptions = {
attributes,
kind: SpanKind.SERVER,
}

const method = request.method.toUpperCase()
const promise = tracer.startActiveSpan(`fetchHandler ${method}`, options, spanContext, async (span) => {
const readable = span as unknown as ReadableSpan
try {
const response = await fetchFn(request, env, ctx)
span.setAttributes(gatherResponseAttributes(response))

return response
} catch (error) {
span.recordException(error as Exception)
span.setStatus({ code: SpanStatusCode.ERROR })
throw error
} finally {
if (readable.attributes['http.route']) {
span.updateName(`fetchHandler ${method} ${readable.attributes['http.route']}`)
}
span.end()
}
})
return promise
}

export function createFetchHandler(fetchFn: FetchHandler, initialiser: Initialiser) {
const fetchHandler: ProxyHandler<FetchHandler> = {
apply: async (target, _thisArg, argArray: Parameters<FetchHandler>): Promise<Response> => {
const [request, orig_env, orig_ctx] = argArray
const config = initialiser(orig_env as Record<string, unknown>, request)
const env = instrumentEnv(orig_env as Record<string, unknown>)
const { ctx, tracker } = proxyExecutionContext(orig_ctx)
const context = setConfig(config)

try {
const args: FetchHandlerArgs = [request, env, ctx]
return await api_context.with(context, executeFetchHandler, undefined, target, args)
} catch (error) {
throw error
} finally {
orig_ctx.waitUntil(exportSpans(tracker))
}
},
}
return wrap(fetchFn, fetchHandler)
export const fetchInstrumentation: HandlerInstrumentation<IncomingRequest, OrPromise<Response>> = {
getInitialSpanInfo: (request) => {
const spanContext = getParentContextFromRequest(request)
const attributes = {
['faas.trigger']: 'http',
['faas.invocation_id']: request.headers.get('cf-ray') ?? undefined,
}
Object.assign(attributes, gatherRequestAttributes(request))
Object.assign(attributes, gatherIncomingCfAttributes(request))
const method = request.method.toUpperCase()
return {
name: `fetchHandler ${method}`,
options: {
attributes,
kind: SpanKind.SERVER,
},
context: spanContext,
}
},
getAttributesFromResult: (response) => {
return gatherResponseAttributes(response)
},
executionSucces: updateSpanNameOnRoute,
executionFailed: updateSpanNameOnRoute,
}

type getFetchConfig = (config: ResolvedTraceConfig) => FetcherConfig
Expand Down
Loading