Skip to content

Commit ecf9adf

Browse files
[main] System data streams are not being upgraded in the feature migration API (elastic#126409)
This commit adds support for reindexing system data streams. The system data stream migration extends the existing system indices migration task and uses the data stream reindex API. The system index migration task starts a reindex data stream task and checks its status every second. Only one system index or system data stream is migrated at a time. If a data stream migration fails, the entire system index migration task also fails. Port of elastic#123926.
1 parent 728eb75 commit ecf9adf

File tree

55 files changed

+1634
-285
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+1634
-285
lines changed

docs/changelog/126409.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 126409
2+
summary: System data streams are not being upgraded in the feature migration API
3+
area: Infra/Core
4+
type: bug
5+
issues:
6+
- 122949

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
326326
.build(),
327327
Map.of(),
328328
List.of("product"),
329+
"product",
329330
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
330331
)
331332
);

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
275275
.build(),
276276
Map.of(),
277277
Collections.singletonList("test"),
278+
"test",
278279
new ExecutorNames(ThreadPool.Names.SYSTEM_CRITICAL_READ, ThreadPool.Names.SYSTEM_READ, ThreadPool.Names.SYSTEM_WRITE)
279280
)
280281
);

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/CrudSystemDataStreamLifecycleIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
213213
.build(),
214214
Map.of(),
215215
List.of("product"),
216+
"product",
216217
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
217218
)
218219
);

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

+1
Original file line numberDiff line numberDiff line change
@@ -1094,6 +1094,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
10941094
.build(),
10951095
Map.of(),
10961096
List.of(),
1097+
"test",
10971098
ExecutorNames.DEFAULT_SYSTEM_INDEX_THREAD_POOLS
10981099
)
10991100
);

server/src/internalClusterTest/java/org/elasticsearch/snapshots/SystemResourceSnapshotIT.java

+4
Original file line numberDiff line numberDiff line change
@@ -1153,6 +1153,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
11531153
.build(),
11541154
Map.of(),
11551155
List.of("product"),
1156+
"product",
11561157
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
11571158
)
11581159
);
@@ -1192,6 +1193,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
11921193
.build(),
11931194
Map.of(),
11941195
List.of("product"),
1196+
"product",
11951197
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
11961198
)
11971199
);
@@ -1231,6 +1233,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
12311233
.build(),
12321234
Map.of(),
12331235
List.of("product"),
1236+
"product",
12341237
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
12351238
)
12361239
);
@@ -1299,6 +1302,7 @@ public Collection<SystemDataStreamDescriptor> getSystemDataStreamDescriptors() {
12991302
.build(),
13001303
Map.of(),
13011304
List.of("product"),
1305+
"product",
13021306
ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS
13031307
)
13041308
);

server/src/main/java/module-info.java

+1
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@
282282
exports org.elasticsearch.indices.recovery;
283283
exports org.elasticsearch.indices.recovery.plan;
284284
exports org.elasticsearch.indices.store;
285+
exports org.elasticsearch.indices.system;
285286
exports org.elasticsearch.inference;
286287
exports org.elasticsearch.ingest;
287288
exports org.elasticsearch.internal

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java

