Skip to content

Adding client option to override shutdown timeout. #760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import com.uber.m3.tally.Scope;
import com.uber.m3.util.ImmutableMap;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
@@ -37,6 +38,7 @@ public final class WorkflowClientOptions {
private static final WorkflowClientInterceptor[] EMPTY_INTERCEPTOR_ARRAY =
new WorkflowClientInterceptor[0];
private static final List<ContextPropagator> EMPTY_CONTEXT_PROPAGATORS = Arrays.asList();
private static final Duration DEFAULT_WORKER_SHUTDOWN_TIME = Duration.ofSeconds(10);

static {
DEFAULT_INSTANCE = new Builder().build();
@@ -62,6 +64,7 @@ public static final class Builder {
private String identity = ManagementFactory.getRuntimeMXBean().getName();;
private List<ContextPropagator> contextPropagators = EMPTY_CONTEXT_PROPAGATORS;
private QueryRejectCondition queryRejectCondition;
private Duration timeForWorkerShutdown = DEFAULT_WORKER_SHUTDOWN_TIME;

private Builder() {}

@@ -72,6 +75,7 @@ private Builder(WorkflowClientOptions options) {
metricsScope = options.getMetricsScope();
identity = options.getIdentity();
queryRejectCondition = options.getQueryRejectCondition();
timeForWorkerShutdown = options.getTimeForWorkerShutdown();
}

public Builder setDomain(String domain) {
@@ -139,6 +143,17 @@ public Builder setQueryRejectCondition(QueryRejectCondition queryRejectCondition
return this;
}

/**
* Time for worker shutdown is an optional field used as the amount of time alloted for handling
* worker shutdown after SIGTERM signal. Default is 10 seconds.
*
* @param timeForWorkerShutdown
*/
public Builder setTimeForWorkerShutdown(Duration timeForWorkerShutdown) {
this.timeForWorkerShutdown = timeForWorkerShutdown;
return this;
}

public WorkflowClientOptions build() {
metricsScope = metricsScope.tagged(ImmutableMap.of(MetricsTag.DOMAIN, domain));
return new WorkflowClientOptions(
@@ -148,7 +163,8 @@ public WorkflowClientOptions build() {
metricsScope,
identity,
contextPropagators,
queryRejectCondition);
queryRejectCondition,
timeForWorkerShutdown);
}
}

@@ -159,6 +175,7 @@ public WorkflowClientOptions build() {
private final String identity;
private final List<ContextPropagator> contextPropagators;
private final QueryRejectCondition queryRejectCondition;
private final Duration timeForWorkerShutdown;

private WorkflowClientOptions(
String domain,
@@ -167,14 +184,16 @@ private WorkflowClientOptions(
Scope metricsScope,
String identity,
List<ContextPropagator> contextPropagators,
QueryRejectCondition queryRejectCondition) {
QueryRejectCondition queryRejectCondition,
Duration timeForWorkerShutdown) {
this.domain = domain;
this.dataConverter = dataConverter;
this.interceptors = interceptors;
this.metricsScope = metricsScope;
this.identity = identity;
this.contextPropagators = contextPropagators;
this.queryRejectCondition = queryRejectCondition;
this.timeForWorkerShutdown = timeForWorkerShutdown;
}

public String getDomain() {
@@ -205,6 +224,10 @@ public QueryRejectCondition getQueryRejectCondition() {
return queryRejectCondition;
}

public Duration getTimeForWorkerShutdown() {
return timeForWorkerShutdown;
}

@Override
public String toString() {
return "WorkflowClientOptions{"
@@ -222,6 +245,8 @@ public String toString() {
+ contextPropagators
+ ", queryRejectCondition="
+ queryRejectCondition
+ ", timeForWorkerShutdown="
+ timeForWorkerShutdown
+ '}';
}

@@ -235,7 +260,8 @@ public boolean equals(Object o) {
&& Arrays.equals(interceptors, that.interceptors)
&& com.google.common.base.Objects.equal(identity, that.identity)
&& com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators)
&& queryRejectCondition == that.queryRejectCondition;
&& queryRejectCondition == that.queryRejectCondition
&& com.google.common.base.Objects.equal(timeForWorkerShutdown, that.timeForWorkerShutdown);
}

@Override
@@ -246,6 +272,7 @@ public int hashCode() {
Arrays.hashCode(interceptors),
identity,
contextPropagators,
queryRejectCondition);
queryRejectCondition,
timeForWorkerShutdown);
}
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

import com.uber.cadence.internal.common.InternalUtils;
import com.uber.cadence.worker.WorkerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -27,7 +28,7 @@ public class WorkerShutDownHandler {
private static final List<WorkerFactory> workerFactories = new ArrayList<>();
private static Thread registeredHandler;

public static void registerHandler() {
public static void registerHandler(Duration workerShutdownTimeout) {
if (registeredHandler != null) {
return;
}
@@ -44,10 +45,13 @@ public void run() {
workerFactory.shutdownNow();
}

long remainingTimeout = 10000;
long remainingTimeoutMillis =
TimeUnit.SECONDS.toMillis(workerShutdownTimeout.getSeconds())
+ TimeUnit.NANOSECONDS.toMillis(workerShutdownTimeout.getNano());

for (WorkerFactory workerFactory : workerFactories) {
final long timeoutMillis = remainingTimeout;
remainingTimeout =
final long timeoutMillis = remainingTimeoutMillis;
remainingTimeoutMillis =
InternalUtils.awaitTermination(
timeoutMillis,
() -> workerFactory.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.internal.worker.autoscaler;

import java.time.Duration;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PollerAutoScaler {

private static final Logger LOGGER = LoggerFactory.getLogger(PollerAutoScaler.class);

private Duration coolDownTime;
private PollerUsageEstimator pollerUsageEstimator;
private Recommender recommender;
private ResizableSemaphore semaphore;
private int semaphoreSize;
private boolean shutingDown;

public PollerAutoScaler(
Duration coolDownTime, PollerUsageEstimator pollerUsageEstimator, Recommender recommender) {
this.coolDownTime = coolDownTime;
this.pollerUsageEstimator = pollerUsageEstimator;
this.recommender = recommender;
this.semaphore = new ResizableSemaphore(recommender.getUpperValue());
this.semaphoreSize = recommender.getUpperValue();
}

public void start() {
Executors.newSingleThreadExecutor()
.submit(
new Runnable() {
@Override
public void run() {
while (!shutingDown) {
try {
Thread.sleep(coolDownTime.toMillis());
if (!shutingDown) {
resizePollers();
}
} catch (InterruptedException e) {
}
}
}
});
}

public void stop() {
LOGGER.info("shutting down poller autoscaler");
shutingDown = true;
}

protected void resizePollers() {
PollerUsage pollerUsage = pollerUsageEstimator.estimate();
int pollerCount =
recommender.recommend(this.semaphoreSize, pollerUsage.getPollerUtilizationRate());

int diff = this.semaphoreSize - pollerCount;
if (diff < 0) {
semaphore.release(diff * -1);
} else {
semaphore.decreasePermits(diff);
}

LOGGER.info(String.format("resized pollers to: %d", pollerCount));
this.semaphoreSize = pollerCount;
}

public int getLowerPollerAmount() {
return recommender.getLowerValue();
}

public int getUpperPollerAmount() {
return recommender.getUpperValue();
}

public PollerUsageEstimator getPollerUsageEstimator() {
return pollerUsageEstimator;
}

public Recommender getRecommender() {
return recommender;
}

public void acquire() throws InterruptedException {
semaphore.acquire();
}

public void release() {
semaphore.release();
}

// For testing
protected int getSemaphoreSize() {
return semaphoreSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.internal.worker.autoscaler;

public class PollerUsage {

private final float pollerUtilizationRate;

public PollerUsage(float pollerUtilizationRate) {
this.pollerUtilizationRate = pollerUtilizationRate;
}

public float getPollerUtilizationRate() {
return pollerUtilizationRate;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.internal.worker.autoscaler;

public class PollerUsageEstimator {

private int noopTaskCount;
private int actionableTaskCount;

public void increaseNoopTaskCount() {
noopTaskCount += 1;
}

public void increaseActionableTaskCount() {
actionableTaskCount += 1;
}

public PollerUsage estimate() {
if (noopTaskCount + actionableTaskCount == 0) {
return new PollerUsage(0);
}
PollerUsage result =
new PollerUsage((actionableTaskCount * 1f) / (noopTaskCount + actionableTaskCount));
reset();
return result;
}

public void reset() {
noopTaskCount = 0;
actionableTaskCount = 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.internal.worker.autoscaler;

public class Recommender {

private final float targetPollerUtilRate;
private final int upperValue;
private final int lowerValue;

public Recommender(float targetPollerUtilRate, int upperValue, int lowerValue) {
this.targetPollerUtilRate = targetPollerUtilRate;
this.upperValue = upperValue;
this.lowerValue = lowerValue;
}

public int recommend(int currentPollers, float pollerUtilizationRate) {
int recommended = 0;

if (pollerUtilizationRate == 1) {
return upperValue;
}

float r = currentPollers * pollerUtilizationRate / targetPollerUtilRate;
r = Math.min(upperValue, Math.max(lowerValue, r));
recommended += r;
return recommended;
}

public int getUpperValue() {
return upperValue;
}

public int getLowerValue() {
return lowerValue;
}
}
Loading