Skip to content

Commit b846651

Browse files
committed
split out manager provider
1 parent 4063e74 commit b846651

File tree

5 files changed

+436
-0
lines changed

5 files changed

+436
-0
lines changed
+131
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"sync"
11+
12+
"go.uber.org/zap"
13+
)
14+
15+
var errNilValueProvided = errors.New("nil value provided")
16+
17+
// Manager is responsible for ensuring that schemas are kept up to date
18+
// with the most recent version that are requested.
19+
type Manager interface {
20+
// RequestTranslation will provide either the defined Translation
21+
// if it is a known target, or, return a noop variation.
22+
// In the event that a matched Translation, on a missed version
23+
// there is a potential to block during this process.
24+
// Otherwise, the translation will allow concurrent reads.
25+
RequestTranslation(ctx context.Context, schemaURL string) Translation
26+
27+
// SetProviders will update the list of providers used by the manager
28+
// to look up schemaURLs
29+
SetProviders(providers ...Provider) error
30+
}
31+
32+
type manager struct {
33+
log *zap.Logger
34+
35+
rw sync.RWMutex
36+
providers []Provider
37+
match map[string]*Version
38+
translations map[string]*translator
39+
}
40+
41+
var _ Manager = (*manager)(nil)
42+
43+
// NewManager creates a manager that will allow for management
44+
// of schema, the options allow for additional properties to be
45+
// added to manager to enable additional locations of where to check
46+
// for translations file.
47+
func NewManager(targets []string, log *zap.Logger) (Manager, error) {
48+
if log == nil {
49+
return nil, fmt.Errorf("logger: %w", errNilValueProvided)
50+
}
51+
52+
match := make(map[string]*Version, len(targets))
53+
for _, target := range targets {
54+
family, version, err := GetFamilyAndVersion(target)
55+
if err != nil {
56+
return nil, err
57+
}
58+
match[family] = version
59+
}
60+
61+
return &manager{
62+
log: log,
63+
match: match,
64+
translations: make(map[string]*translator),
65+
}, nil
66+
}
67+
68+
func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Translation {
69+
family, version, err := GetFamilyAndVersion(schemaURL)
70+
if err != nil {
71+
m.log.Debug("No valid schema url was provided, using no-op schema",
72+
zap.String("schema-url", schemaURL),
73+
)
74+
return nopTranslation{}
75+
}
76+
77+
target, match := m.match[family]
78+
if !match {
79+
m.log.Debug("Not a known target, providing Nop Translation",
80+
zap.String("schema-url", schemaURL),
81+
)
82+
return nopTranslation{}
83+
}
84+
85+
m.rw.RLock()
86+
t, exists := m.translations[family]
87+
m.rw.RUnlock()
88+
89+
if exists && t.SupportedVersion(version) {
90+
return t
91+
}
92+
93+
for _, p := range m.providers {
94+
content, err := p.Lookup(ctx, schemaURL)
95+
if err != nil {
96+
m.log.Error("Failed to lookup schemaURL",
97+
zap.Error(err),
98+
zap.String("schemaURL", schemaURL),
99+
)
100+
// todo(ankit) figure out what to do when the providers dont respond something good
101+
}
102+
t, err := newTranslatorFromReader(
103+
m.log.Named("translator").With(
104+
zap.String("family", family),
105+
zap.Stringer("target", target),
106+
),
107+
joinSchemaFamilyAndVersion(family, target),
108+
content,
109+
)
110+
if err != nil {
111+
m.log.Error("Failed to create translator", zap.Error(err))
112+
continue
113+
}
114+
m.rw.Lock()
115+
m.translations[family] = t
116+
m.rw.Unlock()
117+
return t
118+
}
119+
120+
return nopTranslation{}
121+
}
122+
123+
func (m *manager) SetProviders(providers ...Provider) error {
124+
if len(providers) == 0 {
125+
return fmt.Errorf("zero providers set: %w", errNilValueProvided)
126+
}
127+
m.rw.Lock()
128+
m.providers = append(m.providers[:0], providers...)
129+
m.rw.Unlock()
130+
return nil
131+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package translation
5+
6+
import (
7+
"context"
8+
_ "embed"
9+
"fmt"
10+
"net/http"
11+
"net/http/httptest"
12+
"testing"
13+
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
"go.uber.org/zap/zaptest"
17+
)
18+
19+
//go:embed testdata/schema.yaml
20+
var exampleTranslation []byte
21+
22+
func TranslationHandler(t *testing.T) http.Handler {
23+
assert.NotEmpty(t, exampleTranslation, "SchemaContent MUST not be empty")
24+
return http.HandlerFunc(func(wr http.ResponseWriter, _ *http.Request) {
25+
_, err := wr.Write(exampleTranslation)
26+
assert.NoError(t, err, "Must not have issues writing schema content")
27+
})
28+
}
29+
30+
func TestRequestTranslation(t *testing.T) {
31+
t.Parallel()
32+
33+
s := httptest.NewServer(TranslationHandler(t))
34+
t.Cleanup(s.Close)
35+
36+
schemaURL := fmt.Sprintf("%s/1.1.0", s.URL)
37+
38+
m, err := NewManager(
39+
[]string{schemaURL},
40+
zaptest.NewLogger(t),
41+
)
42+
require.NoError(t, err, "Must not error when created manager")
43+
require.NoError(t, m.SetProviders(NewHTTPProvider(s.Client())), "Must have no issues trying to set providers")
44+
45+
nop, ok := m.RequestTranslation(context.Background(), "/not/a/valid/schema/URL").(nopTranslation)
46+
require.True(t, ok, "Must return a NoopTranslation if no valid schema URL is provided")
47+
require.NotNil(t, nop, "Must have a valid translation")
48+
49+
tn, ok := m.RequestTranslation(context.Background(), schemaURL).(*translator)
50+
require.True(t, ok, "Can cast to the concrete type")
51+
require.NotNil(t, tn, "Must have a valid translation")
52+
53+
assert.True(t, tn.SupportedVersion(&Version{1, 0, 0}), "Must have the version listed as supported")
54+
55+
count := 0
56+
prevRev := &Version{1, 0, 0}
57+
it, status := tn.iterator(context.Background(), prevRev)
58+
assert.Equal(t, Update, status, "Must return a status of update")
59+
for currRev, more := it(); more; currRev, more = it() {
60+
assert.True(t, prevRev.LessThan(currRev.Version()))
61+
prevRev = currRev.Version()
62+
count++
63+
}
64+
65+
tn, ok = m.RequestTranslation(context.Background(), schemaURL).(*translator)
66+
require.True(t, ok, "Can cast to the concrete type")
67+
require.NotNil(t, tn, "Must have a valid translation")
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"
5+
6+
import (
7+
"bytes"
8+
"context"
9+
"embed"
10+
"fmt"
11+
"io"
12+
"net/http"
13+
"net/url"
14+
)
15+
16+
// Provider allows for collector extensions to be used to look up schemaURLs
17+
type Provider interface {
18+
// Lookup whill check the underlying provider to see if content exists
19+
// for the provided schemaURL, in the even that it doesn't an error is returned.
20+
Lookup(ctx context.Context, schemaURL string) (content io.Reader, err error)
21+
}
22+
23+
type httpProvider struct {
24+
client *http.Client
25+
}
26+
27+
var _ Provider = (*httpProvider)(nil)
28+
29+
func NewHTTPProvider(client *http.Client) Provider {
30+
return &httpProvider{client: client}
31+
}
32+
33+
func (hp *httpProvider) Lookup(ctx context.Context, schemaURL string) (io.Reader, error) {
34+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, schemaURL, http.NoBody)
35+
if err != nil {
36+
return nil, err
37+
}
38+
resp, err := hp.client.Do(req)
39+
if err != nil {
40+
return nil, err
41+
}
42+
content := bytes.NewBuffer(nil)
43+
if _, err := content.ReadFrom(resp.Body); err != nil {
44+
return nil, err
45+
}
46+
if err := resp.Body.Close(); err != nil {
47+
return nil, err
48+
}
49+
if resp.StatusCode != http.StatusOK {
50+
return nil, fmt.Errorf("invalid status code returned: %d", resp.StatusCode)
51+
}
52+
return content, nil
53+
}
54+
55+
type testProvider struct {
56+
fs *embed.FS
57+
}
58+
59+
func NewTestProvider(fs *embed.FS) Provider {
60+
return &testProvider{fs: fs}
61+
}
62+
63+
func (tp testProvider) Lookup(_ context.Context, schemaURL string) (io.Reader, error) {
64+
parsedPath, err := url.Parse(schemaURL)
65+
if err != nil {
66+
return nil, err
67+
}
68+
f, err := tp.fs.Open(parsedPath.Path[1:])
69+
if err != nil {
70+
return nil, err
71+
}
72+
return f, nil
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package translation
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"io"
10+
"net/http"
11+
"net/http/httptest"
12+
"testing"
13+
14+
"github.com/stretchr/testify/assert"
15+
)
16+
17+
func TestInvalidHTTPProviderTests(t *testing.T) {
18+
t.Parallel()
19+
20+
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
21+
if r.RequestURI != "/1.7.0" {
22+
w.WriteHeader(http.StatusBadRequest)
23+
return
24+
}
25+
_, err := io.Copy(w, LoadTranslationVersion(t, "complex_changeset.yml"))
26+
assert.NoError(t, err, "Must not error when trying load dataset")
27+
}))
28+
t.Cleanup(s.Close)
29+
30+
tests := []struct {
31+
scenario string
32+
url string
33+
}{
34+
{
35+
scenario: "A failed request happens",
36+
url: fmt.Sprint(s.URL, "/not/a/valid/path/1.7.0"),
37+
},
38+
{
39+
scenario: "invalid url",
40+
url: "unix:///localhost",
41+
},
42+
}
43+
44+
for _, tc := range tests {
45+
t.Run(tc.scenario, func(t *testing.T) {
46+
p := NewHTTPProvider(s.Client())
47+
content, err := p.Lookup(context.Background(), tc.url)
48+
assert.Nil(t, content, "Expected to be nil")
49+
assert.Error(t, err, "Must have errored processing request")
50+
})
51+
}
52+
}

0 commit comments

Comments
 (0)