-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathobs_report_sender.go
127 lines (107 loc) · 4.1 KB
/
obs_report_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
"go.opentelemetry.io/collector/pipeline"
)
const (
	// ExporterKey is the attribute key under which an exporter's ID is
	// attached to metrics and traces.
	ExporterKey = "exporter"

	// DataTypeKey is the attribute key that carries the data type (signal)
	// name, e.g. on the queue size metric.
	DataTypeKey = "data_type"

	// ItemsSent is the span attribute key for the number of items an
	// exporter sent successfully.
	ItemsSent = "items.sent"

	// ItemsFailed is the span attribute key for the number of items an
	// exporter failed to send.
	ItemsFailed = "items.failed"

	// spanNameSep separates the segments of a span name. The same value is
	// duplicated between the receiver and exporter code.
	spanNameSep = "/"
)
// obsReportSender is a sender.Sender[K] decorator: its Send method forwards
// each request to next while wrapping the call in a span and recording
// sent/failed item counts for the configured signal.
type obsReportSender[K request.Request] struct {
	// Embedded lifecycle hooks; newObsReportSender leaves both at their zero
	// value. NOTE(review): presumably these supply the Start/Shutdown methods
	// of the sender — confirm against the component package.
	component.StartFunc
	component.ShutdownFunc
	// spanName is the name given to every span started by startOp. It is
	// built once as ExporterKey/<exporter id>/<signal>.
	spanName string
	// tracer starts the per-request span.
	tracer trace.Tracer
	// signal selects which pair of sent/failed metrics endOp records.
	signal pipeline.Signal
	// tb exposes the Record* helpers used to emit the exporter metrics.
	tb *metadata.TelemetryBuilder
	// spanAttrs is the precomputed attribute option (exporter ID and data
	// type) passed to tracer.Start.
	spanAttrs trace.SpanStartEventOption
	// metricAttr is the precomputed attribute option (exporter ID) passed to
	// every metric recording.
	metricAttr metric.MeasurementOption
	// next is the wrapped sender that performs the actual send.
	next sender.Sender[K]
}
// newObsReportSender wraps next in an obsReportSender that reports spans and
// sent/failed item metrics for the exporter identified by set.ID and the given
// signal.
//
// The span name and the span/metric attribute options are computed once here
// so that Send does not rebuild them per request. An error is returned only
// when the telemetry builder cannot be created from set.TelemetrySettings.
func newObsReportSender[K request.Request](set exporter.Settings, signal pipeline.Signal, next sender.Sender[K]) (sender.Sender[K], error) {
	tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
	if err != nil {
		return nil, err
	}

	exporterID := set.ID.String()
	signalName := signal.String()
	exporterAttr := attribute.String(ExporterKey, exporterID)

	return &obsReportSender[K]{
		// Span name layout: exporter/<exporter id>/<signal>.
		spanName:   ExporterKey + spanNameSep + exporterID + spanNameSep + signalName,
		tracer:     metadata.Tracer(set.TelemetrySettings),
		signal:     signal,
		tb:         tb,
		spanAttrs:  trace.WithAttributes(exporterAttr, attribute.String(DataTypeKey, signalName)),
		metricAttr: metric.WithAttributeSet(attribute.NewSet(exporterAttr)),
		next:       next,
	}, nil
}
// Send forwards req to the next sender inside a span and then reports the
// outcome (item counts and error status) through endOp. The error from the
// next sender is returned unchanged.
func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
	spanCtx := ors.startOp(ctx)

	// The item count must be captured before the request is handed off:
	// downstream components such as the batcher may modify the request.
	itemCount := req.ItemsCount()

	// Hand the request to the wrapped sender under the span's context.
	sendErr := ors.next.Send(spanCtx, req)
	ors.endOp(spanCtx, itemCount, sendErr)
	return sendErr
}
// startOp starts the span that traces one export operation, using the
// precomputed span name and attributes, and returns the context carrying that
// span. The span handle itself is not returned; endOp retrieves it from the
// context via trace.SpanFromContext.
func (ors *obsReportSender[K]) startOp(ctx context.Context) context.Context {
	spanCtx, _ := ors.tracer.Start(ctx, ors.spanName, ors.spanAttrs)
	return spanCtx
}
// endOp finishes the export operation begun by startOp: it records the
// sent/failed item metrics for the sender's signal, annotates the span found
// in ctx with the same counts, marks the span as failed when err is non-nil,
// and ends the span.
//
// Despite its name, numLogRecords is the item count of the request for
// whichever signal this sender handles. The whole count is reported either as
// sent (err == nil) or as failed (err != nil).
func (ors *obsReportSender[K]) endOp(ctx context.Context, numLogRecords int, err error) {
	sent, failed := toNumItems(numLogRecords, err)

	// Signals without a case below (e.g. profiles) record no metrics.
	switch ors.signal {
	case pipeline.SignalTraces:
		ors.tb.RecordExporterSentSpans(ctx, sent, ors.metricAttr)
		ors.tb.RecordExporterSendFailedSpans(ctx, failed, ors.metricAttr)
	case pipeline.SignalMetrics:
		ors.tb.RecordExporterSentMetricPoints(ctx, sent, ors.metricAttr)
		ors.tb.RecordExporterSendFailedMetricPoints(ctx, failed, ors.metricAttr)
	case pipeline.SignalLogs:
		ors.tb.RecordExporterSentLogRecords(ctx, sent, ors.metricAttr)
		ors.tb.RecordExporterSendFailedLogRecords(ctx, failed, ors.metricAttr)
	}

	// The span is always ended, whether or not it is being recorded.
	span := trace.SpanFromContext(ctx)
	defer span.End()

	// Skip building attributes for spans that are not recording.
	if !span.IsRecording() {
		return
	}

	span.SetAttributes(
		attribute.Int64(ItemsSent, sent),
		attribute.Int64(ItemsFailed, failed),
	)
	if err != nil {
		span.SetStatus(codes.Error, err.Error())
	}
}
// toNumItems splits an item count into (sent, failed) according to the outcome
// of the send. The accounting is all-or-nothing: with a nil err every item is
// counted as sent, otherwise every item is counted as failed.
func toNumItems(numExportedItems int, err error) (int64, int64) {
	total := int64(numExportedItems)
	if err == nil {
		return total, 0
	}
	return 0, total
}