diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java index c249727025..957af4c41c 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java @@ -94,7 +94,8 @@ JobAutoScaler createJobAutoscaler( new ScalingExecutor<>(eventHandler, stateStore), eventHandler, new RescaleApiScalingRealizer<>(eventHandler), - stateStore); + stateStore, + null); } @VisibleForTesting diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index d8ee54abfd..d440020d75 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -19,14 +19,17 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.event.AutoScalerEventHandler; import org.apache.flink.autoscaler.exceptions.NotReadyException; import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.metrics.CustomEvaluator; import org.apache.flink.autoscaler.metrics.EvaluatedMetrics; import org.apache.flink.autoscaler.realizer.ScalingRealizer; import org.apache.flink.autoscaler.state.AutoScalerStateStore; import org.apache.flink.autoscaler.tuning.ConfigChanges; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.util.Preconditions; @@ -36,9 +39,11 @@ import java.time.Clock; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.CUSTOM_EVALUATOR_NAME; import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism; import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory; @@ -58,6 +63,7 @@ public class JobAutoScalerImpl> private final AutoScalerEventHandler eventHandler; private final ScalingRealizer scalingRealizer; private final AutoScalerStateStore stateStore; + private final Map customEvaluators; private Clock clock = Clock.systemDefaultZone(); @@ -73,13 +79,15 @@ public JobAutoScalerImpl( ScalingExecutor scalingExecutor, AutoScalerEventHandler eventHandler, ScalingRealizer scalingRealizer, - AutoScalerStateStore stateStore) { + AutoScalerStateStore stateStore, + Map customEvaluators) { this.metricsCollector = metricsCollector; this.evaluator = evaluator; this.scalingExecutor = scalingExecutor; this.eventHandler = eventHandler; this.scalingRealizer = scalingRealizer; this.stateStore = stateStore; + this.customEvaluators = customEvaluators; } @Override @@ -203,8 +211,15 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics 
autoscalerMetri // Scaling tracking data contains previous restart times that are taken into account var restartTime = scalingTracking.getMaxRestartTimeOrDefault(ctx.getConfiguration()); + + var customEvaluatorWithConfig = getCustomEvaluatorIfRequired(ctx.getConfiguration()); + var evaluatedMetrics = - evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, restartTime); + evaluator.evaluate( + ctx.getConfiguration(), + collectedMetrics, + restartTime, + customEvaluatorWithConfig); LOG.debug("Evaluated metrics: {}", evaluatedMetrics); lastEvaluatedMetrics.put(ctx.getJobKey(), evaluatedMetrics); @@ -259,4 +274,20 @@ void setClock(Clock clock) { this.metricsCollector.setClock(clock); this.scalingExecutor.setClock(clock); } + + @VisibleForTesting + protected Tuple2 getCustomEvaluatorIfRequired( + Configuration conf) { + return Optional.ofNullable(conf.get(CUSTOM_EVALUATOR_NAME)) + .map( + name -> { + CustomEvaluator evaluator = customEvaluators.get(name); + return evaluator != null + ? new Tuple2<>( + evaluator, + AutoScalerOptions.forCustomEvaluator(conf, name)) + : null; + }) + .orElse(null); + } } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java index 58c5dbe4a4..74c7784659 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java @@ -18,9 +18,11 @@ package org.apache.flink.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.CustomEvaluator; import org.apache.flink.autoscaler.metrics.EvaluatedMetrics; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.MetricAggregator; @@ -28,6 +30,7 @@ import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.slf4j.Logger; @@ -38,6 +41,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -72,7 +76,10 @@ public class ScalingMetricEvaluator { private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class); public EvaluatedMetrics evaluate( - Configuration conf, CollectedMetricHistory collectedMetrics, Duration restartTime) { + Configuration conf, + CollectedMetricHistory collectedMetrics, + Duration restartTime, + @Nullable Tuple2 customEvaluatorWithConfig) { LOG.debug("Restart time used in metrics evaluation: {}", restartTime); var scalingOutput = new HashMap>(); var metricsHistory = collectedMetrics.getMetricHistory(); @@ -80,6 +87,23 @@ public EvaluatedMetrics evaluate( boolean processingBacklog = isProcessingBacklog(topology, metricsHistory, conf); + var customEvaluationSession = + Optional.ofNullable(customEvaluatorWithConfig) + .map( + info -> + Tuple2.of( + info.f0, + new CustomEvaluator.Context( + new UnmodifiableConfiguration(conf), + 
Collections.unmodifiableSortedMap( + metricsHistory), + Collections.unmodifiableMap(scalingOutput), + topology, + processingBacklog, + restartTime, + info.f1))) + .orElse(null); + for (var vertex : topology.getVerticesInTopologicalOrder()) { scalingOutput.put( vertex, @@ -90,7 +114,8 @@ public EvaluatedMetrics evaluate( topology, vertex, processingBacklog, - restartTime)); + restartTime, + customEvaluationSession)); } var globalMetrics = evaluateGlobalMetrics(metricsHistory); @@ -132,7 +157,8 @@ private Map evaluateMetrics( JobTopology topology, JobVertexID vertex, boolean processingBacklog, - Duration restartTime) { + Duration restartTime, + @Nullable Tuple2 customEvaluationSession) { var latestVertexMetrics = metricsHistory.get(metricsHistory.lastKey()).getVertexMetrics().get(vertex); @@ -142,6 +168,7 @@ private Map evaluateMetrics( double inputRateAvg = getRate(ScalingMetric.NUM_RECORDS_IN, vertex, metricsHistory); var evaluatedMetrics = new HashMap(); + computeTargetDataRate( topology, vertex, @@ -175,6 +202,24 @@ private Map evaluateMetrics( EvaluatedScalingMetric.of(vertexInfo.getNumSourcePartitions())); computeProcessingRateThresholds(evaluatedMetrics, conf, processingBacklog, restartTime); + + Optional.ofNullable(customEvaluationSession) + .map( + session -> + runCustomEvaluator( + vertex, + Collections.unmodifiableMap(evaluatedMetrics), + session)) + .filter(customEvaluatedMetrics -> !customEvaluatedMetrics.isEmpty()) + .ifPresent( + customEvaluatedMetrics -> { + LOG.info( + "Merging custom evaluated metrics for vertex {}: {}", + vertex, + customEvaluatedMetrics); + mergeEvaluatedMetricsMaps(evaluatedMetrics, customEvaluatedMetrics); + }); + return evaluatedMetrics; } @@ -585,4 +630,75 @@ protected static double computeEdgeDataRate( to); return getRate(ScalingMetric.NUM_RECORDS_OUT, from, metricsHistory); } + + /** + * Executes the provided custom evaluator for the given job vertex. Calls {@link + * CustomEvaluator#evaluateVertexMetrics} to evaluate scaling metrics. + * + * @param vertex The job vertex being evaluated. + * @param evaluatedMetrics Current evaluated metrics. + * @param customEvaluationSession A tuple containing the custom evaluator and evaluation + * context. + * @return A map of scaling metrics, with its corresponding evaluated scaling metric. + */ + @VisibleForTesting + protected static Map runCustomEvaluator( + JobVertexID vertex, + Map evaluatedMetrics, + Tuple2 customEvaluationSession) { + try { + return customEvaluationSession.f0.evaluateVertexMetrics( + vertex, evaluatedMetrics, customEvaluationSession.f1); + } catch (UnsupportedOperationException e) { + LOG.warn( + "Custom evaluator {} tried accessing an un-modifiable view.", + customEvaluationSession.f0.getClass(), + e); + } catch (Exception e) { + LOG.warn( + "Custom evaluator {} threw an exception.", + customEvaluationSession.f0.getClass(), + e); + } + + return Collections.emptyMap(); + } + + /** + * Merges the incoming evaluated metrics into actual evaluated metrics. + * + * @param actual The target evaluated metrics map to merge into. + * @param incoming The incoming map containing new evaluated metrics map to be merged + * (nullable). 
+ */ + @VisibleForTesting + protected static void mergeEvaluatedMetricsMaps( + Map actual, + @Nullable Map incoming) { + Optional.ofNullable(incoming) + .ifPresent( + customEvaluatedMetric -> + customEvaluatedMetric.forEach( + (scalingMetric, evaluatedScalingMetric) -> + actual.merge( + scalingMetric, + evaluatedScalingMetric, + ScalingMetricEvaluator + ::mergeEvaluatedScalingMetric))); + } + + /** + * Merges two {@link EvaluatedScalingMetric} instances. + * + * @param actual The existing evaluated scaling metric. + * @param incoming The incoming evaluated scaling metric. + * @return A new {@link EvaluatedScalingMetric} instance with merged values. + */ + @VisibleForTesting + protected static EvaluatedScalingMetric mergeEvaluatedScalingMetric( + EvaluatedScalingMetric actual, EvaluatedScalingMetric incoming) { + return new EvaluatedScalingMetric( + !Double.isNaN(incoming.getCurrent()) ? incoming.getCurrent() : actual.getCurrent(), + !Double.isNaN(incoming.getAverage()) ? incoming.getAverage() : actual.getAverage()); + } } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index 980db2f4cc..8b6face0da 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -21,6 +21,8 @@ import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.configuration.MemorySize; import java.time.Duration; @@ -31,6 +33,7 @@ public class AutoScalerOptions { public static final String OLD_K8S_OP_CONF_PREFIX = "kubernetes.operator."; public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler."; + public static final String CUSTOM_EVALUATOR_CONF_PREFIX = "metrics.custom-evaluator."; private static String oldOperatorConfigKey(String key) { return OLD_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key; @@ -382,4 +385,19 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { "scaling.key-group.partitions.adjust.mode")) .withDescription( "How to adjust the parallelism of Source vertex or upstream shuffle is keyBy"); + + public static final ConfigOption CUSTOM_EVALUATOR_NAME = + autoScalerConfig(CUSTOM_EVALUATOR_CONF_PREFIX + "name") + .stringType() + .defaultValue(null) + .withFallbackKeys(oldOperatorConfigKey(CUSTOM_EVALUATOR_CONF_PREFIX + "name")) + .withDescription("Name of the custom evaluator to be used."); + + public static Configuration forCustomEvaluator( + Configuration configuration, String customEvaluatorName) { + // add support for fallBackKey with DelegatingConfiguration. 
+ return new DelegatingConfiguration( + configuration, + AUTOSCALER_CONF_PREFIX + CUSTOM_EVALUATOR_CONF_PREFIX + customEvaluatorName + "."); + } } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java new file mode 100644 index 0000000000..650c559335 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.metrics; + +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.plugin.Plugin; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import lombok.Getter; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.SortedMap; + +/** + * Interface for custom evaluators that allow custom scaling metric evaluations. Implementations of + * this interface can provide custom logic to evaluate vertex metrics and merge them with the + * internally evaluated metrics. + */ +public interface CustomEvaluator extends Plugin { + + /** + * Returns the name of the custom evaluator. + * + * @return The name of the custom evaluator. + */ + String getName(); + + /** + * Evaluates scaling metrics for a given job vertex based on the internally evaluated metrics + * and context. + * + * @param vertex The {@link JobVertexID} identifying the vertex whose metrics are being + * evaluated. + * @param evaluatedMetrics An unmodifiable view of the vertex's internally evaluated metrics. + * @param evaluationContext The evaluation context providing job-related configurations and + * historical metrics. + * @return A map of evaluated scaling metrics for the vertex, which will be merged with the + * internally evaluated metrics. + * @throws UnsupportedOperationException if an attempt is made to modify the {@code + * evaluatedMetrics}, {@code Context.jobConf}, {@code Context.metricsHistory}, or {@code + * Context.evaluatedVertexMetrics}. + */ + Map<ScalingMetric, EvaluatedScalingMetric> evaluateVertexMetrics( + JobVertexID vertex, + Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics, + Context evaluationContext); + + /** + * Context providing relevant job and metric information to assist in custom metric evaluation. + */ + @Getter + class Context { + private final Configuration jobConf; + + private final SortedMap<Instant, CollectedMetrics> metricsHistory; + + private final Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> + evaluatedVertexMetrics; + + private final JobTopology topology; + + private final boolean processingBacklog; + + private final Duration restartTime; + + private final Configuration customEvaluatorConf; + + /** + * Constructs a new {@link Context} instance.
+ * + * @param jobConf An un-modifiable view of job's configuration. + * @param metricsHistory An un-modifiable view of historical record of collected metrics, + * ordered by timestamp. + * @param evaluatedVertexMetrics This map contains an un-modifiable view of evaluated + * metrics for previously evaluated vertex. Note: evaluation of Vertex for scaling + * metrics happens topologically. + * @param topology The job topology representing the structure of the Flink job. + * @param processingBacklog Indicates whether the job is processing backlog. + * @param restartTime Maximum restart time based on scaling records. + * @param customEvaluatorConf The configuration associated with the custom evaluator. + */ + public Context( + Configuration jobConf, + SortedMap metricsHistory, + Map> evaluatedVertexMetrics, + JobTopology topology, + boolean processingBacklog, + Duration restartTime, + Configuration customEvaluatorConf) { + this.jobConf = jobConf; + this.metricsHistory = metricsHistory; + this.evaluatedVertexMetrics = evaluatedVertexMetrics; + this.topology = topology; + this.processingBacklog = processingBacklog; + this.restartTime = restartTime; + this.customEvaluatorConf = customEvaluatorConf; + } + } +} diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java new file mode 100644 index 0000000000..20d5374e70 --- /dev/null +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.event.TestingEventCollector; +import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.metrics.CustomEvaluator; +import org.apache.flink.autoscaler.metrics.TestCustomEvaluator; +import org.apache.flink.autoscaler.metrics.TestMetrics; +import org.apache.flink.autoscaler.realizer.TestingScalingRealizer; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; +import org.apache.flink.autoscaler.topology.IOMetrics; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Map; + +import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext; +import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Unit test for testing the integration of custom evaluators with the Flink job autoscaler. */ +public class AutoScalerCustomEvaluatorTest { + private JobAutoScalerContext context; + private AutoScalerStateStore> stateStore; + + private TestingMetricsCollector> metricsCollector; + + private JobVertexID source1, sink; + + private JobAutoScalerImpl> autoscaler; + + @BeforeEach + public void setup() { + context = createDefaultJobAutoScalerContext(); + + TestingEventCollector> eventCollector = + new TestingEventCollector<>(); + stateStore = new InMemoryAutoScalerStateStore<>(); + + ScalingExecutor> scalingExecutor = + new ScalingExecutor<>(eventCollector, stateStore); + String testCustomEvaluatorName = "test-custom-evaluator"; + + var customEvaluators = createTestCustomEvaluator(); + + source1 = new JobVertexID(); + sink = new JobVertexID(); + + metricsCollector = + new TestingMetricsCollector<>( + new JobTopology( + new VertexInfo(source1, Map.of(), 1, 720, new IOMetrics(0, 0, 0)), + new VertexInfo( + sink, + Map.of(source1, REBALANCE), + 1, + 720, + new IOMetrics(0, 0, 0)))); + + var defaultConf = context.getConfiguration(); + defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true); + defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO); + defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1)); + defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2)); + defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true); + defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.); + defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE); + defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8); + defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9); + defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7); + defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); + defaultConf.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD, Duration.ofSeconds(1)); + + defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName); + + autoscaler = + new JobAutoScalerImpl<>( + metricsCollector, + 
new ScalingMetricEvaluator(), + scalingExecutor, + eventCollector, + new TestingScalingRealizer<>(), + stateStore, + customEvaluators); + + // Reset custom window size to default + metricsCollector.setTestMetricWindowSize(null); + } + + @Test + public void test() throws Exception { + /* Test scaling up. */ + var now = Instant.ofEpochMilli(0); + setClocksTo(now); + metricsCollector.setJobUpdateTs(now); + // Adjust metric window size, so we can fill the metric window with two metrics + metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(1)); + metricsCollector.updateMetrics( + source1, + TestMetrics.builder() + .numRecordsIn(0) + .numRecordsOut(0) + .numRecordsInPerSec(500.) + .maxBusyTimePerSec(8) + .pendingRecords(0L) + .build()); + metricsCollector.updateMetrics( + sink, TestMetrics.builder().numRecordsIn(0).maxBusyTimePerSec(850).build()); + + autoscaler.scale(context); + + now = now.plus(Duration.ofSeconds(1)); + setClocksTo(now); + + metricsCollector.updateMetrics( + source1, m -> m.setNumRecordsIn(500), m -> m.setNumRecordsOut(500)); + metricsCollector.updateMetrics(sink, m -> m.setNumRecordsIn(500)); + + autoscaler.scale(context); + + var scaledParallelism = ScalingExecutorTest.getScaledParallelism(stateStore, context); + assertEquals(3, scaledParallelism.get(source1)); + assertEquals(200, scaledParallelism.get(sink)); + assertFlinkMetricsCount(1, 0); + } + + private void setClocksTo(Instant time) { + var clock = Clock.fixed(time, ZoneId.systemDefault()); + autoscaler.setClock(clock); + } + + private void assertFlinkMetricsCount(int scalingCount, int balancedCount) { + AutoscalerFlinkMetrics autoscalerFlinkMetrics = + autoscaler.flinkMetrics.get(context.getJobKey()); + assertEquals(scalingCount, autoscalerFlinkMetrics.getNumScalingsCount()); + assertEquals(balancedCount, autoscalerFlinkMetrics.getNumBalancedCount()); + } + + private Map createTestCustomEvaluator() { + var testCustomEvaluator = new TestCustomEvaluator(); + testCustomEvaluator.configure(new Configuration()); + return Map.of(testCustomEvaluator.getName(), testCustomEvaluator); + } +} diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java index df950bb4c0..956ecf32d8 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java @@ -40,6 +40,7 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; +import java.util.Collections; import java.util.Map; import java.util.SortedMap; @@ -108,7 +109,8 @@ public void setup() { scalingExecutor, eventCollector, new TestingScalingRealizer<>(), - stateStore); + stateStore, + Collections.emptyMap()); // Reset custom window size to default metricsCollector.setTestMetricWindowSize(null); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java index 420d9df3a0..9eca91295e 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java @@ -122,7 +122,8 @@ public void setup() throws Exception { new ScalingExecutor<>(eventCollector, stateStore), eventCollector, scalingRealizer, - stateStore); + stateStore, + 
Collections.emptyMap()); // initially the last evaluated metrics are empty assertThat(autoscaler.lastEvaluatedMetrics.get(context.getJobKey())).isNull(); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java index 552971d7a5..03dfc409ba 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java @@ -23,7 +23,9 @@ import org.apache.flink.autoscaler.exceptions.NotReadyException; import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.CustomEvaluator; import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.TestCustomEvaluator; import org.apache.flink.autoscaler.metrics.TestMetrics; import org.apache.flink.autoscaler.realizer.ScalingRealizer; import org.apache.flink.autoscaler.realizer.TestingScalingRealizer; @@ -32,6 +34,8 @@ import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.autoscaler.tuning.ConfigChanges; import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Gauge; @@ -53,6 +57,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import static java.util.Map.entry; @@ -63,6 +68,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests for JobAutoScalerImpl. 
*/ @@ -111,7 +119,8 @@ void testMetricReporting() throws Exception { scalingExecutor, eventCollector, scalingRealizer, - stateStore); + stateStore, + Collections.emptyMap()); autoscaler.scale(context); @@ -147,7 +156,13 @@ private static double getGaugeValue( void testErrorReporting() throws Exception { var autoscaler = new JobAutoScalerImpl<>( - null, null, null, eventCollector, scalingRealizer, stateStore); + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + Collections.emptyMap()); autoscaler.scale(context); Assertions.assertEquals( @@ -182,7 +197,8 @@ protected Collection queryAggregatedMetricNames( null, eventCollector, scalingRealizer, - stateStore); + stateStore, + Collections.emptyMap()); // Should not produce an error autoscaler.scale(context); @@ -219,7 +235,8 @@ public void realizeParallelismOverrides( null, eventCollector, realizeParallelismOverridesWithExceptionsScalingRealizer, - stateStore); + stateStore, + Collections.emptyMap()); // Should produce an error autoscaler.scale(context); @@ -236,7 +253,8 @@ void testParallelismOverrides() throws Exception { null, eventCollector, scalingRealizer, - stateStore); + stateStore, + Collections.emptyMap()); // Initially we should return empty overrides, do not crate any state assertThat(autoscaler.getParallelismOverrides(context)).isEmpty(); @@ -304,7 +322,13 @@ public void testApplyAutoscalerParallelism() throws Exception { var overrides = new HashMap(); var autoscaler = new JobAutoScalerImpl<>( - null, null, null, eventCollector, scalingRealizer, stateStore) { + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + Collections.emptyMap()) { public Map getParallelismOverrides( JobAutoScalerContext ctx) { return new HashMap<>(overrides); @@ -354,7 +378,13 @@ void testApplyConfigOverrides() throws Exception { context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true); var autoscaler = new JobAutoScalerImpl<>( - null, null, null, eventCollector, scalingRealizer, stateStore); + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + Collections.emptyMap()); // Initially we should return empty overrides, do not crate any state assertThat(stateStore.getConfigChanges(context).getOverrides()).isEmpty(); @@ -402,7 +432,13 @@ void testAutoscalerDisabled() throws Exception { var autoscaler = new JobAutoScalerImpl<>( - null, null, null, eventCollector, scalingRealizer, stateStore); + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + Collections.emptyMap()); autoscaler.scale(context); assertTrue(stateStore.getScalingHistory(context).isEmpty()); @@ -427,4 +463,108 @@ private void assertParallelismOverrides(Map expectedOverrides) { private TestingScalingRealizer.Event> getEvent() { return scalingRealizer.events.poll(); } + + @Test + void testGetCustomEvaluatorIfRequired() { + CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); + testCustomEvaluator.configure(new Configuration()); + var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator); + + var autoscalerWithCustomEvaluator = + new JobAutoScalerImpl<>( + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + testCustomEvaluators); + + String testCustomEvaluatorName = "test-custom-evaluator"; + + var defaultConf = context.getConfiguration(); + + // Case 1: Custom evaluator configured. 
+ defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName); + + var customEvaluatorWithConfig = + autoscalerWithCustomEvaluator.getCustomEvaluatorIfRequired( + context.getConfiguration()); + assertNotNull(customEvaluatorWithConfig); + assertInstanceOf(CustomEvaluator.class, customEvaluatorWithConfig.f0); + var customEvaluatorConfig = customEvaluatorWithConfig.f1; + assertNotNull(customEvaluatorConfig); + assertEquals(0, customEvaluatorConfig.keySet().size()); + + Set expectedKeys = Set.of(); + assertEquals(expectedKeys, customEvaluatorConfig.keySet()); + + // Case 2: Custom evaluator configured with additional configs for the plugin. + defaultConf.set( + ConfigOptions.key( + AutoScalerOptions.AUTOSCALER_CONF_PREFIX + + AutoScalerOptions.CUSTOM_EVALUATOR_CONF_PREFIX + + testCustomEvaluatorName + + ".k1") + .stringType() + .noDefaultValue(), + "v1"); + + defaultConf.set( + ConfigOptions.key( + AutoScalerOptions.AUTOSCALER_CONF_PREFIX + + AutoScalerOptions.CUSTOM_EVALUATOR_CONF_PREFIX + + testCustomEvaluatorName + + ".k2") + .stringType() + .noDefaultValue(), + "v2"); + + var customEvaluatorWithConfigContainingAdditionalKeys = + autoscalerWithCustomEvaluator.getCustomEvaluatorIfRequired( + context.getConfiguration()); + assertNotNull(customEvaluatorWithConfigContainingAdditionalKeys); + assertInstanceOf( + CustomEvaluator.class, customEvaluatorWithConfigContainingAdditionalKeys.f0); + var customEvaluatorConfigContainingAdditionalKeys = + customEvaluatorWithConfigContainingAdditionalKeys.f1; + assertNotNull(customEvaluatorConfigContainingAdditionalKeys); + assertEquals(2, customEvaluatorConfigContainingAdditionalKeys.keySet().size()); + + expectedKeys = Set.of("k1", "k2"); + assertEquals(expectedKeys, customEvaluatorConfigContainingAdditionalKeys.keySet()); + + // Case 3: Custom evaluator configured but with a custom evaluator name that is not + // available. + defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, "test-custom-evaluator-no-match"); + + var customEvaluatorWithConfigNoMatch = + autoscalerWithCustomEvaluator.getCustomEvaluatorIfRequired( + context.getConfiguration()); + assertNull(customEvaluatorWithConfigNoMatch); + + // Case 4: Custom evaluator not configured. + defaultConf.removeConfig(AutoScalerOptions.CUSTOM_EVALUATOR_NAME); + + var customEvaluatorNotConfigured = + autoscalerWithCustomEvaluator.getCustomEvaluatorIfRequired( + context.getConfiguration()); + assertNull(customEvaluatorNotConfigured); + + // Case 5: No custom evaluators available. 
+ var autoscalerWithoutCustomEvaluator = + new JobAutoScalerImpl<>( + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + Collections.emptyMap()); + + var customEvaluatorConfigNoCustomEvaluators = + autoscalerWithoutCustomEvaluator.getCustomEvaluatorIfRequired( + context.getConfiguration()); + assertNull(customEvaluatorConfigNoCustomEvaluators); + } } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java index 1176075f9c..7b9841c8dc 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -181,7 +181,7 @@ public void testEndToEnd() throws Exception { assertEquals(3, collectedMetrics.getMetricHistory().size()); assertTrue(collectedMetrics.isFullyCollected()); - var evaluation = evaluator.evaluate(conf, collectedMetrics, restartTime); + var evaluation = evaluator.evaluate(conf, collectedMetrics, restartTime, null); scalingExecutor.scaleResource( context, evaluation, @@ -387,7 +387,7 @@ public void testTolerateAbsenceOfPendingRecordsMetric() throws Exception { var collectedMetrics = metricsCollector.updateMetrics(context, stateStore); var evaluation = - evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime); + evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime, null); assertEquals( 500., evaluation @@ -647,7 +647,7 @@ public void testScaleDownWithZeroProcessingRate() throws Exception { var collectedMetrics = collectMetrics(); var evaluation = - evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime); + evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime, null); assertEquals( 0, evaluation @@ -698,7 +698,8 @@ public void testScaleDownWithZeroProcessingRate() throws Exception { .getMetricHistory() .put(Instant.ofEpochSecond(1234), new CollectedMetrics(newMetrics, Map.of())); - evaluation = evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime); + evaluation = + evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime, null); assertEquals( 3., evaluation diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java index f34aa1fc09..2902d25fae 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java @@ -37,6 +37,7 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; +import java.util.Collections; import java.util.Map; import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext; @@ -98,7 +99,8 @@ public void setup() { new ScalingExecutor<>(eventCollector, stateStore), eventCollector, new TestingScalingRealizer<>(), - stateStore); + stateStore, + Collections.emptyMap()); // Reset custom window size to default metricsCollector.setTestMetricWindowSize(null); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java index 03e8fce587..5a4577dfef 
100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java @@ -21,12 +21,15 @@ import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.CustomEvaluator; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.TestCustomEvaluator; import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.jupiter.api.Test; @@ -112,7 +115,8 @@ public void testLagBasedSourceScaling() { .evaluate( conf, new CollectedMetricHistory(topology, metricHistory, Instant.now()), - Duration.ZERO) + Duration.ZERO, + null) .getVertexMetrics(); assertEquals( @@ -146,7 +150,8 @@ public void testLagBasedSourceScaling() { .evaluate( conf, new CollectedMetricHistory(topology, metricHistory, Instant.now()), - Duration.ZERO) + Duration.ZERO, + null) .getVertexMetrics(); assertEquals( EvaluatedScalingMetric.avg(150), @@ -169,7 +174,8 @@ public void testLagBasedSourceScaling() { .evaluate( conf, new CollectedMetricHistory(topology, metricHistory, Instant.now()), - Duration.ZERO) + Duration.ZERO, + null) .getVertexMetrics(); assertEquals( EvaluatedScalingMetric.avg(150), @@ -191,7 +197,8 @@ public void testLagBasedSourceScaling() { .evaluate( conf, new CollectedMetricHistory(topology, metricHistory, Instant.now()), - Duration.ZERO) + Duration.ZERO, + null) .getVertexMetrics(); assertEquals( EvaluatedScalingMetric.avg(150), @@ -250,7 +257,8 @@ public void testLagBasedSourceScaling() { .evaluate( conf, new CollectedMetricHistory(topology, metricHistory, Instant.now()), - Duration.ZERO) + Duration.ZERO, + null) .getVertexMetrics(); assertEquals( EvaluatedScalingMetric.avg(100), @@ -260,6 +268,95 @@ public void testLagBasedSourceScaling() { evaluatedMetrics.get(sink).get(ScalingMetric.TARGET_DATA_RATE)); } + @Test + public void testEvaluateWithCustomEvaluator() { + var source = new JobVertexID(); + var sink = new JobVertexID(); + + var topology = + new JobTopology( + new VertexInfo(source, Collections.emptyMap(), 1, 1, null), + new VertexInfo(sink, Map.of(source, REBALANCE), 1, 1, null)); + + var metricHistory = new TreeMap(); + + metricHistory.put( + Instant.ofEpochMilli(1000), + new CollectedMetrics( + Map.of( + source, + Map.of( + ScalingMetric.LAG, + 0., + ScalingMetric.NUM_RECORDS_IN, + 0., + ScalingMetric.NUM_RECORDS_OUT, + 0., + ScalingMetric.LOAD, + .1), + sink, + Map.of(ScalingMetric.NUM_RECORDS_IN, 0., ScalingMetric.LOAD, .1)), + Map.of())); + + metricHistory.put( + Instant.ofEpochMilli(2000), + new CollectedMetrics( + Map.of( + source, + Map.of( + ScalingMetric.LAG, + 0., + ScalingMetric.NUM_RECORDS_IN, + 100., + ScalingMetric.NUM_RECORDS_OUT, + 200., + ScalingMetric.LOAD, + .4), + sink, + Map.of(ScalingMetric.NUM_RECORDS_IN, 200., ScalingMetric.LOAD, .2)), + Map.of())); + + var conf = new Configuration(); + + conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2)); + CustomEvaluator customEvaluator = new 
TestCustomEvaluator(); + var customEvaluatorWithConfig = new Tuple2<>(customEvaluator, new Configuration()); + + var evaluatedMetrics = + evaluator + .evaluate( + conf, + new CollectedMetricHistory(topology, metricHistory, Instant.now()), + Duration.ZERO, + customEvaluatorWithConfig) + .getVertexMetrics(); + + assertEquals( + EvaluatedScalingMetric.avg(.25), + evaluatedMetrics.get(source).get(ScalingMetric.LOAD)); + + assertEquals( + EvaluatedScalingMetric.avg(.15), + evaluatedMetrics.get(sink).get(ScalingMetric.LOAD)); + + assertEquals( + EvaluatedScalingMetric.avg(100000.0), + evaluatedMetrics.get(source).get(ScalingMetric.TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(0.0), + evaluatedMetrics.get(source).get(ScalingMetric.CATCH_UP_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.avg(200000.0), + evaluatedMetrics.get(sink).get(ScalingMetric.TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(0.0), + evaluatedMetrics.get(sink).get(ScalingMetric.CATCH_UP_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(0.0), + evaluatedMetrics.get(source).get(ScalingMetric.LAG)); + assertFalse(evaluatedMetrics.get(sink).containsKey(ScalingMetric.LAG)); + } + @Test public void testUtilizationBoundaryComputation() { @@ -838,4 +935,155 @@ private Tuple2 getThresholds( map.get(ScalingMetric.SCALE_UP_RATE_THRESHOLD).getCurrent(), map.get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent()); } + + @Test + public void testRunCustomEvaluator() { + var source = new JobVertexID(); + var sink = new JobVertexID(); + + var topology = + new JobTopology( + new VertexInfo(source, Collections.emptyMap(), 1, 1, null), + new VertexInfo(sink, Map.of(source, REBALANCE), 1, 1, null)); + + var metricHistory = new TreeMap(); + + metricHistory.put( + Instant.ofEpochMilli(1000), + new CollectedMetrics( + Map.of( + source, + Map.of( + ScalingMetric.LAG, + 0., + ScalingMetric.NUM_RECORDS_IN, + 0., + ScalingMetric.NUM_RECORDS_OUT, + 0., + ScalingMetric.LOAD, + .1), + sink, + Map.of(ScalingMetric.NUM_RECORDS_IN, 0., ScalingMetric.LOAD, .1)), + Map.of())); + + metricHistory.put( + Instant.ofEpochMilli(2000), + new CollectedMetrics( + Map.of( + source, + Map.of( + ScalingMetric.LAG, + 0., + ScalingMetric.NUM_RECORDS_IN, + 100., + ScalingMetric.NUM_RECORDS_OUT, + 200., + ScalingMetric.LOAD, + .4), + sink, + Map.of(ScalingMetric.NUM_RECORDS_IN, 200., ScalingMetric.LOAD, .2)), + Map.of())); + + var conf = new Configuration(); + CustomEvaluator customEvaluator = new TestCustomEvaluator(); + var evaluatedMetrics = new HashMap(); + + var testCustomEvaluationSession = + Tuple2.of( + customEvaluator, + new CustomEvaluator.Context( + new UnmodifiableConfiguration(conf), + Collections.unmodifiableSortedMap(metricHistory), + Collections.unmodifiableMap(new HashMap<>()), + topology, + false, + Duration.ZERO, + new Configuration())); + + var testCustomEvaluatorResult = + ScalingMetricEvaluator.runCustomEvaluator( + source, evaluatedMetrics, testCustomEvaluationSession); + + assertFalse(testCustomEvaluatorResult.isEmpty()); + + assertTrue(testCustomEvaluatorResult.containsKey(ScalingMetric.TARGET_DATA_RATE)); + + assertEquals( + EvaluatedScalingMetric.avg(100000.0), + testCustomEvaluatorResult.get(ScalingMetric.TARGET_DATA_RATE)); + } + + @Test + public void testMergeEvaluatedMetricsMaps() { + Map actual = new HashMap<>(); + actual.put(ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(50000.0)); + actual.put(ScalingMetric.LOAD, EvaluatedScalingMetric.avg(0.5)); + + // Case 1: Merge with null (should 
not modify actual) + ScalingMetricEvaluator.mergeEvaluatedMetricsMaps(actual, null); + assertEquals(2, actual.size()); + assertEquals( + EvaluatedScalingMetric.avg(50000.0), actual.get(ScalingMetric.TARGET_DATA_RATE)); + assertEquals(EvaluatedScalingMetric.avg(0.5), actual.get(ScalingMetric.LOAD)); + + // Case 2: Merge with an empty map (should not modify actual) + ScalingMetricEvaluator.mergeEvaluatedMetricsMaps(actual, Collections.emptyMap()); + assertEquals(2, actual.size()); + assertEquals( + EvaluatedScalingMetric.avg(50000.0), actual.get(ScalingMetric.TARGET_DATA_RATE)); + assertEquals(EvaluatedScalingMetric.avg(0.5), actual.get(ScalingMetric.LOAD)); + + // Case 3: Merge with an incoming map + Map incoming = new HashMap<>(); + incoming.put(ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(100000.0)); + incoming.put(ScalingMetric.LAG, new EvaluatedScalingMetric(10.0, 10.0)); + + ScalingMetricEvaluator.mergeEvaluatedMetricsMaps(actual, incoming); + assertEquals(3, actual.size()); + + assertTrue(actual.containsKey(ScalingMetric.LAG)); + assertEquals(new EvaluatedScalingMetric(10.0, 10.0), actual.get(ScalingMetric.LAG)); + + assertTrue(actual.containsKey(ScalingMetric.TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.avg(100000.0), actual.get(ScalingMetric.TARGET_DATA_RATE)); + + assertTrue(actual.containsKey(ScalingMetric.LOAD)); + assertEquals(EvaluatedScalingMetric.avg(0.5), actual.get(ScalingMetric.LOAD)); + } + + @Test + public void testMergeEvaluatedScalingMetric() { + // Case 1 + EvaluatedScalingMetric actual = new EvaluatedScalingMetric(50.0, 100.0); + EvaluatedScalingMetric incoming = new EvaluatedScalingMetric(60.0, 120.0); + + EvaluatedScalingMetric result = + ScalingMetricEvaluator.mergeEvaluatedScalingMetric(actual, incoming); + + assertEquals(60.0, result.getCurrent()); + assertEquals(120.0, result.getAverage()); + + // Case 2 + EvaluatedScalingMetric incomingWithNaN = new EvaluatedScalingMetric(Double.NaN, Double.NaN); + result = ScalingMetricEvaluator.mergeEvaluatedScalingMetric(actual, incomingWithNaN); + + assertEquals(50.0, result.getCurrent()); + assertEquals(100.0, result.getAverage()); + + // Case 3 + EvaluatedScalingMetric incomingWithPartialNaN = + new EvaluatedScalingMetric(Double.NaN, 130.0); + result = ScalingMetricEvaluator.mergeEvaluatedScalingMetric(actual, incomingWithPartialNaN); + + assertEquals(50.0, result.getCurrent()); + assertEquals(130.0, result.getAverage()); + + // Case 4 + EvaluatedScalingMetric actualWithNaN = new EvaluatedScalingMetric(Double.NaN, Double.NaN); + result = ScalingMetricEvaluator.mergeEvaluatedScalingMetric(actualWithNaN, incoming); + + assertEquals(60.0, result.getCurrent()); + assertEquals(120.0, result.getAverage()); + } } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java new file mode 100644 index 0000000000..12b555f4e1 --- /dev/null +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.metrics; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; + +/** + * A simple implementation of the {@link CustomEvaluator} interface that adjusts scaling metrics + * based on recent historical trends. This evaluator applies a weighted moving average to refine the + * target data rate for source job vertices, enabling more responsive scaling decisions. + */ +public class SimpleTrendAdjustor implements CustomEvaluator { + @Override + public String getName() { + return "simple-trend-adjustor"; + } + + @Override + public Map evaluateVertexMetrics( + JobVertexID vertex, + Map evaluatedMetrics, + Context evaluationContext) { + + if (!evaluationContext.getTopology().isSource(vertex)) { + return Collections.emptyMap(); + } + + var customEvaluatedMetrics = new HashMap(); + + // Extract current target data rate + EvaluatedScalingMetric targetDataRateMetric = + evaluatedMetrics.get(ScalingMetric.TARGET_DATA_RATE); + double currentTargetRate = + (targetDataRateMetric != null) ? targetDataRateMetric.getAverage() : 0.0; + + // Compute historical trend adjustment + double trendAdjustment = + computeTrendAdjustment(vertex, evaluationContext.getMetricsHistory()); + + // Apply a dynamic adjustment based on recent trends + double adjustedTargetRate = currentTargetRate + trendAdjustment; + + // Store the updated metric + customEvaluatedMetrics.put( + ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(adjustedTargetRate)); + + return customEvaluatedMetrics; + } + + /** + * Computes a trend-based adjustment using recent historical metrics. Uses a simple weighted + * moving average over the last few recorded metrics. + */ + private double computeTrendAdjustment( + JobVertexID vertex, SortedMap metricsHistory) { + if (metricsHistory.isEmpty()) { + // Fallback: apply no increase if no history is available + return 0.; + } + + double totalWeight = 0.0; + double weightedSum = 0.0; + // Increasing weight for more recent data points + int weight = 1; + + // Iterate over the last N entries (e.g., last 5 data points) + int count = 0; + for (var entry : metricsHistory.values()) { + Double historicalRate = + entry.getVertexMetrics().get(vertex).get(ScalingMetric.TARGET_DATA_RATE); + if (historicalRate != null) { + weightedSum += historicalRate * weight; + totalWeight += weight; + weight++; + count++; + } + if (count >= 5) { // Limit to last 5 points + break; + } + } + + return (totalWeight > 0) + ? 
(weightedSum / totalWeight) + - metricsHistory + .get(metricsHistory.lastKey()) + .getVertexMetrics() + .get(vertex) + .get(ScalingMetric.TARGET_DATA_RATE) + : 0.; + } +} diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java new file mode 100644 index 0000000000..55b186b0f2 --- /dev/null +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.metrics; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A test implementation of the {@link CustomEvaluator} interface that provides custom scaling + * metric evaluations for job vertices in a Flink job. + */ +public class TestCustomEvaluator implements CustomEvaluator { + @Override + public String getName() { + return "test-custom-evaluator"; + } + + @Override + public Map evaluateVertexMetrics( + JobVertexID vertex, + Map evaluatedMetrics, + Context evaluationContext) { + if (evaluationContext.getTopology().isSource(vertex)) { + var customEvaluatedMetrics = new HashMap(); + customEvaluatedMetrics.put( + ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(100000.0)); + + return customEvaluatedMetrics; + } + return Collections.emptyMap(); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 4bd2836f7a..cf03073c39 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -174,7 +174,9 @@ void registerDeploymentController() { var statusRecorder = StatusRecorder.create(client, metricManager, listeners); var clusterResourceManager = ClusterResourceManager.of(configManager.getDefaultConfig(), client); - var autoscaler = AutoscalerFactory.create(client, eventRecorder, clusterResourceManager); + var autoscaler = + AutoscalerFactory.create( + client, eventRecorder, clusterResourceManager, configManager); var reconcilerFactory = new ReconcilerFactory(eventRecorder, statusRecorder, autoscaler); var observerFactory = new FlinkDeploymentObserverFactory(eventRecorder); var canaryResourceManager = new CanaryResourceManager(configManager); @@ -198,7 +200,7 @@ void registerSessionJobController() { var metricManager = MetricManager.createFlinkSessionJobMetricManager(baseConfig, metricGroup); var 
statusRecorder = StatusRecorder.create(client, metricManager, listeners); - var autoscaler = AutoscalerFactory.create(client, eventRecorder, null); + var autoscaler = AutoscalerFactory.create(client, eventRecorder, null, configManager); var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder, autoscaler); var observer = new FlinkSessionJobObserver(eventRecorder); var canaryResourceManager = new CanaryResourceManager(configManager); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java index 7bbb5492ec..d97d339381 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.autoscaler.ScalingMetricEvaluator; import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore; import org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -36,10 +37,12 @@ public class AutoscalerFactory { public static JobAutoScaler create( KubernetesClient client, EventRecorder eventRecorder, - ClusterResourceManager clusterResourceManager) { + ClusterResourceManager clusterResourceManager, + FlinkConfigManager configManager) { var stateStore = new KubernetesAutoScalerStateStore(new ConfigMapStore(client)); var eventHandler = new KubernetesAutoScalerEventHandler(eventRecorder); + var customEvaluators = AutoscalerUtils.discoverCustomEvaluators(configManager); return new JobAutoScalerImpl<>( new RestApiMetricsCollector<>(), @@ -47,6 +50,7 @@ public static JobAutoScaler create( new ScalingExecutor<>(eventHandler, stateStore, clusterResourceManager), eventHandler, new KubernetesScalingRealizer(), - stateStore); + stateStore, + customEvaluators); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java new file mode 100644 index 0000000000..e2ef8d8b6c --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java
new file mode 100644
index 0000000000..e2ef8d8b6c
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.autoscaler.metrics.CustomEvaluator;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Autoscaler related utility methods for the Operator. */
+public class AutoscalerUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoscalerUtils.class);
+
+    /**
+     * Discovers custom evaluators for the autoscaler.
+     *
+     * @param configManager Flink config manager
+     * @return A map of discovered custom evaluators, where the key is the evaluator name provided
+     *     by {@link CustomEvaluator#getName()} and the value is the corresponding instance.
+     */
+    public static Map<String, CustomEvaluator> discoverCustomEvaluators(
+            FlinkConfigManager configManager) {
+        var conf = configManager.getDefaultConfig();
+        Map<String, CustomEvaluator> customEvaluators = new HashMap<>();
+        PluginUtils.createPluginManagerFromRootFolder(conf)
+                .load(CustomEvaluator.class)
+                .forEachRemaining(
+                        customEvaluator -> {
+                            String customEvaluatorName = customEvaluator.getName();
+                            LOG.info(
+                                    "Discovered custom evaluator for autoscaler from plugin directory [{}]: {}.",
+                                    System.getenv()
+                                            .getOrDefault(
+                                                    ConfigConstants.ENV_FLINK_PLUGINS_DIR,
+                                                    ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS),
+                                    customEvaluatorName);
+                            customEvaluator.configure(conf);
+                            if (customEvaluators.containsKey(customEvaluatorName)) {
+                                LOG.warn(
+                                        "Duplicate custom evaluator name [{}] detected. Overwriting existing [{}] with [{}].",
+                                        customEvaluatorName,
+                                        customEvaluators
+                                                .get(customEvaluatorName)
+                                                .getClass()
+                                                .getName(),
+                                        customEvaluator.getClass().getName());
+                            }
+                            customEvaluators.put(customEvaluatorName, customEvaluator);
+                        });
+        return customEvaluators;
+    }
+}
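A usage sketch of the discovery helper (the wiring mirrors what AutoscalerFactory does above; the class name and the printed output are illustrative only):

    import org.apache.flink.autoscaler.metrics.CustomEvaluator;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.kubernetes.operator.autoscaler.AutoscalerUtils;
    import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;

    import java.util.Map;

    /** Hypothetical helper that lists the evaluators visible to the operator. */
    public class ListCustomEvaluators {
        public static void main(String[] args) {
            // Loads every CustomEvaluator service implementation found under the Flink plugins
            // root (FLINK_PLUGINS_DIR, "plugins" by default) and calls configure() on each.
            Map<String, CustomEvaluator> evaluators =
                    AutoscalerUtils.discoverCustomEvaluators(
                            new FlinkConfigManager(new Configuration()));
            evaluators.forEach(
                    (name, impl) ->
                            System.out.println(name + " -> " + impl.getClass().getName()));
        }
    }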
diff --git a/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
index 3cb8b5d176..67526784a4 100644
--- a/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
+++ b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
@@ -32,6 +32,7 @@ under the License.
             <include>org/apache/flink/kubernetes/operator/listener/TestingListener.class</include>
             <include>org/apache/flink/kubernetes/operator/autoscaler/TestingAutoscaler.class</include>
             <include>org/apache/flink/kubernetes/operator/mutator/TestMutator.java</include>
+            <include>org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java</include>
         </includes>
     </fileSet>
     <fileSet>
@@ -40,4 +41,4 @@ under the License.
         <outputDirectory>/META-INF/services</outputDirectory>
     </fileSet>
 </fileSets>
-</assembly>
+</assembly>
\ No newline at end of file
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java
index f848d491a6..6b884044b8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java
@@ -19,6 +19,8 @@
 import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.autoscaler.JobAutoScalerImpl;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkResourceEventCollector;
@@ -47,7 +49,8 @@ void testLoadDefaultImplementation() {
                         new EventRecorder(
                                 new FlinkResourceEventCollector(),
                                 new FlinkStateSnapshotEventCollector()),
-                        new ClusterResourceManager(Duration.ZERO, kubernetesClient));
+                        new ClusterResourceManager(Duration.ZERO, kubernetesClient),
+                        new FlinkConfigManager(new Configuration()));
 
         Assertions.assertTrue(autoScaler instanceof JobAutoScalerImpl);
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java
new file mode 100644
index 0000000000..2d1f365daf
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Unit tests for {@link AutoscalerUtils}. */
+public class AutoscalerUtilsTest {
+    @TempDir public Path temporaryFolder;
+
+    @Test
+    public void testDiscoverCustomEvaluators() throws IOException {
+        Map<String, String> originalEnv = System.getenv();
+        try {
+            Map<String, String> systemEnv = new HashMap<>(originalEnv);
+            systemEnv.put(
+                    ConfigConstants.ENV_FLINK_PLUGINS_DIR,
+                    TestUtils.getTestPluginsRootDir(temporaryFolder));
+            TestUtils.setEnv(systemEnv);
+
+            // Discover evaluators
+            var discoveredEvaluators =
+                    AutoscalerUtils.discoverCustomEvaluators(
+                                    new FlinkConfigManager(new Configuration()))
+                            .keySet();
+            // Expected evaluators
+            var expectedEvaluators = new HashSet<>(List.of("test-custom-evaluator"));
+
+            assertEquals(expectedEvaluators, discoveredEvaluators);
+        } finally {
+            TestUtils.setEnv(originalEnv);
+        }
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 09885b965a..6ac12dfddf 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -104,7 +104,8 @@ public TestingFlinkDeploymentController(
                         flinkService.getKubernetesClient(),
                         eventRecorder,
                         new ClusterResourceManager(
-                                Duration.ZERO, flinkService.getKubernetesClient())));
+                                Duration.ZERO, flinkService.getKubernetesClient()),
+                        configManager));
         canaryResourceManager = new CanaryResourceManager<>(configManager);
         flinkDeploymentController =
                 new FlinkDeploymentController(
diff --git a/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.autoscaler.metrics.CustomEvaluator b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.autoscaler.metrics.CustomEvaluator
new file mode 100644
index 0000000000..a150d9783a
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.autoscaler.metrics.CustomEvaluator
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.flink.autoscaler.metrics.TestCustomEvaluator
\ No newline at end of file
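In a real deployment the registration mirrors this test setup: the evaluator is packaged in its own jar together with a META-INF/services/org.apache.flink.autoscaler.metrics.CustomEvaluator entry naming the implementation class, and the jar is placed in its own subfolder of the operator's plugins directory (FLINK_PLUGINS_DIR, "plugins" by default), from where PluginUtils picks it up at startup. A possible layout, with illustrative names only:

    plugins/
    └── capped-source-rate-evaluator/
        └── capped-source-rate-evaluator.jar   # contains the CustomEvaluator impl and its service entry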