Skip to content

Commit dc0c40e

Browse files
authored
Add ability to process traces (#38486)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Adds ability to transform traces based on the schema target. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Please delete paragraphs that you did not use before submitting.-->
1 parent ceeb462 commit dc0c40e

File tree

10 files changed

+308
-81
lines changed

10 files changed

+308
-81
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: schemaprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Adds functionality to transform traces
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38486]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
- Adds functionality to transform traces using the target schema version.
20+
21+
# If your change doesn't affect end users or the exported elements of any package,
22+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
23+
# Optional: The change log or logs in which this entry should be included.
24+
# e.g. '[user]' or '[user, api]'
25+
# Include 'user' if the change is relevant to end users.
26+
# Include 'api' if there is a change to a library API.
27+
# Default: '[user]'
28+
change_logs: []

cmd/otelcontribcol/builder-config.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ processors:
127127
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor v0.121.0
128128
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/routingprocessor v0.121.0
129129
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/sumologicprocessor v0.121.0
130+
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor v0.121.0
130131
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanprocessor v0.121.0
131132
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor v0.121.0
132133
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor v0.121.0

processor/schemaprocessor/DESIGN.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ graph LR;
1919
end
2020
2121
```
22-
The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory.
22+
The [Schema Processor](processor.go) is registered as a Processor in the Collector by the factory.
2323
Data flows into the Schema Processor, which uses the Schema URL to fetch the translation from the Translation Manager.
2424
The Translation Manager (at internal/translation/manager.go) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translation.
2525

processor/schemaprocessor/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Schema Transformer Processor
1+
# Schema Processor
22

33
<!-- status autogenerated section -->
44
| Status | |
@@ -59,4 +59,4 @@ processors:
5959
6060
For more complete examples, please refer to [config.yml](./testdata/config.yml).
6161
62-
There's a rough design/overview of the processor in the [DESIGN.md](./DESIGN.md) file.
62+
There's a rough design/overview of the processor in the [DESIGN.md](./DESIGN.md) file.

processor/schemaprocessor/factory.go

+10-10
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ var processorCapabilities = consumer.Capabilities{MutatesData: true}
2222
// factory will store any of the precompiled schemas in future
2323
type factory struct{}
2424

25-
// newDefaultConfiguration returns the configuration for schema transformer processor
25+
// newDefaultConfiguration returns the configuration for schema processor
2626
// with the default values being used throughout it
2727
func newDefaultConfiguration() component.Config {
2828
return &Config{
@@ -47,7 +47,7 @@ func (f factory) createLogsProcessor(
4747
cfg component.Config,
4848
next consumer.Logs,
4949
) (processor.Logs, error) {
50-
transformer, err := newTransformer(ctx, cfg, set)
50+
schemaProcessor, err := newSchemaProcessor(ctx, cfg, set)
5151
if err != nil {
5252
return nil, err
5353
}
@@ -56,9 +56,9 @@ func (f factory) createLogsProcessor(
5656
set,
5757
cfg,
5858
next,
59-
transformer.processLogs,
59+
schemaProcessor.processLogs,
6060
processorhelper.WithCapabilities(processorCapabilities),
61-
processorhelper.WithStart(transformer.start),
61+
processorhelper.WithStart(schemaProcessor.start),
6262
)
6363
}
6464

@@ -68,7 +68,7 @@ func (f factory) createMetricsProcessor(
6868
cfg component.Config,
6969
next consumer.Metrics,
7070
) (processor.Metrics, error) {
71-
transformer, err := newTransformer(ctx, cfg, set)
71+
schemaProcessor, err := newSchemaProcessor(ctx, cfg, set)
7272
if err != nil {
7373
return nil, err
7474
}
@@ -77,9 +77,9 @@ func (f factory) createMetricsProcessor(
7777
set,
7878
cfg,
7979
next,
80-
transformer.processMetrics,
80+
schemaProcessor.processMetrics,
8181
processorhelper.WithCapabilities(processorCapabilities),
82-
processorhelper.WithStart(transformer.start),
82+
processorhelper.WithStart(schemaProcessor.start),
8383
)
8484
}
8585

@@ -89,7 +89,7 @@ func (f factory) createTracesProcessor(
8989
cfg component.Config,
9090
next consumer.Traces,
9191
) (processor.Traces, error) {
92-
transformer, err := newTransformer(ctx, cfg, set)
92+
schemaProcessor, err := newSchemaProcessor(ctx, cfg, set)
9393
if err != nil {
9494
return nil, err
9595
}
@@ -98,8 +98,8 @@ func (f factory) createTracesProcessor(
9898
set,
9999
cfg,
100100
next,
101-
transformer.processTraces,
101+
schemaProcessor.processTraces,
102102
processorhelper.WithCapabilities(processorCapabilities),
103-
processorhelper.WithStart(transformer.start),
103+
processorhelper.WithStart(schemaProcessor.start),
104104
)
105105
}

processor/schemaprocessor/internal/translation/manager.go

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func NewManager(targetSchemaURLS []string, log *zap.Logger, providers ...Provide
7070
}
7171

7272
func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) (Translation, error) {
73+
m.log.Debug("Requesting translation for schemaURL", zap.String("schema-url", schemaURL))
7374
family, version, err := GetFamilyAndVersion(schemaURL)
7475
if err != nil {
7576
m.log.Error("No valid schema url was provided",
+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package schemaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor"
5+
6+
import (
7+
"context"
8+
"errors"
9+
10+
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/pdata/plog"
12+
"go.opentelemetry.io/collector/pdata/pmetric"
13+
"go.opentelemetry.io/collector/pdata/ptrace"
14+
"go.opentelemetry.io/collector/processor"
15+
"go.uber.org/zap"
16+
17+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"
18+
)
19+
20+
type schemaProcessor struct {
21+
telemetry component.TelemetrySettings
22+
config *Config
23+
24+
log *zap.Logger
25+
26+
manager translation.Manager
27+
}
28+
29+
func newSchemaProcessor(_ context.Context, conf component.Config, set processor.Settings) (*schemaProcessor, error) {
30+
cfg, ok := conf.(*Config)
31+
if !ok {
32+
return nil, errors.New("invalid configuration provided")
33+
}
34+
35+
m, err := translation.NewManager(
36+
cfg.Targets,
37+
set.Logger.Named("schema-manager"),
38+
)
39+
if err != nil {
40+
return nil, err
41+
}
42+
return &schemaProcessor{
43+
config: cfg,
44+
telemetry: set.TelemetrySettings,
45+
log: set.Logger,
46+
manager: m,
47+
}, nil
48+
}
49+
50+
func (t schemaProcessor) processLogs(_ context.Context, ld plog.Logs) (plog.Logs, error) {
51+
return ld, nil
52+
}
53+
54+
func (t schemaProcessor) processMetrics(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) {
55+
return md, nil
56+
}
57+
58+
func (t schemaProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
59+
for rt := 0; rt < td.ResourceSpans().Len(); rt++ {
60+
rTrace := td.ResourceSpans().At(rt)
61+
resourceSchemaURL := rTrace.SchemaUrl()
62+
63+
if resourceSchemaURL != "" {
64+
t.log.Debug("requesting translation for resourceSchemaURL", zap.String("resourceSchemaURL", resourceSchemaURL))
65+
tr, err := t.manager.
66+
RequestTranslation(ctx, resourceSchemaURL)
67+
if err != nil {
68+
t.log.Error("failed to request translation", zap.Error(err))
69+
return td, err
70+
}
71+
err = tr.ApplyAllResourceChanges(rTrace, resourceSchemaURL)
72+
if err != nil {
73+
t.log.Error("failed to apply resource changes", zap.Error(err))
74+
return td, err
75+
}
76+
}
77+
for ss := 0; ss < rTrace.ScopeSpans().Len(); ss++ {
78+
span := rTrace.ScopeSpans().At(ss)
79+
spanSchemaURL := span.SchemaUrl()
80+
if spanSchemaURL == "" {
81+
spanSchemaURL = resourceSchemaURL
82+
}
83+
if spanSchemaURL == "" {
84+
continue
85+
}
86+
tr, err := t.manager.
87+
RequestTranslation(ctx, spanSchemaURL)
88+
if err != nil {
89+
t.log.Error("failed to request translation", zap.Error(err))
90+
continue
91+
}
92+
err = tr.ApplyScopeSpanChanges(span, spanSchemaURL)
93+
if err != nil {
94+
t.log.Error("failed to apply scope span changes", zap.Error(err))
95+
}
96+
}
97+
}
98+
return td, nil
99+
}
100+
101+
// start will add HTTP provider to the manager and prefetch schemas
102+
func (t *schemaProcessor) start(ctx context.Context, host component.Host) error {
103+
client, err := t.config.ToClient(ctx, host, t.telemetry)
104+
if err != nil {
105+
return err
106+
}
107+
t.manager.AddProvider(translation.NewHTTPProvider(client))
108+
109+
go func(ctx context.Context) {
110+
for _, schemaURL := range t.config.Prefetch {
111+
t.log.Info("prefetching schema", zap.String("url", schemaURL))
112+
_, _ = t.manager.RequestTranslation(ctx, schemaURL)
113+
}
114+
}(ctx)
115+
116+
return nil
117+
}

processor/schemaprocessor/transformer_test.go processor/schemaprocessor/processor_test.go

+35-11
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package schemaprocessor
66
import (
77
"context"
88
_ "embed"
9+
"fmt"
10+
"strings"
911
"testing"
1012

1113
"github.com/stretchr/testify/assert"
@@ -18,27 +20,49 @@ import (
1820
"go.uber.org/zap/zaptest"
1921
)
2022

21-
func newTestTransformer(t *testing.T) *transformer {
22-
trans, err := newTransformer(context.Background(), newDefaultConfiguration(), processor.Settings{
23+
// dummySchemaProvider is a test schema provider that serves a fixed schema
// document whose "versions" section is supplied by the test.
type dummySchemaProvider struct {
	// transformations is inserted verbatim after the "versions:" line.
	transformations string
}

// Retrieve ignores both arguments and returns a schema file for
// http://opentelemetry.io/schemas/1.9.0 containing the configured
// transformations, with leading and trailing whitespace trimmed. The returned
// error is always nil.
func (p *dummySchemaProvider) Retrieve(_ context.Context, _ string) (string, error) {
	const schemaTemplate = `
