Skip to content

Commit b054499

Browse files
committed
fix loop in update rep
Signed-off-by: kkewwei <[email protected]> Signed-off-by: kkewwei <[email protected]>
1 parent 1275017 commit b054499

File tree

8 files changed

+147
-13
lines changed

8 files changed

+147
-13
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4646
- Fix exists queries on nested flat_object fields throws exception ([#16803](https://github.com/opensearch-project/OpenSearch/pull/16803))
4747
- Add highlighting for wildcard search on `match_only_text` field ([#17101](https://github.com/opensearch-project/OpenSearch/pull/17101))
4848
- Fix illegal argument exception when creating a PIT ([#16781](https://github.com/opensearch-project/OpenSearch/pull/16781))
49+
- Fix simultaneously creating a snapshot and updating the repository can potentially trigger an infinite loop ([#17532](https://github.com/opensearch-project/OpenSearch/pull/17532))
4950

5051
### Security
5152

server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java

+66
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232

3333
package org.opensearch.repositories;
3434

35+
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
3536
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
37+
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
3638
import org.opensearch.cluster.metadata.RepositoryMetadata;
3739
import org.opensearch.common.settings.Settings;
3840
import org.opensearch.plugins.Plugin;
@@ -45,6 +47,8 @@
4547
import java.util.Collection;
4648
import java.util.Collections;
4749

50+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
51+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
4852
import static org.hamcrest.Matchers.equalTo;
4953
import static org.hamcrest.Matchers.hasSize;
5054
import static org.hamcrest.Matchers.instanceOf;
@@ -122,4 +126,66 @@ public void testSystemRepositoryCantBeCreated() {
122126

123127
assertThrows(RepositoryException.class, () -> createRepository(repositoryName, FsRepository.TYPE, repoSettings));
124128
}
129+
130+
public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws InterruptedException {
131+
// create index
132+
internalCluster();
133+
String indexName = "test-index";
134+
createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 0).put(SETTING_NUMBER_OF_SHARDS, 1).build());
135+
index(indexName, "_doc", "1", Collections.singletonMap("user", generateRandomStringArray(1, 10, false, false)));
136+
flush(indexName);
137+
138+
// create repository
139+
final String repositoryName = "test-repo";
140+
Settings.Builder repoSettings = Settings.builder()
141+
.put("location", randomRepoPath())
142+
.put("max_snapshot_bytes_per_sec", "10mb")
143+
.put("max_restore_bytes_per_sec", "10mb");
144+
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
145+
client().admin().cluster(),
146+
repositoryName,
147+
FsRepository.TYPE,
148+
true,
149+
repoSettings
150+
);
151+
152+
Thread thread = new Thread(() -> {
153+
String snapshotName = "test-snapshot";
154+
logger.info("--> starting snapshot");
155+
CreateSnapshotResponse createSnapshotResponse = client().admin()
156+
.cluster()
157+
.prepareCreateSnapshot(repositoryName, snapshotName)
158+
.setWaitForCompletion(true)
159+
.setIndices(indexName)
160+
.get();
161+
logger.info("--> finishing snapshot");
162+
});
163+
thread.start();
164+
165+
logger.info("--> begin to reset repository");
166+
repoSettings = Settings.builder().put("location", randomRepoPath()).put("max_snapshot_bytes_per_sec", "300mb");
167+
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
168+
client().admin().cluster(),
169+
repositoryName,
170+
FsRepository.TYPE,
171+
true,
172+
repoSettings
173+
);
174+
logger.info("--> finish to reset repository");
175+
176+
GetRepositoriesRequest getRepositoriesRequest = new GetRepositoriesRequest(new String[] { repositoryName });
177+
try {
178+
GetRepositoriesResponse getRepositoriesResponse = client().admin().cluster().getRepositories(getRepositoriesRequest).get();
179+
assertThat(getRepositoriesResponse.repositories(), hasSize(1));
180+
RepositoryMetadata repositoryMetadata = getRepositoriesResponse.repositories().get(0);
181+
assertThat(repositoryMetadata.type(), equalTo(FsRepository.TYPE));
182+
assertThat(repositoryMetadata.settings().get("max_snapshot_bytes_per_sec"), equalTo("300mb"));
183+
assertThat(repositoryMetadata.settings().hasValue("max_restore_bytes_per_sec"), equalTo(false));
184+
} catch (Exception e) {
185+
throw new RuntimeException(e);
186+
}
187+
logger.info("--> finish to get response about repository");
188+
thread.join();
189+
}
190+
125191
}

server/src/main/java/org/opensearch/repositories/FilterRepository.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.Map;
5959
import java.util.function.Consumer;
6060
import java.util.function.Function;
61+
import java.util.function.Supplier;
6162

6263
/**
6364
* Repository that is filtered
@@ -288,9 +289,10 @@ public void updateState(ClusterState state) {
288289
public void executeConsistentStateUpdate(
289290
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
290291
String source,
292+
Supplier<Repository> currentRepositeSupplier,
291293
Consumer<Exception> onFailure
292294
) {
293-
in.executeConsistentStateUpdate(createUpdateTask, source, onFailure);
295+
in.executeConsistentStateUpdate(createUpdateTask, source, currentRepositeSupplier, onFailure);
294296
}
295297

296298
@Override
@@ -345,4 +347,9 @@ public void stop() {
345347
public void close() {
346348
in.close();
347349
}
350+
351+
@Override
352+
public boolean isOpen() {
353+
return in.isOpen();
354+
}
348355
}

server/src/main/java/org/opensearch/repositories/Repository.java

+14
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import java.util.Map;
6666
import java.util.function.Consumer;
6767
import java.util.function.Function;
68+
import java.util.function.Supplier;
6869

6970
/**
7071
* An interface for interacting with a repository in snapshot and restore.
@@ -545,9 +546,19 @@ default IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotInfo snapshotInf
545546
void executeConsistentStateUpdate(
546547
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
547548
String source,
549+
Supplier<Repository> currentRepositeSupplier,
548550
Consumer<Exception> onFailure
549551
);
550552

553+
@Deprecated
554+
default void executeConsistentStateUpdate(
555+
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
556+
String source,
557+
Consumer<Exception> onFailure
558+
) {
559+
executeConsistentStateUpdate(createUpdateTask, source, () -> this, onFailure);
560+
}
561+
551562
/**
552563
* Clones a shard snapshot.
553564
*
@@ -611,4 +622,7 @@ default void reload(RepositoryMetadata repositoryMetadata) {}
611622
* Validate the repository metadata
612623
*/
613624
default void validateMetadata(RepositoryMetadata repositoryMetadata) {}
625+
626+
boolean isOpen();
627+
614628
}

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

+23-3
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,8 @@ protected static long calculateMaxWithinIntLimit(long defaultThresholdOfHeap, lo
551551
*/
552552
protected volatile int bufferSize;
553553

554+
private volatile boolean closed;
555+
554556
/**
555557
* Constructs new BlobStoreRepository
556558
* @param repositoryMetadata The metadata for this repository including name and settings
@@ -630,6 +632,7 @@ protected void doClose() {
630632
}
631633
if (store != null) {
632634
try {
635+
closed = true;
633636
store.close();
634637
} catch (Exception t) {
635638
logger.warn("cannot close blob store", t);
@@ -641,10 +644,22 @@ protected void doClose() {
641644
public void executeConsistentStateUpdate(
642645
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
643646
String source,
647+
Supplier<Repository> currentRepositeSupplier,
644648
Consumer<Exception> onFailure
645649
) {
646-
final RepositoryMetadata repositoryMetadataStart = metadata;
647-
getRepositoryData(ActionListener.wrap(repositoryData -> {
650+
Repository currentRepository = this;
651+
final RepositoryMetadata repositoryMetadataStart;
652+
if (currentRepository != currentRepositeSupplier.get()) {
653+
if (this.isOpen()) {
654+
throw new IllegalStateException("the repository should be closed");
655+
}
656+
currentRepository = currentRepositeSupplier.get();
657+
repositoryMetadataStart = currentRepository.getMetadata();
658+
} else {
659+
repositoryMetadataStart = metadata;
660+
}
661+
662+
currentRepository.getRepositoryData(ActionListener.wrap(repositoryData -> {
648663
final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
649664
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {
650665

@@ -679,7 +694,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
679694
if (executedTask) {
680695
updateTask.clusterStateProcessed(source, oldState, newState);
681696
} else {
682-
executeConsistentStateUpdate(createUpdateTask, source, onFailure);
697+
executeConsistentStateUpdate(createUpdateTask, source, currentRepositeSupplier, onFailure);
683698
}
684699
}
685700

@@ -4690,6 +4705,11 @@ private void checkAborted() {
46904705
}
46914706
}
46924707

4708+
@Override
4709+
public boolean isOpen() {
4710+
return closed == false;
4711+
}
4712+
46934713
private static void failStoreIfCorrupted(Store store, Exception e) {
46944714
if (Lucene.isCorruptionException(e)) {
46954715
try {

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

+16-8
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
451451
public TimeValue timeout() {
452452
return request.clusterManagerNodeTimeout();
453453
}
454-
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
454+
}, "create_snapshot [" + snapshotName + ']', () -> repositoriesService.repository(request.repository()), listener::onFailure);
455455
}
456456

457457
/**
@@ -640,7 +640,7 @@ public TimeValue timeout() {
640640
return request.clusterManagerNodeTimeout();
641641
}
642642

643-
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
643+
}, "create_snapshot [" + snapshotName + ']', () -> repositoriesService.repository(repositoryName), listener::onFailure);
644644
}
645645

646646
private void cleanOrphanTimestamp(String repoName, RepositoryData repositoryData) {
@@ -1062,7 +1062,11 @@ public void onFailure(Exception e) {
10621062
public TimeValue timeout() {
10631063
return request.clusterManagerNodeTimeout();
10641064
}
1065-
}, "clone_snapshot_v2 [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
1065+
},
1066+
"clone_snapshot_v2 [" + request.source() + "][" + snapshotName + ']',
1067+
() -> repositoriesService.repository(repositoryName),
1068+
listener::onFailure
1069+
);
10661070
}
10671071

10681072
// TODO: It is worth revisiting the design choice of creating a placeholder entry in snapshots-in-progress here once we have a cache
@@ -1148,14 +1152,18 @@ public void onFailure(String source, Exception e) {
11481152
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
11491153
logger.info("snapshot clone [{}] started", snapshot);
11501154
addListener(snapshot, ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure));
1151-
startCloning(repository, newEntry);
1155+
startCloning(repository, repositoryName, newEntry);
11521156
}
11531157

11541158
@Override
11551159
public TimeValue timeout() {
11561160
return request.clusterManagerNodeTimeout();
11571161
}
1158-
}, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
1162+
},
1163+
"clone_snapshot [" + request.source() + "][" + snapshotName + ']',
1164+
() -> repositoriesService.repository(repositoryName),
1165+
listener::onFailure
1166+
);
11591167
}
11601168

11611169
private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) {
@@ -1189,7 +1197,7 @@ private static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryD
11891197
* @param repository repository to run operation on
11901198
* @param cloneEntry clone operation in the cluster state
11911199
*/
1192-
private void startCloning(Repository repository, SnapshotsInProgress.Entry cloneEntry) {
1200+
private void startCloning(Repository repository, String repositoryName, SnapshotsInProgress.Entry cloneEntry) {
11931201
final List<IndexId> indices = cloneEntry.indices();
11941202
final SnapshotId sourceSnapshot = cloneEntry.source();
11951203
final Snapshot targetSnapshot = cloneEntry.snapshot();
@@ -1310,7 +1318,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
13101318
logger.warn("Did not find expected entry [{}] in the cluster state", cloneEntry);
13111319
}
13121320
}
1313-
}, "start snapshot clone", onFailure), onFailure);
1321+
}, "start snapshot clone", () -> repositoriesService.repository(repositoryName), onFailure), onFailure);
13141322
}
13151323

