From 89f7857a1248c2e218a0835de00e4aba730d658f Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Mon, 10 Mar 2025 20:18:11 +0530 Subject: [PATCH 1/5] Draft PR for plugin based approach for custom evaluator for scaling metric evaluation --- .../flink/autoscaler/JobAutoScalerImpl.java | 33 ++- .../autoscaler/ScalingMetricEvaluator.java | 98 ++++++- .../autoscaler/config/AutoScalerOptions.java | 18 ++ .../autoscaler/metrics/CustomEvaluator.java | 111 ++++++++ .../metrics/CustomEvaluatorOptions.java | 36 +++ .../AutoScalerCustomEvaluatorTest.java | 185 +++++++++++++ .../autoscaler/BacklogBasedScalingTest.java | 4 +- .../DelayedScaleDownEndToEndTest.java | 3 +- .../autoscaler/JobAutoScalerImplTest.java | 161 ++++++++++- .../MetricsCollectionAndEvaluationTest.java | 9 +- .../RecommendedParallelismTest.java | 4 +- .../ScalingMetricEvaluatorTest.java | 261 +++++++++++++++++- .../metrics/TestCustomEvaluator.java | 46 +++ .../kubernetes/operator/FlinkOperator.java | 6 +- .../autoscaler/AutoscalerFactory.java | 8 +- .../operator/autoscaler/AutoscalerUtils.java | 65 +++++ .../test/assembly/test-plugins-assembly.xml | 3 +- .../autoscaler/AutoscalerFactoryTest.java | 5 +- .../autoscaler/AutoscalerUtilsTest.java | 66 +++++ .../TestingFlinkDeploymentController.java | 3 +- ...e.flink.autoscaler.metrics.CustomEvaluator | 19 ++ 21 files changed, 1112 insertions(+), 32 deletions(-) create mode 100644 flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java create mode 100644 flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluatorOptions.java create mode 100644 flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java create mode 100644 flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java create mode 100644 flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java create mode 100644 
flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java create mode 100644 flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.autoscaler.metrics.CustomEvaluator diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index d8ee54abfd..12cc9b4ee4 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -19,14 +19,17 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.event.AutoScalerEventHandler; import org.apache.flink.autoscaler.exceptions.NotReadyException; import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.metrics.CustomEvaluator; import org.apache.flink.autoscaler.metrics.EvaluatedMetrics; import org.apache.flink.autoscaler.realizer.ScalingRealizer; import org.apache.flink.autoscaler.state.AutoScalerStateStore; import org.apache.flink.autoscaler.tuning.ConfigChanges; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.util.Preconditions; @@ -36,11 +39,14 @@ import java.time.Clock; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.CUSTOM_EVALUATOR_NAME; import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism; import static 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism; +import static org.apache.flink.autoscaler.metrics.CustomEvaluatorOptions.CUSTOM_EVALUATOR_CLASS; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingTracking; @@ -58,6 +64,7 @@ public class JobAutoScalerImpl> private final AutoScalerEventHandler eventHandler; private final ScalingRealizer scalingRealizer; private final AutoScalerStateStore stateStore; + private final Map customEvaluators; private Clock clock = Clock.systemDefaultZone(); @@ -73,13 +80,15 @@ public JobAutoScalerImpl( ScalingExecutor scalingExecutor, AutoScalerEventHandler eventHandler, ScalingRealizer scalingRealizer, - AutoScalerStateStore stateStore) { + AutoScalerStateStore stateStore, + Map customEvaluators) { this.metricsCollector = metricsCollector; this.evaluator = evaluator; this.scalingExecutor = scalingExecutor; this.eventHandler = eventHandler; this.scalingRealizer = scalingRealizer; this.stateStore = stateStore; + this.customEvaluators = customEvaluators; } @Override @@ -203,8 +212,15 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri // Scaling tracking data contains previous restart times that are taken into account var restartTime = scalingTracking.getMaxRestartTimeOrDefault(ctx.getConfiguration()); + + var customEvaluatorWithConfig = getCustomEvaluatorIfRequired(ctx.getConfiguration()); + var evaluatedMetrics = - evaluator.evaluate(ctx.getConfiguration(), collectedMetrics, restartTime); + evaluator.evaluate( + ctx.getConfiguration(), + collectedMetrics, + restartTime, + customEvaluatorWithConfig); LOG.debug("Evaluated metrics: {}", evaluatedMetrics); lastEvaluatedMetrics.put(ctx.getJobKey(), evaluatedMetrics); @@ -259,4 +275,17 @@ void setClock(Clock clock) { this.metricsCollector.setClock(clock); this.scalingExecutor.setClock(clock); } 
+ + @VisibleForTesting + protected Tuple2 getCustomEvaluatorIfRequired( + Configuration conf) { + var customEvaluatorName = conf.get(CUSTOM_EVALUATOR_NAME); + var customEvaluatorConfig = AutoScalerOptions.forCustomEvaluator(conf, customEvaluatorName); + CustomEvaluator evaluator = + Optional.ofNullable(customEvaluatorConfig.get(CUSTOM_EVALUATOR_CLASS)) + .map(this.customEvaluators::get) + .orElse(null); + + return evaluator != null ? new Tuple2<>(evaluator, customEvaluatorConfig) : null; + } } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java index 58c5dbe4a4..a24a86ab3d 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java @@ -18,9 +18,11 @@ package org.apache.flink.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.CustomEvaluator; import org.apache.flink.autoscaler.metrics.EvaluatedMetrics; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.MetricAggregator; @@ -28,6 +30,7 @@ import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.slf4j.Logger; @@ -38,6 +41,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import 
java.util.Optional; @@ -72,7 +76,10 @@ public class ScalingMetricEvaluator { private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class); public EvaluatedMetrics evaluate( - Configuration conf, CollectedMetricHistory collectedMetrics, Duration restartTime) { + Configuration conf, + CollectedMetricHistory collectedMetrics, + Duration restartTime, + @Nullable Tuple2 customEvaluatorWithConfig) { LOG.debug("Restart time used in metrics evaluation: {}", restartTime); var scalingOutput = new HashMap>(); var metricsHistory = collectedMetrics.getMetricHistory(); @@ -80,6 +87,23 @@ public EvaluatedMetrics evaluate( boolean processingBacklog = isProcessingBacklog(topology, metricsHistory, conf); + var customEvaluationSession = + Optional.ofNullable(customEvaluatorWithConfig) + .map( + info -> + Tuple2.of( + info.f0, + new CustomEvaluator.Context( + new UnmodifiableConfiguration(conf), + Collections.unmodifiableSortedMap( + metricsHistory), + Collections.unmodifiableMap(scalingOutput), + topology, + processingBacklog, + restartTime, + info.f1))) + .orElse(null); + for (var vertex : topology.getVerticesInTopologicalOrder()) { scalingOutput.put( vertex, @@ -90,7 +114,8 @@ public EvaluatedMetrics evaluate( topology, vertex, processingBacklog, - restartTime)); + restartTime, + customEvaluationSession)); } var globalMetrics = evaluateGlobalMetrics(metricsHistory); @@ -132,7 +157,8 @@ private Map evaluateMetrics( JobTopology topology, JobVertexID vertex, boolean processingBacklog, - Duration restartTime) { + Duration restartTime, + @Nullable Tuple2 customEvaluationSession) { var latestVertexMetrics = metricsHistory.get(metricsHistory.lastKey()).getVertexMetrics().get(vertex); @@ -142,6 +168,7 @@ private Map evaluateMetrics( double inputRateAvg = getRate(ScalingMetric.NUM_RECORDS_IN, vertex, metricsHistory); var evaluatedMetrics = new HashMap(); + computeTargetDataRate( topology, vertex, @@ -175,6 +202,24 @@ private Map evaluateMetrics( 
EvaluatedScalingMetric.of(vertexInfo.getNumSourcePartitions())); computeProcessingRateThresholds(evaluatedMetrics, conf, processingBacklog, restartTime); + + Optional.ofNullable(customEvaluationSession) + .map( + session -> + runCustomEvaluator( + vertex, + Collections.unmodifiableMap(evaluatedMetrics), + session)) + .filter(customEvaluatedMetrics -> !customEvaluatedMetrics.isEmpty()) + .ifPresent( + customEvaluatedMetrics -> { + LOG.info( + "Merging custom evaluated metrics for vertex {}: {}", + vertex, + customEvaluatedMetrics); + mergeEvaluatedMetricsMaps(evaluatedMetrics, customEvaluatedMetrics); + }); + return evaluatedMetrics; } @@ -585,4 +630,51 @@ protected static double computeEdgeDataRate( to); return getRate(ScalingMetric.NUM_RECORDS_OUT, from, metricsHistory); } + + @VisibleForTesting + protected static Map runCustomEvaluator( + JobVertexID vertex, + Map evaluatedMetrics, + Tuple2 customEvaluationSession) { + try { + return customEvaluationSession.f0.evaluateVertexMetrics( + vertex, evaluatedMetrics, customEvaluationSession.f1); + } catch (UnsupportedOperationException e) { + LOG.warn( + "Custom evaluator {} tried accessing an un-modifiable view.", + customEvaluationSession.f0.getClass(), + e); + } catch (Exception e) { + LOG.warn( + "Custom evaluator {} threw an exception.", + customEvaluationSession.f0.getClass(), + e); + } + + return Collections.emptyMap(); + } + + @VisibleForTesting + protected static void mergeEvaluatedMetricsMaps( + Map actual, + @Nullable Map incoming) { + Optional.ofNullable(incoming) + .ifPresent( + customEvaluatedMetric -> + customEvaluatedMetric.forEach( + (scalingMetric, evaluatedScalingMetric) -> + actual.merge( + scalingMetric, + evaluatedScalingMetric, + ScalingMetricEvaluator + ::mergeEvaluatedScalingMetric))); + } + + @VisibleForTesting + protected static EvaluatedScalingMetric mergeEvaluatedScalingMetric( + EvaluatedScalingMetric actual, EvaluatedScalingMetric incoming) { + return new EvaluatedScalingMetric( + 
!Double.isNaN(incoming.getCurrent()) ? incoming.getCurrent() : actual.getCurrent(), + !Double.isNaN(incoming.getAverage()) ? incoming.getAverage() : actual.getAverage()); + } } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index 980db2f4cc..5bcdb0cc20 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -21,6 +21,8 @@ import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.configuration.MemorySize; import java.time.Duration; @@ -31,6 +33,7 @@ public class AutoScalerOptions { public static final String OLD_K8S_OP_CONF_PREFIX = "kubernetes.operator."; public static final String AUTOSCALER_CONF_PREFIX = "job.autoscaler."; + public static final String CUSTOM_EVALUATOR_CONF_PREFIX = "metrics.custom-evaluator."; private static String oldOperatorConfigKey(String key) { return OLD_K8S_OP_CONF_PREFIX + AUTOSCALER_CONF_PREFIX + key; @@ -382,4 +385,19 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { "scaling.key-group.partitions.adjust.mode")) .withDescription( "How to adjust the parallelism of Source vertex or upstream shuffle is keyBy"); + + public static final ConfigOption CUSTOM_EVALUATOR_NAME = + autoScalerConfig(CUSTOM_EVALUATOR_CONF_PREFIX + "name") + .stringType() + .defaultValue(null) + .withFallbackKeys(oldOperatorConfigKey(CUSTOM_EVALUATOR_CONF_PREFIX + "name")) + .withDescription("Name of the custom evaluator to be used."); + + public static Configuration forCustomEvaluator( + Configuration 
configuration, String customEvaluatorName) { + + return new DelegatingConfiguration( + configuration, + AUTOSCALER_CONF_PREFIX + CUSTOM_EVALUATOR_CONF_PREFIX + customEvaluatorName + "."); + } } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java new file mode 100644 index 0000000000..2fb7f6a204 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.metrics; + +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.plugin.Plugin; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import lombok.Getter; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.SortedMap; + +/** + * Interface for custom evaluators that allow custom scaling metric evaluations. 
Implementations of + * this interface can provide custom logic to evaluate vertex metrics and merge them with internally + * evaluated metrics. + */ +public interface CustomEvaluator extends Plugin { + + /** + * Evaluates scaling metrics for a given job vertex based on the internally evaluated metrics + * and context. + * + * @param vertex The {@link JobVertexID} identifying the vertex whose metrics are being + * evaluated. + * @param evaluatedMetrics An un-modifiable view of this vertex's internally evaluated metrics. + * @param evaluationContext The evaluation context providing job-related configurations and + * historical metrics. + * @return Metrics to merge over the internal ones; a {@code NaN} current or average keeps the + * internal value. {@code null} or an empty map leaves the metrics unchanged. + * @throws UnsupportedOperationException if an attempt is made to modify the {@code + * evaluatedMetrics}, {@code Context.jobConf}, {@code Context.metricsHistory}, {@code + * Context.evaluatedVertexMetrics}. + */ + Map evaluateVertexMetrics( + JobVertexID vertex, + Map evaluatedMetrics, + Context evaluationContext); + + /** + * Context providing relevant job and metric information to assist in custom metric evaluation. + */ + @Getter + class Context { + private final Configuration jobConf; + + private final SortedMap metricsHistory; + + private final Map> + evaluatedVertexMetrics; + + private final JobTopology topology; + + private final boolean processingBacklog; + + private final Duration restartTime; + + private final Configuration customEvaluatorConf; + + /** + * Constructs a new {@link Context} instance. + * + * @param jobConf An un-modifiable view of the job's configuration. + * @param metricsHistory An un-modifiable view of historical record of collected metrics, + * ordered by timestamp. + * @param evaluatedVertexMetrics An un-modifiable view of the metrics of the vertices + * evaluated so far. Vertices are evaluated in topological order, so the current 
+ * vertex and its downstream vertices are not present yet. + * @param topology The job topology representing the structure of the Flink job. + * @param processingBacklog Indicates whether the job is processing backlog. + * @param restartTime Maximum restart time based on scaling records. + * @param customEvaluatorConf The configuration associated with the custom evaluator. + */ + public Context( + Configuration jobConf, + SortedMap metricsHistory, + Map> evaluatedVertexMetrics, + JobTopology topology, + boolean processingBacklog, + Duration restartTime, + Configuration customEvaluatorConf) { + this.jobConf = jobConf; + this.metricsHistory = metricsHistory; + this.evaluatedVertexMetrics = evaluatedVertexMetrics; + this.topology = topology; + this.processingBacklog = processingBacklog; + this.restartTime = restartTime; + this.customEvaluatorConf = customEvaluatorConf; + } + } +} diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluatorOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluatorOptions.java new file mode 100644 index 0000000000..f6ddd65901 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluatorOptions.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.metrics; + +import org.apache.flink.configuration.ConfigOption; + +import static org.apache.flink.autoscaler.config.AutoScalerOptions.CUSTOM_EVALUATOR_NAME; +import static org.apache.flink.configuration.ConfigOptions.key; + +/** Options resolved inside a {@link CustomEvaluator}'s own configuration namespace. */ +public class CustomEvaluatorOptions { + public static final ConfigOption CUSTOM_EVALUATOR_CLASS = + key("class") + .stringType() + .noDefaultValue() + .withDescription( + String.format( + "Class name of the custom evaluator to be used. Required to be passed if %s is set.", + CUSTOM_EVALUATOR_NAME.key())); +} diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java new file mode 100644 index 0000000000..5a86385887 --- /dev/null +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.event.TestingEventCollector; +import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.metrics.CustomEvaluator; +import org.apache.flink.autoscaler.metrics.TestCustomEvaluator; +import org.apache.flink.autoscaler.metrics.TestMetrics; +import org.apache.flink.autoscaler.realizer.TestingScalingRealizer; +import org.apache.flink.autoscaler.state.AutoScalerStateStore; +import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; +import org.apache.flink.autoscaler.topology.IOMetrics; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.util.Map; + +import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext; +import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Unit test for testing the integration of custom evaluators with the Flink job autoscaler. 
*/ +public class AutoScalerCustomEvaluatorTest { + private JobAutoScalerContext context; + private AutoScalerStateStore> stateStore; + + private TestingMetricsCollector> metricsCollector; + + private JobVertexID source1, sink; + + private JobAutoScalerImpl> autoscaler; + + @BeforeEach + public void setup() { + context = createDefaultJobAutoScalerContext(); + + TestingEventCollector> eventCollector = + new TestingEventCollector<>(); + stateStore = new InMemoryAutoScalerStateStore<>(); + + ScalingExecutor> scalingExecutor = + new ScalingExecutor<>(eventCollector, stateStore); + String testCustomEvaluatorName = "testEvaluator"; + String testCustomEvaluatorClassName = TestCustomEvaluator.class.getName(); + + var customEvaluators = createTestCustomEvaluator(); + + source1 = new JobVertexID(); + sink = new JobVertexID(); + + metricsCollector = + new TestingMetricsCollector<>( + new JobTopology( + new VertexInfo(source1, Map.of(), 1, 720, new IOMetrics(0, 0, 0)), + new VertexInfo( + sink, + Map.of(source1, REBALANCE), + 1, + 720, + new IOMetrics(0, 0, 0)))); + + var defaultConf = context.getConfiguration(); + defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true); + defaultConf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO); + defaultConf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1)); + defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2)); + defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true); + defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.); + defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE); + defaultConf.set(AutoScalerOptions.UTILIZATION_TARGET, 0.8); + defaultConf.set(AutoScalerOptions.UTILIZATION_MAX, 0.9); + defaultConf.set(AutoScalerOptions.UTILIZATION_MIN, 0.7); + defaultConf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); + defaultConf.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD, Duration.ofSeconds(1)); + + 
defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName); + + defaultConf.set( + ConfigOptions.key( + AutoScalerOptions.AUTOSCALER_CONF_PREFIX + + AutoScalerOptions.CUSTOM_EVALUATOR_CONF_PREFIX + + testCustomEvaluatorName + + ".class") + .stringType() + .noDefaultValue(), + testCustomEvaluatorClassName); + + autoscaler = + new JobAutoScalerImpl<>( + metricsCollector, + new ScalingMetricEvaluator(), + scalingExecutor, + eventCollector, + new TestingScalingRealizer<>(), + stateStore, + customEvaluators); + + // Reset custom window size to default + metricsCollector.setTestMetricWindowSize(null); + } + + @Test + public void test() throws Exception { + /* Test scaling up. */ + var now = Instant.ofEpochMilli(0); + setClocksTo(now); + metricsCollector.setJobUpdateTs(now); + // Adjust metric window size, so we can fill the metric window with two metrics + metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(1)); + metricsCollector.updateMetrics( + source1, + TestMetrics.builder() + .numRecordsIn(0) + .numRecordsOut(0) + .numRecordsInPerSec(500.) 
+ .maxBusyTimePerSec(8) + .pendingRecords(0L) + .build()); + metricsCollector.updateMetrics( + sink, TestMetrics.builder().numRecordsIn(0).maxBusyTimePerSec(850).build()); + + autoscaler.scale(context); + + now = now.plus(Duration.ofSeconds(1)); + setClocksTo(now); + + metricsCollector.updateMetrics( + source1, m -> m.setNumRecordsIn(500), m -> m.setNumRecordsOut(500)); + metricsCollector.updateMetrics(sink, m -> m.setNumRecordsIn(500)); + + autoscaler.scale(context); + + var scaledParallelism = ScalingExecutorTest.getScaledParallelism(stateStore, context); + assertEquals(3, scaledParallelism.get(source1)); + assertEquals(200, scaledParallelism.get(sink)); + assertFlinkMetricsCount(1, 0); + } + + private void setClocksTo(Instant time) { + var clock = Clock.fixed(time, ZoneId.systemDefault()); + autoscaler.setClock(clock); + } + + private void assertFlinkMetricsCount(int scalingCount, int balancedCount) { + AutoscalerFlinkMetrics autoscalerFlinkMetrics = + autoscaler.flinkMetrics.get(context.getJobKey()); + assertEquals(scalingCount, autoscalerFlinkMetrics.getNumScalingsCount()); + assertEquals(balancedCount, autoscalerFlinkMetrics.getNumBalancedCount()); + } + + private Map createTestCustomEvaluator() { + var testCustomEvaluator = new TestCustomEvaluator(); + testCustomEvaluator.configure(new Configuration()); + return Map.of(testCustomEvaluator.getClass().getName(), testCustomEvaluator); + } +} diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java index df950bb4c0..956ecf32d8 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java @@ -40,6 +40,7 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; +import java.util.Collections; import java.util.Map; import 
java.util.SortedMap; @@ -108,7 +109,8 @@ public void setup() { scalingExecutor, eventCollector, new TestingScalingRealizer<>(), - stateStore); + stateStore, + Collections.emptyMap()); // Reset custom window size to default metricsCollector.setTestMetricWindowSize(null); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java index 420d9df3a0..9eca91295e 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/DelayedScaleDownEndToEndTest.java @@ -122,7 +122,8 @@ public void setup() throws Exception { new ScalingExecutor<>(eventCollector, stateStore), eventCollector, scalingRealizer, - stateStore); + stateStore, + Collections.emptyMap()); // initially the last evaluated metrics are empty assertThat(autoscaler.lastEvaluatedMetrics.get(context.getJobKey())).isNull(); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java index 552971d7a5..c741ce989a 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java @@ -23,7 +23,9 @@ import org.apache.flink.autoscaler.exceptions.NotReadyException; import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.CustomEvaluator; import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.TestCustomEvaluator; import org.apache.flink.autoscaler.metrics.TestMetrics; import org.apache.flink.autoscaler.realizer.ScalingRealizer; import 
org.apache.flink.autoscaler.realizer.TestingScalingRealizer; @@ -32,6 +34,8 @@ import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.autoscaler.tuning.ConfigChanges; import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.Gauge; @@ -53,6 +57,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import static java.util.Map.entry; @@ -63,6 +68,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests for JobAutoScalerImpl. 
*/ @@ -111,7 +119,8 @@ void testMetricReporting() throws Exception { scalingExecutor, eventCollector, scalingRealizer, - stateStore); + stateStore, + Collections.emptyMap()); autoscaler.scale(context); @@ -147,7 +156,13 @@ private static double getGaugeValue( void testErrorReporting() throws Exception { var autoscaler = new JobAutoScalerImpl<>( - null, null, null, eventCollector, scalingRealizer, stateStore); + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + Collections.emptyMap()); autoscaler.scale(context); Assertions.assertEquals( @@ -182,7 +197,8 @@ protected Collection queryAggregatedMetricNames( null, eventCollector, scalingRealizer, - stateStore); + stateStore, + Collections.emptyMap()); // Should not produce an error autoscaler.scale(context); @@ -219,7 +235,8 @@ public void realizeParallelismOverrides( null, eventCollector, realizeParallelismOverridesWithExceptionsScalingRealizer, - stateStore); + stateStore, + Collections.emptyMap()); // Should produce an error autoscaler.scale(context); @@ -236,7 +253,8 @@ void testParallelismOverrides() throws Exception { null, eventCollector, scalingRealizer, - stateStore); + stateStore, + Collections.emptyMap()); // Initially we should return empty overrides, do not crate any state assertThat(autoscaler.getParallelismOverrides(context)).isEmpty(); @@ -304,7 +322,13 @@ public void testApplyAutoscalerParallelism() throws Exception { var overrides = new HashMap(); var autoscaler = new JobAutoScalerImpl<>( - null, null, null, eventCollector, scalingRealizer, stateStore) { + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + Collections.emptyMap()) { public Map getParallelismOverrides( JobAutoScalerContext ctx) { return new HashMap<>(overrides); @@ -354,7 +378,13 @@ void testApplyConfigOverrides() throws Exception { context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true); var autoscaler = new JobAutoScalerImpl<>( - null, null, null, eventCollector, 
scalingRealizer, stateStore); + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + Collections.emptyMap()); // Initially we should return empty overrides, do not crate any state assertThat(stateStore.getConfigChanges(context).getOverrides()).isEmpty(); @@ -402,7 +432,13 @@ void testAutoscalerDisabled() throws Exception { var autoscaler = new JobAutoScalerImpl<>( - null, null, null, eventCollector, scalingRealizer, stateStore); + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + Collections.emptyMap()); autoscaler.scale(context); assertTrue(stateStore.getScalingHistory(context).isEmpty()); @@ -427,4 +463,113 @@ private void assertParallelismOverrides(Map expectedOverrides) { private TestingScalingRealizer.Event> getEvent() { return scalingRealizer.events.poll(); } + + @Test + void testGetCustomEvaluatorIfRequiredWithCustomEvaluator() { + var topology = new JobTopology(); + + CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); + testCustomEvaluator.configure(new Configuration()); + var testCustomEvaluators = + Map.of(testCustomEvaluator.getClass().getName(), testCustomEvaluator); + + String testCustomEvaluatorName = "testCustomEvaluator"; + String testCustomEvaluatorClassName = TestCustomEvaluator.class.getName(); + + var defaultConf = context.getConfiguration(); + defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName); + + defaultConf.set( + ConfigOptions.key( + AutoScalerOptions.AUTOSCALER_CONF_PREFIX + + AutoScalerOptions.CUSTOM_EVALUATOR_CONF_PREFIX + + testCustomEvaluatorName + + ".class") + .stringType() + .noDefaultValue(), + testCustomEvaluatorClassName); + + defaultConf.set( + ConfigOptions.key( + AutoScalerOptions.AUTOSCALER_CONF_PREFIX + + AutoScalerOptions.CUSTOM_EVALUATOR_CONF_PREFIX + + testCustomEvaluatorName + + ".k1") + .stringType() + .noDefaultValue(), + "v1"); + + defaultConf.set( + ConfigOptions.key( + AutoScalerOptions.AUTOSCALER_CONF_PREFIX + + 
AutoScalerOptions.CUSTOM_EVALUATOR_CONF_PREFIX + + testCustomEvaluatorName + + ".k2") + .stringType() + .noDefaultValue(), + "v2"); + + var autoscaler = + new JobAutoScalerImpl<>( + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + testCustomEvaluators); + + var customEvaluatorWithConfig = + autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration()); + assertNotNull(customEvaluatorWithConfig); + assertInstanceOf(CustomEvaluator.class, customEvaluatorWithConfig.f0); + var customEvaluatorConfig = customEvaluatorWithConfig.f1; + assertNotNull(customEvaluatorConfig); + int expectedKeyCount = 3; + assertEquals(expectedKeyCount, customEvaluatorConfig.keySet().size()); + + Set expectedKeys = Set.of("class", "k1", "k2"); + assertTrue(customEvaluatorConfig.keySet().containsAll(expectedKeys)); + } + + @Test + void testGetCustomEvaluatorIfRequiredWithoutCustomEvaluator() { + var topology = new JobTopology(); + + var autoscaler = + new JobAutoScalerImpl<>( + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + Collections.emptyMap()); + + var customEvaluatorWithConfig = + autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration()); + assertNull(customEvaluatorWithConfig); + } + + @Test + void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorButNotConfigured() { + CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); + testCustomEvaluator.configure(new Configuration()); + var testCustomEvaluators = + Map.of(testCustomEvaluator.getClass().getName(), testCustomEvaluator); + + var autoscaler = + new JobAutoScalerImpl<>( + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + testCustomEvaluators); + + var customEvaluatorWithConfig = + autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration()); + assertNull(customEvaluatorWithConfig); + } } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java index 1176075f9c..7b9841c8dc 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -181,7 +181,7 @@ public void testEndToEnd() throws Exception { assertEquals(3, collectedMetrics.getMetricHistory().size()); assertTrue(collectedMetrics.isFullyCollected()); - var evaluation = evaluator.evaluate(conf, collectedMetrics, restartTime); + var evaluation = evaluator.evaluate(conf, collectedMetrics, restartTime, null); scalingExecutor.scaleResource( context, evaluation, @@ -387,7 +387,7 @@ public void testTolerateAbsenceOfPendingRecordsMetric() throws Exception { var collectedMetrics = metricsCollector.updateMetrics(context, stateStore); var evaluation = - evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime); + evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime, null); assertEquals( 500., evaluation @@ -647,7 +647,7 @@ public void testScaleDownWithZeroProcessingRate() throws Exception { var collectedMetrics = collectMetrics(); var evaluation = - evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime); + evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime, null); assertEquals( 0, evaluation @@ -698,7 +698,8 @@ public void testScaleDownWithZeroProcessingRate() throws Exception { .getMetricHistory() .put(Instant.ofEpochSecond(1234), new CollectedMetrics(newMetrics, Map.of())); - evaluation = evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime); + evaluation = + evaluator.evaluate(context.getConfiguration(), collectedMetrics, restartTime, null); assertEquals( 3., evaluation diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java index f34aa1fc09..2902d25fae 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java @@ -37,6 +37,7 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; +import java.util.Collections; import java.util.Map; import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext; @@ -98,7 +99,8 @@ public void setup() { new ScalingExecutor<>(eventCollector, stateStore), eventCollector, new TestingScalingRealizer<>(), - stateStore); + stateStore, + Collections.emptyMap()); // Reset custom window size to default metricsCollector.setTestMetricWindowSize(null); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java index 03e8fce587..9e6a2709ec 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java @@ -21,12 +21,15 @@ import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.CustomEvaluator; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.TestCustomEvaluator; import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; +import 
org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.jupiter.api.Test; @@ -112,7 +115,8 @@ public void testLagBasedSourceScaling() { .evaluate( conf, new CollectedMetricHistory(topology, metricHistory, Instant.now()), - Duration.ZERO) + Duration.ZERO, + null) .getVertexMetrics(); assertEquals( @@ -146,7 +150,8 @@ public void testLagBasedSourceScaling() { .evaluate( conf, new CollectedMetricHistory(topology, metricHistory, Instant.now()), - Duration.ZERO) + Duration.ZERO, + null) .getVertexMetrics(); assertEquals( EvaluatedScalingMetric.avg(150), @@ -169,7 +174,8 @@ public void testLagBasedSourceScaling() { .evaluate( conf, new CollectedMetricHistory(topology, metricHistory, Instant.now()), - Duration.ZERO) + Duration.ZERO, + null) .getVertexMetrics(); assertEquals( EvaluatedScalingMetric.avg(150), @@ -191,7 +197,8 @@ public void testLagBasedSourceScaling() { .evaluate( conf, new CollectedMetricHistory(topology, metricHistory, Instant.now()), - Duration.ZERO) + Duration.ZERO, + null) .getVertexMetrics(); assertEquals( EvaluatedScalingMetric.avg(150), @@ -250,7 +257,8 @@ public void testLagBasedSourceScaling() { .evaluate( conf, new CollectedMetricHistory(topology, metricHistory, Instant.now()), - Duration.ZERO) + Duration.ZERO, + null) .getVertexMetrics(); assertEquals( EvaluatedScalingMetric.avg(100), @@ -260,6 +268,95 @@ public void testLagBasedSourceScaling() { evaluatedMetrics.get(sink).get(ScalingMetric.TARGET_DATA_RATE)); } + @Test + public void testEvaluateWithCustomEvaluator() { + var source = new JobVertexID(); + var sink = new JobVertexID(); + + var topology = + new JobTopology( + new VertexInfo(source, Collections.emptyMap(), 1, 1, null), + new VertexInfo(sink, Map.of(source, REBALANCE), 1, 1, null)); + + var metricHistory = new TreeMap(); + + metricHistory.put( + Instant.ofEpochMilli(1000), + new CollectedMetrics( + Map.of( + source, + Map.of( + ScalingMetric.LAG, + 0., + 
ScalingMetric.NUM_RECORDS_IN, + 0., + ScalingMetric.NUM_RECORDS_OUT, + 0., + ScalingMetric.LOAD, + .1), + sink, + Map.of(ScalingMetric.NUM_RECORDS_IN, 0., ScalingMetric.LOAD, .1)), + Map.of())); + + metricHistory.put( + Instant.ofEpochMilli(2000), + new CollectedMetrics( + Map.of( + source, + Map.of( + ScalingMetric.LAG, + 0., + ScalingMetric.NUM_RECORDS_IN, + 100., + ScalingMetric.NUM_RECORDS_OUT, + 200., + ScalingMetric.LOAD, + .4), + sink, + Map.of(ScalingMetric.NUM_RECORDS_IN, 200., ScalingMetric.LOAD, .2)), + Map.of())); + + var conf = new Configuration(); + + conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2)); + CustomEvaluator customEvaluator = new TestCustomEvaluator(); + var customEvaluatorWithConfig = new Tuple2<>(customEvaluator, new Configuration()); + + var evaluatedMetrics = + evaluator + .evaluate( + conf, + new CollectedMetricHistory(topology, metricHistory, Instant.now()), + Duration.ZERO, + customEvaluatorWithConfig) + .getVertexMetrics(); + + assertEquals( + EvaluatedScalingMetric.avg(.25), + evaluatedMetrics.get(source).get(ScalingMetric.LOAD)); + + assertEquals( + EvaluatedScalingMetric.avg(.15), + evaluatedMetrics.get(sink).get(ScalingMetric.LOAD)); + + assertEquals( + EvaluatedScalingMetric.avg(100000.0), + evaluatedMetrics.get(source).get(ScalingMetric.TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(0.0), + evaluatedMetrics.get(source).get(ScalingMetric.CATCH_UP_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.avg(200000.0), + evaluatedMetrics.get(sink).get(ScalingMetric.TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(0.0), + evaluatedMetrics.get(sink).get(ScalingMetric.CATCH_UP_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.of(0.0), + evaluatedMetrics.get(source).get(ScalingMetric.LAG)); + assertFalse(evaluatedMetrics.get(sink).containsKey(ScalingMetric.LAG)); + } + @Test public void testUtilizationBoundaryComputation() { @@ -838,4 +935,158 @@ private Tuple2 getThresholds( 
map.get(ScalingMetric.SCALE_UP_RATE_THRESHOLD).getCurrent(), map.get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent()); } + + @Test + public void testRunCustomEvaluator() { + var source = new JobVertexID(); + var sink = new JobVertexID(); + + var topology = + new JobTopology( + new VertexInfo(source, Collections.emptyMap(), 1, 1, null), + new VertexInfo(sink, Map.of(source, REBALANCE), 1, 1, null)); + + var metricHistory = new TreeMap(); + + metricHistory.put( + Instant.ofEpochMilli(1000), + new CollectedMetrics( + Map.of( + source, + Map.of( + ScalingMetric.LAG, + 0., + ScalingMetric.NUM_RECORDS_IN, + 0., + ScalingMetric.NUM_RECORDS_OUT, + 0., + ScalingMetric.LOAD, + .1), + sink, + Map.of(ScalingMetric.NUM_RECORDS_IN, 0., ScalingMetric.LOAD, .1)), + Map.of())); + + metricHistory.put( + Instant.ofEpochMilli(2000), + new CollectedMetrics( + Map.of( + source, + Map.of( + ScalingMetric.LAG, + 0., + ScalingMetric.NUM_RECORDS_IN, + 100., + ScalingMetric.NUM_RECORDS_OUT, + 200., + ScalingMetric.LOAD, + .4), + sink, + Map.of(ScalingMetric.NUM_RECORDS_IN, 200., ScalingMetric.LOAD, .2)), + Map.of())); + + var conf = new Configuration(); + CustomEvaluator customEvaluator = new TestCustomEvaluator(); + var evaluatedMetrics = new HashMap(); + + var testCustomEvaluationSession = + Tuple2.of( + customEvaluator, + new CustomEvaluator.Context( + new UnmodifiableConfiguration(conf), + Collections.unmodifiableSortedMap(metricHistory), + Collections.unmodifiableMap( + new HashMap< + JobVertexID, + Map>()), + topology, + false, + Duration.ZERO, + new Configuration())); + + var testCustomEvaluatorResult = + ScalingMetricEvaluator.runCustomEvaluator( + source, evaluatedMetrics, testCustomEvaluationSession); + + assertFalse(testCustomEvaluatorResult.isEmpty()); + + assertTrue(testCustomEvaluatorResult.containsKey(ScalingMetric.TARGET_DATA_RATE)); + + assertEquals( + EvaluatedScalingMetric.avg(100000.0), + testCustomEvaluatorResult.get(ScalingMetric.TARGET_DATA_RATE)); + } + + @Test 
+ public void testMergeEvaluatedMetricsMaps() { + Map actual = new HashMap<>(); + actual.put(ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(50000.0)); + actual.put(ScalingMetric.LOAD, EvaluatedScalingMetric.avg(0.5)); + + // Case 1: Merge with null (should not modify actual) + ScalingMetricEvaluator.mergeEvaluatedMetricsMaps(actual, null); + assertEquals(2, actual.size()); + assertEquals( + EvaluatedScalingMetric.avg(50000.0), actual.get(ScalingMetric.TARGET_DATA_RATE)); + assertEquals(EvaluatedScalingMetric.avg(0.5), actual.get(ScalingMetric.LOAD)); + + // Case 2: Merge with an empty map (should not modify actual) + ScalingMetricEvaluator.mergeEvaluatedMetricsMaps(actual, Collections.emptyMap()); + assertEquals(2, actual.size()); + assertEquals( + EvaluatedScalingMetric.avg(50000.0), actual.get(ScalingMetric.TARGET_DATA_RATE)); + assertEquals(EvaluatedScalingMetric.avg(0.5), actual.get(ScalingMetric.LOAD)); + + // Case 3: Merge with an incoming map + Map incoming = new HashMap<>(); + incoming.put(ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(100000.0)); + incoming.put(ScalingMetric.LAG, new EvaluatedScalingMetric(10.0, 10.0)); + + ScalingMetricEvaluator.mergeEvaluatedMetricsMaps(actual, incoming); + assertEquals(3, actual.size()); + + assertTrue(actual.containsKey(ScalingMetric.LAG)); + assertEquals(new EvaluatedScalingMetric(10.0, 10.0), actual.get(ScalingMetric.LAG)); + + assertTrue(actual.containsKey(ScalingMetric.TARGET_DATA_RATE)); + assertEquals( + EvaluatedScalingMetric.avg(100000.0), actual.get(ScalingMetric.TARGET_DATA_RATE)); + + assertTrue(actual.containsKey(ScalingMetric.LOAD)); + assertEquals(EvaluatedScalingMetric.avg(0.5), actual.get(ScalingMetric.LOAD)); + } + + @Test + public void testMergeEvaluatedScalingMetric() { + // Case 1 + EvaluatedScalingMetric actual = new EvaluatedScalingMetric(50.0, 100.0); + EvaluatedScalingMetric incoming = new EvaluatedScalingMetric(60.0, 120.0); + + EvaluatedScalingMetric result = + 
ScalingMetricEvaluator.mergeEvaluatedScalingMetric(actual, incoming); + + assertEquals(60.0, result.getCurrent()); + assertEquals(120.0, result.getAverage()); + + // Case 2 + EvaluatedScalingMetric incomingWithNaN = new EvaluatedScalingMetric(Double.NaN, Double.NaN); + result = ScalingMetricEvaluator.mergeEvaluatedScalingMetric(actual, incomingWithNaN); + + assertEquals(50.0, result.getCurrent()); + assertEquals(100.0, result.getAverage()); + + // Case 3 + EvaluatedScalingMetric incomingWithPartialNaN = + new EvaluatedScalingMetric(Double.NaN, 130.0); + result = ScalingMetricEvaluator.mergeEvaluatedScalingMetric(actual, incomingWithPartialNaN); + + assertEquals(50.0, result.getCurrent(), "Current value should remain unchanged"); + assertEquals(130.0, result.getAverage(), "Average value should be updated from incoming"); + + // Case 4 + EvaluatedScalingMetric actualWithNaN = new EvaluatedScalingMetric(Double.NaN, Double.NaN); + result = ScalingMetricEvaluator.mergeEvaluatedScalingMetric(actualWithNaN, incoming); + + assertEquals(60.0, result.getCurrent(), "Current value should be updated from incoming"); + assertEquals(120.0, result.getAverage(), "Average value should be updated from incoming"); + } } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java new file mode 100644 index 0000000000..cc26e17db0 --- /dev/null +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.metrics; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A test implementation of the {@link CustomEvaluator} interface that provides custom scaling + * metric evaluations for job vertices in a Flink job. + */ +public class TestCustomEvaluator implements CustomEvaluator { + @Override + public Map evaluateVertexMetrics( + JobVertexID vertex, + Map evaluatedMetrics, + Context evaluationContext) { + if (evaluationContext.getTopology().isSource(vertex)) { + var customEvaluatedMetrics = new HashMap(); + customEvaluatedMetrics.put( + ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(100000.0)); + + return customEvaluatedMetrics; + } + return Collections.emptyMap(); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java index 4bd2836f7a..cf03073c39 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java @@ -174,7 +174,9 @@ void registerDeploymentController() { var statusRecorder = StatusRecorder.create(client, metricManager, listeners); var clusterResourceManager = ClusterResourceManager.of(configManager.getDefaultConfig(), client); - var autoscaler = AutoscalerFactory.create(client, eventRecorder, 
clusterResourceManager); + var autoscaler = + AutoscalerFactory.create( + client, eventRecorder, clusterResourceManager, configManager); var reconcilerFactory = new ReconcilerFactory(eventRecorder, statusRecorder, autoscaler); var observerFactory = new FlinkDeploymentObserverFactory(eventRecorder); var canaryResourceManager = new CanaryResourceManager(configManager); @@ -198,7 +200,7 @@ void registerSessionJobController() { var metricManager = MetricManager.createFlinkSessionJobMetricManager(baseConfig, metricGroup); var statusRecorder = StatusRecorder.create(client, metricManager, listeners); - var autoscaler = AutoscalerFactory.create(client, eventRecorder, null); + var autoscaler = AutoscalerFactory.create(client, eventRecorder, null, configManager); var reconciler = new SessionJobReconciler(eventRecorder, statusRecorder, autoscaler); var observer = new FlinkSessionJobObserver(eventRecorder); var canaryResourceManager = new CanaryResourceManager(configManager); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java index 7bbb5492ec..d97d339381 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.autoscaler.ScalingMetricEvaluator; import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore; import org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -36,10 +37,12 @@ public class AutoscalerFactory 
{ public static JobAutoScaler create( KubernetesClient client, EventRecorder eventRecorder, - ClusterResourceManager clusterResourceManager) { + ClusterResourceManager clusterResourceManager, + FlinkConfigManager configManager) { var stateStore = new KubernetesAutoScalerStateStore(new ConfigMapStore(client)); var eventHandler = new KubernetesAutoScalerEventHandler(eventRecorder); + var customEvaluators = AutoscalerUtils.discoverCustomEvaluators(configManager); return new JobAutoScalerImpl<>( new RestApiMetricsCollector<>(), @@ -47,6 +50,7 @@ public static JobAutoScaler create( new ScalingExecutor<>(eventHandler, stateStore, clusterResourceManager), eventHandler, new KubernetesScalingRealizer(), - stateStore); + stateStore, + customEvaluators); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java new file mode 100644 index 0000000000..2f82ce3f7d --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.autoscaler.metrics.CustomEvaluator; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** Autoscaler related utility methods for Operator. */ +public class AutoscalerUtils { + private static final Logger LOG = LoggerFactory.getLogger(AutoscalerUtils.class); + + /** + * Discovers custom evaluators for the autoscaler. + * + * @param configManager Flink Config manager + * @return A map of discovered custom evaluators, where the key is the fully qualified class + * name of the custom evaluator and the value is the corresponding instance. + */ + public static Map discoverCustomEvaluators( + FlinkConfigManager configManager) { + var conf = configManager.getDefaultConfig(); + Map customEvaluators = new HashMap<>(); + + PluginUtils.createPluginManagerFromRootFolder(conf) + .load(CustomEvaluator.class) + .forEachRemaining( + customEvaluator -> { + String customEvaluatorClass = customEvaluator.getClass().getName(); + LOG.info( + "Discovered custom evaluator for autoscaler from plugin directory[{}]: {}.", + System.getenv() + .getOrDefault( + ConfigConstants.ENV_FLINK_PLUGINS_DIR, + ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS), + customEvaluatorClass); + customEvaluator.configure(conf); + customEvaluators.put(customEvaluatorClass, customEvaluator); + }); + return customEvaluators; + } +} diff --git a/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml index 3cb8b5d176..67526784a4 100644 --- a/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml +++ b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml @@ -32,6 +32,7 @@ under the
License. org/apache/flink/kubernetes/operator/listener/TestingListener.class org/apache/flink/kubernetes/operator/autoscaler/TestingAutoscaler.class org/apache/flink/kubernetes/operator/mutator/TestMutator.java + org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java @@ -40,4 +41,4 @@ under the License. /META-INF/services - + \ No newline at end of file diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java index f848d491a6..6b884044b8 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactoryTest.java @@ -19,6 +19,8 @@ import org.apache.flink.autoscaler.JobAutoScaler; import org.apache.flink.autoscaler.JobAutoScalerImpl; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.resources.ClusterResourceManager; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.FlinkResourceEventCollector; @@ -47,7 +49,8 @@ void testLoadDefaultImplementation() { new EventRecorder( new FlinkResourceEventCollector(), new FlinkStateSnapshotEventCollector()), - new ClusterResourceManager(Duration.ZERO, kubernetesClient)); + new ClusterResourceManager(Duration.ZERO, kubernetesClient), + new FlinkConfigManager(new Configuration())); Assertions.assertTrue(autoScaler instanceof JobAutoScalerImpl); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java new file 
mode 100644 index 0000000000..e84863facf --- /dev/null +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.autoscaler; + +import org.apache.flink.autoscaler.metrics.TestCustomEvaluator; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Unit tests for {@link AutoscalerUtils}. 
*/ +public class AutoscalerUtilsTest { + @TempDir public Path temporaryFolder; + + @Test + public void testDiscoverCustomEvaluators() throws IOException { + Map originalEnv = System.getenv(); + try { + Map systemEnv = new HashMap<>(originalEnv); + systemEnv.put( + ConfigConstants.ENV_FLINK_PLUGINS_DIR, + TestUtils.getTestPluginsRootDir(temporaryFolder)); + TestUtils.setEnv(systemEnv); + + // Discover evaluators + var discoveredEvaluators = + AutoscalerUtils.discoverCustomEvaluators( + new FlinkConfigManager(new Configuration())) + .keySet(); + // Expected evaluators + var expectedEvaluators = new HashSet<>(List.of(TestCustomEvaluator.class.getName())); + + assertEquals(expectedEvaluators, discoveredEvaluators); + } finally { + TestUtils.setEnv(originalEnv); + } + } +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java index 09885b965a..6ac12dfddf 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java @@ -104,7 +104,8 @@ public TestingFlinkDeploymentController( flinkService.getKubernetesClient(), eventRecorder, new ClusterResourceManager( - Duration.ZERO, flinkService.getKubernetesClient()))); + Duration.ZERO, flinkService.getKubernetesClient()), + configManager)); canaryResourceManager = new CanaryResourceManager<>(configManager); flinkDeploymentController = new FlinkDeploymentController( diff --git a/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.autoscaler.metrics.CustomEvaluator b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.autoscaler.metrics.CustomEvaluator new file 
mode 100644 index 0000000000..a150d9783a --- /dev/null +++ b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.autoscaler.metrics.CustomEvaluator @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.flink.autoscaler.metrics.TestCustomEvaluator \ No newline at end of file From d1cbb119036d56e495203b9d353686bb38975641 Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Tue, 11 Mar 2025 13:22:08 +0530 Subject: [PATCH 2/5] 1. adding comment for supporting fallback key with delegating configuration. 2. 
add sample custom evaluator for simple trend adjustor --- .../autoscaler/config/AutoScalerOptions.java | 2 +- .../metrics/SimpleTrendAdjustor.java | 108 ++++++++++++++++++ 2 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index 5bcdb0cc20..8b6face0da 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -395,7 +395,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { public static Configuration forCustomEvaluator( Configuration configuration, String customEvaluatorName) { - + // add support for fallBackKey with DelegatingConfiguration. return new DelegatingConfiguration( configuration, AUTOSCALER_CONF_PREFIX + CUSTOM_EVALUATOR_CONF_PREFIX + customEvaluatorName + "."); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java new file mode 100644 index 0000000000..d37d648aba --- /dev/null +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.metrics; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedMap; + +/** + * A simple implementation of the {@link CustomEvaluator} interface that adjusts scaling metrics + * based on recent historical trends. This evaluator applies a weighted moving average to refine the + * target data rate for source job vertices, enabling more responsive scaling decisions. + */ +public class SimpleTrendAdjustor implements CustomEvaluator { + @Override + public Map evaluateVertexMetrics( + JobVertexID vertex, + Map evaluatedMetrics, + Context evaluationContext) { + + if (!evaluationContext.getTopology().isSource(vertex)) { + return Collections.emptyMap(); + } + + var customEvaluatedMetrics = new HashMap(); + + // Extract current target data rate + EvaluatedScalingMetric targetDataRateMetric = + evaluatedMetrics.get(ScalingMetric.TARGET_DATA_RATE); + double currentTargetRate = + (targetDataRateMetric != null) ? 
targetDataRateMetric.getAverage() : 0.0; + + // Compute historical trend adjustment + double trendAdjustment = + computeTrendAdjustment(vertex, evaluationContext.getMetricsHistory()); + + // Apply a dynamic adjustment based on recent trends + double adjustedTargetRate = currentTargetRate + trendAdjustment; + + // Store the updated metric + customEvaluatedMetrics.put( + ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(adjustedTargetRate)); + + return customEvaluatedMetrics; + } + + /** + * Computes a trend-based adjustment using recent historical metrics. Uses a simple weighted + * moving average over the last few recorded metrics. + */ + private double computeTrendAdjustment( + JobVertexID vertex, SortedMap metricsHistory) { + if (metricsHistory.isEmpty()) { + // Fallback: apply no increase if no history is available + return 0.; + } + + double totalWeight = 0.0; + double weightedSum = 0.0; + // Increasing weight for more recent data points + int weight = 1; + + // Iterate over the last N entries (e.g., last 5 data points) + int count = 0; + for (var entry : metricsHistory.values()) { + Double historicalRate = + entry.getVertexMetrics().get(vertex).get(ScalingMetric.TARGET_DATA_RATE); + if (historicalRate != null) { + weightedSum += historicalRate * weight; + totalWeight += weight; + weight++; + count++; + } + if (count >= 5) { // Limit to last 5 points + break; + } + } + + return (totalWeight > 0) + ? 
(weightedSum / totalWeight) + - metricsHistory + .get(metricsHistory.lastKey()) + .getVertexMetrics() + .get(vertex) + .get(ScalingMetric.TARGET_DATA_RATE) + : 0.; + } +} From 7aa418533e1a536784e6cf5b4eeb66052012cac1 Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Fri, 14 Mar 2025 11:12:51 +0530 Subject: [PATCH 3/5] =?UTF-8?q?1.=20Adding=20getName=20method=20to=20Custo?= =?UTF-8?q?mEvaluator=20to=20derive=20the=20name=20from=20the=20interface.?= =?UTF-8?q?=202.=20Updating=20javadoc=E2=80=99s=20for=20the=20added=20meth?= =?UTF-8?q?ods.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../flink/autoscaler/JobAutoScalerImpl.java | 20 +++-- .../autoscaler/ScalingMetricEvaluator.java | 24 +++++ .../autoscaler/metrics/CustomEvaluator.java | 7 ++ .../metrics/CustomEvaluatorOptions.java | 36 -------- .../autoscaler/JobAutoScalerImplTest.java | 87 ++++++++++++++----- .../metrics/SimpleTrendAdjustor.java | 5 ++ .../metrics/TestCustomEvaluator.java | 5 ++ .../operator/autoscaler/AutoscalerUtils.java | 21 +++-- .../autoscaler/AutoscalerUtilsTest.java | 3 +- 9 files changed, 133 insertions(+), 75 deletions(-) delete mode 100644 flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluatorOptions.java diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index 12cc9b4ee4..d440020d75 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -46,7 +46,6 @@ import static org.apache.flink.autoscaler.config.AutoScalerOptions.CUSTOM_EVALUATOR_NAME; import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.initRecommendedParallelism; import static org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics.resetRecommendedParallelism; -import 
static org.apache.flink.autoscaler.metrics.CustomEvaluatorOptions.CUSTOM_EVALUATOR_CLASS; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory; import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingTracking; @@ -279,13 +278,16 @@ void setClock(Clock clock) { @VisibleForTesting protected Tuple2 getCustomEvaluatorIfRequired( Configuration conf) { - var customEvaluatorName = conf.get(CUSTOM_EVALUATOR_NAME); - var customEvaluatorConfig = AutoScalerOptions.forCustomEvaluator(conf, customEvaluatorName); - CustomEvaluator evaluator = - Optional.ofNullable(customEvaluatorConfig.get(CUSTOM_EVALUATOR_CLASS)) - .map(this.customEvaluators::get) - .orElse(null); - - return evaluator != null ? new Tuple2<>(evaluator, customEvaluatorConfig) : null; + return Optional.ofNullable(conf.get(CUSTOM_EVALUATOR_NAME)) + .map( + name -> { + CustomEvaluator evaluator = customEvaluators.get(name); + return evaluator != null + ? new Tuple2<>( + evaluator, + AutoScalerOptions.forCustomEvaluator(conf, name)) + : null; + }) + .orElse(null); } } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java index a24a86ab3d..74c7784659 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java @@ -631,6 +631,16 @@ protected static double computeEdgeDataRate( return getRate(ScalingMetric.NUM_RECORDS_OUT, from, metricsHistory); } + /** + * Executes the provided custom evaluator for the given job vertex. Calls {@link + * CustomEvaluator#evaluateVertexMetrics} to evaluate scaling metrics. + * + * @param vertex The job vertex being evaluated. + * @param evaluatedMetrics Current evaluated metrics. 
+ * @param customEvaluationSession A tuple containing the custom evaluator and evaluation + * context. + * @return A map of scaling metrics, with its corresponding evaluated scaling metric. + */ @VisibleForTesting protected static Map runCustomEvaluator( JobVertexID vertex, @@ -654,6 +664,13 @@ protected static Map runCustomEvaluator( return Collections.emptyMap(); } + /** + * Merges the incoming evaluated metrics into actual evaluated metrics. + * + * @param actual The target evaluated metrics map to merge into. + * @param incoming The incoming map containing new evaluated metrics map to be merged + * (nullable). + */ @VisibleForTesting protected static void mergeEvaluatedMetricsMaps( Map actual, @@ -670,6 +687,13 @@ protected static void mergeEvaluatedMetricsMaps( ::mergeEvaluatedScalingMetric))); } + /** + * Merges two {@link EvaluatedScalingMetric} instances. + * + * @param actual The existing evaluated scaling metric. + * @param incoming The incoming evaluated scaling metric. + * @return A new {@link EvaluatedScalingMetric} instance with merged values. + */ @VisibleForTesting protected static EvaluatedScalingMetric mergeEvaluatedScalingMetric( EvaluatedScalingMetric actual, EvaluatedScalingMetric incoming) { diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java index 2fb7f6a204..650c559335 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java @@ -37,6 +37,13 @@ */ public interface CustomEvaluator extends Plugin { + /** + * Returns the name of the custom evaluator. + * + * @return The name of the custom evaluator. + */ + String getName(); + /** * Evaluates scaling metrics for a given job vertex based on the internally evaluated metrics * and context. 
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluatorOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluatorOptions.java deleted file mode 100644 index f6ddd65901..0000000000 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluatorOptions.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.autoscaler.metrics; - -import org.apache.flink.configuration.ConfigOption; - -import static org.apache.flink.autoscaler.config.AutoScalerOptions.CUSTOM_EVALUATOR_NAME; -import static org.apache.flink.configuration.ConfigOptions.key; - -/** Defines configuration options for {@link CustomEvaluator}. */ -public class CustomEvaluatorOptions { - public static final ConfigOption CUSTOM_EVALUATOR_CLASS = - key("class") - .stringType() - .noDefaultValue() - .withDescription( - String.format( - "Class name of the custom evaluator to be used. 
Required to be passed if %s is set.", - CUSTOM_EVALUATOR_NAME)); -} diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java index c741ce989a..52f3c1646b 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java @@ -466,29 +466,15 @@ private TestingScalingRealizer.Event> getEven @Test void testGetCustomEvaluatorIfRequiredWithCustomEvaluator() { - var topology = new JobTopology(); - CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); testCustomEvaluator.configure(new Configuration()); - var testCustomEvaluators = - Map.of(testCustomEvaluator.getClass().getName(), testCustomEvaluator); + var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator); - String testCustomEvaluatorName = "testCustomEvaluator"; - String testCustomEvaluatorClassName = TestCustomEvaluator.class.getName(); + String testCustomEvaluatorName = "test-custom-evaluator"; var defaultConf = context.getConfiguration(); defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName); - defaultConf.set( - ConfigOptions.key( - AutoScalerOptions.AUTOSCALER_CONF_PREFIX - + AutoScalerOptions.CUSTOM_EVALUATOR_CONF_PREFIX - + testCustomEvaluatorName - + ".class") - .stringType() - .noDefaultValue(), - testCustomEvaluatorClassName); - defaultConf.set( ConfigOptions.key( AutoScalerOptions.AUTOSCALER_CONF_PREFIX @@ -525,17 +511,15 @@ void testGetCustomEvaluatorIfRequiredWithCustomEvaluator() { assertInstanceOf(CustomEvaluator.class, customEvaluatorWithConfig.f0); var customEvaluatorConfig = customEvaluatorWithConfig.f1; assertNotNull(customEvaluatorConfig); - int expectedKeyCount = 3; + int expectedKeyCount = 2; assertEquals(expectedKeyCount, customEvaluatorConfig.keySet().size()); - Set 
expectedKeys = Set.of("class", "k1", "k2"); + Set expectedKeys = Set.of("k1", "k2"); assertTrue(customEvaluatorConfig.keySet().containsAll(expectedKeys)); } @Test void testGetCustomEvaluatorIfRequiredWithoutCustomEvaluator() { - var topology = new JobTopology(); - var autoscaler = new JobAutoScalerImpl<>( null, @@ -555,8 +539,33 @@ void testGetCustomEvaluatorIfRequiredWithoutCustomEvaluator() { void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorButNotConfigured() { CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); testCustomEvaluator.configure(new Configuration()); - var testCustomEvaluators = - Map.of(testCustomEvaluator.getClass().getName(), testCustomEvaluator); + var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator); + + var autoscaler = + new JobAutoScalerImpl<>( + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + testCustomEvaluators); + + var customEvaluatorWithConfig = + autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration()); + assertNull(customEvaluatorWithConfig); + } + + @Test + void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorButNoMatches() { + CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); + testCustomEvaluator.configure(new Configuration()); + var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator); + + String testCustomEvaluatorName = "test-custom-evaluator-no-match"; + + var defaultConf = context.getConfiguration(); + defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName); var autoscaler = new JobAutoScalerImpl<>( @@ -572,4 +581,38 @@ void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorButNotConfigured() { autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration()); assertNull(customEvaluatorWithConfig); } + + @Test + void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorNoConfig() { + CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); + 
testCustomEvaluator.configure(new Configuration()); + var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator); + + String testCustomEvaluatorName = "test-custom-evaluator"; + + var defaultConf = context.getConfiguration(); + defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName); + + var autoscaler = + new JobAutoScalerImpl<>( + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + testCustomEvaluators); + + var customEvaluatorWithConfig = + autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration()); + assertNotNull(customEvaluatorWithConfig); + assertInstanceOf(CustomEvaluator.class, customEvaluatorWithConfig.f0); + var customEvaluatorConfig = customEvaluatorWithConfig.f1; + assertNotNull(customEvaluatorConfig); + int expectedKeyCount = 0; + assertEquals(expectedKeyCount, customEvaluatorConfig.keySet().size()); + + Set expectedKeys = Set.of(); + assertTrue(customEvaluatorConfig.keySet().containsAll(expectedKeys)); + } } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java index d37d648aba..12b555f4e1 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java @@ -32,6 +32,11 @@ * target data rate for source job vertices, enabling more responsive scaling decisions. 
*/ public class SimpleTrendAdjustor implements CustomEvaluator { + @Override + public String getName() { + return "simple-trend-adjustor"; + } + @Override public Map evaluateVertexMetrics( JobVertexID vertex, diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java index cc26e17db0..55b186b0f2 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java @@ -29,6 +29,11 @@ * metric evaluations for job vertices in a Flink job. */ public class TestCustomEvaluator implements CustomEvaluator { + @Override + public String getName() { + return "test-custom-evaluator"; + } + @Override public Map evaluateVertexMetrics( JobVertexID vertex, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java index 2f82ce3f7d..e2ef8d8b6c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java @@ -37,28 +37,37 @@ public class AutoscalerUtils { * discovers custom evaluator's for autoscaler. * * @param configManager Flink Config manager - * @return A map of discovered custom evaluators, where the key is the fully qualified class - * name of the custom evaluator and the value is the corresponding instance. + * @return A map of discovered custom evaluators, where the key is the evaluator name provided + * by {@link CustomEvaluator#getName()}) and the value is the corresponding instance. 
*/ public static Map discoverCustomEvaluators( FlinkConfigManager configManager) { var conf = configManager.getDefaultConfig(); Map customEvaluators = new HashMap<>(); - PluginUtils.createPluginManagerFromRootFolder(conf) .load(CustomEvaluator.class) .forEachRemaining( customEvaluator -> { - String customEvaluatorClass = customEvaluator.getClass().getName(); + String customEvaluatorName = customEvaluator.getName(); LOG.info( "Discovered custom evaluator for autoscaler from plugin directory[{}]: {}.", System.getenv() .getOrDefault( ConfigConstants.ENV_FLINK_PLUGINS_DIR, ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS), - customEvaluatorClass); + customEvaluatorName); customEvaluator.configure(conf); - customEvaluators.put(customEvaluatorClass, customEvaluator); + if (customEvaluators.containsKey(customEvaluatorName)) { + LOG.warn( + "Duplicate custom evaluator name [{}] detected. Overwriting existing [{}] with [{}].", + customEvaluatorName, + customEvaluators + .get(customEvaluatorName) + .getClass() + .getName(), + customEvaluator.getClass().getName()); + } + customEvaluators.put(customEvaluatorName, customEvaluator); }); return customEvaluators; } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java index e84863facf..2d1f365daf 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtilsTest.java @@ -18,7 +18,6 @@ package org.apache.flink.kubernetes.operator.autoscaler; -import org.apache.flink.autoscaler.metrics.TestCustomEvaluator; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; @@ -56,7 +55,7 @@ 
public void testDiscoverCustomEvaluators() throws IOException { new FlinkConfigManager(new Configuration())) .keySet(); // Expected evaluators - var expectedEvaluators = new HashSet<>(List.of(TestCustomEvaluator.class.getName())); + var expectedEvaluators = new HashSet<>(List.of("test-custom-evaluator")); assertEquals(expectedEvaluators, discoveredEvaluators); } finally { From 7a57e0557560ffa94e85efd632cb5b80a0c79d9b Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Tue, 29 Apr 2025 02:55:44 +0530 Subject: [PATCH 4/5] 1. Refactoring tests for the plugin 2. Passing null for custom evaluators for standalone autoscaler entrypoint. --- .../StandaloneAutoscalerEntrypoint.java | 3 +- .../AutoScalerCustomEvaluatorTest.java | 16 +- .../autoscaler/JobAutoScalerImplTest.java | 174 +++++++----------- .../ScalingMetricEvaluatorTest.java | 13 +- 4 files changed, 72 insertions(+), 134 deletions(-) diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java index c249727025..957af4c41c 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java @@ -94,7 +94,8 @@ JobAutoScaler createJobAutoscaler( new ScalingExecutor<>(eventHandler, stateStore), eventHandler, new RescaleApiScalingRealizer<>(eventHandler), - stateStore); + stateStore, + null); } @VisibleForTesting diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java index 5a86385887..20d5374e70 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java +++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java @@ -31,7 +31,6 @@ import org.apache.flink.autoscaler.topology.IOMetrics; import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.autoscaler.topology.VertexInfo; -import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -69,8 +68,7 @@ public void setup() { ScalingExecutor> scalingExecutor = new ScalingExecutor<>(eventCollector, stateStore); - String testCustomEvaluatorName = "testEvaluator"; - String testCustomEvaluatorClassName = TestCustomEvaluator.class.getName(); + String testCustomEvaluatorName = "test-custom-evaluator"; var customEvaluators = createTestCustomEvaluator(); @@ -104,16 +102,6 @@ public void setup() { defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName); - defaultConf.set( - ConfigOptions.key( - AutoScalerOptions.AUTOSCALER_CONF_PREFIX - + AutoScalerOptions.CUSTOM_EVALUATOR_CONF_PREFIX - + testCustomEvaluatorName - + ".class") - .stringType() - .noDefaultValue(), - testCustomEvaluatorClassName); - autoscaler = new JobAutoScalerImpl<>( metricsCollector, @@ -180,6 +168,6 @@ private void assertFlinkMetricsCount(int scalingCount, int balancedCount) { private Map createTestCustomEvaluator() { var testCustomEvaluator = new TestCustomEvaluator(); testCustomEvaluator.configure(new Configuration()); - return Map.of(testCustomEvaluator.getClass().getName(), testCustomEvaluator); + return Map.of(testCustomEvaluator.getName(), testCustomEvaluator); } } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java index 52f3c1646b..03dfc409ba 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java +++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java @@ -465,16 +465,41 @@ private TestingScalingRealizer.Event> getEven } @Test - void testGetCustomEvaluatorIfRequiredWithCustomEvaluator() { + void testGetCustomEvaluatorIfRequired() { CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); testCustomEvaluator.configure(new Configuration()); var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator); + var autoscalerWithCustomEvaluator = + new JobAutoScalerImpl<>( + null, + null, + null, + eventCollector, + scalingRealizer, + stateStore, + testCustomEvaluators); + String testCustomEvaluatorName = "test-custom-evaluator"; var defaultConf = context.getConfiguration(); + + // Case 1: Custom evaluator configured. defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName); + var customEvaluatorWithConfig = + autoscalerWithCustomEvaluator.getCustomEvaluatorIfRequired( + context.getConfiguration()); + assertNotNull(customEvaluatorWithConfig); + assertInstanceOf(CustomEvaluator.class, customEvaluatorWithConfig.f0); + var customEvaluatorConfig = customEvaluatorWithConfig.f1; + assertNotNull(customEvaluatorConfig); + assertEquals(0, customEvaluatorConfig.keySet().size()); + + Set expectedKeys = Set.of(); + assertEquals(expectedKeys, customEvaluatorConfig.keySet()); + + // Case 2: Custom evaluator configured with additional configs for the plugin. 
defaultConf.set( ConfigOptions.key( AutoScalerOptions.AUTOSCALER_CONF_PREFIX @@ -495,32 +520,39 @@ void testGetCustomEvaluatorIfRequiredWithCustomEvaluator() { .noDefaultValue(), "v2"); - var autoscaler = - new JobAutoScalerImpl<>( - null, - null, - null, - eventCollector, - scalingRealizer, - stateStore, - testCustomEvaluators); - - var customEvaluatorWithConfig = - autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration()); - assertNotNull(customEvaluatorWithConfig); - assertInstanceOf(CustomEvaluator.class, customEvaluatorWithConfig.f0); - var customEvaluatorConfig = customEvaluatorWithConfig.f1; - assertNotNull(customEvaluatorConfig); - int expectedKeyCount = 2; - assertEquals(expectedKeyCount, customEvaluatorConfig.keySet().size()); - - Set expectedKeys = Set.of("k1", "k2"); - assertTrue(customEvaluatorConfig.keySet().containsAll(expectedKeys)); - } - - @Test - void testGetCustomEvaluatorIfRequiredWithoutCustomEvaluator() { - var autoscaler = + var customEvaluatorWithConfigContainingAdditionalKeys = + autoscalerWithCustomEvaluator.getCustomEvaluatorIfRequired( + context.getConfiguration()); + assertNotNull(customEvaluatorWithConfigContainingAdditionalKeys); + assertInstanceOf( + CustomEvaluator.class, customEvaluatorWithConfigContainingAdditionalKeys.f0); + var customEvaluatorConfigContainingAdditionalKeys = + customEvaluatorWithConfigContainingAdditionalKeys.f1; + assertNotNull(customEvaluatorConfigContainingAdditionalKeys); + assertEquals(2, customEvaluatorConfigContainingAdditionalKeys.keySet().size()); + + expectedKeys = Set.of("k1", "k2"); + assertEquals(expectedKeys, customEvaluatorConfigContainingAdditionalKeys.keySet()); + + // Case 3: Custom evaluator configured but with a custom evaluator name that is not + // available. 
+ defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, "test-custom-evaluator-no-match"); + + var customEvaluatorWithConfigNoMatch = + autoscalerWithCustomEvaluator.getCustomEvaluatorIfRequired( + context.getConfiguration()); + assertNull(customEvaluatorWithConfigNoMatch); + + // Case 4: Custom evaluator not configured. + defaultConf.removeConfig(AutoScalerOptions.CUSTOM_EVALUATOR_NAME); + + var customEvaluatorNotConfigured = + autoscalerWithCustomEvaluator.getCustomEvaluatorIfRequired( + context.getConfiguration()); + assertNull(customEvaluatorNotConfigured); + + // Case 5: No custom evaluators available. + var autoscalerWithoutCustomEvaluator = new JobAutoScalerImpl<>( null, null, @@ -530,89 +562,9 @@ void testGetCustomEvaluatorIfRequiredWithoutCustomEvaluator() { stateStore, Collections.emptyMap()); - var customEvaluatorWithConfig = - autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration()); - assertNull(customEvaluatorWithConfig); - } - - @Test - void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorButNotConfigured() { - CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); - testCustomEvaluator.configure(new Configuration()); - var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator); - - var autoscaler = - new JobAutoScalerImpl<>( - null, - null, - null, - eventCollector, - scalingRealizer, - stateStore, - testCustomEvaluators); - - var customEvaluatorWithConfig = - autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration()); - assertNull(customEvaluatorWithConfig); - } - - @Test - void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorButNoMatches() { - CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); - testCustomEvaluator.configure(new Configuration()); - var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator); - - String testCustomEvaluatorName = "test-custom-evaluator-no-match"; - - var defaultConf = context.getConfiguration(); - 
defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName); - - var autoscaler = - new JobAutoScalerImpl<>( - null, - null, - null, - eventCollector, - scalingRealizer, - stateStore, - testCustomEvaluators); - - var customEvaluatorWithConfig = - autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration()); - assertNull(customEvaluatorWithConfig); - } - - @Test - void testGetCustomEvaluatorIfRequiredWithCustomEvaluatorNoConfig() { - CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); - testCustomEvaluator.configure(new Configuration()); - var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator); - - String testCustomEvaluatorName = "test-custom-evaluator"; - - var defaultConf = context.getConfiguration(); - defaultConf.set(AutoScalerOptions.CUSTOM_EVALUATOR_NAME, testCustomEvaluatorName); - - var autoscaler = - new JobAutoScalerImpl<>( - null, - null, - null, - eventCollector, - scalingRealizer, - stateStore, - testCustomEvaluators); - - var customEvaluatorWithConfig = - autoscaler.getCustomEvaluatorIfRequired(context.getConfiguration()); - assertNotNull(customEvaluatorWithConfig); - assertInstanceOf(CustomEvaluator.class, customEvaluatorWithConfig.f0); - var customEvaluatorConfig = customEvaluatorWithConfig.f1; - assertNotNull(customEvaluatorConfig); - int expectedKeyCount = 0; - assertEquals(expectedKeyCount, customEvaluatorConfig.keySet().size()); - - Set expectedKeys = Set.of(); - assertTrue(customEvaluatorConfig.keySet().containsAll(expectedKeys)); + var customEvaluatorConfigNoCustomEvaluators = + autoscalerWithoutCustomEvaluator.getCustomEvaluatorIfRequired( + context.getConfiguration()); + assertNull(customEvaluatorConfigNoCustomEvaluators); } } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java index 9e6a2709ec..5a4577dfef 100644 --- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java @@ -994,10 +994,7 @@ public void testRunCustomEvaluator() { new CustomEvaluator.Context( new UnmodifiableConfiguration(conf), Collections.unmodifiableSortedMap(metricHistory), - Collections.unmodifiableMap( - new HashMap< - JobVertexID, - Map>()), + Collections.unmodifiableMap(new HashMap<>()), topology, false, Duration.ZERO, @@ -1079,14 +1076,14 @@ public void testMergeEvaluatedScalingMetric() { new EvaluatedScalingMetric(Double.NaN, 130.0); result = ScalingMetricEvaluator.mergeEvaluatedScalingMetric(actual, incomingWithPartialNaN); - assertEquals(50.0, result.getCurrent(), "Current value should remain unchanged"); - assertEquals(130.0, result.getAverage(), "Average value should be updated from incoming"); + assertEquals(50.0, result.getCurrent()); + assertEquals(130.0, result.getAverage()); // Case 4 EvaluatedScalingMetric actualWithNaN = new EvaluatedScalingMetric(Double.NaN, Double.NaN); result = ScalingMetricEvaluator.mergeEvaluatedScalingMetric(actualWithNaN, incoming); - assertEquals(60.0, result.getCurrent(), "Current value should be updated from incoming"); - assertEquals(120.0, result.getAverage(), "Average value should be updated from incoming"); + assertEquals(60.0, result.getCurrent()); + assertEquals(120.0, result.getAverage()); } } From 67d0799617a3e1bc13447109b24cd112e765b737 Mon Sep 17 00:00:00 2001 From: Pradeepta Choudhury Date: Wed, 28 May 2025 23:09:46 +0530 Subject: [PATCH 5/5] updating name for evaluator plugin to match naming convention for plugins. 
--- docs/content.zh/docs/operations/plugins.md | 2 + .../flink/autoscaler/JobAutoScalerImpl.java | 10 +- .../autoscaler/ScalingMetricEvaluator.java | 17 ++- ...tor.java => FlinkAutoscalerEvaluator.java} | 4 +- .../AutoScalerCustomEvaluatorTest.java | 4 +- .../autoscaler/JobAutoScalerImplTest.java | 9 +- .../ScalingMetricEvaluatorTest.java | 8 +- .../metrics/SimpleTrendAdjustor.java | 113 ------------------ .../metrics/TestCustomEvaluator.java | 6 +- .../operator/autoscaler/AutoscalerUtils.java | 11 +- ...toscaler.metrics.FlinkAutoscalerEvaluator} | 0 11 files changed, 40 insertions(+), 144 deletions(-) rename flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/{CustomEvaluator.java => FlinkAutoscalerEvaluator.java} (96%) delete mode 100644 flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java rename flink-kubernetes-operator/src/test/resources/META-INF/services/{org.apache.flink.autoscaler.metrics.CustomEvaluator => org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator} (100%) diff --git a/docs/content.zh/docs/operations/plugins.md b/docs/content.zh/docs/operations/plugins.md index 3abd2c3a76..14d5228528 100644 --- a/docs/content.zh/docs/operations/plugins.md +++ b/docs/content.zh/docs/operations/plugins.md @@ -188,3 +188,5 @@ The following steps demonstrate how to develop and use a custom mutator. ```text 2023-12-12 06:26:56,667 o.a.f.k.o.u.MutatorUtils [INFO ] Discovered mutator from plugin directory[/opt/flink/plugins]: org.apache.flink.mutator.CustomFlinkMutator. 
``` + +## Flink Autoscaler Custom Evaluator diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index d440020d75..ddfd78cf32 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -24,8 +24,8 @@ import org.apache.flink.autoscaler.event.AutoScalerEventHandler; import org.apache.flink.autoscaler.exceptions.NotReadyException; import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; -import org.apache.flink.autoscaler.metrics.CustomEvaluator; import org.apache.flink.autoscaler.metrics.EvaluatedMetrics; +import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator; import org.apache.flink.autoscaler.realizer.ScalingRealizer; import org.apache.flink.autoscaler.state.AutoScalerStateStore; import org.apache.flink.autoscaler.tuning.ConfigChanges; @@ -63,7 +63,7 @@ public class JobAutoScalerImpl> private final AutoScalerEventHandler eventHandler; private final ScalingRealizer scalingRealizer; private final AutoScalerStateStore stateStore; - private final Map customEvaluators; + private final Map customEvaluators; private Clock clock = Clock.systemDefaultZone(); @@ -80,7 +80,7 @@ public JobAutoScalerImpl( AutoScalerEventHandler eventHandler, ScalingRealizer scalingRealizer, AutoScalerStateStore stateStore, - Map customEvaluators) { + Map customEvaluators) { this.metricsCollector = metricsCollector; this.evaluator = evaluator; this.scalingExecutor = scalingExecutor; @@ -276,12 +276,12 @@ void setClock(Clock clock) { } @VisibleForTesting - protected Tuple2 getCustomEvaluatorIfRequired( + protected Tuple2 getCustomEvaluatorIfRequired( Configuration conf) { return Optional.ofNullable(conf.get(CUSTOM_EVALUATOR_NAME)) .map( name -> { - CustomEvaluator evaluator = customEvaluators.get(name); + 
FlinkAutoscalerEvaluator evaluator = customEvaluators.get(name); return evaluator != null ? new Tuple2<>( evaluator, diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java index 74c7784659..aa3d02e270 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java @@ -22,9 +22,9 @@ import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; import org.apache.flink.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.autoscaler.metrics.CustomEvaluator; import org.apache.flink.autoscaler.metrics.EvaluatedMetrics; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator; import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.autoscaler.topology.JobTopology; @@ -79,7 +79,9 @@ public EvaluatedMetrics evaluate( Configuration conf, CollectedMetricHistory collectedMetrics, Duration restartTime, - @Nullable Tuple2 customEvaluatorWithConfig) { + @Nullable + Tuple2 + customEvaluatorWithConfig) { LOG.debug("Restart time used in metrics evaluation: {}", restartTime); var scalingOutput = new HashMap>(); var metricsHistory = collectedMetrics.getMetricHistory(); @@ -93,7 +95,7 @@ public EvaluatedMetrics evaluate( info -> Tuple2.of( info.f0, - new CustomEvaluator.Context( + new FlinkAutoscalerEvaluator.Context( new UnmodifiableConfiguration(conf), Collections.unmodifiableSortedMap( metricsHistory), @@ -158,7 +160,9 @@ private Map evaluateMetrics( JobVertexID vertex, boolean processingBacklog, Duration restartTime, - @Nullable Tuple2 customEvaluationSession) { + @Nullable + Tuple2 + 
customEvaluationSession) { var latestVertexMetrics = metricsHistory.get(metricsHistory.lastKey()).getVertexMetrics().get(vertex); @@ -633,7 +637,7 @@ protected static double computeEdgeDataRate( /** * Executes the provided custom evaluator for the given job vertex. Calls {@link - * CustomEvaluator#evaluateVertexMetrics} to evaluate scaling metrics. + * FlinkAutoscalerEvaluator#evaluateVertexMetrics} to evaluate scaling metrics. * * @param vertex The job vertex being evaluated. * @param evaluatedMetrics Current evaluated metrics. @@ -645,7 +649,8 @@ protected static double computeEdgeDataRate( protected static Map runCustomEvaluator( JobVertexID vertex, Map evaluatedMetrics, - Tuple2 customEvaluationSession) { + Tuple2 + customEvaluationSession) { try { return customEvaluationSession.f0.evaluateVertexMetrics( vertex, evaluatedMetrics, customEvaluationSession.f1); diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkAutoscalerEvaluator.java similarity index 96% rename from flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkAutoscalerEvaluator.java index 650c559335..976f79b620 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CustomEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkAutoscalerEvaluator.java @@ -31,11 +31,11 @@ import java.util.SortedMap; /** - * Interface for custom evaluators that allow custom scaling metric evaluations. Implementations of + * Interface for custom evaluators that allow tailored scaling metric evaluations. Implementations of * this interface can provide custom logic to evaluate vertex metrics and merge them with internally * evaluated metrics. 
*/ -public interface CustomEvaluator extends Plugin { +public interface FlinkAutoscalerEvaluator extends Plugin { /** * Returns the name of the custom evaluator. diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java index 20d5374e70..00bf08e376 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/AutoScalerCustomEvaluatorTest.java @@ -22,7 +22,7 @@ import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.event.TestingEventCollector; import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; -import org.apache.flink.autoscaler.metrics.CustomEvaluator; +import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator; import org.apache.flink.autoscaler.metrics.TestCustomEvaluator; import org.apache.flink.autoscaler.metrics.TestMetrics; import org.apache.flink.autoscaler.realizer.TestingScalingRealizer; @@ -165,7 +165,7 @@ private void assertFlinkMetricsCount(int scalingCount, int balancedCount) { assertEquals(balancedCount, autoscalerFlinkMetrics.getNumBalancedCount()); } - private Map createTestCustomEvaluator() { + private Map createTestCustomEvaluator() { var testCustomEvaluator = new TestCustomEvaluator(); testCustomEvaluator.configure(new Configuration()); return Map.of(testCustomEvaluator.getName(), testCustomEvaluator); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java index 03dfc409ba..407b5f28b6 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java @@ -23,7 +23,7 @@ import 
org.apache.flink.autoscaler.exceptions.NotReadyException; import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics; import org.apache.flink.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.autoscaler.metrics.CustomEvaluator; +import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator; import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.autoscaler.metrics.TestCustomEvaluator; import org.apache.flink.autoscaler.metrics.TestMetrics; @@ -466,7 +466,7 @@ private TestingScalingRealizer.Event> getEven @Test void testGetCustomEvaluatorIfRequired() { - CustomEvaluator testCustomEvaluator = new TestCustomEvaluator(); + FlinkAutoscalerEvaluator testCustomEvaluator = new TestCustomEvaluator(); testCustomEvaluator.configure(new Configuration()); var testCustomEvaluators = Map.of(testCustomEvaluator.getName(), testCustomEvaluator); @@ -491,7 +491,7 @@ void testGetCustomEvaluatorIfRequired() { autoscalerWithCustomEvaluator.getCustomEvaluatorIfRequired( context.getConfiguration()); assertNotNull(customEvaluatorWithConfig); - assertInstanceOf(CustomEvaluator.class, customEvaluatorWithConfig.f0); + assertInstanceOf(FlinkAutoscalerEvaluator.class, customEvaluatorWithConfig.f0); var customEvaluatorConfig = customEvaluatorWithConfig.f1; assertNotNull(customEvaluatorConfig); assertEquals(0, customEvaluatorConfig.keySet().size()); @@ -525,7 +525,8 @@ void testGetCustomEvaluatorIfRequired() { context.getConfiguration()); assertNotNull(customEvaluatorWithConfigContainingAdditionalKeys); assertInstanceOf( - CustomEvaluator.class, customEvaluatorWithConfigContainingAdditionalKeys.f0); + FlinkAutoscalerEvaluator.class, + customEvaluatorWithConfigContainingAdditionalKeys.f0); var customEvaluatorConfigContainingAdditionalKeys = customEvaluatorWithConfigContainingAdditionalKeys.f1; assertNotNull(customEvaluatorConfigContainingAdditionalKeys); diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java index 5a4577dfef..28179be4c8 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java @@ -21,8 +21,8 @@ import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; import org.apache.flink.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.autoscaler.metrics.CustomEvaluator; import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator; import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.autoscaler.metrics.TestCustomEvaluator; @@ -319,7 +319,7 @@ public void testEvaluateWithCustomEvaluator() { var conf = new Configuration(); conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2)); - CustomEvaluator customEvaluator = new TestCustomEvaluator(); + FlinkAutoscalerEvaluator customEvaluator = new TestCustomEvaluator(); var customEvaluatorWithConfig = new Tuple2<>(customEvaluator, new Configuration()); var evaluatedMetrics = @@ -985,13 +985,13 @@ public void testRunCustomEvaluator() { Map.of())); var conf = new Configuration(); - CustomEvaluator customEvaluator = new TestCustomEvaluator(); + FlinkAutoscalerEvaluator customEvaluator = new TestCustomEvaluator(); var evaluatedMetrics = new HashMap(); var testCustomEvaluationSession = Tuple2.of( customEvaluator, - new CustomEvaluator.Context( + new FlinkAutoscalerEvaluator.Context( new UnmodifiableConfiguration(conf), Collections.unmodifiableSortedMap(metricHistory), Collections.unmodifiableMap(new HashMap<>()), diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java deleted file mode 100644 index 12b555f4e1..0000000000 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/SimpleTrendAdjustor.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.autoscaler.metrics; - -import org.apache.flink.runtime.jobgraph.JobVertexID; - -import java.time.Instant; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.SortedMap; - -/** - * A simple implementation of the {@link CustomEvaluator} interface that adjusts scaling metrics - * based on recent historical trends. This evaluator applies a weighted moving average to refine the - * target data rate for source job vertices, enabling more responsive scaling decisions. 
- */ -public class SimpleTrendAdjustor implements CustomEvaluator { - @Override - public String getName() { - return "simple-trend-adjustor"; - } - - @Override - public Map evaluateVertexMetrics( - JobVertexID vertex, - Map evaluatedMetrics, - Context evaluationContext) { - - if (!evaluationContext.getTopology().isSource(vertex)) { - return Collections.emptyMap(); - } - - var customEvaluatedMetrics = new HashMap(); - - // Extract current target data rate - EvaluatedScalingMetric targetDataRateMetric = - evaluatedMetrics.get(ScalingMetric.TARGET_DATA_RATE); - double currentTargetRate = - (targetDataRateMetric != null) ? targetDataRateMetric.getAverage() : 0.0; - - // Compute historical trend adjustment - double trendAdjustment = - computeTrendAdjustment(vertex, evaluationContext.getMetricsHistory()); - - // Apply a dynamic adjustment based on recent trends - double adjustedTargetRate = currentTargetRate + trendAdjustment; - - // Store the updated metric - customEvaluatedMetrics.put( - ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(adjustedTargetRate)); - - return customEvaluatedMetrics; - } - - /** - * Computes a trend-based adjustment using recent historical metrics. Uses a simple weighted - * moving average over the last few recorded metrics. 
- */ - private double computeTrendAdjustment( - JobVertexID vertex, SortedMap metricsHistory) { - if (metricsHistory.isEmpty()) { - // Fallback: apply no increase if no history is available - return 0.; - } - - double totalWeight = 0.0; - double weightedSum = 0.0; - // Increasing weight for more recent data points - int weight = 1; - - // Iterate over the last N entries (e.g., last 5 data points) - int count = 0; - for (var entry : metricsHistory.values()) { - Double historicalRate = - entry.getVertexMetrics().get(vertex).get(ScalingMetric.TARGET_DATA_RATE); - if (historicalRate != null) { - weightedSum += historicalRate * weight; - totalWeight += weight; - weight++; - count++; - } - if (count >= 5) { // Limit to last 5 points - break; - } - } - - return (totalWeight > 0) - ? (weightedSum / totalWeight) - - metricsHistory - .get(metricsHistory.lastKey()) - .getVertexMetrics() - .get(vertex) - .get(ScalingMetric.TARGET_DATA_RATE) - : 0.; - } -} diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java index 55b186b0f2..b1426f6bc9 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/TestCustomEvaluator.java @@ -25,10 +25,10 @@ import java.util.Map; /** - * A test implementation of the {@link CustomEvaluator} interface that provides custom scaling - * metric evaluations for job vertices in a Flink job. + * A test implementation of the {@link FlinkAutoscalerEvaluator} interface that provides + * custom scaling metric evaluations for job vertices in a Flink job. 
*/ -public class TestCustomEvaluator implements CustomEvaluator { +public class TestCustomEvaluator implements FlinkAutoscalerEvaluator { @Override public String getName() { return "test-custom-evaluator"; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java index e2ef8d8b6c..3ea5f077a2 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerUtils.java @@ -18,7 +18,7 @@ package org.apache.flink.kubernetes.operator.autoscaler; -import org.apache.flink.autoscaler.metrics.CustomEvaluator; +import org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; @@ -38,14 +38,15 @@ public class AutoscalerUtils { * * @param configManager Flink Config manager * @return A map of discovered custom evaluators, where the key is the evaluator name provided - * by {@link CustomEvaluator#getName()}) and the value is the corresponding instance. + * by {@link FlinkAutoscalerEvaluator#getName()} and the value is the corresponding + * instance.
*/ - public static Map discoverCustomEvaluators( + public static Map discoverCustomEvaluators( FlinkConfigManager configManager) { var conf = configManager.getDefaultConfig(); - Map customEvaluators = new HashMap<>(); + Map customEvaluators = new HashMap<>(); PluginUtils.createPluginManagerFromRootFolder(conf) - .load(CustomEvaluator.class) + .load(FlinkAutoscalerEvaluator.class) .forEachRemaining( customEvaluator -> { String customEvaluatorName = customEvaluator.getName(); diff --git a/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.autoscaler.metrics.CustomEvaluator b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator similarity index 100% rename from flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.autoscaler.metrics.CustomEvaluator rename to flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.autoscaler.metrics.FlinkAutoscalerEvaluator