Skip to content

Commit 58e3435

Browse files
ankitpatel96mx-psi
authored andcommitted
[chore] Schema Processor Revamp [Part 2] - ChangeList and Revision (open-telemetry#35267)
**Description:** <Describe what has changed.> This is a slice of changes from open-telemetry#35248 This PR details how operators are used to build the execution pipeline for a given schemafile. Changed files from the [previous PR](open-telemetry#35214) are: processor/schemaprocessor/internal/changelist/changelist.go processor/schemaprocessor/internal/translation/revision_v1.go processor/schemaprocessor/internal/translation/revision_v1_test.go processor/schemaprocessor/go.mod I'm asking a maintainer if they would be willing to push a copy of the previous PR's branch to the core repo so I can switch the base of this PR to the previous PR - thus only the stacked changes would be shown. Edit: this is apparently not easily supported - so asking reviewers to just focus on the changed files listed above. Sorry about that! **Testing:** <Describe what testing was performed and which tests were added.> Unit tests --------- Co-authored-by: Pablo Baeyens <[email protected]>
1 parent f9b9e2c commit 58e3435

26 files changed

+1299
-487
lines changed

processor/schemaprocessor/DESIGN.md

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Design
2+
3+
The Schema Processor is split into several different components.
4+
5+
Here's a general structure diagram:
6+
7+
```mermaid
8+
graph LR;
9+
A[Previous Collector Component] --> B[Transformer]
10+
B -- Schema URL --> C[Translation Manager]
11+
C -- Translation --> B
12+
B --> H[Translator]
13+
H --> E[Revision]
14+
E --> I[ChangeList]
15+
subgraph Interpreter
16+
direction RL
17+
I --> F[Transformer]
18+
F --> G[Migrator]
19+
end
20+
21+
```
22+
The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory.
23+
Data flows into the Transformer, which uses the Schema URL to fetch the translation from the Translation Manager.
24+
The Translation Manager (at internal/translation/manager.go in a future PR) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translator struct.
25+
26+
The Translator struct contains the target schema URL, the target schema version, and a list of Revisions. The Translator figures out what the version of the incoming data is and what Revisions to apply to the incoming data to get it to the target schema version. The Translator is also responsible for applying the Revisions to the incoming data - it iterates through these Revisions and applies them to the incoming data.
27+
28+
Each Revision represents all the changes within a specific version. It consists of several ChangeLists (at internal/changelist/changelist.go in a future PR) - one for each type of change block (at the time of writing - `all`, `resources`, `spans`, `spanEvents`, `metrics`, `logs`). Each ChangeList is similar to a program in an interpreter - in this case the programming language is the schema file! They iterate through whatever changes they are constructed with, and call a [Transformer](internal/transformer) for each type of change. The Transformer accepts a typed value - a log, a metric, etc. It then, under the hood, calls one of a few Migrators. The Migrators do the fundamental work of changing attributes, changing names, etc. The Migrators generally operate on lower levels than the Transformers - they operate on `Attributes`, or an `alias.NamedSignal` (a signal that implements `Name()` and `SetName()`).

processor/schemaprocessor/README.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
| Stability | [development]: traces, metrics, logs |
77
| Distributions | [] |
88
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fschema%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fschema) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fschema%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fschema) |
9-
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) |
9+
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96) |
1010

