Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e1ba550

Browse files
himshikhahainenber
authored andcommittedOct 1, 2024
Optimize checksum creation for remote cluster state (opensearch-project#16046)
* Support parallelisation in remote publication checksum computation Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
1 parent 3039c2c commit e1ba550

File tree

6 files changed

+164
-82
lines changed

6 files changed

+164
-82
lines changed
 

‎server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java

+99-51
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.cluster.ClusterState;
1414
import org.opensearch.cluster.metadata.DiffableStringMap;
15+
import org.opensearch.common.CheckedFunction;
1516
import org.opensearch.common.io.stream.BytesStreamOutput;
1617
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.common.unit.TimeValue;
1719
import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput;
1820
import org.opensearch.core.common.io.stream.StreamInput;
1921
import org.opensearch.core.common.io.stream.StreamOutput;
@@ -22,11 +24,15 @@
2224
import org.opensearch.core.xcontent.XContentBuilder;
2325
import org.opensearch.core.xcontent.XContentParseException;
2426
import org.opensearch.core.xcontent.XContentParser;
27+
import org.opensearch.threadpool.ThreadPool;
2528

2629
import java.io.IOException;
2730
import java.util.ArrayList;
2831
import java.util.List;
2932
import java.util.Objects;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.function.Consumer;
3036

3137
import com.jcraft.jzlib.JZlib;
3238

@@ -37,6 +43,7 @@
3743
*/
3844
public class ClusterStateChecksum implements ToXContentFragment, Writeable {
3945

46+
public static final int COMPONENT_SIZE = 11;
4047
static final String ROUTING_TABLE_CS = "routing_table";
4148
static final String NODES_CS = "discovery_nodes";
4249
static final String BLOCKS_CS = "blocks";
@@ -65,62 +72,103 @@ public class ClusterStateChecksum implements ToXContentFragment, Writeable {
6572
long indicesChecksum;
6673
long clusterStateChecksum;
6774

68-
public ClusterStateChecksum(ClusterState clusterState) {
69-
try (
70-
BytesStreamOutput out = new BytesStreamOutput();
71-
BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out)
72-
) {
73-
clusterState.routingTable().writeVerifiableTo(checksumOut);
74-
routingTableChecksum = checksumOut.getChecksum();
75-
76-
checksumOut.reset();
77-
clusterState.nodes().writeVerifiableTo(checksumOut);
78-
nodesChecksum = checksumOut.getChecksum();
79-
80-
checksumOut.reset();
81-
clusterState.coordinationMetadata().writeVerifiableTo(checksumOut);
82-
coordinationMetadataChecksum = checksumOut.getChecksum();
83-
84-
// Settings create sortedMap by default, so no explicit sorting required here.
85-
checksumOut.reset();
86-
Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), checksumOut);
87-
settingMetadataChecksum = checksumOut.getChecksum();
88-
89-
checksumOut.reset();
90-
Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), checksumOut);
91-
transientSettingsMetadataChecksum = checksumOut.getChecksum();
92-
93-
checksumOut.reset();
94-
clusterState.metadata().templatesMetadata().writeVerifiableTo(checksumOut);
95-
templatesMetadataChecksum = checksumOut.getChecksum();
96-
97-
checksumOut.reset();
98-
checksumOut.writeStringCollection(clusterState.metadata().customs().keySet());
99-
customMetadataMapChecksum = checksumOut.getChecksum();
100-
101-
checksumOut.reset();
102-
((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(checksumOut);
103-
hashesOfConsistentSettingsChecksum = checksumOut.getChecksum();
104-
105-
checksumOut.reset();
106-
checksumOut.writeMapValues(
75+
public ClusterStateChecksum(ClusterState clusterState, ThreadPool threadpool) {
76+
long start = threadpool.relativeTimeInNanos();
77+
ExecutorService executorService = threadpool.executor(ThreadPool.Names.REMOTE_STATE_CHECKSUM);
78+
CountDownLatch latch = new CountDownLatch(COMPONENT_SIZE);
79+
80+
executeChecksumTask((stream) -> {
81+
clusterState.routingTable().writeVerifiableTo(stream);
82+
return null;
83+
}, checksum -> routingTableChecksum = checksum, executorService, latch);
84+
85+
executeChecksumTask((stream) -> {
86+
clusterState.nodes().writeVerifiableTo(stream);
87+
return null;
88+
}, checksum -> nodesChecksum = checksum, executorService, latch);
89+
90+
executeChecksumTask((stream) -> {
91+
clusterState.coordinationMetadata().writeVerifiableTo(stream);
92+
return null;
93+
}, checksum -> coordinationMetadataChecksum = checksum, executorService, latch);
94+
95+
executeChecksumTask((stream) -> {
96+
Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), stream);
97+
return null;
98+
}, checksum -> settingMetadataChecksum = checksum, executorService, latch);
99+
100+
executeChecksumTask((stream) -> {
101+
Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), stream);
102+
return null;
103+
}, checksum -> transientSettingsMetadataChecksum = checksum, executorService, latch);
104+
105+
executeChecksumTask((stream) -> {
106+
clusterState.metadata().templatesMetadata().writeVerifiableTo(stream);
107+
return null;
108+
}, checksum -> templatesMetadataChecksum = checksum, executorService, latch);
109+
110+
executeChecksumTask((stream) -> {
111+
stream.writeStringCollection(clusterState.metadata().customs().keySet());
112+
return null;
113+
}, checksum -> customMetadataMapChecksum = checksum, executorService, latch);
114+
115+
executeChecksumTask((stream) -> {
116+
((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(stream);
117+
return null;
118+
}, checksum -> hashesOfConsistentSettingsChecksum = checksum, executorService, latch);
119+
120+
executeChecksumTask((stream) -> {
121+
stream.writeMapValues(
107122
clusterState.metadata().indices(),
108-
(stream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) stream)
123+
(checksumStream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) checksumStream)
109124
);
110-
indicesChecksum = checksumOut.getChecksum();
111-
112-
checksumOut.reset();
113-
clusterState.blocks().writeVerifiableTo(checksumOut);
114-
blocksChecksum = checksumOut.getChecksum();
115-
116-
checksumOut.reset();
117-
checksumOut.writeStringCollection(clusterState.customs().keySet());
118-
clusterStateCustomsChecksum = checksumOut.getChecksum();
119-
} catch (IOException e) {
120-
logger.error("Failed to create checksum for cluster state.", e);
125+
return null;
126+
}, checksum -> indicesChecksum = checksum, executorService, latch);
127+
128+
executeChecksumTask((stream) -> {
129+
clusterState.blocks().writeVerifiableTo(stream);
130+
return null;
131+
}, checksum -> blocksChecksum = checksum, executorService, latch);
132+
133+
executeChecksumTask((stream) -> {
134+
stream.writeStringCollection(clusterState.customs().keySet());
135+
return null;
136+
}, checksum -> clusterStateCustomsChecksum = checksum, executorService, latch);
137+
138+
try {
139+
latch.await();
140+
} catch (InterruptedException e) {
121141
throw new RemoteStateTransferException("Failed to create checksum for cluster state.", e);
122142
}
123143
createClusterStateChecksum();
144+
logger.debug("Checksum execution time {}", TimeValue.nsecToMSec(threadpool.relativeTimeInNanos() - start));
145+
}
146+
147+
private void executeChecksumTask(
148+
CheckedFunction<BufferedChecksumStreamOutput, Void, IOException> checksumTask,
149+
Consumer<Long> checksumConsumer,
150+
ExecutorService executorService,
151+
CountDownLatch latch
152+
) {
153+
executorService.execute(() -> {
154+
try {
155+
long checksum = createChecksum(checksumTask);
156+
checksumConsumer.accept(checksum);
157+
latch.countDown();
158+
} catch (IOException e) {
159+
throw new RemoteStateTransferException("Failed to execute checksum task", e);
160+
}
161+
});
162+
}
163+
164+
private long createChecksum(CheckedFunction<BufferedChecksumStreamOutput, Void, IOException> task) throws IOException {
165+
try (
166+
BytesStreamOutput out = new BytesStreamOutput();
167+
BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out)
168+
) {
169+
task.apply(checksumOut);
170+
return checksumOut.getChecksum();
171+
}
124172
}
125173

