Skip to content

Commit 30ac0ad

Browse files
authored
make pulsar topic controller as stateful reconciler (#349)
* make pulsar topic controller as statefui reconciler * add debug logging * address other policies * fix header * fix deploy * add docs and warn about compaction subscription * fix state annotation
1 parent e1333fd commit 30ac0ad

File tree

8 files changed

+1424
-40
lines changed

8 files changed

+1424
-40
lines changed

config/crd/kustomization.yaml

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,25 @@
1616
# since it depends on service name and namespace that are out of this kustomize package.
1717
# It should be run by config/default
1818
resources:
19-
- bases/resource.streamnative.io_pulsarconnections.yaml
20-
- bases/resource.streamnative.io_pulsarnamespaces.yaml
21-
- bases/resource.streamnative.io_pulsartenants.yaml
22-
- bases/resource.streamnative.io_pulsartopics.yaml
23-
- bases/resource.streamnative.io_pulsarpermissions.yaml
24-
- bases/resource.streamnative.io_pulsargeoreplications.yaml
25-
- bases/resource.streamnative.io_pulsarfunctions.yaml
26-
- bases/resource.streamnative.io_pulsarpackages.yaml
27-
- bases/resource.streamnative.io_pulsarsinks.yaml
28-
- bases/resource.streamnative.io_pulsarsources.yaml
29-
- bases/resource.streamnative.io_pulsarnsisolationpolicies.yaml
30-
- bases/resource.streamnative.io_streamnativecloudconnections.yaml
31-
- bases/resource.streamnative.io_computeworkspaces.yaml
32-
- bases/resource.streamnative.io_computeflinkdeployments.yaml
33-
- bases/resource.streamnative.io_secrets.yaml
34-
- bases/resource.streamnative.io_serviceaccounts.yaml
35-
- bases/resource.streamnative.io_serviceaccountbindings.yaml
36-
- bases/resource.streamnative.io_apikeys.yaml
19+
- bases/resource.streamnative.io_pulsarconnections.yaml
20+
- bases/resource.streamnative.io_pulsarnamespaces.yaml
21+
- bases/resource.streamnative.io_pulsartenants.yaml
22+
- bases/resource.streamnative.io_pulsartopics.yaml
23+
- bases/resource.streamnative.io_pulsarpermissions.yaml
24+
- bases/resource.streamnative.io_pulsargeoreplications.yaml
25+
- bases/resource.streamnative.io_pulsarfunctions.yaml
26+
- bases/resource.streamnative.io_pulsarpackages.yaml
27+
- bases/resource.streamnative.io_pulsarsinks.yaml
28+
- bases/resource.streamnative.io_pulsarsources.yaml
29+
- bases/resource.streamnative.io_pulsarnsisolationpolicies.yaml
30+
- bases/resource.streamnative.io_streamnativecloudconnections.yaml
31+
- bases/resource.streamnative.io_computeworkspaces.yaml
32+
- bases/resource.streamnative.io_computeflinkdeployments.yaml
33+
- bases/resource.streamnative.io_secrets.yaml
34+
- bases/resource.streamnative.io_serviceaccounts.yaml
35+
- bases/resource.streamnative.io_serviceaccountbindings.yaml
36+
- bases/resource.streamnative.io_apikeys.yaml
37+
- bases/resource.streamnative.io_rolebindings.yaml
3738
#+kubebuilder:scaffold:crdkustomizeresource
3839

3940
patchesStrategicMerge:
@@ -59,4 +60,4 @@ patchesStrategicMerge:
5960

6061
# the following config is for teaching kustomize how to do kustomization for CRDs.
6162
configurations:
62-
- kustomizeconfig.yaml
63+
- kustomizeconfig.yaml

docs/pulsar_topic.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ The `PulsarTopic` resource defines a topic in a Pulsar cluster. It allows you to
2929
| `geoReplicationRefs` | List of references to PulsarGeoReplication resources, used to enable geo-replication at the topic level. | No |
3030
| `replicationClusters` | List of clusters to which the topic is replicated. Use only if replicating clusters within the same Pulsar instance. | No |
3131
| `deduplication` | whether to enable message deduplication for the topic. | No |
32-
| `compactionThreshold` | Size threshold in bytes for automatic topic compaction. When the topic reaches this size, compaction will be triggered automatically. | No |
32+
| `compactionThreshold` | Size threshold in bytes for automatic topic compaction. When the topic reaches this size, compaction will be triggered automatically. If you later disable compaction (removing this field or setting it to `0`), manually delete the `__compaction` subscription to avoid backlog accumulation. | No |
3333
| `persistencePolicies` | Persistence configuration for the topic, controlling how data is stored and replicated in BookKeeper. See [persistencePolicies](#persistencePolicies) for more details. | No |
3434
| `delayedDelivery` | Delayed delivery policy for the topic, allowing messages to be delivered with a delay. See [delayedDelivery](#delayedDelivery) for more details. | No |
3535
| `dispatchRate` | Message dispatch rate limiting policy for the topic, controlling the rate at which messages are delivered to consumers. See [dispatchRate](#dispatchRate) for more details. | No |
@@ -548,4 +548,4 @@ This example demonstrates a production-ready topic configuration with:
548548
- Message size limits (1MB maximum)
549549
- Consumer and subscription limits per topic
550550
- Custom properties for metadata tracking
551-
- Controlled auto-subscription creation (disabled for production)
551+
- Controlled auto-subscription creation (disabled for production)

pkg/admin/dummy.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,116 @@ func (d *DummyPulsarAdmin) SetTopicClusters(string, *bool, []string) error {
9696
return nil
9797
}
9898

99+
// SetTopicCompactionThreshold is a fake implementation of SetTopicCompactionThreshold
100+
func (d *DummyPulsarAdmin) SetTopicCompactionThreshold(string, *bool, int64) error {
101+
return nil
102+
}
103+
104+
// RemoveTopicCompactionThreshold is a fake implementation of RemoveTopicCompactionThreshold
105+
func (d *DummyPulsarAdmin) RemoveTopicCompactionThreshold(string, *bool) error {
106+
return nil
107+
}
108+
109+
func (d *DummyPulsarAdmin) RemoveTopicMessageTTL(string, *bool) error {
110+
return nil
111+
}
112+
113+
func (d *DummyPulsarAdmin) RemoveTopicMaxProducers(string, *bool) error {
114+
return nil
115+
}
116+
117+
func (d *DummyPulsarAdmin) RemoveTopicMaxConsumers(string, *bool) error {
118+
return nil
119+
}
120+
121+
func (d *DummyPulsarAdmin) RemoveTopicMaxUnackedMessagesPerConsumer(string, *bool) error {
122+
return nil
123+
}
124+
125+
func (d *DummyPulsarAdmin) RemoveTopicMaxUnackedMessagesPerSubscription(string, *bool) error {
126+
return nil
127+
}
128+
129+
func (d *DummyPulsarAdmin) RemoveTopicRetention(string, *bool) error {
130+
return nil
131+
}
132+
133+
func (d *DummyPulsarAdmin) RemoveTopicBacklogQuota(string, *bool, string) error {
134+
return nil
135+
}
136+
137+
func (d *DummyPulsarAdmin) RemoveTopicDeduplicationStatus(string, *bool) error {
138+
return nil
139+
}
140+
141+
func (d *DummyPulsarAdmin) RemoveTopicPersistence(string, *bool) error {
142+
return nil
143+
}
144+
145+
func (d *DummyPulsarAdmin) RemoveTopicDelayedDelivery(string, *bool) error {
146+
return nil
147+
}
148+
149+
func (d *DummyPulsarAdmin) RemoveTopicDispatchRate(string, *bool) error {
150+
return nil
151+
}
152+
153+
func (d *DummyPulsarAdmin) RemoveTopicPublishRate(string, *bool) error {
154+
return nil
155+
}
156+
157+
func (d *DummyPulsarAdmin) RemoveTopicInactiveTopicPolicies(string, *bool) error {
158+
return nil
159+
}
160+
161+
func (d *DummyPulsarAdmin) RemoveTopicSubscribeRate(string, *bool) error {
162+
return nil
163+
}
164+
165+
func (d *DummyPulsarAdmin) RemoveTopicMaxMessageSize(string, *bool) error {
166+
return nil
167+
}
168+
169+
func (d *DummyPulsarAdmin) RemoveTopicMaxConsumersPerSubscription(string, *bool) error {
170+
return nil
171+
}
172+
173+
func (d *DummyPulsarAdmin) RemoveTopicMaxSubscriptionsPerTopic(string, *bool) error {
174+
return nil
175+
}
176+
177+
func (d *DummyPulsarAdmin) RemoveTopicSchemaValidationEnforced(string, *bool) error {
178+
return nil
179+
}
180+
181+
func (d *DummyPulsarAdmin) RemoveTopicSubscriptionDispatchRate(string, *bool) error {
182+
return nil
183+
}
184+
185+
func (d *DummyPulsarAdmin) RemoveTopicReplicatorDispatchRate(string, *bool) error {
186+
return nil
187+
}
188+
189+
func (d *DummyPulsarAdmin) RemoveTopicDeduplicationSnapshotInterval(string, *bool) error {
190+
return nil
191+
}
192+
193+
func (d *DummyPulsarAdmin) RemoveTopicOffloadPolicies(string, *bool) error {
194+
return nil
195+
}
196+
197+
func (d *DummyPulsarAdmin) RemoveTopicAutoSubscriptionCreation(string, *bool) error {
198+
return nil
199+
}
200+
201+
func (d *DummyPulsarAdmin) RemoveTopicSchemaCompatibilityStrategy(string, *bool) error {
202+
return nil
203+
}
204+
205+
func (d *DummyPulsarAdmin) RemoveTopicProperty(string, *bool, string) error {
206+
return nil
207+
}
208+
99209
// Close is a fake implements of Close
100210
func (d *DummyPulsarAdmin) Close() error {
101211
return nil

0 commit comments

Comments
 (0)