Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start instrumenting for streaming tail workers #3323

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions samples/tail-workers/tail.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ export default {
tail(traces) {
console.log(traces[0].logs);
},
tailStream() {
return {};
tailStream(...args) {
console.log(...args);
return (...args) => {
console.log(...args);
};
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewers: ignore this and the below worker.js for now

},
};
2 changes: 2 additions & 0 deletions samples/tail-workers/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
export default {
async fetch(req, env) {
console.log('hello to the tail worker!');
reportError('boom');
reportError(new Error('test'));
return new Response("Hello World\n");
}
};
44 changes: 24 additions & 20 deletions src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,34 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve

auto eventParameters = consumeParams();

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
tracing::HibernatableWebSocketEventInfo::Type type =
[&]() -> tracing::HibernatableWebSocketEventInfo::Type {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
KJ_CASE_ONEOF(_, HibernatableSocketParams::Text) {
return tracing::HibernatableWebSocketEventInfo::Message{};
}
KJ_CASE_ONEOF(data, HibernatableSocketParams::Data) {
return tracing::HibernatableWebSocketEventInfo::Message{};
}
KJ_CASE_ONEOF(close, HibernatableSocketParams::Close) {
return tracing::HibernatableWebSocketEventInfo::Close{
.code = close.code, .wasClean = close.wasClean};
}
KJ_CASE_ONEOF(_, HibernatableSocketParams::Error) {
return tracing::HibernatableWebSocketEventInfo::Error{};
}
auto getType = [&]() -> tracing::HibernatableWebSocketEventInfo::Type {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
KJ_CASE_ONEOF(_, HibernatableSocketParams::Text) {
return tracing::HibernatableWebSocketEventInfo::Message{};
}
KJ_UNREACHABLE;
}();
KJ_CASE_ONEOF(data, HibernatableSocketParams::Data) {
return tracing::HibernatableWebSocketEventInfo::Message{};
}
KJ_CASE_ONEOF(close, HibernatableSocketParams::Close) {
return tracing::HibernatableWebSocketEventInfo::Close{
.code = close.code, .wasClean = close.wasClean};
}
KJ_CASE_ONEOF(_, HibernatableSocketParams::Error) {
return tracing::HibernatableWebSocketEventInfo::Error{};
}
}
KJ_UNREACHABLE;
};

t.setEventInfo(context.now(), tracing::HibernatableWebSocketEventInfo(kj::mv(type)));
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), tracing::HibernatableWebSocketEventInfo(getType()));
}

context.getMetrics().reportTailEvent(context, [&] {
return tracing::Onset(
tracing::HibernatableWebSocketEventInfo(getType()), tracing::Onset::WorkerInfo{}, kj::none);
});

try {
co_await context.run(
[entrypointName = entrypointName, &context, eventParameters = kj::mv(eventParameters),
Expand Down
7 changes: 6 additions & 1 deletion src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,14 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
}

KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), tracing::QueueEventInfo(kj::mv(queueName), batchSize));
t.setEventInfo(context.now(), tracing::QueueEventInfo(kj::str(queueName), batchSize));
}

context.getMetrics().reportTailEvent(context, [&] {
return tracing::Onset(tracing::QueueEventInfo(kj::mv(queueName), batchSize),
tracing::Onset::WorkerInfo{}, kj::none);
});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reviewers: The getWorkerTracer() block above is for the old style tail workers. The reportTailEvent is for streaming tail workers


// Create a custom refcounted type for holding the queueEvent so that we can pass it to the
// waitUntil'ed callback safely without worrying about whether this coroutine gets canceled.
struct QueueEventHolder: public kj::Refcounted {
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,10 @@ kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest
t.setEventInfo(context.now(), tracing::TraceEventInfo(traces));
}

metrics.reportTailEvent(context, [&] {
return tracing::Onset(tracing::TraceEventInfo(traces), tracing::Onset::WorkerInfo{}, kj::none);
});

