diff --git a/src/workerd/io/tracer.c++ b/src/workerd/io/tracer.c++ index 333fddb13f1..014120b861c 100644 --- a/src/workerd/io/tracer.c++ +++ b/src/workerd/io/tracer.c++ @@ -97,7 +97,7 @@ kj::Own PipelineTracer::makeWorkerTracer(PipelineLogLevel pipeline executionModel, kj::mv(durableObjectId)); traces.add(kj::addRef(*trace)); return kj::refcounted( - addRefToThis(), kj::mv(trace), pipelineLogLevel, kj::mv(maybeTailStreamWriter)); + addRefToThis(), kj::mv(trace), pipelineLogLevel, kj::mv(maybeTailStreamWriter), buffered); } void PipelineTracer::addTrace(rpc::Trace::Reader reader) { @@ -111,29 +111,29 @@ void PipelineTracer::addTailStreamWriter(kj::Own&& wr WorkerTracer::WorkerTracer(kj::Rc parentPipeline, kj::Own trace, PipelineLogLevel pipelineLogLevel, - kj::Maybe> maybeTailStreamWriter) + kj::Maybe> maybeTailStreamWriter, + bool buffered) : pipelineLogLevel(pipelineLogLevel), trace(kj::mv(trace)), parentPipeline(kj::mv(parentPipeline)), - maybeTailStreamWriter(kj::mv(maybeTailStreamWriter)) {} - -WorkerTracer::WorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel executionModel) - : pipelineLogLevel(pipelineLogLevel), - trace(kj::refcounted( - kj::none, kj::none, kj::none, kj::none, kj::none, nullptr, kj::none, executionModel)) {} + maybeTailStreamWriter(kj::mv(maybeTailStreamWriter)), + buffered(buffered) { + // If logLevel is none, we should not be creating an STW (we don't provide automated tracing in + // that case, so the tail stream writer is redundant), and we should have BTW tracers available + // (otherwise we should have skipped setting up a WorkerTracer entirely). + if (pipelineLogLevel == PipelineLogLevel::NONE) { + KJ_ASSERT(maybeTailStreamWriter == kj::none); + KJ_ASSERT(buffered); + } +} WorkerTracer::~WorkerTracer() noexcept(false) { // Report the outcome event, which should have been delivered by now. - // Do not attempt to report an outcome event if logging is disabled, as with other event types. - if (pipelineLogLevel == PipelineLogLevel::NONE) { - return; - } - // Report the outcome event if STWs are present. All worker events need to call setEventInfo at // the start of the invocation to submit the onset event before any other tail events. - KJ_IF_SOME(writer, maybeTailStreamWriter) { - KJ_IF_SOME(spanContext, topLevelInvocationSpanContext) { + KJ_IF_SOME(spanContext, topLevelInvocationSpanContext) { + KJ_IF_SOME(writer, maybeTailStreamWriter) { if (isPredictableModeForTest()) { writer->report(spanContext, tracing::Outcome(trace->outcome, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS), @@ -142,18 +142,18 @@ WorkerTracer::~WorkerTracer() noexcept(false) { writer->report(spanContext, tracing::Outcome(trace->outcome, trace->cpuTime, trace->wallTime), completeTime); } - } else { - // If no span context is available, we have a streaming tail worker set up but shut down the - // worker tracer without ever sending an Onset event. In that case we either failed to set up - // the Onset properly (indicating a bug – all event types are required to report an Onset at - // the start – although this is more likely to manifest as a "Tail stream onset was not - // reported" error) or we created a WorkerInterface with WorkerTracer without ever invoking it - // (which is not incorrect behavior, but likely indicates inefficient code that sets up - // WorkerInterfaces and then ends up not using it due to an error/incorrect parameters; such - // error checking should be done beforehand to avoid unused allocations). Report such cases. - LOG_ERROR_PERIODICALLY( - "destructed WorkerTracer with STW without reporting Onset event", kj::getStackTrace()); } + } else { + // If no span context is available, we have a streaming tail worker set up but shut down the + // worker tracer without ever sending an Onset event. In that case we either failed to set up + // the Onset properly (indicating a bug – all event types are required to report an Onset at + // the start – although this is more likely to manifest as a "Tail stream onset was not + // reported" error) or we created a WorkerInterface with WorkerTracer without ever invoking it + // (which is not incorrect behavior, but likely indicates inefficient code that sets up + // WorkerInterfaces and then ends up not using it due to an error/incorrect parameters; such + // error checking should be done beforehand to avoid unused allocations). Report such cases. + LOG_ERROR_PERIODICALLY( + "destructed WorkerTracer with STW without reporting Onset event", kj::getStackTrace()); } }; @@ -168,14 +168,18 @@ void WorkerTracer::addLog(const tracing::InvocationSpanContext& context, return; } - // TODO(streaming-tail): Here we add the log to the trace object and the tail stream writer, if - // available. If the given worker stage is only tailed by a streaming tail worker, adding the log - // to the buffered trace object is not needed; this will be addressed in a future refactor. KJ_IF_SOME(writer, maybeTailStreamWriter) { + // fast path: if there are no BTWs present, we can send the log directly without needing to copy + // it. + if (!buffered && message.size() < MAX_TRACE_BYTES) { + writer->report(context, {tracing::Log(timestamp, logLevel, kj::mv(message))}, timestamp); + return; + } + // If message is too big on its own, truncate it. writer->report(context, - {(tracing::Log(timestamp, logLevel, - kj::str(message.first(kj::min(message.size(), MAX_TRACE_BYTES)))))}, + {tracing::Log( + timestamp, logLevel, kj::str(message.first(kj::min(message.size(), MAX_TRACE_BYTES))))}, timestamp); } @@ -196,14 +200,9 @@ void WorkerTracer::addLog(const tracing::InvocationSpanContext& context, } void WorkerTracer::addSpan(tracing::CompleteSpan&& span) { - // This is where we'll actually encode the span. - if (pipelineLogLevel == PipelineLogLevel::NONE) { - return; - } - - // Note: spans are not available in the buffered tail worker, so we don't need an exceededSpanLimit - // variable for it and it can't cause truncation. - auto& tailStreamWriter = KJ_UNWRAP_OR_RETURN(maybeTailStreamWriter); + // This is where we'll actually encode the span. This function should never be invoked if STW is + // inactive as span tracing is only used in STW. + auto& tailStreamWriter = KJ_ASSERT_NONNULL(maybeTailStreamWriter); adjustSpanTime(span); @@ -269,6 +268,12 @@ void WorkerTracer::addException(const tracing::InvocationSpanContext& context, messageSize += s.size(); } KJ_IF_SOME(writer, maybeTailStreamWriter) { + // STW fast path: no BTW, no truncation + if (!buffered && messageSize < MAX_TRACE_BYTES) { + writer->report(context, + {tracing::Exception(timestamp, kj::mv(name), kj::mv(message), kj::mv(stack))}, timestamp); + return; + } auto maybeTruncatedName = name.first(kj::min(name.size(), MAX_TRACE_BYTES)); auto maybeTruncatedMessage = message.first(kj::min(message.size(), MAX_TRACE_BYTES - maybeTruncatedName.size())); @@ -346,6 +351,9 @@ void WorkerTracer::setEventInfoInternal( const tracing::InvocationSpanContext& context, kj::Date timestamp, tracing::EventInfo&& info) { KJ_ASSERT(trace->eventInfo == kj::none, "tracer can only be used for a single event"); + // Always set up span context to indicate that the Onset has been set. + this->topLevelInvocationSpanContext = context.clone(); + // TODO(someday): For now, we're using logLevel == none as a hint to avoid doing anything // expensive while tracing. We may eventually want separate configuration for event info vs. // logs. @@ -356,7 +364,6 @@ void WorkerTracer::setEventInfoInternal( } trace->eventTimestamp = timestamp; - this->topLevelInvocationSpanContext = context.clone(); size_t eventSize = 0; KJ_SWITCH_ONEOF(info) { @@ -519,7 +526,7 @@ void WorkerTracer::setReturn( } KJ_IF_SOME(writer, maybeTailStreamWriter) { - auto& spanContext = KJ_UNWRAP_OR_RETURN(topLevelInvocationSpanContext); + auto& spanContext = KJ_ASSERT_NONNULL(topLevelInvocationSpanContext); // Fall back to weak IoContext if no timestamp is available writer->report(spanContext, diff --git a/src/workerd/io/tracer.h b/src/workerd/io/tracer.h index 2cddfe59ab8..69ec6963c11 100644 --- a/src/workerd/io/tracer.h +++ b/src/workerd/io/tracer.h @@ -48,7 +48,7 @@ class WorkerTracer; class PipelineTracer: public kj::Refcounted { public: // Creates a pipeline tracer (with a possible parent). - explicit PipelineTracer() = default; + PipelineTracer(bool buffered): buffered(buffered) {}; virtual ~PipelineTracer() noexcept(false); KJ_DISALLOW_COPY_AND_MOVE(PipelineTracer); @@ -79,6 +79,15 @@ class PipelineTracer: public kj::Refcounted { void addTailStreamWriter(kj::Own&& writer); + inline bool hasBuffered() { + return buffered; + } + + protected: + // Indicates that buffered tail workers may be present. There may also be streaming tail workers + // present at the same time. + bool buffered; + private: kj::Vector> traces; kj::Maybe>>>> completeFulfiller; @@ -174,8 +183,8 @@ class WorkerTracer final: public BaseTracer { explicit WorkerTracer(kj::Rc parentPipeline, kj::Own trace, PipelineLogLevel pipelineLogLevel, - kj::Maybe> maybeTailStreamWriter); - explicit WorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel executionModel); + kj::Maybe> maybeTailStreamWriter, + bool buffered); virtual ~WorkerTracer() noexcept(false); KJ_DISALLOW_COPY_AND_MOVE(WorkerTracer); @@ -233,5 +242,8 @@ class WorkerTracer final: public BaseTracer { kj::Maybe> parentPipeline; kj::Maybe> maybeTailStreamWriter; + // Whether any BTWs are present. Note that this serves as an optimization – it is legal for BTWs + // to be absent even when this is true. + bool buffered; }; } // namespace workerd diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 9c828cde37d..e4783ade10f 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -2167,7 +2167,7 @@ class Server::WorkerService final: public Service, if (legacyTailWorkers.size() > 0 || streamingTailWorkers.size() > 0) { // Setting up buffered tail workers support, but only if we actually have tail workers // configured. - auto tracer = kj::rc(); + auto tracer = kj::rc(legacyTailWorkers.size() > 0); auto executionModel = actor == kj::none ? ExecutionModel::STATELESS : ExecutionModel::DURABLE_OBJECT; auto tailStreamWriter = tracing::initializeTailStreamWriter(