Skip to content

Commit afcad68

Browse files
committed
Introduce mapping transformer
Signed-off-by: Bo Zhang <[email protected]>
1 parent 80cb033 commit afcad68

20 files changed

+1033
-69
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4242
- Disable scoring of keyword term search by default, fallback logic with new use_similarity:true parameter ([#17889](https://github.com/opensearch-project/OpenSearch/pull/17889))
4343
- Add versioning support in pull-based ingestion ([#17918](https://github.com/opensearch-project/OpenSearch/pull/17918))
4444
- Introducing MergedSegmentWarmerFactory to support the extension of IndexWriter.IndexReaderWarmer ([#17881](https://github.com/opensearch-project/OpenSearch/pull/17881))
45+
- Introduce mapping transformer to allow transform mappings during index create/update or index template create/update ([#17635](https://github.com/opensearch-project/OpenSearch/pull/17635))
4546

4647
### Changed
4748
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

server/src/main/java/org/opensearch/action/admin/indices/create/TransportCreateIndexAction.java

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.common.inject.Inject;
4444
import org.opensearch.core.action.ActionListener;
4545
import org.opensearch.core.common.io.stream.StreamInput;
46+
import org.opensearch.index.mapper.MappingTransformerRegistry;
4647
import org.opensearch.threadpool.ThreadPool;
4748
import org.opensearch.transport.TransportService;
4849

@@ -56,6 +57,7 @@
5657
public class TransportCreateIndexAction extends TransportClusterManagerNodeAction<CreateIndexRequest, CreateIndexResponse> {
5758

5859
private final MetadataCreateIndexService createIndexService;
60+
private final MappingTransformerRegistry mappingTransformerRegistry;
5961

6062
@Inject
6163
public TransportCreateIndexAction(
@@ -64,7 +66,8 @@ public TransportCreateIndexAction(
6466
ThreadPool threadPool,
6567
MetadataCreateIndexService createIndexService,
6668
ActionFilters actionFilters,
67-
IndexNameExpressionResolver indexNameExpressionResolver
69+
IndexNameExpressionResolver indexNameExpressionResolver,
70+
MappingTransformerRegistry mappingTransformerRegistry
6871
) {
6972
super(
7073
CreateIndexAction.NAME,
@@ -76,6 +79,7 @@ public TransportCreateIndexAction(
7679
indexNameExpressionResolver
7780
);
7881
this.createIndexService = createIndexService;
82+
this.mappingTransformerRegistry = mappingTransformerRegistry;
7983
}
8084

8185
@Override
@@ -112,25 +116,31 @@ protected void clusterManagerOperation(
112116
}
113117

114118
final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
115-
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
116-
cause,
117-
indexName,
118-
request.index()
119-
).ackTimeout(request.timeout())
120-
.clusterManagerNodeTimeout(request.clusterManagerNodeTimeout())
121-
.settings(request.settings())
122-
.mappings(request.mappings())
123-
.aliases(request.aliases())
124-
.context(request.context())
125-
.waitForActiveShards(request.waitForActiveShards());
126-
127-
createIndexService.createIndex(
128-
updateRequest,
129-
ActionListener.map(
130-
listener,
131-
response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)
132-
)
133-
);
119+
120+
final String finalCause = cause;
121+
final ActionListener<String> mappingTransformListener = ActionListener.wrap(transformedMappings -> {
122+
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
123+
finalCause,
124+
indexName,
125+
request.index()
126+
).ackTimeout(request.timeout())
127+
.clusterManagerNodeTimeout(request.clusterManagerNodeTimeout())
128+
.settings(request.settings())
129+
.mappings(transformedMappings)
130+
.aliases(request.aliases())
131+
.context(request.context())
132+
.waitForActiveShards(request.waitForActiveShards());
133+
134+
createIndexService.createIndex(
135+
updateRequest,
136+
ActionListener.map(
137+
listener,
138+
response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)
139+
)
140+
);
141+
}, listener::onFailure);
142+
143+
mappingTransformerRegistry.applyTransformers(request.mappings(), null, mappingTransformListener);
134144
}
135145

136146
}

server/src/main/java/org/opensearch/action/admin/indices/mapping/put/TransportPutMappingAction.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@
5050
import org.opensearch.core.action.ActionListener;
5151
import org.opensearch.core.common.io.stream.StreamInput;
5252
import org.opensearch.core.index.Index;
53+
import org.opensearch.core.xcontent.MediaTypeRegistry;
5354
import org.opensearch.index.IndexNotFoundException;
55+
import org.opensearch.index.mapper.MappingTransformerRegistry;
5456
import org.opensearch.threadpool.ThreadPool;
5557
import org.opensearch.transport.TransportService;
5658

@@ -72,6 +74,7 @@ public class TransportPutMappingAction extends TransportClusterManagerNodeAction
7274

7375
private final MetadataMappingService metadataMappingService;
7476
private final RequestValidators<PutMappingRequest> requestValidators;
77+
private final MappingTransformerRegistry mappingTransformerRegistry;
7578

7679
@Inject
7780
public TransportPutMappingAction(
@@ -81,7 +84,8 @@ public TransportPutMappingAction(
8184
final MetadataMappingService metadataMappingService,
8285
final ActionFilters actionFilters,
8386
final IndexNameExpressionResolver indexNameExpressionResolver,
84-
final RequestValidators<PutMappingRequest> requestValidators
87+
final RequestValidators<PutMappingRequest> requestValidators,
88+
final MappingTransformerRegistry mappingTransformerRegistry
8589
) {
8690
super(
8791
PutMappingAction.NAME,
@@ -94,6 +98,7 @@ public TransportPutMappingAction(
9498
);
9599
this.metadataMappingService = metadataMappingService;
96100
this.requestValidators = Objects.requireNonNull(requestValidators);
101+
this.mappingTransformerRegistry = mappingTransformerRegistry;
97102
}
98103

99104
@Override
@@ -132,7 +137,13 @@ protected void clusterManagerOperation(
132137
listener.onFailure(maybeValidationException.get());
133138
return;
134139
}
135-
performMappingUpdate(concreteIndices, request, listener, metadataMappingService);
140+
141+
final ActionListener<String> mappingTransformListener = ActionListener.wrap(transformedMapping -> {
142+
request.source(transformedMapping, MediaTypeRegistry.JSON);
143+
performMappingUpdate(concreteIndices, request, listener, metadataMappingService);
144+
}, listener::onFailure);
145+
146+
mappingTransformerRegistry.applyTransformers(request.source(), null, mappingTransformListener);
136147
} catch (IndexNotFoundException ex) {
137148
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]", Arrays.asList(request.indices())), ex);
138149
throw ex;

server/src/main/java/org/opensearch/action/admin/indices/template/put/TransportPutComponentTemplateAction.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,20 @@
4444
import org.opensearch.cluster.metadata.MetadataIndexTemplateService;
4545
import org.opensearch.cluster.metadata.Template;
4646
import org.opensearch.cluster.service.ClusterService;
47+
import org.opensearch.common.compress.CompressedXContent;
4748
import org.opensearch.common.inject.Inject;
4849
import org.opensearch.common.settings.IndexScopedSettings;
4950
import org.opensearch.common.settings.Settings;
5051
import org.opensearch.core.action.ActionListener;
5152
import org.opensearch.core.common.io.stream.StreamInput;
53+
import org.opensearch.index.mapper.MappingTransformerRegistry;
5254
import org.opensearch.threadpool.ThreadPool;
5355
import org.opensearch.transport.TransportService;
5456

5557
import java.io.IOException;
5658

59+
import reactor.util.annotation.NonNull;
60+
5761
/**
5862
* An action for putting a single component template into the cluster state
5963
*
@@ -65,6 +69,7 @@ public class TransportPutComponentTemplateAction extends TransportClusterManager
6569

6670
private final MetadataIndexTemplateService indexTemplateService;
6771
private final IndexScopedSettings indexScopedSettings;
72+
private final MappingTransformerRegistry mappingTransformerRegistry;
6873

6974
@Inject
7075
public TransportPutComponentTemplateAction(
@@ -74,7 +79,8 @@ public TransportPutComponentTemplateAction(
7479
MetadataIndexTemplateService indexTemplateService,
7580
ActionFilters actionFilters,
7681
IndexNameExpressionResolver indexNameExpressionResolver,
77-
IndexScopedSettings indexScopedSettings
82+
IndexScopedSettings indexScopedSettings,
83+
MappingTransformerRegistry mappingTransformerRegistry
7884
) {
7985
super(
8086
PutComponentTemplateAction.NAME,
@@ -87,6 +93,7 @@ public TransportPutComponentTemplateAction(
8793
);
8894
this.indexTemplateService = indexTemplateService;
8995
this.indexScopedSettings = indexScopedSettings;
96+
this.mappingTransformerRegistry = mappingTransformerRegistry;
9097
}
9198

9299
@Override
@@ -121,13 +128,37 @@ protected void clusterManagerOperation(
121128
template = new Template(settings, template.mappings(), template.aliases());
122129
componentTemplate = new ComponentTemplate(template, componentTemplate.version(), componentTemplate.metadata());
123130
}
124-
indexTemplateService.putComponentTemplate(
125-
request.cause(),
126-
request.create(),
127-
request.name(),
128-
request.clusterManagerNodeTimeout(),
129-
componentTemplate,
130-
listener
131-
);
131+
132+
final ActionListener<String> mappingTransformListener = getMappingTransformListener(request, listener, componentTemplate);
133+
134+
transformMapping(template, mappingTransformListener);
135+
}
136+
137+
private ActionListener<String> getMappingTransformListener(
138+
@NonNull final PutComponentTemplateAction.Request request,
139+
@NonNull final ActionListener<AcknowledgedResponse> listener,
140+
@NonNull final ComponentTemplate componentTemplate
141+
) {
142+
return ActionListener.wrap(transformedMappings -> {
143+
if (transformedMappings != null && componentTemplate.template() != null) {
144+
componentTemplate.template().setMappings(new CompressedXContent(transformedMappings));
145+
}
146+
indexTemplateService.putComponentTemplate(
147+
request.cause(),
148+
request.create(),
149+
request.name(),
150+
request.clusterManagerNodeTimeout(),
151+
componentTemplate,
152+
listener
153+
);
154+
}, listener::onFailure);
155+
}
156+
157+
private void transformMapping(final Template template, @NonNull final ActionListener<String> mappingTransformListener) {
158+
if (template == null || template.mappings() == null) {
159+
mappingTransformListener.onResponse(null);
160+
} else {
161+
mappingTransformerRegistry.applyTransformers(template.mappings().string(), null, mappingTransformListener);
162+
}
132163
}
133164
}

server/src/main/java/org/opensearch/action/admin/indices/template/put/TransportPutComposableIndexTemplateAction.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,19 @@
4141
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
4242
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
4343
import org.opensearch.cluster.metadata.MetadataIndexTemplateService;
44+
import org.opensearch.cluster.metadata.Template;
4445
import org.opensearch.cluster.service.ClusterService;
46+
import org.opensearch.common.compress.CompressedXContent;
4547
import org.opensearch.common.inject.Inject;
4648
import org.opensearch.core.action.ActionListener;
4749
import org.opensearch.core.common.io.stream.StreamInput;
50+
import org.opensearch.index.mapper.MappingTransformerRegistry;
4851
import org.opensearch.threadpool.ThreadPool;
4952
import org.opensearch.transport.TransportService;
5053

5154
import java.io.IOException;
5255

56+
import reactor.util.annotation.NonNull;
5357
/**
5458
* An action for putting a composable index template into the cluster state
5559
*
@@ -60,6 +64,7 @@ public class TransportPutComposableIndexTemplateAction extends TransportClusterM
6064
AcknowledgedResponse> {
6165

6266
private final MetadataIndexTemplateService indexTemplateService;
67+
private final MappingTransformerRegistry mappingTransformerRegistry;
6368

6469
@Inject
6570
public TransportPutComposableIndexTemplateAction(
@@ -68,7 +73,8 @@ public TransportPutComposableIndexTemplateAction(
6873
ThreadPool threadPool,
6974
MetadataIndexTemplateService indexTemplateService,
7075
ActionFilters actionFilters,
71-
IndexNameExpressionResolver indexNameExpressionResolver
76+
IndexNameExpressionResolver indexNameExpressionResolver,
77+
MappingTransformerRegistry mappingTransformerRegistry
7278
) {
7379
super(
7480
PutComposableIndexTemplateAction.NAME,
@@ -80,6 +86,7 @@ public TransportPutComposableIndexTemplateAction(
8086
indexNameExpressionResolver
8187
);
8288
this.indexTemplateService = indexTemplateService;
89+
this.mappingTransformerRegistry = mappingTransformerRegistry;
8390
}
8491

8592
@Override
@@ -103,15 +110,35 @@ protected void clusterManagerOperation(
103110
final PutComposableIndexTemplateAction.Request request,
104111
final ClusterState state,
105112
final ActionListener<AcknowledgedResponse> listener
113+
) throws IOException {
114+
final ComposableIndexTemplate indexTemplate = request.indexTemplate();
115+
116+
final ActionListener<String> mappingTransformListener = ActionListener.wrap(transformedMappings -> {
117+
if (transformedMappings != null && indexTemplate.template() != null) {
118+
indexTemplate.template().setMappings(new CompressedXContent(transformedMappings));
119+
}
120+
indexTemplateService.putIndexTemplateV2(
121+
request.cause(),
122+
request.create(),
123+
request.name(),
124+
request.clusterManagerNodeTimeout(),
125+
indexTemplate,
126+
listener
127+
);
128+
}, listener::onFailure);
129+
130+
transformMapping(indexTemplate, mappingTransformListener);
131+
}
132+
133+
private void transformMapping(
134+
@NonNull final ComposableIndexTemplate indexTemplate,
135+
@NonNull final ActionListener<String> mappingTransformListener
106136
) {
107-
ComposableIndexTemplate indexTemplate = request.indexTemplate();
108-
indexTemplateService.putIndexTemplateV2(
109-
request.cause(),
110-
request.create(),
111-
request.name(),
112-
request.clusterManagerNodeTimeout(),
113-
indexTemplate,
114-
listener
115-
);
137+
final Template template = indexTemplate.template();
138+
if (template == null || template.mappings() == null) {
139+
mappingTransformListener.onResponse(null);
140+
} else {
141+
mappingTransformerRegistry.applyTransformers(template.mappings().string(), null, mappingTransformListener);
142+
}
116143
}
117144
}

0 commit comments

Comments
 (0)