Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 48 additions & 41 deletions src/workerd/io/tracer.c++
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ kj::Own<WorkerTracer> PipelineTracer::makeWorkerTracer(PipelineLogLevel pipeline
executionModel, kj::mv(durableObjectId));
traces.add(kj::addRef(*trace));
return kj::refcounted<WorkerTracer>(
addRefToThis(), kj::mv(trace), pipelineLogLevel, kj::mv(maybeTailStreamWriter));
addRefToThis(), kj::mv(trace), pipelineLogLevel, kj::mv(maybeTailStreamWriter), buffered);
}

void PipelineTracer::addTrace(rpc::Trace::Reader reader) {
Expand All @@ -111,29 +111,29 @@ void PipelineTracer::addTailStreamWriter(kj::Own<tracing::TailStreamWriter>&& wr
WorkerTracer::WorkerTracer(kj::Rc<PipelineTracer> parentPipeline,
kj::Own<Trace> trace,
PipelineLogLevel pipelineLogLevel,
kj::Maybe<kj::Own<tracing::TailStreamWriter>> maybeTailStreamWriter)
kj::Maybe<kj::Own<tracing::TailStreamWriter>> maybeTailStreamWriter,
bool buffered)
: pipelineLogLevel(pipelineLogLevel),
trace(kj::mv(trace)),
parentPipeline(kj::mv(parentPipeline)),
maybeTailStreamWriter(kj::mv(maybeTailStreamWriter)) {}

WorkerTracer::WorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel executionModel)
: pipelineLogLevel(pipelineLogLevel),
trace(kj::refcounted<Trace>(
kj::none, kj::none, kj::none, kj::none, kj::none, nullptr, kj::none, executionModel)) {}
maybeTailStreamWriter(kj::mv(maybeTailStreamWriter)),
buffered(buffered) {
// If logLevel is none, we should not be creating an STW (we don't provide automated tracing in
// that case, so the tail stream writer is redundant), and we should have BTW tracers available
// (otherwise we should have skipped setting up a WorkerTracer entirely).
if (pipelineLogLevel == PipelineLogLevel::NONE) {
KJ_ASSERT(maybeTailStreamWriter == kj::none);
KJ_ASSERT(buffered);
}
}

