Skip to content

Commit dc11eee

Browse files
committed
Introduce mapping transformer to allow transform mappings during index create/update or index template create/update
Signed-off-by: Bo Zhang <[email protected]>
1 parent 73669fe commit dc11eee

20 files changed

+1010
-67
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
88
- Change priority for scheduling reroute during timeout([#16445](https://github.com/opensearch-project/OpenSearch/pull/16445))
99
- Renaming the node role search to warm ([#17573](https://github.com/opensearch-project/OpenSearch/pull/17573))
1010
- Introduce a new search node role to hold search only shards ([#17620](https://github.com/opensearch-project/OpenSearch/pull/17620))
11+
- Introduce mapping transformer to allow transform mappings during index create/update or index template create/update ([#17635](https://github.com/opensearch-project/OpenSearch/pull/17635))
1112

1213
### Dependencies
1314
- Bump `ch.qos.logback:logback-core` from 1.5.16 to 1.5.17 ([#17609](https://github.com/opensearch-project/OpenSearch/pull/17609))

server/src/main/java/org/opensearch/action/admin/indices/create/TransportCreateIndexAction.java

+31-20
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.common.inject.Inject;
4444
import org.opensearch.core.action.ActionListener;
4545
import org.opensearch.core.common.io.stream.StreamInput;
46+
import org.opensearch.index.mapper.MappingTransformerRegistry;
4647
import org.opensearch.threadpool.ThreadPool;
4748
import org.opensearch.transport.TransportService;
4849

@@ -56,6 +57,7 @@
5657
public class TransportCreateIndexAction extends TransportClusterManagerNodeAction<CreateIndexRequest, CreateIndexResponse> {
5758

5859
private final MetadataCreateIndexService createIndexService;
60+
private final MappingTransformerRegistry mappingTransformerRegistry;
5961

6062
@Inject
6163
public TransportCreateIndexAction(
@@ -64,7 +66,9 @@ public TransportCreateIndexAction(
6466
ThreadPool threadPool,
6567
MetadataCreateIndexService createIndexService,
6668
ActionFilters actionFilters,
67-
IndexNameExpressionResolver indexNameExpressionResolver
69+
IndexNameExpressionResolver indexNameExpressionResolver,
70+
MappingTransformerRegistry mappingTransformerRegistry
71+
6872
) {
6973
super(
7074
CreateIndexAction.NAME,
@@ -76,6 +80,7 @@ public TransportCreateIndexAction(
7680
indexNameExpressionResolver
7781
);
7882
this.createIndexService = createIndexService;
83+
this.mappingTransformerRegistry = mappingTransformerRegistry;
7984
}
8085

8186
@Override
@@ -112,25 +117,31 @@ protected void clusterManagerOperation(
112117
}
113118

114119
final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
115-
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
116-
cause,
117-
indexName,
118-
request.index()
119-
).ackTimeout(request.timeout())
120-
.clusterManagerNodeTimeout(request.clusterManagerNodeTimeout())
121-
.settings(request.settings())
122-
.mappings(request.mappings())
123-
.aliases(request.aliases())
124-
.context(request.context())
125-
.waitForActiveShards(request.waitForActiveShards());
126-
127-
createIndexService.createIndex(
128-
updateRequest,
129-
ActionListener.map(
130-
listener,
131-
response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)
132-
)
133-
);
120+
121+
final String finalCause = cause;
122+
final ActionListener<String> mappingTransformListener = ActionListener.wrap(transformedMappings -> {
123+
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
124+
finalCause,
125+
indexName,
126+
request.index()
127+
).ackTimeout(request.timeout())
128+
.clusterManagerNodeTimeout(request.clusterManagerNodeTimeout())
129+
.settings(request.settings())
130+
.mappings(transformedMappings)
131+
.aliases(request.aliases())
132+
.context(request.context())
133+
.waitForActiveShards(request.waitForActiveShards());
134+
135+
createIndexService.createIndex(
136+
updateRequest,
137+
ActionListener.map(
138+
listener,
139+
response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)
140+
)
141+
);
142+
}, e -> { throw (RuntimeException) e; });
143+
144+
mappingTransformerRegistry.applyTransformers(request.mappings(), null, mappingTransformListener);
134145
}
135146

136147
}

server/src/main/java/org/opensearch/action/admin/indices/mapping/put/TransportPutMappingAction.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@
5050
import org.opensearch.core.action.ActionListener;
5151
import org.opensearch.core.common.io.stream.StreamInput;
5252
import org.opensearch.core.index.Index;
53+
import org.opensearch.core.xcontent.MediaTypeRegistry;
5354
import org.opensearch.index.IndexNotFoundException;
55+
import org.opensearch.index.mapper.MappingTransformerRegistry;
5456
import org.opensearch.threadpool.ThreadPool;
5557
import org.opensearch.transport.TransportService;
5658

@@ -72,6 +74,7 @@ public class TransportPutMappingAction extends TransportClusterManagerNodeAction
7274

7375
private final MetadataMappingService metadataMappingService;
7476
private final RequestValidators<PutMappingRequest> requestValidators;
77+
private final MappingTransformerRegistry mappingTransformerRegistry;
7578

7679
@Inject
7780
public TransportPutMappingAction(
@@ -81,7 +84,8 @@ public TransportPutMappingAction(
8184
final MetadataMappingService metadataMappingService,
8285
final ActionFilters actionFilters,
8386
final IndexNameExpressionResolver indexNameExpressionResolver,
84-
final RequestValidators<PutMappingRequest> requestValidators
87+
final RequestValidators<PutMappingRequest> requestValidators,
88+
MappingTransformerRegistry mappingTransformerRegistry
8589
) {
8690
super(
8791
PutMappingAction.NAME,
@@ -94,6 +98,7 @@ public TransportPutMappingAction(
9498
);
9599
this.metadataMappingService = metadataMappingService;
96100
this.requestValidators = Objects.requireNonNull(requestValidators);
101+
this.mappingTransformerRegistry = mappingTransformerRegistry;
97102
}
98103

99104
@Override
@@ -132,7 +137,13 @@ protected void clusterManagerOperation(
132137
listener.onFailure(maybeValidationException.get());
133138
return;
134139
}
135-
performMappingUpdate(concreteIndices, request, listener, metadataMappingService);
140+
141+
final ActionListener<String> mappingTransformListener = ActionListener.wrap(transformedMapping -> {
142+
request.source(transformedMapping, MediaTypeRegistry.JSON);
143+
performMappingUpdate(concreteIndices, request, listener, metadataMappingService);
144+
}, e -> { throw (RuntimeException) e; });
145+
146+
mappingTransformerRegistry.applyTransformers(request.source(), null, mappingTransformListener);
136147
} catch (IndexNotFoundException ex) {
137148
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]", Arrays.asList(request.indices())), ex);
138149
throw ex;

server/src/main/java/org/opensearch/action/admin/indices/template/put/TransportPutComponentTemplateAction.java

+41-9
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,20 @@
4444
import org.opensearch.cluster.metadata.MetadataIndexTemplateService;
4545
import org.opensearch.cluster.metadata.Template;
4646
import org.opensearch.cluster.service.ClusterService;
47+
import org.opensearch.common.compress.CompressedXContent;
4748
import org.opensearch.common.inject.Inject;
4849
import org.opensearch.common.settings.IndexScopedSettings;
4950
import org.opensearch.common.settings.Settings;
5051
import org.opensearch.core.action.ActionListener;
5152
import org.opensearch.core.common.io.stream.StreamInput;
53+
import org.opensearch.index.mapper.MappingTransformerRegistry;
5254
import org.opensearch.threadpool.ThreadPool;
5355
import org.opensearch.transport.TransportService;
5456

5557
import java.io.IOException;
5658

59+
import reactor.util.annotation.NonNull;
60+
5761
/**
5862
* An action for putting a single component template into the cluster state
5963
*
@@ -65,6 +69,7 @@ public class TransportPutComponentTemplateAction extends TransportClusterManager
6569

6670
private final MetadataIndexTemplateService indexTemplateService;
6771
private final IndexScopedSettings indexScopedSettings;
72+
private final MappingTransformerRegistry mappingTransformerRegistry;
6873

6974
@Inject
7075
public TransportPutComponentTemplateAction(
@@ -74,7 +79,8 @@ public TransportPutComponentTemplateAction(
7479
MetadataIndexTemplateService indexTemplateService,
7580
ActionFilters actionFilters,
7681
IndexNameExpressionResolver indexNameExpressionResolver,
77-
IndexScopedSettings indexScopedSettings
82+
IndexScopedSettings indexScopedSettings,
83+
MappingTransformerRegistry mappingTransformerRegistry
7884
) {
7985
super(
8086
PutComponentTemplateAction.NAME,
@@ -87,6 +93,7 @@ public TransportPutComponentTemplateAction(
8793
);
8894
this.indexTemplateService = indexTemplateService;
8995
this.indexScopedSettings = indexScopedSettings;
96+
this.mappingTransformerRegistry = mappingTransformerRegistry;
9097
}
9198

9299
@Override
@@ -121,13 +128,38 @@ protected void clusterManagerOperation(
121128
template = new Template(settings, template.mappings(), template.aliases());
122129
componentTemplate = new ComponentTemplate(template, componentTemplate.version(), componentTemplate.metadata());
123130
}
124-
indexTemplateService.putComponentTemplate(
125-
request.cause(),
126-
request.create(),
127-
request.name(),
128-
request.clusterManagerNodeTimeout(),
129-
componentTemplate,
130-
listener
131-
);
131+
132+
final ActionListener<String> mappingTransformListener = getMappingTransformListener(request, listener, componentTemplate);
133+
134+
transformMapping(template, mappingTransformListener);
135+
136+
}
137+
138+
private ActionListener<String> getMappingTransformListener(
139+
@NonNull final PutComponentTemplateAction.Request request,
140+
@NonNull final ActionListener<AcknowledgedResponse> listener,
141+
@NonNull final ComponentTemplate componentTemplate
142+
) {
143+
return ActionListener.wrap(transformedMappings -> {
144+
if (transformedMappings != null && componentTemplate.template() != null) {
145+
componentTemplate.template().setMappings(new CompressedXContent(transformedMappings));
146+
}
147+
indexTemplateService.putComponentTemplate(
148+
request.cause(),
149+
request.create(),
150+
request.name(),
151+
request.clusterManagerNodeTimeout(),
152+
componentTemplate,
153+
listener
154+
);
155+
}, e -> { throw (RuntimeException) e; });
156+
}
157+
158+
private void transformMapping(final Template template, @NonNull final ActionListener<String> mappingTransformListener) {
159+
if (template == null || template.mappings() == null) {
160+
mappingTransformListener.onResponse(null);
161+
} else {
162+
mappingTransformerRegistry.applyTransformers(template.mappings().string(), null, mappingTransformListener);
163+
}
132164
}
133165
}

server/src/main/java/org/opensearch/action/admin/indices/template/put/TransportPutComposableIndexTemplateAction.java

+38-10
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,20 @@
4141
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
4242
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
4343
import org.opensearch.cluster.metadata.MetadataIndexTemplateService;
44+
import org.opensearch.cluster.metadata.Template;
4445
import org.opensearch.cluster.service.ClusterService;
46+
import org.opensearch.common.compress.CompressedXContent;
4547
import org.opensearch.common.inject.Inject;
4648
import org.opensearch.core.action.ActionListener;
4749
import org.opensearch.core.common.io.stream.StreamInput;
50+
import org.opensearch.index.mapper.MappingTransformerRegistry;
4851
import org.opensearch.threadpool.ThreadPool;
4952
import org.opensearch.transport.TransportService;
5053

5154
import java.io.IOException;
5255

56+
import reactor.util.annotation.NonNull;
57+
5358
/**
5459
* An action for putting a composable index template into the cluster state
5560
*
@@ -60,6 +65,7 @@ public class TransportPutComposableIndexTemplateAction extends TransportClusterM
6065
AcknowledgedResponse> {
6166

6267
private final MetadataIndexTemplateService indexTemplateService;
68+
private final MappingTransformerRegistry mappingTransformerRegistry;
6369

6470
@Inject
6571
public TransportPutComposableIndexTemplateAction(
@@ -68,7 +74,8 @@ public TransportPutComposableIndexTemplateAction(
6874
ThreadPool threadPool,
6975
MetadataIndexTemplateService indexTemplateService,
7076
ActionFilters actionFilters,
71-
IndexNameExpressionResolver indexNameExpressionResolver
77+
IndexNameExpressionResolver indexNameExpressionResolver,
78+
MappingTransformerRegistry mappingTransformerRegistry
7279
) {
7380
super(
7481
PutComposableIndexTemplateAction.NAME,
@@ -80,6 +87,7 @@ public TransportPutComposableIndexTemplateAction(
8087
indexNameExpressionResolver
8188
);
8289
this.indexTemplateService = indexTemplateService;
90+
this.mappingTransformerRegistry = mappingTransformerRegistry;
8391
}
8492

8593
@Override
@@ -103,15 +111,35 @@ protected void clusterManagerOperation(
103111
final PutComposableIndexTemplateAction.Request request,
104112
final ClusterState state,
105113
final ActionListener<AcknowledgedResponse> listener
114+
) throws IOException {
115+
final ComposableIndexTemplate indexTemplate = request.indexTemplate();
116+
117+
final ActionListener<String> mappingTransformListener = ActionListener.wrap(transformedMappings -> {
118+
if (transformedMappings != null && indexTemplate.template() != null) {
119+
indexTemplate.template().setMappings(new CompressedXContent(transformedMappings));
120+
}
121+
indexTemplateService.putIndexTemplateV2(
122+
request.cause(),
123+
request.create(),
124+
request.name(),
125+
request.clusterManagerNodeTimeout(),
126+
indexTemplate,
127+
listener
128+
);
129+
}, e -> { throw (RuntimeException) e; });
130+
131+
transformMapping(indexTemplate, mappingTransformListener);
132+
}
133+
134+
private void transformMapping(
135+
@NonNull final ComposableIndexTemplate indexTemplate,
136+
@NonNull final ActionListener<String> mappingTransformListener
106137
) {
107-
ComposableIndexTemplate indexTemplate = request.indexTemplate();
108-
indexTemplateService.putIndexTemplateV2(
109-
request.cause(),
110-
request.create(),
111-
request.name(),
112-
request.clusterManagerNodeTimeout(),
113-
indexTemplate,
114-
listener
115-
);
138+
final Template template = indexTemplate.template();
139+
if (template == null || template.mappings() == null) {
140+
mappingTransformListener.onResponse(null);
141+
} else {
142+
mappingTransformerRegistry.applyTransformers(template.mappings().string(), null, mappingTransformListener);
143+
}
116144
}
117145
}

0 commit comments

Comments
 (0)