From bcf969e6d2d5a4dfbd0d934658a213af4763fd82 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 23 Oct 2025 18:59:42 +0800 Subject: [PATCH 1/7] make pulsar topic controller as statefui reconciler --- pkg/admin/dummy.go | 10 + pkg/admin/impl.go | 33 ++- pkg/admin/interface.go | 6 + pkg/connection/reconcile_topic.go | 22 ++ pkg/connection/topic_compaction_reconciler.go | 189 ++++++++++++++++++ tests/operator/resources_test.go | 23 +++ 6 files changed, 276 insertions(+), 7 deletions(-) create mode 100644 pkg/connection/topic_compaction_reconciler.go diff --git a/pkg/admin/dummy.go b/pkg/admin/dummy.go index c231ae3a..96201756 100644 --- a/pkg/admin/dummy.go +++ b/pkg/admin/dummy.go @@ -96,6 +96,16 @@ func (d *DummyPulsarAdmin) SetTopicClusters(string, *bool, []string) error { return nil } +// SetTopicCompactionThreshold is a fake implementation of SetTopicCompactionThreshold +func (d *DummyPulsarAdmin) SetTopicCompactionThreshold(string, *bool, int64) error { + return nil +} + +// RemoveTopicCompactionThreshold is a fake implementation of RemoveTopicCompactionThreshold +func (d *DummyPulsarAdmin) RemoveTopicCompactionThreshold(string, *bool) error { + return nil +} + // Close is a fake implements of Close func (d *DummyPulsarAdmin) Close() error { return nil diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 3af09d55..867c498f 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -348,13 +348,6 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param } } - if params.CompactionThreshold != nil { - err = p.adminClient.Topics().SetCompactionThreshold(*topicName, *params.CompactionThreshold) - if err != nil { - return err - } - } - // Handle persistence policies if params.PersistencePolicies != nil { // Parse ManagedLedgerMaxMarkDeleteRate from string to float64 @@ -612,6 +605,32 @@ func (p *PulsarAdminClient) SetTopicClusters(name string, persistent *bool, clus return nil } +// SetTopicCompactionThreshold sets the compaction threshold for a topic. +func (p *PulsarAdminClient) SetTopicCompactionThreshold(name string, persistent *bool, value int64) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + return p.adminClient.Topics().SetCompactionThreshold(*topicName, value) +} + +// RemoveTopicCompactionThreshold removes the compaction threshold from a topic. +func (p *PulsarAdminClient) RemoveTopicCompactionThreshold(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveCompactionThreshold(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params *NamespaceParams) error { naName, err := utils.GetNamespaceName(completeNSName) if err != nil { diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index 19d46bd0..60baf0b2 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -163,6 +163,12 @@ type PulsarAdmin interface { // SetTopicClusters resets the assigned clusters of the topic to the local default cluster SetTopicClusters(name string, persistent *bool, clusters []string) error + // SetTopicCompactionThreshold sets the compaction threshold for a topic + SetTopicCompactionThreshold(name string, persistent *bool, value int64) error + + // RemoveTopicCompactionThreshold removes the compaction threshold from a topic + RemoveTopicCompactionThreshold(name string, persistent *bool) error + // GrantPermissions grants permissions to multiple role with multiple actions // on a namespace or topic, each role will be granted the same actions GrantPermissions(p Permissioner) error diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index f9f0a725..8cc7ead7 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -239,6 +239,11 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin return creationErr } + if err := r.reconcileTopicCompactionState(ctx, topic); err != nil { + log.Error(err, "Failed to reconcile topic compaction state") + policyErrs = append(policyErrs, err) + } + if err := applySchema(pulsarAdmin, topic, log); err != nil { policyErrs = append(policyErrs, err) } @@ -310,6 +315,23 @@ func createTopicParams(topic *resourcev1alpha1.PulsarTopic) *admin.TopicParams { } } +func (r *PulsarTopicReconciler) reconcileTopicCompactionState(ctx context.Context, topic *resourcev1alpha1.PulsarTopic) error { + original := topic.DeepCopy() + reconciler := newTopicCompactionStateReconciler(r.log, r.conn.pulsarAdmin) + changed, err := reconciler.reconcile(ctx, topic) + if err != nil { + return err + } + if !changed { + return nil + } + + if err := r.conn.client.Patch(ctx, topic, client.MergeFrom(original)); err != nil { + return err + } + return nil +} + func (r *PulsarTopicReconciler) applyDefault(params *admin.TopicParams) { if params.Persistent == nil { // by default create persistent topic diff --git a/pkg/connection/topic_compaction_reconciler.go b/pkg/connection/topic_compaction_reconciler.go new file mode 100644 index 00000000..2f966005 --- /dev/null +++ b/pkg/connection/topic_compaction_reconciler.go @@ -0,0 +1,189 @@ +package connection + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + + "github.com/go-logr/logr" + + resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" + "github.com/streamnative/pulsar-resources-operator/pkg/admin" + "github.com/streamnative/pulsar-resources-operator/pkg/reconciler" +) + +const ( + // PulsarTopicCompactionStateAnnotation stores the last applied compaction threshold state + PulsarTopicCompactionStateAnnotation = "pulsartopics.resource.streamnative.io/compaction-state" +) + +// topicCompactionState represents the compaction policy state we track on PulsarTopic resources. +type topicCompactionState struct { + CompactionThreshold *int64 `json:"compactionThreshold,omitempty"` +} + +// topicCompactionStateReconciler reconciles compaction threshold changes using the stateful reconciler helper. +type topicCompactionStateReconciler struct { + base *reconciler.BaseStatefulReconciler[*resourcev1alpha1.PulsarTopic] + admin admin.PulsarAdmin + log logr.Logger + changed bool +} + +func newTopicCompactionStateReconciler(logger logr.Logger, adminClient admin.PulsarAdmin) *topicCompactionStateReconciler { + compactionLogger := logger.WithName("CompactionState") + return &topicCompactionStateReconciler{ + base: reconciler.NewBaseStatefulReconciler[*resourcev1alpha1.PulsarTopic](compactionLogger), + admin: adminClient, + log: compactionLogger, + } +} + +func (r *topicCompactionStateReconciler) reconcile(ctx context.Context, topic *resourcev1alpha1.PulsarTopic) (bool, error) { + r.changed = false + if err := r.base.Reconcile(ctx, topic, r); err != nil { + return false, err + } + return r.changed, nil +} + +// GetStateAnnotationKey implements reconciler.StatefulReconciler. +func (*topicCompactionStateReconciler) GetStateAnnotationKey() string { + return PulsarTopicCompactionStateAnnotation +} + +// ExtractCurrentState implements reconciler.StatefulReconciler. +func (*topicCompactionStateReconciler) ExtractCurrentState(topic *resourcev1alpha1.PulsarTopic) (interface{}, error) { + state := topicCompactionState{} + if topic.Spec.CompactionThreshold != nil { + value := *topic.Spec.CompactionThreshold + state.CompactionThreshold = &value + } + return state, nil +} + +// CompareStates implements reconciler.StatefulReconciler. +func (r *topicCompactionStateReconciler) CompareStates(previous, current interface{}) (reconciler.StateChangeOperations, error) { + prevState, err := decodeTopicCompactionState(previous) + if err != nil { + return reconciler.StateChangeOperations{}, err + } + currState, err := decodeTopicCompactionState(current) + if err != nil { + return reconciler.StateChangeOperations{}, err + } + + prevThreshold := prevState.CompactionThreshold + currThreshold := currState.CompactionThreshold + + ops := reconciler.StateChangeOperations{} + + switch { + case prevThreshold == nil && currThreshold != nil: + ops.ItemsToAdd = append(ops.ItemsToAdd, *currThreshold) + case prevThreshold != nil && currThreshold == nil: + ops.ItemsToRemove = append(ops.ItemsToRemove, *prevThreshold) + case prevThreshold != nil && currThreshold != nil && *prevThreshold != *currThreshold: + ops.ItemsToUpdate = append(ops.ItemsToUpdate, *currThreshold) + default: + return ops, nil + } + + r.changed = true + return ops, nil +} + +// ApplyOperations implements reconciler.StatefulReconciler. +func (r *topicCompactionStateReconciler) ApplyOperations(ctx context.Context, topic *resourcev1alpha1.PulsarTopic, + ops reconciler.StateChangeOperations) error { + for _, item := range ops.ItemsToAdd { + value, ok := item.(int64) + if !ok { + return fmt.Errorf("unexpected compaction threshold type %T in add operation", item) + } + if err := r.admin.SetTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent, value); err != nil { + return err + } + } + + for _, item := range ops.ItemsToUpdate { + value, ok := item.(int64) + if !ok { + return fmt.Errorf("unexpected compaction threshold type %T in update operation", item) + } + if err := r.admin.SetTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent, value); err != nil { + return err + } + } + + if len(ops.ItemsToRemove) > 0 { + if err := r.admin.RemoveTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + } + + return nil +} + +// UpdateStateAnnotation implements reconciler.StatefulReconciler. +func (r *topicCompactionStateReconciler) UpdateStateAnnotation(topic *resourcev1alpha1.PulsarTopic, currentState interface{}) error { + return r.base.UpdateAnnotation(topic, PulsarTopicCompactionStateAnnotation, currentState) +} + +func decodeTopicCompactionState(state interface{}) (topicCompactionState, error) { + if state == nil { + return topicCompactionState{}, nil + } + + switch value := state.(type) { + case topicCompactionState: + return value, nil + case map[string]interface{}: + return decodeTopicCompactionStateFromMap(value) + default: + bytes, err := json.Marshal(value) + if err != nil { + return topicCompactionState{}, err + } + var result topicCompactionState + if err := json.Unmarshal(bytes, &result); err != nil { + return topicCompactionState{}, err + } + return result, nil + } +} + +func decodeTopicCompactionStateFromMap(raw map[string]interface{}) (topicCompactionState, error) { + result := topicCompactionState{} + if raw == nil { + return result, nil + } + + value, exists := raw["compactionThreshold"] + if !exists || value == nil { + return result, nil + } + + switch typed := value.(type) { + case float64: + threshold := int64(typed) + result.CompactionThreshold = &threshold + case json.Number: + parsed, err := strconv.ParseInt(string(typed), 10, 64) + if err != nil { + return topicCompactionState{}, err + } + result.CompactionThreshold = &parsed + case int64: + threshold := typed + result.CompactionThreshold = &threshold + case int: + threshold := int64(typed) + result.CompactionThreshold = &threshold + default: + return topicCompactionState{}, fmt.Errorf("unsupported compaction threshold value type %T", value) + } + + return result, nil +} diff --git a/tests/operator/resources_test.go b/tests/operator/resources_test.go index 4bb85011..2ea6c8ec 100644 --- a/tests/operator/resources_test.go +++ b/tests/operator/resources_test.go @@ -493,6 +493,29 @@ var _ = Describe("Resources", func() { }, "20s", "100ms").Should(BeTrue()) }) + It("should reset compaction state annotation after removing threshold", func() { + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: compactionTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + + annotations := t.GetAnnotations() + if annotations == nil { + return true + } + + rawState, exists := annotations[connection.PulsarTopicCompactionStateAnnotation] + if !exists { + return true + } + + var state map[string]interface{} + Expect(json.Unmarshal([]byte(rawState), &state)).Should(Succeed()) + value, ok := state["compactionThreshold"] + return !ok || value == nil + }, "20s", "100ms").Should(BeTrue()) + }) + AfterAll(func() { // Clean up the compaction test topic after all tests complete if compactionTopic != nil { From b60aea1ce143756a6eb792eb5918c547e8e34c5d Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 24 Oct 2025 00:06:43 +0800 Subject: [PATCH 2/7] add debug logging --- pkg/connection/reconcile_topic.go | 112 +++++++++++++++++- pkg/connection/topic_compaction_reconciler.go | 15 +++ 2 files changed, 124 insertions(+), 3 deletions(-) diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index 8cc7ead7..c93a5cd4 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -94,6 +94,12 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin if !topic.DeletionTimestamp.IsZero() { log.Info("Deleting topic", "LifecyclePolicy", topic.Spec.LifecyclePolicy) + log.V(1).Info("Deleting topic in Pulsar admin", + "topicSpecName", topic.Spec.Name, + "persistent", pointerValue(topic.Spec.Persistent), + "partitions", pointerValue(topic.Spec.Partitions), + "generation", topic.Generation, + "observedGeneration", topic.Status.ObservedGeneration) if topic.Spec.LifecyclePolicy != resourcev1alpha1.KeepAfterDeletion { // TODO when geoReplicationRef is not nil, it should reset the replication clusters to @@ -119,9 +125,15 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin } } - if err := pulsarAdmin.DeleteTopic(topic.Spec.Name); err != nil && !admin.IsNotFound(err) { - log.Error(err, "Failed to delete topic") - return err + if err := pulsarAdmin.DeleteTopic(topic.Spec.Name); err != nil { + if admin.IsNotFound(err) { + log.V(1).Info("Topic already removed from Pulsar admin", "topicSpecName", topic.Spec.Name) + } else { + log.Error(err, "Failed to delete topic") + return err + } + } else { + log.V(1).Info("Deleted topic from Pulsar admin", "topicSpecName", topic.Spec.Name) } } @@ -180,6 +192,7 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin params := createTopicParams(topic) r.applyDefault(params) + action := determineTopicAdminAction(topic) if refs := topic.Spec.GeoReplicationRefs; len(refs) != 0 || len(topic.Spec.ReplicationClusters) > 0 { if len(refs) > 0 && len(topic.Spec.ReplicationClusters) > 0 { @@ -230,8 +243,29 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin topic.Status.GeoReplicationEnabled = false } + desiredConfig := summarizeTopicParamsForLogging(params) + log.V(1).Info("Reconciling Pulsar topic in admin", + "action", action, + "topicSpecName", topic.Spec.Name, + "generation", topic.Generation, + "observedGeneration", topic.Status.ObservedGeneration, + "desiredConfig", desiredConfig) + creationErr, policyErr := pulsarAdmin.ApplyTopic(topic.Spec.Name, params) log.Info("Apply topic", "creationErr", creationErr, "policyErr", policyErr) + if creationErr != nil || policyErr != nil { + log.V(1).Info("Pulsar topic apply completed with errors", + "action", action, + "topicSpecName", topic.Spec.Name, + "creationErr", creationErr, + "policyErr", policyErr, + "desiredConfig", desiredConfig) + } else { + log.V(1).Info("Pulsar topic apply completed successfully", + "action", action, + "topicSpecName", topic.Spec.Name, + "desiredConfig", desiredConfig) + } if policyErr != nil { policyErrs = append(policyErrs, policyErr) } @@ -315,6 +349,77 @@ func createTopicParams(topic *resourcev1alpha1.PulsarTopic) *admin.TopicParams { } } +func determineTopicAdminAction(topic *resourcev1alpha1.PulsarTopic) string { + if topic.Status.ObservedGeneration == 0 { + return "create" + } + return "update" +} + +func summarizeTopicParamsForLogging(params *admin.TopicParams) map[string]interface{} { + if params == nil { + return nil + } + + summary := make(map[string]interface{}) + addIfNotNil(summary, "persistent", params.Persistent) + addIfNotNil(summary, "partitions", params.Partitions) + addIfNotNil(summary, "maxProducers", params.MaxProducers) + addIfNotNil(summary, "maxConsumers", params.MaxConsumers) + addIfNotNil(summary, "messageTTL", params.MessageTTL) + addIfNotNil(summary, "maxUnAckedMessagesPerConsumer", params.MaxUnAckedMessagesPerConsumer) + addIfNotNil(summary, "maxUnAckedMessagesPerSubscription", params.MaxUnAckedMessagesPerSubscription) + addIfNotNil(summary, "retentionTime", params.RetentionTime) + addIfNotNil(summary, "retentionSize", params.RetentionSize) + addIfNotNil(summary, "backlogQuotaLimitTime", params.BacklogQuotaLimitTime) + addIfNotNil(summary, "backlogQuotaLimitSize", params.BacklogQuotaLimitSize) + addIfNotNil(summary, "backlogQuotaRetentionPolicy", params.BacklogQuotaRetentionPolicy) + addIfNotNil(summary, "deduplication", params.Deduplication) + addIfNotNil(summary, "compactionThreshold", params.CompactionThreshold) + addIfNotNil(summary, "persistencePolicies", params.PersistencePolicies) + addIfNotNil(summary, "delayedDelivery", params.DelayedDelivery) + addIfNotNil(summary, "dispatchRate", params.DispatchRate) + addIfNotNil(summary, "publishRate", params.PublishRate) + addIfNotNil(summary, "inactiveTopicPolicies", params.InactiveTopicPolicies) + addIfNotNil(summary, "subscribeRate", params.SubscribeRate) + addIfNotNil(summary, "maxMessageSize", params.MaxMessageSize) + addIfNotNil(summary, "maxConsumersPerSubscription", params.MaxConsumersPerSubscription) + addIfNotNil(summary, "maxSubscriptionsPerTopic", params.MaxSubscriptionsPerTopic) + addIfNotNil(summary, "schemaValidationEnforced", params.SchemaValidationEnforced) + addIfNotNil(summary, "subscriptionDispatchRate", params.SubscriptionDispatchRate) + addIfNotNil(summary, "replicatorDispatchRate", params.ReplicatorDispatchRate) + addIfNotNil(summary, "deduplicationSnapshotInterval", params.DeduplicationSnapshotInterval) + addIfNotNil(summary, "offloadPolicies", params.OffloadPolicies) + addIfNotNil(summary, "autoSubscriptionCreation", params.AutoSubscriptionCreation) + addIfNotNil(summary, "schemaCompatibilityStrategy", params.SchemaCompatibilityStrategy) + + if len(params.ReplicationClusters) > 0 { + summary["replicationClusters"] = params.ReplicationClusters + } + if len(params.Properties) > 0 { + summary["properties"] = params.Properties + } + + if len(summary) == 0 { + return nil + } + return summary +} + +func addIfNotNil[T any](summary map[string]interface{}, key string, value *T) { + if value == nil { + return + } + summary[key] = *value +} + +func pointerValue[T any](value *T) interface{} { + if value == nil { + return nil + } + return *value +} + func (r *PulsarTopicReconciler) reconcileTopicCompactionState(ctx context.Context, topic *resourcev1alpha1.PulsarTopic) error { original := topic.DeepCopy() reconciler := newTopicCompactionStateReconciler(r.log, r.conn.pulsarAdmin) @@ -329,6 +434,7 @@ func (r *PulsarTopicReconciler) reconcileTopicCompactionState(ctx context.Contex if err := r.conn.client.Patch(ctx, topic, client.MergeFrom(original)); err != nil { return err } + r.log.V(1).Info("Persisted compaction state annotation", "topic", topic.Name) return nil } diff --git a/pkg/connection/topic_compaction_reconciler.go b/pkg/connection/topic_compaction_reconciler.go index 2f966005..8628dd16 100644 --- a/pkg/connection/topic_compaction_reconciler.go +++ b/pkg/connection/topic_compaction_reconciler.go @@ -102,6 +102,9 @@ func (r *topicCompactionStateReconciler) ApplyOperations(ctx context.Context, to if !ok { return fmt.Errorf("unexpected compaction threshold type %T in add operation", item) } + r.log.V(1).Info("Setting topic compaction threshold", + "topicSpecName", topic.Spec.Name, + "threshold", value) if err := r.admin.SetTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent, value); err != nil { return err } @@ -112,12 +115,24 @@ func (r *topicCompactionStateReconciler) ApplyOperations(ctx context.Context, to if !ok { return fmt.Errorf("unexpected compaction threshold type %T in update operation", item) } + r.log.V(1).Info("Updating topic compaction threshold", + "topicSpecName", topic.Spec.Name, + "threshold", value) if err := r.admin.SetTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent, value); err != nil { return err } } if len(ops.ItemsToRemove) > 0 { + var previous interface{} + if len(ops.ItemsToRemove) == 1 { + previous = ops.ItemsToRemove[0] + } else { + previous = ops.ItemsToRemove + } + r.log.V(1).Info("Removing topic compaction threshold", + "topicSpecName", topic.Spec.Name, + "previous", previous) if err := r.admin.RemoveTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent); err != nil { return err } From 85d1fb258ba841ac82f0ee424bd766aed94c14de Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 24 Oct 2025 16:32:23 +0800 Subject: [PATCH 3/7] address other policies --- pkg/admin/dummy.go | 100 +++ pkg/admin/impl.go | 378 ++++++++++++ pkg/admin/interface.go | 75 +++ pkg/connection/reconcile_topic.go | 10 +- pkg/connection/topic_compaction_reconciler.go | 204 ------ .../topic_policy_state_reconciler.go | 581 ++++++++++++++++++ tests/operator/resources_test.go | 2 +- 7 files changed, 1140 insertions(+), 210 deletions(-) delete mode 100644 pkg/connection/topic_compaction_reconciler.go create mode 100644 pkg/connection/topic_policy_state_reconciler.go diff --git a/pkg/admin/dummy.go b/pkg/admin/dummy.go index 96201756..24556d0e 100644 --- a/pkg/admin/dummy.go +++ b/pkg/admin/dummy.go @@ -106,6 +106,106 @@ func (d *DummyPulsarAdmin) RemoveTopicCompactionThreshold(string, *bool) error { return nil } +func (d *DummyPulsarAdmin) RemoveTopicMessageTTL(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicMaxProducers(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicMaxConsumers(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicMaxUnackedMessagesPerConsumer(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicMaxUnackedMessagesPerSubscription(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicRetention(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicBacklogQuota(string, *bool, string) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicDeduplicationStatus(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicPersistence(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicDelayedDelivery(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicDispatchRate(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicPublishRate(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicInactiveTopicPolicies(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicSubscribeRate(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicMaxMessageSize(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicMaxConsumersPerSubscription(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicMaxSubscriptionsPerTopic(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicSchemaValidationEnforced(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicSubscriptionDispatchRate(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicReplicatorDispatchRate(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicDeduplicationSnapshotInterval(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicOffloadPolicies(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicAutoSubscriptionCreation(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicSchemaCompatibilityStrategy(string, *bool) error { + return nil +} + +func (d *DummyPulsarAdmin) RemoveTopicProperty(string, *bool, string) error { + return nil +} + // Close is a fake implements of Close func (d *DummyPulsarAdmin) Close() error { return nil diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 867c498f..ea96dce8 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -631,6 +631,384 @@ func (p *PulsarAdminClient) RemoveTopicCompactionThreshold(name string, persiste return nil } +func (p *PulsarAdminClient) RemoveTopicMessageTTL(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveMessageTTL(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicMaxProducers(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveMaxProducers(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicMaxConsumers(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveMaxConsumers(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicMaxUnackedMessagesPerConsumer(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveMaxUnackMessagesPerConsumer(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicMaxUnackedMessagesPerSubscription(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveMaxUnackMessagesPerSubscription(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicRetention(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveRetention(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicBacklogQuota(name string, persistent *bool, quotaType string) error { + if quotaType == "" { + return nil + } + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveBacklogQuota(*topicName, utils.BacklogQuotaType(quotaType)); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicDeduplicationStatus(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveDeduplicationStatus(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicPersistence(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemovePersistence(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicDelayedDelivery(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveDelayedDelivery(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicDispatchRate(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveDispatchRate(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicPublishRate(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemovePublishRate(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicInactiveTopicPolicies(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveInactiveTopicPolicies(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicSubscribeRate(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveSubscribeRate(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicMaxMessageSize(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveMaxMessageSize(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicMaxConsumersPerSubscription(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveMaxConsumersPerSubscription(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicMaxSubscriptionsPerTopic(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveMaxSubscriptionsPerTopic(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicSchemaValidationEnforced(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveSchemaValidationEnforced(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicSubscriptionDispatchRate(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveSubscriptionDispatchRate(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicReplicatorDispatchRate(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveReplicatorDispatchRate(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicDeduplicationSnapshotInterval(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveDeduplicationSnapshotInterval(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicOffloadPolicies(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveOffloadPolicies(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicAutoSubscriptionCreation(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveAutoSubscriptionCreation(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicSchemaCompatibilityStrategy(name string, persistent *bool) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveSchemaCompatibilityStrategy(*topicName); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + +func (p *PulsarAdminClient) RemoveTopicProperty(name string, persistent *bool, key string) error { + completeTopicName := MakeCompleteTopicName(name, persistent) + topicName, err := utils.GetTopicName(completeTopicName) + if err != nil { + return err + } + if err := p.adminClient.Topics().RemoveProperty(*topicName, key); err != nil { + if IsNotFound(err) { + return nil + } + return err + } + return nil +} + func (p *PulsarAdminClient) applyNamespacePolicies(completeNSName string, params *NamespaceParams) error { naName, err := utils.GetNamespaceName(completeNSName) if err != nil { diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index 60baf0b2..23bcd180 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -169,6 +169,81 @@ type PulsarAdmin interface { // RemoveTopicCompactionThreshold removes the compaction threshold from a topic RemoveTopicCompactionThreshold(name string, persistent *bool) error + // RemoveTopicMessageTTL removes the message TTL policy for a topic + RemoveTopicMessageTTL(name string, persistent *bool) error + + // RemoveTopicMaxProducers removes the max producers limit for a topic + RemoveTopicMaxProducers(name string, persistent *bool) error + + // RemoveTopicMaxConsumers removes the max consumers limit for a topic + RemoveTopicMaxConsumers(name string, persistent *bool) error + + // RemoveTopicMaxUnackedMessagesPerConsumer removes the max unacked messages per consumer limit + RemoveTopicMaxUnackedMessagesPerConsumer(name string, persistent *bool) error + + // RemoveTopicMaxUnackedMessagesPerSubscription removes the max unacked messages per subscription limit + RemoveTopicMaxUnackedMessagesPerSubscription(name string, persistent *bool) error + + // RemoveTopicRetention removes the retention policy for a topic + RemoveTopicRetention(name string, persistent *bool) error + + // RemoveTopicBacklogQuota removes a backlog quota policy from a topic + RemoveTopicBacklogQuota(name string, persistent *bool, quotaType string) error + + // RemoveTopicDeduplicationStatus removes the deduplication status policy from a topic + RemoveTopicDeduplicationStatus(name string, persistent *bool) error + + // RemoveTopicPersistence removes persistence policies from a topic + RemoveTopicPersistence(name string, persistent *bool) error + + // RemoveTopicDelayedDelivery removes delayed delivery policy from a topic + RemoveTopicDelayedDelivery(name string, persistent *bool) error + + // RemoveTopicDispatchRate removes dispatch rate policy from a topic + RemoveTopicDispatchRate(name string, persistent *bool) error + + // RemoveTopicPublishRate removes publish rate policy from a topic + RemoveTopicPublishRate(name string, persistent *bool) error + + // RemoveTopicInactiveTopicPolicies removes inactive topic policies from a topic + RemoveTopicInactiveTopicPolicies(name string, persistent *bool) error + + // RemoveTopicSubscribeRate removes subscribe rate policy from a topic + RemoveTopicSubscribeRate(name string, persistent *bool) error + + // RemoveTopicMaxMessageSize removes max message size policy from a topic + RemoveTopicMaxMessageSize(name string, persistent *bool) error + + // RemoveTopicMaxConsumersPerSubscription removes max consumers per subscription policy from a topic + RemoveTopicMaxConsumersPerSubscription(name string, persistent *bool) error + + // RemoveTopicMaxSubscriptionsPerTopic removes max subscriptions per topic policy + RemoveTopicMaxSubscriptionsPerTopic(name string, persistent *bool) error + + // RemoveTopicSchemaValidationEnforced removes schema validation enforced override + RemoveTopicSchemaValidationEnforced(name string, persistent *bool) error + + // RemoveTopicSubscriptionDispatchRate removes subscription dispatch rate policy from a topic + RemoveTopicSubscriptionDispatchRate(name string, persistent *bool) error + + // RemoveTopicReplicatorDispatchRate removes replicator dispatch rate policy from a topic + RemoveTopicReplicatorDispatchRate(name string, persistent *bool) error + + // RemoveTopicDeduplicationSnapshotInterval removes deduplication snapshot interval policy from a topic + RemoveTopicDeduplicationSnapshotInterval(name string, persistent *bool) error + + // RemoveTopicOffloadPolicies removes offload policies from a topic + RemoveTopicOffloadPolicies(name string, persistent *bool) error + + // RemoveTopicAutoSubscriptionCreation removes auto subscription creation override for a topic + RemoveTopicAutoSubscriptionCreation(name string, persistent *bool) error + + // RemoveTopicSchemaCompatibilityStrategy removes schema compatibility override for a topic + RemoveTopicSchemaCompatibilityStrategy(name string, persistent *bool) error + + // RemoveTopicProperty removes a topic property + RemoveTopicProperty(name string, persistent *bool, key string) error + // GrantPermissions grants permissions to multiple role with multiple actions // on a namespace or topic, each role will be granted the same actions GrantPermissions(p Permissioner) error diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index c93a5cd4..cd4878e6 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -273,8 +273,8 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin return creationErr } - if err := r.reconcileTopicCompactionState(ctx, topic); err != nil { - log.Error(err, "Failed to reconcile topic compaction state") + if err := r.reconcileTopicPolicyState(ctx, topic); err != nil { + log.Error(err, "Failed to reconcile topic policy state") policyErrs = append(policyErrs, err) } @@ -420,9 +420,9 @@ func pointerValue[T any](value *T) interface{} { return *value } -func (r *PulsarTopicReconciler) reconcileTopicCompactionState(ctx context.Context, topic *resourcev1alpha1.PulsarTopic) error { +func (r *PulsarTopicReconciler) reconcileTopicPolicyState(ctx context.Context, topic *resourcev1alpha1.PulsarTopic) error { original := topic.DeepCopy() - reconciler := newTopicCompactionStateReconciler(r.log, r.conn.pulsarAdmin) + reconciler := newTopicPolicyStateReconciler(r.log, r.conn.pulsarAdmin) changed, err := reconciler.reconcile(ctx, topic) if err != nil { return err @@ -434,7 +434,7 @@ func (r *PulsarTopicReconciler) reconcileTopicCompactionState(ctx context.Contex if err := r.conn.client.Patch(ctx, topic, client.MergeFrom(original)); err != nil { return err } - r.log.V(1).Info("Persisted compaction state annotation", "topic", topic.Name) + r.log.V(1).Info("Persisted topic policy state annotation", "topic", topic.Name) return nil } diff --git a/pkg/connection/topic_compaction_reconciler.go b/pkg/connection/topic_compaction_reconciler.go deleted file mode 100644 index 8628dd16..00000000 --- a/pkg/connection/topic_compaction_reconciler.go +++ /dev/null @@ -1,204 +0,0 @@ -package connection - -import ( - "context" - "encoding/json" - "fmt" - "strconv" - - "github.com/go-logr/logr" - - resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" - "github.com/streamnative/pulsar-resources-operator/pkg/admin" - "github.com/streamnative/pulsar-resources-operator/pkg/reconciler" -) - -const ( - // PulsarTopicCompactionStateAnnotation stores the last applied compaction threshold state - PulsarTopicCompactionStateAnnotation = "pulsartopics.resource.streamnative.io/compaction-state" -) - -// topicCompactionState represents the compaction policy state we track on PulsarTopic resources. -type topicCompactionState struct { - CompactionThreshold *int64 `json:"compactionThreshold,omitempty"` -} - -// topicCompactionStateReconciler reconciles compaction threshold changes using the stateful reconciler helper. -type topicCompactionStateReconciler struct { - base *reconciler.BaseStatefulReconciler[*resourcev1alpha1.PulsarTopic] - admin admin.PulsarAdmin - log logr.Logger - changed bool -} - -func newTopicCompactionStateReconciler(logger logr.Logger, adminClient admin.PulsarAdmin) *topicCompactionStateReconciler { - compactionLogger := logger.WithName("CompactionState") - return &topicCompactionStateReconciler{ - base: reconciler.NewBaseStatefulReconciler[*resourcev1alpha1.PulsarTopic](compactionLogger), - admin: adminClient, - log: compactionLogger, - } -} - -func (r *topicCompactionStateReconciler) reconcile(ctx context.Context, topic *resourcev1alpha1.PulsarTopic) (bool, error) { - r.changed = false - if err := r.base.Reconcile(ctx, topic, r); err != nil { - return false, err - } - return r.changed, nil -} - -// GetStateAnnotationKey implements reconciler.StatefulReconciler. -func (*topicCompactionStateReconciler) GetStateAnnotationKey() string { - return PulsarTopicCompactionStateAnnotation -} - -// ExtractCurrentState implements reconciler.StatefulReconciler. -func (*topicCompactionStateReconciler) ExtractCurrentState(topic *resourcev1alpha1.PulsarTopic) (interface{}, error) { - state := topicCompactionState{} - if topic.Spec.CompactionThreshold != nil { - value := *topic.Spec.CompactionThreshold - state.CompactionThreshold = &value - } - return state, nil -} - -// CompareStates implements reconciler.StatefulReconciler. -func (r *topicCompactionStateReconciler) CompareStates(previous, current interface{}) (reconciler.StateChangeOperations, error) { - prevState, err := decodeTopicCompactionState(previous) - if err != nil { - return reconciler.StateChangeOperations{}, err - } - currState, err := decodeTopicCompactionState(current) - if err != nil { - return reconciler.StateChangeOperations{}, err - } - - prevThreshold := prevState.CompactionThreshold - currThreshold := currState.CompactionThreshold - - ops := reconciler.StateChangeOperations{} - - switch { - case prevThreshold == nil && currThreshold != nil: - ops.ItemsToAdd = append(ops.ItemsToAdd, *currThreshold) - case prevThreshold != nil && currThreshold == nil: - ops.ItemsToRemove = append(ops.ItemsToRemove, *prevThreshold) - case prevThreshold != nil && currThreshold != nil && *prevThreshold != *currThreshold: - ops.ItemsToUpdate = append(ops.ItemsToUpdate, *currThreshold) - default: - return ops, nil - } - - r.changed = true - return ops, nil -} - -// ApplyOperations implements reconciler.StatefulReconciler. -func (r *topicCompactionStateReconciler) ApplyOperations(ctx context.Context, topic *resourcev1alpha1.PulsarTopic, - ops reconciler.StateChangeOperations) error { - for _, item := range ops.ItemsToAdd { - value, ok := item.(int64) - if !ok { - return fmt.Errorf("unexpected compaction threshold type %T in add operation", item) - } - r.log.V(1).Info("Setting topic compaction threshold", - "topicSpecName", topic.Spec.Name, - "threshold", value) - if err := r.admin.SetTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent, value); err != nil { - return err - } - } - - for _, item := range ops.ItemsToUpdate { - value, ok := item.(int64) - if !ok { - return fmt.Errorf("unexpected compaction threshold type %T in update operation", item) - } - r.log.V(1).Info("Updating topic compaction threshold", - "topicSpecName", topic.Spec.Name, - "threshold", value) - if err := r.admin.SetTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent, value); err != nil { - return err - } - } - - if len(ops.ItemsToRemove) > 0 { - var previous interface{} - if len(ops.ItemsToRemove) == 1 { - previous = ops.ItemsToRemove[0] - } else { - previous = ops.ItemsToRemove - } - r.log.V(1).Info("Removing topic compaction threshold", - "topicSpecName", topic.Spec.Name, - "previous", previous) - if err := r.admin.RemoveTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent); err != nil { - return err - } - } - - return nil -} - -// UpdateStateAnnotation implements reconciler.StatefulReconciler. -func (r *topicCompactionStateReconciler) UpdateStateAnnotation(topic *resourcev1alpha1.PulsarTopic, currentState interface{}) error { - return r.base.UpdateAnnotation(topic, PulsarTopicCompactionStateAnnotation, currentState) -} - -func decodeTopicCompactionState(state interface{}) (topicCompactionState, error) { - if state == nil { - return topicCompactionState{}, nil - } - - switch value := state.(type) { - case topicCompactionState: - return value, nil - case map[string]interface{}: - return decodeTopicCompactionStateFromMap(value) - default: - bytes, err := json.Marshal(value) - if err != nil { - return topicCompactionState{}, err - } - var result topicCompactionState - if err := json.Unmarshal(bytes, &result); err != nil { - return topicCompactionState{}, err - } - return result, nil - } -} - -func decodeTopicCompactionStateFromMap(raw map[string]interface{}) (topicCompactionState, error) { - result := topicCompactionState{} - if raw == nil { - return result, nil - } - - value, exists := raw["compactionThreshold"] - if !exists || value == nil { - return result, nil - } - - switch typed := value.(type) { - case float64: - threshold := int64(typed) - result.CompactionThreshold = &threshold - case json.Number: - parsed, err := strconv.ParseInt(string(typed), 10, 64) - if err != nil { - return topicCompactionState{}, err - } - result.CompactionThreshold = &parsed - case int64: - threshold := typed - result.CompactionThreshold = &threshold - case int: - threshold := int64(typed) - result.CompactionThreshold = &threshold - default: - return topicCompactionState{}, fmt.Errorf("unsupported compaction threshold value type %T", value) - } - - return result, nil -} diff --git a/pkg/connection/topic_policy_state_reconciler.go b/pkg/connection/topic_policy_state_reconciler.go new file mode 100644 index 00000000..7bae9a17 --- /dev/null +++ b/pkg/connection/topic_policy_state_reconciler.go @@ -0,0 +1,581 @@ +package connection + +import ( + "context" + "encoding/json" + "fmt" + "sort" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/go-logr/logr" + + resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" + "github.com/streamnative/pulsar-resources-operator/pkg/admin" + "github.com/streamnative/pulsar-resources-operator/pkg/reconciler" +) + +const ( + // PulsarTopicPolicyStateAnnotation stores the last applied topic policy state. + // We keep the historic key to remain backward compatible with existing annotations. + PulsarTopicPolicyStateAnnotation = "pulsartopics.resource.streamnative.io/compaction-state" +) + +type topicPolicyState struct { + CompactionThreshold *int64 `json:"compactionThreshold,omitempty"` + MessageTTL bool `json:"messageTTL,omitempty"` + MaxProducers bool `json:"maxProducers,omitempty"` + MaxConsumers bool `json:"maxConsumers,omitempty"` + MaxUnAckedMessagesPerConsumer bool `json:"maxUnAckedMessagesPerConsumer,omitempty"` + MaxUnAckedMessagesPerSubscription bool `json:"maxUnAckedMessagesPerSubscription,omitempty"` + Retention bool `json:"retention,omitempty"` + BacklogQuotaType *string `json:"backlogQuotaType,omitempty"` + Deduplication bool `json:"deduplication,omitempty"` + PersistencePolicies bool `json:"persistencePolicies,omitempty"` + DelayedDelivery bool `json:"delayedDelivery,omitempty"` + DispatchRate bool `json:"dispatchRate,omitempty"` + PublishRate bool `json:"publishRate,omitempty"` + InactiveTopicPolicies bool `json:"inactiveTopicPolicies,omitempty"` + SubscribeRate bool `json:"subscribeRate,omitempty"` + MaxMessageSize bool `json:"maxMessageSize,omitempty"` + MaxConsumersPerSubscription bool `json:"maxConsumersPerSubscription,omitempty"` + MaxSubscriptionsPerTopic bool `json:"maxSubscriptionsPerTopic,omitempty"` + SchemaValidationEnforced bool `json:"schemaValidationEnforced,omitempty"` + SubscriptionDispatchRate bool `json:"subscriptionDispatchRate,omitempty"` + ReplicatorDispatchRate bool `json:"replicatorDispatchRate,omitempty"` + DeduplicationSnapshotInterval bool `json:"deduplicationSnapshotInterval,omitempty"` + OffloadPolicies bool `json:"offloadPolicies,omitempty"` + AutoSubscriptionCreation bool `json:"autoSubscriptionCreation,omitempty"` + SchemaCompatibilityStrategy bool `json:"schemaCompatibilityStrategy,omitempty"` + PropertiesKeys []string `json:"propertiesKeys,omitempty"` +} + +type topicPolicyOperation struct { + Kind string `json:"kind"` + Value interface{} `json:"value,omitempty"` +} + +const ( + policyOpSetCompactionThreshold = "setCompactionThreshold" + policyOpRemoveCompactionThreshold = "removeCompactionThreshold" + policyOpRemoveMessageTTL = "removeMessageTTL" + policyOpRemoveMaxProducers = "removeMaxProducers" + policyOpRemoveMaxConsumers = "removeMaxConsumers" + policyOpRemoveMaxUnackConsumer = "removeMaxUnackMessagesPerConsumer" + policyOpRemoveMaxUnackSubscription = "removeMaxUnackMessagesPerSubscription" + policyOpRemoveRetention = "removeRetention" + policyOpRemoveBacklogQuota = "removeBacklogQuota" + policyOpRemoveDeduplication = "removeDeduplication" + policyOpRemovePersistence = "removePersistence" + policyOpRemoveDelayedDelivery = "removeDelayedDelivery" + policyOpRemoveDispatchRate = "removeDispatchRate" + policyOpRemovePublishRate = "removePublishRate" + policyOpRemoveInactiveTopicPolicies = "removeInactiveTopicPolicies" + policyOpRemoveSubscribeRate = "removeSubscribeRate" + policyOpRemoveMaxMessageSize = "removeMaxMessageSize" + policyOpRemoveMaxConsumersPerSub = "removeMaxConsumersPerSubscription" + policyOpRemoveMaxSubscriptionsPerTopic = "removeMaxSubscriptionsPerTopic" + policyOpRemoveSchemaValidation = "removeSchemaValidationEnforced" + policyOpRemoveSubscriptionDispatch = "removeSubscriptionDispatchRate" + policyOpRemoveReplicatorDispatch = "removeReplicatorDispatchRate" + policyOpRemoveDedupSnapshotInterval = "removeDeduplicationSnapshotInterval" + policyOpRemoveOffloadPolicies = "removeOffloadPolicies" + policyOpRemoveAutoSubscription = "removeAutoSubscriptionCreation" + policyOpRemoveSchemaCompatibility = "removeSchemaCompatibilityStrategy" + policyOpRemoveProperty = "removeProperty" +) + +type topicPolicyStateReconciler struct { + base *reconciler.BaseStatefulReconciler[*resourcev1alpha1.PulsarTopic] + admin admin.PulsarAdmin + log logr.Logger + changed bool +} + +func newTopicPolicyStateReconciler(logger logr.Logger, adminClient admin.PulsarAdmin) *topicPolicyStateReconciler { + stateLogger := logger.WithName("PolicyState") + return &topicPolicyStateReconciler{ + base: reconciler.NewBaseStatefulReconciler[*resourcev1alpha1.PulsarTopic](stateLogger), + admin: adminClient, + log: stateLogger, + } +} + +func (r *topicPolicyStateReconciler) reconcile(ctx context.Context, topic *resourcev1alpha1.PulsarTopic) (bool, error) { + r.changed = false + if err := r.base.Reconcile(ctx, topic, r); err != nil { + return false, err + } + return r.changed, nil +} + +// GetStateAnnotationKey implements reconciler.StatefulReconciler. +func (*topicPolicyStateReconciler) GetStateAnnotationKey() string { + return PulsarTopicPolicyStateAnnotation +} + +// ExtractCurrentState implements reconciler.StatefulReconciler. +func (*topicPolicyStateReconciler) ExtractCurrentState(topic *resourcev1alpha1.PulsarTopic) (interface{}, error) { + state := topicPolicyState{} + + if topic.Spec.CompactionThreshold != nil { + value := *topic.Spec.CompactionThreshold + state.CompactionThreshold = &value + } + if topic.Spec.MessageTTL != nil { + state.MessageTTL = true + } + if topic.Spec.MaxProducers != nil { + state.MaxProducers = true + } + if topic.Spec.MaxConsumers != nil { + state.MaxConsumers = true + } + if topic.Spec.MaxUnAckedMessagesPerConsumer != nil { + state.MaxUnAckedMessagesPerConsumer = true + } + if topic.Spec.MaxUnAckedMessagesPerSubscription != nil { + state.MaxUnAckedMessagesPerSubscription = true + } + if topic.Spec.RetentionTime != nil || topic.Spec.RetentionSize != nil { + state.Retention = true + } + if (topic.Spec.BacklogQuotaLimitTime != nil || topic.Spec.BacklogQuotaLimitSize != nil) && + topic.Spec.BacklogQuotaRetentionPolicy != nil { + var quotaType string + if topic.Spec.BacklogQuotaLimitSize != nil { + quotaType = string(utils.DestinationStorage) + } else { + quotaType = string(utils.MessageAge) + } + state.BacklogQuotaType = "aType + } + if topic.Spec.Deduplication != nil { + state.Deduplication = true + } + if topic.Spec.PersistencePolicies != nil { + state.PersistencePolicies = true + } + if topic.Spec.DelayedDelivery != nil { + state.DelayedDelivery = true + } + if topic.Spec.DispatchRate != nil { + state.DispatchRate = true + } + if topic.Spec.PublishRate != nil { + state.PublishRate = true + } + if topic.Spec.InactiveTopicPolicies != nil { + state.InactiveTopicPolicies = true + } + if topic.Spec.SubscribeRate != nil { + state.SubscribeRate = true + } + if topic.Spec.MaxMessageSize != nil { + state.MaxMessageSize = true + } + if topic.Spec.MaxConsumersPerSubscription != nil { + state.MaxConsumersPerSubscription = true + } + if topic.Spec.MaxSubscriptionsPerTopic != nil { + state.MaxSubscriptionsPerTopic = true + } + if topic.Spec.SchemaValidationEnforced != nil { + state.SchemaValidationEnforced = true + } + if topic.Spec.SubscriptionDispatchRate != nil { + state.SubscriptionDispatchRate = true + } + if topic.Spec.ReplicatorDispatchRate != nil { + state.ReplicatorDispatchRate = true + } + if topic.Spec.DeduplicationSnapshotInterval != nil { + state.DeduplicationSnapshotInterval = true + } + if topic.Spec.OffloadPolicies != nil { + state.OffloadPolicies = true + } + if topic.Spec.AutoSubscriptionCreation != nil { + state.AutoSubscriptionCreation = true + } + if topic.Spec.SchemaCompatibilityStrategy != nil { + state.SchemaCompatibilityStrategy = true + } + if len(topic.Spec.Properties) > 0 { + keys := make([]string, 0, len(topic.Spec.Properties)) + for key := range topic.Spec.Properties { + keys = append(keys, key) + } + sort.Strings(keys) + state.PropertiesKeys = keys + } + + return state, nil +} + +// CompareStates implements reconciler.StatefulReconciler. +func (r *topicPolicyStateReconciler) CompareStates(previous, current interface{}) (reconciler.StateChangeOperations, error) { + prevState, err := decodeTopicPolicyState(previous) + if err != nil { + return reconciler.StateChangeOperations{}, err + } + currState, err := decodeTopicPolicyState(current) + if err != nil { + return reconciler.StateChangeOperations{}, err + } + + ops := reconciler.StateChangeOperations{} + + // Compaction threshold transitions + switch { + case prevState.CompactionThreshold == nil && currState.CompactionThreshold != nil: + r.changed = true + ops.ItemsToAdd = append(ops.ItemsToAdd, topicPolicyOperation{ + Kind: policyOpSetCompactionThreshold, + Value: *currState.CompactionThreshold, + }) + case prevState.CompactionThreshold != nil && currState.CompactionThreshold != nil && + *prevState.CompactionThreshold != *currState.CompactionThreshold: + r.changed = true + ops.ItemsToUpdate = append(ops.ItemsToUpdate, topicPolicyOperation{ + Kind: policyOpSetCompactionThreshold, + Value: *currState.CompactionThreshold, + }) + case prevState.CompactionThreshold != nil && currState.CompactionThreshold == nil: + r.changed = true + ops.ItemsToRemove = append(ops.ItemsToRemove, topicPolicyOperation{Kind: policyOpRemoveCompactionThreshold}) + } + + // Boolean policy removals + r.appendPolicyRemoval(prevState.MessageTTL, currState.MessageTTL, policyOpRemoveMessageTTL, &ops) + r.appendPolicyRemoval(prevState.MaxProducers, currState.MaxProducers, policyOpRemoveMaxProducers, &ops) + r.appendPolicyRemoval(prevState.MaxConsumers, currState.MaxConsumers, policyOpRemoveMaxConsumers, &ops) + r.appendPolicyRemoval(prevState.MaxUnAckedMessagesPerConsumer, currState.MaxUnAckedMessagesPerConsumer, + policyOpRemoveMaxUnackConsumer, &ops) + r.appendPolicyRemoval(prevState.MaxUnAckedMessagesPerSubscription, currState.MaxUnAckedMessagesPerSubscription, + policyOpRemoveMaxUnackSubscription, &ops) + r.appendPolicyRemoval(prevState.Retention, currState.Retention, policyOpRemoveRetention, &ops) + r.appendPolicyRemoval(prevState.Deduplication, currState.Deduplication, policyOpRemoveDeduplication, &ops) + r.appendPolicyRemoval(prevState.PersistencePolicies, currState.PersistencePolicies, policyOpRemovePersistence, &ops) + r.appendPolicyRemoval(prevState.DelayedDelivery, currState.DelayedDelivery, policyOpRemoveDelayedDelivery, &ops) + r.appendPolicyRemoval(prevState.DispatchRate, currState.DispatchRate, policyOpRemoveDispatchRate, &ops) + r.appendPolicyRemoval(prevState.PublishRate, currState.PublishRate, policyOpRemovePublishRate, &ops) + r.appendPolicyRemoval(prevState.InactiveTopicPolicies, currState.InactiveTopicPolicies, + policyOpRemoveInactiveTopicPolicies, &ops) + r.appendPolicyRemoval(prevState.SubscribeRate, currState.SubscribeRate, policyOpRemoveSubscribeRate, &ops) + r.appendPolicyRemoval(prevState.MaxMessageSize, currState.MaxMessageSize, policyOpRemoveMaxMessageSize, &ops) + r.appendPolicyRemoval(prevState.MaxConsumersPerSubscription, currState.MaxConsumersPerSubscription, + policyOpRemoveMaxConsumersPerSub, &ops) + r.appendPolicyRemoval(prevState.MaxSubscriptionsPerTopic, currState.MaxSubscriptionsPerTopic, + policyOpRemoveMaxSubscriptionsPerTopic, &ops) + r.appendPolicyRemoval(prevState.SchemaValidationEnforced, currState.SchemaValidationEnforced, + policyOpRemoveSchemaValidation, &ops) + r.appendPolicyRemoval(prevState.SubscriptionDispatchRate, currState.SubscriptionDispatchRate, + policyOpRemoveSubscriptionDispatch, &ops) + r.appendPolicyRemoval(prevState.ReplicatorDispatchRate, currState.ReplicatorDispatchRate, + policyOpRemoveReplicatorDispatch, &ops) + r.appendPolicyRemoval(prevState.DeduplicationSnapshotInterval, currState.DeduplicationSnapshotInterval, + policyOpRemoveDedupSnapshotInterval, &ops) + r.appendPolicyRemoval(prevState.OffloadPolicies, currState.OffloadPolicies, policyOpRemoveOffloadPolicies, &ops) + r.appendPolicyRemoval(prevState.AutoSubscriptionCreation, currState.AutoSubscriptionCreation, + policyOpRemoveAutoSubscription, &ops) + r.appendPolicyRemoval(prevState.SchemaCompatibilityStrategy, currState.SchemaCompatibilityStrategy, + policyOpRemoveSchemaCompatibility, &ops) + + // Backlog quota + if prevState.BacklogQuotaType != nil { + if currState.BacklogQuotaType == nil || *prevState.BacklogQuotaType != *currState.BacklogQuotaType { + r.changed = true + ops.ItemsToRemove = append(ops.ItemsToRemove, topicPolicyOperation{ + Kind: policyOpRemoveBacklogQuota, + Value: *prevState.BacklogQuotaType, + }) + } + } + + // Topic properties + if len(prevState.PropertiesKeys) > 0 { + currKeys := make(map[string]struct{}, len(currState.PropertiesKeys)) + for _, key := range currState.PropertiesKeys { + currKeys[key] = struct{}{} + } + for _, key := range prevState.PropertiesKeys { + if _, exists := currKeys[key]; !exists { + r.changed = true + ops.ItemsToRemove = append(ops.ItemsToRemove, topicPolicyOperation{ + Kind: policyOpRemoveProperty, + Value: key, + }) + } + } + } + + return ops, nil +} + +// ApplyOperations implements reconciler.StatefulReconciler. +func (r *topicPolicyStateReconciler) ApplyOperations(ctx context.Context, topic *resourcev1alpha1.PulsarTopic, + ops reconciler.StateChangeOperations) error { + for _, item := range append(ops.ItemsToAdd, ops.ItemsToUpdate...) { + op, err := normalizeTopicPolicyOperation(item) + if err != nil { + return err + } + switch op.Kind { + case policyOpSetCompactionThreshold: + value, err := toInt64(op.Value) + if err != nil { + return err + } + r.log.V(1).Info("Setting topic compaction threshold", + "topicSpecName", topic.Spec.Name, + "threshold", value) + if err := r.admin.SetTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent, value); err != nil { + return err + } + default: + return fmt.Errorf("unsupported topic policy operation kind %q in add/update phase", op.Kind) + } + } + + for _, item := range ops.ItemsToRemove { + op, err := normalizeTopicPolicyOperation(item) + if err != nil { + return err + } + switch op.Kind { + case policyOpRemoveCompactionThreshold: + r.log.V(1).Info("Removing topic compaction threshold", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveMessageTTL: + r.log.V(1).Info("Removing topic message TTL", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicMessageTTL(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveMaxProducers: + r.log.V(1).Info("Removing topic max producers", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicMaxProducers(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveMaxConsumers: + r.log.V(1).Info("Removing topic max consumers", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicMaxConsumers(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveMaxUnackConsumer: + r.log.V(1).Info("Removing topic max unacked messages per consumer", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicMaxUnackedMessagesPerConsumer(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveMaxUnackSubscription: + r.log.V(1).Info("Removing topic max unacked messages per subscription", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicMaxUnackedMessagesPerSubscription(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveRetention: + r.log.V(1).Info("Removing topic retention policy", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicRetention(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveBacklogQuota: + quotaType, err := toString(op.Value) + if err != nil { + return err + } + r.log.V(1).Info("Removing topic backlog quota", "topicSpecName", topic.Spec.Name, "type", quotaType) + if err := r.admin.RemoveTopicBacklogQuota(topic.Spec.Name, topic.Spec.Persistent, quotaType); err != nil { + return err + } + case policyOpRemoveDeduplication: + r.log.V(1).Info("Removing topic deduplication status", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicDeduplicationStatus(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemovePersistence: + r.log.V(1).Info("Removing topic persistence policies", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicPersistence(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveDelayedDelivery: + r.log.V(1).Info("Removing topic delayed delivery policy", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicDelayedDelivery(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveDispatchRate: + r.log.V(1).Info("Removing topic dispatch rate", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicDispatchRate(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemovePublishRate: + r.log.V(1).Info("Removing topic publish rate", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicPublishRate(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveInactiveTopicPolicies: + r.log.V(1).Info("Removing inactive topic policies", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicInactiveTopicPolicies(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveSubscribeRate: + r.log.V(1).Info("Removing topic subscribe rate", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicSubscribeRate(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveMaxMessageSize: + r.log.V(1).Info("Removing topic max message size", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicMaxMessageSize(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveMaxConsumersPerSub: + r.log.V(1).Info("Removing topic max consumers per subscription", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicMaxConsumersPerSubscription(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveMaxSubscriptionsPerTopic: + r.log.V(1).Info("Removing topic max subscriptions per topic", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicMaxSubscriptionsPerTopic(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveSchemaValidation: + r.log.V(1).Info("Removing topic schema validation enforced flag", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicSchemaValidationEnforced(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveSubscriptionDispatch: + r.log.V(1).Info("Removing topic subscription dispatch rate", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicSubscriptionDispatchRate(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveReplicatorDispatch: + r.log.V(1).Info("Removing topic replicator dispatch rate", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicReplicatorDispatchRate(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveDedupSnapshotInterval: + r.log.V(1).Info("Removing topic deduplication snapshot interval", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicDeduplicationSnapshotInterval(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveOffloadPolicies: + r.log.V(1).Info("Removing topic offload policies", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicOffloadPolicies(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveAutoSubscription: + r.log.V(1).Info("Removing topic auto subscription creation override", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicAutoSubscriptionCreation(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveSchemaCompatibility: + r.log.V(1).Info("Removing topic schema compatibility strategy override", "topicSpecName", topic.Spec.Name) + if err := r.admin.RemoveTopicSchemaCompatibilityStrategy(topic.Spec.Name, topic.Spec.Persistent); err != nil { + return err + } + case policyOpRemoveProperty: + key, err := toString(op.Value) + if err != nil { + return err + } + r.log.V(1).Info("Removing topic property", "topicSpecName", topic.Spec.Name, "key", key) + if err := r.admin.RemoveTopicProperty(topic.Spec.Name, topic.Spec.Persistent, key); err != nil { + return err + } + default: + return fmt.Errorf("unsupported topic policy operation kind %q in removal phase", op.Kind) + } + } + + return nil +} + +// UpdateStateAnnotation implements reconciler.StatefulReconciler. +func (r *topicPolicyStateReconciler) UpdateStateAnnotation(topic *resourcev1alpha1.PulsarTopic, currentState interface{}) error { + return r.base.UpdateAnnotation(topic, PulsarTopicPolicyStateAnnotation, currentState) +} + +func (r *topicPolicyStateReconciler) appendPolicyRemoval(previous, current bool, kind string, + ops *reconciler.StateChangeOperations) { + if previous && !current { + r.changed = true + ops.ItemsToRemove = append(ops.ItemsToRemove, topicPolicyOperation{Kind: kind}) + } +} + +func decodeTopicPolicyState(state interface{}) (topicPolicyState, error) { + if state == nil { + return topicPolicyState{}, nil + } + + switch value := state.(type) { + case topicPolicyState: + return value, nil + case map[string]interface{}: + bytes, err := json.Marshal(value) + if err != nil { + return topicPolicyState{}, err + } + var result topicPolicyState + if err := json.Unmarshal(bytes, &result); err != nil { + return topicPolicyState{}, err + } + return result, nil + default: + bytes, err := json.Marshal(value) + if err != nil { + return topicPolicyState{}, err + } + var result topicPolicyState + if err := json.Unmarshal(bytes, &result); err != nil { + return topicPolicyState{}, err + } + return result, nil + } +} + +func normalizeTopicPolicyOperation(item interface{}) (topicPolicyOperation, error) { + switch value := item.(type) { + case topicPolicyOperation: + return value, nil + case map[string]interface{}: + bytes, err := json.Marshal(value) + if err != nil { + return topicPolicyOperation{}, err + } + var op topicPolicyOperation + if err := json.Unmarshal(bytes, &op); err != nil { + return topicPolicyOperation{}, err + } + return op, nil + default: + return topicPolicyOperation{}, fmt.Errorf("unexpected topic policy operation type %T", item) + } +} + +func toInt64(value interface{}) (int64, error) { + switch typed := value.(type) { + case int64: + return typed, nil + case int32: + return int64(typed), nil + case int: + return int64(typed), nil + case float64: + return int64(typed), nil + case json.Number: + return typed.Int64() + default: + return 0, fmt.Errorf("unsupported int64 value type %T", value) + } +} + +func toString(value interface{}) (string, error) { + switch typed := value.(type) { + case string: + return typed, nil + case json.Number: + return typed.String(), nil + default: + return "", fmt.Errorf("unsupported string value type %T", value) + } +} diff --git a/tests/operator/resources_test.go b/tests/operator/resources_test.go index 2ea6c8ec..d19783e6 100644 --- a/tests/operator/resources_test.go +++ b/tests/operator/resources_test.go @@ -504,7 +504,7 @@ var _ = Describe("Resources", func() { return true } - rawState, exists := annotations[connection.PulsarTopicCompactionStateAnnotation] + rawState, exists := annotations[connection.PulsarTopicPolicyStateAnnotation] if !exists { return true } From 092ea11ed0c9b2699a48ebd41c1bb431fa2d9e77 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Fri, 24 Oct 2025 20:13:11 +0800 Subject: [PATCH 4/7] fix header --- pkg/connection/topic_policy_state_reconciler.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/connection/topic_policy_state_reconciler.go b/pkg/connection/topic_policy_state_reconciler.go index 7bae9a17..efb0b65a 100644 --- a/pkg/connection/topic_policy_state_reconciler.go +++ b/pkg/connection/topic_policy_state_reconciler.go @@ -1,3 +1,17 @@ +// Copyright 2025 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package connection import ( From 85a66641af5f8d1cbf2cbd6d5f504875c2d72558 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 29 Oct 2025 23:13:13 +0800 Subject: [PATCH 5/7] fix deploy --- config/crd/kustomization.yaml | 39 ++++++++++++----------- pkg/admin/impl.go | 60 +++++++++++++++++++++++++++++------ 2 files changed, 71 insertions(+), 28 deletions(-) diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 7007fbee..8006bcd0 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -16,24 +16,25 @@ # since it depends on service name and namespace that are out of this kustomize package. # It should be run by config/default resources: -- bases/resource.streamnative.io_pulsarconnections.yaml -- bases/resource.streamnative.io_pulsarnamespaces.yaml -- bases/resource.streamnative.io_pulsartenants.yaml -- bases/resource.streamnative.io_pulsartopics.yaml -- bases/resource.streamnative.io_pulsarpermissions.yaml -- bases/resource.streamnative.io_pulsargeoreplications.yaml -- bases/resource.streamnative.io_pulsarfunctions.yaml -- bases/resource.streamnative.io_pulsarpackages.yaml -- bases/resource.streamnative.io_pulsarsinks.yaml -- bases/resource.streamnative.io_pulsarsources.yaml -- bases/resource.streamnative.io_pulsarnsisolationpolicies.yaml -- bases/resource.streamnative.io_streamnativecloudconnections.yaml -- bases/resource.streamnative.io_computeworkspaces.yaml -- bases/resource.streamnative.io_computeflinkdeployments.yaml -- bases/resource.streamnative.io_secrets.yaml -- bases/resource.streamnative.io_serviceaccounts.yaml -- bases/resource.streamnative.io_serviceaccountbindings.yaml -- bases/resource.streamnative.io_apikeys.yaml + - bases/resource.streamnative.io_pulsarconnections.yaml + - bases/resource.streamnative.io_pulsarnamespaces.yaml + - bases/resource.streamnative.io_pulsartenants.yaml + - bases/resource.streamnative.io_pulsartopics.yaml + - bases/resource.streamnative.io_pulsarpermissions.yaml + - bases/resource.streamnative.io_pulsargeoreplications.yaml + - bases/resource.streamnative.io_pulsarfunctions.yaml + - bases/resource.streamnative.io_pulsarpackages.yaml + - bases/resource.streamnative.io_pulsarsinks.yaml + - bases/resource.streamnative.io_pulsarsources.yaml + - bases/resource.streamnative.io_pulsarnsisolationpolicies.yaml + - bases/resource.streamnative.io_streamnativecloudconnections.yaml + - bases/resource.streamnative.io_computeworkspaces.yaml + - bases/resource.streamnative.io_computeflinkdeployments.yaml + - bases/resource.streamnative.io_secrets.yaml + - bases/resource.streamnative.io_serviceaccounts.yaml + - bases/resource.streamnative.io_serviceaccountbindings.yaml + - bases/resource.streamnative.io_apikeys.yaml + - bases/resource.streamnative.io_rolebindings.yaml #+kubebuilder:scaffold:crdkustomizeresource patchesStrategicMerge: @@ -59,4 +60,4 @@ patchesStrategicMerge: # the following config is for teaching kustomize how to do kustomization for CRDs. configurations: -- kustomizeconfig.yaml + - kustomizeconfig.yaml diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index ea96dce8..c71a4ea4 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -280,6 +280,7 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param } } + var retentionPolicy *utils.RetentionPolicies if params.RetentionTime != nil || params.RetentionSize != nil { retentionTime := -1 retentionSize := -1 @@ -301,18 +302,16 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param retentionSize = int(params.RetentionSize.ScaledValue(resource.Mega)) } } - retentionPolicy := utils.NewRetentionPolicies(retentionTime, retentionSize) - err = p.adminClient.Topics().SetRetention(*topicName, retentionPolicy) - if err != nil { - return err - } + policy := utils.NewRetentionPolicies(retentionTime, retentionSize) + retentionPolicy = &policy } + var backlogQuotaPolicy *utils.BacklogQuota + var backlogQuotaType utils.BacklogQuotaType if (params.BacklogQuotaLimitTime != nil || params.BacklogQuotaLimitSize != nil) && params.BacklogQuotaRetentionPolicy != nil { backlogTime := int64(-1) backlogSize := int64(-1) - var backlogQuotaType utils.BacklogQuotaType if params.BacklogQuotaLimitTime != nil { t, err := params.BacklogQuotaLimitTime.Parse() if err != nil { @@ -325,13 +324,24 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param backlogSize = params.BacklogQuotaLimitSize.Value() backlogQuotaType = utils.DestinationStorage } - backlogQuotaPolicy := utils.BacklogQuota{ + backlogQuotaPolicy = &utils.BacklogQuota{ LimitTime: backlogTime, LimitSize: backlogSize, Policy: utils.RetentionPolicy(*params.BacklogQuotaRetentionPolicy), } - err = p.adminClient.Topics().SetBacklogQuota(*topicName, backlogQuotaPolicy, backlogQuotaType) - if err != nil { + } + + switch { + case retentionPolicy != nil && backlogQuotaPolicy != nil: + if err := p.applyRetentionAndBacklogPolicies(topicName, retentionPolicy, backlogQuotaPolicy, backlogQuotaType); err != nil { + return err + } + case retentionPolicy != nil: + if err := p.adminClient.Topics().SetRetention(*topicName, *retentionPolicy); err != nil { + return err + } + case backlogQuotaPolicy != nil: + if err := p.adminClient.Topics().SetBacklogQuota(*topicName, *backlogQuotaPolicy, backlogQuotaType); err != nil { return err } } @@ -577,6 +587,38 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param return nil } +func (p *PulsarAdminClient) applyRetentionAndBacklogPolicies(topicName *utils.TopicName, retention *utils.RetentionPolicies, + backlog *utils.BacklogQuota, backlogType utils.BacklogQuotaType) error { + if err := p.adminClient.Topics().SetRetention(*topicName, *retention); err != nil { + if !isRetentionBacklogOrderingError(err) { + return err + } + + if err := p.adminClient.Topics().SetBacklogQuota(*topicName, *backlog, backlogType); err != nil { + return err + } + + if err := p.adminClient.Topics().SetRetention(*topicName, *retention); err != nil { + return err + } + + return nil + } + + if err := p.adminClient.Topics().SetBacklogQuota(*topicName, *backlog, backlogType); err != nil { + return err + } + + return nil +} + +func isRetentionBacklogOrderingError(err error) bool { + if ErrorReason(err) != ReasonInvalidParameter { + return false + } + return strings.Contains(err.Error(), "Retention Quota must exceed configured backlog quota") +} + // GetTopicClusters get the assigned clusters of the topic to the local default cluster func (p *PulsarAdminClient) GetTopicClusters(name string, persistent *bool) ([]string, error) { completeTopicName := MakeCompleteTopicName(name, persistent) From 0139c4c641aa3bb9ce0da59e80e2828eb64d5573 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 29 Oct 2025 23:23:34 +0800 Subject: [PATCH 6/7] add docs and warn about compaction subscription --- docs/pulsar_topic.md | 4 ++-- pkg/connection/topic_policy_state_reconciler.go | 14 +++++++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/docs/pulsar_topic.md b/docs/pulsar_topic.md index 5e1888a6..f465082c 100644 --- a/docs/pulsar_topic.md +++ b/docs/pulsar_topic.md @@ -29,7 +29,7 @@ The `PulsarTopic` resource defines a topic in a Pulsar cluster. It allows you to | `geoReplicationRefs` | List of references to PulsarGeoReplication resources, used to enable geo-replication at the topic level. | No | | `replicationClusters` | List of clusters to which the topic is replicated. Use only if replicating clusters within the same Pulsar instance. | No | | `deduplication` | whether to enable message deduplication for the topic. | No | -| `compactionThreshold` | Size threshold in bytes for automatic topic compaction. When the topic reaches this size, compaction will be triggered automatically. | No | +| `compactionThreshold` | Size threshold in bytes for automatic topic compaction. When the topic reaches this size, compaction will be triggered automatically. If you later disable compaction (removing this field or setting it to `0`), manually delete the `__compaction` subscription to avoid backlog accumulation. | No | | `persistencePolicies` | Persistence configuration for the topic, controlling how data is stored and replicated in BookKeeper. See [persistencePolicies](#persistencePolicies) for more details. | No | | `delayedDelivery` | Delayed delivery policy for the topic, allowing messages to be delivered with a delay. See [delayedDelivery](#delayedDelivery) for more details. | No | | `dispatchRate` | Message dispatch rate limiting policy for the topic, controlling the rate at which messages are delivered to consumers. See [dispatchRate](#dispatchRate) for more details. | No | @@ -548,4 +548,4 @@ This example demonstrates a production-ready topic configuration with: - Message size limits (1MB maximum) - Consumer and subscription limits per topic - Custom properties for metadata tracking -- Controlled auto-subscription creation (disabled for production) \ No newline at end of file +- Controlled auto-subscription creation (disabled for production) diff --git a/pkg/connection/topic_policy_state_reconciler.go b/pkg/connection/topic_policy_state_reconciler.go index efb0b65a..1d4a41a8 100644 --- a/pkg/connection/topic_policy_state_reconciler.go +++ b/pkg/connection/topic_policy_state_reconciler.go @@ -340,9 +340,15 @@ func (r *topicPolicyStateReconciler) ApplyOperations(ctx context.Context, topic if err != nil { return err } - r.log.V(1).Info("Setting topic compaction threshold", - "topicSpecName", topic.Spec.Name, - "threshold", value) + if value <= 0 { + r.log.Info("WARN: topic compaction disabled; remove __compaction subscription to avoid backlog accumulation", + "topicSpecName", topic.Spec.Name, + "requestedThreshold", value) + } else { + r.log.V(1).Info("Setting topic compaction threshold", + "topicSpecName", topic.Spec.Name, + "threshold", value) + } if err := r.admin.SetTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent, value); err != nil { return err } @@ -358,6 +364,8 @@ func (r *topicPolicyStateReconciler) ApplyOperations(ctx context.Context, topic } switch op.Kind { case policyOpRemoveCompactionThreshold: + r.log.Info("WARN: topic compaction disabled; remove __compaction subscription to avoid backlog accumulation", + "topicSpecName", topic.Spec.Name) r.log.V(1).Info("Removing topic compaction threshold", "topicSpecName", topic.Spec.Name) if err := r.admin.RemoveTopicCompactionThreshold(topic.Spec.Name, topic.Spec.Persistent); err != nil { return err From c5f84d9fcc4c13af3c14dcbb22ad2a3b770c64e0 Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Thu, 30 Oct 2025 16:00:25 +0800 Subject: [PATCH 7/7] fix state annotation --- pkg/connection/topic_policy_state_reconciler.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/connection/topic_policy_state_reconciler.go b/pkg/connection/topic_policy_state_reconciler.go index 1d4a41a8..40d06fbf 100644 --- a/pkg/connection/topic_policy_state_reconciler.go +++ b/pkg/connection/topic_policy_state_reconciler.go @@ -30,8 +30,7 @@ import ( const ( // PulsarTopicPolicyStateAnnotation stores the last applied topic policy state. - // We keep the historic key to remain backward compatible with existing annotations. - PulsarTopicPolicyStateAnnotation = "pulsartopics.resource.streamnative.io/compaction-state" + PulsarTopicPolicyStateAnnotation = "pulsartopics.resource.streamnative.io/managed-state" ) type topicPolicyState struct {