1111
[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
1212
<!-- end autogenerated section -->
@@ -59,3 +59,5 @@ processors:
5959
```
6060
6161
For more complete examples, please refer to [config.yml](./testdata/config.yml).
62+
63+
There's a rough design/overview of the processor in the [DESIGN.md](./DESIGN.md) file.

processor/schemaprocessor/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/schem
33
go 1.22.0
44

55
require (
6+
github.com/google/go-cmp v0.6.0
67
github.com/stretchr/testify v1.10.0
78
go.opentelemetry.io/collector/component v0.114.1-0.20241202231142-b9ff1bc54c99
89
go.opentelemetry.io/collector/component/componenttest v0.114.1-0.20241202231142-b9ff1bc54c99

processor/schemaprocessor/internal/alias/alias.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
// Package Alias is a subset of the interfaces defined by pdata and family
4+
// Package alias is a subset of the interfaces defined by pdata and family
55
// package to allow for higher code reuse without using generics.
66
package alias // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias"
77

@@ -30,6 +30,10 @@ type NamedSignal interface {
3030
SetName(name string)
3131
}
3232

33+
type Attributed interface {
34+
Attributes() pcommon.Map
35+
}
36+
3337
var (
3438
_ Resource = (*plog.ResourceLogs)(nil)
3539
_ Resource = (*pmetric.ResourceMetrics)(nil)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package changelist // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/changelist"
5+
6+
import (
7+
"fmt"
8+
9+
"go.opentelemetry.io/collector/pdata/pcommon"
10+
"go.opentelemetry.io/collector/pdata/plog"
11+
"go.opentelemetry.io/collector/pdata/pmetric"
12+
"go.opentelemetry.io/collector/pdata/ptrace"
13+
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate"
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/transformer"
16+
)
17+
18+
// ChangeList represents a list of changes within a section of the schema processor. It can take in a list of different migrators for a specific section and will apply them in order, based on whether Apply or Rollback is called
19+
type ChangeList struct {
20+
Migrators []migrate.Migrator
21+
}
22+
23+
func (c ChangeList) Do(ss migrate.StateSelector, signal any) error {
24+
for i := 0; i < len(c.Migrators); i++ {
25+
var migrator migrate.Migrator
26+
// todo(ankit) in go1.23 switch to reversed iterators for this
27+
if ss == migrate.StateSelectorApply {
28+
migrator = c.Migrators[i]
29+
} else {
30+
migrator = c.Migrators[len(c.Migrators)-1-i]
31+
}
32+
// switch between transformer types - what do the transformers act on?
33+
switch thisMigrator := migrator.(type) {
34+
// this one acts on both spans and span events!
35+
case transformer.Transformer[ptrace.Span]:
36+
if span, ok := signal.(ptrace.Span); ok {
37+
if err := thisMigrator.Do(ss, span); err != nil {
38+
return err
39+
}
40+
} else {
41+
return fmt.Errorf("span Transformer %T can't act on %T", thisMigrator, signal)
42+
}
43+
case transformer.Transformer[pmetric.Metric]:
44+
if metric, ok := signal.(pmetric.Metric); ok {
45+
if err := thisMigrator.Do(ss, metric); err != nil {
46+
return err
47+
}
48+
} else {
49+
return fmt.Errorf("metric Transformer %T can't act on %T", thisMigrator, signal)
50+
}
51+
case transformer.Transformer[plog.LogRecord]:
52+
if log, ok := signal.(plog.LogRecord); ok {
53+
if err := thisMigrator.Do(ss, log); err != nil {
54+
return err
55+
}
56+
} else {
57+
return fmt.Errorf("log Transformer %T can't act on %T", thisMigrator, signal)
58+
}
59+
case transformer.Transformer[pcommon.Resource]:
60+
if resource, ok := signal.(pcommon.Resource); ok {
61+
if err := thisMigrator.Do(ss, resource); err != nil {
62+
return err
63+
}
64+
} else {
65+
return fmt.Errorf("resource Transformer %T can't act on %T", thisMigrator, signal)
66+
}
67+
case transformer.AllAttributes:
68+
if err := thisMigrator.Do(ss, signal); err != nil {
69+
return err
70+
}
71+
default:
72+
return fmt.Errorf("unsupported migrator type %T", thisMigrator)
73+
}
74+
}
75+
return nil
76+
}
77+
78+
func (c ChangeList) Apply(signal any) error {
79+
return c.Do(migrate.StateSelectorApply, signal)
80+
}
81+
82+
func (c ChangeList) Rollback(signal any) error {
83+
return c.Do(migrate.StateSelectorRollback, signal)
84+
}

processor/schemaprocessor/internal/migrate/attributes.go

+10-48
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,22 @@ import (
1111
"go.uber.org/multierr"
1212
)
1313

14-
// AttributeChangeSet represents an unscoped entry that can be applied.
15-
//
14+
// AttributeChangeSet represents a rename_attributes type operation.
1615
// The listed changes are duplicated twice
1716
// to allow for simplified means of transition to or from a revision.
1817
type AttributeChangeSet struct {
19-
updates ast.AttributeMap
18+
// The keys are the old attribute name used in the previous version, the values are the
19+
// new attribute name starting from this version (comment from ast.AttributeMap)
20+
updates ast.AttributeMap
21+
// the inverse of the updates map
2022
rollback ast.AttributeMap
2123
}
2224

23-
// AttributeChangeSetSlice allows for `AttributeChangeSet`
24-
// to be chained together as they are defined within the schema
25-
// and be applied sequentially to ensure deterministic behavior.
26-
type AttributeChangeSetSlice []*AttributeChangeSet
27-
2825
// NewAttributeChangeSet allows for typed strings to be used as part
2926
// of the invocation that will be converted into the default string type.
30-
func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
31-
attr := &AttributeChangeSet{
27+
func NewAttributeChangeSet(mappings ast.AttributeMap) AttributeChangeSet {
28+
// for ambiguous rollbacks (if updates contains entries with multiple keys that have the same value), rollback contains the last key iterated over in mappings
29+
attr := AttributeChangeSet{
3230
updates: make(map[string]string, len(mappings)),
3331
rollback: make(map[string]string, len(mappings)),
3432
}
@@ -39,15 +37,9 @@ func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet {
3937
return attr
4038
}
4139

42-
func (a *AttributeChangeSet) Apply(attrs pcommon.Map) error {
43-
return a.do(StateSelectorApply, attrs)
44-
}
40+
func (a AttributeChangeSet) IsMigrator() {}
4541

46-
func (a *AttributeChangeSet) Rollback(attrs pcommon.Map) error {
47-
return a.do(StateSelectorRollback, attrs)
48-
}
49-
50-
func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error) {
42+
func (a *AttributeChangeSet) Do(ss StateSelector, attrs pcommon.Map) (errs error) {
5143
var (
5244
updated = make(map[string]struct{})
5345
results = pcommon.NewMap()
@@ -81,33 +73,3 @@ func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error
8173
results.CopyTo(attrs)
8274
return errs
8375
}
84-
85-
// NewAttributeChangeSetSlice combines all the provided `AttributeChangeSets`
86-
// and allows them to be executed in the provided order.
87-
func NewAttributeChangeSetSlice(changes ...*AttributeChangeSet) *AttributeChangeSetSlice {
88-
values := new(AttributeChangeSetSlice)
89-
for _, c := range changes {
90-
(*values) = append((*values), c)
91-
}
92-
return values
93-
}
94-
95-
func (slice *AttributeChangeSetSlice) Apply(attrs pcommon.Map) error {
96-
return slice.do(StateSelectorApply, attrs)
97-
}
98-
99-
func (slice *AttributeChangeSetSlice) Rollback(attrs pcommon.Map) error {
100-
return slice.do(StateSelectorRollback, attrs)
101-
}
102-
103-
func (slice *AttributeChangeSetSlice) do(ss StateSelector, attrs pcommon.Map) (errs error) {
104-
for i := 0; i < len(*slice); i++ {
105-
switch ss {
106-
case StateSelectorApply:
107-
errs = multierr.Append(errs, (*slice)[i].Apply(attrs))
108-
case StateSelectorRollback:
109-
errs = multierr.Append(errs, (*slice)[len(*slice)-1-i].Rollback(attrs))
110-
}
111-
}
112-
return errs
113-
}

processor/schemaprocessor/internal/migrate/attributes_test.go

+5-99
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestNewAttributeChangeSet(t *testing.T) {
2727
"hello": "world",
2828
})
2929

30-
expect := &AttributeChangeSet{
30+
expect := AttributeChangeSet{
3131
updates: map[string]string{
3232
"hello": "world",
3333
},
@@ -45,7 +45,7 @@ func TestAttributeChangeSetApply(t *testing.T) {
4545

4646
for _, tc := range []struct {
4747
name string
48-
acs *AttributeChangeSet
48+
acs AttributeChangeSet
4949
attrs pcommon.Map
5050
expect pcommon.Map
5151
errVal string
@@ -104,7 +104,7 @@ func TestAttributeChangeSetApply(t *testing.T) {
104104
t.Run(tc.name, func(t *testing.T) {
105105
t.Parallel()
106106

107-
err := tc.acs.Apply(tc.attrs)
107+
err := tc.acs.Do(StateSelectorApply, tc.attrs)
108108
if tc.errVal == "" {
109109
assert.NoError(t, err, "Must not return an error")
110110
} else {
@@ -120,7 +120,7 @@ func TestAttributeChangeSetRollback(t *testing.T) {
120120

121121
for _, tc := range []struct {
122122
name string
123-
acs *AttributeChangeSet
123+
acs AttributeChangeSet
124124
attrs pcommon.Map
125125
expect pcommon.Map
126126
errVal string
@@ -179,7 +179,7 @@ func TestAttributeChangeSetRollback(t *testing.T) {
179179
t.Run(tc.name, func(t *testing.T) {
180180
t.Parallel()
181181

182-
err := tc.acs.Rollback(tc.attrs)
182+
err := tc.acs.Do(StateSelectorRollback, tc.attrs)
183183
if tc.errVal == "" {
184184
assert.NoError(t, err, "Must not return an error")
185185
} else {
@@ -189,97 +189,3 @@ func TestAttributeChangeSetRollback(t *testing.T) {
189189
})
190190
}
191191
}
192-
193-
func TestNewAttributeChangeSetSliceApply(t *testing.T) {
194-
t.Parallel()
195-
196-
for _, tc := range []struct {
197-
name string
198-
changes *AttributeChangeSetSlice
199-
attr pcommon.Map
200-
expect pcommon.Map
201-
}{
202-
{
203-
name: "no changes listed",
204-
changes: NewAttributeChangeSetSlice(),
205-
attr: testHelperBuildMap(func(m pcommon.Map) {
206-
m.PutStr("service.version", "v0.0.1")
207-
}),
208-
expect: testHelperBuildMap(func(m pcommon.Map) {
209-
m.PutStr("service.version", "v0.0.1")
210-
}),
211-
},
212-
{
213-
name: "changes defined",
214-
changes: NewAttributeChangeSetSlice(
215-
NewAttributeChangeSet(map[string]string{
216-
"service_version": "service.version",
217-
}),
218-
NewAttributeChangeSet(map[string]string{
219-
"service.version": "application.service.version",
220-
}),
221-
),
222-
attr: testHelperBuildMap(func(m pcommon.Map) {
223-
m.PutStr("service_version", "v0.0.1")
224-
}),
225-
expect: testHelperBuildMap(func(m pcommon.Map) {
226-
m.PutStr("application.service.version", "v0.0.1")
227-
}),
228-
},
229-
} {
230-
tc := tc
231-
t.Run(tc.name, func(t *testing.T) {
232-
t.Parallel()
233-
234-
assert.NoError(t, tc.changes.Apply(tc.attr))
235-
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
236-
})
237-
}
238-
}
239-
240-
func TestNewAttributeChangeSetSliceApplyRollback(t *testing.T) {
241-
t.Parallel()
242-
243-
for _, tc := range []struct {
244-
name string
245-
changes *AttributeChangeSetSlice
246-
attr pcommon.Map
247-
expect pcommon.Map
248-
}{
249-
{
250-
name: "no changes listed",
251-
changes: NewAttributeChangeSetSlice(),
252-
attr: testHelperBuildMap(func(m pcommon.Map) {
253-
m.PutStr("service.version", "v0.0.1")
254-
}),
255-
expect: testHelperBuildMap(func(m pcommon.Map) {
256-
m.PutStr("service.version", "v0.0.1")
257-
}),
258-
},
259-
{
260-
name: "changes defined",
261-
changes: NewAttributeChangeSetSlice(
262-
NewAttributeChangeSet(map[string]string{
263-
"service_version": "service.version",
264-
}),
265-
NewAttributeChangeSet(map[string]string{
266-
"service.version": "application.service.version",
267-
}),
268-
),
269-
attr: testHelperBuildMap(func(m pcommon.Map) {
270-
m.PutStr("application.service.version", "v0.0.1")
271-
}),
272-
expect: testHelperBuildMap(func(m pcommon.Map) {
273-
m.PutStr("service_version", "v0.0.1")
274-
}),
275-
},
276-
} {
277-
tc := tc
278-
t.Run(tc.name, func(t *testing.T) {
279-
t.Parallel()
280-
281-
assert.NoError(t, tc.changes.Rollback(tc.attr))
282-
assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes")
283-
})
284-
}
285-
}

0 commit comments

Comments
 (0)