auto nonEmptyTraces = kj::Vector<kj::Own<Trace>>(kj::size(traces));
for (auto& trace: traces) {
if (trace->eventInfo != kj::none) {
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,10 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase {
KJ_IF_SOME(t, tracer) {
t->setEventInfo(ioctx.now(), tracing::JsRpcEventInfo(kj::str(methodName)));
}
ioctx.getMetrics().reportTailEvent(ioctx, [&] {
return tracing::Onset(
tracing::JsRpcEventInfo(kj::str(methodName)), tracing::Onset::WorkerInfo{}, kj::none);
});
}
};

Expand Down
5 changes: 3 additions & 2 deletions src/workerd/io/trace-stream.c++
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ class TailStreamHandler final: public TailStreamTargetBase {
// Take the received set of events and dispatch them to the correct handler.

v8::Local<v8::Value> h = handler.getHandle(js);
v8::LocalVector<v8::Value> returnValues(js.v8Isolate, events.size());
v8::LocalVector<v8::Value> returnValues(js.v8Isolate);
StringCache stringCache;

if (h->IsFunction()) {
Expand Down Expand Up @@ -836,7 +836,8 @@ class TailStreamEntrypoint final: public TailStreamTargetBase {

return ioContext.awaitJs(js,
js.toPromise(result).then(js,
ioContext.addFunctor([&results, &ioContext](jsg::Lock& js, jsg::Value value) {
ioContext.addFunctor(
[results = kj::mv(results), &ioContext](jsg::Lock& js, jsg::Value value) mutable {
// The value here can be one of a function, an object, or undefined.
// Any value other than these will result in a warning but will otherwise
// be treated like undefined.
Expand Down
13 changes: 11 additions & 2 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,6 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
tracing::FetchEventInfo(method, kj::str(url), kj::mv(cfJson), kj::mv(traceHeadersArray)));
}

// TODO(streaming-tail-workers): Instrument properly
context.getMetrics().reportTailEvent(context, [&] {
return tracing::Onset(tracing::FetchEventInfo(method, kj::str(url), kj::str("{}"), nullptr),
tracing::Onset::WorkerInfo{}, kj::none);
Expand Down Expand Up @@ -501,11 +500,17 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
// calling context->drain(). We don't ever send scheduled events to actors. If we do, we'll have
// to think more about this.

double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS;

KJ_IF_SOME(t, context.getWorkerTracer()) {
double eventTime = (scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS;
t.setEventInfo(context.now(), tracing::ScheduledEventInfo(eventTime, kj::str(cron)));
}

context.getMetrics().reportTailEvent(context, [&] {
return tracing::Onset(tracing::ScheduledEventInfo(eventTime, kj::str(cron)),
tracing::Onset::WorkerInfo{}, kj::none);
});

// Scheduled handlers run entirely in waitUntil() tasks.
context.addWaitUntil(context.run(
[scheduledTime, cron, entrypointName = entrypointName, props = kj::mv(props), &context,
Expand Down Expand Up @@ -563,6 +568,10 @@ kj::Promise<WorkerInterface::AlarmResult> WorkerEntrypoint::runAlarmImpl(
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
t.setEventInfo(context.now(), tracing::AlarmEventInfo(scheduledTime));
}
context.getMetrics().reportTailEvent(context, [&] {
return tracing::Onset(
tracing::AlarmEventInfo(scheduledTime), tracing::Onset::WorkerInfo{}, kj::none);
});

auto scheduleAlarmResult = co_await actor.scheduleAlarm(scheduledTime);
KJ_SWITCH_ONEOF(scheduleAlarmResult) {
Expand Down
27 changes: 27 additions & 0 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,18 @@ Worker::Isolate::Isolate(kj::Own<Api> apiParam,
addExceptionToTrace(js, ioContext, tracer, UncaughtExceptionSource::REQUEST_HANDLER,
error, api->getErrorInterfaceTypeHandler(js));
}

ioContext.getMetrics().reportTailEvent(ioContext, [&] {
KJ_IF_SOME(obj, error.tryCast<jsg::JsObject>()) {
auto name = obj.get(js, "name"_kj);
auto message = obj.get(js, "message"_kj);
auto stack = obj.get(js, "stack"_kj);
return tracing::Mark(tracing::Exception(
ioContext.now(), kj::str(name), kj::str(message), kj::str(stack)));
}
return tracing::Mark(
tracing::Exception(ioContext.now(), kj::str(), kj::str(error), kj::none));
});
}

KJ_IF_SOME(i, impl->inspector) {
Expand Down Expand Up @@ -1865,6 +1877,9 @@ void Worker::handleLog(jsg::Lock& js,
auto timestamp = ioContext.now();
tracer.addLog(timestamp, level, message());
}

ioContext.getMetrics().reportTailEvent(
ioContext, [&] { return tracing::Mark(tracing::Log(ioContext.now(), level, message())); });
}

if (consoleMode == ConsoleMode::INSPECTOR_ONLY) {
Expand Down Expand Up @@ -2066,6 +2081,18 @@ void Worker::Lock::logUncaughtException(
worker.getIsolate().getApi().getErrorInterfaceTypeHandler(*this));
});
}

ioContext.getMetrics().reportTailEvent(ioContext, [&] {
KJ_IF_SOME(obj, exception.tryCast<jsg::JsObject>()) {
auto name = obj.get(*this, "name"_kj);
auto message = obj.get(*this, "message"_kj);
auto stack = obj.get(*this, "stack"_kj);
return tracing::Mark(
tracing::Exception(ioContext.now(), kj::str(name), kj::str(message), kj::str(stack)));
}
return tracing::Mark(
tracing::Exception(ioContext.now(), kj::str(), kj::str(exception), kj::none));
});
}

KJ_IF_SOME(i, worker.script->isolate->impl->inspector) {
Expand Down
22 changes: 11 additions & 11 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,7 @@ struct TailStreamWriterState {

struct Closed {};

kj::OneOf<Pending, kj::Array<Active>, Closed> inner;
kj::OneOf<Pending, kj::Array<kj::Own<Active>>, Closed> inner;
kj::TaskSet& waitUntilTasks;

TailStreamWriterState(Pending pending, kj::TaskSet& waitUntilTasks)
Expand All @@ -1502,12 +1502,12 @@ struct TailStreamWriterState {

void reportImpl(tracing::TailEvent&& event) {
// In reportImpl, our inner state must be active.
auto& actives = KJ_ASSERT_NONNULL(inner.tryGet<kj::Array<Active>>());
auto& actives = KJ_ASSERT_NONNULL(inner.tryGet<kj::Array<kj::Own<Active>>>());

// We only care about sessions that are currently active.
kj::Vector<Active> alive(actives.size());
kj::Vector<kj::Own<Active>> alive(actives.size());
for (auto& active: actives) {
if (active.capability != kj::none) {
if (active->capability != kj::none) {
alive.add(kj::mv(active));
}
}
Expand All @@ -1520,10 +1520,10 @@ struct TailStreamWriterState {
}

// Deliver the event to the queue and make sure we are processing.
for (Active& active: alive) {
active.queue.push_back(event.clone());
if (!active.pumping) {
waitUntilTasks.add(pump(active));
for (auto& active: alive) {
active->queue.push_back(event.clone());
if (!active->pumping) {
waitUntilTasks.add(pump(*active));
}
}

Expand Down Expand Up @@ -1618,13 +1618,13 @@ kj::Maybe<kj::Own<tracing::TailStreamWriter>> initializeTailStreamWriter(
ioContext.addTask(
wi->customEvent(kj::mv(customEvent)).attach(kj::mv(wi)).then([](auto&&) {
}, [](kj::Exception&&) {}));
return TailStreamWriterState::Active{
return kj::heap<TailStreamWriterState::Active>({
.capability = kj::mv(result),
};
});
};
state->reportImpl(kj::mv(event));
}
KJ_CASE_ONEOF(active, kj::Array<TailStreamWriterState::Active>) {
KJ_CASE_ONEOF(active, kj::Array<kj::Own<TailStreamWriterState::Active>>) {
// Event cannot be a onset, which should have been validated by the writer.
KJ_ASSERT(!event.event.is<tracing::Onset>(), "Only the first event can be an onset");
auto final = event.event.is<tracing::Outcome>() || event.event.is<tracing::Hibernate>();
Expand Down
Loading