Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Schema Processor Revamp [Part 4] - Manager/Provider #37402

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
96a2363
split out manager provider
ankitpatel96 Jan 22, 2025
5fe87d3
remove nolint
ankitpatel96 Jan 22, 2025
b5b9477
add dinesh as codeowner
ankitpatel96 Feb 5, 2025
ef63c0b
go generate
ankitpatel96 Feb 5, 2025
2dc1d4f
Merge remote-tracking branch 'origin/main' into ankit-schema-processo…
dineshg13 Feb 21, 2025
e160c06
PR feedback
dineshg13 Feb 23, 2025
5866a02
Merge remote-tracking branch 'origin/main' into ankit-schema-processo…
dineshg13 Feb 23, 2025
6292d70
ignore error
dineshg13 Feb 23, 2025
8538020
Update codeowners
dineshg13 Feb 23, 2025
b504fef
Merge remote-tracking branch 'origin/main' into ankit-schema-processo…
dineshg13 Feb 26, 2025
2410bb3
PR feedback - let manager return an error
dineshg13 Feb 27, 2025
33b4c55
reorder the cache lookup
dineshg13 Feb 27, 2025
eb57824
update the cache lookup
dineshg13 Feb 27, 2025
25ad057
add import
dineshg13 Feb 27, 2025
f8b6b18
Merge remote-tracking branch 'origin/main' into ankit-schema-processo…
dineshg13 Feb 27, 2025
2a7306c
add retry
dineshg13 Feb 27, 2025
80aad6e
make time zero
dineshg13 Feb 27, 2025
062905c
lint files
dineshg13 Feb 27, 2025
50deae7
Merge branch 'main' into ankit-schema-processor-4-manager-provider
dineshg13 Feb 27, 2025
e90c0b1
Add provider interface
dineshg13 Mar 2, 2025
15a4e0c
Merge branch 'main' into ankit-schema-processor-4-manager-provider
dineshg13 Mar 2, 2025
3b6a70e
Merge branch 'main' into ankit-schema-processor-4-manager-provider
dineshg13 Mar 3, 2025
486ce75
PR feedback - add comments
dineshg13 Mar 4, 2025
88f5c81
Merge remote-tracking branch 'origin/main' into ankit-schema-processo…
dineshg13 Mar 4, 2025
95a07cd
merge main
dineshg13 Mar 4, 2025
e24ec5c
Merge branch 'main' into ankit-schema-processor-4-manager-provider
dineshg13 Mar 4, 2025
3012ee9
Merge remote-tracking branch 'origin/main' into ankit-schema-processo…
dineshg13 Mar 4, 2025
1d655fa
Merge branch 'ankit-schema-processor-4-manager-provider' of github.co…
dineshg13 Mar 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ processor/resourcedetectionprocessor/ @open-telemetry
processor/resourcedetectionprocessor/internal/dynatrace/ @open-telemetry/collector-contrib-approvers @bacherfl @evan-bradley
processor/resourceprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax
processor/routingprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling
processor/schemaprocessor/ @open-telemetry/collector-contrib-approvers @MovieStoreGuy @ankitpatel96
processor/schemaprocessor/ @open-telemetry/collector-contrib-approvers @MovieStoreGuy @ankitpatel96 @dineshg13
processor/spanprocessor/ @open-telemetry/collector-contrib-approvers @boostchicken
processor/sumologicprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo @chan-tim-sumo @echlebek
processor/tailsamplingprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling @portertech
Expand Down
2 changes: 1 addition & 1 deletion processor/schemaprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
| Stability | [development]: traces, metrics, logs |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fschema%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fschema) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fschema%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fschema) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96), [@dineshg13](https://www.github.com/dineshg13) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
<!-- end autogenerated section -->
Expand Down
129 changes: 129 additions & 0 deletions processor/schemaprocessor/internal/translation/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"

import (
"context"
"errors"
"fmt"
"sync"

"go.uber.org/zap"
)

var errNilValueProvided = errors.New("nil value provided")

// Manager is responsible for ensuring that schemas are kept up to date
// with the most recent version that are requested.
type Manager interface {
// RequestTranslation will provide either the defined Translation
// if it is a known target, or, return a noop variation.
// In the event that a matched Translation, on a missed version
// there is a potential to block during this process.
// Otherwise, the translation will allow concurrent reads.
RequestTranslation(ctx context.Context, schemaURL string) Translation

// SetProviders will update the list of providers used by the manager
// to look up schemaURLs
SetProviders(providers ...Provider) error
}

type manager struct {
log *zap.Logger

rw sync.RWMutex
providers []Provider
match map[string]*Version
translatorMap map[string]*translator
}

var _ Manager = (*manager)(nil)

// NewManager creates a manager that will allow for management
// of schema
func NewManager(targetSchemaURLS []string, log *zap.Logger) (Manager, error) {
if log == nil {
return nil, fmt.Errorf("logger: %w", errNilValueProvided)
}

match := make(map[string]*Version, len(targetSchemaURLS))
for _, target := range targetSchemaURLS {
family, version, err := GetFamilyAndVersion(target)
if err != nil {
return nil, err
}
match[family] = version
}

return &manager{
log: log,
match: match,
translatorMap: make(map[string]*translator),
}, nil
}

func (m *manager) RequestTranslation(ctx context.Context, schemaURL string) Translation {
family, version, err := GetFamilyAndVersion(schemaURL)
if err != nil {
m.log.Error("No valid schema url was provided, using no-op schema",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message is unclear, what does "using no-op schema" mean?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. fixed it

zap.String("schema-url", schemaURL),
)
return nopTranslation{}
}

targetTranslation, match := m.match[family]
if !match {
m.log.Warn("Not a known targetTranslation, providing Nop Translation",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message seems incorrect, we don't return Nop Translation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch. fixed it

zap.String("schema-url", schemaURL),
)
return nopTranslation{}
}

m.rw.RLock()
t, exists := m.translatorMap[family]
m.rw.RUnlock()

if exists && t.SupportedVersion(version) {
return t
}

for _, p := range m.providers {
content, err := p.Retrieve(ctx, schemaURL)
if err != nil {
m.log.Error("Failed to lookup schemaURL",
zap.Error(err),
zap.String("schemaURL", schemaURL),
)
return nopTranslation{}
}
t, err := newTranslator(
m.log.Named("translator").With(
zap.String("family", family),
zap.Stringer("target", targetTranslation),
),
joinSchemaFamilyAndVersion(family, targetTranslation),
content,
)
if err != nil {
m.log.Error("Failed to create translator", zap.Error(err))
continue
}
m.rw.Lock()
m.translatorMap[family] = t
m.rw.Unlock()
return t
}

return nopTranslation{}
}

func (m *manager) SetProviders(providers ...Provider) error {
if len(providers) == 0 {
return fmt.Errorf("zero providers set: %w", errNilValueProvided)
}
m.rw.Lock()
defer m.rw.Unlock()
m.providers = append(m.providers[:0], providers...)
return nil
}
68 changes: 68 additions & 0 deletions processor/schemaprocessor/internal/translation/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation

import (
"context"
_ "embed"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

//go:embed testdata/schema.yaml
var exampleTranslation []byte

func TranslationHandler(t *testing.T) http.Handler {
assert.NotEmpty(t, exampleTranslation, "SchemaContent MUST not be empty")
return http.HandlerFunc(func(wr http.ResponseWriter, _ *http.Request) {
_, err := wr.Write(exampleTranslation)
assert.NoError(t, err, "Must not have issues writing schema content")
})
}

func TestRequestTranslation(t *testing.T) {
t.Parallel()

s := httptest.NewServer(TranslationHandler(t))
t.Cleanup(s.Close)

schemaURL := fmt.Sprintf("%s/1.1.0", s.URL)

m, err := NewManager(
[]string{schemaURL},
zaptest.NewLogger(t),
)
require.NoError(t, err, "Must not error when created manager")
require.NoError(t, m.SetProviders(NewHTTPProvider(s.Client())), "Must have no issues trying to set providers")

nop, ok := m.RequestTranslation(context.Background(), "/not/a/valid/schema/URL").(nopTranslation)
require.True(t, ok, "Must return a NoopTranslation if no valid schema URL is provided")
require.NotNil(t, nop, "Must have a valid translation")

tn, ok := m.RequestTranslation(context.Background(), schemaURL).(*translator)
require.True(t, ok, "Can cast to the concrete type")
require.NotNil(t, tn, "Must have a valid translation")

assert.True(t, tn.SupportedVersion(&Version{1, 0, 0}), "Must have the version listed as supported")

count := 0
prevRev := &Version{1, 0, 0}
it, status := tn.iterator(prevRev)
assert.Equal(t, Update, status, "Must return a status of update")
for currRev, more := it(); more; currRev, more = it() {
assert.True(t, prevRev.LessThan(currRev.Version()))
prevRev = currRev.Version()
count++
}

tn, ok = m.RequestTranslation(context.Background(), schemaURL).(*translator)
require.True(t, ok, "Can cast to the concrete type")
require.NotNil(t, tn, "Must have a valid translation")
}
63 changes: 63 additions & 0 deletions processor/schemaprocessor/internal/translation/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/translation"

import (
"context"
"errors"
"fmt"
"io"
"net/http"
)

// Provider allows for collector extensions to be used to look up schemaURLs
type Provider interface {
// Retrieve whill check the underlying provider to see if content exists
// for the provided schemaURL, in the even that it doesn't an error is returned.
Retrieve(ctx context.Context, schemaURL string) (string, error)
}

type httpProvider struct {
client *http.Client
}

var _ Provider = (*httpProvider)(nil)

// NewHTTPProvider creates a new HTTP-based Provider.
func NewHTTPProvider(client *http.Client) Provider {
if client == nil {
client = http.DefaultClient
}
return &httpProvider{client: client}
}

func (hp *httpProvider) Retrieve(ctx context.Context, schemaURL string) (string, error) {
if schemaURL == "" {
return "", errors.New("schema URL cannot be empty")
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, schemaURL, http.NoBody)
if err != nil {
return "", fmt.Errorf("failed to create request: %w", err)
}

resp, err := hp.client.Do(req)
if err != nil {
return "", fmt.Errorf("request failed: %w", err)
}
defer func() {
_ = resp.Body.Close()
}()

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

data, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response body: %w", err)
}

return string(data), nil
}
80 changes: 80 additions & 0 deletions processor/schemaprocessor/internal/translation/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package translation

import (
"context"
"embed"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestInvalidHTTPProviderTests(t *testing.T) {
t.Parallel()

s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.RequestURI != "/1.7.0" {
w.WriteHeader(http.StatusBadRequest)
return
}
data := LoadTranslationVersion(t, "complex_changeset.yml")
_, err := io.Copy(w, strings.NewReader(data))
assert.NoError(t, err, "Must not error when trying load dataset")
}))
t.Cleanup(s.Close)

tests := []struct {
scenario string
url string
}{
{
scenario: "A failed request happens",
url: fmt.Sprint(s.URL, "/not/a/valid/path/1.7.0"),
},
{
scenario: "invalid url",
url: "unix:///localhost",
},
}

for _, tc := range tests {
t.Run(tc.scenario, func(t *testing.T) {
p := NewHTTPProvider(s.Client())
content, err := p.Retrieve(context.Background(), tc.url)
assert.Empty(t, content, "Expected to be empty")
assert.Error(t, err, "Must have errored processing request")
})
}
}

type testProvider struct {
fs *embed.FS
}

func NewTestProvider(fs *embed.FS) Provider {
return &testProvider{fs: fs}
}

func (tp testProvider) Retrieve(_ context.Context, schemaURL string) (string, error) {
parsedPath, err := url.Parse(schemaURL)
if err != nil {
return "", err
}
f, err := tp.fs.Open(parsedPath.Path[1:])
if err != nil {
return "", err
}
data, err := io.ReadAll(f)
if err != nil {
return "", err
}
return string(data), nil
}
Loading
Loading