diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 30fef55ece3bd..e63c7d2b286d3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1241,7 +1241,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( dynamic = true, category = CATEGORY_POLICIES, - doc = "Default interval to publish usage reports if resourceUsagePublishToTopic is enabled." + doc = "Interval (in seconds) for ResourceGroupService periodic tasks while resource groups are actively " + + "attached to tenants or namespaces. Periodic tasks start automatically when the first attachment " + + "is registered and stop automatically when no attachments remain. " + + "If a ResourceUsageTransportManager is configured (see resourceUsageTransportClassName), " + + "this interval also controls how frequently, usage reports are published for cross-broker " + + "coordination. Dynamic changes take effect at runtime and reschedule any running tasks." ) private int resourceUsageTransportPublishIntervalInSecs = 60; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index 29633ab19feff..379f9f870ea70 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -213,6 +213,9 @@ public void registerTenant(String resourceGroupName, String tenantName) throws P // Associate this tenant name with the RG. this.tenantToRGsMap.put(tenantName, rg); rgTenantRegisters.labels(resourceGroupName).inc(); + + // Ensure schedulers are started if this is the first registration. + maybeStartSchedulers(); } /** @@ -235,6 +238,9 @@ public void unRegisterTenant(String resourceGroupName, String tenantName) throws // Dissociate this tenant name from the RG. this.tenantToRGsMap.remove(tenantName, rg); rgTenantUnRegisters.labels(resourceGroupName).inc(); + + // If this was the last registration (tenant or namespace), stop schedulers. + maybeStopSchedulersIfIdle(); } /** @@ -266,6 +272,9 @@ public void registerNameSpace(String resourceGroupName, NamespaceName fqNamespac // Associate this NS-name with the RG. this.namespaceToRGsMap.put(fqNamespaceName, rg); rgNamespaceRegisters.labels(resourceGroupName).inc(); + + // Ensure schedulers are started if this is the first registration. + maybeStartSchedulers(); } /** @@ -290,6 +299,9 @@ public void unRegisterNameSpace(String resourceGroupName, NamespaceName fqNamesp // Dissociate this NS-name from the RG. this.namespaceToRGsMap.remove(fqNamespaceName, rg); rgNamespaceUnRegisters.labels(resourceGroupName).inc(); + + // If this was the last registration (tenant or namespace), stop schedulers. + maybeStopSchedulersIfIdle(); } /** @@ -306,10 +318,16 @@ public ResourceGroup getNamespaceResourceGroup(NamespaceName namespaceName) { public void close() throws Exception { if (aggregateLocalUsagePeriodicTask != null) { aggregateLocalUsagePeriodicTask.cancel(true); + aggregateLocalUsagePeriodicTask = null; } if (calculateQuotaPeriodicTask != null) { calculateQuotaPeriodicTask.cancel(true); + calculateQuotaPeriodicTask = null; } + + // Ensure the flag is consistent with the stopped state. + schedulersRunning.set(false); + resourceGroupsMap.clear(); tenantToRGsMap.clear(); namespaceToRGsMap.clear(); @@ -540,6 +558,9 @@ protected static Summary.Child.Value getRgQuotaCalculationTime() { // Periodically aggregate the usage from all topics known to the BrokerService. // Visibility for unit testing. protected void aggregateResourceGroupLocalUsages() { + if (!shouldRunPeriodicTasks()) { + return; + } final Summary.Timer aggrUsageTimer = rgUsageAggregationLatency.startTimer(); BrokerService bs = this.pulsar.getBrokerService(); Map topicStatsMap = bs.getTopicStats(); @@ -578,7 +599,7 @@ protected void aggregateResourceGroupLocalUsages() { // cancel and re-schedule this task if the period of execution has changed. ServiceConfiguration config = pulsar.getConfiguration(); long newPeriodInSeconds = config.getResourceUsageTransportPublishIntervalInSecs(); - if (newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) { + if (schedulersRunning.get() && newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) { if (this.aggregateLocalUsagePeriodicTask == null) { log.error("aggregateResourceGroupLocalUsages: Unable to find running task to cancel when " + "publish period changed from {} to {} {}", @@ -602,6 +623,9 @@ protected void aggregateResourceGroupLocalUsages() { // from the reports received from other brokers. // [Visibility for unit testing.] protected void calculateQuotaForAllResourceGroups() { + if (!shouldRunPeriodicTasks()) { + return; + } // Calculate the quota for the next window for this RG, based on the observed usage. final Summary.Timer quotaCalcTimer = rgQuotaCalculationLatency.startTimer(); BytesAndMessagesCount updatedQuota = new BytesAndMessagesCount(); @@ -668,7 +692,7 @@ protected void calculateQuotaForAllResourceGroups() { // cancel and re-schedule this task if the period of execution has changed. ServiceConfiguration config = pulsar.getConfiguration(); long newPeriodInSeconds = config.getResourceUsageTransportPublishIntervalInSecs(); - if (newPeriodInSeconds != this.resourceUsagePublishPeriodInSeconds) { + if (schedulersRunning.get() && newPeriodInSeconds != this.resourceUsagePublishPeriodInSeconds) { if (this.calculateQuotaPeriodicTask == null) { log.error("calculateQuotaForAllResourceGroups: Unable to find running task to cancel when " + "publish period changed from {} to {} {}", @@ -690,23 +714,72 @@ protected void calculateQuotaForAllResourceGroups() { } } - private void initialize() { - ServiceConfiguration config = this.pulsar.getConfiguration(); - long periodInSecs = config.getResourceUsageTransportPublishIntervalInSecs(); - this.aggregateLocalUsagePeriodInSeconds = this.resourceUsagePublishPeriodInSeconds = periodInSecs; - this.aggregateLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate( + // Returns true if at least one tenant or namespace is registered to resource group. + private boolean hasActiveResourceGroups() { + return !tenantToRGsMap.isEmpty() || !namespaceToRGsMap.isEmpty(); + } + + /** + * Whether the periodic ResourceGroupService tasks (aggregation & quota calculation) should run. + * True only when: + * 1. the scheduler flag is set, + * 2. at least one Resource Group exists locally, and + * 3. at least one tenant or namespace is registered to Resource Group. + */ + private boolean shouldRunPeriodicTasks() { + return schedulersRunning.get() + && !resourceGroupsMap.isEmpty() + && hasActiveResourceGroups(); + } + + // Start periodic aggregation/quota schedulers if we actually need them. + private void maybeStartSchedulers() { + if (!hasActiveResourceGroups()) { + return; + } + if (schedulersRunning.compareAndSet(false, true)) { + final long periodInSecs = pulsar.getConfiguration().getResourceUsageTransportPublishIntervalInSecs(); + this.aggregateLocalUsagePeriodInSeconds = this.resourceUsagePublishPeriodInSeconds = periodInSecs; + this.aggregateLocalUsagePeriodicTask = pulsar.getExecutor().scheduleAtFixedRate( catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages), - periodInSecs, - periodInSecs, - this.timeUnitScale); - this.calculateQuotaPeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate( + periodInSecs, periodInSecs, timeUnitScale); + this.calculateQuotaPeriodicTask = pulsar.getExecutor().scheduleAtFixedRate( catchingAndLoggingThrowables(this::calculateQuotaForAllResourceGroups), - periodInSecs, - periodInSecs, - this.timeUnitScale); - maxIntervalForSuppressingReportsMSecs = - TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds; + periodInSecs, periodInSecs, timeUnitScale); + maxIntervalForSuppressingReportsMSecs = + TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds; + if (log.isInfoEnabled()) { + log.info("Started ResourceGroupService periodic tasks with period={} {}", periodInSecs, timeUnitScale); + } + } + } + // Stop schedulers when no tenant or namespace registrations remain. + private void maybeStopSchedulersIfIdle() { + if (hasActiveResourceGroups()) { + return; + } + if (schedulersRunning.compareAndSet(true, false)) { + if (aggregateLocalUsagePeriodicTask != null) { + aggregateLocalUsagePeriodicTask.cancel(true); + aggregateLocalUsagePeriodicTask = null; + } + if (calculateQuotaPeriodicTask != null) { + calculateQuotaPeriodicTask.cancel(true); + calculateQuotaPeriodicTask = null; + } + if (log.isInfoEnabled()) { + log.info("Stopped ResourceGroupService periodic tasks because no registrations remain"); + } + } + } + private void initialize() { + // Store the configured interval. Do not start periodic tasks unconditionally here. + // Schedulers are started by maybeStartSchedulers() when the first tenant/namespace is registered. + final long periodInSecs = pulsar.getConfiguration().getResourceUsageTransportPublishIntervalInSecs(); + this.aggregateLocalUsagePeriodInSeconds = this.resourceUsagePublishPeriodInSeconds = periodInSecs; + // if any tenant/namespace registrations already exist, maybeStartSchedulers() will start the schedulers now. + maybeStartSchedulers(); } private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) @@ -761,6 +834,9 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie // Allow a pluggable scale on time units; for testing periodic functionality. private TimeUnit timeUnitScale; + private final java.util.concurrent.atomic.AtomicBoolean schedulersRunning = + new java.util.concurrent.atomic.AtomicBoolean(false); + // The maximum number of successive rounds that we can suppress reporting local usage, because there was no // substantial change from the prior round. This is to ensure the reporting does not become too chatty. // Set this value to one more than the cadence of sending reports; e.g., if you want to send every 3rd report, @@ -864,4 +940,9 @@ ScheduledFuture getAggregateLocalUsagePeriodicTask() { ScheduledFuture getCalculateQuotaPeriodicTask() { return this.calculateQuotaPeriodicTask; } + + @VisibleForTesting + boolean isSchedulersRunning() { + return schedulersRunning.get(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index 63b483cef5e52..534492894cfba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -228,9 +228,6 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep Assert.assertTrue(myBoolSet.contains(false)); } - rgs.unRegisterTenant(rgName, tenantName); - rgs.unRegisterNameSpace(rgName, tenantAndNamespace); - BytesAndMessagesCount publishQuota = rgs.getPublishRateLimiters(rgName); // Calculated quota is synthetically set to the number of quota-calculation callbacks. @@ -241,7 +238,7 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep Assert.assertEquals(publishQuota.bytes, rgConfig.getPublishRateInBytes().longValue()); } - // Calculate the quota synchronously to avoid waiting for a periodic call within ResourceGroupService. + // Calculate the quota synchronously while an attachment still exists rgs.calculateQuotaForAllResourceGroups(); publishQuota = rgs.getPublishRateLimiters(rgName); // The bytes/messages are (synthetically) set from numAnonymousQuotaCalculations in the above round of @@ -250,18 +247,57 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep Assert.assertTrue(publishQuota.messages > 0 && publishQuota.messages <= numAnonymousQuotaCalculations); Assert.assertTrue(publishQuota.bytes > 0 && publishQuota.bytes <= numAnonymousQuotaCalculations); + // Now it is safe to detach. After this point the service is intentionally idle. + rgs.unRegisterTenant(rgName, tenantName); + rgs.unRegisterNameSpace(rgName, tenantAndNamespace); + rgs.resourceGroupDelete(rgName); Assert.assertThrows(PulsarAdminException.class, () -> rgs.getPublishRateLimiters(rgName)); Assert.assertEquals(rgs.getNumResourceGroups(), 0); } - @Test + /** + * Validates that ResourceGroupService#close() cancels scheduled tasks, clears futures and state. + * Steps: + * 1) Start periodic tasks by creating a resource group and attaching a namespace. + * 2) Assert both futures are non-null (tasks are scheduled) and the schedulersRunning flag is true. + * 3) Let try-with-resources close the service, then assert both futures are null, schedulersRunning is false, + * and the resource group map is cleared. + */ + @Test(timeOut = 60000) public void testClose() throws Exception { - ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, null, null); - service.close(); - Assert.assertTrue(service.getAggregateLocalUsagePeriodicTask().isCancelled()); - Assert.assertTrue(service.getCalculateQuotaPeriodicTask().isCancelled()); + final String rg = "rg-close"; + final NamespaceName ns = NamespaceName.get("t-close/ns-close"); + + // Create the service once so we can assert state after the try-with-resources closes it. + ResourceGroupService service = createResourceGroupService(); + try (ResourceGroupService ignored = service) { + // Start the periodic tasks: create RG and attach a namespace. + service.resourceGroupCreate(rg, new org.apache.pulsar.common.policies.data.ResourceGroup()); + service.registerNameSpace(rg, ns); + + // Ensure tasks are started + Assert.assertNotNull(service.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should be scheduled."); + Assert.assertNotNull(service.getCalculateQuotaPeriodicTask(), + "Quota task should be scheduled."); + Assert.assertTrue(service.isSchedulersRunning(), + "SchedulersRunning flag should be true when tasks are active."); + + // Do not call service.close() here. The try-with-resources will close it and ensure cleanup. + } + + // After the try-with-resources block, service.close() has been invoked automatically. + // Postconditions: futures cleared to null, internal state cleared, flag off. + Assert.assertNull(service.getAggregateLocalUsagePeriodicTask(), + "Aggregate task future must be null after close()."); + Assert.assertNull(service.getCalculateQuotaPeriodicTask(), + "Quota task future must be null after close()."); + Assert.assertEquals(service.getNumResourceGroups(), 0, + "Resource group map should be cleared by close()."); + Assert.assertFalse(service.isSchedulersRunning(), + "SchedulersRunning flag should be false after close()."); } private ResourceGroupService rgs; @@ -274,4 +310,252 @@ private void prepareData() throws PulsarAdminException { this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS); admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); } + /** + * Helper method to create a fresh ResourceGroupService instance for testing. + * Each test should create its own instance to ensure isolation. + */ + private ResourceGroupService createResourceGroupService() { + return new ResourceGroupService(pulsar); + } + + /** + * Validates the lazy scheduling lifecycle and deterministic rescheduling of ResourceGroupService. + * Asserts that: + * 1) On cold start, and after creating a Resource Group (RG) without any attachments, + * no periodic tasks are scheduled. + * 2) Registering the first attachment (tenant or namespace) starts both periodic tasks. + * 3) Updating the publish interval causes rescheduling + * - calling aggregateResourceGroupLocalUsages() reschedules only the aggregation task; + * - calling calculateQuotaForAllResourceGroups() reschedules only the quota-calculation task. + * 4) When the last attachment is unregistered (i.e., no tenants or namespaces remain attached to any RG), + * both periodic tasks are cancelled and their ScheduledFuture fields are cleared. + */ + + @Test(timeOut = 60000) + public void testLazyStartStopAndReschedule() throws Exception { + final String rgName = "rg-lazy"; + final NamespaceName ns = NamespaceName.get("t-lazy/ns-lazy"); + final int oldInterval = this.conf.getResourceUsageTransportPublishIntervalInSecs(); + + try (ResourceGroupService rgs = createResourceGroupService()) { + // Cold start: nothing scheduled + Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should be null on cold start."); + Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), + "Quota task should be null on cold start."); + + // Create a resource group but do not attach yet. There should still be nothing scheduled. + rgs.resourceGroupCreate(rgName, new org.apache.pulsar.common.policies.data.ResourceGroup()); + Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should remain null without attachments."); + Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), + "Quota task should remain null without attachments."); + + // Attach a namespace. Both schedulers must start. + rgs.registerNameSpace(rgName, ns); + Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should start on first attachment."); + Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(), + "Quota task should start on first attachment."); + Assert.assertFalse(rgs.getAggregateLocalUsagePeriodicTask().isCancelled(), + "Aggregate task should be running."); + Assert.assertFalse(rgs.getCalculateQuotaPeriodicTask().isCancelled(), + "Quota task should be running."); + + // Capture current scheduled futures + java.util.concurrent.ScheduledFuture oldAgg = rgs.getAggregateLocalUsagePeriodicTask(); + java.util.concurrent.ScheduledFuture oldCalc = rgs.getCalculateQuotaPeriodicTask(); + + // Change publish interval dynamically + int newPeriod = oldInterval + 1; + this.conf.setResourceUsageTransportPublishIntervalInSecs(newPeriod); + + // Trigger aggregate reschedule + rgs.aggregateResourceGroupLocalUsages(); + java.util.concurrent.ScheduledFuture midAgg = rgs.getAggregateLocalUsagePeriodicTask(); + java.util.concurrent.ScheduledFuture midCalc = rgs.getCalculateQuotaPeriodicTask(); + + Assert.assertNotSame(midAgg, oldAgg, "Aggregate task should be rescheduled with a new future."); + Assert.assertTrue(oldAgg.isCancelled(), "Old aggregate task should be cancelled on reschedule."); + Assert.assertSame(midCalc, oldCalc, "Quota task should not be rescheduled by aggregate path."); + Assert.assertFalse(oldCalc.isCancelled(), + "Old quota task should still be active before its own reschedule."); + Assert.assertFalse(midAgg.isCancelled(), "New aggregate task should be active."); + + // Now trigger calculate reschedule + rgs.calculateQuotaForAllResourceGroups(); + java.util.concurrent.ScheduledFuture newAgg = rgs.getAggregateLocalUsagePeriodicTask(); + java.util.concurrent.ScheduledFuture newCalc = rgs.getCalculateQuotaPeriodicTask(); + + Assert.assertSame(newAgg, midAgg, + "Aggregate task was already rescheduled. Future should remain the same."); + Assert.assertNotSame(newCalc, oldCalc, "Quota task should be rescheduled with a new future."); + Assert.assertTrue(oldCalc.isCancelled(), "Old quota task should be cancelled on its reschedule."); + Assert.assertFalse(newCalc.isCancelled(), "New quota task should be active."); + + // Detach the last attachment. Schedulers must stop and futures should be cleared. + rgs.unRegisterNameSpace(rgName, ns); + Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should be cleared after last detach."); + Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), + "Quota task should be cleared after last detach."); + + // Cleanup resource group + rgs.resourceGroupDelete(rgName); + Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group should be deleted."); + } finally { + this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval); + } + } + + /** + * Verifies that creating a ResourceGroup without tenant or namespace attachments + * does NOT trigger scheduler initialization. This ensures the lazy-start optimization + * correctly avoids unnecessary overhead when ResourceGroups are configured but unused. + */ + @Test(timeOut = 60000) + public void testNoStartOnRGCreateOnly() throws Exception { + final String rg = "rg-create-only"; + final int oldInterval = this.conf.getResourceUsageTransportPublishIntervalInSecs(); + + try (ResourceGroupService rgs = createResourceGroupService()) { + // No tasks at cold start + Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should be null at cold start."); + Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), "Quota task should be null at cold start."); + + // Create resource group without attachments. There should still be nothing scheduled. + rgs.resourceGroupCreate(rg, new org.apache.pulsar.common.policies.data.ResourceGroup()); + Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should remain null without attachments."); + Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), + "Quota task should remain null without attachments."); + + // Calling periodic methods directly must not create schedulers + rgs.aggregateResourceGroupLocalUsages(); + rgs.calculateQuotaForAllResourceGroups(); + Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should remain null after direct calls."); + Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), + "Quota task should remain null after direct calls."); + + // Dynamic config while stopped must not trigger scheduling + this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval + 7); + rgs.aggregateResourceGroupLocalUsages(); + rgs.calculateQuotaForAllResourceGroups(); + Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should remain null after config change while stopped."); + Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), + "Quota task should remain null after config change while stopped."); + + // Cleanup resource group + rgs.resourceGroupDelete(rg); + Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group should be deleted."); + } finally { + this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval); + } + } + + /** + * Validates that attaching a tenant (without namespace attachment) to a ResourceGroup + * triggers scheduler initialization, and that detaching the last tenant stops the schedulers. + */ + @Test(timeOut = 60000) + public void testStartOnTenantAttachment() throws Exception { + final String rg = "rg-tenant-only"; + final String tenant = "t-attach"; + + try (ResourceGroupService rgs = createResourceGroupService()) { + rgs.resourceGroupCreate(rg, new org.apache.pulsar.common.policies.data.ResourceGroup()); + rgs.registerTenant(rg, tenant); + + Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should start on first tenant attachment."); + Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(), + "Quota task should start on first tenant attachment."); + Assert.assertFalse(rgs.getAggregateLocalUsagePeriodicTask().isCancelled(), + "Aggregate task should be running."); + Assert.assertFalse(rgs.getCalculateQuotaPeriodicTask().isCancelled(), + "Quota task should be running."); + + // Detach and ensure schedulers stop + rgs.unRegisterTenant(rg, tenant); + Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should be cleared after last detach."); + Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), + "Quota task should be cleared after last detach."); + + rgs.resourceGroupDelete(rg); + Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group should be deleted."); + } + } + + /** + * Tests scheduler lifecycle when a ResourceGroup has both tenant and namespace attachments. + * Verifies that schedulers remain active as long as ANY attachment exists, and only stop + * when the last attachment (tenant or namespace) is removed. + */ + @Test(timeOut = 60000) + public void testStopOnLastDetachWithMixedRefs() throws Exception { + final String rg = "rg-mixed"; + final String tenant = "t-mixed"; + final NamespaceName ns = NamespaceName.get("t-mixed/ns1"); + + try (ResourceGroupService rgs = createResourceGroupService()) { + rgs.resourceGroupCreate(rg, new org.apache.pulsar.common.policies.data.ResourceGroup()); + + // Attach both a tenant and a namespace + rgs.registerTenant(rg, tenant); + rgs.registerNameSpace(rg, ns); + + Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(), "Aggregate task should be started."); + Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(), "Quota task should be started."); + + // Remove one reference. Tasks should still be present. + rgs.unRegisterTenant(rg, tenant); + Assert.assertNotNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should remain with remaining namespace attachment."); + Assert.assertNotNull(rgs.getCalculateQuotaPeriodicTask(), + "Quota task should remain with remaining namespace attachment."); + + // Remove last reference. Tasks should stop. + rgs.unRegisterNameSpace(rg, ns); + Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), + "Aggregate task should be cleared after last detach."); + Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), + "Quota task should be cleared after last detach."); + + rgs.resourceGroupDelete(rg); + Assert.assertEquals(rgs.getNumResourceGroups(), 0, "Resource group should be deleted."); + } + } + + /** + * Ensures that dynamic configuration changes to the publish interval do NOT cause + * scheduler initialization when no attachments exist. This validates the guard logic + * that prevents spurious rescheduling attempts on stopped schedulers. + */ + @Test(timeOut = 60000) + public void testNoRescheduleWhenStopped() throws Exception { + final int oldInterval = this.conf.getResourceUsageTransportPublishIntervalInSecs(); + + try (ResourceGroupService rgs = createResourceGroupService()) { + // Ensure stopped state + Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), "Aggregate task should be null."); + Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), "Quota task should be null."); + + // Change interval while stopped + this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval + 13); + + // Call both periodic methods directly. Futures must remain null. + rgs.aggregateResourceGroupLocalUsages(); + rgs.calculateQuotaForAllResourceGroups(); + Assert.assertNull(rgs.getAggregateLocalUsagePeriodicTask(), "Aggregate task should remain null."); + Assert.assertNull(rgs.getCalculateQuotaPeriodicTask(), "Quota task should remain null."); + } finally { + this.conf.setResourceUsageTransportPublishIntervalInSecs(oldInterval); + } + } + + } \ No newline at end of file