Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
@FieldContext(
dynamic = true,
category = CATEGORY_POLICIES,
doc = "Default interval to publish usage reports if resourceUsagePublishToTopic is enabled."
doc = "Interval (in seconds) for ResourceGroupService periodic tasks while resource groups are actively "
+ "attached to tenants or namespaces. Periodic tasks start automatically when the first attachment "
+ "is registered and stop automatically when no attachments remain. "
+ "If a ResourceUsageTransportManager is configured (see resourceUsageTransportClassName), "
+ "this interval also controls how frequently, usage reports are published for cross-broker "
+ "coordination. Dynamic changes take effect at runtime and reschedule any running tasks."
)
private int resourceUsageTransportPublishIntervalInSecs = 60;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ public void registerTenant(String resourceGroupName, String tenantName) throws P
// Associate this tenant name with the RG.
this.tenantToRGsMap.put(tenantName, rg);
rgTenantRegisters.labels(resourceGroupName).inc();

// First attachment may have just appeared, ensure schedulers start.
maybeStartSchedulers();
}

/**
Expand All @@ -235,6 +238,9 @@ public void unRegisterTenant(String resourceGroupName, String tenantName) throws
// Dissociate this tenant name from the RG.
this.tenantToRGsMap.remove(tenantName, rg);
rgTenantUnRegisters.labels(resourceGroupName).inc();

// If this was the last attachment (tenant or namespace), stop schedulers.
maybeStopSchedulersIfIdle();
}

/**
Expand Down Expand Up @@ -266,6 +272,9 @@ public void registerNameSpace(String resourceGroupName, NamespaceName fqNamespac
// Associate this NS-name with the RG.
this.namespaceToRGsMap.put(fqNamespaceName, rg);
rgNamespaceRegisters.labels(resourceGroupName).inc();

// First attachment may have just appeared, ensure schedulers start.
maybeStartSchedulers();
}

/**
Expand All @@ -290,6 +299,9 @@ public void unRegisterNameSpace(String resourceGroupName, NamespaceName fqNamesp
// Dissociate this NS-name from the RG.
this.namespaceToRGsMap.remove(fqNamespaceName, rg);
rgNamespaceUnRegisters.labels(resourceGroupName).inc();

// If this was the last attachment (tenant or namespace), stop schedulers.
maybeStopSchedulersIfIdle();
}

/**
Expand All @@ -306,10 +318,16 @@ public ResourceGroup getNamespaceResourceGroup(NamespaceName namespaceName) {
public void close() throws Exception {
if (aggregateLocalUsagePeriodicTask != null) {
aggregateLocalUsagePeriodicTask.cancel(true);
aggregateLocalUsagePeriodicTask = null;
}
if (calculateQuotaPeriodicTask != null) {
calculateQuotaPeriodicTask.cancel(true);
calculateQuotaPeriodicTask = null;
}

// Ensure the flag is consistent with the stopped state.
schedulersRunning.set(false);

resourceGroupsMap.clear();
tenantToRGsMap.clear();
namespaceToRGsMap.clear();
Expand Down Expand Up @@ -540,6 +558,12 @@ protected static Summary.Child.Value getRgQuotaCalculationTime() {
// Periodically aggregate the usage from all topics known to the BrokerService.
// Visibility for unit testing.
protected void aggregateResourceGroupLocalUsages() {
if (!schedulersRunning.get() || resourceGroupsMap.isEmpty()) {
return;
}
if (tenantToRGsMap.isEmpty() && namespaceToRGsMap.isEmpty()) {
return;
}
final Summary.Timer aggrUsageTimer = rgUsageAggregationLatency.startTimer();
BrokerService bs = this.pulsar.getBrokerService();
Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
Expand Down Expand Up @@ -578,7 +602,7 @@ protected void aggregateResourceGroupLocalUsages() {
// cancel and re-schedule this task if the period of execution has changed.
ServiceConfiguration config = pulsar.getConfiguration();
long newPeriodInSeconds = config.getResourceUsageTransportPublishIntervalInSecs();
if (newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) {
if (schedulersRunning.get() && newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) {
if (this.aggregateLocalUsagePeriodicTask == null) {
log.error("aggregateResourceGroupLocalUsages: Unable to find running task to cancel when "
+ "publish period changed from {} to {} {}",
Expand All @@ -602,6 +626,12 @@ protected void aggregateResourceGroupLocalUsages() {
// from the reports received from other brokers.
// [Visibility for unit testing.]
protected void calculateQuotaForAllResourceGroups() {
if (!schedulersRunning.get() || resourceGroupsMap.isEmpty()) {
return;
}
if (tenantToRGsMap.isEmpty() && namespaceToRGsMap.isEmpty()) {
return;
}
// Calculate the quota for the next window for this RG, based on the observed usage.
final Summary.Timer quotaCalcTimer = rgQuotaCalculationLatency.startTimer();
BytesAndMessagesCount updatedQuota = new BytesAndMessagesCount();
Expand Down Expand Up @@ -668,7 +698,7 @@ protected void calculateQuotaForAllResourceGroups() {
// cancel and re-schedule this task if the period of execution has changed.
ServiceConfiguration config = pulsar.getConfiguration();
long newPeriodInSeconds = config.getResourceUsageTransportPublishIntervalInSecs();
if (newPeriodInSeconds != this.resourceUsagePublishPeriodInSeconds) {
if (schedulersRunning.get() && newPeriodInSeconds != this.resourceUsagePublishPeriodInSeconds) {
if (this.calculateQuotaPeriodicTask == null) {
log.error("calculateQuotaForAllResourceGroups: Unable to find running task to cancel when "
+ "publish period changed from {} to {} {}",
Expand All @@ -690,23 +720,60 @@ protected void calculateQuotaForAllResourceGroups() {
}
}

private void initialize() {
ServiceConfiguration config = this.pulsar.getConfiguration();
long periodInSecs = config.getResourceUsageTransportPublishIntervalInSecs();
this.aggregateLocalUsagePeriodInSeconds = this.resourceUsagePublishPeriodInSeconds = periodInSecs;
this.aggregateLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(
// True if at least one tenant or namespace is attached to any RG.
private boolean hasActiveAttachments() {
// Any tenant or namespace attachments imply the feature is "in use".
return !tenantToRGsMap.isEmpty() || !namespaceToRGsMap.isEmpty();
}

// Start periodic aggregation/quota schedulers if we actually need them.
private void maybeStartSchedulers() {
if (!hasActiveAttachments()) {
return;
}
if (schedulersRunning.compareAndSet(false, true)) {
final long periodInSecs = pulsar.getConfiguration().getResourceUsageTransportPublishIntervalInSecs();
this.aggregateLocalUsagePeriodInSeconds = this.resourceUsagePublishPeriodInSeconds = periodInSecs;
this.aggregateLocalUsagePeriodicTask = pulsar.getExecutor().scheduleAtFixedRate(
catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages),
periodInSecs,
periodInSecs,
this.timeUnitScale);
this.calculateQuotaPeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate(
periodInSecs, periodInSecs, timeUnitScale);
this.calculateQuotaPeriodicTask = pulsar.getExecutor().scheduleAtFixedRate(
catchingAndLoggingThrowables(this::calculateQuotaForAllResourceGroups),
periodInSecs,
periodInSecs,
this.timeUnitScale);
maxIntervalForSuppressingReportsMSecs =
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;
periodInSecs, periodInSecs, timeUnitScale);
maxIntervalForSuppressingReportsMSecs =
TimeUnit.SECONDS.toMillis(this.resourceUsagePublishPeriodInSeconds) * MaxUsageReportSuppressRounds;
if (log.isInfoEnabled()) {
log.info("Started ResourceGroupService periodic tasks with period={} {}", periodInSecs, timeUnitScale);
}
}
}
// Stop schedulers when no attachments remain.
private void maybeStopSchedulersIfIdle() {
if (hasActiveAttachments()) {
return;
}
if (schedulersRunning.compareAndSet(true, false)) {
if (aggregateLocalUsagePeriodicTask != null) {
aggregateLocalUsagePeriodicTask.cancel(true);
aggregateLocalUsagePeriodicTask = null;
}
if (calculateQuotaPeriodicTask != null) {
calculateQuotaPeriodicTask.cancel(true);
calculateQuotaPeriodicTask = null;
}
if (log.isInfoEnabled()) {
log.info("Stopped ResourceGroupService periodic tasks due to no active attachments");
}
}
}

private void initialize() {
// Store the configured interval. Do not start periodic tasks unconditionally here.
// Schedulers are started by maybeStartSchedulers() when the first tenant/namespace attachment is registered.
final long periodInSecs = pulsar.getConfiguration().getResourceUsageTransportPublishIntervalInSecs();
this.aggregateLocalUsagePeriodInSeconds = this.resourceUsagePublishPeriodInSeconds = periodInSecs;
// if any tenant/namespace registrations already exist, maybeStartSchedulers() will start the schedulers now.
maybeStartSchedulers();
}

private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rgConfig)
Expand Down Expand Up @@ -761,6 +828,9 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
// Allow a pluggable scale on time units; for testing periodic functionality.
private TimeUnit timeUnitScale;

private final java.util.concurrent.atomic.AtomicBoolean schedulersRunning =
new java.util.concurrent.atomic.AtomicBoolean(false);

// The maximum number of successive rounds that we can suppress reporting local usage, because there was no
// substantial change from the prior round. This is to ensure the reporting does not become too chatty.
// Set this value to one more than the cadence of sending reports; e.g., if you want to send every 3rd report,
Expand Down Expand Up @@ -864,4 +934,9 @@ ScheduledFuture<?> getAggregateLocalUsagePeriodicTask() {
ScheduledFuture<?> getCalculateQuotaPeriodicTask() {
return this.calculateQuotaPeriodicTask;
}

@VisibleForTesting
boolean isSchedulersRunning() {
return schedulersRunning.get();
}
}
Loading
Loading