Skip to content

Commit 47ebae2

Browse files
committed
Include changelist and revision parts
1 parent 20ec9bc commit 47ebae2

File tree

3 files changed

+228
-117
lines changed

3 files changed

+228
-117
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package changelist // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/changelist"
5+
6+
import (
7+
"fmt"
8+
9+
"go.opentelemetry.io/collector/pdata/pmetric"
10+
"go.opentelemetry.io/collector/pdata/ptrace"
11+
12+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias"
13+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate"
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator"
15+
)
16+
17+
// ChangeList represents a list of changes within a section of the schema processor. It can take in a list of different migrators for a specific section and will apply them in order, based on whether Apply or Rollback is called
18+
type ChangeList struct {
19+
Migrators []migrate.Migrator
20+
}
21+
22+
func (c ChangeList) Do(ss migrate.StateSelector, signal any) error {
23+
for i := 0; i < len(c.Migrators); i++ {
24+
var migrator migrate.Migrator
25+
// todo(ankit) in go1.23 switch to reversed iterators for this
26+
if ss == migrate.StateSelectorApply {
27+
migrator = c.Migrators[i]
28+
} else {
29+
migrator = c.Migrators[len(c.Migrators)-1-i]
30+
}
31+
// switch between operator types - what do the operators act on?
32+
switch thisMigrator := migrator.(type) {
33+
// this one acts on both spans and span events!
34+
case operator.SpanOperator:
35+
if span, ok := signal.(ptrace.Span); ok {
36+
if err := thisMigrator.Do(ss, span); err != nil {
37+
return err
38+
}
39+
} else {
40+
return fmt.Errorf("SpanOperator %T can't act on %T", thisMigrator, signal)
41+
}
42+
case operator.MetricOperator:
43+
if metric, ok := signal.(pmetric.Metric); ok {
44+
if err := thisMigrator.Do(ss, metric); err != nil {
45+
return err
46+
}
47+
} else {
48+
return fmt.Errorf("MetricOperator %T can't act on %T", thisMigrator, signal)
49+
}
50+
// no log operator because the only log operation is an attribute changeset
51+
// this block is for the `all` block, the `resource` block, and the `log` block
52+
// todo(ankit) switch these to specific typed ones?
53+
case migrate.AttributeChangeSet:
54+
switch attributeSignal := signal.(type) {
55+
case alias.Attributed:
56+
if err := thisMigrator.Do(ss, attributeSignal.Attributes()); err != nil {
57+
return err
58+
}
59+
default:
60+
return fmt.Errorf("unsupported signal type %T for AttributeChangeSet", attributeSignal)
61+
}
62+
default:
63+
return fmt.Errorf("unsupported migrator type %T", thisMigrator)
64+
}
65+
}
66+
return nil
67+
}
68+
69+
func (c ChangeList) Apply(signal any) error {
70+
return c.Do(migrate.StateSelectorApply, signal)
71+
}
72+
73+
func (c ChangeList) Rollback(signal any) error {
74+
return c.Do(migrate.StateSelectorRollback, signal)
75+
}

processor/schemaprocessor/internal/translation/revision_v1.go

Lines changed: 96 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -6,109 +6,141 @@ package translation // import "github.com/open-telemetry/opentelemetry-collector
66
import (
77
"go.opentelemetry.io/otel/schema/v1.0/ast"
88

9+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/changelist"
910
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate"
11+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/operator"
1012
)
1113

1214
// RevisionV1 represents all changes that are to be
1315
// applied to a signal at a given version.
16+
// todo(ankit) implement split and rest of otel schema
1417
type RevisionV1 struct {
15-
ver *Version
16-
all *migrate.AttributeChangeSetSlice
17-
resource *migrate.AttributeChangeSetSlice
18-
spans *migrate.ConditionalAttributeSetSlice
19-
eventNames *migrate.SignalNameChangeSlice
20-
eventAttrsOnSpan *migrate.ConditionalAttributeSetSlice
21-
eventAttrsOnName *migrate.ConditionalAttributeSetSlice
22-
metricsAttrs *migrate.ConditionalAttributeSetSlice
23-
metricNames *migrate.SignalNameChangeSlice
18+
ver *Version
19+
all *changelist.ChangeList
20+
resources *changelist.ChangeList
21+
spans *changelist.ChangeList
22+
spanEvents *changelist.ChangeList
23+
metrics *changelist.ChangeList
24+
logs *changelist.ChangeList
2425
}
2526

2627
// NewRevision processes the VersionDef and assigns the version to this revision
2728
// to allow sorting within a slice.
2829
// Since VersionDef uses custom types for various definitions, it isn't possible
2930
// to cast those values into the primitives so each has to be processed together.
3031
// Generics would be handy here.
32+
// todo(ankit) investigate using generics
3133
func NewRevision(ver *Version, def ast.VersionDef) *RevisionV1 {
34+
// todo(ankit) change logs to be an ast.Attributes type so I dont have to change this
35+
var logChanges ast.Attributes
36+
for _, change := range def.Logs.Changes {
37+
//nolint:gosimple
38+
logChanges.Changes = append(logChanges.Changes, ast.AttributeChange{RenameAttributes: change.RenameAttributes})
39+
40+
}
3241
return &RevisionV1{
33-
ver: ver,
34-
all: newAttributeChangeSetSliceFromChanges(def.All),
35-
resource: newAttributeChangeSetSliceFromChanges(def.Resources),
36-
spans: newSpanConditionalAttributeSlice(def.Spans),
37-
eventNames: newSpanEventSignalSlice(def.SpanEvents),
38-
eventAttrsOnSpan: newSpanEventConditionalSpans(def.SpanEvents),
39-
eventAttrsOnName: newSpanEventConditionalNames(def.SpanEvents),
40-
metricsAttrs: newMetricConditionalSlice(def.Metrics),
41-
metricNames: newMetricNameSignalSlice(def.Metrics),
42+
ver: ver,
43+
all: newAllChangeList(def.All),
44+
resources: newResourceChangeList(def.Resources),
45+
spans: newSpanChangeList(def.Spans),
46+
spanEvents: newSpanEventChangeList(def.SpanEvents),
47+
metrics: newMetricChangeList(def.Metrics),
48+
logs: newLogsChangelist(def.Logs),
4249
}
50+
4351
}
4452

45-
func newAttributeChangeSetSliceFromChanges(attrs ast.Attributes) *migrate.AttributeChangeSetSlice {
46-
values := make([]*migrate.AttributeChangeSet, 0, 10)
47-
for _, at := range attrs.Changes {
48-
if renamed := at.RenameAttributes; renamed != nil {
49-
values = append(values, migrate.NewAttributeChangeSet(renamed.AttributeMap))
50-
}
51-
}
52-
return migrate.NewAttributeChangeSetSlice(values...)
53+
func (r RevisionV1) Version() *Version {
54+
return r.ver
5355
}
5456

55-
func newSpanConditionalAttributeSlice(spans ast.Spans) *migrate.ConditionalAttributeSetSlice {
56-
values := make([]*migrate.ConditionalAttributeSet, 0, 10)
57-
for _, ch := range spans.Changes {
58-
if renamed := ch.RenameAttributes; renamed != nil {
59-
values = append(values, migrate.NewConditionalAttributeSet(
60-
renamed.AttributeMap,
61-
renamed.ApplyToSpans...,
62-
))
57+
func newAllChangeList(all ast.Attributes) *changelist.ChangeList {
58+
values := make([]migrate.Migrator, 0)
59+
for _, at := range all.Changes {
60+
if renamed := at.RenameAttributes; renamed != nil {
61+
attributeChangeSet := migrate.NewAttributeChangeSet(renamed.AttributeMap)
62+
values = append(values, attributeChangeSet)
6363
}
6464
}
65-
return migrate.NewConditionalAttributeSetSlice(values...)
65+
return &changelist.ChangeList{Migrators: values}
6666
}
6767

68-
func newSpanEventSignalSlice(events ast.SpanEvents) *migrate.SignalNameChangeSlice {
69-
values := make([]*migrate.SignalNameChange, 0, 10)
70-
for _, ch := range events.Changes {
71-
if renamed := ch.RenameEvents; renamed != nil {
72-
values = append(values, migrate.NewSignalNameChange(renamed.EventNameMap))
68+
func newResourceChangeList(resource ast.Attributes) *changelist.ChangeList {
69+
values := make([]migrate.Migrator, 0)
70+
for _, at := range resource.Changes {
71+
if renamed := at.RenameAttributes; renamed != nil {
72+
attributeChangeSet := migrate.NewAttributeChangeSet(renamed.AttributeMap)
73+
values = append(values, attributeChangeSet)
7374
}
7475
}
75-
return migrate.NewSignalNameChangeSlice(values...)
76+
return &changelist.ChangeList{Migrators: values}
7677
}
7778

78-
func newSpanEventConditionalSpans(events ast.SpanEvents) *migrate.ConditionalAttributeSetSlice {
79-
values := make([]*migrate.ConditionalAttributeSet, 0, 10)
80-
for _, ch := range events.Changes {
81-
if rename := ch.RenameAttributes; rename != nil {
82-
values = append(values, migrate.NewConditionalAttributeSet(rename.AttributeMap, rename.ApplyToSpans...))
79+
func newSpanChangeList(spans ast.Spans) *changelist.ChangeList {
80+
values := make([]migrate.Migrator, 0)
81+
for _, at := range spans.Changes {
82+
if renamed := at.RenameAttributes; renamed != nil {
83+
conditionalAttributeChangeSet := operator.SpanConditionalAttributeOperator{Migrator: migrate.NewConditionalAttributeSet(renamed.AttributeMap, renamed.ApplyToSpans...)}
84+
values = append(values, conditionalAttributeChangeSet)
8385
}
8486
}
85-
return migrate.NewConditionalAttributeSetSlice(values...)
87+
return &changelist.ChangeList{Migrators: values}
88+
8689
}
8790

88-
func newSpanEventConditionalNames(events ast.SpanEvents) *migrate.ConditionalAttributeSetSlice {
89-
values := make([]*migrate.ConditionalAttributeSet, 0, 10)
90-
for _, ch := range events.Changes {
91-
if rename := ch.RenameAttributes; rename != nil {
92-
values = append(values, migrate.NewConditionalAttributeSet(rename.AttributeMap, rename.ApplyToEvents...))
91+
func newMetricChangeList(metrics ast.Metrics) *changelist.ChangeList {
92+
values := make([]migrate.Migrator, 0)
93+
for _, at := range metrics.Changes {
94+
if renameAttributes := at.RenameAttributes; renameAttributes != nil {
95+
attributeChangeSet := operator.MetricDataPointAttributeOperator{
96+
ConditionalAttributeChange: migrate.NewConditionalAttributeSet(renameAttributes.AttributeMap, renameAttributes.ApplyToMetrics...),
97+
}
98+
values = append(values, attributeChangeSet)
99+
} else if renamedMetrics := at.RenameMetrics; renamedMetrics != nil {
100+
signalNameChange := operator.MetricSignalNameChange{SignalNameChange: migrate.NewSignalNameChange(renamedMetrics)}
101+
values = append(values, signalNameChange)
93102
}
94103
}
95-
return migrate.NewConditionalAttributeSetSlice(values...)
104+
return &changelist.ChangeList{Migrators: values}
96105
}
97106

98-
func newMetricConditionalSlice(metrics ast.Metrics) *migrate.ConditionalAttributeSetSlice {
99-
values := make([]*migrate.ConditionalAttributeSet, 0, 10)
100-
for _, ch := range metrics.Changes {
101-
if rename := ch.RenameAttributes; rename != nil {
102-
values = append(values, migrate.NewConditionalAttributeSet(rename.AttributeMap, rename.ApplyToMetrics...))
107+
func newSpanEventChangeList(spanEvents ast.SpanEvents) *changelist.ChangeList {
108+
values := make([]migrate.Migrator, 0)
109+
for _, at := range spanEvents.Changes {
110+
if renamedEvent := at.RenameEvents; renamedEvent != nil {
111+
signalNameChange := migrate.NewSignalNameChange(renamedEvent.EventNameMap)
112+
spanEventSignalNameChange := operator.SpanEventSignalNameChange{SignalNameChange: signalNameChange}
113+
values = append(values, spanEventSignalNameChange)
114+
} else if renamedAttribute := at.RenameAttributes; renamedAttribute != nil {
115+
acceptableSpanNames := make([]string, 0)
116+
for _, spanName := range renamedAttribute.ApplyToSpans {
117+
acceptableSpanNames = append(acceptableSpanNames, string(spanName))
118+
}
119+
acceptableEventNames := make([]string, 0)
120+
for _, eventName := range renamedAttribute.ApplyToEvents {
121+
acceptableEventNames = append(acceptableEventNames, string(eventName))
122+
}
123+
124+
attributeChangeSet := migrate.NewMultiConditionalAttributeSet(renamedAttribute.AttributeMap, map[string][]string{
125+
"span.name": acceptableSpanNames,
126+
"event.name": acceptableEventNames,
127+
})
128+
spanEventAttributeChangeSet := operator.NewSpanEventConditionalAttributeOperator(attributeChangeSet)
129+
values = append(values, spanEventAttributeChangeSet)
130+
} else {
131+
panic("spanEvents change must have either RenameEvents or RenameAttributes")
103132
}
104133
}
105-
return migrate.NewConditionalAttributeSetSlice(values...)
134+
return &changelist.ChangeList{Migrators: values}
106135
}
107136

108-
func newMetricNameSignalSlice(metrics ast.Metrics) *migrate.SignalNameChangeSlice {
109-
values := make([]*migrate.SignalNameChange, 0, 10)
110-
for _, ch := range metrics.Changes {
111-
values = append(values, migrate.NewSignalNameChange(ch.RenameMetrics))
137+
func newLogsChangelist(logs ast.Logs) *changelist.ChangeList {
138+
values := make([]migrate.Migrator, 0)
139+
for _, at := range logs.Changes {
140+
if renamed := at.RenameAttributes; renamed != nil {
141+
attributeChangeSet := migrate.NewAttributeChangeSet(renamed.AttributeMap)
142+
values = append(values, attributeChangeSet)
143+
}
112144
}
113-
return migrate.NewSignalNameChangeSlice(values...)
145+
return &changelist.ChangeList{Migrators: values}
114146
}

0 commit comments

Comments
 (0)