Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce mapping transformer to allow transform mappings during index create/update or index template create/update #17635

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change priority for scheduling reroute during timeout([#16445](https://github.com/opensearch-project/OpenSearch/pull/16445))
- Renaming the node role search to warm ([#17573](https://github.com/opensearch-project/OpenSearch/pull/17573))
- Introduce a new search node role to hold search only shards ([#17620](https://github.com/opensearch-project/OpenSearch/pull/17620))
- Introduce mapping transformer to allow transform mappings during index create/update or index template create/update ([#17635](https://github.com/opensearch-project/OpenSearch/pull/17635))

### Dependencies
- Bump `ch.qos.logback:logback-core` from 1.5.16 to 1.5.17 ([#17609](https://github.com/opensearch-project/OpenSearch/pull/17609))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.mapper.MappingTransformerRegistry;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -56,6 +57,7 @@
public class TransportCreateIndexAction extends TransportClusterManagerNodeAction<CreateIndexRequest, CreateIndexResponse> {

private final MetadataCreateIndexService createIndexService;
private final MappingTransformerRegistry mappingTransformerRegistry;

@Inject
public TransportCreateIndexAction(
Expand All @@ -64,7 +66,9 @@ public TransportCreateIndexAction(
ThreadPool threadPool,
MetadataCreateIndexService createIndexService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
MappingTransformerRegistry mappingTransformerRegistry

) {
super(
CreateIndexAction.NAME,
Expand All @@ -76,6 +80,7 @@ public TransportCreateIndexAction(
indexNameExpressionResolver
);
this.createIndexService = createIndexService;
this.mappingTransformerRegistry = mappingTransformerRegistry;
}

@Override
Expand Down Expand Up @@ -112,25 +117,31 @@ protected void clusterManagerOperation(
}

final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
cause,
indexName,
request.index()
).ackTimeout(request.timeout())
.clusterManagerNodeTimeout(request.clusterManagerNodeTimeout())
.settings(request.settings())
.mappings(request.mappings())
.aliases(request.aliases())
.context(request.context())
.waitForActiveShards(request.waitForActiveShards());

createIndexService.createIndex(
updateRequest,
ActionListener.map(
listener,
response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)
)
);

final String finalCause = cause;
final ActionListener<String> mappingTransformListener = ActionListener.wrap(transformedMappings -> {
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
finalCause,
indexName,
request.index()
).ackTimeout(request.timeout())
.clusterManagerNodeTimeout(request.clusterManagerNodeTimeout())
.settings(request.settings())
.mappings(transformedMappings)
.aliases(request.aliases())
.context(request.context())
.waitForActiveShards(request.waitForActiveShards());

createIndexService.createIndex(
updateRequest,
ActionListener.map(
listener,
response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)
)
);
}, e -> { throw (RuntimeException) e; });
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exception is threw rather than using listener.onFailure because the behavior before this change is simply throw the exception. We don't want to change the existing behavior of how it handles the exception.


mappingTransformerRegistry.applyTransformers(request.mappings(), null, mappingTransformListener);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.mapper.MappingTransformerRegistry;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -72,6 +74,7 @@ public class TransportPutMappingAction extends TransportClusterManagerNodeAction

private final MetadataMappingService metadataMappingService;
private final RequestValidators<PutMappingRequest> requestValidators;
private final MappingTransformerRegistry mappingTransformerRegistry;

@Inject
public TransportPutMappingAction(
Expand All @@ -81,7 +84,8 @@ public TransportPutMappingAction(
final MetadataMappingService metadataMappingService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final RequestValidators<PutMappingRequest> requestValidators
final RequestValidators<PutMappingRequest> requestValidators,
final MappingTransformerRegistry mappingTransformerRegistry
) {
super(
PutMappingAction.NAME,
Expand All @@ -94,6 +98,7 @@ public TransportPutMappingAction(
);
this.metadataMappingService = metadataMappingService;
this.requestValidators = Objects.requireNonNull(requestValidators);
this.mappingTransformerRegistry = mappingTransformerRegistry;
}

@Override
Expand Down Expand Up @@ -132,7 +137,13 @@ protected void clusterManagerOperation(
listener.onFailure(maybeValidationException.get());
return;
}
performMappingUpdate(concreteIndices, request, listener, metadataMappingService);

final ActionListener<String> mappingTransformListener = ActionListener.wrap(transformedMapping -> {
request.source(transformedMapping, MediaTypeRegistry.JSON);
performMappingUpdate(concreteIndices, request, listener, metadataMappingService);
}, e -> { throw (RuntimeException) e; });

mappingTransformerRegistry.applyTransformers(request.source(), null, mappingTransformListener);
} catch (IndexNotFoundException ex) {
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]", Arrays.asList(request.indices())), ex);
throw ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,20 @@
import org.opensearch.cluster.metadata.MetadataIndexTemplateService;
import org.opensearch.cluster.metadata.Template;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.compress.CompressedXContent;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.mapper.MappingTransformerRegistry;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

import reactor.util.annotation.NonNull;

/**
* An action for putting a single component template into the cluster state
*
Expand All @@ -65,6 +69,7 @@ public class TransportPutComponentTemplateAction extends TransportClusterManager

private final MetadataIndexTemplateService indexTemplateService;
private final IndexScopedSettings indexScopedSettings;
private final MappingTransformerRegistry mappingTransformerRegistry;

@Inject
public TransportPutComponentTemplateAction(
Expand All @@ -74,7 +79,8 @@ public TransportPutComponentTemplateAction(
MetadataIndexTemplateService indexTemplateService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings
IndexScopedSettings indexScopedSettings,
MappingTransformerRegistry mappingTransformerRegistry
) {
super(
PutComponentTemplateAction.NAME,
Expand All @@ -87,6 +93,7 @@ public TransportPutComponentTemplateAction(
);
this.indexTemplateService = indexTemplateService;
this.indexScopedSettings = indexScopedSettings;
this.mappingTransformerRegistry = mappingTransformerRegistry;
}

@Override
Expand Down Expand Up @@ -121,13 +128,37 @@ protected void clusterManagerOperation(
template = new Template(settings, template.mappings(), template.aliases());
componentTemplate = new ComponentTemplate(template, componentTemplate.version(), componentTemplate.metadata());
}
indexTemplateService.putComponentTemplate(
request.cause(),
request.create(),
request.name(),
request.clusterManagerNodeTimeout(),
componentTemplate,
listener
);

final ActionListener<String> mappingTransformListener = getMappingTransformListener(request, listener, componentTemplate);

transformMapping(template, mappingTransformListener);
}

private ActionListener<String> getMappingTransformListener(
@NonNull final PutComponentTemplateAction.Request request,
@NonNull final ActionListener<AcknowledgedResponse> listener,
@NonNull final ComponentTemplate componentTemplate
) {
return ActionListener.wrap(transformedMappings -> {
if (transformedMappings != null && componentTemplate.template() != null) {
componentTemplate.template().setMappings(new CompressedXContent(transformedMappings));
}
indexTemplateService.putComponentTemplate(
request.cause(),
request.create(),
request.name(),
request.clusterManagerNodeTimeout(),
componentTemplate,
listener
);
}, e -> { throw (RuntimeException) e; });
}

private void transformMapping(final Template template, @NonNull final ActionListener<String> mappingTransformListener) {
if (template == null || template.mappings() == null) {
mappingTransformListener.onResponse(null);
} else {
mappingTransformerRegistry.applyTransformers(template.mappings().string(), null, mappingTransformListener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,20 @@
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MetadataIndexTemplateService;
import org.opensearch.cluster.metadata.Template;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.compress.CompressedXContent;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.mapper.MappingTransformerRegistry;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

import reactor.util.annotation.NonNull;

/**
* An action for putting a composable index template into the cluster state
*
Expand All @@ -60,6 +65,7 @@ public class TransportPutComposableIndexTemplateAction extends TransportClusterM
AcknowledgedResponse> {

private final MetadataIndexTemplateService indexTemplateService;
private final MappingTransformerRegistry mappingTransformerRegistry;

@Inject
public TransportPutComposableIndexTemplateAction(
Expand All @@ -68,7 +74,8 @@ public TransportPutComposableIndexTemplateAction(
ThreadPool threadPool,
MetadataIndexTemplateService indexTemplateService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
MappingTransformerRegistry mappingTransformerRegistry
) {
super(
PutComposableIndexTemplateAction.NAME,
Expand All @@ -80,6 +87,7 @@ public TransportPutComposableIndexTemplateAction(
indexNameExpressionResolver
);
this.indexTemplateService = indexTemplateService;
this.mappingTransformerRegistry = mappingTransformerRegistry;
}

@Override
Expand All @@ -103,15 +111,35 @@ protected void clusterManagerOperation(
final PutComposableIndexTemplateAction.Request request,
final ClusterState state,
final ActionListener<AcknowledgedResponse> listener
) throws IOException {
final ComposableIndexTemplate indexTemplate = request.indexTemplate();

final ActionListener<String> mappingTransformListener = ActionListener.wrap(transformedMappings -> {
if (transformedMappings != null && indexTemplate.template() != null) {
indexTemplate.template().setMappings(new CompressedXContent(transformedMappings));
}
indexTemplateService.putIndexTemplateV2(
request.cause(),
request.create(),
request.name(),
request.clusterManagerNodeTimeout(),
indexTemplate,
listener
);
}, e -> { throw (RuntimeException) e; });

transformMapping(indexTemplate, mappingTransformListener);
}

private void transformMapping(
@NonNull final ComposableIndexTemplate indexTemplate,
@NonNull final ActionListener<String> mappingTransformListener
) {
ComposableIndexTemplate indexTemplate = request.indexTemplate();
indexTemplateService.putIndexTemplateV2(
request.cause(),
request.create(),
request.name(),
request.clusterManagerNodeTimeout(),
indexTemplate,
listener
);
final Template template = indexTemplate.template();
if (template == null || template.mappings() == null) {
mappingTransformListener.onResponse(null);
} else {
mappingTransformerRegistry.applyTransformers(template.mappings().string(), null, mappingTransformListener);
}
}
}
Loading
Loading