Skip to content

Commit 3a7314e

Browse files
authored
Merge pull request #68 from TheovanKraay/sync-sample-for-bulk-support
add sync sample for bulk support
2 parents 34f3b5b + a7cbba5 commit 3a7314e

File tree

2 files changed

+540
-0
lines changed

2 files changed

+540
-0
lines changed
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/*
5+
The BulkWriter class is an attempt to provide guidance for creating
6+
a higher level abstraction over the existing low level Java Bulk API
7+
*/
8+
package com.azure.cosmos.examples.bulk.sync;
9+
10+
import com.azure.cosmos.CosmosContainer;
11+
import com.azure.cosmos.CosmosException;
12+
import com.azure.cosmos.implementation.HttpConstants;
13+
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
14+
import com.azure.cosmos.models.CosmosBulkItemResponse;
15+
import com.azure.cosmos.models.CosmosBulkOperationResponse;
16+
import com.azure.cosmos.models.CosmosItemOperation;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
import reactor.core.publisher.Sinks;
20+
import java.util.concurrent.Semaphore;
21+
22+
public class BulkWriter {
23+
private static final Logger logger = LoggerFactory.getLogger(BulkWriter.class);
24+
25+
private final Sinks.Many<CosmosItemOperation> bulkInputEmitter = Sinks.many().unicast().onBackpressureBuffer();
26+
private final int cpuCount = Runtime.getRuntime().availableProcessors();
27+
28+
//Max items to be buffered to avoid out of memory error
29+
private final Semaphore semaphore = new Semaphore(1024 * 167 / cpuCount);
30+
31+
private final Sinks.EmitFailureHandler emitFailureHandler =
32+
(signalType, emitResult) -> {
33+
if (emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED)) {
34+
logger.debug("emitFailureHandler - Signal: [{}], Result: [{}]", signalType, emitResult);
35+
return true;
36+
} else {
37+
logger.error("emitFailureHandler - Signal: [{}], Result: [{}]", signalType, emitResult);
38+
return false;
39+
}
40+
};
41+
42+
private final CosmosContainer cosmosContainer;
43+
44+
public BulkWriter(CosmosContainer cosmosContainer) {
45+
this.cosmosContainer = cosmosContainer;
46+
}
47+
48+
public void scheduleWrites(CosmosItemOperation cosmosItemOperation) {
49+
while(!semaphore.tryAcquire()) {
50+
logger.info("Unable to acquire permit");
51+
}
52+
logger.info("Acquired permit");
53+
scheduleInternalWrites(cosmosItemOperation);
54+
}
55+
56+
private void scheduleInternalWrites(CosmosItemOperation cosmosItemOperation) {
57+
bulkInputEmitter.emitNext(cosmosItemOperation, emitFailureHandler);
58+
}
59+
60+
public Iterable<CosmosBulkOperationResponse<Object>> execute() {
61+
return this.execute(null);
62+
}
63+
64+
public Iterable<CosmosBulkOperationResponse<Object>> execute(CosmosBulkExecutionOptions bulkOptions) {
65+
if (bulkOptions == null) {
66+
bulkOptions = new CosmosBulkExecutionOptions();
67+
}
68+
bulkInputEmitter.tryEmitComplete();
69+
Iterable<CosmosBulkOperationResponse<Object>> bulkOperationResponse = cosmosContainer
70+
.executeBulkOperations(
71+
bulkInputEmitter.asFlux().toIterable(),
72+
bulkOptions);
73+
for (CosmosBulkOperationResponse<Object> response : bulkOperationResponse) {
74+
processBulkOperationResponse(
75+
response.getResponse(),
76+
response.getOperation(),
77+
response.getException());
78+
}
79+
semaphore.release();
80+
return bulkOperationResponse;
81+
}
82+
83+
84+
85+
86+
private void processBulkOperationResponse(
87+
CosmosBulkItemResponse itemResponse,
88+
CosmosItemOperation itemOperation,
89+
Exception exception) {
90+
91+
if (exception != null) {
92+
handleException(itemOperation, exception);
93+
} else {
94+
processResponseCode(itemResponse, itemOperation);
95+
}
96+
}
97+
98+
private void processResponseCode(
99+
CosmosBulkItemResponse itemResponse,
100+
CosmosItemOperation itemOperation) {
101+
102+
if (itemResponse.isSuccessStatusCode()) {
103+
logger.info(
104+
"The operation for Item ID: [{}] Item PartitionKey Value: [{}] completed successfully " +
105+
"with a response status code: [{}]",
106+
itemOperation.getId(),
107+
itemOperation.getPartitionKeyValue(),
108+
itemResponse.getStatusCode());
109+
} else if (shouldRetry(itemResponse.getStatusCode())) {
110+
logger.info(
111+
"The operation for Item ID: [{}] Item PartitionKey Value: [{}] will be retried",
112+
itemOperation.getId(),
113+
itemOperation.getPartitionKeyValue());
114+
//re-scheduling
115+
scheduleWrites(itemOperation);
116+
} else {
117+
logger.info(
118+
"The operation for Item ID: [{}] Item PartitionKey Value: [{}] did not complete successfully " +
119+
"with a response status code: [{}]",
120+
itemOperation.getId(),
121+
itemOperation.getPartitionKeyValue(),
122+
itemResponse.getStatusCode());
123+
}
124+
}
125+
126+
private void handleException(CosmosItemOperation itemOperation, Exception exception) {
127+
if (!(exception instanceof CosmosException)) {
128+
logger.info(
129+
"The operation for Item ID: [{}] Item PartitionKey Value: [{}] encountered an unexpected failure",
130+
itemOperation.getId(),
131+
itemOperation.getPartitionKeyValue());
132+
} else {
133+
if (shouldRetry(((CosmosException) exception).getStatusCode())) {
134+
logger.info(
135+
"The operation for Item ID: [{}] Item PartitionKey Value: [{}] will be retried",
136+
itemOperation.getId(),
137+
itemOperation.getPartitionKeyValue());
138+
139+
//re-scheduling
140+
scheduleWrites(itemOperation);
141+
}
142+
}
143+
}
144+
145+
private boolean shouldRetry(int statusCode) {
146+
return statusCode == HttpConstants.StatusCodes.REQUEST_TIMEOUT ||
147+
statusCode == HttpConstants.StatusCodes.TOO_MANY_REQUESTS;
148+
}
149+
}

0 commit comments

Comments
 (0)