From 2609d9aebaed3140ac6419820751903d7060ee90 Mon Sep 17 00:00:00 2001 From: Hank Freund Date: Wed, 13 Nov 2024 13:52:09 -0800 Subject: [PATCH] WIP: Implement attached clusters direct controller take 2 --- apis/containerattached/v1beta1/refs.go | 185 +++++++++ .../attachedcluster_controller.go | 369 ++++++++++++++++++ .../containerattachedcluster_mappings.go | 19 +- pkg/controller/direct/maputils.go | 17 +- pkg/controller/direct/register/register.go | 1 + 5 files changed, 588 insertions(+), 3 deletions(-) create mode 100644 pkg/controller/direct/containerattached/attachedcluster_controller.go diff --git a/apis/containerattached/v1beta1/refs.go b/apis/containerattached/v1beta1/refs.go index 14d84ba824..5f2221e73b 100644 --- a/apis/containerattached/v1beta1/refs.go +++ b/apis/containerattached/v1beta1/refs.go @@ -13,6 +13,22 @@ // limitations under the License. package v1beta1 +import ( + "context" + "fmt" + "strconv" + "strings" + + resourcemanager "cloud.google.com/go/resourcemanager/apiv3" + resourcemanagerpb "cloud.google.com/go/resourcemanager/apiv3/resourcemanagerpb" + refsv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct" + // apierrors "k8s.io/apimachinery/pkg/api/errors" + // "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + // "k8s.io/apimachinery/pkg/runtime/schema" + // "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) type FleetProjectRef struct { /* The project of the fleet. Allowed value: The Google Cloud resource name of a `Project` resource (format: `projects/{{name}}`).*/ @@ -22,3 +38,172 @@ type FleetProjectRef struct { /* Namespace of the project resource. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ */ Namespace string `json:"namespace,omitempty"` } + +type ContainerAttachedClusterRef struct { + // A reference to an externally managed ContainerAttachedCluster resource. + // Should be in the format `projects//locations//attachedClusters/`. + External string `json:"external,omitempty"` + // The `name` of a `ContainerAttachedCluster` resource. + Name string `json:"name,omitempty"` + // The `namespace` of a `ContainerAttachedCluster` resource. + Namespace string `json:"namespace,omitempty"` + // The location where this cluster is registered. + Location string `json:"location,omitempty"` + // The parent location where this ContainerAttachedCluster resource lives. + // Should be in the format `projects//locations/`. + parent string +} + +// ResolveExternal will resolve the project ID to its numeric form and populate the External field of the FleetProjectRef. +func (r *FleetProjectRef) ResolveExternal(ctx context.Context, projectsClient *resourcemanager.ProjectsClient) error { + projectID, err := r.parseProjectID() + if err != nil { + return err + } + projectNumber, err := strconv.ParseInt(projectID, 10, 64) + if err != nil { + req := &resourcemanagerpb.GetProjectRequest{ + Name: "projects/" + projectID, + } + project, err := projectsClient.GetProject(ctx, req) + if err != nil { + return fmt.Errorf("error getting project %q: %w", req.Name, err) + } + n, err := strconv.ParseInt(strings.TrimPrefix(project.Name, "projects/"), 10, 64) + if err != nil { + return fmt.Errorf("error parsing project number for %q: %w", project.Name, err) + } + projectNumber = n + } + r.External = fmt.Sprintf("projects/%d", projectNumber) + return nil +} + +func (r *FleetProjectRef) parseProjectID() (string, error) { + if r.External != "" { + tokens := strings.Split(r.External, "/") + if len(tokens) != 2 || tokens[0] != "projects" { + return "", fmt.Errorf("format of fleet project ref external %q is unrecognized (should be projects/)", r.External) + } + return tokens[1], nil + } + if r.Name != "" { + return r.Name, nil + } + return "", fmt.Errorf("no fleet project ref specified") +} + +// ResolveFleetProjectRef will fill out the complete details for the fleet project ref, complete with project ID. +// func ResolveFleetProjectRef(ctx context.Context, reader client.Reader, ref *FleetProjectRef, src *ContainerAttachedCluster) error { +// if ref == nil { +// return nil +// } + +// key := types.NamespacedName{ +// Namespace: ref.Namespace, +// Name: ref.Name, +// } +// if key.Namespace == "" { +// key.Namespace = src.GetNamespace() +// } +// project := &unstructured.Unstructured{} +// project.SetGroupVersionKind(schema.GroupVersionKind{ +// Group: "resourcemanager.cnrm.cloud.google.com", +// Version: "v1beta1", +// Kind: "Project", +// }) +// if err := reader.Get(ctx, key, project); err != nil { +// if apierrors.IsNotFound(err) { +// return fmt.Errorf("referenced Project %v not found", key) +// } +// return fmt.Errorf("error reading referenced Project %v: %w", key, err) +// } +// projectID, err := refsv1beta1.GetResourceID(project) +// if err != nil { +// return err +// } +// ref.External = fmt.Sprintf("projects/%s", projectID) +// return nil +// } + +// // ConvertToProjectNumber converts the external reference to use a project number. +// func (ref *ComputeNetworkRef) ConvertToProjectNumber(ctx context.Context, projectsClient *resourcemanager.ProjectsClient) error { +// if ref == nil { +// return nil +// } + +// id, err := ParseComputeNetworkID(ref.External) +// if err != nil { +// return err +// } + +// // Check if the project number is already a valid integer +// // If not, we need to look it up +// projectNumber, err := strconv.ParseInt(id.Project, 10, 64) +// if err != nil { +// req := &resourcemanagerpb.GetProjectRequest{ +// Name: "projects/" + id.Project, +// } +// project, err := projectsClient.GetProject(ctx, req) +// if err != nil { +// return fmt.Errorf("error getting project %q: %w", req.Name, err) +// } +// n, err := strconv.ParseInt(strings.TrimPrefix(project.Name, "projects/"), 10, 64) +// if err != nil { +// return fmt.Errorf("error parsing project number for %q: %w", project.Name, err) +// } +// projectNumber = n +// } +// id.Project = strconv.FormatInt(projectNumber, 10) +// ref.External = id.String() +// return nil +// } + +// NewContainerAttachedClusterRef builds a ContainerAttachedClusterRef from the ConfigConnector ContainerAttachedCluster object. +func NewContainerAttachedClusterRef(ctx context.Context, reader client.Reader, obj *ContainerAttachedCluster) (*ContainerAttachedClusterRef, error) { + projectRef, err := refsv1beta1.ResolveProject(ctx, reader, obj, obj.Spec.ProjectRef) + if err != nil { + return nil, err + } + projectID := projectRef.ProjectID + if projectID == "" { + return nil, fmt.Errorf("cannot resolve project") + } + + location := obj.Spec.Location + if location == "" { + return nil, fmt.Errorf("cannot resolve location") + } + + resourceID := direct.ValueOf(obj.Spec.ResourceID) + if resourceID == "" { + resourceID = obj.GetName() + } + if resourceID == "" { + return nil, fmt.Errorf("cannot resolve resource ID") + } + + parent := "projects/" + projectID + "/locations/" + location + return &ContainerAttachedClusterRef{ + External: parent + "/attachedClusters/" + resourceID, + Name: resourceID, + Location: location, + parent: parent, + }, nil +} + +func (r *ContainerAttachedClusterRef) Parent() (string, error) { + if r.parent != "" { + return r.parent, nil + } + if r.External != "" { + r.External = strings.TrimPrefix(r.External, "/") + tokens := strings.Split(r.External, "/") + if len(tokens) != 6 || tokens[0] != "projects" || tokens[2] != "locations" || tokens[4] != "attachedClusters" { + return "", fmt.Errorf("format of ContainerAttachedCluster external=%q was not known (use projects//locations//attachedClusters/)", r.External) + } + r.parent = "projects/" + tokens[1] + "/locations/" + tokens[3] + return r.parent, nil + } + return "", fmt.Errorf("ContainerAttachedClusterRef not normalized to External form or not created from `New()`") +} diff --git a/pkg/controller/direct/containerattached/attachedcluster_controller.go b/pkg/controller/direct/containerattached/attachedcluster_controller.go new file mode 100644 index 0000000000..996084c410 --- /dev/null +++ b/pkg/controller/direct/containerattached/attachedcluster_controller.go @@ -0,0 +1,369 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package containerattached + +import ( + "context" + // "crypto/tls" + "errors" + "fmt" + "log" + "reflect" + + krm "github.com/GoogleCloudPlatform/k8s-config-connector/apis/containerattached/v1beta1" + refs "github.com/GoogleCloudPlatform/k8s-config-connector/apis/refs/v1beta1" + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config" + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct" + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/directbase" + "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/registry" + + gcp "cloud.google.com/go/gkemulticloud/apiv1" + cloudresourcemanager "cloud.google.com/go/resourcemanager/apiv3" + + containerattachedpb "cloud.google.com/go/gkemulticloud/apiv1/gkemulticloudpb" + "github.com/googleapis/gax-go/v2/apierror" + "google.golang.org/api/option" + // "google.golang.org/grpc/credentials" + "google.golang.org/grpc" + "google.golang.org/grpc/status" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/types/known/fieldmaskpb" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func init() { + registry.RegisterModel(krm.ContainerAttachedClusterGVK, NewContainerAttachedClusterModel) +} + +func NewContainerAttachedClusterModel(ctx context.Context, config *config.ControllerConfig) (directbase.Model, error) { + return &modelContainerAttachedCluster{config: *config}, nil +} + +var _ directbase.Model = &modelContainerAttachedCluster{} + +type modelContainerAttachedCluster struct { + config config.ControllerConfig +} + +func loggingUnaryInterceptor() grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + err := invoker(ctx, method, req, reply, cc, opts...) + log.Printf(":HF: Invoked method: %v, %+v", method, err) + md, ok := metadata.FromOutgoingContext(ctx) + log.Printf(":HF: md: %+v, ok: %t", md, ok) + if ok { + log.Println("Metadata:") + for k, v := range md { + log.Printf("Key: %v, Value: %v", k, v) + } + } + reqb, merr := protojson.Marshal(req.(protoreflect.ProtoMessage)) + if merr == nil { + log.Printf(":HF: Request: %s", reqb) + } + log.Printf(":HF: merr: %+v", merr) + return err + } +} + +func (m *modelContainerAttachedCluster) client(ctx context.Context, endpoint string) (*gcp.AttachedClustersClient, error) { + var opts []option.ClientOption + // Not working ("WithHTTPClient is incompatible with gRPC dial options"). GRPCClientOptions() gives the same error, and + // the implementation looks functionally identical. + // opts, err := m.config.GRPCClientOptions() + // if err != nil { + // return nil, err + // } + + // Added an interceptor to add more logging, but couldn't find anything useful. + // opts = append(opts, option.WithGRPCDialOption(grpc.WithUnaryInterceptor(loggingUnaryInterceptor()))) + opts = append(opts, option.WithEndpoint(endpoint)) + gcpClient, err := gcp.NewAttachedClustersClient(ctx, opts...) + // gcpClient, err := gcp.NewAttachedClustersClient(ctx) + if err != nil { + return nil, fmt.Errorf("building AttachedCluster client: %w", err) + } + return gcpClient, err +} + +func (m *modelContainerAttachedCluster) projectsClient(ctx context.Context) (*cloudresourcemanager.ProjectsClient, error) { + opts, err := m.config.RESTClientOptions() + if err != nil { + return nil, err + } + + crmClient, err := cloudresourcemanager.NewProjectsRESTClient(ctx, opts...) + if err != nil { + return nil, fmt.Errorf("building cloudresourcemanager client: %w", err) + } + return crmClient, err +} + +func endpoint(location string) string { + return fmt.Sprintf("%s-gkemulticloud.googleapis.com:443", location) +} + +func (m *modelContainerAttachedCluster) AdapterForObject(ctx context.Context, reader client.Reader, u *unstructured.Unstructured) (directbase.Adapter, error) { + log := klog.FromContext(ctx) + log.V(0).Info(":HF: AdapterForObject") + obj := &krm.ContainerAttachedCluster{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &obj); err != nil { + return nil, fmt.Errorf("error converting to %T: %w", obj, err) + } + + id, err := krm.NewContainerAttachedClusterRef(ctx, reader, obj) + if err != nil { + return nil, err + } + + projectsClient, err := m.projectsClient(ctx) + if err != nil { + return nil, err + } + + // err = krm.ResolveFleetProjectRef(ctx, reader, &obj.Spec.Fleet.ProjectRef, obj) + err = obj.Spec.Fleet.ProjectRef.ResolveExternal(ctx, projectsClient) + if err != nil { + // return nil, err + log.V(0).Info(fmt.Sprintf("HF: ResolveFleetProjectRef error: %v", err)) + } + log.V(0).Info(fmt.Sprintf("HF: ResolveFleetProjectRef: %v", obj.Spec.Fleet.ProjectRef)) + + // Get containerattached GCP client + endpoint := endpoint(id.Location) + gcpClient, err := m.client(ctx, endpoint) + if err != nil { + return nil, err + } + + return &ContainerAttachedClusterAdapter{ + id: id, + gcpClient: gcpClient, + desired: obj, + }, nil +} + +func (m *modelContainerAttachedCluster) AdapterForURL(ctx context.Context, url string) (directbase.Adapter, error) { + log := klog.FromContext(ctx) + log.V(0).Info(":HF: AdapterForURL") + // TODO: Support URLs + return nil, nil +} + +type ContainerAttachedClusterAdapter struct { + id *krm.ContainerAttachedClusterRef + gcpClient *gcp.AttachedClustersClient + desired *krm.ContainerAttachedCluster + actual *containerattachedpb.AttachedCluster +} + +var _ directbase.Adapter = &ContainerAttachedClusterAdapter{} + +func (a *ContainerAttachedClusterAdapter) Find(ctx context.Context) (bool, error) { + log := klog.FromContext(ctx) + //log.V(2).Info("getting ContainerAttachedCluster", "name", a.id.External) + log.V(0).Info(":HF: getting ContainerAttachedCluster", "name", a.id.External, "id", a.id) + + req := &containerattachedpb.GetAttachedClusterRequest{Name: a.id.External} + attachedclusterpb, err := a.gcpClient.GetAttachedCluster(ctx, req) + if err != nil { + log.V(0).Info(":HF: getting ContainerAttachedCluster: error", "err", err) + + var ae *apierror.APIError + if errors.As(err, &ae) { + log.V(0).Info(ae.Reason()) + log.V(0).Info(":HF:", "help", ae.Details().Help.GetLinks()) + } + if s, ok := status.FromError(err); ok { + log.V(0).Info(s.Message()) + for _, d := range s.Proto().Details { + log.V(0).Info(":HF:", "d", d) + } + } + + if direct.IsNotFound(err) { + return false, nil + } + return false, fmt.Errorf("getting ContainerAttachedCluster %q: %w", a.id.External, err) + } + + log.V(0).Info(":HF: getting ContainerAttachedCluster: success", "actual", attachedclusterpb) + a.actual = attachedclusterpb + return true, nil +} + +func (a *ContainerAttachedClusterAdapter) Create(ctx context.Context, createOp *directbase.CreateOperation) error { + log := klog.FromContext(ctx) + // log.V(2).Info("creating AttachedCluster", "name", a.id.Name) + log.V(0).Info(":HF: creating AttachedCluster", "name", a.id.Name, "id", a.id) + mapCtx := &direct.MapContext{} + + desired := a.desired.DeepCopy() + resource := ContainerAttachedClusterSpec_ToProto(mapCtx, &desired.Spec) + if mapCtx.Err() != nil { + return mapCtx.Err() + } + + // TODO(user): Complete the gcp "CREATE" or "INSERT" request with required fields. + parent, err := a.id.Parent() + if err != nil { + return err + } + req := &containerattachedpb.CreateAttachedClusterRequest{ + Parent: parent, + AttachedCluster: resource, + } + op, err := a.gcpClient.CreateAttachedCluster(ctx, req) + if err != nil { + return fmt.Errorf("creating AttachedCluster %s: %w", a.id.External, err) + } + created, err := op.Wait(ctx) + if err != nil { + return fmt.Errorf("AttachedCluster %s waiting creation: %w", a.id.External, err) + } + log.V(2).Info("successfully created AttachedCluster", "name", a.id.External) + + status := &krm.ContainerAttachedClusterStatus{} + status.ObservedState = ContainerAttachedClusterStatusObservedState_FromProto(mapCtx, created) + if mapCtx.Err() != nil { + return mapCtx.Err() + } + return createOp.UpdateStatus(ctx, status, nil) +} + +func (a *ContainerAttachedClusterAdapter) Update(ctx context.Context, updateOp *directbase.UpdateOperation) error { + log := klog.FromContext(ctx) + log.V(2).Info("updating AttachedCluster", "name", a.id.External) + mapCtx := &direct.MapContext{} + + desired := a.desired.DeepCopy() + resource := ContainerAttachedClusterSpec_ToProto(mapCtx, &desired.Spec) + if mapCtx.Err() != nil { + return mapCtx.Err() + } + + // Mask of fields to update. At least one path must be supplied in + // this field. + // + // - `annotations`. + // - `authorization.admin_users`. + // - `binary_authorization.evaluation_mode`. + // - `description`. + // - `logging_config.component_config.enable_components`. + // - `monitoring_config.managed_prometheus_config.enabled`. + // - `platform_version`. + updateMask := &fieldmaskpb.FieldMask{} + if !reflect.DeepEqual(a.desired.Spec.Annotations, a.actual.Annotations) { + updateMask.Paths = append(updateMask.Paths, "annotations") + } + if !reflect.DeepEqual(a.desired.Spec.Authorization.AdminUsers, a.actual.Authorization.AdminUsers) { + updateMask.Paths = append(updateMask.Paths, "authorization.admin_users") + } + if !reflect.DeepEqual(a.desired.Spec.BinaryAuthorization.EvaluationMode, a.actual.BinaryAuthorization.EvaluationMode) { + updateMask.Paths = append(updateMask.Paths, "binary_authorization.evaluation_mode") + } + if !reflect.DeepEqual(a.desired.Spec.Description, a.actual.Description) { + updateMask.Paths = append(updateMask.Paths, "description") + } + if !reflect.DeepEqual(a.desired.Spec.LoggingConfig.ComponentConfig.EnableComponents, a.actual.LoggingConfig.ComponentConfig.EnableComponents) { + updateMask.Paths = append(updateMask.Paths, "logging_config.component_config.enable_components") + } + if !reflect.DeepEqual(a.desired.Spec.MonitoringConfig.ManagedPrometheusConfig.Enabled, a.actual.MonitoringConfig.ManagedPrometheusConfig.Enabled) { + updateMask.Paths = append(updateMask.Paths, "monitoring_config.managed_prometheus_config.enabled") + } + if !reflect.DeepEqual(a.desired.Spec.PlatformVersion, a.actual.PlatformVersion) { + updateMask.Paths = append(updateMask.Paths, "platform_version") + } + + if len(updateMask.Paths) == 0 { + log.V(2).Info("no field needs update", "name", a.id.External) + return nil + } + req := &containerattachedpb.UpdateAttachedClusterRequest{ + UpdateMask: updateMask, + AttachedCluster: resource, + } + op, err := a.gcpClient.UpdateAttachedCluster(ctx, req) + if err != nil { + return fmt.Errorf("updating AttachedCluster %s: %w", a.id.External, err) + } + updated, err := op.Wait(ctx) + if err != nil { + return fmt.Errorf("AttachedCluster %s waiting update: %w", a.id.External, err) + } + log.V(2).Info("successfully updated AttachedCluster", "name", a.id.External) + + status := &krm.ContainerAttachedClusterStatus{} + status.ObservedState = ContainerAttachedClusterStatusObservedState_FromProto(mapCtx, updated) + if mapCtx.Err() != nil { + return mapCtx.Err() + } + return updateOp.UpdateStatus(ctx, status, nil) +} + +func (a *ContainerAttachedClusterAdapter) Export(ctx context.Context) (*unstructured.Unstructured, error) { + if a.actual == nil { + return nil, fmt.Errorf("Find() not called") + } + u := &unstructured.Unstructured{} + + obj := &krm.ContainerAttachedCluster{} + mapCtx := &direct.MapContext{} + obj.Spec = direct.ValueOf(ContainerAttachedClusterSpec_FromProto(mapCtx, a.actual)) + if mapCtx.Err() != nil { + return nil, mapCtx.Err() + } + parent, err := a.id.Parent() + if err != nil { + return nil, err + } + obj.Spec.ProjectRef = &refs.ProjectRef{External: parent} + obj.Spec.Location = a.id.Location + uObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + return nil, err + } + + u.SetName(a.id.Name) + u.SetGroupVersionKind(krm.ContainerAttachedClusterGVK) + + u.Object = uObj + return u, nil +} + +// Delete implements the Adapter interface. +func (a *ContainerAttachedClusterAdapter) Delete(ctx context.Context, deleteOp *directbase.DeleteOperation) (bool, error) { + log := klog.FromContext(ctx) + log.V(2).Info("deleting AttachedCluster", "name", a.id.External) + + req := &containerattachedpb.DeleteAttachedClusterRequest{Name: a.id.External} + op, err := a.gcpClient.DeleteAttachedCluster(ctx, req) + if err != nil { + return false, fmt.Errorf("deleting AttachedCluster %s: %w", a.id.External, err) + } + log.V(2).Info("successfully deleted AttachedCluster", "name", a.id.External) + + err = op.Wait(ctx) + if err != nil { + return false, fmt.Errorf("waiting delete AttachedCluster %s: %w", a.id.External, err) + } + return true, nil +} diff --git a/pkg/controller/direct/containerattached/containerattachedcluster_mappings.go b/pkg/controller/direct/containerattached/containerattachedcluster_mappings.go index fe16e959af..e5a0a651a8 100644 --- a/pkg/controller/direct/containerattached/containerattachedcluster_mappings.go +++ b/pkg/controller/direct/containerattached/containerattachedcluster_mappings.go @@ -15,6 +15,9 @@ package containerattached import ( + "fmt" + "log" + pb "cloud.google.com/go/gkemulticloud/apiv1/gkemulticloudpb" krm "github.com/GoogleCloudPlatform/k8s-config-connector/apis/containerattached/v1beta1" "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct" @@ -36,9 +39,14 @@ func Fleet_ToProto(mapCtx *direct.MapContext, in *krm.Fleet) *pb.Fleet { } out := &pb.Fleet{} if in.ProjectRef.External == "" { - mapCtx.Errorf("Fleet project external reference was not pre-resolved") + if in.ProjectRef.Name == "" { + mapCtx.Errorf("Fleet project reference is missing") + } + in.ProjectRef.External = fmt.Sprintf("projects/%s", in.ProjectRef.Name) + // krm.ResolveFleetProjectRef(ctx, ) } out.Project = in.ProjectRef.External + log.Printf("HF: out.Project: %s, in.ProjectRef: %+v", out.Project, in.ProjectRef) out.Membership = direct.ValueOf(in.Membership) return out } @@ -139,6 +147,15 @@ func ContainerAttachedClusterSpec_ToProto(mapCtx *direct.MapContext, in *krm.Con return out } +func ContainerAttachedClusterStatusObservedState_FromProto(mapCtx *direct.MapContext, in *pb.AttachedCluster) *krm.ContainerAttachedClusterObservedState { + if in == nil { + return nil + } + return &krm.ContainerAttachedClusterObservedState{ + FleetMembership: direct.PtrTo(in.GetFleet().GetMembership()), + } +} + func LoggingComponentConfig_FromProto(mapCtx *direct.MapContext, in *pb.LoggingComponentConfig) *krm.LoggingComponentConfig { if in == nil { return nil diff --git a/pkg/controller/direct/maputils.go b/pkg/controller/direct/maputils.go index 34ba5a03f5..74512c2785 100644 --- a/pkg/controller/direct/maputils.go +++ b/pkg/controller/direct/maputils.go @@ -23,6 +23,8 @@ import ( "time" "github.com/googleapis/gax-go/v2/apierror" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" @@ -223,9 +225,9 @@ func ValueOf[T any](p *T) T { return v } -// IsNotFound returns true if the given error is an HTTP 404. +// IsNotFound returns true if the given error is an HTTP 404 or a NotFound GRPC status. func IsNotFound(err error) bool { - return HasHTTPCode(err, 404) + return HasHTTPCode(err, 404) || HasGRPCStatus(err, codes.NotFound) } // IsBadRequest returns true if the given error is an HTTP 400. @@ -249,6 +251,17 @@ func HasHTTPCode(err error, code int) bool { return false } +func HasGRPCStatus(err error, code codes.Code) bool { + if err == nil { + return false + } + grpcStatus, ok := status.FromError(err) + if !ok { + return false + } + return grpcStatus.Code() == code +} + func Duration_ToProto(mapCtx *MapContext, in *string) *durationpb.Duration { if in == nil { return nil diff --git a/pkg/controller/direct/register/register.go b/pkg/controller/direct/register/register.go index 5a478ad8e4..a4f517fd2f 100644 --- a/pkg/controller/direct/register/register.go +++ b/pkg/controller/direct/register/register.go @@ -25,6 +25,7 @@ import ( _ "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/cloudbuild" _ "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/compute/firewallpolicyrule" _ "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/compute/forwardingrule" + _ "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/containerattached" _ "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/dataflow" _ "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/dataform" _ "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/direct/discoveryengine"