file_format: 1.1.0
schema_url: http://opentelemetry.io/schemas/1.9.0
versions:
%s`
	rendered := fmt.Sprintf(schemaTemplate, p.transformations)
	return strings.TrimSpace(rendered), nil
}
36+
37+
func newTestSchemaProcessor(t *testing.T, transformations string, targerVerion string) *schemaProcessor {
38+
cfg := &Config{
39+
Targets: []string{fmt.Sprintf("http://opentelemetry.io/schemas/%s", targerVerion)},
40+
}
41+
trans, err := newSchemaProcessor(context.Background(), cfg, processor.Settings{
2342
TelemetrySettings: component.TelemetrySettings{
2443
Logger: zaptest.NewLogger(t),
2544
},
2645
})
27-
require.NoError(t, err, "Must not error when creating default transformer")
46+
require.NoError(t, err, "Must not error when creating default schemaProcessor")
47+
trans.manager.AddProvider(&dummySchemaProvider{
48+
transformations: transformations,
49+
})
2850
return trans
2951
}
3052

31-
func TestTransformerStart(t *testing.T) {
53+
func TestSchemaProcessorStart(t *testing.T) {
3254
t.Parallel()
3355

34-
trans := newTestTransformer(t)
56+
trans := newTestSchemaProcessor(t, "", "1.9.0")
3557
assert.NoError(t, trans.start(context.Background(), nil))
3658
}
3759

38-
func TestTransformerProcessing(t *testing.T) {
60+
func TestSchemaProcessorProcessing(t *testing.T) {
3961
t.Parallel()
62+
// these tests are just to ensure that the processor does not error out
63+
// and that the data is not modified as dummyprovider has no transformations
4064

41-
trans := newTestTransformer(t)
65+
trans := newTestSchemaProcessor(t, "", "1.9.0")
4266
t.Run("metrics", func(t *testing.T) {
4367
in := pmetric.NewMetrics()
4468
in.ResourceMetrics().AppendEmpty()
@@ -52,7 +76,7 @@ func TestTransformerProcessing(t *testing.T) {
5276

5377
out, err := trans.processMetrics(context.Background(), in)
5478
assert.NoError(t, err, "Must not error when processing metrics")
55-
assert.Equal(t, in, out, "Must return the same data (subject to change)")
79+
assert.Equal(t, in, out, "Must return the same data")
5680
})
5781

5882
t.Run("traces", func(t *testing.T) {
@@ -67,8 +91,8 @@ func TestTransformerProcessing(t *testing.T) {
6791
s.CopyTo(in.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0))
6892

6993
out, err := trans.processTraces(context.Background(), in)
70-
assert.NoError(t, err, "Must not error when processing metrics")
71-
assert.Equal(t, in, out, "Must return the same data (subject to change)")
94+
assert.NoError(t, err, "Must not error when processing traces")
95+
assert.Equal(t, in, out, "Must return the same data")
7296
})
7397

7498
t.Run("logs", func(t *testing.T) {
@@ -83,6 +107,6 @@ func TestTransformerProcessing(t *testing.T) {
83107

84108
out, err := trans.processLogs(context.Background(), in)
85109
assert.NoError(t, err, "Must not error when processing metrics")
86-
assert.Equal(t, in, out, "Must return the same data (subject to change)")
110+
assert.Equal(t, in, out, "Must return the same data")
87111
})
88112
}

0 commit comments

Comments
 (0)