WorkerTracer::~WorkerTracer() noexcept(false) {
// Report the outcome event, which should have been delivered by now.

// Do not attempt to report an outcome event if logging is disabled, as with other event types.
if (pipelineLogLevel == PipelineLogLevel::NONE) {
return;
}

// Report the outcome event if STWs are present. All worker events need to call setEventInfo at
// the start of the invocation to submit the onset event before any other tail events.
KJ_IF_SOME(writer, maybeTailStreamWriter) {
KJ_IF_SOME(spanContext, topLevelInvocationSpanContext) {
KJ_IF_SOME(spanContext, topLevelInvocationSpanContext) {
KJ_IF_SOME(writer, maybeTailStreamWriter) {
if (isPredictableModeForTest()) {
writer->report(spanContext,
tracing::Outcome(trace->outcome, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS),
Expand All @@ -142,18 +142,18 @@ WorkerTracer::~WorkerTracer() noexcept(false) {
writer->report(spanContext,
tracing::Outcome(trace->outcome, trace->cpuTime, trace->wallTime), completeTime);
}
} else {
// If no span context is available, we have a streaming tail worker set up but shut down the
// worker tracer without ever sending an Onset event. In that case we either failed to set up
// the Onset properly (indicating a bug – all event types are required to report an Onset at
// the start – although this is more likely to manifest as a "Tail stream onset was not
// reported" error) or we created a WorkerInterface with WorkerTracer without ever invoking it
// (which is not incorrect behavior, but likely indicates inefficient code that sets up
// WorkerInterfaces and then ends up not using it due to an error/incorrect parameters; such
// error checking should be done beforehand to avoid unused allocations). Report such cases.
LOG_ERROR_PERIODICALLY(
"destructed WorkerTracer with STW without reporting Onset event", kj::getStackTrace());
}
} else {
// If no span context is available, we have a streaming tail worker set up but shut down the
// worker tracer without ever sending an Onset event. In that case we either failed to set up
// the Onset properly (indicating a bug – all event types are required to report an Onset at
// the start – although this is more likely to manifest as a "Tail stream onset was not
// reported" error) or we created a WorkerInterface with WorkerTracer without ever invoking it
// (which is not incorrect behavior, but likely indicates inefficient code that sets up
// WorkerInterfaces and then ends up not using it due to an error/incorrect parameters; such
// error checking should be done beforehand to avoid unused allocations). Report such cases.
LOG_ERROR_PERIODICALLY(
"destructed WorkerTracer with STW without reporting Onset event", kj::getStackTrace());
}
};

Expand All @@ -168,14 +168,18 @@ void WorkerTracer::addLog(const tracing::InvocationSpanContext& context,
return;
}

// TODO(streaming-tail): Here we add the log to the trace object and the tail stream writer, if
// available. If the given worker stage is only tailed by a streaming tail worker, adding the log
// to the buffered trace object is not needed; this will be addressed in a future refactor.
KJ_IF_SOME(writer, maybeTailStreamWriter) {
// fast path: if there are no BTWs present, we can send the log directly without needing to copy
// it.
if (!buffered && message.size() < MAX_TRACE_BYTES) {
writer->report(context, {tracing::Log(timestamp, logLevel, kj::mv(message))}, timestamp);
return;
}

// If message is too big on its own, truncate it.
writer->report(context,
{(tracing::Log(timestamp, logLevel,
kj::str(message.first(kj::min(message.size(), MAX_TRACE_BYTES)))))},
{tracing::Log(
timestamp, logLevel, kj::str(message.first(kj::min(message.size(), MAX_TRACE_BYTES))))},
timestamp);
}

Expand All @@ -196,14 +200,9 @@ void WorkerTracer::addLog(const tracing::InvocationSpanContext& context,
}

void WorkerTracer::addSpan(tracing::CompleteSpan&& span) {
// This is where we'll actually encode the span.
if (pipelineLogLevel == PipelineLogLevel::NONE) {
return;
}

// Note: spans are not available in the buffered tail worker, so we don't need an exceededSpanLimit
// variable for it and it can't cause truncation.
auto& tailStreamWriter = KJ_UNWRAP_OR_RETURN(maybeTailStreamWriter);
// This is where we'll actually encode the span. This function should never be invoked if STW is
// inactive as span tracing is only used in STW.
auto& tailStreamWriter = KJ_ASSERT_NONNULL(maybeTailStreamWriter);

adjustSpanTime(span);

Expand Down Expand Up @@ -269,6 +268,12 @@ void WorkerTracer::addException(const tracing::InvocationSpanContext& context,
messageSize += s.size();
}
KJ_IF_SOME(writer, maybeTailStreamWriter) {
// STW fast path: no BTW, no truncation
if (!buffered && messageSize < MAX_TRACE_BYTES) {
writer->report(context,
{tracing::Exception(timestamp, kj::mv(name), kj::mv(message), kj::mv(stack))}, timestamp);
return;
}
auto maybeTruncatedName = name.first(kj::min(name.size(), MAX_TRACE_BYTES));
auto maybeTruncatedMessage =
message.first(kj::min(message.size(), MAX_TRACE_BYTES - maybeTruncatedName.size()));
Expand Down Expand Up @@ -346,6 +351,9 @@ void WorkerTracer::setEventInfoInternal(
const tracing::InvocationSpanContext& context, kj::Date timestamp, tracing::EventInfo&& info) {
KJ_ASSERT(trace->eventInfo == kj::none, "tracer can only be used for a single event");

// Always set up span context to indicate that the Onset has been set.
this->topLevelInvocationSpanContext = context.clone();

// TODO(someday): For now, we're using logLevel == none as a hint to avoid doing anything
// expensive while tracing. We may eventually want separate configuration for event info vs.
// logs.
Expand All @@ -356,7 +364,6 @@ void WorkerTracer::setEventInfoInternal(
}

trace->eventTimestamp = timestamp;
this->topLevelInvocationSpanContext = context.clone();

size_t eventSize = 0;
KJ_SWITCH_ONEOF(info) {
Expand Down Expand Up @@ -519,7 +526,7 @@ void WorkerTracer::setReturn(
}

KJ_IF_SOME(writer, maybeTailStreamWriter) {
auto& spanContext = KJ_UNWRAP_OR_RETURN(topLevelInvocationSpanContext);
auto& spanContext = KJ_ASSERT_NONNULL(topLevelInvocationSpanContext);

// Fall back to weak IoContext if no timestamp is available
writer->report(spanContext,
Expand Down
18 changes: 15 additions & 3 deletions src/workerd/io/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class WorkerTracer;
class PipelineTracer: public kj::Refcounted {
public:
// Creates a pipeline tracer (with a possible parent).
explicit PipelineTracer() = default;
PipelineTracer(bool buffered): buffered(buffered) {};
virtual ~PipelineTracer() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(PipelineTracer);

Expand Down Expand Up @@ -79,6 +79,15 @@ class PipelineTracer: public kj::Refcounted {

void addTailStreamWriter(kj::Own<tracing::TailStreamWriter>&& writer);

inline bool hasBuffered() {
return buffered;
}

protected:
// Indicates that buffered tail workers may be present. There may also be streaming tail workers
// present at the same time.
bool buffered;

private:
kj::Vector<kj::Own<Trace>> traces;
kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Array<kj::Own<Trace>>>>> completeFulfiller;
Expand Down Expand Up @@ -174,8 +183,8 @@ class WorkerTracer final: public BaseTracer {
explicit WorkerTracer(kj::Rc<PipelineTracer> parentPipeline,
kj::Own<Trace> trace,
PipelineLogLevel pipelineLogLevel,
kj::Maybe<kj::Own<tracing::TailStreamWriter>> maybeTailStreamWriter);
explicit WorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel executionModel);
kj::Maybe<kj::Own<tracing::TailStreamWriter>> maybeTailStreamWriter,
bool buffered);
virtual ~WorkerTracer() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(WorkerTracer);

Expand Down Expand Up @@ -233,5 +242,8 @@ class WorkerTracer final: public BaseTracer {
kj::Maybe<kj::Rc<PipelineTracer>> parentPipeline;

kj::Maybe<kj::Own<tracing::TailStreamWriter>> maybeTailStreamWriter;
// Whether any BTWs are present. Note that this serves as an optimization – it is legal for BTWs
// to be absent even when this is true.
bool buffered;
};
} // namespace workerd
2 changes: 1 addition & 1 deletion src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2167,7 +2167,7 @@ class Server::WorkerService final: public Service,
if (legacyTailWorkers.size() > 0 || streamingTailWorkers.size() > 0) {
// Setting up buffered tail workers support, but only if we actually have tail workers
// configured.
auto tracer = kj::rc<PipelineTracer>();
auto tracer = kj::rc<PipelineTracer>(legacyTailWorkers.size() > 0);
auto executionModel =
actor == kj::none ? ExecutionModel::STATELESS : ExecutionModel::DURABLE_OBJECT;
auto tailStreamWriter = tracing::initializeTailStreamWriter(
Expand Down
Loading