+1
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ private static void addBackingIndex(
382382
mapperSupplier,
383383
false,
384384
failureStore,
385+
dataStream.isSystem(),
385386
nodeSettings
386387
);
387388
} catch (IOException e) {

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataMigrateToDataStreamService.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ static ClusterState migrateToDataStream(
156156
ProjectMetadata.Builder mb = ProjectMetadata.builder(project);
157157
for (Index index : alias.getIndices()) {
158158
IndexMetadata im = project.index(index);
159-
prepareBackingIndex(mb, im, request.aliasName, mapperSupplier, true, false, Settings.EMPTY);
159+
prepareBackingIndex(mb, im, request.aliasName, mapperSupplier, true, false, false, Settings.EMPTY);
160160
}
161161
ClusterState updatedState = ClusterState.builder(projectState.cluster()).putProjectMetadata(mb).build();
162162

@@ -212,6 +212,8 @@ static void validateRequest(ProjectMetadata project, MigrateToDataStreamClusterS
212212
* exception should be thrown in that case instead
213213
* @param failureStore <code>true</code> if the index is being migrated into the data stream's failure store, <code>false</code> if it
214214
* is being migrated into the data stream's backing indices
215+
* @param makeSystem <code>true</code> if the index is being migrated into a system data stream, <code>false</code> if it
216+
* is being migrated into a non-system data stream
215217
* @param nodeSettings The settings for the current node
216218
*/
217219
static void prepareBackingIndex(
@@ -221,6 +223,7 @@ static void prepareBackingIndex(
221223
Function<IndexMetadata, MapperService> mapperSupplier,
222224
boolean removeAlias,
223225
boolean failureStore,
226+
boolean makeSystem,
224227
Settings nodeSettings
225228
) throws IOException {
226229
MappingMetadata mm = im.mapping();
@@ -251,6 +254,7 @@ static void prepareBackingIndex(
251254
imb.mappingVersion(im.getMappingVersion() + 1)
252255
.mappingsUpdatedVersion(IndexVersion.current())
253256
.putMapping(new MappingMetadata(mapper));
257+
imb.system(makeSystem);
254258
b.put(imb);
255259
}
256260

server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java

+4
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,10 @@ private List<IndexMetadata> updateIndices(ClusterState currentState, List<Index>
241241
final List<IndexMetadata> updatedMetadata = new ArrayList<>();
242242
for (Index index : indices) {
243243
IndexMetadata indexMetadata = metadata.indexMetadata(index);
244+
// this might happen because the update is async and the index might have been deleted between task creation and execution
245+
if (indexMetadata == null) {
246+
continue;
247+
}
244248
final boolean shouldBeSystem = shouldBeSystem(indexMetadata);
245249
IndexMetadata updatedIndexMetadata = updateIndexIfNecessary(indexMetadata, shouldBeSystem);
246250
if (updatedIndexMetadata != null) {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/shards/ShardsAvailabilityHealthIndicatorService.java

+4-49
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@
7878
import java.util.stream.Stream;
7979

8080
import static java.util.stream.Collectors.joining;
81-
import static java.util.stream.Collectors.toMap;
8281
import static java.util.stream.Collectors.toSet;
8382
import static org.elasticsearch.cluster.health.ClusterShardHealth.getInactivePrimaryHealth;
8483
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_INCLUDE_GROUP_PREFIX;
@@ -1152,7 +1151,8 @@ public List<HealthIndicatorImpact> getImpacts() {
11521151

11531152
/**
11541153
* Returns the diagnosis for unassigned primary and replica shards.
1155-
* @param verbose true if the diagnosis should be generated, false if they should be omitted.
1154+
*
1155+
* @param verbose true if the diagnosis should be generated, false if they should be omitted.
11561156
* @param maxAffectedResourcesCount the max number of affected resources to be returned as part of the diagnosis
11571157
* @return The diagnoses list the indicator identified. Alternatively, an empty list if none were found or verbose is false.
11581158
*/
@@ -1243,23 +1243,6 @@ static List<Diagnosis.Resource> getRestoreFromSnapshotAffectedResources(
12431243
}
12441244
}
12451245

1246-
Map<String, Set<ProjectIndexName>> featureToDsBackingIndices = getSystemDsBackingIndicesForProjects(
1247-
systemIndices,
1248-
affectedProjects,
1249-
metadata
1250-
);
1251-
1252-
// the shards_availability indicator works with indices so let's remove the feature states data streams backing indices from
1253-
// the list of affected indices (the feature state will cover the restore of these indices too)
1254-
for (Map.Entry<String, Set<ProjectIndexName>> featureToBackingIndices : featureToDsBackingIndices.entrySet()) {
1255-
for (ProjectIndexName featureIndex : featureToBackingIndices.getValue()) {
1256-
if (restoreFromSnapshotIndices.contains(featureIndex)) {
1257-
affectedFeatureStates.add(featureToBackingIndices.getKey());
1258-
affectedIndices.remove(featureIndex);
1259-
}
1260-
}
1261-
}
1262-
12631246
if (affectedIndices.isEmpty() == false) {
12641247
affectedResources.add(
12651248
new Diagnosis.Resource(
@@ -1281,7 +1264,7 @@ static List<Diagnosis.Resource> getRestoreFromSnapshotAffectedResources(
12811264
}
12821265

12831266
/**
1284-
* Retrieve the system indices for the projects and group them by Feature
1267+
* Retrieve the system indices and indices backing system data streams for the projects and group them by Feature
12851268
*/
12861269
private static Map<String, Set<ProjectIndexName>> getSystemIndicesForProjects(
12871270
SystemIndices systemIndices,
@@ -1293,7 +1276,7 @@ private static Map<String, Set<ProjectIndexName>> getSystemIndicesForProjects(
12931276
.collect(
12941277
Collectors.toMap(
12951278
SystemIndices.Feature::getName,
1296-
feature -> feature.getIndexDescriptors()
1279+
feature -> feature.getSystemResourceDescriptors()
12971280
.stream()
12981281
.flatMap(
12991282
descriptor -> projects.stream()
@@ -1307,34 +1290,6 @@ private static Map<String, Set<ProjectIndexName>> getSystemIndicesForProjects(
13071290
)
13081291
);
13091292
}
1310-
1311-
/**
1312-
* Retrieve the backing indices for system data stream for the projects and group them by Feature
1313-
*/
1314-
private static Map<String, Set<ProjectIndexName>> getSystemDsBackingIndicesForProjects(
1315-
SystemIndices systemIndices,
1316-
Set<ProjectId> projects,
1317-
Metadata metadata
1318-
) {
1319-
return systemIndices.getFeatures()
1320-
.stream()
1321-
.collect(
1322-
toMap(
1323-
SystemIndices.Feature::getName,
1324-
feature -> feature.getDataStreamDescriptors()
1325-
.stream()
1326-
.flatMap(
1327-
descriptor -> projects.stream()
1328-
.flatMap(
1329-
projectId -> descriptor.getBackingIndexNames(metadata.getProject(projectId))
1330-
.stream()
1331-
.map(index -> new ProjectIndexName(projectId, index))
1332-
)
1333-
)
1334-
.collect(Collectors.toSet())
1335-
)
1336-
);
1337-
}
13381293
}
13391294

13401295
public static class SearchableSnapshotsState {

server/src/main/java/org/elasticsearch/indices/AssociatedIndexDescriptor.java

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
1414
import org.apache.lucene.util.automaton.RegExp;
1515
import org.elasticsearch.cluster.metadata.ProjectMetadata;
16+
import org.elasticsearch.indices.system.IndexPatternMatcher;
1617

1718
import java.util.List;
1819
import java.util.Objects;

server/src/main/java/org/elasticsearch/indices/IndexPatternMatcher.java renamed to server/src/main/java/org/elasticsearch/indices/IndexMatcher.java

+4-12
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,15 @@
1515
import java.util.List;
1616

1717
/**
18-
* An IndexPatternMatcher holds an index pattern in a string and, given a
19-
* {@link Metadata} object, can return a list of index names matching that pattern.
18+
* An IndexMatcher, given a {@link Metadata} object, can return a list of the index names that it matches.
2019
*/
21-
public interface IndexPatternMatcher {
22-
/**
23-
* @return A pattern, either with a wildcard or simple regex, describing indices that are
24-
* related to a system feature. Such indices may be system indices or associated
25-
* indices.
26-
*/
27-
String getIndexPattern();
28-
20+
public interface IndexMatcher {
2921
/**
3022
* Retrieves a list of all indices which match this descriptor's pattern. Implementations
3123
* may include other special information when matching indices, such as aliases.
32-
*
24+
* <p>
3325
* This cannot be done via {@link org.elasticsearch.cluster.metadata.IndexNameExpressionResolver} because that class can only handle
34-
* simple wildcard expressions, but system index name patterns may use full Lucene regular expression syntax,
26+
* simple wildcard expressions, but system index name patterns may use full Lucene regular expression syntax.
3527
*
3628
* @param project The current metadata to get the list of matching indices from
3729
* @return A list of index names that match this descriptor

server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.cluster.metadata.Metadata;
1616
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1717
import org.elasticsearch.index.Index;
18+
import org.elasticsearch.indices.system.SystemResourceDescriptor;
1819

1920
import java.util.Collections;
2021
import java.util.List;
@@ -45,14 +46,15 @@
4546
* <p>The descriptor also provides names for the thread pools that Elasticsearch should use to read, search, or modify the descriptor’s
4647
* indices.
4748
*/
48-
public class SystemDataStreamDescriptor {
49+
public class SystemDataStreamDescriptor implements SystemResourceDescriptor {
4950

5051
private final String dataStreamName;
5152
private final String description;
5253
private final Type type;
5354
private final ComposableIndexTemplate composableIndexTemplate;
5455
private final Map<String, ComponentTemplate> componentTemplates;
5556
private final List<String> allowedElasticProductOrigins;
57+
private final String origin;
5658
private final ExecutorNames executorNames;
5759

5860
/**
@@ -66,6 +68,7 @@ public class SystemDataStreamDescriptor {
6668
* {@link ComposableIndexTemplate}
6769
* @param allowedElasticProductOrigins a list of product origin values that are allowed to access this data stream if the
6870
* type is {@link Type#EXTERNAL}. Must not be {@code null}
71+
* @param origin specifies the origin to use when creating or updating the data stream
6972
* @param executorNames thread pools that should be used for operations on the system data stream
7073
*/
7174
public SystemDataStreamDescriptor(
@@ -75,6 +78,7 @@ public SystemDataStreamDescriptor(
7578
ComposableIndexTemplate composableIndexTemplate,
7679
Map<String, ComponentTemplate> componentTemplates,
7780
List<String> allowedElasticProductOrigins,
81+
String origin,
7882
ExecutorNames executorNames
7983
) {
8084
this.dataStreamName = Objects.requireNonNull(dataStreamName, "dataStreamName must be specified");
@@ -96,6 +100,7 @@ public SystemDataStreamDescriptor(
96100
throw new IllegalArgumentException("External system data stream without allowed products is not a valid combination");
97101
}
98102
this.executorNames = Objects.nonNull(executorNames) ? executorNames : ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS;
103+
this.origin = origin;
99104
}
100105

101106
public String getDataStreamName() {
@@ -125,6 +130,11 @@ public List<String> getBackingIndexNames(ProjectMetadata projectMetadata) {
125130
return Stream.concat(dataStream.getIndices().stream(), dataStream.getFailureIndices().stream()).map(Index::getName).toList();
126131
}
127132

133+
@Override
134+
public List<String> getMatchingIndices(ProjectMetadata metadata) {
135+
return getBackingIndexNames(metadata);
136+
}
137+
128138
public String getDescription() {
129139
return description;
130140
}
@@ -133,6 +143,17 @@ public ComposableIndexTemplate getComposableIndexTemplate() {
133143
return composableIndexTemplate;
134144
}
135145

146+
@Override
147+
public String getOrigin() {
148+
return origin;
149+
}
150+
151+
@Override
152+
public boolean isAutomaticallyManaged() {
153+
return true;
154+
}
155+
156+
@Override
136157
public boolean isExternal() {
137158
return type == Type.EXTERNAL;
138159
}
@@ -142,9 +163,10 @@ public String getBackingIndexPattern() {
142163
}
143164

144165
private static String backingIndexPatternForDataStream(String dataStream) {
145-
return DataStream.BACKING_INDEX_PREFIX + dataStream + "-*";
166+
return ".(migrated-){0,}[fd]s-" + dataStream + "-*";
146167
}
147168

169+
@Override
148170
public List<String> getAllowedElasticProductOrigins() {
149171
return allowedElasticProductOrigins;
150172
}
@@ -157,6 +179,7 @@ public Map<String, ComponentTemplate> getComponentTemplates() {
157179
* Get the names of the thread pools that should be used for operations on this data stream.
158180
* @return Names for get, search, and write executors.
159181
*/
182+
@Override
160183
public ExecutorNames getThreadPoolNames() {
161184
return this.executorNames;
162185
}

0 commit comments

Comments
 (0)