diff --git a/config/configauth/configauth.go b/config/configauth/configauth.go index 4df1fb1c2b4..62ded652078 100644 --- a/config/configauth/configauth.go +++ b/config/configauth/configauth.go @@ -10,9 +10,16 @@ import ( "context" "errors" "fmt" + "net/http" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension/extensionauth" + "go.opentelemetry.io/collector/internal/grpcutil" ) var ( @@ -20,6 +27,7 @@ var ( errNotHTTPClient = errors.New("requested authenticator is not a HTTP client authenticator") errNotGRPCClient = errors.New("requested authenticator is not a gRPC client authenticator") errNotServer = errors.New("requested authenticator is not a server authenticator") + errMetadataNotFound = errors.New("no request metadata found") ) // Authentication defines the auth settings for the receiver. @@ -28,8 +36,48 @@ type Authentication struct { AuthenticatorID component.ID `mapstructure:"authenticator,omitempty"` } -// GetServerAuthenticator attempts to select the appropriate extensionauth.Server from the list of extensions, -// based on the requested extension name. If an authenticator is not found, an error is returned. +// GetGRPCServerOptions attempts to select the appropriate extensionauth.Server from the list of extensions, +// based on the requested extension name and return the grpc.ServerOption to be used with the grpc.Server. +// If an authenticator is not found, an error is returned. +func (a Authentication) GetGRPCServerOptions(_ context.Context, extensions map[component.ID]component.Component) ([]grpc.ServerOption, error) { + ext, found := extensions[a.AuthenticatorID] + if !found { + return nil, fmt.Errorf("failed to resolve authenticator %q: %w", a.AuthenticatorID, errAuthenticatorNotFound) + } + + eauth, ok := ext.(extensionauth.Server) + if !ok { + return nil, errNotServer + } + + uInterceptor := func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { + return authServerUnaryInterceptor(ctx, req, info, handler, eauth) + } + sInterceptors := func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return authServerStreamInterceptor(srv, ss, info, handler, eauth) + } + + return []grpc.ServerOption{grpc.ChainUnaryInterceptor(uInterceptor), grpc.ChainStreamInterceptor(sInterceptors)}, nil +} + +// GetHTTPHandler attempts to select the appropriate extensionauth.Server from the list of extensions, +// based on the requested extension name and return the http.Handler to be used with the http.Server. +// If an authenticator is not found, an error is returned. +func (a Authentication) GetHTTPHandler(_ context.Context, extensions map[component.ID]component.Component, next http.Handler, reqParams []string) (http.Handler, error) { + ext, found := extensions[a.AuthenticatorID] + if !found { + return nil, fmt.Errorf("failed to resolve authenticator %q: %w", a.AuthenticatorID, errAuthenticatorNotFound) + } + + eauth, ok := ext.(extensionauth.Server) + if !ok { + return nil, errNotServer + } + + return authInterceptor(next, eauth, reqParams), nil +} + +// Deprecated: [v0.123.0] use GetGRPCServerOptions or GetHTTPServer. func (a Authentication) GetServerAuthenticator(_ context.Context, extensions map[component.ID]component.Component) (extensionauth.Server, error) { if ext, found := extensions[a.AuthenticatorID]; found { if server, ok := ext.(extensionauth.Server); ok { @@ -41,9 +89,7 @@ func (a Authentication) GetServerAuthenticator(_ context.Context, extensions map return nil, fmt.Errorf("failed to resolve authenticator %q: %w", a.AuthenticatorID, errAuthenticatorNotFound) } -// GetHTTPClientAuthenticator attempts to select the appropriate extensionauth.Client from the list of extensions, -// based on the component id of the extension. If an authenticator is not found, an error is returned. -// This should be only used by HTTP clients. +// Deprecated: [v0.123.0] use GetHTTPRoundTripper. func (a Authentication) GetHTTPClientAuthenticator(_ context.Context, extensions map[component.ID]component.Component) (extensionauth.HTTPClient, error) { if ext, found := extensions[a.AuthenticatorID]; found { if client, ok := ext.(extensionauth.HTTPClient); ok { @@ -54,9 +100,25 @@ func (a Authentication) GetHTTPClientAuthenticator(_ context.Context, extensions return nil, fmt.Errorf("failed to resolve authenticator %q: %w", a.AuthenticatorID, errAuthenticatorNotFound) } -// GetGRPCClientAuthenticator attempts to select the appropriate extensionauth.Client from the list of extensions, -// based on the component id of the extension. If an authenticator is not found, an error is returned. -// This should be only used by gRPC clients. +// GetHTTPRoundTripper attempts to select the appropriate extensionauth.Client from the list of extensions, +// based on the component id of the extension and return the http.RoundTripper to be used with the http.Client. +// If an authenticator is not found, an error is returned. This should be only used by HTTP clients. +func (a Authentication) GetHTTPRoundTripper(_ context.Context, extensions map[component.ID]component.Component, base http.RoundTripper) (http.RoundTripper, error) { + ext, found := extensions[a.AuthenticatorID] + if !found { + return nil, fmt.Errorf("failed to resolve authenticator %q: %w", a.AuthenticatorID, errAuthenticatorNotFound) + } + + // Currently only support `extensionauth.HTTPClient`. + client, ok := ext.(extensionauth.HTTPClient) + if !ok { + return nil, errNotHTTPClient + } + + return client.RoundTripper(base) +} + +// Deprecated: [v0.123.0] Use GetGRPCDialOptions. func (a Authentication) GetGRPCClientAuthenticator(_ context.Context, extensions map[component.ID]component.Component) (extensionauth.GRPCClient, error) { if ext, found := extensions[a.AuthenticatorID]; found { if client, ok := ext.(extensionauth.GRPCClient); ok { @@ -66,3 +128,74 @@ func (a Authentication) GetGRPCClientAuthenticator(_ context.Context, extensions } return nil, fmt.Errorf("failed to resolve authenticator %q: %w", a.AuthenticatorID, errAuthenticatorNotFound) } + +// GetGRPCDialOptions attempts to select the appropriate extensionauth.Client from the list of extensions, +// based on the component id of the extension and return the grpc.DialOptions to be used with grpc.ClientConn. +// If an authenticator is not found, an error is returned. This should be only used by gRPC clients. +func (a Authentication) GetGRPCDialOptions(_ context.Context, extensions map[component.ID]component.Component) ([]grpc.DialOption, error) { + ext, found := extensions[a.AuthenticatorID] + if !found { + return nil, fmt.Errorf("failed to resolve authenticator %q: %w", a.AuthenticatorID, errAuthenticatorNotFound) + } + + // Currently only support `extensionauth.GRPCClient`. + client, ok := ext.(extensionauth.GRPCClient) + if !ok { + return nil, errNotGRPCClient + } + + perRPCCredentials, err := client.PerRPCCredentials() + if err != nil { + return nil, err + } + + return []grpc.DialOption{grpc.WithPerRPCCredentials(perRPCCredentials)}, nil +} + +func authServerUnaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler, eauth extensionauth.Server) (any, error) { + headers, ok := metadata.FromIncomingContext(ctx) + if !ok { + return nil, errMetadataNotFound + } + + ctx, err := eauth.Authenticate(ctx, headers) + if err != nil { + return nil, status.Error(codes.Unauthenticated, err.Error()) + } + + return handler(ctx, req) +} + +func authServerStreamInterceptor(srv any, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler, eauth extensionauth.Server) error { + ctx := stream.Context() + headers, ok := metadata.FromIncomingContext(ctx) + if !ok { + return errMetadataNotFound + } + + ctx, err := eauth.Authenticate(ctx, headers) + if err != nil { + return status.Error(codes.Unauthenticated, err.Error()) + } + + return handler(srv, grpcutil.WrapServerStream(ctx, stream)) +} + +func authInterceptor(next http.Handler, eauth extensionauth.Server, requestParams []string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sources := r.Header + query := r.URL.Query() + for _, param := range requestParams { + if val, ok := query[param]; ok { + sources[param] = val + } + } + ctx, err := eauth.Authenticate(r.Context(), sources) + if err != nil { + http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + + next.ServeHTTP(w, r.WithContext(ctx)) + }) +} diff --git a/config/configauth/configauth_test.go b/config/configauth/configauth_test.go index f1163828a88..00eade78568 100644 --- a/config/configauth/configauth_test.go +++ b/config/configauth/configauth_test.go @@ -5,33 +5,42 @@ package configauth import ( "context" + "net/http" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensionauth/extensionauthtest" ) var mockID = component.MustNewID("mock") -func TestGetServer(t *testing.T) { +func TestGetHTTPHandler(t *testing.T) { testCases := []struct { - name string - authenticator extension.Extension - expected error + name string + extensions map[component.ID]component.Component + expected error }{ { - name: "obtain server authenticator", - authenticator: extensionauthtest.NewNopServer(), - expected: nil, + name: "not found", + extensions: map[component.ID]component.Component{}, + expected: errAuthenticatorNotFound, }, { - name: "not a server authenticator", - authenticator: extensionauthtest.NewNopClient(), - expected: errNotServer, + name: "not a server authenticator", + extensions: map[component.ID]component.Component{ + mockID: extensionauthtest.NewNopClient(), + }, + expected: errNotServer, + }, + { + name: "obtain server authenticator", + extensions: map[component.ID]component.Component{ + mockID: extensionauthtest.NewNopServer(), + }, + expected: nil, }, } for _, tt := range testCases { @@ -40,49 +49,88 @@ func TestGetServer(t *testing.T) { cfg := &Authentication{ AuthenticatorID: mockID, } - ext := map[component.ID]component.Component{ - mockID: tt.authenticator, - } - - authenticator, err := cfg.GetServerAuthenticator(context.Background(), ext) + handler, err := cfg.GetHTTPHandler(context.Background(), tt.extensions, nopHandler{}, []string{}) // verify if tt.expected != nil { require.ErrorIs(t, err, tt.expected) - assert.Nil(t, authenticator) } else { require.NoError(t, err) - assert.NotNil(t, authenticator) + assert.NotNil(t, handler) } }) } } -func TestGetServerFails(t *testing.T) { - cfg := &Authentication{ - AuthenticatorID: component.MustNewID("does_not_exist"), +func TestGetGRPCServerOptions(t *testing.T) { + testCases := []struct { + name string + extensions map[component.ID]component.Component + expected error + }{ + { + name: "not found", + extensions: map[component.ID]component.Component{}, + expected: errAuthenticatorNotFound, + }, + { + name: "not a server authenticator", + extensions: map[component.ID]component.Component{ + mockID: extensionauthtest.NewNopClient(), + }, + expected: errNotServer, + }, + { + name: "obtain server authenticator", + extensions: map[component.ID]component.Component{ + mockID: extensionauthtest.NewNopServer(), + }, + expected: nil, + }, } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + // prepare + cfg := &Authentication{ + AuthenticatorID: mockID, + } - authenticator, err := cfg.GetServerAuthenticator(context.Background(), map[component.ID]component.Component{}) - require.ErrorIs(t, err, errAuthenticatorNotFound) - assert.Nil(t, authenticator) + opts, err := cfg.GetGRPCServerOptions(context.Background(), tt.extensions) + // verify + if tt.expected != nil { + require.ErrorIs(t, err, tt.expected) + } else { + require.NoError(t, err) + assert.Len(t, opts, 2) + } + }) + } } -func TestGetHTTPClient(t *testing.T) { +func TestGetHTTPRoundTripper(t *testing.T) { testCases := []struct { - name string - authenticator extension.Extension - expected error + name string + extensions map[component.ID]component.Component + expected error }{ { - name: "obtain client authenticator", - authenticator: extensionauthtest.NewNopClient(), - expected: nil, + name: "not found", + extensions: map[component.ID]component.Component{}, + expected: errAuthenticatorNotFound, }, { - name: "not a client authenticator", - authenticator: extensionauthtest.NewNopServer(), - expected: errNotHTTPClient, + name: "not a client authenticator", + extensions: map[component.ID]component.Component{ + mockID: extensionauthtest.NewNopServer(), + }, + expected: errNotHTTPClient, + }, + { + name: "obtain client authenticator", + extensions: map[component.ID]component.Component{ + mockID: extensionauthtest.NewNopClient(), + }, + expected: nil, }, } for _, tt := range testCases { @@ -91,29 +139,66 @@ func TestGetHTTPClient(t *testing.T) { cfg := &Authentication{ AuthenticatorID: mockID, } - ext := map[component.ID]component.Component{ - mockID: tt.authenticator, - } - - authenticator, err := cfg.GetHTTPClientAuthenticator(context.Background(), ext) - + rt, err := cfg.GetHTTPRoundTripper(context.Background(), tt.extensions, nopRoundTripper{}) // verify if tt.expected != nil { require.ErrorIs(t, err, tt.expected) - assert.Nil(t, authenticator) } else { require.NoError(t, err) - assert.NotNil(t, authenticator) + assert.NotNil(t, rt) } }) } } -func TestGetGRPCClientFails(t *testing.T) { - cfg := &Authentication{ - AuthenticatorID: component.MustNewID("does_not_exist"), +func TestGetGRPCDialOptions(t *testing.T) { + testCases := []struct { + name string + extensions map[component.ID]component.Component + expected error + }{ + { + name: "not found", + extensions: map[component.ID]component.Component{}, + expected: errAuthenticatorNotFound, + }, + { + name: "not a client authenticator", + extensions: map[component.ID]component.Component{ + mockID: extensionauthtest.NewNopServer(), + }, + expected: errNotGRPCClient, + }, + { + name: "obtain client authenticator", + extensions: map[component.ID]component.Component{ + mockID: extensionauthtest.NewNopClient(), + }, + expected: nil, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + // prepare + cfg := &Authentication{ + AuthenticatorID: mockID, + } + opts, err := cfg.GetGRPCDialOptions(context.Background(), tt.extensions) + // verify + if tt.expected != nil { + require.ErrorIs(t, err, tt.expected) + } else { + require.NoError(t, err) + assert.Len(t, opts, 1) + } + }) } - authenticator, err := cfg.GetGRPCClientAuthenticator(context.Background(), map[component.ID]component.Component{}) - require.ErrorIs(t, err, errAuthenticatorNotFound) - assert.Nil(t, authenticator) } + +type nopRoundTripper struct{} + +func (nopRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return &http.Response{}, nil } + +type nopHandler struct{} + +func (nopHandler) ServeHTTP(http.ResponseWriter, *http.Request) {} diff --git a/config/configauth/go.mod b/config/configauth/go.mod index d25ed774516..e05d03935af 100644 --- a/config/configauth/go.mod +++ b/config/configauth/go.mod @@ -4,11 +4,13 @@ go 1.23.0 require ( github.com/stretchr/testify v1.10.0 + go.opentelemetry.io/collector v0.122.1 go.opentelemetry.io/collector/component v1.28.1 go.opentelemetry.io/collector/extension v1.28.1 go.opentelemetry.io/collector/extension/extensionauth v0.122.1 go.opentelemetry.io/collector/extension/extensionauth/extensionauthtest v0.122.1 go.uber.org/goleak v1.3.0 + google.golang.org/grpc v1.71.0 ) require ( @@ -25,11 +27,12 @@ require ( golang.org/x/sys v0.31.0 // indirect golang.org/x/text v0.23.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect - google.golang.org/grpc v1.71.0 // indirect google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) +replace go.opentelemetry.io/collector => ../../ + replace go.opentelemetry.io/collector/pdata => ../../pdata replace go.opentelemetry.io/collector/component => ../../component diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 57890d037c3..f3ba1e63357 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -18,14 +18,14 @@ import ( "go.opentelemetry.io/otel" "google.golang.org/grpc" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" - "google.golang.org/grpc/status" + + "go.opentelemetry.io/collector/internal/grpcutil" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" @@ -34,11 +34,8 @@ import ( "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" - "go.opentelemetry.io/collector/extension/extensionauth" ) -var errMetadataNotFound = errors.New("no request metadata found") - // KeepaliveClientConfig exposes the keepalive.ClientParameters to be used by the exporter. // Refer to the original data-structure for the meaning of each parameter: // https://godoc.org/google.golang.org/grpc/keepalive#ClientParameters @@ -322,16 +319,12 @@ func (gcs *ClientConfig) getGrpcDialOptions( return nil, errors.New("no extensions configuration available") } - grpcAuthenticator, cerr := gcs.Auth.GetGRPCClientAuthenticator(ctx, host.GetExtensions()) + dOpts, cerr := gcs.Auth.GetGRPCDialOptions(ctx, host.GetExtensions()) if cerr != nil { return nil, cerr } - perRPCCredentials, perr := grpcAuthenticator.PerRPCCredentials() - if perr != nil { - return nil, err - } - opts = append(opts, grpc.WithPerRPCCredentials(perRPCCredentials)) + opts = append(opts, dOpts...) } if gcs.BalancerName != "" { @@ -479,31 +472,25 @@ func (gss *ServerConfig) getGrpcServerOptions( var sInterceptors []grpc.StreamServerInterceptor if gss.Auth != nil { - authenticator, err := gss.Auth.GetServerAuthenticator(context.Background(), host.GetExtensions()) + sOpts, err := gss.Auth.GetGRPCServerOptions(context.Background(), host.GetExtensions()) if err != nil { return nil, err } - uInterceptors = append(uInterceptors, func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) { - return authUnaryServerInterceptor(ctx, req, info, handler, authenticator) - }) - sInterceptors = append(sInterceptors, func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - return authStreamServerInterceptor(srv, ss, info, handler, authenticator) - }) + opts = append(opts, sOpts...) } - otelOpts := []otelgrpc.Option{ + // Enable OpenTelemetry observability plugin. + opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler( otelgrpc.WithTracerProvider(settings.TracerProvider), otelgrpc.WithPropagators(otel.GetTextMapPropagator()), otelgrpc.WithMeterProvider(settings.MeterProvider), - } - - // Enable OpenTelemetry observability plugin. + ))) uInterceptors = append(uInterceptors, enhanceWithClientInformation(gss.IncludeMetadata)) sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation(gss.IncludeMetadata)) - opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...)) + opts = append(opts, grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...)) for _, opt := range extraOpts { if wrapper, ok := opt.(grpcServerOptionWrapper); ok { @@ -538,7 +525,7 @@ func enhanceWithClientInformation(includeMetadata bool) func(ctx context.Context func enhanceStreamWithClientInformation(includeMetadata bool) func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { return func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - return handler(srv, wrapServerStream(contextWithClient(ss.Context(), includeMetadata), ss)) + return handler(srv, grpcutil.WrapServerStream(contextWithClient(ss.Context(), includeMetadata), ss)) } } @@ -560,32 +547,3 @@ func contextWithClient(ctx context.Context, includeMetadata bool) context.Contex } return client.NewContext(ctx, cl) } - -func authUnaryServerInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler, server extensionauth.Server) (any, error) { - headers, ok := metadata.FromIncomingContext(ctx) - if !ok { - return nil, errMetadataNotFound - } - - ctx, err := server.Authenticate(ctx, headers) - if err != nil { - return nil, status.Error(codes.Unauthenticated, err.Error()) - } - - return handler(ctx, req) -} - -func authStreamServerInterceptor(srv any, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler, server extensionauth.Server) error { - ctx := stream.Context() - headers, ok := metadata.FromIncomingContext(ctx) - if !ok { - return errMetadataNotFound - } - - ctx, err := server.Authenticate(ctx, headers) - if err != nil { - return status.Error(codes.Unauthenticated, err.Error()) - } - - return handler(srv, wrapServerStream(ctx, stream)) -} diff --git a/config/configgrpc/go.mod b/config/configgrpc/go.mod index db3090dc077..a5dec526c15 100644 --- a/config/configgrpc/go.mod +++ b/config/configgrpc/go.mod @@ -5,6 +5,7 @@ go 1.23.0 require ( github.com/mostynb/go-grpc-compression v1.2.3 github.com/stretchr/testify v1.10.0 + go.opentelemetry.io/collector v0.122.1 go.opentelemetry.io/collector/client v1.28.1 go.opentelemetry.io/collector/component v1.28.1 go.opentelemetry.io/collector/component/componenttest v0.122.1 @@ -53,6 +54,8 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) +replace go.opentelemetry.io/collector => ../../ + replace go.opentelemetry.io/collector/client => ../../client replace go.opentelemetry.io/collector/config/configauth => ../configauth diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 06dd9daa560..b44a7027109 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -29,7 +29,6 @@ import ( "go.opentelemetry.io/collector/config/confighttp/internal" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" - "go.opentelemetry.io/collector/extension/extensionauth" ) const ( @@ -203,15 +202,11 @@ func (hcs *ClientConfig) ToClient(ctx context.Context, host component.Host, sett return nil, errors.New("extensions configuration not found") } - httpCustomAuthRoundTripper, aerr := hcs.Auth.GetHTTPClientAuthenticator(ctx, ext) + var aerr error + clientTransport, aerr = hcs.Auth.GetHTTPRoundTripper(ctx, ext, clientTransport) if aerr != nil { return nil, aerr } - - clientTransport, err = httpCustomAuthRoundTripper.RoundTripper(clientTransport) - if err != nil { - return nil, err - } } if len(hcs.Headers) > 0 { @@ -436,12 +431,11 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin } if hss.Auth != nil { - server, err := hss.Auth.GetServerAuthenticator(context.Background(), host.GetExtensions()) - if err != nil { - return nil, err + var aErr error + handler, aErr = hss.Auth.GetHTTPHandler(context.Background(), host.GetExtensions(), handler, hss.Auth.RequestParameters) + if aErr != nil { + return nil, aErr } - - handler = authInterceptor(handler, server, hss.Auth.RequestParameters) } if hss.CORS != nil && len(hss.CORS.AllowedOrigins) > 0 { @@ -537,25 +531,6 @@ func NewDefaultCORSConfig() *CORSConfig { return &CORSConfig{} } -func authInterceptor(next http.Handler, server extensionauth.Server, requestParams []string) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - sources := r.Header - query := r.URL.Query() - for _, param := range requestParams { - if val, ok := query[param]; ok { - sources[param] = val - } - } - ctx, err := server.Authenticate(r.Context(), sources) - if err != nil { - http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) - return - } - - next.ServeHTTP(w, r.WithContext(ctx)) - }) -} - func maxRequestBodySizeInterceptor(next http.Handler, maxRecvSize int64) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, maxRecvSize) diff --git a/config/confighttp/go.mod b/config/confighttp/go.mod index 7f41b53e9f3..a24918a7a86 100644 --- a/config/confighttp/go.mod +++ b/config/confighttp/go.mod @@ -35,6 +35,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/collector v0.122.1 // indirect go.opentelemetry.io/collector/pdata v1.28.1 // indirect go.opentelemetry.io/otel/metric v1.35.0 // indirect go.opentelemetry.io/otel/sdk v1.35.0 // indirect @@ -72,3 +73,5 @@ replace go.opentelemetry.io/collector/consumer => ../../consumer replace go.opentelemetry.io/collector/client => ../../client replace go.opentelemetry.io/collector/extension/extensionauth/extensionauthtest => ../../extension/extensionauth/extensionauthtest + +replace go.opentelemetry.io/collector => ../.. diff --git a/config/confighttp/xconfighttp/go.mod b/config/confighttp/xconfighttp/go.mod index dca1263317c..de6b3e807be 100644 --- a/config/confighttp/xconfighttp/go.mod +++ b/config/confighttp/xconfighttp/go.mod @@ -25,6 +25,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/cors v1.11.1 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/collector v0.122.1 // indirect go.opentelemetry.io/collector/client v1.28.1 // indirect go.opentelemetry.io/collector/component v1.28.1 // indirect go.opentelemetry.io/collector/config/configauth v0.122.1 // indirect @@ -72,3 +73,5 @@ replace go.opentelemetry.io/collector/extension => ../../../extension replace go.opentelemetry.io/collector/config/configtls => ../../configtls replace go.opentelemetry.io/collector/config/configcompression => ../../configcompression + +replace go.opentelemetry.io/collector => ../../.. diff --git a/go.mod b/go.mod index 76b54f66414..3062e3a0836 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,9 @@ require ( github.com/kr/pretty v0.3.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect + golang.org/x/net v0.34.0 // indirect golang.org/x/sys v0.29.0 // indirect + golang.org/x/text v0.21.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8107e98e9db..08d83904ae9 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,16 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -20,6 +26,18 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= diff --git a/config/configgrpc/wrappedstream.go b/internal/grpcutil/wrappedstream.go similarity index 78% rename from config/configgrpc/wrappedstream.go rename to internal/grpcutil/wrappedstream.go index b802b78e1ff..2e566e3bd67 100644 --- a/config/configgrpc/wrappedstream.go +++ b/internal/grpcutil/wrappedstream.go @@ -2,7 +2,7 @@ // Copyright 2016 Michal Witkowski. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -package configgrpc // import "go.opentelemetry.io/collector/config/configgrpc" +package grpcutil // import "go.opentelemetry.io/collector/config/configgrpc" import ( "context" @@ -24,8 +24,8 @@ func (w *wrappedServerStream) Context() context.Context { return w.wrappedCtx } -// wrapServerStream returns a ServerStream with the new context. -func wrapServerStream(wrappedCtx context.Context, stream grpc.ServerStream) *wrappedServerStream { +// WrapServerStream returns a ServerStream with the new context. +func WrapServerStream(wrappedCtx context.Context, stream grpc.ServerStream) grpc.ServerStream { if existing, ok := stream.(*wrappedServerStream); ok { existing.wrappedCtx = wrappedCtx return existing diff --git a/config/configgrpc/wrappedstream_test.go b/internal/grpcutil/wrappedstream_test.go similarity index 75% rename from config/configgrpc/wrappedstream_test.go rename to internal/grpcutil/wrappedstream_test.go index 36a0dbfd3fa..69d4a880eea 100644 --- a/config/configgrpc/wrappedstream_test.go +++ b/internal/grpcutil/wrappedstream_test.go @@ -2,7 +2,7 @@ // Copyright 2016 Michal Witkowski. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -package configgrpc // import "go.opentelemetry.io/collector/internal/middleware" +package grpcutil // import "go.opentelemetry.io/collector/internal/middleware" import ( "context" @@ -23,16 +23,16 @@ func TestWrapServerStream(t *testing.T) { ctx := context.WithValue(context.TODO(), oneCtxKey, 1) fake := &fakeServerStream{ctx: ctx} assert.NotNil(t, fake.Context().Value(oneCtxKey), "values from fake must propagate to wrapper") - wrapped := wrapServerStream(context.WithValue(fake.Context(), otherCtxKey, 2), fake) + wrapped := WrapServerStream(context.WithValue(fake.Context(), otherCtxKey, 2), fake) assert.NotNil(t, wrapped.Context().Value(oneCtxKey), "values from wrapper must be set") assert.NotNil(t, wrapped.Context().Value(otherCtxKey), "values from wrapper must be set") } func TestDoubleWrapping(t *testing.T) { fake := &fakeServerStream{ctx: context.Background()} - wrapped := wrapServerStream(fake.Context(), fake) - assert.Same(t, wrapped, wrapServerStream(wrapped.Context(), wrapped)) // should be noop - assert.Equal(t, fake, wrapped.ServerStream) + wrapped := WrapServerStream(fake.Context(), fake) + assert.Same(t, wrapped, WrapServerStream(wrapped.Context(), wrapped)) // should be noop + assert.Equal(t, fake, wrapped.(*wrappedServerStream).ServerStream) } type fakeServerStream struct {