13161324
private final Set<RepositoryShardId> currentlyCloning = Collections.synchronizedSet(new HashSet<>());
@@ -2639,7 +2647,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
26392647
public TimeValue timeout() {
26402648
return request.clusterManagerNodeTimeout();
26412649
}
2642-
}, "delete snapshot", listener::onFailure);
2650+
}, "delete snapshot", () -> repositoriesService.repository(repoName), listener::onFailure);
26432651
}
26442652

26452653
private static List<SnapshotId> matchingSnapshotIds(

server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java

+7
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import java.util.concurrent.atomic.AtomicBoolean;
9696
import java.util.function.Consumer;
9797
import java.util.function.Function;
98+
import java.util.function.Supplier;
9899

99100
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
100101
import static org.hamcrest.Matchers.equalTo;
@@ -815,6 +816,7 @@ public void updateState(final ClusterState state) {}
815816
public void executeConsistentStateUpdate(
816817
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
817818
String source,
819+
Supplier<Repository> currentRepositeSupplier,
818820
Consumer<Exception> onFailure
819821
) {}
820822

@@ -841,6 +843,11 @@ public void cloneRemoteStoreIndexShardSnapshot(
841843

842844
}
843845

846+
@Override
847+
public boolean isOpen() {
848+
return isClosed == false;
849+
}
850+
844851
@Override
845852
public Lifecycle.State lifecycleState() {
846853
return null;

test/framework/src/main/java/org/opensearch/index/shard/RestoreOnlyRepository.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import java.util.Map;
6262
import java.util.function.Consumer;
6363
import java.util.function.Function;
64+
import java.util.function.Supplier;
6465

6566
import static java.util.Collections.emptyList;
6667
import static org.opensearch.repositories.RepositoryData.EMPTY_REPO_GEN;
@@ -73,14 +74,18 @@ public RestoreOnlyRepository(String indexName) {
7374
this.indexName = indexName;
7475
}
7576

77+
private volatile boolean closed;
78+
7679
@Override
7780
protected void doStart() {}
7881

7982
@Override
8083
protected void doStop() {}
8184

8285
@Override
83-
protected void doClose() {}
86+
protected void doClose() {
87+
closed = true;
88+
}
8489

8590
@Override
8691
public RepositoryMetadata getMetadata() {
@@ -222,6 +227,7 @@ public void updateState(final ClusterState state) {}
222227
public void executeConsistentStateUpdate(
223228
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
224229
String source,
230+
Supplier<Repository> currentRepositeSupplier,
225231
Consumer<Exception> onFailure
226232
) {
227233
throw new UnsupportedOperationException("Unsupported for restore-only repository");
@@ -249,4 +255,9 @@ public void cloneRemoteStoreIndexShardSnapshot(
249255
) {
250256
throw new UnsupportedOperationException("Unsupported for restore-only repository");
251257
}
258+
259+
@Override
260+
public boolean isOpen() {
261+
return closed == false;
262+
}
252263
}

0 commit comments

Comments
 (0)