diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java index a167d8cb17e3b..00679a4c34468 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java @@ -20,13 +20,19 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.api.functions.async.AsyncBatchRetryStrategy; +import org.apache.flink.streaming.api.functions.async.AsyncBatchTimeoutPolicy; import org.apache.flink.streaming.api.functions.async.AsyncFunction; import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy; +import org.apache.flink.streaming.api.operators.async.AsyncBatchWaitOperatorFactory; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator; import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory; +import org.apache.flink.streaming.api.operators.async.OrderedAsyncBatchWaitOperatorFactory; import org.apache.flink.util.Preconditions; import org.apache.flink.util.Utils; +import java.time.Duration; import java.util.concurrent.TimeUnit; import static org.apache.flink.streaming.util.retryable.AsyncRetryStrategies.NO_RETRY_STRATEGY; @@ -319,4 +325,285 @@ public static SingleOutputStreamOperator orderedWaitWithRetry( OutputMode.ORDERED, asyncRetryStrategy); } + + // ================================================================================ + // Batch Async Operations + // ================================================================================ + + /** + * Adds an AsyncBatchWaitOperator to process elements in batches. The order of output stream + * records may be reordered (unordered mode). + * + *

This method is particularly useful for high-latency inference workloads where batching can + * significantly improve throughput, such as machine learning model inference. + * + *

The operator buffers incoming elements and triggers the async batch function when the + * buffer reaches {@code maxBatchSize}. Remaining elements are flushed when the input ends. + * + * @param in Input {@link DataStream} + * @param func {@link AsyncBatchFunction} to process batches of elements + * @param maxBatchSize Maximum number of elements to batch before triggering async invocation + * @param Type of input record + * @param Type of output record + * @return A new {@link SingleOutputStreamOperator} + */ + public static SingleOutputStreamOperator unorderedWaitBatch( + DataStream in, AsyncBatchFunction func, int maxBatchSize) { + return unorderedWaitBatch(in, func, maxBatchSize, 0L); + } + + /** + * Adds an AsyncBatchWaitOperator to process elements in batches with timeout support. The order + * of output stream records may be reordered (unordered mode). + * + *

This method is particularly useful for high-latency inference workloads where batching can + * significantly improve throughput, such as machine learning model inference. + * + *

The operator buffers incoming elements and triggers the async batch function when either: + * + *

+ * <ul>
+ *   <li>The buffer reaches {@code maxBatchSize}, or
+ *   <li>The batch timeout ({@code batchTimeoutMs}) expires
+ * </ul>

Remaining elements are flushed when the input ends. + * + * @param in Input {@link DataStream} + * @param func {@link AsyncBatchFunction} to process batches of elements + * @param maxBatchSize Maximum number of elements to batch before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means timeout is disabled + * @param Type of input record + * @param Type of output record + * @return A new {@link SingleOutputStreamOperator} + */ + public static SingleOutputStreamOperator unorderedWaitBatch( + DataStream in, + AsyncBatchFunction func, + int maxBatchSize, + long batchTimeoutMs) { + Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be greater than 0"); + + TypeInformation outTypeInfo = + TypeExtractor.getUnaryOperatorReturnType( + func, + AsyncBatchFunction.class, + 0, + 1, + new int[] {1, 0}, + in.getType(), + Utils.getCallLocationName(), + true); + + // create transform + AsyncBatchWaitOperatorFactory operatorFactory = + new AsyncBatchWaitOperatorFactory<>( + in.getExecutionEnvironment().clean(func), maxBatchSize, batchTimeoutMs); + + return in.transform("async batch wait operator", outTypeInfo, operatorFactory); + } + + /** + * Adds an AsyncBatchWaitOperator to process elements in batches with ordered output. The order + * of output stream records is guaranteed to be the same as input order. + * + *

This method is particularly useful for high-latency inference workloads where batching can + * significantly improve throughput while maintaining ordering guarantees, such as machine + * learning model inference with order-sensitive downstream processing. + * + *

The operator buffers incoming elements and triggers the async batch function when either: + * + *

+ * <ul>
+ *   <li>The buffer reaches {@code maxBatchSize}, or
+ *   <li>{@code maxWaitTime} elapses since the first element of the current batch was buffered
+ * </ul>

Results are buffered and emitted in the original input order, regardless of async + * completion order. + * + * @param in Input {@link DataStream} + * @param func {@link AsyncBatchFunction} to process batches of elements + * @param maxBatchSize Maximum number of elements to batch before triggering async invocation + * @param maxWaitTime Maximum duration to wait before flushing a partial batch; Duration.ZERO or + * negative means timeout is disabled + * @param Type of input record + * @param Type of output record + * @return A new {@link SingleOutputStreamOperator} + */ + public static SingleOutputStreamOperator orderedWaitBatch( + DataStream in, + AsyncBatchFunction func, + int maxBatchSize, + Duration maxWaitTime) { + Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be greater than 0"); + Preconditions.checkNotNull(maxWaitTime, "maxWaitTime must not be null"); + + long batchTimeoutMs = maxWaitTime.toMillis(); + + TypeInformation outTypeInfo = + TypeExtractor.getUnaryOperatorReturnType( + func, + AsyncBatchFunction.class, + 0, + 1, + new int[] {1, 0}, + in.getType(), + Utils.getCallLocationName(), + true); + + // create transform + OrderedAsyncBatchWaitOperatorFactory operatorFactory = + new OrderedAsyncBatchWaitOperatorFactory<>( + in.getExecutionEnvironment().clean(func), maxBatchSize, batchTimeoutMs); + + return in.transform("ordered async batch wait operator", outTypeInfo, operatorFactory); + } + + /** + * Adds an AsyncBatchWaitOperator to process elements in batches with ordered output. The order + * of output stream records is guaranteed to be the same as input order. + * + *

This overload disables timeout-based batching. Batches are only flushed when the buffer + * reaches {@code maxBatchSize} or when the input ends. + * + * @param in Input {@link DataStream} + * @param func {@link AsyncBatchFunction} to process batches of elements + * @param maxBatchSize Maximum number of elements to batch before triggering async invocation + * @param Type of input record + * @param Type of output record + * @return A new {@link SingleOutputStreamOperator} + */ + public static SingleOutputStreamOperator orderedWaitBatch( + DataStream in, AsyncBatchFunction func, int maxBatchSize) { + return orderedWaitBatch(in, func, maxBatchSize, Duration.ZERO); + } + + // ================================================================================ + // Batch Async Operations with Retry and Timeout Support + // ================================================================================ + + /** + * Adds an AsyncBatchWaitOperator to process elements in batches with retry and timeout support. + * The order of output stream records may be reordered (unordered mode). + * + *

This method is particularly useful for high-latency inference workloads where: + * + *

+ * <ul>
+ *   <li>Batching can significantly improve throughput
+ *   <li>Transient failures can be handled via a retry strategy
+ *   <li>Slow async calls can be bounded via a timeout policy
+ * </ul>

The operator buffers incoming elements and triggers the async batch function when either: + * + *

+ * + * @param in Input {@link DataStream} + * @param func {@link AsyncBatchFunction} to process batches of elements + * @param maxBatchSize Maximum number of elements to batch before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means timeout is disabled + * @param retryStrategy Retry strategy for failed batch operations + * @param asyncTimeoutPolicy Timeout policy for async batch operations + * @param Type of input record + * @param Type of output record + * @return A new {@link SingleOutputStreamOperator} + */ + public static SingleOutputStreamOperator unorderedWaitBatch( + DataStream in, + AsyncBatchFunction func, + int maxBatchSize, + long batchTimeoutMs, + AsyncBatchRetryStrategy retryStrategy, + AsyncBatchTimeoutPolicy asyncTimeoutPolicy) { + Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be greater than 0"); + Preconditions.checkNotNull(retryStrategy, "retryStrategy must not be null"); + Preconditions.checkNotNull(asyncTimeoutPolicy, "asyncTimeoutPolicy must not be null"); + + TypeInformation outTypeInfo = + TypeExtractor.getUnaryOperatorReturnType( + func, + AsyncBatchFunction.class, + 0, + 1, + new int[] {1, 0}, + in.getType(), + Utils.getCallLocationName(), + true); + + // create transform + AsyncBatchWaitOperatorFactory operatorFactory = + new AsyncBatchWaitOperatorFactory<>( + in.getExecutionEnvironment().clean(func), + maxBatchSize, + batchTimeoutMs, + retryStrategy, + asyncTimeoutPolicy); + + return in.transform("async batch wait operator", outTypeInfo, operatorFactory); + } + + /** + * Adds an AsyncBatchWaitOperator with retry support only. The order of output stream records + * may be reordered (unordered mode). + * + * @param in Input {@link DataStream} + * @param func {@link AsyncBatchFunction} to process batches of elements + * @param maxBatchSize Maximum number of elements to batch before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means timeout is disabled + * @param retryStrategy Retry strategy for failed batch operations + * @param Type of input record + * @param Type of output record + * @return A new {@link SingleOutputStreamOperator} + */ + public static SingleOutputStreamOperator unorderedWaitBatchWithRetry( + DataStream in, + AsyncBatchFunction func, + int maxBatchSize, + long batchTimeoutMs, + AsyncBatchRetryStrategy retryStrategy) { + return unorderedWaitBatch( + in, + func, + maxBatchSize, + batchTimeoutMs, + retryStrategy, + AsyncBatchTimeoutPolicy.NO_TIMEOUT_POLICY); + } + + /** + * Adds an AsyncBatchWaitOperator with timeout support only. The order of output stream records + * may be reordered (unordered mode). 
+ * + * @param in Input {@link DataStream} + * @param func {@link AsyncBatchFunction} to process batches of elements + * @param maxBatchSize Maximum number of elements to batch before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means timeout is disabled + * @param asyncTimeoutPolicy Timeout policy for async batch operations + * @param Type of input record + * @param Type of output record + * @return A new {@link SingleOutputStreamOperator} + */ + @SuppressWarnings("unchecked") + public static SingleOutputStreamOperator unorderedWaitBatchWithTimeout( + DataStream in, + AsyncBatchFunction func, + int maxBatchSize, + long batchTimeoutMs, + AsyncBatchTimeoutPolicy asyncTimeoutPolicy) { + return unorderedWaitBatch( + in, + func, + maxBatchSize, + batchTimeoutMs, + (AsyncBatchRetryStrategy) + org.apache.flink.streaming.util.retryable.AsyncBatchRetryStrategies + .NO_RETRY_STRATEGY, + asyncTimeoutPolicy); + } + + // TODO: Add event-time based batching support in follow-up PR + // TODO: Add ordered batch operations with retry/timeout in follow-up PR } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchFunction.java new file mode 100644 index 0000000000000..e8a4bf3fd0596 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.async; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; +import java.util.List; + +/** + * A function to trigger Async I/O operations in batches. + * + *

For each batch of inputs, an async I/O operation can be triggered via {@link + * #asyncInvokeBatch}, and once it has been done, the results can be collected by calling {@link + * ResultFuture#complete}. This is particularly useful for high-latency inference workloads where + * batching can significantly improve throughput. + * + *

Unlike {@link AsyncFunction} which processes one element at a time, this interface allows + * processing multiple elements together, which is beneficial for scenarios like: + * + *

    + *
+ * <ul>
+ *   <li>Machine learning model inference where batching improves GPU utilization
+ *   <li>External service calls that support batch APIs
+ *   <li>Database queries that can be batched for efficiency
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * public class BatchInferenceFunction implements AsyncBatchFunction<String, String> {
+ *
+ *   public void asyncInvokeBatch(List<String> inputs, ResultFuture<String> resultFuture) {
+ *     // Submit batch inference request
+ *     CompletableFuture.supplyAsync(() -> {
+ *         List<String> results = modelService.batchInference(inputs);
+ *         return results;
+ *     }).thenAccept(results -> resultFuture.complete(results));
+ *   }
+ * }
+ * }</pre>
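+ *
+ * <p>A minimal sketch of wiring such a function into a pipeline; the stream, batch size, and
+ * timeout below are illustrative placeholders, not recommended values:
+ *
+ * <pre>{@code
+ * DataStream<String> requests = ...;
+ * // batch up to 32 elements, flushing partial batches after 50 ms
+ * DataStream<String> predictions =
+ *     AsyncDataStream.unorderedWaitBatch(requests, new BatchInferenceFunction(), 32, 50L);
+ * }</pre>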
+ * + * @param The type of the input elements. + * @param The type of the returned elements. + */ +@PublicEvolving +public interface AsyncBatchFunction extends Function, Serializable { + + /** + * Trigger async operation for a batch of stream inputs. + * + *

The implementation should process all inputs in the batch and complete the result future + * with all corresponding outputs. The number of outputs does not need to match the number of + * inputs - it depends on the specific use case. + * + * @param inputs a batch of elements coming from upstream tasks + * @param resultFuture to be completed with the result data for the entire batch + * @throws Exception in case of a user code error. An exception will make the task fail and + * trigger fail-over process. + */ + void asyncInvokeBatch(List inputs, ResultFuture resultFuture) throws Exception; + + // TODO: Add timeout handling in follow-up PR + // TODO: Add open/close lifecycle methods in follow-up PR +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchRetryPredicate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchRetryPredicate.java new file mode 100644 index 0000000000000..dc9f12002f742 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchRetryPredicate.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.async; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Collection; +import java.util.Optional; +import java.util.function.Predicate; + +/** + * Interface that encapsulates predicates for determining when to retry a batch async operation. + * + *

This is the batch-equivalent of {@link AsyncRetryPredicate}, designed specifically for {@link + * AsyncBatchFunction} operations. + * + * @param The type of the output elements. + */ +@PublicEvolving +public interface AsyncBatchRetryPredicate { + + /** + * An Optional Java {@link Predicate} that defines a condition on the batch function's result + * which will trigger a retry operation. + * + *

This predicate is evaluated on the complete collection of results returned by {@link + * ResultFuture#complete(Collection)}. + * + * @return predicate on result of {@link Collection}, or empty if no result-based retry is + * configured + */ + Optional>> resultPredicate(); + + /** + * An Optional Java {@link Predicate} that defines a condition on the batch function's exception + * which will trigger a retry operation. + * + *

This predicate is evaluated on the exception passed to {@link + * ResultFuture#completeExceptionally(Throwable)}. + * + * @return predicate on {@link Throwable} exception, or empty if no exception-based retry is + * configured + */ + Optional> exceptionPredicate(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchRetryStrategy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchRetryStrategy.java new file mode 100644 index 0000000000000..9e3d44fb485d1 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchRetryStrategy.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.async; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +/** + * Interface encapsulates a retry strategy for batch async operations. + * + *

This is the batch-equivalent of {@link AsyncRetryStrategy}, designed specifically for {@link + * AsyncBatchFunction} operations. It defines: + * + *

    + *
+ * <ul>
+ *   <li>Maximum number of retry attempts
+ *   <li>Backoff delay between retries
+ *   <li>Conditions under which retry should be triggered
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * AsyncBatchRetryStrategy<String> strategy = new AsyncBatchRetryStrategies
+ *     .FixedDelayRetryStrategyBuilder<String>(3, 1000L)
+ *     .ifException(e -> e instanceof TimeoutException)
+ *     .build();
+ * }</pre>
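+ *
+ * <p>The built strategy is then supplied when constructing the stream, for example via {@code
+ * AsyncDataStream#unorderedWaitBatchWithRetry}; the stream, function, batch size, and timeout
+ * below are illustrative placeholders:
+ *
+ * <pre>{@code
+ * AsyncDataStream.unorderedWaitBatchWithRetry(input, batchFunction, 32, 50L, strategy);
+ * }</pre>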
+ * + * @param The type of the output elements. + */ +@PublicEvolving +public interface AsyncBatchRetryStrategy extends Serializable { + + /** + * Determines whether a retry attempt can be made based on the current number of attempts. + * + * @param currentAttempts the number of attempts already made (starts from 1) + * @return true if another retry attempt can be made, false otherwise + */ + boolean canRetry(int currentAttempts); + + /** + * Returns the backoff time in milliseconds before the next retry attempt. + * + * @param currentAttempts the number of attempts already made + * @return backoff time in milliseconds, or -1 if no retry should be performed + */ + long getBackoffTimeMillis(int currentAttempts); + + /** + * Returns the retry predicate that determines when a retry should be triggered. + * + * @return the retry predicate for this strategy + */ + AsyncBatchRetryPredicate getRetryPredicate(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchTimeoutPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchTimeoutPolicy.java new file mode 100644 index 0000000000000..c3bc6d411763c --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/AsyncBatchTimeoutPolicy.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.async; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; +import java.time.Duration; + +/** + * Configuration for batch-level timeout behavior in async batch operations. + * + *

This policy defines: + * + *

    + *
+ * <ul>
+ *   <li>The timeout duration for a batch async operation
+ *   <li>The behavior when a timeout occurs (fail or emit partial results)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * // Fail on timeout after 5 seconds
+ * AsyncBatchTimeoutPolicy failPolicy = AsyncBatchTimeoutPolicy.failOnTimeout(Duration.ofSeconds(5));
+ *
+ * // Allow partial results on timeout after 10 seconds
+ * AsyncBatchTimeoutPolicy partialPolicy =
+ *     AsyncBatchTimeoutPolicy.allowPartialOnTimeout(Duration.ofSeconds(10));
+ * }</pre>
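+ *
+ * <p>A policy is supplied when constructing the stream, for example via {@code
+ * AsyncDataStream#unorderedWaitBatchWithTimeout}; the stream, function, batch size, and batch
+ * timeout below are illustrative placeholders:
+ *
+ * <pre>{@code
+ * AsyncDataStream.unorderedWaitBatchWithTimeout(input, batchFunction, 32, 50L, failPolicy);
+ * }</pre>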
+ * + *

Note: Timeout is measured from the moment the batch async function is invoked until the + * result future is completed. If the timeout expires before completion: + * + *

    + *
+ * <ul>
+ *   <li>With {@link TimeoutBehavior#FAIL}: The operator throws a timeout exception
+ *   <li>With {@link TimeoutBehavior#ALLOW_PARTIAL}: The operator emits whatever results are
+ *       available (may be empty)
+ * </ul>
+ */ +@PublicEvolving +public class AsyncBatchTimeoutPolicy implements Serializable { + + private static final long serialVersionUID = 1L; + + /** Constant indicating timeout is disabled. */ + private static final long NO_TIMEOUT = 0L; + + /** A policy that disables timeout (no timeout). */ + public static final AsyncBatchTimeoutPolicy NO_TIMEOUT_POLICY = + new AsyncBatchTimeoutPolicy(NO_TIMEOUT, TimeoutBehavior.FAIL); + + /** The timeout behavior when a batch operation times out. */ + public enum TimeoutBehavior { + /** + * Fail the operator when timeout occurs. This will cause the job to fail unless handled by + * a restart strategy. + */ + FAIL, + + /** + * Allow partial results when timeout occurs. If no results are available, an empty + * collection is emitted. + */ + ALLOW_PARTIAL + } + + private final long timeoutMs; + private final TimeoutBehavior behavior; + + private AsyncBatchTimeoutPolicy(long timeoutMs, TimeoutBehavior behavior) { + this.timeoutMs = timeoutMs; + this.behavior = behavior; + } + + /** + * Creates a timeout policy that fails the operator on timeout. + * + * @param timeout the timeout duration + * @return a timeout policy configured to fail on timeout + */ + public static AsyncBatchTimeoutPolicy failOnTimeout(Duration timeout) { + return new AsyncBatchTimeoutPolicy(timeout.toMillis(), TimeoutBehavior.FAIL); + } + + /** + * Creates a timeout policy that fails the operator on timeout. + * + * @param timeoutMs the timeout in milliseconds + * @return a timeout policy configured to fail on timeout + */ + public static AsyncBatchTimeoutPolicy failOnTimeout(long timeoutMs) { + return new AsyncBatchTimeoutPolicy(timeoutMs, TimeoutBehavior.FAIL); + } + + /** + * Creates a timeout policy that allows partial results on timeout. + * + * @param timeout the timeout duration + * @return a timeout policy configured to allow partial results + */ + public static AsyncBatchTimeoutPolicy allowPartialOnTimeout(Duration timeout) { + return new AsyncBatchTimeoutPolicy(timeout.toMillis(), TimeoutBehavior.ALLOW_PARTIAL); + } + + /** + * Creates a timeout policy that allows partial results on timeout. + * + * @param timeoutMs the timeout in milliseconds + * @return a timeout policy configured to allow partial results + */ + public static AsyncBatchTimeoutPolicy allowPartialOnTimeout(long timeoutMs) { + return new AsyncBatchTimeoutPolicy(timeoutMs, TimeoutBehavior.ALLOW_PARTIAL); + } + + /** + * Returns whether timeout is enabled. + * + * @return true if timeout is enabled, false otherwise + */ + public boolean isTimeoutEnabled() { + return timeoutMs > NO_TIMEOUT; + } + + /** + * Returns the timeout duration in milliseconds. + * + * @return timeout in milliseconds + */ + public long getTimeoutMs() { + return timeoutMs; + } + + /** + * Returns the timeout behavior. + * + * @return the behavior when timeout occurs + */ + public TimeoutBehavior getBehavior() { + return behavior; + } + + /** + * Returns whether partial results should be allowed on timeout. 
+ * + * @return true if partial results are allowed, false if failure should occur + */ + public boolean shouldAllowPartialOnTimeout() { + return behavior == TimeoutBehavior.ALLOW_PARTIAL; + } + + @Override + public String toString() { + return "AsyncBatchTimeoutPolicy{" + + "timeoutMs=" + + timeoutMs + + ", behavior=" + + behavior + + '}'; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperator.java new file mode 100644 index 0000000000000..42ac1cb5234a1 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperator.java @@ -0,0 +1,882 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.api.functions.async.AsyncBatchRetryPredicate; +import org.apache.flink.streaming.api.functions.async.AsyncBatchRetryStrategy; +import org.apache.flink.streaming.api.functions.async.AsyncBatchTimeoutPolicy; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.retryable.AsyncBatchRetryStrategies; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; + +/** + * The {@link AsyncBatchWaitOperator} batches incoming 
stream records and invokes the {@link + * AsyncBatchFunction} when the batch size reaches the configured maximum or when the batch timeout + * is reached. + * + *

This operator implements unordered semantics only - results are emitted as soon as they are + * available, regardless of input order. This is suitable for AI inference workloads where order + * does not matter. + * + *

Key behaviors: + * + *

    + *
+ * <ul>
+ *   <li>Buffer incoming records until batch size is reached OR timeout expires
+ *   <li>Flush remaining records when end of input is signaled
+ *   <li>Emit all results from the batch function to downstream
+ * </ul>
+ * + *

Timer lifecycle (when batchTimeoutMs > 0): + * + *

    + *
+ * <ul>
+ *   <li>Timer is registered when first element is added to an empty buffer
+ *   <li>Timer fires at: currentBatchStartTime + batchTimeoutMs
+ *   <li>Timer is cleared when batch is flushed (by size, timeout, or end-of-input)
+ *   <li>At most one timer is active at any time
+ * </ul>
+ * + *

<b>Retry Support</b>

+ * + *

This operator supports retry strategies for failed batch operations: + * + *

    + *
+ * <ul>
+ *   <li>Configure via {@link AsyncBatchRetryStrategy}
+ *   <li>Supports fixed delay and exponential backoff strategies
+ *   <li>Retries are triggered based on exception or result predicates
+ *   <li>Retry count metric: {@code batchRetryCount}
+ * </ul>
+ * + *

<b>Timeout Support</b>

+ * + *

This operator supports timeout policies for async batch operations: + * + *

    + *
+ * <ul>
+ *   <li>Configure via {@link AsyncBatchTimeoutPolicy}
+ *   <li>Supports fail-on-timeout or allow-partial-results behaviors
+ *   <li>Timeout applies to individual async invocations (not batching)
+ *   <li>Timeout count metric: {@code batchTimeoutCount}
+ * </ul>
+ * + *

<b>Metrics</b>

+ * + *

This operator exposes the following metrics for monitoring AI/ML inference workloads: + * + *

    + *
+ * <ul>
+ *   <li>{@code batchSize} - Histogram of batch sizes (number of records per batch)
+ *   <li>{@code batchLatencyMs} - Histogram of batch latency in milliseconds (time from first
+ *       element buffered to batch flush)
+ *   <li>{@code asyncCallDurationMs} - Histogram of async call duration in milliseconds (time from
+ *       async invocation to completion)
+ *   <li>{@code inflightBatches} - Gauge showing current number of in-flight async batch operations
+ *   <li>{@code totalBatchesProcessed} - Counter of total batches processed
+ *   <li>{@code totalRecordsProcessed} - Counter of total records processed
+ *   <li>{@code asyncCallFailures} - Counter of failed async calls
+ *   <li>{@code batchRetryCount} - Counter of batch retry attempts
+ *   <li>{@code batchTimeoutCount} - Counter of batch timeouts
+ * </ul>
+ * + *

Future enhancements may include: + * + *

    + *
+ * <ul>
+ *   <li>Ordered mode support
+ *   <li>Event-time based batching
+ *   <li>Multiple inflight batches
+ * </ul>
+ * + * @param Input type for the operator. + * @param Output type for the operator. + */ +@Internal +public class AsyncBatchWaitOperator extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput, ProcessingTimeCallback { + + private static final long serialVersionUID = 1L; + + /** Constant indicating timeout is disabled. */ + private static final long NO_TIMEOUT = 0L; + + /** Default window size for histogram metrics. */ + private static final int METRICS_HISTOGRAM_WINDOW_SIZE = 1000; + + // ================================================================================ + // Metric names - exposed as constants for testing and documentation + // ================================================================================ + + /** Metric name for batch size histogram. */ + public static final String METRIC_BATCH_SIZE = "batchSize"; + + /** Metric name for batch latency histogram (in milliseconds). */ + public static final String METRIC_BATCH_LATENCY_MS = "batchLatencyMs"; + + /** Metric name for async call duration histogram (in milliseconds). */ + public static final String METRIC_ASYNC_CALL_DURATION_MS = "asyncCallDurationMs"; + + /** Metric name for in-flight batches gauge. */ + public static final String METRIC_INFLIGHT_BATCHES = "inflightBatches"; + + /** Metric name for total batches processed counter. */ + public static final String METRIC_TOTAL_BATCHES_PROCESSED = "totalBatchesProcessed"; + + /** Metric name for total records processed counter. */ + public static final String METRIC_TOTAL_RECORDS_PROCESSED = "totalRecordsProcessed"; + + /** Metric name for async call failures counter. */ + public static final String METRIC_ASYNC_CALL_FAILURES = "asyncCallFailures"; + + /** Metric name for batch retry count counter. */ + public static final String METRIC_BATCH_RETRY_COUNT = "batchRetryCount"; + + /** Metric name for batch timeout count counter. */ + public static final String METRIC_BATCH_TIMEOUT_COUNT = "batchTimeoutCount"; + + // ================================================================================ + // Configuration fields + // ================================================================================ + + /** The async batch function to invoke. */ + private final AsyncBatchFunction asyncBatchFunction; + + /** Maximum batch size before triggering async invocation. */ + private final int maxBatchSize; + + /** + * Batch timeout in milliseconds. When positive, a timer is registered to flush the batch after + * this duration since the first buffered element. A value <= 0 disables timeout-based batching. + */ + private final long batchTimeoutMs; + + /** Retry strategy for failed batch operations. */ + private final AsyncBatchRetryStrategy retryStrategy; + + /** Timeout policy for async batch operations. */ + private final AsyncBatchTimeoutPolicy timeoutPolicy; + + /** Buffer for incoming stream records. */ + private transient List buffer; + + /** Mailbox executor for processing async results on the main thread. */ + private final transient MailboxExecutor mailboxExecutor; + + /** Counter for in-flight async operations. */ + private transient int inFlightCount; + + // ================================================================================ + // Timer state fields for timeout-based batching + // ================================================================================ + + /** + * The processing time when the current batch started (i.e., when first element was added to + * empty buffer). Used to calculate timer fire time. 
+ */ + private transient long currentBatchStartTime; + + /** Whether a timer is currently registered for the current batch. */ + private transient boolean timerRegistered; + + // ================================================================================ + // Metrics fields + // ================================================================================ + + /** + * Histogram tracking the size of each batch. Useful for monitoring batch efficiency and tuning + * maxBatchSize parameter. + */ + private transient Histogram batchSizeHistogram; + + /** + * Histogram tracking batch latency in milliseconds. Measures time from when first element is + * added to buffer until batch is flushed. Helps identify buffering overhead. + */ + private transient Histogram batchLatencyHistogram; + + /** + * Histogram tracking async call duration in milliseconds. Measures time from async invocation + * to completion callback. Critical for monitoring inference latency. + */ + private transient Histogram asyncCallDurationHistogram; + + /** + * Gauge showing current number of in-flight batches. Useful for monitoring backpressure and + * concurrency. + */ + @SuppressWarnings("unused") // Registered as gauge, kept as field reference + private transient Gauge inflightBatchesGauge; + + /** Counter for total batches processed. */ + private transient Counter totalBatchesProcessedCounter; + + /** Counter for total records processed. */ + private transient Counter totalRecordsProcessedCounter; + + /** Counter for failed async calls. */ + private transient Counter asyncCallFailuresCounter; + + /** Counter for batch retry attempts. */ + private transient Counter batchRetryCounter; + + /** Counter for batch timeouts. */ + private transient Counter batchTimeoutCounter; + + /** + * Creates an AsyncBatchWaitOperator with size-based batching only (no timeout, no retry, no + * async timeout). + * + * @param parameters Stream operator parameters + * @param asyncBatchFunction The async batch function to invoke + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param mailboxExecutor Mailbox executor for processing async results + */ + @SuppressWarnings("unchecked") + public AsyncBatchWaitOperator( + @Nonnull StreamOperatorParameters parameters, + @Nonnull AsyncBatchFunction asyncBatchFunction, + int maxBatchSize, + @Nonnull MailboxExecutor mailboxExecutor) { + this( + parameters, + asyncBatchFunction, + maxBatchSize, + NO_TIMEOUT, + mailboxExecutor, + AsyncBatchRetryStrategies.noRetry(), + AsyncBatchTimeoutPolicy.NO_TIMEOUT_POLICY); + } + + /** + * Creates an AsyncBatchWaitOperator with size-based and optional timeout-based batching (no + * retry, no async timeout). 
+ * + * @param parameters Stream operator parameters + * @param asyncBatchFunction The async batch function to invoke + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means disabled + * @param mailboxExecutor Mailbox executor for processing async results + */ + @SuppressWarnings("unchecked") + public AsyncBatchWaitOperator( + @Nonnull StreamOperatorParameters parameters, + @Nonnull AsyncBatchFunction asyncBatchFunction, + int maxBatchSize, + long batchTimeoutMs, + @Nonnull MailboxExecutor mailboxExecutor) { + this( + parameters, + asyncBatchFunction, + maxBatchSize, + batchTimeoutMs, + mailboxExecutor, + AsyncBatchRetryStrategies.noRetry(), + AsyncBatchTimeoutPolicy.NO_TIMEOUT_POLICY); + } + + /** + * Creates an AsyncBatchWaitOperator with full configuration including retry and timeout + * policies. + * + * @param parameters Stream operator parameters + * @param asyncBatchFunction The async batch function to invoke + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means disabled + * @param mailboxExecutor Mailbox executor for processing async results + * @param retryStrategy Retry strategy for failed batch operations + * @param timeoutPolicy Timeout policy for async batch operations + */ + @SuppressWarnings("unchecked") + public AsyncBatchWaitOperator( + @Nonnull StreamOperatorParameters parameters, + @Nonnull AsyncBatchFunction asyncBatchFunction, + int maxBatchSize, + long batchTimeoutMs, + @Nonnull MailboxExecutor mailboxExecutor, + @Nonnull AsyncBatchRetryStrategy retryStrategy, + @Nonnull AsyncBatchTimeoutPolicy timeoutPolicy) { + Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be greater than 0"); + this.asyncBatchFunction = Preconditions.checkNotNull(asyncBatchFunction); + this.maxBatchSize = maxBatchSize; + this.batchTimeoutMs = batchTimeoutMs; + this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor); + this.retryStrategy = + (AsyncBatchRetryStrategy) + Preconditions.checkNotNull(retryStrategy, "retryStrategy must not be null"); + this.timeoutPolicy = + Preconditions.checkNotNull(timeoutPolicy, "timeoutPolicy must not be null"); + + // Setup the operator using parameters + setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); + } + + @Override + public void open() throws Exception { + super.open(); + this.buffer = new ArrayList<>(maxBatchSize); + this.inFlightCount = 0; + this.currentBatchStartTime = 0L; + this.timerRegistered = false; + + // Initialize metrics + registerMetrics(); + } + + /** + * Registers all metrics for this operator. + * + *

Metrics are registered under the operator's metric group and provide visibility into batch + * processing behavior for AI/ML inference workloads. + */ + private void registerMetrics() { + MetricGroup metricGroup = metrics; + + // Histogram for batch sizes + this.batchSizeHistogram = + metricGroup.histogram( + METRIC_BATCH_SIZE, + new DescriptiveStatisticsHistogram(METRICS_HISTOGRAM_WINDOW_SIZE)); + + // Histogram for batch latency (time from first element to flush) + this.batchLatencyHistogram = + metricGroup.histogram( + METRIC_BATCH_LATENCY_MS, + new DescriptiveStatisticsHistogram(METRICS_HISTOGRAM_WINDOW_SIZE)); + + // Histogram for async call duration + this.asyncCallDurationHistogram = + metricGroup.histogram( + METRIC_ASYNC_CALL_DURATION_MS, + new DescriptiveStatisticsHistogram(METRICS_HISTOGRAM_WINDOW_SIZE)); + + // Gauge for in-flight batches + this.inflightBatchesGauge = metricGroup.gauge(METRIC_INFLIGHT_BATCHES, () -> inFlightCount); + + // Counter for total batches processed + this.totalBatchesProcessedCounter = metricGroup.counter(METRIC_TOTAL_BATCHES_PROCESSED); + + // Counter for total records processed + this.totalRecordsProcessedCounter = metricGroup.counter(METRIC_TOTAL_RECORDS_PROCESSED); + + // Counter for failed async calls + this.asyncCallFailuresCounter = metricGroup.counter(METRIC_ASYNC_CALL_FAILURES); + + // Counter for batch retries + this.batchRetryCounter = metricGroup.counter(METRIC_BATCH_RETRY_COUNT); + + // Counter for batch timeouts + this.batchTimeoutCounter = metricGroup.counter(METRIC_BATCH_TIMEOUT_COUNT); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + // If buffer is empty and timeout is enabled, record batch start time and register timer + if (buffer.isEmpty() && isTimeoutEnabled()) { + currentBatchStartTime = getProcessingTimeService().getCurrentProcessingTime(); + registerBatchTimer(); + } + + // Record batch start time for latency tracking (even without timeout) + if (buffer.isEmpty() && !isTimeoutEnabled()) { + currentBatchStartTime = System.currentTimeMillis(); + } + + buffer.add(element.getValue()); + + // Size-triggered flush: cancel pending timer and flush + if (buffer.size() >= maxBatchSize) { + flushBuffer(); + } + } + + /** + * Callback when processing time timer fires. Flushes the buffer if non-empty. + * + * @param timestamp The timestamp for which the timer was registered + */ + @Override + public void onProcessingTime(long timestamp) throws Exception { + // Timer fired - clear timer state first + timerRegistered = false; + + // Flush buffer if non-empty (timeout-triggered flush) + if (!buffer.isEmpty()) { + flushBuffer(); + } + } + + /** Flush the current buffer by invoking the async batch function. 
*/ + private void flushBuffer() throws Exception { + if (buffer.isEmpty()) { + return; + } + + // Calculate batch latency (time from first element to now) + long batchLatencyMs; + if (isTimeoutEnabled()) { + batchLatencyMs = + getProcessingTimeService().getCurrentProcessingTime() - currentBatchStartTime; + } else { + batchLatencyMs = System.currentTimeMillis() - currentBatchStartTime; + } + + // Clear timer state since we're flushing the batch + clearTimerState(); + + // Create a copy of the buffer and clear it for new incoming elements + List batch = new ArrayList<>(buffer); + buffer.clear(); + + // Update metrics + int batchSize = batch.size(); + batchSizeHistogram.update(batchSize); + batchLatencyHistogram.update(batchLatencyMs); + totalBatchesProcessedCounter.inc(); + totalRecordsProcessedCounter.inc(batchSize); + + // Increment in-flight counter + inFlightCount++; + + // Record async call start time for duration tracking + long asyncCallStartTime = System.currentTimeMillis(); + + // Create result handler for this batch with retry support + BatchResultHandler resultHandler = new BatchResultHandler(batch, asyncCallStartTime); + + // Invoke the async batch function + asyncBatchFunction.asyncInvokeBatch(batch, resultHandler); + + // Register timeout if configured + resultHandler.registerTimeout(); + } + + @Override + public void endInput() throws Exception { + // Flush any remaining elements in the buffer + flushBuffer(); + + // Wait for all in-flight async operations to complete + while (inFlightCount > 0) { + mailboxExecutor.yield(); + } + } + + @Override + public void close() throws Exception { + super.close(); + } + + // ================================================================================ + // Timer management methods + // ================================================================================ + + /** Check if timeout-based batching is enabled. */ + private boolean isTimeoutEnabled() { + return batchTimeoutMs > NO_TIMEOUT; + } + + /** Register a processing time timer for the current batch. */ + private void registerBatchTimer() { + if (!timerRegistered && isTimeoutEnabled()) { + long fireTime = currentBatchStartTime + batchTimeoutMs; + getProcessingTimeService().registerTimer(fireTime, this); + timerRegistered = true; + } + } + + /** + * Clear timer state. Note: We don't explicitly cancel the timer because: 1. The timer callback + * checks buffer state before flushing 2. Cancelling timers has overhead 3. Timer will be + * ignored if buffer is empty when it fires + */ + private void clearTimerState() { + timerRegistered = false; + currentBatchStartTime = 0L; + } + + // ================================================================================ + // Test accessors + // ================================================================================ + + /** Returns the current buffer size. Visible for testing. */ + @VisibleForTesting + int getBufferSize() { + return buffer != null ? buffer.size() : 0; + } + + /** Returns the current in-flight count. Visible for testing. */ + @VisibleForTesting + int getInFlightCount() { + return inFlightCount; + } + + /** Returns the batch size histogram. Visible for testing. */ + @VisibleForTesting + Histogram getBatchSizeHistogram() { + return batchSizeHistogram; + } + + /** Returns the batch latency histogram. Visible for testing. */ + @VisibleForTesting + Histogram getBatchLatencyHistogram() { + return batchLatencyHistogram; + } + + /** Returns the async call duration histogram. Visible for testing. 
*/ + @VisibleForTesting + Histogram getAsyncCallDurationHistogram() { + return asyncCallDurationHistogram; + } + + /** Returns the total batches processed counter. Visible for testing. */ + @VisibleForTesting + Counter getTotalBatchesProcessedCounter() { + return totalBatchesProcessedCounter; + } + + /** Returns the total records processed counter. Visible for testing. */ + @VisibleForTesting + Counter getTotalRecordsProcessedCounter() { + return totalRecordsProcessedCounter; + } + + /** Returns the async call failures counter. Visible for testing. */ + @VisibleForTesting + Counter getAsyncCallFailuresCounter() { + return asyncCallFailuresCounter; + } + + /** Returns the batch retry counter. Visible for testing. */ + @VisibleForTesting + Counter getBatchRetryCounter() { + return batchRetryCounter; + } + + /** Returns the batch timeout counter. Visible for testing. */ + @VisibleForTesting + Counter getBatchTimeoutCounter() { + return batchTimeoutCounter; + } + + /** + * A handler for the results of a batch async invocation. + * + *

This handler supports: + * + *

    + *
+ * <ul>
+ *   <li>Normal completion with results
+ *   <li>Exceptional completion with retry support
+ *   <li>Timeout handling with configurable behavior
+ * </ul>
+ */ + private class BatchResultHandler implements ResultFuture { + + /** Guard against multiple completions. */ + private final AtomicBoolean completed = new AtomicBoolean(false); + + /** Start time of the async call for duration tracking. */ + private final long asyncCallStartTime; + + /** The batch of inputs for potential retry. */ + private final List batch; + + /** Current retry attempt count. */ + private final AtomicInteger currentAttempts = new AtomicInteger(1); + + /** Scheduled timeout future, if timeout is enabled. */ + private volatile ScheduledFuture timeoutFuture; + + /** Flag to track if timeout has occurred. */ + private final AtomicBoolean timedOut = new AtomicBoolean(false); + + BatchResultHandler(List batch, long asyncCallStartTime) { + this.batch = batch; + this.asyncCallStartTime = asyncCallStartTime; + } + + /** Register timeout if timeout policy is enabled. */ + void registerTimeout() { + if (timeoutPolicy.isTimeoutEnabled()) { + // Use ProcessingTimeService to register timeout timer + long timeoutFireTime = + getProcessingTimeService().getCurrentProcessingTime() + + timeoutPolicy.getTimeoutMs(); + timeoutFuture = + getProcessingTimeService() + .registerTimer(timeoutFireTime, timestamp -> handleTimeout()); + } + } + + /** Handle timeout expiration. */ + private void handleTimeout() { + if (timedOut.compareAndSet(false, true) && !completed.get()) { + // Cancel any pending operations + cancelTimeoutFuture(); + + // Update timeout metric + batchTimeoutCounter.inc(); + + // Record duration + long duration = System.currentTimeMillis() - asyncCallStartTime; + asyncCallDurationHistogram.update(duration); + + if (timeoutPolicy.shouldAllowPartialOnTimeout()) { + // Allow partial results - emit empty collection + mailboxExecutor.execute( + () -> { + if (completed.compareAndSet(false, true)) { + // Emit empty results (no results available on timeout) + inFlightCount--; + } + }, + "AsyncBatchWaitOperator#handleTimeoutPartial"); + } else { + // Fail on timeout + if (completed.compareAndSet(false, true)) { + asyncCallFailuresCounter.inc(); + getContainingTask() + .getEnvironment() + .failExternally( + new TimeoutException( + "Async batch operation timed out after " + + timeoutPolicy.getTimeoutMs() + + " ms")); + mailboxExecutor.execute( + () -> inFlightCount--, + "AsyncBatchWaitOperator#decrementInFlightOnTimeout"); + } + } + } + } + + /** Cancel the timeout future if it exists. 
*/ + private void cancelTimeoutFuture() { + if (timeoutFuture != null && !timeoutFuture.isDone()) { + timeoutFuture.cancel(false); + } + } + + @Override + public void complete(Collection results) { + Preconditions.checkNotNull( + results, "Results must not be null, use empty collection to emit nothing"); + + // Check if already timed out + if (timedOut.get()) { + return; + } + + // Check if retry is needed based on result predicate + AsyncBatchRetryPredicate retryPredicate = retryStrategy.getRetryPredicate(); + Optional>> resultPredicateOpt = + retryPredicate.resultPredicate(); + + if (resultPredicateOpt.isPresent() + && resultPredicateOpt.get().test(results) + && retryStrategy.canRetry(currentAttempts.get())) { + // Schedule retry + scheduleRetry(null); + return; + } + + if (!completed.compareAndSet(false, true)) { + return; + } + + // Cancel timeout + cancelTimeoutFuture(); + + // Process results in the mailbox thread + mailboxExecutor.execute( + () -> processResults(results), "AsyncBatchWaitOperator#processResults"); + } + + @Override + public void completeExceptionally(Throwable error) { + // Check if already timed out + if (timedOut.get()) { + return; + } + + // Check if retry is needed based on exception predicate + AsyncBatchRetryPredicate retryPredicate = retryStrategy.getRetryPredicate(); + Optional> exceptionPredicateOpt = + retryPredicate.exceptionPredicate(); + + if (exceptionPredicateOpt.isPresent() + && exceptionPredicateOpt.get().test(error) + && retryStrategy.canRetry(currentAttempts.get())) { + // Schedule retry + scheduleRetry(error); + return; + } + + if (!completed.compareAndSet(false, true)) { + return; + } + + // Cancel timeout + cancelTimeoutFuture(); + + // Update failure metric + asyncCallFailuresCounter.inc(); + + // Record async call duration even for failures + long duration = System.currentTimeMillis() - asyncCallStartTime; + asyncCallDurationHistogram.update(duration); + + // Signal failure through the containing task + getContainingTask() + .getEnvironment() + .failExternally(new Exception("Async batch operation failed.", error)); + + // Decrement in-flight counter in mailbox thread + mailboxExecutor.execute( + () -> inFlightCount--, "AsyncBatchWaitOperator#decrementInFlight"); + } + + @Override + public void complete(CollectionSupplier supplier) { + Preconditions.checkNotNull( + supplier, "Supplier must not be null, return empty collection to emit nothing"); + + // Check if already timed out + if (timedOut.get()) { + return; + } + + if (!completed.compareAndSet(false, true)) { + return; + } + + // Cancel timeout + cancelTimeoutFuture(); + + mailboxExecutor.execute( + () -> { + try { + processResults(supplier.get()); + } catch (Throwable t) { + // Update failure metric + asyncCallFailuresCounter.inc(); + + // Record async call duration even for failures + long duration = System.currentTimeMillis() - asyncCallStartTime; + asyncCallDurationHistogram.update(duration); + + getContainingTask() + .getEnvironment() + .failExternally( + new Exception("Async batch operation failed.", t)); + inFlightCount--; + } + }, + "AsyncBatchWaitOperator#processResultsFromSupplier"); + } + + /** + * Schedule a retry attempt after the backoff delay. 
+ * + * @param previousError the error that triggered the retry, or null if retry is based on + * result + */ + private void scheduleRetry(Throwable previousError) { + int attempt = currentAttempts.getAndIncrement(); + long backoffMs = retryStrategy.getBackoffTimeMillis(attempt); + + // Update retry metric + batchRetryCounter.inc(); + + // Schedule retry using ProcessingTimeService timer + long retryFireTime = getProcessingTimeService().getCurrentProcessingTime() + backoffMs; + getProcessingTimeService() + .registerTimer(retryFireTime, timestamp -> executeRetry(previousError)); + } + + /** + * Execute a retry attempt. + * + * @param previousError the error that triggered the retry, or null if retry is based on + * result + */ + private void executeRetry(Throwable previousError) { + // Check if already timed out or completed + if (timedOut.get() || completed.get()) { + return; + } + + try { + // Create a new result handler for this retry (reusing current handler state) + asyncBatchFunction.asyncInvokeBatch(batch, this); + } catch (Exception e) { + // Retry invocation failed immediately + Throwable cause = previousError != null ? previousError : e; + if (completed.compareAndSet(false, true)) { + cancelTimeoutFuture(); + asyncCallFailuresCounter.inc(); + long duration = System.currentTimeMillis() - asyncCallStartTime; + asyncCallDurationHistogram.update(duration); + getContainingTask() + .getEnvironment() + .failExternally( + new Exception("Async batch operation retry failed.", cause)); + mailboxExecutor.execute( + () -> inFlightCount--, + "AsyncBatchWaitOperator#decrementInFlightOnRetryFail"); + } + } + } + + private void processResults(Collection results) { + // Record async call duration + long duration = System.currentTimeMillis() - asyncCallStartTime; + asyncCallDurationHistogram.update(duration); + + // Emit all results downstream + for (OUT result : results) { + output.collect(new StreamRecord<>(result)); + } + // Decrement in-flight counter + inFlightCount--; + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperatorFactory.java new file mode 100644 index 0000000000000..9c1379e5bb1bc --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperatorFactory.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.api.functions.async.AsyncBatchRetryStrategy; +import org.apache.flink.streaming.api.functions.async.AsyncBatchTimeoutPolicy; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.operators.legacy.YieldingOperatorFactory; +import org.apache.flink.streaming.util.retryable.AsyncBatchRetryStrategies; + +/** + * The factory of {@link AsyncBatchWaitOperator}. + * + * @param The input type of the operator + * @param The output type of the operator + */ +@Internal +public class AsyncBatchWaitOperatorFactory extends AbstractStreamOperatorFactory + implements OneInputStreamOperatorFactory, YieldingOperatorFactory { + + private static final long serialVersionUID = 1L; + + /** Constant indicating timeout is disabled. */ + private static final long NO_TIMEOUT = 0L; + + private final AsyncBatchFunction asyncBatchFunction; + private final int maxBatchSize; + private final long batchTimeoutMs; + private final AsyncBatchRetryStrategy retryStrategy; + private final AsyncBatchTimeoutPolicy timeoutPolicy; + + /** + * Creates a factory with size-based batching only (no timeout, no retry, no async timeout). + * + * @param asyncBatchFunction The async batch function + * @param maxBatchSize Maximum batch size before triggering async invocation + */ + public AsyncBatchWaitOperatorFactory( + AsyncBatchFunction asyncBatchFunction, int maxBatchSize) { + this(asyncBatchFunction, maxBatchSize, NO_TIMEOUT); + } + + /** + * Creates a factory with size-based and optional timeout-based batching (no retry, no async + * timeout). + * + * @param asyncBatchFunction The async batch function + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means disabled + */ + @SuppressWarnings("unchecked") + public AsyncBatchWaitOperatorFactory( + AsyncBatchFunction asyncBatchFunction, int maxBatchSize, long batchTimeoutMs) { + this( + asyncBatchFunction, + maxBatchSize, + batchTimeoutMs, + AsyncBatchRetryStrategies.noRetry(), + AsyncBatchTimeoutPolicy.NO_TIMEOUT_POLICY); + } + + /** + * Creates a factory with full configuration including retry and timeout policies. 
+ * + * @param asyncBatchFunction The async batch function + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means disabled + * @param retryStrategy Retry strategy for failed batch operations + * @param timeoutPolicy Timeout policy for async batch operations + */ + public AsyncBatchWaitOperatorFactory( + AsyncBatchFunction asyncBatchFunction, + int maxBatchSize, + long batchTimeoutMs, + AsyncBatchRetryStrategy retryStrategy, + AsyncBatchTimeoutPolicy timeoutPolicy) { + this.asyncBatchFunction = asyncBatchFunction; + this.maxBatchSize = maxBatchSize; + this.batchTimeoutMs = batchTimeoutMs; + this.retryStrategy = retryStrategy; + this.timeoutPolicy = timeoutPolicy; + this.chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + AsyncBatchWaitOperator operator = + new AsyncBatchWaitOperator<>( + parameters, + asyncBatchFunction, + maxBatchSize, + batchTimeoutMs, + getMailboxExecutor(), + retryStrategy, + timeoutPolicy); + return (T) operator; + } + + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return AsyncBatchWaitOperator.class; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperator.java new file mode 100644 index 0000000000000..657605b0c1eb8 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperator.java @@ -0,0 +1,674 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.api.functions.async.CollectionSupplier; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The {@link OrderedAsyncBatchWaitOperator} batches incoming stream records and invokes the {@link + * AsyncBatchFunction} when the batch size reaches the configured maximum or when the batch timeout + * is reached. + * + *

+ * <p>This operator implements ordered semantics: output records are emitted in the same
+ * order as input records, even though async batch invocations may complete out of order
+ * internally.
+ *
+ * <p>Ordering is achieved by:
+ *
+ * <ul>
+ *   <li>Assigning a monotonic sequence number to each batch
+ *   <li>Buffering completed batch results in a pending results map
+ *   <li>Emitting results strictly in sequence order
+ * </ul>
+ *
+ * <p>Key behaviors:
+ *
+ * <ul>
+ *   <li>Buffer incoming records until the batch size is reached OR the timeout expires
+ *   <li>Flush remaining records when end of input is signaled
+ *   <li>Wait for all batches to complete and emit in order before finishing
+ * </ul>
+ *
+ * <p>Timer lifecycle (when batchTimeoutMs > 0):
+ *
+ * <ul>
+ *   <li>A timer is registered when the first element is added to an empty buffer
+ *   <li>The timer fires at: currentBatchStartTime + batchTimeoutMs
+ *   <li>The timer is cleared when the batch is flushed (by size, timeout, or end of input)
+ *   <li>At most one timer is active at any time
+ * </ul>
+ *
+ * <h2>Metrics</h2>
+ *
+ * <p>This operator exposes the following metrics for monitoring AI/ML inference workloads:
+ *
+ * <ul>
+ *   <li>{@code batchSize} - Histogram of batch sizes (number of records per batch)
+ *   <li>{@code batchLatencyMs} - Histogram of batch latency in milliseconds (time from the first
+ *       element buffered to batch flush)
+ *   <li>{@code asyncCallDurationMs} - Histogram of async call duration in milliseconds (time from
+ *       async invocation to completion)
+ *   <li>{@code inflightBatches} - Gauge showing the current number of in-flight async batch
+ *       operations
+ *   <li>{@code totalBatchesProcessed} - Counter of total batches processed
+ *   <li>{@code totalRecordsProcessed} - Counter of total records processed
+ *   <li>{@code asyncCallFailures} - Counter of failed async calls
+ *   <li>{@code pendingOrderedBatches} - Gauge showing batches waiting for in-order emission
+ * </ul>
+ *
+ * <p>Future enhancements may include:
+ *
+ * <ul>
+ *   <li>Event-time or watermark-based ordering
+ *   <li>Concurrency control for multiple in-flight batches
+ *   <li>Retry logic
+ * </ul>
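+ *
+ * <p>A minimal usage sketch follows. It is illustrative only: the string-mapping batch function,
+ * the batch size of 16, and the 50 ms timeout are assumptions, not values mandated by this
+ * change.
+ *
+ * <pre>{@code
+ * // A batch function that completes asynchronously with one output per input.
+ * AsyncBatchFunction<Integer, String> fn =
+ *     (inputs, resultFuture) ->
+ *         CompletableFuture
+ *             .supplyAsync(() -> inputs.stream()
+ *                 .map(i -> "value-" + i)
+ *                 .collect(Collectors.toList()))
+ *             .thenAccept(resultFuture::complete);
+ *
+ * // Ordered operator: flush every 16 records, or 50 ms after the first buffered record.
+ * OrderedAsyncBatchWaitOperatorFactory<Integer, String> factory =
+ *     new OrderedAsyncBatchWaitOperatorFactory<>(fn, 16, 50L);
+ * }</pre>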
+ * + * @param Input type for the operator. + * @param Output type for the operator. + */ +@Internal +public class OrderedAsyncBatchWaitOperator extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput, ProcessingTimeCallback { + + private static final long serialVersionUID = 1L; + + /** Constant indicating timeout is disabled. */ + private static final long NO_TIMEOUT = 0L; + + /** Default window size for histogram metrics. */ + private static final int METRICS_HISTOGRAM_WINDOW_SIZE = 1000; + + // ================================================================================ + // Metric names - exposed as constants for testing and documentation + // ================================================================================ + + /** Metric name for batch size histogram. */ + public static final String METRIC_BATCH_SIZE = "batchSize"; + + /** Metric name for batch latency histogram (in milliseconds). */ + public static final String METRIC_BATCH_LATENCY_MS = "batchLatencyMs"; + + /** Metric name for async call duration histogram (in milliseconds). */ + public static final String METRIC_ASYNC_CALL_DURATION_MS = "asyncCallDurationMs"; + + /** Metric name for in-flight batches gauge. */ + public static final String METRIC_INFLIGHT_BATCHES = "inflightBatches"; + + /** Metric name for total batches processed counter. */ + public static final String METRIC_TOTAL_BATCHES_PROCESSED = "totalBatchesProcessed"; + + /** Metric name for total records processed counter. */ + public static final String METRIC_TOTAL_RECORDS_PROCESSED = "totalRecordsProcessed"; + + /** Metric name for async call failures counter. */ + public static final String METRIC_ASYNC_CALL_FAILURES = "asyncCallFailures"; + + /** Metric name for pending ordered batches gauge. */ + public static final String METRIC_PENDING_ORDERED_BATCHES = "pendingOrderedBatches"; + + // ================================================================================ + // Configuration fields + // ================================================================================ + + /** The async batch function to invoke. */ + private final AsyncBatchFunction asyncBatchFunction; + + /** Maximum batch size before triggering async invocation. */ + private final int maxBatchSize; + + /** + * Batch timeout in milliseconds. When positive, a timer is registered to flush the batch after + * this duration since the first buffered element. A value <= 0 disables timeout-based batching. + */ + private final long batchTimeoutMs; + + /** Buffer for incoming stream records. */ + private transient List buffer; + + /** Mailbox executor for processing async results on the main thread. */ + private final transient MailboxExecutor mailboxExecutor; + + /** Counter for in-flight async operations. */ + private transient int inFlightCount; + + // ================================================================================ + // Timer state fields for timeout-based batching + // ================================================================================ + + /** + * The processing time when the current batch started (i.e., when first element was added to + * empty buffer). Used to calculate timer fire time. + */ + private transient long currentBatchStartTime; + + /** Whether a timer is currently registered for the current batch. 
*/ + private transient boolean timerRegistered; + + // ================================================================================ + // Ordered emission state fields + // ================================================================================ + + /** + * The sequence number to assign to the next batch. Monotonically increasing, starting from 0. + */ + private transient long nextBatchSequenceNumber; + + /** + * The sequence number of the next batch whose results should be emitted. Used to ensure + * strictly ordered output emission. + */ + private transient long nextExpectedSequenceNumber; + + /** + * Pending results buffer. Maps batch sequence number to completed results. Results are held + * here until all preceding batches have been emitted. + */ + private transient Map> pendingResults; + + // ================================================================================ + // Metrics fields + // ================================================================================ + + /** + * Histogram tracking the size of each batch. Useful for monitoring batch efficiency and tuning + * maxBatchSize parameter. + */ + private transient Histogram batchSizeHistogram; + + /** + * Histogram tracking batch latency in milliseconds. Measures time from when first element is + * added to buffer until batch is flushed. Helps identify buffering overhead. + */ + private transient Histogram batchLatencyHistogram; + + /** + * Histogram tracking async call duration in milliseconds. Measures time from async invocation + * to completion callback. Critical for monitoring inference latency. + */ + private transient Histogram asyncCallDurationHistogram; + + /** + * Gauge showing current number of in-flight batches. Useful for monitoring backpressure and + * concurrency. + */ + @SuppressWarnings("unused") // Registered as gauge, kept as field reference + private transient Gauge inflightBatchesGauge; + + /** Gauge showing number of batches waiting for in-order emission. */ + @SuppressWarnings("unused") // Registered as gauge, kept as field reference + private transient Gauge pendingOrderedBatchesGauge; + + /** Counter for total batches processed. */ + private transient Counter totalBatchesProcessedCounter; + + /** Counter for total records processed. */ + private transient Counter totalRecordsProcessedCounter; + + /** Counter for failed async calls. */ + private transient Counter asyncCallFailuresCounter; + + /** + * Creates an OrderedAsyncBatchWaitOperator with size-based batching only (no timeout). + * + * @param parameters Stream operator parameters + * @param asyncBatchFunction The async batch function to invoke + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param mailboxExecutor Mailbox executor for processing async results + */ + public OrderedAsyncBatchWaitOperator( + @Nonnull StreamOperatorParameters parameters, + @Nonnull AsyncBatchFunction asyncBatchFunction, + int maxBatchSize, + @Nonnull MailboxExecutor mailboxExecutor) { + this(parameters, asyncBatchFunction, maxBatchSize, NO_TIMEOUT, mailboxExecutor); + } + + /** + * Creates an OrderedAsyncBatchWaitOperator with size-based and optional timeout-based batching. 
+ * + * @param parameters Stream operator parameters + * @param asyncBatchFunction The async batch function to invoke + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means disabled + * @param mailboxExecutor Mailbox executor for processing async results + */ + public OrderedAsyncBatchWaitOperator( + @Nonnull StreamOperatorParameters parameters, + @Nonnull AsyncBatchFunction asyncBatchFunction, + int maxBatchSize, + long batchTimeoutMs, + @Nonnull MailboxExecutor mailboxExecutor) { + Preconditions.checkArgument(maxBatchSize > 0, "maxBatchSize must be greater than 0"); + this.asyncBatchFunction = Preconditions.checkNotNull(asyncBatchFunction); + this.maxBatchSize = maxBatchSize; + this.batchTimeoutMs = batchTimeoutMs; + this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor); + + // Setup the operator using parameters + setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); + } + + @Override + public void open() throws Exception { + super.open(); + this.buffer = new ArrayList<>(maxBatchSize); + this.inFlightCount = 0; + this.currentBatchStartTime = 0L; + this.timerRegistered = false; + + // Initialize ordered emission state + this.nextBatchSequenceNumber = 0L; + this.nextExpectedSequenceNumber = 0L; + this.pendingResults = new TreeMap<>(); + + // Initialize metrics + registerMetrics(); + } + + /** + * Registers all metrics for this operator. + * + *

Metrics are registered under the operator's metric group and provide visibility into batch + * processing behavior for AI/ML inference workloads. + */ + private void registerMetrics() { + MetricGroup metricGroup = metrics; + + // Histogram for batch sizes + this.batchSizeHistogram = + metricGroup.histogram( + METRIC_BATCH_SIZE, + new DescriptiveStatisticsHistogram(METRICS_HISTOGRAM_WINDOW_SIZE)); + + // Histogram for batch latency (time from first element to flush) + this.batchLatencyHistogram = + metricGroup.histogram( + METRIC_BATCH_LATENCY_MS, + new DescriptiveStatisticsHistogram(METRICS_HISTOGRAM_WINDOW_SIZE)); + + // Histogram for async call duration + this.asyncCallDurationHistogram = + metricGroup.histogram( + METRIC_ASYNC_CALL_DURATION_MS, + new DescriptiveStatisticsHistogram(METRICS_HISTOGRAM_WINDOW_SIZE)); + + // Gauge for in-flight batches + this.inflightBatchesGauge = metricGroup.gauge(METRIC_INFLIGHT_BATCHES, () -> inFlightCount); + + // Gauge for pending ordered batches (specific to ordered operator) + this.pendingOrderedBatchesGauge = + metricGroup.gauge( + METRIC_PENDING_ORDERED_BATCHES, + () -> pendingResults != null ? pendingResults.size() : 0); + + // Counter for total batches processed + this.totalBatchesProcessedCounter = metricGroup.counter(METRIC_TOTAL_BATCHES_PROCESSED); + + // Counter for total records processed + this.totalRecordsProcessedCounter = metricGroup.counter(METRIC_TOTAL_RECORDS_PROCESSED); + + // Counter for failed async calls + this.asyncCallFailuresCounter = metricGroup.counter(METRIC_ASYNC_CALL_FAILURES); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + // If buffer is empty and timeout is enabled, record batch start time and register timer + if (buffer.isEmpty() && isTimeoutEnabled()) { + currentBatchStartTime = getProcessingTimeService().getCurrentProcessingTime(); + registerBatchTimer(); + } + + // Record batch start time for latency tracking (even without timeout) + if (buffer.isEmpty() && !isTimeoutEnabled()) { + currentBatchStartTime = System.currentTimeMillis(); + } + + buffer.add(element.getValue()); + + // Size-triggered flush: cancel pending timer and flush + if (buffer.size() >= maxBatchSize) { + flushBuffer(); + } + } + + /** + * Callback when processing time timer fires. Flushes the buffer if non-empty. + * + * @param timestamp The timestamp for which the timer was registered + */ + @Override + public void onProcessingTime(long timestamp) throws Exception { + // Timer fired - clear timer state first + timerRegistered = false; + + // Flush buffer if non-empty (timeout-triggered flush) + if (!buffer.isEmpty()) { + flushBuffer(); + } + } + + /** Flush the current buffer by invoking the async batch function. 
*/ + private void flushBuffer() throws Exception { + if (buffer.isEmpty()) { + return; + } + + // Calculate batch latency (time from first element to now) + long batchLatencyMs; + if (isTimeoutEnabled()) { + batchLatencyMs = + getProcessingTimeService().getCurrentProcessingTime() - currentBatchStartTime; + } else { + batchLatencyMs = System.currentTimeMillis() - currentBatchStartTime; + } + + // Clear timer state since we're flushing the batch + clearTimerState(); + + // Create a copy of the buffer and clear it for new incoming elements + List batch = new ArrayList<>(buffer); + buffer.clear(); + + // Update metrics + int batchSize = batch.size(); + batchSizeHistogram.update(batchSize); + batchLatencyHistogram.update(batchLatencyMs); + totalBatchesProcessedCounter.inc(); + totalRecordsProcessedCounter.inc(batchSize); + + // Assign sequence number to this batch and increment counter + long batchSequenceNumber = nextBatchSequenceNumber++; + + // Increment in-flight counter + inFlightCount++; + + // Record async call start time for duration tracking + long asyncCallStartTime = System.currentTimeMillis(); + + // Create result handler for this batch with its sequence number + OrderedBatchResultHandler resultHandler = + new OrderedBatchResultHandler(batchSequenceNumber, asyncCallStartTime); + + // Invoke the async batch function + asyncBatchFunction.asyncInvokeBatch(batch, resultHandler); + } + + @Override + public void endInput() throws Exception { + // Flush any remaining elements in the buffer + flushBuffer(); + + // Wait for all in-flight async operations to complete and emit results in order + while (inFlightCount > 0 || !pendingResults.isEmpty()) { + mailboxExecutor.yield(); + } + } + + @Override + public void close() throws Exception { + super.close(); + } + + // ================================================================================ + // Timer management methods + // ================================================================================ + + /** Check if timeout-based batching is enabled. */ + private boolean isTimeoutEnabled() { + return batchTimeoutMs > NO_TIMEOUT; + } + + /** Register a processing time timer for the current batch. */ + private void registerBatchTimer() { + if (!timerRegistered && isTimeoutEnabled()) { + long fireTime = currentBatchStartTime + batchTimeoutMs; + getProcessingTimeService().registerTimer(fireTime, this); + timerRegistered = true; + } + } + + /** + * Clear timer state. Note: We don't explicitly cancel the timer because: 1. The timer callback + * checks buffer state before flushing 2. Cancelling timers has overhead 3. Timer will be + * ignored if buffer is empty when it fires + */ + private void clearTimerState() { + timerRegistered = false; + currentBatchStartTime = 0L; + } + + // ================================================================================ + // Ordered emission methods + // ================================================================================ + + /** + * Try to emit results in order. Called when a batch completes. Emits all consecutive completed + * batches starting from nextExpectedSequenceNumber. 
+ */ + private void tryEmitInOrder() { + // Emit results in strict sequence order + while (pendingResults.containsKey(nextExpectedSequenceNumber)) { + Collection results = pendingResults.remove(nextExpectedSequenceNumber); + + // Emit all results from this batch + for (OUT result : results) { + output.collect(new StreamRecord<>(result)); + } + + // Move to next expected sequence number + nextExpectedSequenceNumber++; + } + } + + // ================================================================================ + // Test accessors + // ================================================================================ + + /** Returns the current buffer size. Visible for testing. */ + @VisibleForTesting + int getBufferSize() { + return buffer != null ? buffer.size() : 0; + } + + /** Returns the number of pending result batches. Visible for testing. */ + @VisibleForTesting + int getPendingResultsCount() { + return pendingResults != null ? pendingResults.size() : 0; + } + + /** Returns the current in-flight count. Visible for testing. */ + @VisibleForTesting + int getInFlightCount() { + return inFlightCount; + } + + /** Returns the batch size histogram. Visible for testing. */ + @VisibleForTesting + Histogram getBatchSizeHistogram() { + return batchSizeHistogram; + } + + /** Returns the batch latency histogram. Visible for testing. */ + @VisibleForTesting + Histogram getBatchLatencyHistogram() { + return batchLatencyHistogram; + } + + /** Returns the async call duration histogram. Visible for testing. */ + @VisibleForTesting + Histogram getAsyncCallDurationHistogram() { + return asyncCallDurationHistogram; + } + + /** Returns the total batches processed counter. Visible for testing. */ + @VisibleForTesting + Counter getTotalBatchesProcessedCounter() { + return totalBatchesProcessedCounter; + } + + /** Returns the total records processed counter. Visible for testing. */ + @VisibleForTesting + Counter getTotalRecordsProcessedCounter() { + return totalRecordsProcessedCounter; + } + + /** Returns the async call failures counter. Visible for testing. */ + @VisibleForTesting + Counter getAsyncCallFailuresCounter() { + return asyncCallFailuresCounter; + } + + /** + * A handler for the results of a batch async invocation that maintains ordering. + * + *

Results are stored in the pending results buffer and emitted in sequence order. + */ + private class OrderedBatchResultHandler implements ResultFuture { + + /** Guard against multiple completions. */ + private final AtomicBoolean completed = new AtomicBoolean(false); + + /** The sequence number of this batch. */ + private final long batchSequenceNumber; + + /** Start time of the async call for duration tracking. */ + private final long asyncCallStartTime; + + OrderedBatchResultHandler(long batchSequenceNumber, long asyncCallStartTime) { + this.batchSequenceNumber = batchSequenceNumber; + this.asyncCallStartTime = asyncCallStartTime; + } + + @Override + public void complete(Collection results) { + Preconditions.checkNotNull( + results, "Results must not be null, use empty collection to emit nothing"); + + if (!completed.compareAndSet(false, true)) { + return; + } + + // Process results in the mailbox thread + mailboxExecutor.execute( + () -> processResultsOrdered(results), + "OrderedAsyncBatchWaitOperator#processResultsOrdered"); + } + + @Override + public void completeExceptionally(Throwable error) { + if (!completed.compareAndSet(false, true)) { + return; + } + + // Update failure metric + asyncCallFailuresCounter.inc(); + + // Record async call duration even for failures + long duration = System.currentTimeMillis() - asyncCallStartTime; + asyncCallDurationHistogram.update(duration); + + // Signal failure through the containing task + getContainingTask() + .getEnvironment() + .failExternally(new Exception("Async batch operation failed.", error)); + + // Decrement in-flight counter in mailbox thread + mailboxExecutor.execute( + () -> inFlightCount--, "OrderedAsyncBatchWaitOperator#decrementInFlight"); + } + + @Override + public void complete(CollectionSupplier supplier) { + Preconditions.checkNotNull( + supplier, "Supplier must not be null, return empty collection to emit nothing"); + + if (!completed.compareAndSet(false, true)) { + return; + } + + mailboxExecutor.execute( + () -> { + try { + processResultsOrdered(supplier.get()); + } catch (Throwable t) { + // Update failure metric + asyncCallFailuresCounter.inc(); + + // Record async call duration even for failures + long duration = System.currentTimeMillis() - asyncCallStartTime; + asyncCallDurationHistogram.update(duration); + + getContainingTask() + .getEnvironment() + .failExternally( + new Exception("Async batch operation failed.", t)); + inFlightCount--; + } + }, + "OrderedAsyncBatchWaitOperator#processResultsFromSupplier"); + } + + /** + * Process results with ordering guarantee. + * + *

Results are added to the pending buffer and then we try to emit all consecutive + * completed batches in order. + */ + private void processResultsOrdered(Collection results) { + // Record async call duration + long duration = System.currentTimeMillis() - asyncCallStartTime; + asyncCallDurationHistogram.update(duration); + + // Store results in pending buffer keyed by sequence number + pendingResults.put(batchSequenceNumber, new ArrayList<>(results)); + + // Try to emit all consecutive completed batches + tryEmitInOrder(); + + // Decrement in-flight counter + inFlightCount--; + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperatorFactory.java new file mode 100644 index 0000000000000..4801245f903d7 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperatorFactory.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.operators.legacy.YieldingOperatorFactory; + +/** + * The factory of {@link OrderedAsyncBatchWaitOperator}. + * + *

This factory creates operators that maintain ordering guarantees - output records are emitted + * in the same order as input records, regardless of async completion order. + * + * @param The input type of the operator + * @param The output type of the operator + */ +@Internal +public class OrderedAsyncBatchWaitOperatorFactory + extends AbstractStreamOperatorFactory + implements OneInputStreamOperatorFactory, YieldingOperatorFactory { + + private static final long serialVersionUID = 1L; + + /** Constant indicating timeout is disabled. */ + private static final long NO_TIMEOUT = 0L; + + private final AsyncBatchFunction asyncBatchFunction; + private final int maxBatchSize; + private final long batchTimeoutMs; + + /** + * Creates a factory with size-based batching only (no timeout). + * + * @param asyncBatchFunction The async batch function + * @param maxBatchSize Maximum batch size before triggering async invocation + */ + public OrderedAsyncBatchWaitOperatorFactory( + AsyncBatchFunction asyncBatchFunction, int maxBatchSize) { + this(asyncBatchFunction, maxBatchSize, NO_TIMEOUT); + } + + /** + * Creates a factory with size-based and optional timeout-based batching. + * + * @param asyncBatchFunction The async batch function + * @param maxBatchSize Maximum batch size before triggering async invocation + * @param batchTimeoutMs Batch timeout in milliseconds; <= 0 means disabled + */ + public OrderedAsyncBatchWaitOperatorFactory( + AsyncBatchFunction asyncBatchFunction, int maxBatchSize, long batchTimeoutMs) { + this.asyncBatchFunction = asyncBatchFunction; + this.maxBatchSize = maxBatchSize; + this.batchTimeoutMs = batchTimeoutMs; + this.chainingStrategy = ChainingStrategy.ALWAYS; + } + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator( + StreamOperatorParameters parameters) { + OrderedAsyncBatchWaitOperator operator = + new OrderedAsyncBatchWaitOperator<>( + parameters, + asyncBatchFunction, + maxBatchSize, + batchTimeoutMs, + getMailboxExecutor()); + return (T) operator; + } + + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return OrderedAsyncBatchWaitOperator.class; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncBatchRetryStrategies.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncBatchRetryStrategies.java new file mode 100644 index 0000000000000..28bc33c475dfa --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/retryable/AsyncBatchRetryStrategies.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.retryable; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.functions.async.AsyncBatchRetryPredicate; +import org.apache.flink.streaming.api.functions.async.AsyncBatchRetryStrategy; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.Collection; +import java.util.Optional; +import java.util.function.Predicate; + +/** + * Utility class to create concrete {@link AsyncBatchRetryStrategy} implementations. + * + *

+ * <p>Provides commonly used retry strategies for batch async operations:
+ *
+ * <ul>
+ *   <li>{@link FixedDelayRetryStrategy} - retries with a fixed delay between attempts
+ *   <li>{@link ExponentialBackoffDelayRetryStrategy} - retries with exponentially increasing
+ *       delays
+ * </ul>
+ *
+ * <p>NOTICE: For performance reasons, this utility's {@link AsyncBatchRetryStrategy}
+ * implementation assumes the attempt always starts from 1 and will only increase by 1 each time.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * // Fixed delay retry: max 3 attempts, 100ms between retries
+ * AsyncBatchRetryStrategy fixedDelay = new AsyncBatchRetryStrategies
+ *     .FixedDelayRetryStrategyBuilder(3, 100L)
+ *     .ifException(e -> e instanceof IOException)
+ *     .build();
+ *
+ * // Exponential backoff: max 5 attempts, initial 100ms, max 10s, multiplier 2.0
+ * AsyncBatchRetryStrategy exponential = new AsyncBatchRetryStrategies
+ *     .ExponentialBackoffDelayRetryStrategyBuilder(5, 100L, 10000L, 2.0)
+ *     .ifException(e -> e instanceof TimeoutException)
+ *     .build();
+ * }
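+ * </pre>
+ *
+ * <p>A hedged sketch of wiring a retry strategy (together with a timeout policy) into the batch
+ * operator factory; the type arguments and the concrete values below are illustrative
+ * assumptions, not part of this utility:
+ *
+ * <pre>{@code
+ * AsyncBatchWaitOperatorFactory<String, String> factory =
+ *     new AsyncBatchWaitOperatorFactory<>(
+ *         myBatchFunction,   // an AsyncBatchFunction<String, String> supplied by the user
+ *         32,                // maxBatchSize
+ *         200L,              // batchTimeoutMs; <= 0 disables the batch timer
+ *         new AsyncBatchRetryStrategies.FixedDelayRetryStrategyBuilder<String>(3, 100L)
+ *             .ifException(e -> e instanceof IOException)
+ *             .build(),
+ *         AsyncBatchTimeoutPolicy.failOnTimeout(5000L));
+ * }</pre>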
+ */ +@PublicEvolving +public class AsyncBatchRetryStrategies { + + /** A strategy that never retries. Use this as the default when no retry is needed. */ + @SuppressWarnings("rawtypes") + public static final AsyncBatchRetryStrategy NO_RETRY_STRATEGY = new NoRetryStrategy(); + + /** + * Returns a type-safe no-retry strategy. + * + * @param the output type + * @return a strategy that never retries + */ + @SuppressWarnings("unchecked") + public static AsyncBatchRetryStrategy noRetry() { + return (AsyncBatchRetryStrategy) NO_RETRY_STRATEGY; + } + + /** A strategy that never retries batch operations. */ + private static class NoRetryStrategy implements AsyncBatchRetryStrategy { + private static final long serialVersionUID = 1L; + + private NoRetryStrategy() {} + + @Override + public boolean canRetry(int currentAttempts) { + return false; + } + + @Override + public long getBackoffTimeMillis(int currentAttempts) { + return -1; + } + + @Override + public AsyncBatchRetryPredicate getRetryPredicate() { + return new BatchRetryPredicate<>(null, null); + } + } + + /** Default implementation of {@link AsyncBatchRetryPredicate}. */ + private static class BatchRetryPredicate implements AsyncBatchRetryPredicate { + private final Predicate> resultPredicate; + private final Predicate exceptionPredicate; + + public BatchRetryPredicate( + Predicate> resultPredicate, + Predicate exceptionPredicate) { + this.resultPredicate = resultPredicate; + this.exceptionPredicate = exceptionPredicate; + } + + @Override + public Optional>> resultPredicate() { + return Optional.ofNullable(resultPredicate); + } + + @Override + public Optional> exceptionPredicate() { + return Optional.ofNullable(exceptionPredicate); + } + } + + /** + * A retry strategy that uses a fixed delay between retry attempts. + * + * @param the type of output elements + */ + public static class FixedDelayRetryStrategy implements AsyncBatchRetryStrategy { + private static final long serialVersionUID = 1L; + + private final int maxAttempts; + private final long backoffTimeMillis; + private final Predicate> resultPredicate; + private final Predicate exceptionPredicate; + + private FixedDelayRetryStrategy( + int maxAttempts, + long backoffTimeMillis, + Predicate> resultPredicate, + Predicate exceptionPredicate) { + this.maxAttempts = maxAttempts; + this.backoffTimeMillis = backoffTimeMillis; + this.resultPredicate = resultPredicate; + this.exceptionPredicate = exceptionPredicate; + } + + @Override + public boolean canRetry(int currentAttempts) { + return currentAttempts <= maxAttempts; + } + + @Override + public long getBackoffTimeMillis(int currentAttempts) { + return backoffTimeMillis; + } + + @Override + public AsyncBatchRetryPredicate getRetryPredicate() { + return new BatchRetryPredicate<>(resultPredicate, exceptionPredicate); + } + } + + /** + * Builder for creating a {@link FixedDelayRetryStrategy}. + * + * @param the type of output elements + */ + public static class FixedDelayRetryStrategyBuilder { + private final int maxAttempts; + private final long backoffTimeMillis; + private Predicate> resultPredicate; + private Predicate exceptionPredicate; + + /** + * Creates a builder with the specified retry parameters. 
+ * + * @param maxAttempts maximum number of retry attempts (must be > 0) + * @param backoffTimeMillis delay in milliseconds between retries (must be > 0) + */ + public FixedDelayRetryStrategyBuilder(int maxAttempts, long backoffTimeMillis) { + Preconditions.checkArgument( + maxAttempts > 0, "maxAttempts should be greater than zero."); + Preconditions.checkArgument( + backoffTimeMillis > 0, "backoffTimeMillis should be greater than zero."); + this.maxAttempts = maxAttempts; + this.backoffTimeMillis = backoffTimeMillis; + } + + /** + * Sets the predicate to evaluate results and determine if a retry is needed. + * + * @param resultRetryPredicate predicate that returns true if retry should be triggered + * @return this builder for method chaining + */ + public FixedDelayRetryStrategyBuilder ifResult( + @Nonnull Predicate> resultRetryPredicate) { + this.resultPredicate = resultRetryPredicate; + return this; + } + + /** + * Sets the predicate to evaluate exceptions and determine if a retry is needed. + * + * @param exceptionRetryPredicate predicate that returns true if retry should be triggered + * @return this builder for method chaining + */ + public FixedDelayRetryStrategyBuilder ifException( + @Nonnull Predicate exceptionRetryPredicate) { + this.exceptionPredicate = exceptionRetryPredicate; + return this; + } + + /** + * Builds the retry strategy. + * + * @return the configured retry strategy + */ + public FixedDelayRetryStrategy build() { + return new FixedDelayRetryStrategy<>( + maxAttempts, backoffTimeMillis, resultPredicate, exceptionPredicate); + } + } + + /** + * A retry strategy that uses exponentially increasing delays between retry attempts. + * + *

The delay for attempt N is: min(initialDelay * multiplier^(N-1), maxRetryDelay) + * + * @param the type of output elements + */ + public static class ExponentialBackoffDelayRetryStrategy + implements AsyncBatchRetryStrategy { + private static final long serialVersionUID = 1L; + + private final int maxAttempts; + private final long maxRetryDelay; + private final long initialDelay; + private final double multiplier; + private final Predicate> resultPredicate; + private final Predicate exceptionPredicate; + + // Note: This field is mutable for tracking retry state. + // It's acceptable because each operator instance has its own strategy instance. + private long lastRetryDelay; + + private ExponentialBackoffDelayRetryStrategy( + int maxAttempts, + long initialDelay, + long maxRetryDelay, + double multiplier, + Predicate> resultPredicate, + Predicate exceptionPredicate) { + this.maxAttempts = maxAttempts; + this.maxRetryDelay = maxRetryDelay; + this.multiplier = multiplier; + this.resultPredicate = resultPredicate; + this.exceptionPredicate = exceptionPredicate; + this.initialDelay = initialDelay; + this.lastRetryDelay = initialDelay; + } + + @Override + public boolean canRetry(int currentAttempts) { + return currentAttempts <= maxAttempts; + } + + @Override + public long getBackoffTimeMillis(int currentAttempts) { + if (currentAttempts <= 1) { + // Reset to initialDelay for first attempt + this.lastRetryDelay = initialDelay; + return lastRetryDelay; + } + + long backoff = Math.min((long) (lastRetryDelay * multiplier), maxRetryDelay); + this.lastRetryDelay = backoff; + return backoff; + } + + @Override + public AsyncBatchRetryPredicate getRetryPredicate() { + return new BatchRetryPredicate<>(resultPredicate, exceptionPredicate); + } + } + + /** + * Builder for creating an {@link ExponentialBackoffDelayRetryStrategy}. + * + * @param the type of output elements + */ + public static class ExponentialBackoffDelayRetryStrategyBuilder { + private final int maxAttempts; + private final long initialDelay; + private final long maxRetryDelay; + private final double multiplier; + + private Predicate> resultPredicate; + private Predicate exceptionPredicate; + + /** + * Creates a builder with the specified exponential backoff parameters. + * + * @param maxAttempts maximum number of retry attempts (must be > 0) + * @param initialDelay initial delay in milliseconds (must be > 0) + * @param maxRetryDelay maximum delay in milliseconds (must be >= initialDelay) + * @param multiplier multiplier for delay increase (must be >= 1.0) + */ + public ExponentialBackoffDelayRetryStrategyBuilder( + int maxAttempts, long initialDelay, long maxRetryDelay, double multiplier) { + Preconditions.checkArgument( + maxAttempts > 0, "maxAttempts should be greater than zero."); + Preconditions.checkArgument( + initialDelay > 0, "initialDelay should be greater than zero."); + Preconditions.checkArgument( + maxRetryDelay >= initialDelay, + "maxRetryDelay should be greater than or equal to initialDelay."); + Preconditions.checkArgument( + multiplier >= 1.0, "multiplier should be greater than or equal to 1.0."); + this.maxAttempts = maxAttempts; + this.initialDelay = initialDelay; + this.maxRetryDelay = maxRetryDelay; + this.multiplier = multiplier; + } + + /** + * Sets the predicate to evaluate results and determine if a retry is needed. 
+ * + * @param resultRetryPredicate predicate that returns true if retry should be triggered + * @return this builder for method chaining + */ + public ExponentialBackoffDelayRetryStrategyBuilder ifResult( + @Nonnull Predicate> resultRetryPredicate) { + this.resultPredicate = resultRetryPredicate; + return this; + } + + /** + * Sets the predicate to evaluate exceptions and determine if a retry is needed. + * + * @param exceptionRetryPredicate predicate that returns true if retry should be triggered + * @return this builder for method chaining + */ + public ExponentialBackoffDelayRetryStrategyBuilder ifException( + @Nonnull Predicate exceptionRetryPredicate) { + this.exceptionPredicate = exceptionRetryPredicate; + return this; + } + + /** + * Builds the retry strategy. + * + * @return the configured retry strategy + */ + public ExponentialBackoffDelayRetryStrategy build() { + return new ExponentialBackoffDelayRetryStrategy<>( + maxAttempts, + initialDelay, + maxRetryDelay, + multiplier, + resultPredicate, + exceptionPredicate); + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncBatchRetryAndTimeoutTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncBatchRetryAndTimeoutTest.java new file mode 100644 index 0000000000000..5174fef30ce44 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncBatchRetryAndTimeoutTest.java @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.api.functions.async.AsyncBatchRetryStrategy; +import org.apache.flink.streaming.api.functions.async.AsyncBatchTimeoutPolicy; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.retryable.AsyncBatchRetryStrategies; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for retry and timeout functionality in {@link AsyncBatchWaitOperator}. + * + *

+ * <p>These tests verify:
+ *
+ * <ul>
+ *   <li>Retry on exception with fixed delay strategy
+ *   <li>Retry on exception with exponential backoff strategy
+ *   <li>Retry on result predicate
+ *   <li>Timeout with fail behavior
+ *   <li>Timeout with allow partial behavior
+ *   <li>Retry and timeout interaction
+ * </ul>
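+ *
+ * <p>The tests build their operators through {@code createTestHarnessWith*} helpers. A hedged
+ * sketch of how such a harness can be constructed is shown below; the harness constructor
+ * overload and the serializer choice are assumptions for illustration only:
+ *
+ * <pre>{@code
+ * AsyncBatchWaitOperatorFactory<Integer, Integer> factory =
+ *     new AsyncBatchWaitOperatorFactory<>(
+ *         function, maxBatchSize, 0L, retryStrategy, AsyncBatchTimeoutPolicy.NO_TIMEOUT_POLICY);
+ * OneInputStreamOperatorTestHarness<Integer, Integer> harness =
+ *     new OneInputStreamOperatorTestHarness<>(factory, IntSerializer.INSTANCE);
+ * }</pre>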
+ */ +@Timeout(value = 100, unit = TimeUnit.SECONDS) +class AsyncBatchRetryAndTimeoutTest { + + // ================================================================================ + // Retry Tests + // ================================================================================ + + /** + * Test that retry works correctly with fixed delay strategy. + * + *

Scenario: Batch function fails twice then succeeds on third attempt. + */ + @Test + void testRetryWithFixedDelay() throws Exception { + final int maxBatchSize = 2; + final AtomicInteger attemptCount = new AtomicInteger(0); + final int failuresBeforeSuccess = 2; + + // Function that fails first 2 times, then succeeds + AsyncBatchFunction function = + (inputs, resultFuture) -> { + int attempt = attemptCount.incrementAndGet(); + if (attempt <= failuresBeforeSuccess) { + resultFuture.completeExceptionally( + new IOException("Simulated failure #" + attempt)); + } else { + resultFuture.complete( + inputs.stream().map(i -> i * 2).collect(Collectors.toList())); + } + }; + + // Retry strategy: max 3 attempts, 10ms delay, retry on IOException + AsyncBatchRetryStrategy retryStrategy = + new AsyncBatchRetryStrategies.FixedDelayRetryStrategyBuilder(3, 10L) + .ifException(e -> e instanceof IOException) + .build(); + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithRetry(function, maxBatchSize, retryStrategy)) { + + testHarness.open(); + + // Process 2 elements to trigger a batch + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + testHarness.endInput(); + + // Verify retry happened + assertThat(attemptCount.get()).isEqualTo(3); + + // Verify outputs after successful retry + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactlyInAnyOrder(2, 4); + + // Verify retry counter metric + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + assertThat(operator.getBatchRetryCounter().getCount()).isEqualTo(2); + } + } + + /** + * Test that retry works correctly with exponential backoff strategy. + * + *

Scenario: Batch function fails twice then succeeds on third attempt. + */ + @Test + void testRetryWithExponentialBackoff() throws Exception { + final int maxBatchSize = 2; + final AtomicInteger attemptCount = new AtomicInteger(0); + final List attemptTimes = new CopyOnWriteArrayList<>(); + + // Function that fails first 2 times, then succeeds + AsyncBatchFunction function = + (inputs, resultFuture) -> { + attemptTimes.add(System.currentTimeMillis()); + int attempt = attemptCount.incrementAndGet(); + if (attempt <= 2) { + resultFuture.completeExceptionally( + new RuntimeException("Simulated failure")); + } else { + resultFuture.complete(inputs); + } + }; + + // Exponential backoff: max 3 attempts, initial 10ms, max 100ms, multiplier 2.0 + AsyncBatchRetryStrategy retryStrategy = + new AsyncBatchRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder( + 3, 10L, 100L, 2.0) + .ifException(e -> e instanceof RuntimeException) + .build(); + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithRetry(function, maxBatchSize, retryStrategy)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + testHarness.endInput(); + + // Verify 3 attempts were made + assertThat(attemptCount.get()).isEqualTo(3); + assertThat(attemptTimes).hasSize(3); + } + } + + /** + * Test that retry is triggered based on result predicate. + * + *

Scenario: Batch function returns empty result, triggers retry. + */ + @Test + void testRetryOnResultPredicate() throws Exception { + final int maxBatchSize = 2; + final AtomicInteger attemptCount = new AtomicInteger(0); + + // Function that returns empty result on first attempt + AsyncBatchFunction function = + (inputs, resultFuture) -> { + int attempt = attemptCount.incrementAndGet(); + if (attempt == 1) { + // Return empty result - should trigger retry + resultFuture.complete(Collections.emptyList()); + } else { + resultFuture.complete(inputs); + } + }; + + // Retry strategy: retry if result is empty + AsyncBatchRetryStrategy retryStrategy = + new AsyncBatchRetryStrategies.FixedDelayRetryStrategyBuilder(2, 10L) + .ifResult(results -> results.isEmpty()) + .build(); + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithRetry(function, maxBatchSize, retryStrategy)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + testHarness.endInput(); + + // Verify retry happened + assertThat(attemptCount.get()).isEqualTo(2); + + // Verify outputs after retry + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactlyInAnyOrder(1, 2); + } + } + + /** + * Test that retry fails after max attempts exhausted. + * + *

Scenario: All retry attempts fail. + */ + @Test + void testRetryExhausted() throws Exception { + final int maxBatchSize = 2; + final AtomicInteger attemptCount = new AtomicInteger(0); + + // Function that always fails + AsyncBatchFunction function = + (inputs, resultFuture) -> { + attemptCount.incrementAndGet(); + resultFuture.completeExceptionally(new IOException("Always fails")); + }; + + // Retry strategy: max 2 attempts + AsyncBatchRetryStrategy retryStrategy = + new AsyncBatchRetryStrategies.FixedDelayRetryStrategyBuilder(2, 5L) + .ifException(e -> e instanceof IOException) + .build(); + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithRetry(function, maxBatchSize, retryStrategy)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + testHarness.endInput(); + + // Verify all retry attempts were made (1 initial + 2 retries = 3) + assertThat(attemptCount.get()).isEqualTo(3); + + // Verify failure is propagated + assertThat(testHarness.getEnvironment().getActualExternalFailureCause()) + .isPresent() + .get() + .satisfies(t -> assertThat(t.getCause()).isInstanceOf(IOException.class)); + + // Verify failure counter + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + assertThat(operator.getAsyncCallFailuresCounter().getCount()).isEqualTo(1); + } + } + + /** + * Test that non-matching exceptions are not retried. + * + *

Scenario: Function throws exception that doesn't match retry predicate. + */ + @Test + void testNoRetryForNonMatchingException() throws Exception { + final int maxBatchSize = 2; + final AtomicInteger attemptCount = new AtomicInteger(0); + + // Function that throws IllegalStateException + AsyncBatchFunction function = + (inputs, resultFuture) -> { + attemptCount.incrementAndGet(); + resultFuture.completeExceptionally(new IllegalStateException("Not retryable")); + }; + + // Retry strategy: only retry on IOException + AsyncBatchRetryStrategy retryStrategy = + new AsyncBatchRetryStrategies.FixedDelayRetryStrategyBuilder(3, 10L) + .ifException(e -> e instanceof IOException) + .build(); + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithRetry(function, maxBatchSize, retryStrategy)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + testHarness.endInput(); + + // Verify no retry (only 1 attempt) + assertThat(attemptCount.get()).isEqualTo(1); + + // Verify retry counter is 0 + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + assertThat(operator.getBatchRetryCounter().getCount()).isEqualTo(0); + } + } + + // ================================================================================ + // Timeout Tests + // ================================================================================ + + /** + * Test timeout with fail behavior. + * + *

Scenario: Async operation takes too long, timeout triggers failure. + */ + @Test + void testTimeoutWithFailBehavior() throws Exception { + final int maxBatchSize = 2; + final CompletableFuture blockingFuture = new CompletableFuture<>(); + + // Function that blocks indefinitely + AsyncBatchFunction function = + (inputs, resultFuture) -> { + // Never completes - should timeout + blockingFuture.thenRun(() -> resultFuture.complete(inputs)); + }; + + // Timeout policy: fail after 50ms + AsyncBatchTimeoutPolicy timeoutPolicy = AsyncBatchTimeoutPolicy.failOnTimeout(50L); + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, timeoutPolicy)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + // Wait for timeout to occur + Thread.sleep(100); + + testHarness.endInput(); + + // Verify timeout failure + assertThat(testHarness.getEnvironment().getActualExternalFailureCause()) + .isPresent() + .get() + .satisfies(t -> assertThat(t).isInstanceOf(TimeoutException.class)); + + // Verify timeout counter + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + assertThat(operator.getBatchTimeoutCounter().getCount()).isEqualTo(1); + } + } + + /** + * Test timeout with allow partial behavior. + * + *

Scenario: Async operation takes too long, timeout allows partial results. + */ + @Test + void testTimeoutWithAllowPartialBehavior() throws Exception { + final int maxBatchSize = 2; + final CompletableFuture blockingFuture = new CompletableFuture<>(); + + // Function that blocks indefinitely + AsyncBatchFunction function = + (inputs, resultFuture) -> { + // Never completes - should timeout + blockingFuture.thenRun(() -> resultFuture.complete(inputs)); + }; + + // Timeout policy: allow partial after 50ms + AsyncBatchTimeoutPolicy timeoutPolicy = AsyncBatchTimeoutPolicy.allowPartialOnTimeout(50L); + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, timeoutPolicy)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + // Wait for timeout to occur + Thread.sleep(100); + + testHarness.endInput(); + + // Verify no failure (partial results allowed) + assertThat(testHarness.getEnvironment().getActualExternalFailureCause()).isEmpty(); + + // Verify timeout counter + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + assertThat(operator.getBatchTimeoutCounter().getCount()).isEqualTo(1); + + // No outputs since function never completed + assertThat(testHarness.getOutput()).isEmpty(); + } + } + + /** + * Test that completion before timeout succeeds normally. + * + *

Scenario: Async operation completes before timeout. + */ + @Test + void testCompletionBeforeTimeout() throws Exception { + final int maxBatchSize = 2; + + // Function that completes immediately + AsyncBatchFunction function = + (inputs, resultFuture) -> { + resultFuture.complete( + inputs.stream().map(i -> i * 2).collect(Collectors.toList())); + }; + + // Long timeout - should never trigger + AsyncBatchTimeoutPolicy timeoutPolicy = AsyncBatchTimeoutPolicy.failOnTimeout(10000L); + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, timeoutPolicy)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + testHarness.endInput(); + + // Verify no failure + assertThat(testHarness.getEnvironment().getActualExternalFailureCause()).isEmpty(); + + // Verify no timeout + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + assertThat(operator.getBatchTimeoutCounter().getCount()).isEqualTo(0); + + // Verify outputs + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactlyInAnyOrder(2, 4); + } + } + + // ================================================================================ + // Combined Retry and Timeout Tests + // ================================================================================ + + /** + * Test that timeout cancels pending retry. + * + *

Scenario: Retrying when timeout occurs. + */ + @Test + void testTimeoutCancelsRetry() throws Exception { + final int maxBatchSize = 2; + final AtomicInteger attemptCount = new AtomicInteger(0); + final List> pendingFutures = new ArrayList<>(); + + // Function that fails and hangs on retry + AsyncBatchFunction function = + (inputs, resultFuture) -> { + int attempt = attemptCount.incrementAndGet(); + if (attempt == 1) { + resultFuture.completeExceptionally(new IOException("First attempt fails")); + } else { + // Hang on retry - should timeout + CompletableFuture future = new CompletableFuture<>(); + pendingFutures.add(future); + future.thenRun(() -> resultFuture.complete(inputs)); + } + }; + + // Retry with timeout + AsyncBatchRetryStrategy retryStrategy = + new AsyncBatchRetryStrategies.FixedDelayRetryStrategyBuilder(3, 5L) + .ifException(e -> e instanceof IOException) + .build(); + AsyncBatchTimeoutPolicy timeoutPolicy = AsyncBatchTimeoutPolicy.failOnTimeout(100L); + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithRetryAndTimeout( + function, maxBatchSize, retryStrategy, timeoutPolicy)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + // Wait for timeout to occur + Thread.sleep(200); + + testHarness.endInput(); + + // Verify timeout occurred + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + assertThat(operator.getBatchTimeoutCounter().getCount()).isEqualTo(1); + } + } + + /** + * Test successful retry before timeout. + * + *

Scenario: Retry succeeds before timeout expires. + */ + @Test + void testRetrySucceedsBeforeTimeout() throws Exception { + final int maxBatchSize = 2; + final AtomicInteger attemptCount = new AtomicInteger(0); + + // Function that fails once then succeeds quickly + AsyncBatchFunction function = + (inputs, resultFuture) -> { + int attempt = attemptCount.incrementAndGet(); + if (attempt == 1) { + resultFuture.completeExceptionally(new IOException("First attempt fails")); + } else { + resultFuture.complete( + inputs.stream().map(i -> i * 2).collect(Collectors.toList())); + } + }; + + // Retry with long timeout + AsyncBatchRetryStrategy retryStrategy = + new AsyncBatchRetryStrategies.FixedDelayRetryStrategyBuilder(3, 5L) + .ifException(e -> e instanceof IOException) + .build(); + AsyncBatchTimeoutPolicy timeoutPolicy = AsyncBatchTimeoutPolicy.failOnTimeout(5000L); + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithRetryAndTimeout( + function, maxBatchSize, retryStrategy, timeoutPolicy)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + testHarness.endInput(); + + // Verify retry happened + assertThat(attemptCount.get()).isEqualTo(2); + + // Verify no timeout + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + assertThat(operator.getBatchTimeoutCounter().getCount()).isEqualTo(0); + + // Verify outputs + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactlyInAnyOrder(2, 4); + } + } + + // ================================================================================ + // Test Harness Helpers + // ================================================================================ + + private static OneInputStreamOperatorTestHarness createTestHarnessWithRetry( + AsyncBatchFunction function, + int maxBatchSize, + AsyncBatchRetryStrategy retryStrategy) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new AsyncBatchWaitOperatorFactory<>( + function, + maxBatchSize, + 0L, + retryStrategy, + AsyncBatchTimeoutPolicy.NO_TIMEOUT_POLICY), + IntSerializer.INSTANCE); + } + + @SuppressWarnings("unchecked") + private static OneInputStreamOperatorTestHarness createTestHarnessWithTimeout( + AsyncBatchFunction function, + int maxBatchSize, + AsyncBatchTimeoutPolicy timeoutPolicy) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new AsyncBatchWaitOperatorFactory<>( + function, + maxBatchSize, + 0L, + AsyncBatchRetryStrategies.noRetry(), + timeoutPolicy), + IntSerializer.INSTANCE); + } + + private static OneInputStreamOperatorTestHarness + createTestHarnessWithRetryAndTimeout( + AsyncBatchFunction function, + int maxBatchSize, + AsyncBatchRetryStrategy retryStrategy, + AsyncBatchTimeoutPolicy timeoutPolicy) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new AsyncBatchWaitOperatorFactory<>( + function, maxBatchSize, 0L, retryStrategy, timeoutPolicy), + IntSerializer.INSTANCE); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperatorTest.java new file mode 100644 index 0000000000000..7895ee74c5f08 --- /dev/null +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncBatchWaitOperatorTest.java @@ -0,0 +1,811 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link AsyncBatchWaitOperator}. + * + *

These tests verify: + * + * <ul>
 + *   <li>Batch size trigger - elements are batched correctly
 + *   <li>Correct result emission - all outputs are emitted downstream
 + *   <li>Exception propagation - errors fail the operator
 + * </ul>
+ */ +@Timeout(value = 100, unit = TimeUnit.SECONDS) +class AsyncBatchWaitOperatorTest { + + /** + * Test that the operator correctly batches elements based on maxBatchSize. + * + *

Input: 5 records with maxBatchSize = 3 + * + *

Expected: 2 batch invocations with sizes [3, 2] + */ + @Test + void testBatchSizeTrigger() throws Exception { + final int maxBatchSize = 3; + final List batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + // Return input * 2 for each element + List results = + inputs.stream().map(i -> i * 2).collect(Collectors.toList()); + resultFuture.complete(results); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 5 elements + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + testHarness.processElement(new StreamRecord<>(3, 3L)); + // First batch of 3 should be triggered here + + testHarness.processElement(new StreamRecord<>(4, 4L)); + testHarness.processElement(new StreamRecord<>(5, 5L)); + // Remaining 2 elements in buffer + + testHarness.endInput(); + // Second batch of 2 should be triggered on endInput + + // Verify batch sizes + assertThat(batchSizes).containsExactly(3, 2); + } + } + + /** Test that all results from the batch function are correctly emitted downstream. */ + @Test + void testCorrectResultEmission() throws Exception { + final int maxBatchSize = 3; + + // Function that doubles each input + AsyncBatchFunction function = + (inputs, resultFuture) -> { + List results = + inputs.stream().map(i -> i * 2).collect(Collectors.toList()); + resultFuture.complete(results); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 5 elements: 1, 2, 3, 4, 5 + for (int i = 1; i <= 5; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // Verify outputs: should be 2, 4, 6, 8, 10 + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactlyInAnyOrder(2, 4, 6, 8, 10); + } + } + + /** Test that exceptions from the batch function are properly propagated. */ + @Test + void testExceptionPropagation() throws Exception { + final int maxBatchSize = 2; + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + resultFuture.completeExceptionally(new ExpectedTestException()); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 2 elements to trigger a batch + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + // The exception should be propagated - we need to yield to process the async result + // In the test harness, the exception is recorded in the environment + testHarness.endInput(); + + // Verify that the task environment received the exception + assertThat(testHarness.getEnvironment().getActualExternalFailureCause()) + .isPresent() + .get() + .satisfies( + t -> + assertThat(t.getCause()) + .isInstanceOf(ExpectedTestException.class)); + } + } + + /** Test async completion using CompletableFuture. 
*/ + @Test + void testAsyncCompletion() throws Exception { + final int maxBatchSize = 2; + final AtomicInteger invocationCount = new AtomicInteger(0); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + invocationCount.incrementAndGet(); + // Simulate async processing + CompletableFuture.supplyAsync( + () -> + inputs.stream() + .map(i -> i * 3) + .collect(Collectors.toList())) + .thenAccept(resultFuture::complete); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 4 elements: should trigger 2 batches + for (int i = 1; i <= 4; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // Verify invocation count + assertThat(invocationCount.get()).isEqualTo(2); + + // Verify outputs: should be 3, 6, 9, 12 + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactlyInAnyOrder(3, 6, 9, 12); + } + } + + /** Test that empty batches are not triggered. */ + @Test + void testEmptyInput() throws Exception { + final int maxBatchSize = 3; + final AtomicInteger invocationCount = new AtomicInteger(0); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + invocationCount.incrementAndGet(); + resultFuture.complete(Collections.emptyList()); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + testHarness.endInput(); + + // No invocations should happen for empty input + assertThat(invocationCount.get()).isEqualTo(0); + assertThat(testHarness.getOutput()).isEmpty(); + } + } + + /** Test that batch function can return fewer or more outputs than inputs. */ + @Test + void testVariableOutputSize() throws Exception { + final int maxBatchSize = 3; + + // Function that returns only one output per batch (aggregation-style) + AsyncBatchFunction function = + (inputs, resultFuture) -> { + int sum = inputs.stream().mapToInt(Integer::intValue).sum(); + resultFuture.complete(Collections.singletonList(sum)); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 5 elements: 1, 2, 3, 4, 5 + for (int i = 1; i <= 5; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // First batch: 1+2+3 = 6, Second batch: 4+5 = 9 + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactlyInAnyOrder(6, 9); + } + } + + /** Test single element batch (maxBatchSize = 1). 
*/ + @Test + void testSingleElementBatch() throws Exception { + final int maxBatchSize = 1; + final List batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + testHarness.processElement(new StreamRecord<>(3, 3L)); + + testHarness.endInput(); + + // Each element should trigger its own batch + assertThat(batchSizes).containsExactly(1, 1, 1); + } + } + + // ================================================================================ + // Timeout-based batching tests + // ================================================================================ + + /** + * Test that timeout triggers batch flush even when batch size is not reached. + * + *

maxBatchSize = 10, batchTimeoutMs = 50 + * + *

Send 1 record, advance processing time, expect asyncInvokeBatch called with size 1 + */ + @Test + void testTimeoutFlush() throws Exception { + final int maxBatchSize = 10; + final long batchTimeoutMs = 50L; + final List batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + + // Set initial processing time + testHarness.setProcessingTime(0L); + + // Process 1 element - should start the timer + testHarness.processElement(new StreamRecord<>(1, 1L)); + + // Batch size not reached, no flush yet + assertThat(batchSizes).isEmpty(); + + // Advance processing time past timeout threshold + testHarness.setProcessingTime(batchTimeoutMs + 1); + + // Timer should have fired, triggering batch flush with size 1 + assertThat(batchSizes).containsExactly(1); + + testHarness.endInput(); + } + } + + /** + * Test that size-triggered flush happens before timeout when batch fills up quickly. + * + *

maxBatchSize = 2, batchTimeoutMs = 1 hour (3600000 ms) + * + *

Send 2 records immediately, verify batch is flushed by size, not by timeout + */ + @Test + void testSizeBeatsTimeout() throws Exception { + final int maxBatchSize = 2; + final long batchTimeoutMs = 3600000L; // 1 hour - should never be reached + final List batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + + // Set initial processing time + testHarness.setProcessingTime(0L); + + // Process 2 elements immediately - should trigger batch by size + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + // Batch should have been flushed by size (not timeout) + assertThat(batchSizes).containsExactly(2); + + // Even if we advance time, no additional flush should happen since buffer is empty + testHarness.setProcessingTime(batchTimeoutMs + 1); + assertThat(batchSizes).containsExactly(2); + + testHarness.endInput(); + } + } + + /** + * Test that timer is properly reset after batch flush. + * + *

First batch flushed by timeout, second batch starts a new timer. + */ + @Test + void testTimerResetAfterFlush() throws Exception { + final int maxBatchSize = 10; + final long batchTimeoutMs = 100L; + final List batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + + // === First batch === + testHarness.setProcessingTime(0L); + testHarness.processElement(new StreamRecord<>(1, 1L)); + + // Advance time to trigger first timeout flush + testHarness.setProcessingTime(batchTimeoutMs + 1); + assertThat(batchSizes).containsExactly(1); + + // === Second batch === + // Start second batch at time 200 + testHarness.setProcessingTime(200L); + testHarness.processElement(new StreamRecord<>(2, 2L)); + testHarness.processElement(new StreamRecord<>(3, 3L)); + + // No flush yet - batch size not reached + assertThat(batchSizes).containsExactly(1); + + // Advance time to trigger second timeout flush (200 + 100 + 1 = 301) + testHarness.setProcessingTime(301L); + assertThat(batchSizes).containsExactly(1, 2); + + testHarness.endInput(); + } + } + + /** Test timeout with multiple batches interleaving size and timeout triggers. */ + @Test + void testMixedSizeAndTimeoutTriggers() throws Exception { + final int maxBatchSize = 3; + final long batchTimeoutMs = 100L; + final List batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + testHarness.setProcessingTime(0L); + + // First batch: size-triggered + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + testHarness.processElement(new StreamRecord<>(3, 3L)); + assertThat(batchSizes).containsExactly(3); + + // Second batch: timeout-triggered + testHarness.setProcessingTime(200L); + testHarness.processElement(new StreamRecord<>(4, 4L)); + assertThat(batchSizes).containsExactly(3); // Not flushed yet + + testHarness.setProcessingTime(301L); // 200 + 100 + 1 + assertThat(batchSizes).containsExactly(3, 1); + + // Third batch: size-triggered again + testHarness.setProcessingTime(400L); + testHarness.processElement(new StreamRecord<>(5, 5L)); + testHarness.processElement(new StreamRecord<>(6, 6L)); + testHarness.processElement(new StreamRecord<>(7, 7L)); + assertThat(batchSizes).containsExactly(3, 1, 3); + + testHarness.endInput(); + } + } + + /** Test that timeout is disabled when batchTimeoutMs <= 0. 
*/ + @Test + void testTimeoutDisabled() throws Exception { + final int maxBatchSize = 10; + final long batchTimeoutMs = 0L; // Disabled + final List batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + testHarness.setProcessingTime(0L); + + // Process 1 element + testHarness.processElement(new StreamRecord<>(1, 1L)); + + // Advance time significantly - should not trigger flush since timeout is disabled + testHarness.setProcessingTime(1000000L); + assertThat(batchSizes).isEmpty(); + + // Flush happens only on endInput + testHarness.endInput(); + assertThat(batchSizes).containsExactly(1); + } + } + + // ================================================================================ + // Metrics tests + // ================================================================================ + + /** + * Test that batch size histogram is correctly updated. + * + *

Process 5 elements with maxBatchSize = 3, expect histogram to record batch sizes [3, 2]. + */ + @Test + void testBatchSizeMetric() throws Exception { + final int maxBatchSize = 3; + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Get the operator to access metrics + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + + // Process 5 elements + for (int i = 1; i <= 5; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // Verify batch size histogram recorded 2 batches + assertThat(operator.getBatchSizeHistogram().getCount()).isEqualTo(2); + } + } + + /** + * Test that total batches and records counters are correctly updated. + * + *

Process 7 elements with maxBatchSize = 3, expect 3 batches and 7 records. + */ + @Test + void testBatchAndRecordCounters() throws Exception { + final int maxBatchSize = 3; + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + + // Process 7 elements: should result in batches of [3, 3, 1] + for (int i = 1; i <= 7; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // Verify counters + assertThat(operator.getTotalBatchesProcessedCounter().getCount()).isEqualTo(3); + assertThat(operator.getTotalRecordsProcessedCounter().getCount()).isEqualTo(7); + } + } + + /** + * Test that async call duration histogram is updated on completion. + * + *

Process elements and verify duration is recorded. + */ + @Test + void testAsyncCallDurationMetric() throws Exception { + final int maxBatchSize = 2; + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + // Simulate some processing time + CompletableFuture.runAsync( + () -> { + try { + Thread.sleep(10); // Small delay + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }) + .thenRun(() -> resultFuture.complete(inputs)); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + + // Process 2 elements to trigger a batch + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + testHarness.endInput(); + + // Verify async call duration was recorded + assertThat(operator.getAsyncCallDurationHistogram().getCount()).isEqualTo(1); + } + } + + /** + * Test that async call failure counter is incremented on exception. + * + *

Process elements with a failing function and verify failure counter. + */ + @Test + void testAsyncCallFailureMetric() throws Exception { + final int maxBatchSize = 2; + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + resultFuture.completeExceptionally(new ExpectedTestException()); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + + // Process 2 elements to trigger a batch (which will fail) + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + testHarness.endInput(); + + // Verify failure counter was incremented + assertThat(operator.getAsyncCallFailuresCounter().getCount()).isEqualTo(1); + } + } + + /** + * Test that in-flight count is correctly tracked during processing. + * + *

This test verifies the gauge correctly reflects concurrent operations. + */ + @Test + void testInflightBatchesTracking() throws Exception { + final int maxBatchSize = 2; + final CompletableFuture blockingFuture = new CompletableFuture<>(); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + // Wait for explicit completion signal + blockingFuture.thenRun(() -> resultFuture.complete(inputs)); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + + // Initially no in-flight batches + assertThat(operator.getInFlightCount()).isEqualTo(0); + + // Process 2 elements to trigger a batch + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + // Now there should be 1 in-flight batch + assertThat(operator.getInFlightCount()).isEqualTo(1); + + // Complete the blocking future to allow processing + blockingFuture.complete(null); + + testHarness.endInput(); + + // After completion, should be 0 + assertThat(operator.getInFlightCount()).isEqualTo(0); + } + } + + /** + * Test that batch latency histogram is recorded correctly. + * + *

Using timeout-based batching to measure latency. + */ + @Test + void testBatchLatencyMetric() throws Exception { + final int maxBatchSize = 10; + final long batchTimeoutMs = 100L; + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + testHarness.setProcessingTime(0L); + + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + + // Process 1 element + testHarness.processElement(new StreamRecord<>(1, 1L)); + + // Advance time to trigger timeout flush + testHarness.setProcessingTime(batchTimeoutMs + 1); + + testHarness.endInput(); + + // Verify batch latency was recorded + assertThat(operator.getBatchLatencyHistogram().getCount()).isEqualTo(1); + } + } + + /** + * Test metrics with multiple batches of different sizes. + * + *

Comprehensive test covering various batch sizes and timing scenarios. + */ + @Test + void testMetricsWithMultipleBatches() throws Exception { + final int maxBatchSize = 3; + final long batchTimeoutMs = 100L; + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + testHarness.setProcessingTime(0L); + + AsyncBatchWaitOperator operator = + (AsyncBatchWaitOperator) testHarness.getOperator(); + + // First batch: size-triggered (3 elements) + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + testHarness.processElement(new StreamRecord<>(3, 3L)); + + // Second batch: timeout-triggered (1 element) + testHarness.setProcessingTime(200L); + testHarness.processElement(new StreamRecord<>(4, 4L)); + testHarness.setProcessingTime(301L); + + // Third batch: size-triggered (3 elements) + testHarness.setProcessingTime(400L); + testHarness.processElement(new StreamRecord<>(5, 5L)); + testHarness.processElement(new StreamRecord<>(6, 6L)); + testHarness.processElement(new StreamRecord<>(7, 7L)); + + // Fourth batch: end-of-input (2 elements) + testHarness.processElement(new StreamRecord<>(8, 8L)); + testHarness.processElement(new StreamRecord<>(9, 9L)); + + testHarness.endInput(); + + // Verify metrics + assertThat(operator.getTotalBatchesProcessedCounter().getCount()).isEqualTo(4); + assertThat(operator.getTotalRecordsProcessedCounter().getCount()).isEqualTo(9); + assertThat(operator.getBatchSizeHistogram().getCount()).isEqualTo(4); + assertThat(operator.getBatchLatencyHistogram().getCount()).isEqualTo(4); + assertThat(operator.getAsyncCallDurationHistogram().getCount()).isEqualTo(4); + assertThat(operator.getAsyncCallFailuresCounter().getCount()).isEqualTo(0); + } + } + + private static OneInputStreamOperatorTestHarness createTestHarness( + AsyncBatchFunction function, int maxBatchSize) throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new AsyncBatchWaitOperatorFactory<>(function, maxBatchSize), + IntSerializer.INSTANCE); + } + + private static OneInputStreamOperatorTestHarness createTestHarnessWithTimeout( + AsyncBatchFunction function, int maxBatchSize, long batchTimeoutMs) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new AsyncBatchWaitOperatorFactory<>(function, maxBatchSize, batchTimeoutMs), + IntSerializer.INSTANCE); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperatorTest.java new file mode 100644 index 0000000000000..4f5b48bbc53ab --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/OrderedAsyncBatchWaitOperatorTest.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link OrderedAsyncBatchWaitOperator}. + * + *

These tests verify: + * + * <ul>
 + *   <li>Strict ordering guarantee - output order matches input order
 + *   <li>Batch + time trigger interaction with ordering
 + *   <li>Exception propagation
 + * </ul>
+ */ +@Timeout(value = 100, unit = TimeUnit.SECONDS) +class OrderedAsyncBatchWaitOperatorTest { + + /** + * Test strict ordering guarantee even when async results complete out of order. + * + *

Inputs: [1, 2, 3, 4, 5] + * + *

Async batches complete in reverse order (second batch completes before first) + * + *

Output MUST be: [1, 2, 3, 4, 5] (same as input order) + */ + @Test + void testStrictOrderingGuarantee() throws Exception { + final int maxBatchSize = 3; + final List> batchFutures = new CopyOnWriteArrayList<>(); + final AtomicInteger batchIndex = new AtomicInteger(0); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + int currentBatch = batchIndex.getAndIncrement(); + CompletableFuture future = new CompletableFuture<>(); + batchFutures.add(future); + + // Store input for later completion + List inputCopy = new ArrayList<>(inputs); + + // Complete asynchronously when future is completed externally + future.thenRun(() -> resultFuture.complete(inputCopy)); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 5 elements: batch 0 = [1,2,3], batch 1 = [4,5] + for (int i = 1; i <= 5; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + // Trigger end of input to flush remaining elements + // This creates batch 1 with [4, 5] + // At this point we have 2 batches pending + + // Wait for batches to be created + while (batchFutures.size() < 2) { + Thread.sleep(10); + } + + // Complete batches in REVERSE order (batch 1 first, then batch 0) + // This tests that output is still in original order + batchFutures.get(1).complete(null); // Complete batch [4, 5] first + Thread.sleep(50); // Give time for async processing + + batchFutures.get(0).complete(null); // Then complete batch [1, 2, 3] + + testHarness.endInput(); + + // Verify outputs are in strict input order: [1, 2, 3, 4, 5] + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + // MUST be in exact input order, not completion order + assertThat(outputs).containsExactly(1, 2, 3, 4, 5); + } + } + + /** Test ordering with synchronous completions - simple case to verify basic ordering. */ + @Test + void testOrderingWithSynchronousCompletion() throws Exception { + final int maxBatchSize = 2; + + // Function that immediately completes with input values + AsyncBatchFunction function = + (inputs, resultFuture) -> { + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 6 elements: 3 batches of 2 + for (int i = 1; i <= 6; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // Verify outputs are in strict input order + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactly(1, 2, 3, 4, 5, 6); + } + } + + /** + * Test batch + time trigger interaction with ordering preserved. + * + *

Small batch size with timeout, verify ordering across multiple batches. + */ + @Test + void testBatchAndTimeoutTriggerWithOrdering() throws Exception { + final int maxBatchSize = 3; + final long batchTimeoutMs = 100L; + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarnessWithTimeout(function, maxBatchSize, batchTimeoutMs)) { + + testHarness.open(); + testHarness.setProcessingTime(0L); + + // First batch: size-triggered (3 elements) + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + testHarness.processElement(new StreamRecord<>(3, 3L)); + + // Second batch: timeout-triggered (1 element) + testHarness.setProcessingTime(200L); + testHarness.processElement(new StreamRecord<>(4, 4L)); + testHarness.setProcessingTime(301L); // Trigger timeout + + // Third batch: size-triggered (3 elements) + testHarness.setProcessingTime(400L); + testHarness.processElement(new StreamRecord<>(5, 5L)); + testHarness.processElement(new StreamRecord<>(6, 6L)); + testHarness.processElement(new StreamRecord<>(7, 7L)); + + testHarness.endInput(); + + // Verify outputs are in strict input order + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactly(1, 2, 3, 4, 5, 6, 7); + } + } + + /** Test exception propagation - exception in batch invocation fails fast. */ + @Test + void testExceptionPropagation() throws Exception { + final int maxBatchSize = 2; + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + resultFuture.completeExceptionally(new ExpectedTestException()); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 2 elements to trigger a batch + testHarness.processElement(new StreamRecord<>(1, 1L)); + testHarness.processElement(new StreamRecord<>(2, 2L)); + + testHarness.endInput(); + + // Verify that the task environment received the exception + assertThat(testHarness.getEnvironment().getActualExternalFailureCause()) + .isPresent() + .get() + .satisfies( + t -> + assertThat(t.getCause()) + .isInstanceOf(ExpectedTestException.class)); + } + } + + /** Test ordering with delayed async completions simulating real async I/O. 
*/ + @Test + void testOrderingWithDelayedAsyncCompletion() throws Exception { + final int maxBatchSize = 2; + final List completionOrder = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + List inputCopy = new ArrayList<>(inputs); + int firstElement = inputCopy.get(0); + + // Simulate varying async delays - earlier batches take longer + CompletableFuture.runAsync( + () -> { + try { + // Batch starting with 1 delays more than batch starting with 3 + int delay = (5 - firstElement) * 20; + Thread.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + completionOrder.add(firstElement); + resultFuture.complete(inputCopy); + }); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 4 elements: batch 0 = [1,2], batch 1 = [3,4] + for (int i = 1; i <= 4; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // Verify outputs are in strict input order regardless of completion order + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + assertThat(outputs).containsExactly(1, 2, 3, 4); + } + } + + /** Test that empty batches are not triggered. */ + @Test + void testEmptyInput() throws Exception { + final int maxBatchSize = 3; + final AtomicInteger invocationCount = new AtomicInteger(0); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + invocationCount.incrementAndGet(); + resultFuture.complete(Collections.emptyList()); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + testHarness.endInput(); + + // No invocations should happen for empty input + assertThat(invocationCount.get()).isEqualTo(0); + assertThat(testHarness.getOutput()).isEmpty(); + } + } + + /** Test single element batch with ordering. */ + @Test + void testSingleElementBatchOrdering() throws Exception { + final int maxBatchSize = 1; + final List batchSizes = new CopyOnWriteArrayList<>(); + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + batchSizes.add(inputs.size()); + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(3, 1L)); + testHarness.processElement(new StreamRecord<>(1, 2L)); + testHarness.processElement(new StreamRecord<>(2, 3L)); + + testHarness.endInput(); + + // Each element is its own batch + assertThat(batchSizes).containsExactly(1, 1, 1); + + // Verify outputs maintain input order (not value order) + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + // Output order should be [3, 1, 2] - same as input order + assertThat(outputs).containsExactly(3, 1, 2); + } + } + + /** Test that batch function can return different number of outputs while maintaining order. 
*/ + @Test + void testVariableOutputSizeWithOrdering() throws Exception { + final int maxBatchSize = 2; + + // Function that returns sum of batch (one output per batch) + AsyncBatchFunction function = + (inputs, resultFuture) -> { + int sum = inputs.stream().mapToInt(Integer::intValue).sum(); + resultFuture.complete(Collections.singletonList(sum)); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process 4 elements: batch 0 = [1,2] -> 3, batch 1 = [3,4] -> 7 + for (int i = 1; i <= 4; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // First batch outputs first (3), then second batch (7) + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + // Order should be [3, 7] - first batch result, then second batch result + assertThat(outputs).containsExactly(3, 7); + } + } + + /** Test ordering with many batches to verify sequence number handling. */ + @Test + void testManyBatchesOrdering() throws Exception { + final int maxBatchSize = 2; + final int totalElements = 20; + + AsyncBatchFunction function = + (inputs, resultFuture) -> { + resultFuture.complete(inputs); + }; + + try (OneInputStreamOperatorTestHarness testHarness = + createTestHarness(function, maxBatchSize)) { + + testHarness.open(); + + // Process many elements + for (int i = 1; i <= totalElements; i++) { + testHarness.processElement(new StreamRecord<>(i, i)); + } + + testHarness.endInput(); + + // Verify all outputs are in strict order + List outputs = + testHarness.getOutput().stream() + .filter(e -> e instanceof StreamRecord) + .map(e -> ((StreamRecord) e).getValue()) + .collect(Collectors.toList()); + + List expected = new ArrayList<>(); + for (int i = 1; i <= totalElements; i++) { + expected.add(i); + } + + assertThat(outputs).containsExactlyElementsOf(expected); + } + } + + private static OneInputStreamOperatorTestHarness createTestHarness( + AsyncBatchFunction function, int maxBatchSize) throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new OrderedAsyncBatchWaitOperatorFactory<>(function, maxBatchSize), + IntSerializer.INSTANCE); + } + + private static OneInputStreamOperatorTestHarness createTestHarnessWithTimeout( + AsyncBatchFunction function, int maxBatchSize, long batchTimeoutMs) + throws Exception { + return new OneInputStreamOperatorTestHarness<>( + new OrderedAsyncBatchWaitOperatorFactory<>(function, maxBatchSize, batchTimeoutMs), + IntSerializer.INSTANCE); + } +}
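For reference, a minimal usage sketch of the batch async API that these operators back. It relies only on the pieces exercised above (a lambda-shaped AsyncBatchFunction completing its result future with a list of outputs, and the AsyncDataStream.unorderedWaitBatch(stream, function, maxBatchSize, batchTimeoutMs) entry point); the pipeline, element values, and the class name BatchAsyncExample are illustrative only, not part of the patch.

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncBatchFunction;

import java.util.stream.Collectors;

public class BatchAsyncExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source of ids to process; values are illustrative.
        DataStream<Integer> ids = env.fromElements(1, 2, 3, 4, 5);

        // Batch function in the same shape as the test lambdas above:
        // it receives one batch of inputs and completes the result future
        // with one output per input (here: doubled values).
        AsyncBatchFunction<Integer, Integer> doubler =
                (inputs, resultFuture) ->
                        resultFuture.complete(
                                inputs.stream().map(i -> i * 2).collect(Collectors.toList()));

        // Buffer up to 3 elements per async invocation, or flush after 50 ms,
        // whichever comes first (unordered output).
        SingleOutputStreamOperator<Integer> doubled =
                AsyncDataStream.unorderedWaitBatch(ids, doubler, 3, 50L);

        doubled.print();
        env.execute("batch-async-example");
    }
}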