diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 479fab642..35b0576fa 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -84,8 +84,14 @@ public class DorisBatchStreamLoad implements Serializable { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); - private static final long STREAM_LOAD_MAX_BYTES = 10 * 1024 * 1024 * 1024L; // 10 GB - private static final long STREAM_LOAD_MAX_ROWS = Integer.MAX_VALUE; + // Set the force flush threshold to 0.85 to prevent stream load tasks from failing when the data + // size exceeds 10GB. + private static final double STREAM_LOAD_FORCE_FLUSH_THRESHOLD_RATIO = 0.85; + private static final long STREAM_LOAD_MAX_BYTES = + (long) (10 * 1024 * 1024 * 1024L * STREAM_LOAD_FORCE_FLUSH_THRESHOLD_RATIO); + private static final long STREAM_LOAD_MAX_ROWS = + (long) (Integer.MAX_VALUE * STREAM_LOAD_FORCE_FLUSH_THRESHOLD_RATIO); + private final LabelGenerator labelGenerator; private final byte[] lineDelimiter; private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; @@ -224,7 +230,7 @@ public void writeRecord(String database, String table, byte[] record) { || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) { // The buffer capacity exceeds the stream load limit, flush boolean flush = bufferFullFlush(bufferKey); - LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush); + LOG.info("trigger flush by buffer exceeding the threshold limit, flush: {}", flush); } } @@ -244,11 +250,9 @@ private synchronized boolean doFlush( String bufferKey, boolean waitUtilDone, boolean bufferFull) { checkFlushException(); if (waitUtilDone || bufferFull) { - boolean flush = flush(bufferKey, waitUtilDone); - return flush; + return flush(bufferKey, waitUtilDone); } else if (flushQueue.size() < executionOptions.getFlushQueueSize()) { - boolean flush = flush(bufferKey, false); - return flush; + return flush(bufferKey, false); } return false; }