diff --git a/pip/pip-446.md b/pip/pip-446.md new file mode 100644 index 0000000000000..fc21970b42929 --- /dev/null +++ b/pip/pip-446.md @@ -0,0 +1,524 @@ +# PIP-446: Support Native OpenTelemetry Tracing in Pulsar Java Client + +# Background knowledge + +## OpenTelemetry + +OpenTelemetry is a vendor-neutral observability framework that provides APIs, SDKs, and tools for collecting distributed traces, metrics, and logs. It has become the industry standard for observability, adopted by major cloud providers and APM vendors. + +## Distributed Tracing + +Distributed tracing tracks requests as they flow through distributed systems. A **trace** represents the entire journey of a request, composed of multiple **spans**. Each span represents a single operation (e.g., sending a message, processing a request). Spans form parent-child relationships, creating a trace tree that visualizes request flow across services. + +## W3C Trace Context + +The W3C Trace Context specification defines a standard way to propagate trace context across service boundaries using HTTP headers or message properties: +- `traceparent`: Contains trace ID, span ID, and trace flags +- `tracestate`: Contains vendor-specific trace information + +## Pulsar Interceptors + +Pulsar client interceptors allow users to intercept and modify messages before sending (producer) or after receiving (consumer). They provide hooks for cross-cutting concerns like tracing, metrics, and security. + +## Cumulative Acknowledgment + +In Pulsar, cumulative acknowledgment allows consumers to acknowledge all messages up to a specific message ID in one operation. This is only available for Failover and Exclusive subscription types where message order is guaranteed. When a message is cumulatively acknowledged, all previous messages on that partition are implicitly acknowledged. + +# Motivation + +Currently, the Pulsar Java client lacks native support for distributed tracing with OpenTelemetry. While the OpenTelemetry Java Agent can automatically instrument Pulsar clients, there are several limitations: + +1. **Agent-only approach**: Users must use the Java Agent, which may not be suitable for all deployment scenarios (e.g., serverless, embedded applications) +2. **Limited control**: Users cannot easily customize tracing behavior or selectively enable tracing for specific producers/consumers +3. **Missing first-class support**: Other Apache projects (Kafka, Camel) provide native OpenTelemetry support, making Pulsar less competitive +4. **Complex setup**: Users must understand agent configuration and classpath setup + +Native OpenTelemetry support would: +- Provide a programmatic API for tracing configuration +- Enable selective tracing without agent overhead +- Improve observability in production systems +- Align Pulsar with modern observability practices +- Make it easier to diagnose performance issues and message flow + +# Goals + +## In Scope + +1. **Producer tracing**: Create spans for message send operations with automatic trace context injection +2. **Consumer tracing**: Create spans for message receive/process operations with automatic trace context extraction +3. **Trace context propagation**: Inject and extract W3C Trace Context via message properties +4. **Programmatic API**: Enable tracing via `ClientBuilder` API +5. **Interceptor-based design**: Implement using Pulsar's existing interceptor mechanism +6. **Cumulative acknowledgment support**: Properly handle span lifecycle for cumulative acks +7. **Multi-topic consumer support**: Track spans across multiple topic partitions +8. **Agent compatibility**: Ensure compatibility with OpenTelemetry Java Agent +9. **Semantic conventions**: Follow OpenTelemetry messaging semantic conventions +10. **Zero overhead when disabled**: No performance impact when tracing is not enabled + +## Out of Scope + +1. **Broker-side tracing**: This PIP focuses on client-side tracing only +2. **Metrics collection**: Only distributed tracing, not OpenTelemetry metrics +3. **Log correlation**: Only tracing integration, not log integration +4. **Custom propagators**: Only W3C Trace Context format supported initially +5. **Transaction tracing**: Tracing for Pulsar transactions (future enhancement) +6. **Schema registry tracing**: Tracing for schema operations +7. **Admin API tracing**: Tracing for admin operations + +# High Level Design + +The implementation adds native OpenTelemetry tracing to the Pulsar Java client through: + +## 1. New Interfaces + +Two new interfaces enable attaching tracing spans to messages and message IDs: +- `TraceableMessage`: Allows messages to carry OpenTelemetry spans +- `TraceableMessageId`: Allows message IDs to carry OpenTelemetry spans + +## 2. Interceptors + +Two interceptors implement the tracing logic: +- `OpenTelemetryProducerInterceptor`: Creates producer spans and injects trace context +- `OpenTelemetryConsumerInterceptor`: Creates consumer spans and extracts trace context + +## 3. Configuration API + +Users can enable tracing through the `ClientBuilder`: +```java +PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .openTelemetry(openTelemetry) + .enableTracing(true) + .build(); +``` + +When enabled, the client automatically adds tracing interceptors to all producers and consumers. + +## 4. Trace Context Propagation + +The implementation uses W3C Trace Context format to propagate trace context: +- **Producer**: Injects `traceparent` and `tracestate` into message properties +- **Consumer**: Extracts trace context from message properties + +This enables end-to-end tracing across services that communicate via Pulsar. + +## 5. Span Lifecycle Management + +The implementation carefully manages span lifecycle: +- **Producer spans**: Start on send, end on broker acknowledgment (or error) +- **Consumer spans**: Start on receive, end on acknowledgment (or negative ack) +- **Cumulative ack**: Ends all spans for messages up to the acknowledged position + +## 6. Multi-Topic Support + +For multi-topic consumers, the implementation maintains separate span maps per topic partition to correctly handle cumulative acknowledgments across multiple topics. + +# Detailed Design + +## Design & Implementation Details + +### 1. Traceable Interfaces + +**TraceableMessage interface** (`pulsar-client-api`): +```java +public interface TraceableMessage { + void setTracingSpan(io.opentelemetry.api.trace.Span span); + io.opentelemetry.api.trace.Span getTracingSpan(); +} +``` + +**TraceableMessageId interface** (`pulsar-client-api`): +```java +public interface TraceableMessageId { + void setTracingSpan(io.opentelemetry.api.trace.Span span); + io.opentelemetry.api.trace.Span getTracingSpan(); +} +``` + +Both `MessageImpl` and `MessageIdImpl` implement these interfaces by adding a transient field to store the span without affecting serialization. + +### 2. OpenTelemetryProducerInterceptor + +Located in `pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/` + +**Key methods**: +- `beforeSend()`: Creates a producer span and injects trace context into message properties +- `onSendAcknowledgement()`: Ends the span successfully and records message ID +- `onPartitionsChange()`: No-op (not needed for producer) + +**Span creation**: +- Uses `TracingContext.createProducerSpan()` to create a PRODUCER span +- Span name: `send {topic}` +- Attributes: `messaging.system`, `messaging.destination.name`, `messaging.operation.name` +- Records `messaging.message.id` when broker acknowledges + +**Trace context injection**: +- Uses OpenTelemetry `TextMapPropagator` to inject context into message properties +- Injects `traceparent` and `tracestate` headers +- Only injects if not already present (allows compatibility with Java Agent) + +### 3. OpenTelemetryConsumerInterceptor + +Located in `pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/` + +**Key methods**: +- `beforeConsume()`: Extracts trace context and creates a consumer span +- `onAcknowledge()`: Ends the span for individual ack with OK status +- `onAcknowledgeCumulative()`: Ends all spans up to the acknowledged position with OK status +- `onNegativeAcksSend()`: Ends the span with OK status and adds an event (not an error) +- `onAckTimeoutSend()`: Ends the span with OK status and adds an event (not an error) + +**Span creation**: +- Uses `TracingContext.createConsumerSpan()` to create a CONSUMER span +- Span name: `process {topic}` +- Attributes: `messaging.system`, `messaging.destination.name`, `messaging.operation.name`, `messaging.message.id` +- Links to producer span via extracted trace context + +**Cumulative acknowledgment handling**: +- Maintains `Map>` for Failover/Exclusive subscriptions +- Outer map key: topic partition (from `TopicMessageId.getOwnerTopic()`) +- Inner map: sorted message IDs to spans for efficient range operations +- When cumulative ack occurs, removes and ends all spans up to the acknowledged position +- Zero overhead for Shared/Key_Shared subscriptions (map is null) + +**Multi-topic support**: +- Nested map structure handles messages from multiple topic partitions +- Each topic partition maintains independent sorted span map +- Cumulative ack only affects spans from the same topic partition + +**Acknowledgment type tracking**: +- Every consumer span includes a `messaging.pulsar.acknowledgment.type` attribute indicating how it was completed: + - `"acknowledge"`: Normal individual acknowledgment + - `"cumulative_acknowledge"`: Cumulative acknowledgment + - `"negative_acknowledge"`: Message negatively acknowledged (will be redelivered) + - `"ack_timeout"`: Acknowledgment timeout (will be redelivered) +- Negative ack and ack timeout end spans with **OK status** (not ERROR) because they are normal Pulsar message flow +- This design separates messaging operations (which succeed) from application logic failures (which should be tracked in separate child spans) +- When a message is redelivered, a new consumer span is created for the new delivery attempt +- The attribute allows users to query and analyze retry patterns, timeout issues, and acknowledgment types in their tracing backend + +### 4. TracingContext Utility + +Provides helper methods for span creation and management: +- `createProducerSpan()`: Creates a producer span with correct attributes +- `createConsumerSpan()`: Creates a consumer span with trace context extraction +- `endSpan()`: Safely ends a span +- `endSpan(span, exception)`: Ends a span with error status +- `isValid()`: Checks if a span is valid and recording + +### 5. TracingProducerBuilder + +Helper for manual trace context injection (advanced use cases): +- `injectContext()`: Injects trace context into message properties +- `extractFromHeaders()`: Extracts trace context from HTTP headers + +### 6. ClientBuilder Integration + +**New API methods** (`ClientBuilder`): +```java +ClientBuilder enableTracing(boolean tracingEnabled); +``` + +**Implementation** (`ClientBuilderImpl`): +- `enableTracing()` stores `tracingEnabled` flag in `ClientConfigurationData` +- Passes configuration to `PulsarClientImpl` + +**Automatic interceptor addition** (`ConsumerBuilderImpl`, `ProducerBuilderImpl`): +- Checks if tracing is enabled in client configuration +- Automatically adds appropriate interceptor if enabled +- User-provided interceptors are preserved and combined + +### 7. InstrumentProvider Enhancement + +Enhanced to provide OpenTelemetry instance: +```java +public OpenTelemetry getOpenTelemetry(); +``` + +Falls back to `GlobalOpenTelemetry.get()` if not explicitly configured. + +### 8. Implementation Classes + +**Modified classes**: +- `MessageImpl`: Implements `TraceableMessage` +- `MessageIdImpl`: Implements `TraceableMessageId` +- `TopicMessageImpl`: Delegates `TraceableMessage` methods to wrapped message +- `TopicMessageIdImpl`: Delegates `TraceableMessageId` methods to wrapped message ID +- `ConsumerBase`: Provides `getSubscriptionType()` for interceptors +- `ConsumerBuilderImpl`: Auto-adds consumer interceptor when enabled +- `ProducerBuilderImpl`: Auto-adds producer interceptor when enabled +- `PulsarClientImpl`: Stores and provides OpenTelemetry configuration +- `ClientConfigurationData`: Stores OpenTelemetry and enableTracing settings + +### 9. Span Attributes + +Following [OpenTelemetry messaging semantic conventions](https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/): + +**Producer spans**: +- `messaging.system`: "pulsar" +- `messaging.destination.name`: Topic name +- `messaging.operation.name`: "send" +- `messaging.message.id`: Message ID (added on ack) + +**Consumer spans**: +- `messaging.system`: "pulsar" +- `messaging.destination.name`: Topic name +- `messaging.destination.subscription.name`: Subscription name +- `messaging.operation.name`: "process" +- `messaging.message.id`: Message ID +- `messaging.pulsar.acknowledgment.type`: Custom attribute indicating how the message was acknowledged: + - `"acknowledge"`: Individual acknowledgment + - `"cumulative_acknowledge"`: Cumulative acknowledgment + - `"negative_acknowledge"`: Negative acknowledgment (message will be redelivered) + - `"ack_timeout"`: Acknowledgment timeout (message will be redelivered) + +**Rationale for `messaging.pulsar.acknowledgment.type` attribute**: +- Provides visibility into message acknowledgment patterns +- Enables querying for retry scenarios (negative ack, timeout) +- Helps identify timeout configuration issues +- Allows analysis of cumulative vs. individual acknowledgment usage +- Uses attribute (not event) for better queryability in tracing backends + +## Public-facing Changes + +### Public API + +**New interfaces** (`org.apache.pulsar.client.api`): + +```java +public interface TraceableMessage { + void setTracingSpan(io.opentelemetry.api.trace.Span span); + io.opentelemetry.api.trace.Span getTracingSpan(); +} + +public interface TraceableMessageId { + void setTracingSpan(io.opentelemetry.api.trace.Span span); + io.opentelemetry.api.trace.Span getTracingSpan(); +} +``` + +**ClientBuilder new methods**: + +```java +/** + * Enable or disable automatic tracing. + * When enabled, uses GlobalOpenTelemetry.get() if no OpenTelemetry instance is set. + * Tracing interceptors are automatically added to all producers and consumers. + */ +ClientBuilder enableTracing(boolean tracingEnabled); +``` + +**New public classes** (`org.apache.pulsar.client.impl.tracing`): +- `OpenTelemetryProducerInterceptor`: Producer interceptor for tracing +- `OpenTelemetryConsumerInterceptor`: Consumer interceptor for tracing +- `TracingContext`: Utility methods for span creation +- `TracingProducerBuilder`: Helper for manual trace context injection + +**Modified classes**: +- `Message` interface: Now extends `TraceableMessage` (via implementations) +- `MessageId` interface: Now extends `TraceableMessageId` (via implementations) + +### Binary protocol + +No changes to binary protocol. Trace context is propagated via existing message properties mechanism. + +### Configuration + +**New ClientBuilder options**: +- `enableTracing(boolean)`: Enable automatic tracing + +**Example configuration**: + +```java +// Option 1: Explicit OpenTelemetry instance with tracing enabled +OpenTelemetry otel = OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .build(); + +PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .openTelemetry(otel) + .enableTracing(true) + .build(); + +// Option 2: Use GlobalOpenTelemetry +PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .enableTracing(true) // Uses GlobalOpenTelemetry + .build(); + +// Option 3: Manual interceptor (advanced) +Producer producer = client.newProducer(Schema.STRING) + .topic("my-topic") + .intercept(new OpenTelemetryProducerInterceptor()) + .create(); +``` + +### CLI + +No CLI changes. This is a client library feature. + +### Metrics + +This PIP focuses on distributed tracing, not metrics. No new metrics are added. + +# Monitoring + +Users can monitor tracing effectiveness through their OpenTelemetry backend (e.g., Jaeger, Zipkin, Grafana Tempo): + +## Key Monitoring Aspects + +1. **Span creation rate**: Monitor the rate of producer and consumer spans to ensure tracing is active +2. **Trace completeness**: Verify traces show complete paths from producer to consumer +3. **Error spans**: Monitor spans with ERROR status to identify failures +4. **Span duration**: Analyze span durations to identify performance bottlenecks: + - Long producer spans may indicate slow broker acknowledgments + - Long consumer spans may indicate slow message processing + +## Recommended Dashboards + +1. **Producer Performance**: + - Track `send` span durations by topic + - Alert on high error rates + - Monitor throughput (spans per second) + +2. **Consumer Performance**: + - Track `process` span durations by topic + - Monitor acknowledgment latency + - Alert on negative acknowledgment rates + +3. **End-to-End Latency**: + - Visualize complete traces from producer to consumer + - Identify bottlenecks in the message flow + - Track latency percentiles (p50, p95, p99) + +# Security Considerations + +This feature does not introduce new security concerns: + +1. **Trace context in properties**: Trace context (`traceparent`, `tracestate`) is stored in message properties, which are part of the existing message structure. No additional authentication or authorization is needed. + +2. **No sensitive data**: Trace context only contains trace IDs, span IDs, and trace flags. No user data or sensitive information is included. + +3. **OpenTelemetry instance**: The `OpenTelemetry` instance is provided by the application and follows the same security model as other client configuration. + +4. **Multi-tenancy**: Tracing respects existing Pulsar multi-tenancy boundaries. Trace context is scoped to individual messages and does not leak across tenants. + +5. **No new endpoints**: This feature does not add new HTTP endpoints or protocol commands. + +# Backward & Forward Compatibility + +## Upgrade + +**Fully backward compatible**. No breaking changes: + +1. **Default behavior unchanged**: Tracing is disabled by default. Existing applications work without modification. +2. **Serialization compatible**: New interfaces use `transient` fields that don't affect serialization. +3. **Message format unchanged**: Trace context uses existing message properties mechanism. +4. **Interceptor compatible**: Works alongside existing user interceptors. + +**Upgrade steps**: +1. Upgrade client library to version containing this feature +2. Optionally enable tracing via `ClientBuilder.enableTracing(true)` +3. Configure OpenTelemetry SDK and exporters if using programmatic configuration + +## Downgrade / Rollback + +**Fully compatible with downgrade**: + +1. **Message compatibility**: Messages sent with trace context can be received by older clients (they ignore unknown properties) +2. **No schema changes**: No changes to message schema or protocol +3. **Graceful degradation**: Older clients simply don't create spans but can still process messages + +**Rollback steps**: +1. Downgrade client library to previous version +2. Tracing will stop, but message flow continues normally +3. Existing traces may be incomplete if some clients are downgraded + +## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations + +**No impact on geo-replication**: + +1. **Trace context preserved**: Message properties (including trace context) are replicated across clusters +2. **Mixed versions**: Clusters can run different client versions. Trace context propagates through older brokers without issues. +3. **No broker changes**: This is a client-only feature. Broker version doesn't matter. + +**Considerations**: +- Traces may span multiple clusters, providing visibility into geo-replication latency +- If some clusters use tracing and others don't, traces will have gaps but remain functional +- Trace context continues across cluster boundaries via message properties + +# Alternatives + +## Alternative 1: OpenTelemetry Java Agent Only + +**Approach**: Only support tracing via OpenTelemetry Java Agent automatic instrumentation. + +**Rejected because**: +- Requires agent deployment, not suitable for all environments +- No programmatic control over tracing behavior +- Harder to debug and customize +- Not aligned with other Apache projects that provide native support + +## Alternative 2: Custom Tracing API + +**Approach**: Design a custom Pulsar-specific tracing API instead of using OpenTelemetry. + +**Rejected because**: +- OpenTelemetry is the industry standard +- Custom API would require additional exporters and integrations +- Would not work with existing OpenTelemetry ecosystem +- Increases maintenance burden + +## Alternative 3: Span Storage in Message Properties + +**Approach**: Store spans directly in message properties instead of using separate `TraceableMessage` interface. + +**Rejected because**: +- Spans are not serializable +- Would require serializing span context for every message (overhead) +- Less clean API design +- Harder to integrate with cumulative acknowledgment + +## Alternative 4: Per-Message Acknowledgment Only + +**Approach**: Only support individual acknowledgment, not cumulative acknowledgment. + +**Rejected because**: +- Cumulative acknowledgment is a key Pulsar feature +- Would leave spans unclosed until timeout +- Poor user experience for Failover/Exclusive subscriptions +- Incomplete tracing for common use cases + +# General Notes + +## Performance Considerations + +The implementation is designed for minimal overhead: + +1. **Zero overhead when disabled**: No performance impact when tracing is not enabled +2. **Lazy initialization**: Span maps only created for Failover/Exclusive subscriptions +3. **Efficient data structures**: `ConcurrentSkipListMap` for O(log n) range operations +4. **Transient fields**: Spans not serialized with messages +5. **Batching**: OpenTelemetry SDK batches span exports by default + +## Testing + +Comprehensive testing includes: + +1. **Unit tests**: `OpenTelemetryTracingTest` verifies span creation, attributes, and context propagation +2. **Example tests**: `TracingExampleTest` demonstrates usage patterns +3. **Integration tests**: Manual testing with Jaeger backend +4. **Compatibility tests**: Verified with OpenTelemetry Java Agent + +## Documentation + +User documentation provided in: +- `pulsar-client/TRACING.md`: Comprehensive tracing guide +- Javadoc comments on all public APIs +- Code examples in test classes + +# Links + +* Mailing List discussion thread: https://lists.apache.org/thread/61f5zxsc8ydcbjllwbp2x6y5g4f1xj5m +* Mailing List voting thread: https://lists.apache.org/thread/yv455tyyyrpr3j6o1lzx8k9bq4d0ko5m