126174
private void createClusterStateChecksum() {

‎server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,9 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
332332
uploadedMetadataResults,
333333
previousClusterUUID,
334334
clusterStateDiffManifest,
335-
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null,
335+
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
336+
? new ClusterStateChecksum(clusterState, threadpool)
337+
: null,
336338
false,
337339
codecVersion
338340
);
@@ -539,7 +541,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
539541
uploadedMetadataResults,
540542
previousManifest.getPreviousClusterUUID(),
541543
clusterStateDiffManifest,
542-
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null,
544+
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
545+
? new ClusterStateChecksum(clusterState, threadpool)
546+
: null,
543547
false,
544548
previousManifest.getCodecVersion()
545549
);
@@ -1010,7 +1014,9 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(
10101014
uploadedMetadataResults,
10111015
previousManifest.getPreviousClusterUUID(),
10121016
previousManifest.getDiffManifest(),
1013-
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null,
1017+
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
1018+
? new ClusterStateChecksum(clusterState, threadpool)
1019+
: null,
10141020
true,
10151021
previousManifest.getCodecVersion()
10161022
);
@@ -1631,7 +1637,7 @@ void validateClusterStateFromChecksum(
16311637
String localNodeId,
16321638
boolean isFullStateDownload
16331639
) {
1634-
ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState);
1640+
ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState, threadpool);
16351641
List<String> failedValidation = newClusterStateChecksum.getMismatchEntities(manifest.getClusterStateChecksum());
16361642
if (failedValidation.isEmpty()) {
16371643
return;

‎server/src/main/java/org/opensearch/threadpool/ThreadPool.java

+7
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.opensearch.core.service.ReportingService;
5454
import org.opensearch.core.xcontent.ToXContentFragment;
5555
import org.opensearch.core.xcontent.XContentBuilder;
56+
import org.opensearch.gateway.remote.ClusterStateChecksum;
5657
import org.opensearch.node.Node;
5758

5859
import java.io.IOException;
@@ -118,6 +119,7 @@ public static class Names {
118119
public static final String REMOTE_RECOVERY = "remote_recovery";
119120
public static final String REMOTE_STATE_READ = "remote_state_read";
120121
public static final String INDEX_SEARCHER = "index_searcher";
122+
public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum";
121123
}
122124

123125
/**
@@ -191,6 +193,7 @@ public static ThreadPoolType fromType(String type) {
191193
map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING);
192194
map.put(Names.REMOTE_STATE_READ, ThreadPoolType.SCALING);
193195
map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE);
196+
map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED);
194197
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
195198
}
196199

@@ -307,6 +310,10 @@ public ThreadPool(
307310
runnableTaskListener
308311
)
309312
);
313+
builders.put(
314+
Names.REMOTE_STATE_CHECKSUM,
315+
new FixedExecutorBuilder(settings, Names.REMOTE_STATE_CHECKSUM, ClusterStateChecksum.COMPONENT_SIZE, 1000)
316+
);
310317

311318
for (final ExecutorBuilder<?> builder : customBuilders) {
312319
if (builders.containsKey(builder.name())) {

‎server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
3535
import org.opensearch.test.EqualsHashCodeTestUtils;
3636
import org.opensearch.test.OpenSearchTestCase;
37+
import org.opensearch.threadpool.TestThreadPool;
38+
import org.opensearch.threadpool.ThreadPool;
39+
import org.junit.After;
3740

3841
import java.io.IOException;
3942
import java.util.ArrayList;
@@ -64,6 +67,14 @@
6467

6568
public class ClusterMetadataManifestTests extends OpenSearchTestCase {
6669

70+
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
71+
72+
@After
73+
public void teardown() throws Exception {
74+
super.tearDown();
75+
threadPool.shutdown();
76+
}
77+
6778
public void testClusterMetadataManifestXContentV0() throws IOException {
6879
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path", CODEC_V0);
6980
ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder()
@@ -214,7 +225,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() {
214225
"indicesRoutingDiffPath"
215226
)
216227
)
217-
.checksum(new ClusterStateChecksum(createClusterState()))
228+
.checksum(new ClusterStateChecksum(createClusterState(), threadPool))
218229
.build();
219230
{ // Mutate Cluster Term
220231
EqualsHashCodeTestUtils.checkEqualsAndHashCode(
@@ -647,7 +658,7 @@ public void testClusterMetadataManifestXContentV4() throws IOException {
647658
UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path");
648659
UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute");
649660
final StringKeyDiffProvider<IndexRoutingTable> routingTableIncrementalDiff = Mockito.mock(StringKeyDiffProvider.class);
650-
ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState());
661+
ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState(), threadPool);
651662
ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder()
652663
.clusterTerm(1L)
653664
.stateVersion(1L)

‎server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java

+21-11
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,31 @@
3434
import org.opensearch.core.xcontent.XContentBuilder;
3535
import org.opensearch.core.xcontent.XContentParser;
3636
import org.opensearch.test.OpenSearchTestCase;
37+
import org.opensearch.threadpool.TestThreadPool;
38+
import org.opensearch.threadpool.ThreadPool;
39+
import org.junit.After;
3740

3841
import java.io.IOException;
3942
import java.util.EnumSet;
4043
import java.util.List;
4144
import java.util.Map;
4245

4346
public class ClusterStateChecksumTests extends OpenSearchTestCase {
47+
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
48+
49+
@After
50+
public void teardown() throws Exception {
51+
super.tearDown();
52+
threadPool.shutdown();
53+
}
4454

4555
public void testClusterStateChecksumEmptyClusterState() {
46-
ClusterStateChecksum checksum = new ClusterStateChecksum(ClusterState.EMPTY_STATE);
56+
ClusterStateChecksum checksum = new ClusterStateChecksum(ClusterState.EMPTY_STATE, threadPool);
4757
assertNotNull(checksum);
4858
}
4959

5060
public void testClusterStateChecksum() {
51-
ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState());
61+
ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool);
5262
assertNotNull(checksum);
5363
assertTrue(checksum.routingTableChecksum != 0);
5464
assertTrue(checksum.nodesChecksum != 0);
@@ -65,8 +75,8 @@ public void testClusterStateChecksum() {
6575
}
6676

6777
public void testClusterStateMatchChecksum() {
68-
ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState());
69-
ClusterStateChecksum newChecksum = new ClusterStateChecksum(generateClusterState());
78+
ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool);
79+
ClusterStateChecksum newChecksum = new ClusterStateChecksum(generateClusterState(), threadPool);
7080
assertNotNull(checksum);
7181
assertNotNull(newChecksum);
7282
assertEquals(checksum.routingTableChecksum, newChecksum.routingTableChecksum);
@@ -84,7 +94,7 @@ public void testClusterStateMatchChecksum() {
8494
}
8595

8696
public void testXContentConversion() throws IOException {
87-
ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState());
97+
ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool);
8898
final XContentBuilder builder = JsonXContent.contentBuilder();
8999
builder.startObject();
90100
checksum.toXContent(builder, ToXContent.EMPTY_PARAMS);
@@ -97,7 +107,7 @@ public void testXContentConversion() throws IOException {
97107
}
98108

99109
public void testSerialization() throws IOException {
100-
ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState());
110+
ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool);
101111
BytesStreamOutput output = new BytesStreamOutput();
102112
checksum.writeTo(output);
103113

@@ -109,10 +119,10 @@ public void testSerialization() throws IOException {
109119

110120
public void testGetMismatchEntities() {
111121
ClusterState clsState1 = generateClusterState();
112-
ClusterStateChecksum checksum = new ClusterStateChecksum(clsState1);
122+
ClusterStateChecksum checksum = new ClusterStateChecksum(clsState1, threadPool);
113123
assertTrue(checksum.getMismatchEntities(checksum).isEmpty());
114124

115-
ClusterStateChecksum checksum2 = new ClusterStateChecksum(clsState1);
125+
ClusterStateChecksum checksum2 = new ClusterStateChecksum(clsState1, threadPool);
116126
assertTrue(checksum.getMismatchEntities(checksum2).isEmpty());
117127

118128
ClusterState clsState2 = ClusterState.builder(ClusterName.DEFAULT)
@@ -122,7 +132,7 @@ public void testGetMismatchEntities() {
122132
.customs(Map.of())
123133
.metadata(Metadata.EMPTY_METADATA)
124134
.build();
125-
ClusterStateChecksum checksum3 = new ClusterStateChecksum(clsState2);
135+
ClusterStateChecksum checksum3 = new ClusterStateChecksum(clsState2, threadPool);
126136
List<String> mismatches = checksum.getMismatchEntities(checksum3);
127137
assertFalse(mismatches.isEmpty());
128138
assertEquals(11, mismatches.size());
@@ -151,8 +161,8 @@ public void testGetMismatchEntitiesUnorderedInput() {
151161
ClusterState state2 = ClusterState.builder(state1).nodes(nodes1).build();
152162
ClusterState state3 = ClusterState.builder(state1).nodes(nodes2).build();
153163

154-
ClusterStateChecksum checksum1 = new ClusterStateChecksum(state2);
155-
ClusterStateChecksum checksum2 = new ClusterStateChecksum(state3);
164+
ClusterStateChecksum checksum1 = new ClusterStateChecksum(state2, threadPool);
165+
ClusterStateChecksum checksum2 = new ClusterStateChecksum(state3, threadPool);
156166
assertEquals(checksum2, checksum1);
157167
}
158168

‎server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -3123,7 +3123,7 @@ public void testWriteFullMetadataSuccessWithChecksumValidationEnabled() throws I
31233123
.previousClusterUUID("prev-cluster-uuid")
31243124
.routingTableVersion(1L)
31253125
.indicesRouting(List.of(uploadedIndiceRoutingMetadata))
3126-
.checksum(new ClusterStateChecksum(clusterState))
3126+
.checksum(new ClusterStateChecksum(clusterState, threadPool))
31273127
.build();
31283128

31293129
assertThat(manifest.getIndices().size(), is(1));
@@ -3193,7 +3193,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationEnabled() t
31933193

31943194
final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder()
31953195
.indices(Collections.emptyList())
3196-
.checksum(new ClusterStateChecksum(clusterState))
3196+
.checksum(new ClusterStateChecksum(clusterState, threadPool))
31973197
.build();
31983198
when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path"));
31993199

@@ -3219,7 +3219,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationEnabled() t
32193219
.previousClusterUUID("prev-cluster-uuid")
32203220
.routingTableVersion(1)
32213221
.indicesRouting(List.of(uploadedIndiceRoutingMetadata))
3222-
.checksum(new ClusterStateChecksum(clusterState))
3222+
.checksum(new ClusterStateChecksum(clusterState, threadPool))
32233223
.build();
32243224

32253225
assertThat(manifest.getIndices().size(), is(1));
@@ -3245,7 +3245,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationModeNone()
32453245

32463246
final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder()
32473247
.indices(Collections.emptyList())
3248-
.checksum(new ClusterStateChecksum(clusterState))
3248+
.checksum(new ClusterStateChecksum(clusterState, threadPool))
32493249
.build();
32503250
when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path"));
32513251

@@ -3271,7 +3271,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationModeNone()
32713271
.previousClusterUUID("prev-cluster-uuid")
32723272
.routingTableVersion(1)
32733273
.indicesRouting(List.of(uploadedIndiceRoutingMetadata))
3274-
.checksum(new ClusterStateChecksum(clusterState))
3274+
.checksum(new ClusterStateChecksum(clusterState, threadPool))
32753275
.build();
32763276

32773277
assertThat(manifest.getIndices().size(), is(1));
@@ -3349,7 +3349,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabled() throws
33493349
initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE);
33503350
ClusterState clusterState = generateClusterStateWithAllAttributes().build();
33513351
ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum(
3352-
new ClusterStateChecksum(clusterState)
3352+
new ClusterStateChecksum(clusterState, threadPool)
33533353
).build();
33543354
remoteClusterStateService.start();
33553355
RemoteClusterStateService mockService = spy(remoteClusterStateService);
@@ -3382,7 +3382,7 @@ public void testGetClusterStateForManifestWithChecksumValidationModeNone() throw
33823382
initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE);
33833383
ClusterState clusterState = generateClusterStateWithAllAttributes().build();
33843384
ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum(
3385-
new ClusterStateChecksum(clusterState)
3385+
new ClusterStateChecksum(clusterState, threadPool)
33863386
).build();
33873387
remoteClusterStateService.start();
33883388
RemoteClusterStateService mockService = spy(remoteClusterStateService);
@@ -3415,7 +3415,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabledWithMisma
34153415
initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE);
34163416
ClusterState clusterState = generateClusterStateWithAllAttributes().build();
34173417
ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum(
3418-
new ClusterStateChecksum(clusterState)
3418+
new ClusterStateChecksum(clusterState, threadPool)
34193419
).build();
34203420
remoteClusterStateService.start();
34213421
RemoteClusterStateService mockService = spy(remoteClusterStateService);
@@ -3465,7 +3465,7 @@ public void testGetClusterStateForManifestWithChecksumValidationDebugWithMismatc
34653465
);
34663466
ClusterState clusterState = generateClusterStateWithAllAttributes().build();
34673467
ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum(
3468-
new ClusterStateChecksum(clusterState)
3468+
new ClusterStateChecksum(clusterState, threadPool)
34693469
).build();
34703470
remoteClusterStateService.start();
34713471
RemoteClusterStateService mockService = spy(remoteClusterStateService);
@@ -3505,7 +3505,7 @@ public void testGetClusterStateUsingDiffWithChecksum() throws IOException {
35053505
initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE);
35063506
ClusterState clusterState = generateClusterStateWithAllAttributes().build();
35073507
ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum(
3508-
new ClusterStateChecksum(clusterState)
3508+
new ClusterStateChecksum(clusterState, threadPool)
35093509
).diffManifest(ClusterStateDiffManifest.builder().build()).build();
35103510

35113511
remoteClusterStateService.start();
@@ -3547,7 +3547,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeNone() throws IOExceptio
35473547
initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE);
35483548
ClusterState clusterState = generateClusterStateWithAllAttributes().build();
35493549
ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum(
3550-
new ClusterStateChecksum(clusterState)
3550+
new ClusterStateChecksum(clusterState, threadPool)
35513551
).diffManifest(ClusterStateDiffManifest.builder().build()).build();
35523552

35533553
remoteClusterStateService.start();
@@ -3589,7 +3589,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeDebugMismatch() throws I
35893589
initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.DEBUG);
35903590
ClusterState clusterState = generateClusterStateWithAllAttributes().build();
35913591
ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum(
3592-
new ClusterStateChecksum(clusterState)
3592+
new ClusterStateChecksum(clusterState, threadPool)
35933593
).diffManifest(ClusterStateDiffManifest.builder().build()).build();
35943594

35953595
remoteClusterStateService.start();
@@ -3630,7 +3630,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws I
36303630
initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.TRACE);
36313631
ClusterState clusterState = generateClusterStateWithAllAttributes().build();
36323632
ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum(
3633-
new ClusterStateChecksum(clusterState)
3633+
new ClusterStateChecksum(clusterState, threadPool)
36343634
).diffManifest(ClusterStateDiffManifest.builder().build()).build();
36353635

36363636
remoteClusterStateService.start();
@@ -3692,7 +3692,7 @@ public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOExceptio
36923692
initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE);
36933693
ClusterState clusterState = generateClusterStateWithAllAttributes().build();
36943694
ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum(
3695-
new ClusterStateChecksum(clusterState)
3695+
new ClusterStateChecksum(clusterState, threadPool)
36963696
).diffManifest(ClusterStateDiffManifest.builder().build()).build();
36973697

36983698
remoteClusterStateService.start();

0 commit comments

Comments
 (0)
Please sign in to comment.