Skip to content

Commit 727bf5a

Browse files
ykmr1224 authored and dai-chen committed
Stabilize adaptive rate limit by considering current rate (opensearch-project#1027)
* Stabilize adaptive rate limit by considering current rate
* Fix bulk retry condition
* Fix FlintSparkConfSuite
* Fix FlintSparkConfSuite
* Reformat
* Use Queue interface instead of List
* Fix doc for BULK_INITIAL_BACKOFF
* Add retryable http status code
* Fix test
* Fix RequestRateMeter.addDataPoint

Signed-off-by: Tomoyuki Morita <[email protected]>
1 parent a97442a commit 727bf5a

File tree

9 files changed

+204
-20
lines changed

9 files changed

+204
-20
lines changed

docs/index.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,8 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema i
543543
- `spark.datasource.flint.read.scroll_size`: default value is 100.
544544
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
545545
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
546+
- `spark.datasource.flint.retry.bulk.max_retries`: max retries on failed bulk request. default value is 10. Use 0 to disable retry.
547+
- `spark.datasource.flint.retry.bulk.initial_backoff`: initial backoff in seconds for bulk request retry, default is 4.
546548
- `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,500,502" (429 Too Many Requests, 500 Internal Server Error and 502 Bad Gateway).
547549
- `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown.
548550
- `spark.datasource.flint.read.support_shard`: default is true. set to false if index does not support shard (AWS OpenSearch Serverless collection). Do not use in production, this setting will be removed in later version.

flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
import dev.failsafe.event.ExecutionAttemptedEvent;
1616
import dev.failsafe.function.CheckedPredicate;
1717
import java.time.Duration;
18+
import java.util.Arrays;
1819
import java.util.Map;
1920
import java.util.Optional;
21+
import java.util.Set;
2022
import java.util.logging.Logger;
23+
import java.util.stream.Collectors;
2124
import org.opensearch.action.bulk.BulkResponse;
2225
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
2326
import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate;
@@ -41,8 +44,12 @@ public class FlintRetryOptions implements Serializable {
4144
*/
4245
public static final int DEFAULT_MAX_RETRIES = 3;
4346
public static final String MAX_RETRIES = "retry.max_retries";
47+
public static final int DEFAULT_BULK_MAX_RETRIES = 10;
48+
public static final String BULK_MAX_RETRIES = "retry.bulk.max_retries";
49+
public static final int DEFAULT_BULK_INITIAL_BACKOFF = 4;
50+
public static final String BULK_INITIAL_BACKOFF = "retry.bulk.initial_backoff";
4451

45-
public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,502";
52+
public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,500,502";
4653
public static final String RETRYABLE_HTTP_STATUS_CODES = "retry.http_status_codes";
4754

4855
/**
@@ -90,9 +97,9 @@ public <T> RetryPolicy<T> getRetryPolicy() {
9097
public RetryPolicy<BulkResponse> getBulkRetryPolicy(CheckedPredicate<BulkResponse> resultPredicate) {
9198
return RetryPolicy.<BulkResponse>builder()
9299
// Using higher initial backoff to mitigate throttling quickly
93-
.withBackoff(4, 30, SECONDS)
100+
.withBackoff(getBulkInitialBackoff(), 30, SECONDS)
94101
.withJitter(Duration.ofMillis(100))
95-
.withMaxRetries(getMaxRetries())
102+
.withMaxRetries(getBulkMaxRetries())
96103
// Do not retry on exception (will be handled by the other retry policy
97104
.handleIf((ex) -> false)
98105
.handleResultIf(resultPredicate)
@@ -122,10 +129,27 @@ public int getMaxRetries() {
122129
}
123130

124131
/**
125-
* @return retryable HTTP status code list
132+
* @return bulk maximum retry option value
133+
*/
134+
public int getBulkMaxRetries() {
135+
return Integer.parseInt(
136+
options.getOrDefault(BULK_MAX_RETRIES, String.valueOf(DEFAULT_BULK_MAX_RETRIES)));
137+
}
138+
139+
/**
140+
* @return maximum retry option value
126141
*/
127-
public String getRetryableHttpStatusCodes() {
128-
return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES);
142+
public int getBulkInitialBackoff() {
143+
return Integer.parseInt(
144+
options.getOrDefault(BULK_INITIAL_BACKOFF, String.valueOf(DEFAULT_BULK_INITIAL_BACKOFF)));
145+
}
146+
147+
public Set<Integer> getRetryableHttpStatusCodes() {
148+
String statusCodes = options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES);
149+
return Arrays.stream(statusCodes.split(","))
150+
.map(String::trim)
151+
.map(Integer::valueOf)
152+
.collect(Collectors.toSet());
129153
}
130154

131155
/**

flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpStatusCodeResultPredicate.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,8 @@ public class HttpStatusCodeResultPredicate<T> implements CheckedPredicate<T> {
2626
*/
2727
private final Set<Integer> retryableStatusCodes;
2828

29-
public HttpStatusCodeResultPredicate(String httpStatusCodes) {
30-
this.retryableStatusCodes =
31-
Arrays.stream(httpStatusCodes.split(","))
32-
.map(String::trim)
33-
.map(Integer::valueOf)
34-
.collect(Collectors.toSet());
29+
public HttpStatusCodeResultPredicate(Set<Integer> httpStatusCodes) {
30+
this.retryableStatusCodes = httpStatusCodes;
3531
}
3632

3733
@Override

flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterImpl.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ public class BulkRequestRateLimiterImpl implements BulkRequestRateLimiter {
1919
private final long maxRate;
2020
private final long increaseStep;
2121
private final double decreaseRatio;
22+
private final RequestRateMeter requestRateMeter;
2223

2324
public BulkRequestRateLimiterImpl(FlintOptions flintOptions) {
2425
minRate = flintOptions.getBulkRequestMinRateLimitPerNode();
2526
maxRate = flintOptions.getBulkRequestMaxRateLimitPerNode();
2627
increaseStep = flintOptions.getBulkRequestRateLimitPerNodeIncreaseStep();
2728
decreaseRatio = flintOptions.getBulkRequestRateLimitPerNodeDecreaseRatio();
29+
requestRateMeter = new RequestRateMeter();
2830

2931
LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec");
3032
this.rateLimiter = RateLimiter.create(minRate);
@@ -42,14 +44,26 @@ public void acquirePermit() {
4244
public void acquirePermit(int permits) {
4345
this.rateLimiter.acquire(permits);
4446
LOG.info("Acquired " + permits + " permits");
47+
requestRateMeter.addDataPoint(System.currentTimeMillis(), permits);
4548
}
4649

4750
/**
4851
* Increase rate limit additively.
4952
*/
5053
@Override
5154
public void increaseRate() {
52-
setRate(getRate() + increaseStep);
55+
if (isEstimatedCurrentRateCloseToLimit()) {
56+
setRate(getRate() + increaseStep);
57+
} else {
58+
LOG.info("Rate increase was blocked.");
59+
}
60+
LOG.info("Current rate limit for bulk request is " + getRate() + " documents/sec");
61+
}
62+
63+
private boolean isEstimatedCurrentRateCloseToLimit() {
64+
long currentEstimatedRate = requestRateMeter.getCurrentEstimatedRate();
65+
LOG.info("Current estimated rate is " + currentEstimatedRate + " documents/sec");
66+
return getRate() * 0.8 < currentEstimatedRate;
5367
}
5468

5569
/**

flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import dev.failsafe.function.CheckedPredicate;
1212
import java.util.Arrays;
1313
import java.util.List;
14+
import java.util.Set;
1415
import java.util.concurrent.atomic.AtomicInteger;
1516
import java.util.concurrent.atomic.AtomicReference;
1617
import java.util.logging.Logger;
@@ -34,10 +35,12 @@ public class OpenSearchBulkWrapper {
3435

3536
private final RetryPolicy<BulkResponse> retryPolicy;
3637
private final BulkRequestRateLimiter rateLimiter;
38+
private final Set<Integer> retryableStatusCodes;
3739

3840
public OpenSearchBulkWrapper(FlintRetryOptions retryOptions, BulkRequestRateLimiter rateLimiter) {
3941
this.retryPolicy = retryOptions.getBulkRetryPolicy(bulkItemRetryableResultPredicate);
4042
this.rateLimiter = rateLimiter;
43+
this.retryableStatusCodes = retryOptions.getRetryableHttpStatusCodes();
4144
}
4245

4346
/**
@@ -50,7 +53,6 @@ public OpenSearchBulkWrapper(FlintRetryOptions retryOptions, BulkRequestRateLimi
5053
* @return Last result
5154
*/
5255
public BulkResponse bulk(RestHighLevelClient client, BulkRequest bulkRequest, RequestOptions options) {
53-
rateLimiter.acquirePermit(bulkRequest.requests().size());
5456
return bulkWithPartialRetry(client, bulkRequest, options);
5557
}
5658

@@ -69,11 +71,13 @@ private BulkResponse bulkWithPartialRetry(RestHighLevelClient client, BulkReques
6971
})
7072
.get(() -> {
7173
requestCount.incrementAndGet();
74+
rateLimiter.acquirePermit(nextRequest.get().requests().size());
7275
BulkResponse response = client.bulk(nextRequest.get(), options);
7376

7477
if (!bulkItemRetryableResultPredicate.test(response)) {
7578
rateLimiter.increaseRate();
7679
} else {
80+
LOG.info("Bulk request failed. attempt = " + (requestCount.get() - 1));
7781
rateLimiter.decreaseRate();
7882
if (retryPolicy.getConfig().allowsRetries()) {
7983
nextRequest.set(getRetryableRequest(nextRequest.get(), response));
@@ -118,10 +122,10 @@ private static void verifyIdMatch(DocWriteRequest<?> request, BulkItemResponse r
118122
/**
119123
* A predicate to decide if a BulkResponse is retryable or not.
120124
*/
121-
private static final CheckedPredicate<BulkResponse> bulkItemRetryableResultPredicate = bulkResponse ->
125+
private final CheckedPredicate<BulkResponse> bulkItemRetryableResultPredicate = bulkResponse ->
122126
bulkResponse.hasFailures() && isRetryable(bulkResponse);
123127

124-
private static boolean isRetryable(BulkResponse bulkResponse) {
128+
private boolean isRetryable(BulkResponse bulkResponse) {
125129
if (Arrays.stream(bulkResponse.getItems())
126130
.anyMatch(itemResp -> isItemRetryable(itemResp))) {
127131
LOG.info("Found retryable failure in the bulk response");
@@ -130,12 +134,23 @@ private static boolean isRetryable(BulkResponse bulkResponse) {
130134
return false;
131135
}
132136

133-
private static boolean isItemRetryable(BulkItemResponse itemResponse) {
134-
return itemResponse.isFailed() && !isCreateConflict(itemResponse);
137+
private boolean isItemRetryable(BulkItemResponse itemResponse) {
138+
return itemResponse.isFailed() && !isCreateConflict(itemResponse)
139+
&& isFailureStatusRetryable(itemResponse);
135140
}
136141

137142
private static boolean isCreateConflict(BulkItemResponse itemResp) {
138143
return itemResp.getOpType() == DocWriteRequest.OpType.CREATE &&
139144
itemResp.getFailure().getStatus() == RestStatus.CONFLICT;
140145
}
146+
147+
private boolean isFailureStatusRetryable(BulkItemResponse itemResp) {
148+
if (retryableStatusCodes.contains(itemResp.getFailure().getStatus().getStatus())) {
149+
return true;
150+
} else {
151+
LOG.info("Found non-retryable failure in bulk response: " + itemResp.getFailure().getStatus()
152+
+ ", " + itemResp.getFailure().toString());
153+
return false;
154+
}
155+
}
141156
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.flint.core.storage;
7+
8+
import java.util.LinkedList;
9+
import java.util.List;
10+
import java.util.Queue;
11+
12+
/**
13+
* Track the current request rate based on the past requests within ESTIMATE_RANGE_DURATION_MSEC
14+
* milliseconds period.
15+
*/
16+
public class RequestRateMeter {
17+
private static final long ESTIMATE_RANGE_DURATION_MSEC = 3000;
18+
19+
private static class DataPoint {
20+
long timestamp;
21+
long requestCount;
22+
public DataPoint(long timestamp, long requestCount) {
23+
this.timestamp = timestamp;
24+
this.requestCount = requestCount;
25+
}
26+
}
27+
28+
private Queue<DataPoint> dataPoints = new LinkedList<>();
29+
private long currentSum = 0;
30+
31+
public synchronized void addDataPoint(long timestamp, long requestCount) {
32+
dataPoints.add(new DataPoint(timestamp, requestCount));
33+
currentSum += requestCount;
34+
removeOldDataPoints();
35+
}
36+
37+
public synchronized long getCurrentEstimatedRate() {
38+
removeOldDataPoints();
39+
return currentSum * 1000 / ESTIMATE_RANGE_DURATION_MSEC;
40+
}
41+
42+
private synchronized void removeOldDataPoints() {
43+
long curr = System.currentTimeMillis();
44+
while (!dataPoints.isEmpty() && dataPoints.peek().timestamp < curr - ESTIMATE_RANGE_DURATION_MSEC) {
45+
currentSum -= dataPoints.remove().requestCount;
46+
}
47+
}
48+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.flint.core.storage;
7+
8+
import org.junit.jupiter.api.BeforeEach;
9+
import org.junit.jupiter.api.Test;
10+
import static org.junit.jupiter.api.Assertions.*;
11+
12+
class RequestRateMeterTest {
13+
14+
private RequestRateMeter requestRateMeter;
15+
16+
@BeforeEach
17+
void setUp() {
18+
requestRateMeter = new RequestRateMeter();
19+
}
20+
21+
@Test
22+
void testAddDataPoint() {
23+
long timestamp = System.currentTimeMillis();
24+
requestRateMeter.addDataPoint(timestamp, 30);
25+
assertEquals(10, requestRateMeter.getCurrentEstimatedRate());
26+
}
27+
28+
@Test
29+
void testAddDataPointRemoveOldDataPoint() {
30+
long timestamp = System.currentTimeMillis();
31+
requestRateMeter.addDataPoint(timestamp - 4000, 30);
32+
requestRateMeter.addDataPoint(timestamp, 90);
33+
assertEquals(90 / 3, requestRateMeter.getCurrentEstimatedRate());
34+
}
35+
36+
@Test
37+
void testRemoveOldDataPoints() {
38+
long currentTime = System.currentTimeMillis();
39+
requestRateMeter.addDataPoint(currentTime - 4000, 30);
40+
requestRateMeter.addDataPoint(currentTime - 2000, 60);
41+
requestRateMeter.addDataPoint(currentTime, 90);
42+
43+
assertEquals((60 + 90)/3, requestRateMeter.getCurrentEstimatedRate());
44+
}
45+
46+
@Test
47+
void testGetCurrentEstimatedRate() {
48+
long currentTime = System.currentTimeMillis();
49+
requestRateMeter.addDataPoint(currentTime - 2500, 30);
50+
requestRateMeter.addDataPoint(currentTime - 1500, 60);
51+
requestRateMeter.addDataPoint(currentTime - 500, 90);
52+
53+
assertEquals((30 + 60 + 90)/3, requestRateMeter.getCurrentEstimatedRate());
54+
}
55+
56+
@Test
57+
void testEmptyRateMeter() {
58+
assertEquals(0, requestRateMeter.getCurrentEstimatedRate());
59+
}
60+
61+
@Test
62+
void testSingleDataPoint() {
63+
requestRateMeter.addDataPoint(System.currentTimeMillis(), 30);
64+
assertEquals(30 / 3, requestRateMeter.getCurrentEstimatedRate());
65+
}
66+
}

flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,18 @@ object FlintSparkConf {
136136
.doc("max retries on failed HTTP request, 0 means retry is disabled, default is 3")
137137
.createWithDefault(String.valueOf(FlintRetryOptions.DEFAULT_MAX_RETRIES))
138138

139+
val BULK_MAX_RETRIES =
140+
FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.BULK_MAX_RETRIES}")
141+
.datasourceOption()
142+
.doc("max retries on failed HTTP request, 0 means retry is disabled, default is 10")
143+
.createWithDefault(String.valueOf(FlintRetryOptions.DEFAULT_BULK_MAX_RETRIES))
144+
145+
val BULK_INITIAL_BACKOFF =
146+
FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.BULK_INITIAL_BACKOFF}")
147+
.datasourceOption()
148+
.doc("initial backoff in seconds for bulk request retry, default is 4s")
149+
.createWithDefault(String.valueOf(FlintRetryOptions.DEFAULT_BULK_INITIAL_BACKOFF))
150+
139151
val BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED =
140152
FlintConfig(
141153
s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED}")
@@ -368,6 +380,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
368380
SCHEME,
369381
AUTH,
370382
MAX_RETRIES,
383+
BULK_MAX_RETRIES,
384+
BULK_INITIAL_BACKOFF,
371385
RETRYABLE_HTTP_STATUS_CODES,
372386
BULK_REQUEST_RATE_LIMIT_PER_NODE_ENABLED,
373387
BULK_REQUEST_MIN_RATE_LIMIT_PER_NODE,

flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import scala.collection.JavaConverters._
1111

1212
import org.opensearch.flint.core.FlintOptions
1313
import org.opensearch.flint.core.http.FlintRetryOptions._
14+
import org.scalatest.matchers.must.Matchers.contain
1415
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
1516

1617
import org.apache.spark.FlintSuite
@@ -46,7 +47,7 @@ class FlintSparkConfSuite extends FlintSuite {
4647
test("test retry options default values") {
4748
val retryOptions = FlintSparkConf().flintOptions().getRetryOptions
4849
retryOptions.getMaxRetries shouldBe DEFAULT_MAX_RETRIES
49-
retryOptions.getRetryableHttpStatusCodes shouldBe DEFAULT_RETRYABLE_HTTP_STATUS_CODES
50+
retryOptions.getRetryableHttpStatusCodes should contain theSameElementsAs Set(429, 500, 502)
5051
retryOptions.getRetryableExceptionClassNames shouldBe Optional.empty
5152
}
5253

@@ -60,7 +61,11 @@ class FlintSparkConfSuite extends FlintSuite {
6061
.getRetryOptions
6162

6263
retryOptions.getMaxRetries shouldBe 5
63-
retryOptions.getRetryableHttpStatusCodes shouldBe "429,502,503,504"
64+
retryOptions.getRetryableHttpStatusCodes should contain theSameElementsAs Set(
65+
429,
66+
502,
67+
503,
68+
504)
6469
retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException"
6570
}
6671

0 commit comments

Comments
 (0)