feat: CloudEvents ingest compliance and add OpenTelemetry distributed tracing #3961
Open
kaio6fellipe wants to merge 18 commits into
mario-turno
approved these changes
Mar 30, 2026
bf28d85 to
f2b4be7
Compare
kaio6fellipe
added a commit
to kaio6fellipe/argo-events
that referenced
this pull request
Apr 9, 2026
…j#3983 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Author
Based on the latest commits (f73c529, a72037f, b63373b, 99099cf, 3175926, d98cdfd):
In the service/node graph view, the relationships between the entities become quite clear.
I'm running some local tests with this: https://github.com/kaio6fellipe/event-driven-bookinfo
I know the argo-events documentation makes it clear that it's an "event-driven workflow automation framework," but apparently it's intended as an alternative to GCP Eventarc, AWS EventBridge, and Azure Event Grid. @eduardodbr, @whynowy, do you know of any reason why there aren't any use cases of this type?
When an incoming HTTP request is a valid CloudEvent (binary or structured content mode), preserve the original attributes (id, source, type, subject, time) and extension attributes (e.g., traceparent) instead of discarding them and generating new values.

- Add WithSource, WithType, WithSubject, WithTime, WithExtension, and WithCloudEvent Option constructors
- Thread Options through webhook dispatch channel
- Detect incoming CloudEvents via CE SDK HTTP binding in webhook handler
- Add Extensions field to EventContext (protobuf field 8)
- Preserve extensions in sensor convertEvent()

Closes argoproj#2011
Related: argoproj#1015, argoproj#983
Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
Add opt-in distributed tracing across the event pipeline (EventSource -> EventBus -> Sensor -> Trigger) using OpenTelemetry.

- Create pkg/shared/tracing with InitTracer, SpanFromCloudEvent, and InjectTraceIntoCloudEvent using W3C Trace Context propagation
- Add eventsource.publish span wrapping event bus publish
- Add sensor.trigger span wrapping trigger execution
- Initialize tracers in EventSource and Sensor pod entrypoints
- Configuration via standard OTel env vars (OTEL_EXPORTER_OTLP_ENDPOINT)
- Fully opt-in: no-op when endpoint is unset, zero performance impact

Closes argoproj#1111
Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
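For readers unfamiliar with W3C Trace Context, the sketch below shows the shape of the traceparent value that gets carried in CloudEvent extensions. It is a standard-library illustration only: the PR itself uses the OpenTelemetry propagator, and `parseTraceParent` and `injectIntoExtensions` are hypothetical names. The sketch handles only the version 00 layout.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// traceParent holds the fields of interest from a version-00 traceparent value.
type traceParent struct {
	TraceID  string
	ParentID string
	Sampled  bool
}

// isLowerHex reports whether s consists only of lowercase hex digits.
func isLowerHex(s string) bool {
	for _, r := range s {
		if !strings.ContainsRune("0123456789abcdef", r) {
			return false
		}
	}
	return true
}

// parseTraceParent validates "00-<32 hex trace id>-<16 hex parent id>-<2 hex flags>".
func parseTraceParent(v string) (traceParent, error) {
	parts := strings.Split(v, "-")
	if len(parts) != 4 || parts[0] != "00" {
		return traceParent{}, errors.New("unsupported traceparent layout")
	}
	traceID, parentID, flags := parts[1], parts[2], parts[3]
	if len(traceID) != 32 || len(parentID) != 16 || len(flags) != 2 ||
		!isLowerHex(traceID) || !isLowerHex(parentID) || !isLowerHex(flags) {
		return traceParent{}, errors.New("malformed traceparent field")
	}
	if traceID == strings.Repeat("0", 32) || parentID == strings.Repeat("0", 16) {
		return traceParent{}, errors.New("all-zero trace or parent id")
	}
	f, err := strconv.ParseUint(flags, 16, 8)
	if err != nil {
		return traceParent{}, err
	}
	return traceParent{TraceID: traceID, ParentID: parentID, Sampled: f&1 == 1}, nil
}

// injectIntoExtensions copies trace headers into an extensions map (assumed
// here to be map[string]string) only when traceparent is well formed.
func injectIntoExtensions(ext map[string]string, traceparent, tracestate string) bool {
	if _, err := parseTraceParent(traceparent); err != nil {
		return false
	}
	ext["traceparent"] = traceparent
	if tracestate != "" {
		ext["tracestate"] = tracestate
	}
	return true
}

func main() {
	ext := map[string]string{}
	ok := injectIntoExtensions(ext, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", "")
	fmt.Println(ok, ext["traceparent"])
}
```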
Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
…EL_SERVICE_NAME Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
…SUMER/CLIENT Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
…kind classifier Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
…essaging attributes Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
…ice graph

Add sensor.consume CONSUMER span in triggerOne() whose parent is the remote eventsource.publish PRODUCER span injected into CloudEvent extensions. This creates the eventsource -> sensor edge in the distributed trace service graph.

Add extractSensorBusInfo helper (mirrors extractBusInfo in eventing.go) to read bus type and broker address from BusConfig for messaging attributes.

Change sensor.trigger span from a plain internal span to a CLIENT span via tracing.StartClientSpan, adding server.address and http.request.method for HTTP triggers to satisfy OTel HTTP client semantic conventions.

Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
…bound webhooks Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
…ODUCER/INTERNAL)

Add TriggerTypeSpanKind to pkg/shared/tracing/messaging.go that inspects non-nil TriggerTemplate fields to return the appropriate OTel span kind: PRODUCER for messaging triggers (Kafka, NATS, Pulsar, AzureEventHubs, AzureServiceBus), INTERNAL for Log, and CLIENT for all outbound API triggers (HTTP, K8s, ArgoWorkflow, AWSLambda, CustomTrigger, Slack, OpenWhisk, Email).

Update pkg/sensors/listener.go to use TriggerTypeSpanKind so the sensor.trigger span carries the correct kind, and extend the trigger-type-specific attribute block to add messaging.system and messaging.operation.type for producer triggers.

Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
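The classification rule described in this commit can be sketched as follows. This is not the PR's TriggerTypeSpanKind: `triggerTemplate` is a trimmed, hypothetical stand-in for the real TriggerTemplate type, `spanKind` is a plain string rather than the OTel span kind type, and returning INTERNAL for a nil template is an assumption made for the example.

```go
package main

import "fmt"

// spanKind is a string stand-in for the OTel span kind enum.
type spanKind string

const (
	kindProducer spanKind = "PRODUCER"
	kindClient   spanKind = "CLIENT"
	kindInternal spanKind = "INTERNAL"
)

// triggerTemplate is a hypothetical, trimmed model of a trigger template:
// exactly one pointer field is expected to be non-nil.
type triggerTemplate struct {
	// Messaging triggers.
	Kafka, NATS, Pulsar, AzureEventHubs, AzureServiceBus *struct{}
	// Local logging trigger.
	Log *struct{}
	// Outbound API triggers.
	HTTP, K8s, ArgoWorkflow, AWSLambda, CustomTrigger, Slack, OpenWhisk, Email *struct{}
}

// triggerSpanKind maps the populated trigger field to a span kind.
func triggerSpanKind(t *triggerTemplate) spanKind {
	if t == nil {
		return kindInternal // assumption: nothing outbound to describe
	}
	switch {
	case t.Kafka != nil, t.NATS != nil, t.Pulsar != nil,
		t.AzureEventHubs != nil, t.AzureServiceBus != nil:
		return kindProducer
	case t.Log != nil:
		return kindInternal
	default:
		return kindClient
	}
}

func main() {
	fmt.Println(triggerSpanKind(&triggerTemplate{Kafka: &struct{}{}}))
	fmt.Println(triggerSpanKind(&triggerTemplate{HTTP: &struct{}{}}))
}
```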
Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
… context

Mirrors the existing WithHTTPHeaders option. When a Kafka record carries W3C trace context (traceparent/tracestate) in its headers, this option copies them into the CloudEvent extensions so the existing tracing.SpanFromCloudEvent extractor in eventing.go can pick them up as the parent span for the eventsource.publish PRODUCER span. Closes the producer-to-eventsource trace gap when used by the kafka source's dispatch call.

Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
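A rough sketch of the functional-option pattern this commit describes, under stated assumptions: `recordHeader` imitates the key/value byte-slice shape of a Kafka record header, and `event`, `option`, `withID`, `withKafkaHeaders`, and `newEvent` are hypothetical names rather than the PR's actual types or signatures.

```go
package main

import "fmt"

// recordHeader is a stand-in for a Kafka record header (key/value byte slices).
type recordHeader struct {
	Key, Value []byte
}

// event is a hypothetical, minimal event model with an extensions map.
type event struct {
	ID         string
	Extensions map[string]string
}

// option mutates an event while it is being built.
type option func(*event)

// withID sets the event ID.
func withID(id string) option {
	return func(e *event) { e.ID = id }
}

// withKafkaHeaders copies only the W3C trace context headers into the
// event extensions; all other record headers are ignored.
func withKafkaHeaders(headers []recordHeader) option {
	return func(e *event) {
		for _, h := range headers {
			switch key := string(h.Key); key {
			case "traceparent", "tracestate":
				if e.Extensions == nil {
					e.Extensions = map[string]string{}
				}
				e.Extensions[key] = string(h.Value)
			}
		}
	}
}

// newEvent applies the options in order.
func newEvent(opts ...option) *event {
	e := &event{}
	for _, o := range opts {
		o(e)
	}
	return e
}

func main() {
	headers := []recordHeader{
		{Key: []byte("traceparent"), Value: []byte("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")},
		{Key: []byte("content-type"), Value: []byte("application/json")},
	}
	e := newEvent(withID("topic:0:42"), withKafkaHeaders(headers))
	fmt.Println(e.ID, len(e.Extensions), e.Extensions["traceparent"])
}
```

Filtering to the two trace headers keeps unrelated record headers out of the event extensions.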
Pass WithKafkaHeaders alongside WithID at both dispatch sites (consumerGroupConsumer and partitionConsumer) so traceparent/tracestate present on the Kafka record become CloudEvent extensions. The existing SpanFromCloudEvent in eventing.go then chains the eventsource.publish PRODUCER span under the upstream producer span, closing the trace gap between Kafka producers and the eventbus.

Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
… dispatch

Inserts an eventsource.consume CONSUMER span in both processOne closures (consumerGroupConsumer and partitionConsumer). The span is parented to the upstream traceparent extracted from msg.Headers, then its own traceparent is re-injected into the headers map so WithKafkaHeaders + SpanFromCloudEvent in eventing.go make the existing eventsource.publish PRODUCER span a child of CONSUMER (instead of directly under upstream).

Span carries OTel messaging semconv attributes: messaging.system=kafka, destination.name=<topic>, operation.type=receive, kafka.message.key, kafka.message.partition, kafka.message.offset. Errors from dispatch are recorded on the CONSUMER span via RecordError + SetStatus.

Webhook and other source types are not affected; only the kafka source's processOne closures get the new span.

Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
…context

Mirror of WithKafkaHeaders for the NATS EventSource path. Extracts traceparent/tracestate from nats.Header (looking up both lowercase keys, as written by OTel TextMapPropagator into a natsHeaderCarrier, and Title case, as emitted by textproto-helpers) and sets them as CloudEvent extensions so SpanFromCloudEvent in eventing.go can pick them up as the parent of the eventsource.publish span.

nats.Header.Set is a plain map write (no MIMEHeader canonicalisation), so the lookup is case-sensitive on both forms, which is safe for either producer style.

Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
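The two-casing lookup described above might look roughly like the following. This is an illustrative sketch, not the PR's natsHeaderCarrier: `header` is a plain-map stand-in for nats.Header, and `headerCarrier` and `traceExtensions` are hypothetical names. The Get/Set/Keys method set follows the shape of OTel's propagation.TextMapCarrier interface, without importing it.

```go
package main

import (
	"fmt"
	"net/textproto"
	"sort"
)

// header is a stand-in for nats.Header: a plain map with no key canonicalisation.
type header map[string][]string

// headerCarrier adapts header to Get/Set/Keys carrier methods.
type headerCarrier header

// Get tries the key exactly as given, then its Title-case form
// (for example "traceparent", then "Traceparent").
func (c headerCarrier) Get(key string) string {
	for _, k := range []string{key, textproto.CanonicalMIMEHeaderKey(key)} {
		if v := c[k]; len(v) > 0 {
			return v[0]
		}
	}
	return ""
}

// Set writes the key verbatim, as a plain map write would.
func (c headerCarrier) Set(key, value string) {
	c[key] = []string{value}
}

// Keys returns the stored keys in sorted order.
func (c headerCarrier) Keys() []string {
	keys := make([]string, 0, len(c))
	for k := range c {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// traceExtensions collects the trace context headers found under either casing.
func traceExtensions(h header) map[string]string {
	ext := map[string]string{}
	for _, name := range []string{"traceparent", "tracestate"} {
		if v := headerCarrier(h).Get(name); v != "" {
			ext[name] = v
		}
	}
	return ext
}

func main() {
	h := header{"Traceparent": {"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}}
	fmt.Println(traceExtensions(h)["traceparent"])
}
```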
Pass NATS message headers through to dispatch via WithNATSHeaders so the existing SpanFromCloudEvent code in eventbus dispatch can adopt the upstream traceparent as the parent of the eventsource.publish PRODUCER span. Without this, Argo Events would start a fresh root span at every NATS EventSource and break the trace chain from the producer (e.g. natspub-driven applications) onward.

Mirrors the equivalent kafka-side change (d394579). Single-line patch in the dispatch call inside the subscribe handler; all existing connection/auth/tls flow is unchanged.

Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
…dispatch

Inserts an eventsource.consume CONSUMER span in the NATS subscribe handler. The span is parented to the upstream traceparent extracted from msg.Header (via OTel TextMapPropagator over a natsHeaderCarrier), then its own traceparent is re-injected into the header map so WithNATSHeaders + SpanFromCloudEvent in eventing.go make the existing eventsource.publish PRODUCER span a child of CONSUMER instead of directly under upstream.

Span carries OTel messaging semconv attributes: messaging.system=nats, destination.name=<subject>, operation.type=receive. Errors from json.Marshal and dispatch are recorded on the CONSUMER span via RecordError + SetStatus.

natsHeaderCarrier locally adapts nats.Header (which is a plain map[string][]string with no MIMEHeader canonicalisation) to propagation.TextMapCarrier. This is the same pattern as our applications' natspub producer side, kept in lockstep so an OTel TraceContext propagator round-trips cleanly across both ends.

Mirror of the equivalent kafka-side change (de7017c).

Signed-off-by: Kaio Fellipe <kaio6fellipe@gmail.com>
7606d62 to
3860b54
Compare


Summary
Add CloudEvents compliance and OpenTelemetry distributed tracing to Argo Events.
CloudEvents ingest compliance (Closes #2011, Related: #1015, #983)
Argo Events currently discards all incoming CloudEvent metadata and regenerates it with internal values. This PR preserves the original CloudEvent attributes (id, source, type, subject, time) and extension attributes (e.g., traceparent, tracestate) when an incoming HTTP request is a valid CloudEvent.

- Add WithCloudEvent and individual attribute Option constructors
- Add Extensions field to EventContext for pipeline-wide extension propagation
- Preserve extensions in sensor convertEvent()

OpenTelemetry Distributed Tracing (Closes #1111)
Add opt-in distributed tracing across the event pipeline (EventSource → EventBus → Sensor → Trigger) using OpenTelemetry with W3C Trace Context propagation.

- Create pkg/shared/tracing package with InitTracer, SpanFromCloudEvent, InjectTraceIntoCloudEvent
- Add eventsource.publish span wrapping event bus publish
- Add sensor.trigger span wrapping trigger execution with error recording
- Configuration via standard OTel env vars (OTEL_EXPORTER_OTLP_ENDPOINT): fully opt-in, no-op when unset

Backward Compatibility

- EventContext.Extensions is optional (omitempty) and uses the next available protobuf field number

Tests and behavior
The implementation was tested on a real GKE environment with Jaeger All-in-One as the trace backend. The test setup consists of three interconnected EventSource/Sensor pairs spanning two namespaces (sandbox and platform), exercising the full event pipeline including retries and dead-letter queue (DLQ) routing.

Test environment

- Jaeger All-in-One in the observability namespace as OTLP collector + trace UI
- OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_SERVICE_NAME env vars on EventSource/Sensor pod templates

Test scenario
A single HTTP POST triggers a chain across 3 EventSources and 3 Sensors:

1. another-webhook (EventSource) → another-sensor (Sensor): forwards via HTTP trigger
2. helloworld-webhook (EventSource) → helloworld-http (Sensor): processor endpoint is down, retries 3x, then fires DLQ trigger
3. dlqueue-webhook (EventSource) → dlqueue-processor (Sensor): processes the dead letter

    graph TD
        A["<b>eventsource.publish</b><br/>another-webhook<br/>event.id: ba49ae95...<br/>47.0ms ✅"]
        B["<b>sensor.trigger</b><br/>another-sensor → another-sensor<br/>6.3ms ✅"]
        C["<b>eventsource.publish</b><br/>helloworld-webhook<br/>event.id: d14e2273...<br/>3.9ms ✅"]
        D["<b>sensor.trigger</b><br/>helloworld-http → helloworld-http<br/>31.9ms ❌ retry 1"]
        E["<b>sensor.trigger</b><br/>helloworld-http → helloworld-http<br/>0.8ms ❌ retry 2"]
        F["<b>sensor.trigger</b><br/>helloworld-http → helloworld-http<br/>0.7ms ❌ retry 3"]
        G["<b>sensor.trigger</b><br/>helloworld-http → dlq-http-trigger<br/>7.2ms ✅"]
        H["<b>eventsource.publish</b><br/>dlqueue-webhook<br/>event.id: 7ed81bf0...<br/>4.6ms ✅"]
        I["<b>sensor.trigger</b><br/>dlqueue-processor → dlqueue-processor<br/>2.6ms ✅"]
        A --> B
        B -->|"HTTP POST"| C
        C --> D
        C --> E
        C --> F
        C -->|"retries exhausted"| G
        G -->|"HTTP POST to DLQ"| H
        H --> I
        style A fill:#2d6a4f,color:#fff
        style B fill:#2d6a4f,color:#fff
        style C fill:#2d6a4f,color:#fff
        style D fill:#ae2012,color:#fff
        style E fill:#ae2012,color:#fff
        style F fill:#ae2012,color:#fff
        style G fill:#2d6a4f,color:#fff
        style H fill:#2d6a4f,color:#fff
        style I fill:#2d6a4f,color:#fff

Jaeger trace view
The entire chain appears as a single connected trace (a613bf5b2cccbb628b67bacb729a0e46) with 9 spans across 5 services:

Jaeger service map

The service dependency graph shows the flow between all instrumented services:
Argo Events resources
EventSource and Sensors:
Key observations
- End-to-end trace propagation works: W3C traceparent is injected by the HTTP trigger into outgoing requests and extracted by downstream webhook EventSources, linking all hops into a single trace.
- OTEL_SERVICE_NAME is respected: each EventSource/Sensor appears with its configured service name in Jaeger (e.g., helloworld-webhook, dlqueue-processor) rather than the generic defaults.
- Retries are visible: failed trigger attempts appear as individual sensor.trigger spans with ERROR status, making it easy to identify retry behavior and eventual DLQ routing.
- event.id changes at each EventSource: this is correct per the CloudEvents spec. Each EventSource is an independent event producer and generates a new ID. The trace context (traceparent) is what links the hops, not the event ID.
- Zero impact when disabled: when OTEL_EXPORTER_OTLP_ENDPOINT is not set, the OTel tracer is a no-op with no performance overhead.

Checklist: