Skip to content

Commit 1829df1

Browse files
MovieStoreGuyankitpatel96
authored andcommitted
[Schema Processor] complete implementation squashed
1 parent 62ff9a6 commit 1829df1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+4309
-19
lines changed

processor/schemaprocessor/go.mod

+6-2
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,26 @@ go 1.22.0
44

55
require (
66
github.com/google/go-cmp v0.6.0
7+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.110.0
78
github.com/stretchr/testify v1.9.0
89
go.opentelemetry.io/collector/component v0.110.0
910
go.opentelemetry.io/collector/config/confighttp v0.110.0
11+
go.opentelemetry.io/collector/config/configtelemetry v0.110.0
1012
go.opentelemetry.io/collector/confmap v1.16.0
1113
go.opentelemetry.io/collector/consumer v0.110.0
1214
go.opentelemetry.io/collector/consumer/consumertest v0.110.0
1315
go.opentelemetry.io/collector/pdata v1.16.0
1416
go.opentelemetry.io/collector/processor v0.110.0
17+
go.opentelemetry.io/otel/metric v1.30.0
1518
go.opentelemetry.io/otel/schema v0.0.9
1619
go.uber.org/goleak v1.3.0
1720
go.uber.org/multierr v1.11.0
1821
go.uber.org/zap v1.27.0
1922
)
2023

2124
require (
25+
github.com/Masterminds/semver/v3 v3.3.0 // indirect
26+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2227
github.com/davecgh/go-spew v1.1.1 // indirect
2328
github.com/felixge/httpsnoop v1.0.4 // indirect
2429
github.com/fsnotify/fsnotify v1.7.0 // indirect
@@ -37,14 +42,14 @@ require (
3742
github.com/mitchellh/reflectwalk v1.0.2 // indirect
3843
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
3944
github.com/modern-go/reflect2 v1.0.2 // indirect
45+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.110.0 // indirect
4046
github.com/pmezard/go-difflib v1.0.0 // indirect
4147
github.com/rs/cors v1.11.1 // indirect
4248
go.opentelemetry.io/collector/client v1.16.0 // indirect
4349
go.opentelemetry.io/collector/component/componentstatus v0.110.0 // indirect
4450
go.opentelemetry.io/collector/config/configauth v0.110.0 // indirect
4551
go.opentelemetry.io/collector/config/configcompression v1.16.0 // indirect
4652
go.opentelemetry.io/collector/config/configopaque v1.16.0 // indirect
47-
go.opentelemetry.io/collector/config/configtelemetry v0.110.0 // indirect
4853
go.opentelemetry.io/collector/config/configtls v1.16.0 // indirect
4954
go.opentelemetry.io/collector/config/internal v0.110.0 // indirect
5055
go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 // indirect
@@ -57,7 +62,6 @@ require (
5762
go.opentelemetry.io/collector/processor/processorprofiles v0.110.0 // indirect
5863
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect
5964
go.opentelemetry.io/otel v1.30.0 // indirect
60-
go.opentelemetry.io/otel/metric v1.30.0 // indirect
6165
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
6266
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
6367
go.opentelemetry.io/otel/trace v1.30.0 // indirect

processor/schemaprocessor/go.sum

+10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"sync"
11+
12+
"go.uber.org/zap"
13+
)
14+
15+
var (
16+
errNilValueProvided = errors.New("nil value provided")
17+
)
18+
19+
// Manager is responsible for ensuring that schemas are kept up to date
20+
// with the most recent version that are requested.
21+
type Manager interface {
22+
// RequestTranslation will provide either the defined Translation
23+
// if it is a known target, or, return a noop variation.
24+
// In the event that a matched Translation, on a missed version
25+
// there is a potential to block during this process.
26+
// Otherwise, the translation will allow concurrent reads.
27+
RequestTranslation(ctx context.Context, schemaURL string) Translation
28+
29+
// SetProviders will update the list of providers used by the manager
30+
// to look up schemaURLs
31+
SetProviders(providers ...Provider) error
32+
}
33+
34+
type manager struct {
35+
log *zap.Logger
36+
37+
rw sync.RWMutex
38+
providers []Provider
39+
match map[string]*Version
40+
translations map[string]*translator
41+
}
42+
43+
var (
44+
_ Manager = (*manager)(nil)
45+
)
46+
47+
// NewManager creates a manager that will allow for management
48+
// of schema, the options allow for additional properties to be
49+
// added to manager to enable additional locations of where to check
50+
// for translations file.
51+
func NewManager(targets []string, log *zap.Logger) (Manager, error) {
52+
if log == nil {
53+
return nil, fmt.Errorf("logger: %w", errNilValueProvided)
54+
}
55+
56+
match := make(map[string]*Version, len(targets))
57+
for _, target := range targets {
58+
family, version, err := GetFamilyAndVersion(target)
59+
if err != nil {
60+
return nil, err
61+
}
62+
match[family] = version
63+
}
64+
65+
return &manager{
66+
log: log,
67+
match: match,
68+
translations: make(map[string]*translator),
69+
}, nil
70+
}
71+
72+
func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Translation {
73+
family, version, err := GetFamilyAndVersion(schemaURL)
74+
if err != nil {
75+
m.log.Debug("No valid schema url was provided, using no-op schema",
76+
zap.String("schema-url", schemaURL),
77+
)
78+
return nopTranslation{}
79+
}
80+
81+
target, match := m.match[family]
82+
if !match {
83+
m.log.Debug("Not a known target, providing Nop Translation",
84+
zap.String("schema-url", schemaURL),
85+
)
86+
return nopTranslation{}
87+
}
88+
89+
m.rw.RLock()
90+
t, exists := m.translations[family]
91+
m.rw.RUnlock()
92+
93+
if exists && t.SupportedVersion(version) {
94+
return t
95+
}
96+
97+
for _, p := range m.providers {
98+
content, err := p.Lookup(ctx, schemaURL)
99+
if err != nil {
100+
m.log.Error("Failed to lookup schemaURL",
101+
zap.Error(err),
102+
zap.String("schemaURL", schemaURL),
103+
)
104+
// todo(ankit) figure out what to do when the providers dont respond something good
105+
}
106+
t, err := newTranslatorFromReader(
107+
m.log.Named("translator").With(
108+
zap.String("family", family),
109+
zap.Stringer("target", target),
110+
),
111+
joinSchemaFamilyAndVersion(family, target),
112+
content,
113+
)
114+
if err != nil {
115+
m.log.Error("Failed to create translator", zap.Error(err))
116+
continue
117+
}
118+
m.rw.Lock()
119+
m.translations[family] = t
120+
m.rw.Unlock()
121+
return t
122+
}
123+
124+
return nopTranslation{}
125+
}
126+
127+
func (m *manager) SetProviders(providers ...Provider) error {
128+
if len(providers) == 0 {
129+
return fmt.Errorf("zero providers set: %w", errNilValueProvided)
130+
}
131+
m.rw.Lock()
132+
m.providers = append(m.providers[:0], providers...)
133+
m.rw.Unlock()
134+
return nil
135+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package translation
5+
6+
import (
7+
"context"
8+
_ "embed"
9+
"fmt"
10+
"net/http"
11+
"net/http/httptest"
12+
"testing"
13+
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
"go.uber.org/zap/zaptest"
17+
)
18+
19+
//go:embed testdata/schema.yaml
20+
var exampleTranslation []byte
21+
22+
func TranslationHandler(t *testing.T) http.Handler {
23+
assert.NotEmpty(t, exampleTranslation, "SchemaContent MUST not be empty")
24+
return http.HandlerFunc(func(wr http.ResponseWriter, _ *http.Request) {
25+
_, err := wr.Write(exampleTranslation)
26+
assert.NoError(t, err, "Must not have issues writing schema content")
27+
})
28+
}
29+
30+
func TestRequestTranslation(t *testing.T) {
31+
t.Parallel()
32+
33+
s := httptest.NewServer(TranslationHandler(t))
34+
t.Cleanup(s.Close)
35+
36+
var (
37+
schemaURL = fmt.Sprintf("%s/1.1.0", s.URL)
38+
)
39+
40+
m, err := NewManager(
41+
[]string{schemaURL},
42+
zaptest.NewLogger(t),
43+
)
44+
require.NoError(t, err, "Must not error when created manager")
45+
require.NoError(t, m.SetProviders(NewHTTPProvider(s.Client())), "Must have no issues trying to set providers")
46+
47+
nop, ok := m.RequestTranslation(context.Background(), "/not/a/valid/schema/URL").(nopTranslation)
48+
require.True(t, ok, "Must return a NoopTranslation if no valid schema URL is provided")
49+
require.NotNil(t, nop, "Must have a valid translation")
50+
51+
tn, ok := m.RequestTranslation(context.Background(), schemaURL).(*translator)
52+
require.True(t, ok, "Can cast to the concrete type")
53+
require.NotNil(t, tn, "Must have a valid translation")
54+
55+
assert.True(t, tn.SupportedVersion(&Version{1, 0, 0}), "Must have the version listed as supported")
56+
57+
count := 0
58+
prevRev := &Version{1, 0, 0}
59+
it, status := tn.iterator(context.Background(), prevRev)
60+
assert.Equal(t, Update, status, "Must return a status of update")
61+
for currRev, more := it(); more; currRev, more = it() {
62+
assert.True(t, prevRev.LessThan(currRev.Version()))
63+
prevRev = currRev.Version()
64+
count++
65+
}
66+
67+
tn, ok = m.RequestTranslation(context.Background(), schemaURL).(*translator)
68+
require.True(t, ok, "Can cast to the concrete type")
69+
require.NotNil(t, tn, "Must have a valid translation")
70+
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"
5+
6+
import (
7+
"bytes"
8+
"context"
9+
"embed"
10+
"fmt"
11+
"io"
12+
"net/http"
13+
"net/url"
14+
)
15+
16+
// Provider allows for collector extensions to be used to look up schemaURLs
17+
type Provider interface {
18+
// Lookup whill check the underlying provider to see if content exists
19+
// for the provided schemaURL, in the even that it doesn't an error is returned.
20+
Lookup(ctx context.Context, schemaURL string) (content io.Reader, err error)
21+
}
22+
23+
type httpProvider struct {
24+
client *http.Client
25+
}
26+
27+
var (
28+
_ Provider = (*httpProvider)(nil)
29+
)
30+
31+
func NewHTTPProvider(client *http.Client) Provider {
32+
return &httpProvider{client: client}
33+
}
34+
35+
func (hp *httpProvider) Lookup(ctx context.Context, schemaURL string) (io.Reader, error) {
36+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, schemaURL, http.NoBody)
37+
if err != nil {
38+
return nil, err
39+
}
40+
resp, err := hp.client.Do(req)
41+
if err != nil {
42+
return nil, err
43+
}
44+
content := bytes.NewBuffer(nil)
45+
if _, err := content.ReadFrom(resp.Body); err != nil {
46+
return nil, err
47+
}
48+
if err := resp.Body.Close(); err != nil {
49+
return nil, err
50+
}
51+
if resp.StatusCode/100 != 2 {
52+
return nil, fmt.Errorf("invalid status code returned: %d", resp.StatusCode)
53+
}
54+
return content, nil
55+
}
56+
57+
type testProvider struct {
58+
fs *embed.FS
59+
}
60+
61+
func NewTestProvider(fs *embed.FS) Provider {
62+
return &testProvider{fs: fs}
63+
}
64+
65+
func (tp testProvider) Lookup(_ context.Context, schemaURL string) (io.Reader, error) {
66+
parsedPath, err := url.Parse(schemaURL)
67+
if err != nil {
68+
return nil, err
69+
}
70+
f, err := tp.fs.Open(parsedPath.Path[1:])
71+
if err != nil {
72+
return nil, err
73+
}
74+
return f, nil
75+
}

0 commit comments

Comments
 (0)