@@ -1047,36 +1047,42 @@ void runDelete(SnapshotDeleteListener listener) {
10471047 }
10481048
10491049 private void runWithUniqueShardMetadataNaming (SnapshotDeleteListener listener ) {
1050- // First write the new shard state metadata (with the removed snapshot) and compute deletion targets
1051- final ListenableFuture <Void > writeShardMetaDataAndComputeDeletesStep = new ListenableFuture <>();
1052- writeUpdatedShardMetadataAndComputeDeletes (writeShardMetaDataAndComputeDeletesStep );
1053- // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows:
1054- // 1. Remove the snapshots from the list of existing snapshots
1055- // 2. Update the index shard generations of all updated shard folders
1056- //
1057- // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created
1058- // index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only
1059- // written if all shard paths have been successfully updated.
1060- final ListenableFuture <RepositoryData > writeUpdatedRepoDataStep = new ListenableFuture <>();
1061- writeShardMetaDataAndComputeDeletesStep .addListener (ActionListener .wrap (ignored -> {
1062- final ShardGenerations .Builder builder = ShardGenerations .builder ();
1063- for (ShardSnapshotMetaDeleteResult newGen : shardDeleteResults ) {
1064- builder .put (newGen .indexId , newGen .shardId , newGen .newGeneration );
1065- }
1066- updateRepositoryData (
1067- originalRepositoryData .removeSnapshots (snapshotIds , builder .build ()),
1068- ActionListener .wrap (writeUpdatedRepoDataStep ::onResponse , listener ::onFailure )
1050+ SubscribableListener
1051+
1052+ // First write the new shard state metadata (without the removed snapshots) and compute deletion targets
1053+ .newForked (this ::writeUpdatedShardMetadataAndComputeDeletes )
1054+
1055+ .<RepositoryData >andThen ((l , ignored ) -> {
1056+ // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows:
1057+ // 1. Remove the snapshots from the list of existing snapshots
1058+ // 2. Update the index shard generations of all updated shard folders
1059+ //
1060+ // Note: If we fail updating any of the individual shard paths, it may not have a new BlobStoreIndexShardSnapshots blob,
1061+ // but if it does then that doesn't matter, we're naming them uniquely as index-{$shard-generation-uuid} and it won't be
1062+ // referenced by the new RepositoryData and will be cleaned up by a subsequent delete.
1063+ //
1064+ // TODO should we even write the new RepositoryData unless all shard paths have been successfully updated? See #100569.
1065+ final ShardGenerations .Builder builder = ShardGenerations .builder ();
1066+ for (ShardSnapshotMetaDeleteResult newGen : shardDeleteResults ) {
1067+ builder .put (newGen .indexId , newGen .shardId , newGen .newGeneration );
1068+ }
1069+ updateRepositoryData (originalRepositoryData .removeSnapshots (snapshotIds , builder .build ()), l );
1070+ })
1071+
1072+ .addListener (
1073+ ActionListener .wrap (
1074+ // Once we have updated the repository, run the clean-ups
1075+ newRepositoryData -> {
1076+ listener .onRepositoryDataWritten (newRepositoryData );
1077+ // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
1078+ try (var refs = new RefCountingRunnable (listener ::onDone )) {
1079+ cleanupUnlinkedRootAndIndicesBlobs (newRepositoryData , refs .acquireListener ());
1080+ cleanupUnlinkedShardLevelBlobs (shardDeleteResults , refs .acquireListener ());
1081+ }
1082+ },
1083+ listener ::onFailure
1084+ )
10691085 );
1070- }, listener ::onFailure ));
1071- // Once we have updated the repository, run the clean-ups
1072- writeUpdatedRepoDataStep .addListener (ActionListener .wrap (newRepositoryData -> {
1073- listener .onRepositoryDataWritten (newRepositoryData );
1074- // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
1075- try (var refs = new RefCountingRunnable (listener ::onDone )) {
1076- cleanupUnlinkedRootAndIndicesBlobs (newRepositoryData , refs .acquireListener ().map (ignored -> null ));
1077- cleanupUnlinkedShardLevelBlobs (shardDeleteResults , refs .acquireListener ());
1078- }
1079- }, listener ::onFailure ));
10801086 }
10811087
10821088 private void runWithLegacyNumericShardMetadataNaming (SnapshotDeleteListener listener ) {
@@ -1089,7 +1095,7 @@ private void runWithLegacyNumericShardMetadataNaming(SnapshotDeleteListener list
10891095 listener .onDone ();
10901096 })) {
10911097 // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
1092- cleanupUnlinkedRootAndIndicesBlobs (newRepositoryData , refs .acquireListener (). map ( ignored -> null ) );
1098+ cleanupUnlinkedRootAndIndicesBlobs (newRepositoryData , refs .acquireListener ());
10931099
10941100 // writeIndexGen finishes on master-service thread so must fork here.
10951101 snapshotExecutor .execute (
0 commit comments