|
10 | 10 |
|
11 | 11 | import org.apache.logging.log4j.LogManager;
|
12 | 12 | import org.apache.logging.log4j.Logger;
|
13 |
| -import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; |
14 | 13 | import org.opensearch.action.admin.cluster.node.stats.NodeStats;
|
15 | 14 | import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
16 | 15 | import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
17 | 16 | import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
|
18 | 17 | import org.opensearch.client.Client;
|
19 |
| -import org.opensearch.cluster.ClusterState; |
20 | 18 | import org.opensearch.cluster.coordination.CoordinationState;
|
21 | 19 | import org.opensearch.cluster.coordination.PersistedStateRegistry;
|
22 |
| -import org.opensearch.cluster.coordination.PublishClusterStateStats; |
23 | 20 | import org.opensearch.common.blobstore.BlobPath;
|
24 | 21 | import org.opensearch.common.settings.Settings;
|
25 | 22 | import org.opensearch.common.util.concurrent.ThreadContext;
|
|
60 | 57 | import java.util.List;
|
61 | 58 | import java.util.Locale;
|
62 | 59 | import java.util.Map;
|
63 |
| -import java.util.Objects; |
64 | 60 | import java.util.Set;
|
65 | 61 | import java.util.concurrent.ExecutionException;
|
66 | 62 | import java.util.function.Function;
|
67 | 63 | import java.util.stream.Collectors;
|
68 | 64 |
|
69 |
| -import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.SETTINGS; |
70 | 65 | import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY;
|
71 | 66 | import static org.opensearch.cluster.coordination.PublicationTransportHandler.PUBLISH_REMOTE_STATE_ACTION_NAME;
|
72 | 67 | import static org.opensearch.cluster.coordination.PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME;
|
73 |
| -import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals; |
74 | 68 | import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
|
75 | 69 | import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
|
76 |
| -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING; |
77 | 70 | import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
|
78 | 71 | import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
|
79 | 72 | import static org.opensearch.gateway.remote.RemoteDownloadStats.CHECKSUM_VALIDATION_FAILED_COUNT;
|
@@ -269,113 +262,6 @@ public void testRemotePublicationDownloadStats() {
|
269 | 262 | assertDataNodeDownloadStats(nodesStatsResponseDataNode.getNodes().get(0));
|
270 | 263 | }
|
271 | 264 |
|
272 |
| - public void testRemotePublicationDisabledByRollingRestart() throws Exception { |
273 |
| - prepareCluster(3, 2, INDEX_NAME, 1, 2); |
274 |
| - ensureStableCluster(5); |
275 |
| - ensureGreen(INDEX_NAME); |
276 |
| - |
277 |
| - Set<String> clusterManagers = internalCluster().getClusterManagerNames(); |
278 |
| - Set<String> restartedMasters = new HashSet<>(); |
279 |
| - |
280 |
| - for (String clusterManager : clusterManagers) { |
281 |
| - internalCluster().restartNode(clusterManager, new InternalTestCluster.RestartCallback() { |
282 |
| - @Override |
283 |
| - public Settings onNodeStopped(String nodeName) { |
284 |
| - restartedMasters.add(nodeName); |
285 |
| - return Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, false).build(); |
286 |
| - } |
287 |
| - |
288 |
| - @Override |
289 |
| - public void doAfterNodes(int n, Client client) { |
290 |
| - String activeCM = internalCluster().getClusterManagerName(); |
291 |
| - Set<String> followingCMs = clusterManagers.stream() |
292 |
| - .filter(node -> !Objects.equals(node, activeCM)) |
293 |
| - .collect(Collectors.toSet()); |
294 |
| - boolean activeCMRestarted = restartedMasters.contains(activeCM); |
295 |
| - NodesStatsResponse response = client().admin() |
296 |
| - .cluster() |
297 |
| - .prepareNodesStats(followingCMs.toArray(new String[0])) |
298 |
| - .clear() |
299 |
| - .addMetric(DISCOVERY.metricName()) |
300 |
| - .get(); |
301 |
| - // after master is flipped to restarted master, publication should happen on Transport |
302 |
| - response.getNodes().forEach(nodeStats -> { |
303 |
| - if (activeCMRestarted) { |
304 |
| - PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats(); |
305 |
| - assertTrue( |
306 |
| - stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0 |
307 |
| - ); |
308 |
| - } else { |
309 |
| - DiscoveryStats stats = nodeStats.getDiscoveryStats(); |
310 |
| - assertEquals(0, stats.getPublishStats().getFullClusterStateReceivedCount()); |
311 |
| - assertEquals(0, stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount()); |
312 |
| - assertEquals(0, stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount()); |
313 |
| - } |
314 |
| - }); |
315 |
| - |
316 |
| - NodesInfoResponse nodesInfoResponse = client().admin() |
317 |
| - .cluster() |
318 |
| - .prepareNodesInfo(activeCM) |
319 |
| - .clear() |
320 |
| - .addMetric(SETTINGS.metricName()) |
321 |
| - .get(); |
322 |
| - // if masterRestarted is true Publication Setting should be false, and vice versa |
323 |
| - assertTrue(REMOTE_PUBLICATION_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings()) != activeCMRestarted); |
324 |
| - |
325 |
| - followingCMs.forEach(node -> { |
326 |
| - PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node); |
327 |
| - CoordinationState.PersistedState remoteState = registry.getPersistedState( |
328 |
| - PersistedStateRegistry.PersistedStateType.REMOTE |
329 |
| - ); |
330 |
| - if (activeCMRestarted) { |
331 |
| - assertNull(remoteState.getLastAcceptedState()); |
332 |
| - assertNull(remoteState.getLastAcceptedManifest()); |
333 |
| - } else { |
334 |
| - ClusterState localState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL) |
335 |
| - .getLastAcceptedState(); |
336 |
| - ClusterState remotePersistedState = remoteState.getLastAcceptedState(); |
337 |
| - assertTrue(isGlobalStateEquals(localState.metadata(), remotePersistedState.metadata())); |
338 |
| - assertEquals(localState.nodes(), remotePersistedState.nodes()); |
339 |
| - assertEquals(localState.routingTable(), remotePersistedState.routingTable()); |
340 |
| - assertEquals(localState.customs(), remotePersistedState.customs()); |
341 |
| - } |
342 |
| - }); |
343 |
| - } |
344 |
| - }); |
345 |
| - |
346 |
| - } |
347 |
| - ensureGreen(INDEX_NAME); |
348 |
| - ensureStableCluster(5); |
349 |
| - |
350 |
| - String activeCM = internalCluster().getClusterManagerName(); |
351 |
| - Set<String> followingCMs = clusterManagers.stream().filter(node -> !Objects.equals(node, activeCM)).collect(Collectors.toSet()); |
352 |
| - NodesStatsResponse response = client().admin() |
353 |
| - .cluster() |
354 |
| - .prepareNodesStats(followingCMs.toArray(new String[0])) |
355 |
| - .clear() |
356 |
| - .addMetric(DISCOVERY.metricName()) |
357 |
| - .get(); |
358 |
| - response.getNodes().forEach(nodeStats -> { |
359 |
| - PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats(); |
360 |
| - assertTrue(stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0); |
361 |
| - }); |
362 |
| - NodesInfoResponse nodesInfoResponse = client().admin() |
363 |
| - .cluster() |
364 |
| - .prepareNodesInfo(activeCM) |
365 |
| - .clear() |
366 |
| - .addMetric(SETTINGS.metricName()) |
367 |
| - .get(); |
368 |
| - // if masterRestarted is true Publication Setting should be false, and vice versa |
369 |
| - assertFalse(REMOTE_PUBLICATION_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings())); |
370 |
| - |
371 |
| - followingCMs.forEach(node -> { |
372 |
| - PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node); |
373 |
| - CoordinationState.PersistedState remoteState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE); |
374 |
| - assertNull(remoteState.getLastAcceptedState()); |
375 |
| - assertNull(remoteState.getLastAcceptedManifest()); |
376 |
| - }); |
377 |
| - } |
378 |
| - |
379 | 265 | public void testMasterReElectionUsesIncrementalUpload() throws IOException {
|
380 | 266 | prepareCluster(3, 2, INDEX_NAME, 1, 1);
|
381 | 267 | PersistedStateRegistry persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
|
|
0 commit comments