From 96a23636512204bb133a758a3d3d65c580e688f2 Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Wed, 22 Jan 2025 00:23:43 -0500 Subject: [PATCH 01/17] split out manager provider --- .../internal/translation/manager.go | 131 ++++++++++++++++++ .../internal/translation/manager_test.go | 68 +++++++++ .../internal/translation/provider.go | 73 ++++++++++ .../internal/translation/provider_test.go | 52 +++++++ .../internal/translation/testdata/schema.yaml | 112 +++++++++++++++ 5 files changed, 436 insertions(+) create mode 100644 processor/schemaprocessor/internal/translation/manager.go create mode 100644 processor/schemaprocessor/internal/translation/manager_test.go create mode 100644 processor/schemaprocessor/internal/translation/provider.go create mode 100644 processor/schemaprocessor/internal/translation/provider_test.go create mode 100644 processor/schemaprocessor/internal/translation/testdata/schema.yaml diff --git a/processor/schemaprocessor/internal/translation/manager.go b/processor/schemaprocessor/internal/translation/manager.go new file mode 100644 index 000000000000..5d8431b89b51 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/manager.go @@ -0,0 +1,131 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" + +import ( + "context" + "errors" + "fmt" + "sync" + + "go.uber.org/zap" +) + +var errNilValueProvided = errors.New("nil value provided") + +// Manager is responsible for ensuring that schemas are kept up to date +// with the most recent version that are requested. +type Manager interface { + // RequestTranslation will provide either the defined Translation + // if it is a known target, or, return a noop variation. + // In the event that a matched Translation, on a missed version + // there is a potential to block during this process. + // Otherwise, the translation will allow concurrent reads. + RequestTranslation(ctx context.Context, schemaURL string) Translation + + // SetProviders will update the list of providers used by the manager + // to look up schemaURLs + SetProviders(providers ...Provider) error +} + +type manager struct { + log *zap.Logger + + rw sync.RWMutex + providers []Provider + match map[string]*Version + translations map[string]*translator +} + +var _ Manager = (*manager)(nil) + +// NewManager creates a manager that will allow for management +// of schema, the options allow for additional properties to be +// added to manager to enable additional locations of where to check +// for translations file. +func NewManager(targets []string, log *zap.Logger) (Manager, error) { + if log == nil { + return nil, fmt.Errorf("logger: %w", errNilValueProvided) + } + + match := make(map[string]*Version, len(targets)) + for _, target := range targets { + family, version, err := GetFamilyAndVersion(target) + if err != nil { + return nil, err + } + match[family] = version + } + + return &manager{ + log: log, + match: match, + translations: make(map[string]*translator), + }, nil +} + +func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Translation { + family, version, err := GetFamilyAndVersion(schemaURL) + if err != nil { + m.log.Debug("No valid schema url was provided, using no-op schema", + zap.String("schema-url", schemaURL), + ) + return nopTranslation{} + } + + target, match := m.match[family] + if !match { + m.log.Debug("Not a known target, providing Nop Translation", + zap.String("schema-url", schemaURL), + ) + return nopTranslation{} + } + + m.rw.RLock() + t, exists := m.translations[family] + m.rw.RUnlock() + + if exists && t.SupportedVersion(version) { + return t + } + + for _, p := range m.providers { + content, err := p.Lookup(ctx, schemaURL) + if err != nil { + m.log.Error("Failed to lookup schemaURL", + zap.Error(err), + zap.String("schemaURL", schemaURL), + ) + // todo(ankit) figure out what to do when the providers dont respond something good + } + t, err := newTranslatorFromReader( + m.log.Named("translator").With( + zap.String("family", family), + zap.Stringer("target", target), + ), + joinSchemaFamilyAndVersion(family, target), + content, + ) + if err != nil { + m.log.Error("Failed to create translator", zap.Error(err)) + continue + } + m.rw.Lock() + m.translations[family] = t + m.rw.Unlock() + return t + } + + return nopTranslation{} +} + +func (m *manager) SetProviders(providers ...Provider) error { + if len(providers) == 0 { + return fmt.Errorf("zero providers set: %w", errNilValueProvided) + } + m.rw.Lock() + m.providers = append(m.providers[:0], providers...) + m.rw.Unlock() + return nil +} diff --git a/processor/schemaprocessor/internal/translation/manager_test.go b/processor/schemaprocessor/internal/translation/manager_test.go new file mode 100644 index 000000000000..a6326d5a7679 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/manager_test.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation + +import ( + "context" + _ "embed" + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +//go:embed testdata/schema.yaml +var exampleTranslation []byte + +func TranslationHandler(t *testing.T) http.Handler { + assert.NotEmpty(t, exampleTranslation, "SchemaContent MUST not be empty") + return http.HandlerFunc(func(wr http.ResponseWriter, _ *http.Request) { + _, err := wr.Write(exampleTranslation) + assert.NoError(t, err, "Must not have issues writing schema content") + }) +} + +func TestRequestTranslation(t *testing.T) { + t.Parallel() + + s := httptest.NewServer(TranslationHandler(t)) + t.Cleanup(s.Close) + + schemaURL := fmt.Sprintf("%s/1.1.0", s.URL) + + m, err := NewManager( + []string{schemaURL}, + zaptest.NewLogger(t), + ) + require.NoError(t, err, "Must not error when created manager") + require.NoError(t, m.SetProviders(NewHTTPProvider(s.Client())), "Must have no issues trying to set providers") + + nop, ok := m.RequestTranslation(context.Background(), "/not/a/valid/schema/URL").(nopTranslation) + require.True(t, ok, "Must return a NoopTranslation if no valid schema URL is provided") + require.NotNil(t, nop, "Must have a valid translation") + + tn, ok := m.RequestTranslation(context.Background(), schemaURL).(*translator) + require.True(t, ok, "Can cast to the concrete type") + require.NotNil(t, tn, "Must have a valid translation") + + assert.True(t, tn.SupportedVersion(&Version{1, 0, 0}), "Must have the version listed as supported") + + count := 0 + prevRev := &Version{1, 0, 0} + it, status := tn.iterator(prevRev) + assert.Equal(t, Update, status, "Must return a status of update") + for currRev, more := it(); more; currRev, more = it() { + assert.True(t, prevRev.LessThan(currRev.Version())) + prevRev = currRev.Version() + count++ + } + + tn, ok = m.RequestTranslation(context.Background(), schemaURL).(*translator) + require.True(t, ok, "Can cast to the concrete type") + require.NotNil(t, tn, "Must have a valid translation") +} diff --git a/processor/schemaprocessor/internal/translation/provider.go b/processor/schemaprocessor/internal/translation/provider.go new file mode 100644 index 000000000000..d82eb19d3414 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/provider.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" + +import ( + "bytes" + "context" + "embed" + "fmt" + "io" + "net/http" + "net/url" +) + +// Provider allows for collector extensions to be used to look up schemaURLs +type Provider interface { + // Lookup whill check the underlying provider to see if content exists + // for the provided schemaURL, in the even that it doesn't an error is returned. + Lookup(ctx context.Context, schemaURL string) (content io.Reader, err error) +} + +type httpProvider struct { + client *http.Client +} + +var _ Provider = (*httpProvider)(nil) + +func NewHTTPProvider(client *http.Client) Provider { + return &httpProvider{client: client} +} + +func (hp *httpProvider) Lookup(ctx context.Context, schemaURL string) (io.Reader, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, schemaURL, http.NoBody) + if err != nil { + return nil, err + } + resp, err := hp.client.Do(req) + if err != nil { + return nil, err + } + content := bytes.NewBuffer(nil) + if _, err := content.ReadFrom(resp.Body); err != nil { + return nil, err + } + if err := resp.Body.Close(); err != nil { + return nil, err + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("invalid status code returned: %d", resp.StatusCode) + } + return content, nil +} + +type testProvider struct { + fs *embed.FS +} + +func NewTestProvider(fs *embed.FS) Provider { + return &testProvider{fs: fs} +} + +func (tp testProvider) Lookup(_ context.Context, schemaURL string) (io.Reader, error) { + parsedPath, err := url.Parse(schemaURL) + if err != nil { + return nil, err + } + f, err := tp.fs.Open(parsedPath.Path[1:]) + if err != nil { + return nil, err + } + return f, nil +} diff --git a/processor/schemaprocessor/internal/translation/provider_test.go b/processor/schemaprocessor/internal/translation/provider_test.go new file mode 100644 index 000000000000..407cf25a8579 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/provider_test.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestInvalidHTTPProviderTests(t *testing.T) { + t.Parallel() + + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.RequestURI != "/1.7.0" { + w.WriteHeader(http.StatusBadRequest) + return + } + _, err := io.Copy(w, LoadTranslationVersion(t, "complex_changeset.yml")) + assert.NoError(t, err, "Must not error when trying load dataset") + })) + t.Cleanup(s.Close) + + tests := []struct { + scenario string + url string + }{ + { + scenario: "A failed request happens", + url: fmt.Sprint(s.URL, "/not/a/valid/path/1.7.0"), + }, + { + scenario: "invalid url", + url: "unix:///localhost", + }, + } + + for _, tc := range tests { + t.Run(tc.scenario, func(t *testing.T) { + p := NewHTTPProvider(s.Client()) + content, err := p.Lookup(context.Background(), tc.url) + assert.Nil(t, content, "Expected to be nil") + assert.Error(t, err, "Must have errored processing request") + }) + } +} diff --git a/processor/schemaprocessor/internal/translation/testdata/schema.yaml b/processor/schemaprocessor/internal/translation/testdata/schema.yaml new file mode 100644 index 000000000000..ae7c1665bc78 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/testdata/schema.yaml @@ -0,0 +1,112 @@ +# Defines the file format. MUST be set to 1.0.0. +file_format: 1.0.0 + +# The Schema URL that this file is published at. The version number in the URL +# MUST match the highest version number in the "versions" section below. +# Note: the schema version number in the URL is not related in any way to +# the file_format setting above. +schema_url: https://opentelemetry.io/schemas/1.1.0 + +# Definitions for each schema version in this family. +# Note: the ordering of versions is defined according to semver +# version number ordering rules. +versions: + 1.1.0: + # Definitions for version 1.1.0. + all: + # Definitions that apply to all data types. + changes: + # Transformations to apply when converting from version 1.0.0 to 1.1.0. + - rename_attributes: + attribute_map: + # map of key/values. The keys are the old attribute name used + # the previous version, the values are the new attribute name + # starting from this version. + # Rename k8s.* to kubernetes.* + k8s.cluster.name: kubernetes.cluster.name + k8s.namespace.name: kubernetes.namespace.name + k8s.node.name: kubernetes.node.name + k8s.node.uid: kubernetes.node.uid + k8s.pod.name: kubernetes.pod.name + k8s.pod.uid: kubernetes.pod.uid + k8s.container.name: kubernetes.container.name + k8s.replicaset.name: kubernetes.replicaset.name + k8s.replicaset.uid: kubernetes.replicaset.uid + k8s.cronjob.name: kubernetes.cronjob.name + k8s.cronjob.uid: kubernetes.cronjob.uid + k8s.job.name: kubernetes.job.name + k8s.job.uid: kubernetes.job.uid + k8s.statefulset.name: kubernetes.statefulset.name + k8s.statefulset.uid: kubernetes.statefulset.uid + k8s.daemonset.name: kubernetes.daemonset.name + k8s.daemonset.uid: kubernetes.daemonset.uid + k8s.deployment.name: kubernetes.deployment.name + k8s.deployment.uid: kubernetes.deployment.uid + + resources: + # Definitions that apply to Resource data type. + changes: + - rename_attributes: + attribute_map: + telemetry.auto.version: telemetry.auto_instr.version + + spans: + # Definitions that apply to Span data type. + changes: + - rename_attributes: + attribute_map: + # map of key/values. The keys are the old attribute name used + # in the previous version, the values are the new attribute name + # starting from this version. + peer.service: peer.service.name + apply_to_spans: + # apply only to spans named "HTTP GET" + - "HTTP GET" + + span_events: + # Definitions that apply to Span Event data type. + changes: + - rename_events: + # The keys are old event name used in the previous version, the + # values are the new event name starting from this version. + name_map: {stacktrace: stack_trace} + + - rename_attributes: + attribute_map: + peer.service: peer.service.name + apply_to_events: + # Optional event names to apply to. If empty applies to all events. + - exception.stack_trace + + metrics: + # Definitions that apply to Metric data type. + changes: + - rename_metrics: + # map of key/values. The keys are the old metric name used + # in the previous version, the values are the new metric name + # starting from this version. + container.cpu.usage.total: cpu.usage.total + container.memory.usage.max: memory.usage.max + + - rename_attributes: + attribute_map: + status: state + apply_to_metrics: + # Optional. If it is missing the transformation is applied + # to all metrics. If it is present the transformation is applied + # only to the metrics with the name that is found in the sequence + # specified below. + - system.cpu.utilization + - system.memory.usage + - system.memory.utilization + - system.paging.usage + + logs: + # Definitions that apply to LogRecord data type. + changes: + - rename_attributes: + attribute_map: + process.executable_name: process.executable.name + + 1.0.0: + # First version of this schema family. From 5fe87d3d9f0cfa5589da39629571cbb890769e4e Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Wed, 22 Jan 2025 13:53:51 -0500 Subject: [PATCH 02/17] remove nolint --- processor/schemaprocessor/internal/translation/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/schemaprocessor/internal/translation/version.go b/processor/schemaprocessor/internal/translation/version.go index f9d1888140ce..ec8048afa7e2 100644 --- a/processor/schemaprocessor/internal/translation/version.go +++ b/processor/schemaprocessor/internal/translation/version.go @@ -72,7 +72,7 @@ func GetFamilyAndVersion(schemaURL string) (family string, version *Version, err return u.String(), version, err } -func joinSchemaFamilyAndVersion(family string, version *Version) string { //nolint: unparam +func joinSchemaFamilyAndVersion(family string, version *Version) string { u, err := url.Parse(family) if err != nil { return "" From b5b947715e31c5137cd4cd8cfe61e6cea002ad74 Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Wed, 5 Feb 2025 12:52:11 -0500 Subject: [PATCH 03/17] add dinesh as codeowner --- processor/schemaprocessor/README.md | 2 +- processor/schemaprocessor/generated_package_test.go | 3 +-- processor/schemaprocessor/metadata.yaml | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/processor/schemaprocessor/README.md b/processor/schemaprocessor/README.md index 08e322121d71..be88823c1954 100644 --- a/processor/schemaprocessor/README.md +++ b/processor/schemaprocessor/README.md @@ -6,7 +6,7 @@ | Stability | [development]: traces, metrics, logs | | Distributions | [] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fschema%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fschema) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fschema%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fschema) | -| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96), [@dineshg13](https://www.github.com/dineshg13) | [development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development diff --git a/processor/schemaprocessor/generated_package_test.go b/processor/schemaprocessor/generated_package_test.go index 8593af713ccf..b4b9a973793d 100644 --- a/processor/schemaprocessor/generated_package_test.go +++ b/processor/schemaprocessor/generated_package_test.go @@ -3,9 +3,8 @@ package schemaprocessor import ( - "testing" - "go.uber.org/goleak" + "testing" ) func TestMain(m *testing.M) { diff --git a/processor/schemaprocessor/metadata.yaml b/processor/schemaprocessor/metadata.yaml index 731ef9a26e34..dc0962ce34b3 100644 --- a/processor/schemaprocessor/metadata.yaml +++ b/processor/schemaprocessor/metadata.yaml @@ -6,7 +6,7 @@ status: development: [traces, metrics, logs] distributions: [] codeowners: - active: [MovieStoreGuy, ankitpatel96] + active: [MovieStoreGuy, ankitpatel96, dineshg13] tests: config: From ef63c0b9a34836346bd09c5470229b814514a31f Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Wed, 5 Feb 2025 16:51:12 -0500 Subject: [PATCH 04/17] go generate --- processor/schemaprocessor/generated_package_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processor/schemaprocessor/generated_package_test.go b/processor/schemaprocessor/generated_package_test.go index b4b9a973793d..8593af713ccf 100644 --- a/processor/schemaprocessor/generated_package_test.go +++ b/processor/schemaprocessor/generated_package_test.go @@ -3,8 +3,9 @@ package schemaprocessor import ( - "go.uber.org/goleak" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { From e160c06dcb279949ba6c1369b50f107201e269db Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Sun, 23 Feb 2025 10:34:10 -0500 Subject: [PATCH 05/17] PR feedback --- .../internal/translation/manager.go | 46 +++++++-------- .../internal/translation/provider.go | 58 ++++++++----------- .../internal/translation/provider_test.go | 34 ++++++++++- .../internal/translation/translation.go | 6 +- .../translation/translation_helpers_test.go | 11 ++-- .../translation/translation_race_test.go | 6 +- .../internal/translation/translation_test.go | 26 ++++----- 7 files changed, 103 insertions(+), 84 deletions(-) diff --git a/processor/schemaprocessor/internal/translation/manager.go b/processor/schemaprocessor/internal/translation/manager.go index 5d8431b89b51..7220bb247a5b 100644 --- a/processor/schemaprocessor/internal/translation/manager.go +++ b/processor/schemaprocessor/internal/translation/manager.go @@ -32,25 +32,23 @@ type Manager interface { type manager struct { log *zap.Logger - rw sync.RWMutex - providers []Provider - match map[string]*Version - translations map[string]*translator + rw sync.RWMutex + providers []Provider + match map[string]*Version + translatorMap map[string]*translator } var _ Manager = (*manager)(nil) // NewManager creates a manager that will allow for management -// of schema, the options allow for additional properties to be -// added to manager to enable additional locations of where to check -// for translations file. -func NewManager(targets []string, log *zap.Logger) (Manager, error) { +// of schema +func NewManager(targetSchemaURLS []string, log *zap.Logger) (Manager, error) { if log == nil { return nil, fmt.Errorf("logger: %w", errNilValueProvided) } - match := make(map[string]*Version, len(targets)) - for _, target := range targets { + match := make(map[string]*Version, len(targetSchemaURLS)) + for _, target := range targetSchemaURLS { family, version, err := GetFamilyAndVersion(target) if err != nil { return nil, err @@ -59,31 +57,31 @@ func NewManager(targets []string, log *zap.Logger) (Manager, error) { } return &manager{ - log: log, - match: match, - translations: make(map[string]*translator), + log: log, + match: match, + translatorMap: make(map[string]*translator), }, nil } func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Translation { family, version, err := GetFamilyAndVersion(schemaURL) if err != nil { - m.log.Debug("No valid schema url was provided, using no-op schema", + m.log.Error("No valid schema url was provided, using no-op schema", zap.String("schema-url", schemaURL), ) return nopTranslation{} } - target, match := m.match[family] + targetTranslation, match := m.match[family] if !match { - m.log.Debug("Not a known target, providing Nop Translation", + m.log.Warn("Not a known targetTranslation, providing Nop Translation", zap.String("schema-url", schemaURL), ) return nopTranslation{} } m.rw.RLock() - t, exists := m.translations[family] + t, exists := m.translatorMap[family] m.rw.RUnlock() if exists && t.SupportedVersion(version) { @@ -91,20 +89,20 @@ func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Tran } for _, p := range m.providers { - content, err := p.Lookup(ctx, schemaURL) + content, err := p.Retrieve(ctx, schemaURL) if err != nil { m.log.Error("Failed to lookup schemaURL", zap.Error(err), zap.String("schemaURL", schemaURL), ) - // todo(ankit) figure out what to do when the providers dont respond something good + return nopTranslation{} } - t, err := newTranslatorFromReader( + t, err := newTranslator( m.log.Named("translator").With( zap.String("family", family), - zap.Stringer("target", target), + zap.Stringer("target", targetTranslation), ), - joinSchemaFamilyAndVersion(family, target), + joinSchemaFamilyAndVersion(family, targetTranslation), content, ) if err != nil { @@ -112,7 +110,7 @@ func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Tran continue } m.rw.Lock() - m.translations[family] = t + m.translatorMap[family] = t m.rw.Unlock() return t } @@ -125,7 +123,7 @@ func (m *manager) SetProviders(providers ...Provider) error { return fmt.Errorf("zero providers set: %w", errNilValueProvided) } m.rw.Lock() + defer m.rw.Unlock() m.providers = append(m.providers[:0], providers...) - m.rw.Unlock() return nil } diff --git a/processor/schemaprocessor/internal/translation/provider.go b/processor/schemaprocessor/internal/translation/provider.go index d82eb19d3414..4910853e39a0 100644 --- a/processor/schemaprocessor/internal/translation/provider.go +++ b/processor/schemaprocessor/internal/translation/provider.go @@ -4,20 +4,18 @@ package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" import ( - "bytes" "context" - "embed" + "errors" "fmt" "io" "net/http" - "net/url" ) // Provider allows for collector extensions to be used to look up schemaURLs type Provider interface { - // Lookup whill check the underlying provider to see if content exists + // Retrieve whill check the underlying provider to see if content exists // for the provided schemaURL, in the even that it doesn't an error is returned. - Lookup(ctx context.Context, schemaURL string) (content io.Reader, err error) + Retrieve(ctx context.Context, schemaURL string) (string, error) } type httpProvider struct { @@ -26,48 +24,40 @@ type httpProvider struct { var _ Provider = (*httpProvider)(nil) +// NewHTTPProvider creates a new HTTP-based Provider. func NewHTTPProvider(client *http.Client) Provider { + if client == nil { + client = http.DefaultClient + } return &httpProvider{client: client} } -func (hp *httpProvider) Lookup(ctx context.Context, schemaURL string) (io.Reader, error) { +func (hp *httpProvider) Retrieve(ctx context.Context, schemaURL string) (string, error) { + if schemaURL == "" { + return "", errors.New("schema URL cannot be empty") + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, schemaURL, http.NoBody) if err != nil { - return nil, err + return "", fmt.Errorf("failed to create request: %w", err) } + resp, err := hp.client.Do(req) if err != nil { - return nil, err - } - content := bytes.NewBuffer(nil) - if _, err := content.ReadFrom(resp.Body); err != nil { - return nil, err - } - if err := resp.Body.Close(); err != nil { - return nil, err + return "", fmt.Errorf("request failed: %w", err) } + defer func() { + err = resp.Body.Close() + }() + if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("invalid status code returned: %d", resp.StatusCode) + return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode) } - return content, nil -} -type testProvider struct { - fs *embed.FS -} - -func NewTestProvider(fs *embed.FS) Provider { - return &testProvider{fs: fs} -} - -func (tp testProvider) Lookup(_ context.Context, schemaURL string) (io.Reader, error) { - parsedPath, err := url.Parse(schemaURL) + data, err := io.ReadAll(resp.Body) if err != nil { - return nil, err + return "", fmt.Errorf("failed to read response body: %w", err) } - f, err := tp.fs.Open(parsedPath.Path[1:]) - if err != nil { - return nil, err - } - return f, nil + + return string(data), nil } diff --git a/processor/schemaprocessor/internal/translation/provider_test.go b/processor/schemaprocessor/internal/translation/provider_test.go index 407cf25a8579..679802015ec6 100644 --- a/processor/schemaprocessor/internal/translation/provider_test.go +++ b/processor/schemaprocessor/internal/translation/provider_test.go @@ -5,10 +5,13 @@ package translation import ( "context" + "embed" "fmt" "io" "net/http" "net/http/httptest" + "net/url" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -22,7 +25,8 @@ func TestInvalidHTTPProviderTests(t *testing.T) { w.WriteHeader(http.StatusBadRequest) return } - _, err := io.Copy(w, LoadTranslationVersion(t, "complex_changeset.yml")) + data := LoadTranslationVersion(t, "complex_changeset.yml") + _, err := io.Copy(w, strings.NewReader(data)) assert.NoError(t, err, "Must not error when trying load dataset") })) t.Cleanup(s.Close) @@ -44,9 +48,33 @@ func TestInvalidHTTPProviderTests(t *testing.T) { for _, tc := range tests { t.Run(tc.scenario, func(t *testing.T) { p := NewHTTPProvider(s.Client()) - content, err := p.Lookup(context.Background(), tc.url) - assert.Nil(t, content, "Expected to be nil") + content, err := p.Retrieve(context.Background(), tc.url) + assert.Empty(t, content, "Expected to be empty") assert.Error(t, err, "Must have errored processing request") }) } } + +type testProvider struct { + fs *embed.FS +} + +func NewTestProvider(fs *embed.FS) Provider { + return &testProvider{fs: fs} +} + +func (tp testProvider) Retrieve(_ context.Context, schemaURL string) (string, error) { + parsedPath, err := url.Parse(schemaURL) + if err != nil { + return "", err + } + f, err := tp.fs.Open(parsedPath.Path[1:]) + if err != nil { + return "", err + } + data, err := io.ReadAll(f) + if err != nil { + return "", err + } + return string(data), nil +} diff --git a/processor/schemaprocessor/internal/translation/translation.go b/processor/schemaprocessor/internal/translation/translation.go index 71aeabeab625..93c4757a8db2 100644 --- a/processor/schemaprocessor/internal/translation/translation.go +++ b/processor/schemaprocessor/internal/translation/translation.go @@ -4,8 +4,8 @@ package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" import ( - "io" "sort" + "strings" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -107,8 +107,8 @@ func newTranslatorFromSchema(log *zap.Logger, targetSchemaURL string, schemaFile return t, nil } -func newTranslatorFromReader(log *zap.Logger, targetSchemaURL string, content io.Reader) (*translator, error) { - schemaFileSchema, err := encoder.Parse(content) +func newTranslator(log *zap.Logger, targetSchemaURL string, schema string) (*translator, error) { + schemaFileSchema, err := encoder.Parse(strings.NewReader(schema)) if err != nil { return nil, err } diff --git a/processor/schemaprocessor/internal/translation/translation_helpers_test.go b/processor/schemaprocessor/internal/translation/translation_helpers_test.go index 3f7d1cceaa36..9f451b635206 100644 --- a/processor/schemaprocessor/internal/translation/translation_helpers_test.go +++ b/processor/schemaprocessor/internal/translation/translation_helpers_test.go @@ -4,7 +4,6 @@ package translation import ( - "bytes" "embed" "fmt" "io" @@ -27,17 +26,21 @@ const ( //go:embed testdata var testdataFiles embed.FS -func LoadTranslationVersion(tb testing.TB, name string) io.Reader { +func LoadTranslationVersion(tb testing.TB, name string) string { tb.Helper() f, err := testdataFiles.Open(path.Join(prefix, name)) if !assert.NoError(tb, err, "Must not error when trying to open file") { - return bytes.NewBuffer(nil) + return "" } tb.Cleanup(func() { assert.NoError(tb, f.Close(), "Must not have issues trying to close static file") }) - return f + data, err := io.ReadAll(f) + if !assert.NoError(tb, err, "Must not error when trying to read file") { + return "" + } + return string(data) } func NewExampleLogs(tb testing.TB, at Version) plog.Logs { diff --git a/processor/schemaprocessor/internal/translation/translation_race_test.go b/processor/schemaprocessor/internal/translation/translation_race_test.go index bb971441e7ed..e698b9ebe581 100644 --- a/processor/schemaprocessor/internal/translation/translation_race_test.go +++ b/processor/schemaprocessor/internal/translation/translation_race_test.go @@ -15,7 +15,7 @@ import ( func TestRaceTranslationSpanChanges(t *testing.T) { t.Parallel() - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zap.NewNop(), "https://example.com/1.7.0", LoadTranslationVersion(t, "complex_changeset.yml"), @@ -46,7 +46,7 @@ func TestRaceTranslationSpanChanges(t *testing.T) { func TestRaceTranslationMetricChanges(t *testing.T) { t.Parallel() - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zap.NewNop(), "https://example.com/1.7.0", LoadTranslationVersion(t, "complex_changeset.yml"), @@ -78,7 +78,7 @@ func TestRaceTranslationMetricChanges(t *testing.T) { func TestRaceTranslationLogChanges(t *testing.T) { t.Parallel() - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zap.NewNop(), "https://example.com/1.7.0", LoadTranslationVersion(t, "complex_changeset.yml"), diff --git a/processor/schemaprocessor/internal/translation/translation_test.go b/processor/schemaprocessor/internal/translation/translation_test.go index 5dcc9f1c5638..06790455d5d0 100644 --- a/processor/schemaprocessor/internal/translation/translation_test.go +++ b/processor/schemaprocessor/internal/translation/translation_test.go @@ -21,7 +21,7 @@ import ( func TestTranslationSupportedVersion(t *testing.T) { t.Parallel() - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zaptest.NewLogger(t), "https://opentelemetry.io/schemas/1.9.0", LoadTranslationVersion(t, TranslationVersion190), @@ -107,7 +107,7 @@ func TestTranslationIteratorExact(t *testing.T) { for _, tc := range tests { t.Run(tc.scenario, func(t *testing.T) { - tn, err := newTranslatorFromReader(zaptest.NewLogger(t), tc.target, LoadTranslationVersion(t, TranslationVersion190)) + tn, err := newTranslator(zaptest.NewLogger(t), tc.target, LoadTranslationVersion(t, TranslationVersion190)) require.NoError(t, err, "Must have no error when creating translator") _, inVersion, err := GetFamilyAndVersion(tc.income) @@ -125,7 +125,7 @@ func TestTranslationIteratorExact(t *testing.T) { } func TestTranslationIterator(t *testing.T) { - tn, err := newTranslatorFromReader(zaptest.NewLogger(t), "https://opentelemetry.io/schemas/1.9.0", LoadTranslationVersion(t, TranslationVersion190)) + tn, err := newTranslator(zaptest.NewLogger(t), "https://opentelemetry.io/schemas/1.9.0", LoadTranslationVersion(t, TranslationVersion190)) require.NoError(t, err, "Must have no error when creating translator") ver := &Version{1, 0, 0} @@ -185,7 +185,7 @@ func TestTranslationSpanChanges(t *testing.T) { for _, tc := range tests { t.Run(tc.scenario, func(t *testing.T) { - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zaptest.NewLogger(t), joinSchemaFamilyAndVersion("https://example.com/", &tc.target), LoadTranslationVersion(t, "complex_changeset.yml"), @@ -260,7 +260,7 @@ func TestTranslationLogChanges(t *testing.T) { for _, tc := range tests { t.Run(tc.scenario, func(t *testing.T) { - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zaptest.NewLogger(t), joinSchemaFamilyAndVersion("https://example.com/", &tc.target), LoadTranslationVersion(t, "complex_changeset.yml"), @@ -332,7 +332,7 @@ func TestTranslationMetricChanges(t *testing.T) { for _, tc := range tests { t.Run(tc.scenario, func(t *testing.T) { - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zaptest.NewLogger(t), joinSchemaFamilyAndVersion("https://example.com/", &tc.target), LoadTranslationVersion(t, "complex_changeset.yml"), @@ -362,7 +362,7 @@ func TestTranslationEquvialance_Logs(t *testing.T) { a, b := NewExampleLogs(t, Version{1, 0, 0}), NewExampleLogs(t, Version{1, 7, 0}) - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zaptest.NewLogger(t), "https://example.com/1.4.0", LoadTranslationVersion(t, "complex_changeset.yml"), @@ -391,7 +391,7 @@ func TestTranslationEquvialance_Metrics(t *testing.T) { a, b := NewExampleMetrics(t, Version{1, 0, 0}), NewExampleMetrics(t, Version{1, 7, 0}) - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zaptest.NewLogger(t), "https://example.com/1.4.0", LoadTranslationVersion(t, "complex_changeset.yml"), @@ -420,7 +420,7 @@ func TestTranslationEquvialance_Traces(t *testing.T) { a, b := NewExampleSpans(t, Version{1, 0, 0}), NewExampleSpans(t, Version{1, 7, 0}) - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zaptest.NewLogger(t), "https://example.com/1.4.0", LoadTranslationVersion(t, "complex_changeset.yml"), @@ -451,7 +451,7 @@ func BenchmarkCreatingTranslation(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - tn, err := newTranslatorFromReader( + tn, err := newTranslator( log, "https://opentelemetry.io/schemas/1.9.0", LoadTranslationVersion(b, TranslationVersion190), @@ -462,7 +462,7 @@ func BenchmarkCreatingTranslation(b *testing.B) { } func BenchmarkUpgradingMetrics(b *testing.B) { - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zap.NewNop(), "https://example.com/1.7.0", LoadTranslationVersion(b, "complex_changeset.yml"), @@ -493,7 +493,7 @@ func BenchmarkUpgradingMetrics(b *testing.B) { } func BenchmarkUpgradingTraces(b *testing.B) { - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zap.NewNop(), "https://example.com/1.7.0", LoadTranslationVersion(b, "complex_changeset.yml"), @@ -524,7 +524,7 @@ func BenchmarkUpgradingTraces(b *testing.B) { } func BenchmarkUpgradingLogs(b *testing.B) { - tn, err := newTranslatorFromReader( + tn, err := newTranslator( zap.NewNop(), "https://example.com/1.7.0", LoadTranslationVersion(b, "complex_changeset.yml"), From 6292d7079c1d56cf5355a4cbfead48abe525b57e Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Sun, 23 Feb 2025 15:55:48 -0500 Subject: [PATCH 06/17] ignore error --- processor/schemaprocessor/internal/translation/provider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/schemaprocessor/internal/translation/provider.go b/processor/schemaprocessor/internal/translation/provider.go index 4910853e39a0..e76768985cee 100644 --- a/processor/schemaprocessor/internal/translation/provider.go +++ b/processor/schemaprocessor/internal/translation/provider.go @@ -47,7 +47,7 @@ func (hp *httpProvider) Retrieve(ctx context.Context, schemaURL string) (string, return "", fmt.Errorf("request failed: %w", err) } defer func() { - err = resp.Body.Close() + _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { From 8538020dbc16bb67104c7ed5cc1f1d3e3d3cc1d0 Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Sun, 23 Feb 2025 17:23:11 -0500 Subject: [PATCH 07/17] Update codeowners --- .github/CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index c1bcd5949f78..b8c8d64b15bc 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -197,7 +197,7 @@ processor/resourcedetectionprocessor/ @open-telemetry processor/resourcedetectionprocessor/internal/dynatrace/ @open-telemetry/collector-contrib-approvers @bacherfl @evan-bradley processor/resourceprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax processor/routingprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling -processor/schemaprocessor/ @open-telemetry/collector-contrib-approvers @MovieStoreGuy @ankitpatel96 +processor/schemaprocessor/ @open-telemetry/collector-contrib-approvers @MovieStoreGuy @ankitpatel96 @dineshg13 processor/spanprocessor/ @open-telemetry/collector-contrib-approvers @boostchicken processor/sumologicprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo @chan-tim-sumo @echlebek processor/tailsamplingprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling @portertech From 2410bb3f574f0ee8118da8425cbbb4a6c51fb658 Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Thu, 27 Feb 2025 11:24:07 +0000 Subject: [PATCH 08/17] PR feedback - let manager return an error --- processor/schemaprocessor/go.mod | 1 + processor/schemaprocessor/go.sum | 2 + .../translation/cacheable_provider.go | 68 +++++++++++++++++++ .../translation/cacheable_provider_test.go | 67 ++++++++++++++++++ .../internal/translation/manager.go | 52 +++++++------- .../internal/translation/manager_test.go | 41 ++++++++--- 6 files changed, 193 insertions(+), 38 deletions(-) create mode 100644 processor/schemaprocessor/internal/translation/cacheable_provider.go create mode 100644 processor/schemaprocessor/internal/translation/cacheable_provider_test.go diff --git a/processor/schemaprocessor/go.mod b/processor/schemaprocessor/go.mod index 98c985562ba1..efed7ffa633e 100644 --- a/processor/schemaprocessor/go.mod +++ b/processor/schemaprocessor/go.mod @@ -5,6 +5,7 @@ go 1.23.0 require ( github.com/google/go-cmp v0.7.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.120.1 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.120.1-0.20250224010654-18e18b21da7a go.opentelemetry.io/collector/component/componenttest v0.120.1-0.20250224010654-18e18b21da7a diff --git a/processor/schemaprocessor/go.sum b/processor/schemaprocessor/go.sum index 0ee49e63ba01..58147ee48b5b 100644 --- a/processor/schemaprocessor/go.sum +++ b/processor/schemaprocessor/go.sum @@ -52,6 +52,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/processor/schemaprocessor/internal/translation/cacheable_provider.go b/processor/schemaprocessor/internal/translation/cacheable_provider.go new file mode 100644 index 000000000000..aa38e8340532 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/cacheable_provider.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/patrickmn/go-cache" +) + +// CacheableProvider is a provider that caches the result of another provider. +type CacheableProvider struct { + provider Provider + cache *cache.Cache + mu sync.Mutex + cooldown time.Duration + callcount int + limit int + lastErr error + resetTime time.Time +} + +// NewCacheableProvider creates a new CacheableProvider. +func NewCacheableProvider(provider Provider, cooldown time.Duration, limit int) Provider { + return &CacheableProvider{ + provider: provider, + // TODO make cache configurable + cache: cache.New(cache.NoExpiration, cache.NoExpiration), + cooldown: cooldown, + limit: limit, + } +} + +func (p *CacheableProvider) Retrieve(ctx context.Context, key string) (string, error) { + // Check if the key is in the cache. + if value, found := p.cache.Get(key); found { + return value.(string), nil + } + p.mu.Lock() + defer p.mu.Unlock() + // Check if the function is currently rate-limited + if time.Now().Before(p.resetTime) { + return "", fmt.Errorf("rate limited, last error: %w", p.lastErr) + } + + // Reset count if past cooldown period + if p.callcount >= p.limit { + p.callcount = 0 + } + p.callcount++ + + v, err := p.provider.Retrieve(ctx, key) + if err != nil { + p.lastErr = err + // If the call limit is reached, set the cooldown period + if p.callcount >= p.limit { + p.resetTime = time.Now().Add(p.cooldown) + } + return "", err + } + p.callcount = 0 + p.cache.Set(key, v, cache.NoExpiration) + return v, nil +} diff --git a/processor/schemaprocessor/internal/translation/cacheable_provider_test.go b/processor/schemaprocessor/internal/translation/cacheable_provider_test.go new file mode 100644 index 000000000000..ef2b386dfa09 --- /dev/null +++ b/processor/schemaprocessor/internal/translation/cacheable_provider_test.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type firstErrorProvider struct { + cnt int + mu sync.Mutex +} + +func (p *firstErrorProvider) Retrieve(_ context.Context, key string) (string, error) { + p.mu.Lock() + defer p.mu.Unlock() + p.cnt++ + if p.cnt == 1 { + return "", fmt.Errorf("first error") + } + p.cnt = 0 + return key, nil +} + +func TestCacheableProvider(t *testing.T) { + tests := []struct { + name string + limit int + retry int + }{ + { + name: "limit 1", + limit: 1, + retry: 2, + }, + { + name: "limit 0", + limit: 0, + retry: 3, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a new cacheable provider + provider := NewCacheableProvider(&firstErrorProvider{}, time.Nanosecond, tt.limit) + + var p string + var err error + for i := 0; i < tt.retry; i++ { + p, err = provider.Retrieve(context.Background(), "key") + if err == nil { + break + } + require.Error(t, err, "first error") + } + require.NoError(t, err, "no error") + require.Equal(t, "key", p, "value is key") + }) + } +} diff --git a/processor/schemaprocessor/internal/translation/manager.go b/processor/schemaprocessor/internal/translation/manager.go index 7220bb247a5b..d62387ca19f0 100644 --- a/processor/schemaprocessor/internal/translation/manager.go +++ b/processor/schemaprocessor/internal/translation/manager.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "sync" + "time" "go.uber.org/zap" ) @@ -18,15 +19,9 @@ var errNilValueProvided = errors.New("nil value provided") // with the most recent version that are requested. type Manager interface { // RequestTranslation will provide either the defined Translation - // if it is a known target, or, return a noop variation. - // In the event that a matched Translation, on a missed version - // there is a potential to block during this process. - // Otherwise, the translation will allow concurrent reads. - RequestTranslation(ctx context.Context, schemaURL string) Translation - - // SetProviders will update the list of providers used by the manager - // to look up schemaURLs - SetProviders(providers ...Provider) error + // if it is a known target and able to retrieve from Provider + // otherwise it will return an error. + RequestTranslation(ctx context.Context, schemaURL string) (Translation, error) } type manager struct { @@ -42,11 +37,14 @@ var _ Manager = (*manager)(nil) // NewManager creates a manager that will allow for management // of schema -func NewManager(targetSchemaURLS []string, log *zap.Logger) (Manager, error) { +func NewManager(targetSchemaURLS []string, log *zap.Logger, providers ...Provider) (Manager, error) { if log == nil { return nil, fmt.Errorf("logger: %w", errNilValueProvided) } + if len(providers) == 0 { + return nil, fmt.Errorf("zero providers set: %w", errNilValueProvided) + } match := make(map[string]*Version, len(targetSchemaURLS)) for _, target := range targetSchemaURLS { family, version, err := GetFamilyAndVersion(target) @@ -56,20 +54,28 @@ func NewManager(targetSchemaURLS []string, log *zap.Logger) (Manager, error) { match[family] = version } + // wrap provider with cacheable provider + var prs []Provider + for _, p := range providers { + // TODO make cache configurable + prs = append(prs, NewCacheableProvider(p, 5*time.Minute, 5)) + } + return &manager{ log: log, match: match, translatorMap: make(map[string]*translator), + providers: prs, }, nil } -func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Translation { +func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) (Translation, error) { family, version, err := GetFamilyAndVersion(schemaURL) if err != nil { m.log.Error("No valid schema url was provided, using no-op schema", zap.String("schema-url", schemaURL), ) - return nopTranslation{} + return nil, err } targetTranslation, match := m.match[family] @@ -77,7 +83,7 @@ func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Tran m.log.Warn("Not a known targetTranslation, providing Nop Translation", zap.String("schema-url", schemaURL), ) - return nopTranslation{} + return nil, err } m.rw.RLock() @@ -85,7 +91,7 @@ func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Tran m.rw.RUnlock() if exists && t.SupportedVersion(version) { - return t + return t, nil } for _, p := range m.providers { @@ -95,7 +101,9 @@ func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Tran zap.Error(err), zap.String("schemaURL", schemaURL), ) - return nopTranslation{} + // If we fail to retrieve the schema, we should + // try the next provider + continue } t, err := newTranslator( m.log.Named("translator").With( @@ -112,18 +120,8 @@ func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Tran m.rw.Lock() m.translatorMap[family] = t m.rw.Unlock() - return t + return t, nil } - return nopTranslation{} -} - -func (m *manager) SetProviders(providers ...Provider) error { - if len(providers) == 0 { - return fmt.Errorf("zero providers set: %w", errNilValueProvided) - } - m.rw.Lock() - defer m.rw.Unlock() - m.providers = append(m.providers[:0], providers...) - return nil + return nil, fmt.Errorf("failed to retrieve translation for %s", schemaURL) } diff --git a/processor/schemaprocessor/internal/translation/manager_test.go b/processor/schemaprocessor/internal/translation/manager_test.go index a6326d5a7679..68173a9dc9a4 100644 --- a/processor/schemaprocessor/internal/translation/manager_test.go +++ b/processor/schemaprocessor/internal/translation/manager_test.go @@ -38,31 +38,50 @@ func TestRequestTranslation(t *testing.T) { m, err := NewManager( []string{schemaURL}, zaptest.NewLogger(t), + NewHTTPProvider(s.Client()), ) require.NoError(t, err, "Must not error when created manager") - require.NoError(t, m.SetProviders(NewHTTPProvider(s.Client())), "Must have no issues trying to set providers") - nop, ok := m.RequestTranslation(context.Background(), "/not/a/valid/schema/URL").(nopTranslation) - require.True(t, ok, "Must return a NoopTranslation if no valid schema URL is provided") - require.NotNil(t, nop, "Must have a valid translation") + tr, err := m.RequestTranslation(context.Background(), "/not/a/valid/schema/URL") + assert.Error(t, err, "Must error when requesting an invalid schema URL") + assert.Nil(t, tr, "Must not return a translation") - tn, ok := m.RequestTranslation(context.Background(), schemaURL).(*translator) - require.True(t, ok, "Can cast to the concrete type") - require.NotNil(t, tn, "Must have a valid translation") + tn, err := m.RequestTranslation(context.Background(), schemaURL) + require.NoError(t, err, "Must not error when requesting a valid schema URL") + require.NotNil(t, tn, "Must return a translation") assert.True(t, tn.SupportedVersion(&Version{1, 0, 0}), "Must have the version listed as supported") + trs, ok := tn.(*translator) + require.True(t, ok, "Can cast to the concrete type") count := 0 prevRev := &Version{1, 0, 0} - it, status := tn.iterator(prevRev) + it, status := trs.iterator(prevRev) assert.Equal(t, Update, status, "Must return a status of update") for currRev, more := it(); more; currRev, more = it() { assert.True(t, prevRev.LessThan(currRev.Version())) prevRev = currRev.Version() count++ } +} - tn, ok = m.RequestTranslation(context.Background(), schemaURL).(*translator) - require.True(t, ok, "Can cast to the concrete type") - require.NotNil(t, tn, "Must have a valid translation") +type errorProvider struct{} + +func (p *errorProvider) Retrieve(_ context.Context, _ string) (string, error) { + return "", fmt.Errorf("error") +} + +func TestManagerError(t *testing.T) { + t.Parallel() + + m, err := NewManager( + []string{"http://localhost/1.1.0"}, + zaptest.NewLogger(t), + &errorProvider{}, + ) + require.NoError(t, err, "Must not error when created manager") + + tr, err := m.RequestTranslation(context.Background(), "http://localhost/1.1.0") + assert.Error(t, err, "Must error when provider errors") + assert.Nil(t, tr, "Must not return a translation") } From 33b4c5527230ef82be17002bc9b4ee9a757ff184 Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Thu, 27 Feb 2025 11:27:40 +0000 Subject: [PATCH 09/17] reorder the cache lookup --- .../internal/translation/cacheable_provider.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/processor/schemaprocessor/internal/translation/cacheable_provider.go b/processor/schemaprocessor/internal/translation/cacheable_provider.go index aa38e8340532..7f5a7fc40bd4 100644 --- a/processor/schemaprocessor/internal/translation/cacheable_provider.go +++ b/processor/schemaprocessor/internal/translation/cacheable_provider.go @@ -36,12 +36,14 @@ func NewCacheableProvider(provider Provider, cooldown time.Duration, limit int) } func (p *CacheableProvider) Retrieve(ctx context.Context, key string) (string, error) { + p.mu.Lock() + defer p.mu.Unlock() + // Check if the key is in the cache. if value, found := p.cache.Get(key); found { return value.(string), nil } - p.mu.Lock() - defer p.mu.Unlock() + // Check if the function is currently rate-limited if time.Now().Before(p.resetTime) { return "", fmt.Errorf("rate limited, last error: %w", p.lastErr) From eb57824211bfc63ca982641b72fffa3d774076be Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Thu, 27 Feb 2025 11:33:18 +0000 Subject: [PATCH 10/17] update the cache lookup --- .../internal/translation/cacheable_provider.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/processor/schemaprocessor/internal/translation/cacheable_provider.go b/processor/schemaprocessor/internal/translation/cacheable_provider.go index 7f5a7fc40bd4..986d1fac64d4 100644 --- a/processor/schemaprocessor/internal/translation/cacheable_provider.go +++ b/processor/schemaprocessor/internal/translation/cacheable_provider.go @@ -36,10 +36,15 @@ func NewCacheableProvider(provider Provider, cooldown time.Duration, limit int) } func (p *CacheableProvider) Retrieve(ctx context.Context, key string) (string, error) { + // Check if the key is in the cache. + if value, found := p.cache.Get(key); found { + return value.(string), nil + } + p.mu.Lock() defer p.mu.Unlock() - // Check if the key is in the cache. + // Check if the key is in the cache again in case it was added while waiting for the lock. if value, found := p.cache.Get(key); found { return value.(string), nil } From 25ad0574759957fd421734d33fe4632d61776a81 Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Thu, 27 Feb 2025 11:40:49 +0000 Subject: [PATCH 11/17] add import --- .../schemaprocessor/internal/translation/cacheable_provider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/schemaprocessor/internal/translation/cacheable_provider.go b/processor/schemaprocessor/internal/translation/cacheable_provider.go index 986d1fac64d4..9e4c33c9fee3 100644 --- a/processor/schemaprocessor/internal/translation/cacheable_provider.go +++ b/processor/schemaprocessor/internal/translation/cacheable_provider.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package translation +package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation" import ( "context" From 2a7306cbd806d9f2184d8fe459df0d1278459649 Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Thu, 27 Feb 2025 14:18:45 +0000 Subject: [PATCH 12/17] add retry --- .../internal/translation/cacheable_provider_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processor/schemaprocessor/internal/translation/cacheable_provider_test.go b/processor/schemaprocessor/internal/translation/cacheable_provider_test.go index ef2b386dfa09..aa721bd9fc92 100644 --- a/processor/schemaprocessor/internal/translation/cacheable_provider_test.go +++ b/processor/schemaprocessor/internal/translation/cacheable_provider_test.go @@ -38,12 +38,12 @@ func TestCacheableProvider(t *testing.T) { { name: "limit 1", limit: 1, - retry: 2, + retry: 4, }, { name: "limit 0", limit: 0, - retry: 3, + retry: 4, }, } for _, tt := range tests { From 80aad6ed4e13172a2fb1202328d25d1d975ffb3b Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Thu, 27 Feb 2025 14:31:13 +0000 Subject: [PATCH 13/17] make time zero --- .../internal/translation/cacheable_provider_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/schemaprocessor/internal/translation/cacheable_provider_test.go b/processor/schemaprocessor/internal/translation/cacheable_provider_test.go index aa721bd9fc92..0adf6b6eae36 100644 --- a/processor/schemaprocessor/internal/translation/cacheable_provider_test.go +++ b/processor/schemaprocessor/internal/translation/cacheable_provider_test.go @@ -49,7 +49,7 @@ func TestCacheableProvider(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Create a new cacheable provider - provider := NewCacheableProvider(&firstErrorProvider{}, time.Nanosecond, tt.limit) + provider := NewCacheableProvider(&firstErrorProvider{},0* time.Nanosecond, tt.limit) var p string var err error From 062905ca7e73fe1a0c94655487e7203da92830a4 Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Thu, 27 Feb 2025 15:00:34 +0000 Subject: [PATCH 14/17] lint files --- .../internal/translation/cacheable_provider_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/schemaprocessor/internal/translation/cacheable_provider_test.go b/processor/schemaprocessor/internal/translation/cacheable_provider_test.go index 0adf6b6eae36..1883582dcf84 100644 --- a/processor/schemaprocessor/internal/translation/cacheable_provider_test.go +++ b/processor/schemaprocessor/internal/translation/cacheable_provider_test.go @@ -49,7 +49,7 @@ func TestCacheableProvider(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Create a new cacheable provider - provider := NewCacheableProvider(&firstErrorProvider{},0* time.Nanosecond, tt.limit) + provider := NewCacheableProvider(&firstErrorProvider{}, 0*time.Nanosecond, tt.limit) var p string var err error From e90c0b17d665a74f3ed4438f29fbf170265d9818 Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Sun, 2 Mar 2025 15:43:58 -0500 Subject: [PATCH 15/17] Add provider interface --- .../internal/translation/manager.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/processor/schemaprocessor/internal/translation/manager.go b/processor/schemaprocessor/internal/translation/manager.go index d62387ca19f0..6b109e6a3a2b 100644 --- a/processor/schemaprocessor/internal/translation/manager.go +++ b/processor/schemaprocessor/internal/translation/manager.go @@ -22,6 +22,9 @@ type Manager interface { // if it is a known target and able to retrieve from Provider // otherwise it will return an error. RequestTranslation(ctx context.Context, schemaURL string) (Translation, error) + + // AddProvider will add a provider to the Manager + AddProvider(p Provider) } type manager struct { @@ -42,9 +45,6 @@ func NewManager(targetSchemaURLS []string, log *zap.Logger, providers ...Provide return nil, fmt.Errorf("logger: %w", errNilValueProvided) } - if len(providers) == 0 { - return nil, fmt.Errorf("zero providers set: %w", errNilValueProvided) - } match := make(map[string]*Version, len(targetSchemaURLS)) for _, target := range targetSchemaURLS { family, version, err := GetFamilyAndVersion(target) @@ -125,3 +125,15 @@ func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) (Tra return nil, fmt.Errorf("failed to retrieve translation for %s", schemaURL) } + +// AddProvider will add a provider to the Manager +func (m *manager) AddProvider(p Provider) { + if p == nil { + m.log.Error("Nil provider provided, not adding to manager") + return + } + if _, ok := p.(*CacheableProvider); !ok { + p = NewCacheableProvider(p, 5*time.Minute, 5) + } + m.providers = append(m.providers, p) +} From 486ce7537adc69ac5dfd7c9a4881478f37f85541 Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Tue, 4 Mar 2025 10:42:47 -0500 Subject: [PATCH 16/17] PR feedback - add comments --- .../translation/cacheable_provider.go | 21 +++++++++++++------ .../internal/translation/manager.go | 8 +++---- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/processor/schemaprocessor/internal/translation/cacheable_provider.go b/processor/schemaprocessor/internal/translation/cacheable_provider.go index 9e4c33c9fee3..621707eb77e5 100644 --- a/processor/schemaprocessor/internal/translation/cacheable_provider.go +++ b/processor/schemaprocessor/internal/translation/cacheable_provider.go @@ -13,18 +13,27 @@ import ( ) // CacheableProvider is a provider that caches the result of another provider. +// If the provider returns an error, the CacheableProvider will retry the call till limit +// If the provider returns an error multiple times in a row, the CacheableProvider will rate limit the calls. type CacheableProvider struct { - provider Provider - cache *cache.Cache - mu sync.Mutex - cooldown time.Duration + provider Provider + cache *cache.Cache + mu sync.Mutex + // cooldown is the time to wait before retrying a failed call. + cooldown time.Duration + // callcount tracks the number of failed calls in a row. callcount int - limit int - lastErr error + // limit is the number of failed calls to allow before setting the cooldown period. + limit int + // lastErr is the last error returned by the provider + lastErr error + // resetTime is the time when the rate limit will be reset resetTime time.Time } // NewCacheableProvider creates a new CacheableProvider. +// The cooldown parameter is the time to wait before retrying a failed call. +// The limit parameter is the number of failed calls to allow before setting the cooldown period. func NewCacheableProvider(provider Provider, cooldown time.Duration, limit int) Provider { return &CacheableProvider{ provider: provider, diff --git a/processor/schemaprocessor/internal/translation/manager.go b/processor/schemaprocessor/internal/translation/manager.go index 6b109e6a3a2b..a8482ffc51ad 100644 --- a/processor/schemaprocessor/internal/translation/manager.go +++ b/processor/schemaprocessor/internal/translation/manager.go @@ -72,18 +72,18 @@ func NewManager(targetSchemaURLS []string, log *zap.Logger, providers ...Provide func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) (Translation, error) { family, version, err := GetFamilyAndVersion(schemaURL) if err != nil { - m.log.Error("No valid schema url was provided, using no-op schema", - zap.String("schema-url", schemaURL), + m.log.Error("No valid schema url was provided", + zap.String("schema-url", schemaURL), zap.Error(err), ) return nil, err } targetTranslation, match := m.match[family] if !match { - m.log.Warn("Not a known targetTranslation, providing Nop Translation", + m.log.Warn("Not a known target translation", zap.String("schema-url", schemaURL), ) - return nil, err + return nil, fmt.Errorf("not a known targetTranslation: %s", family) } m.rw.RLock() From 95a07cde1661d10c8e8f2ddbac37b1cf3b5d40b9 Mon Sep 17 00:00:00 2001 From: Dinesh Gurumurthy Date: Tue, 4 Mar 2025 10:48:53 -0500 Subject: [PATCH 17/17] merge main --- processor/schemaprocessor/go.mod | 1 + 1 file changed, 1 insertion(+) diff --git a/processor/schemaprocessor/go.mod b/processor/schemaprocessor/go.mod index 235aa261fe40..4c0d0a1b359a 100644 --- a/processor/schemaprocessor/go.mod +++ b/processor/schemaprocessor/go.mod @@ -5,6 +5,7 @@ go 1.23.0 require ( github.com/google/go-cmp v0.7.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.121.0 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v1.27.0 go.opentelemetry.io/collector/component/componenttest v0.121.0