From f53d2fd76aab733dbf8479c57799e5e384780f69 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Fri, 18 Apr 2025 10:26:06 -0400 Subject: [PATCH] [FEATURE] Reactive version of the clients Signed-off-by: Andriy Redko --- settings.gradle.kts | 1 + spring-data-opensearch/build.gradle.kts | 1 + .../data/client/osc/OpenSearchClients.java | 88 + .../client/osc/ReactiveChildTemplate.java | 61 + .../client/osc/ReactiveClusterTemplate.java | 40 + .../client/osc/ReactiveIndicesTemplate.java | 483 +++++ .../client/osc/ReactiveOpenSearchClient.java | 455 +++++ .../osc/ReactiveOpenSearchClusterClient.java | 96 + .../osc/ReactiveOpenSearchConfiguration.java | 120 ++ .../osc/ReactiveOpenSearchIndicesClient.java | 616 ++++++ .../osc/ReactiveOpenSearchTemplate.java | 674 +++++++ .../data/client/osc/RequestConverter.java | 49 +- .../client/JUnit5SampleReactiveOSCTests.java | 39 + ...activeOpenSearchConfigurationOSCTests.java | 76 + ...ctiveElasticsearchOSCIntegrationTests.java | 165 ++ ...eactivePointInTimeOSCIntegrationTests.java | 35 + .../ReactiveReindexOSCIntegrationTests.java | 31 + ...tiveSearchTemplateOSCIntegrationTests.java | 31 + ...OperationsReactiveOSCIntegrationTests.java | 24 + .../ReactiveCallbackOSCIntegrationTests.java | 36 + ...iveIndexOperationsOSCIntegrationTests.java | 41 + ...ctiveIndexTemplateOSCIntegrationTests.java | 41 + ...ieldNamingStrategyOSCIntegrationTests.java | 50 + ...eactiveSearchAfterOSCIntegrationTests.java | 38 + .../ReactiveRoutingOSCIntegrationTests.java | 37 + .../ReactiveSuggestOSCIntegrationTests.java | 66 + ...activeOpenSearchTemplateConfiguration.java | 67 + .../opensearch/data/client/osc/DevTests.java | 38 + .../osc/ReactiveIndicesTemplateTest.java | 73 + .../data/client/osc/RestClientsTest.java | 415 ++++ ...asticsearchRepositoriesRegistrarTests.java | 104 + ...ReactiveElasticsearchIntegrationTests.java | 1670 +++++++++++++++++ .../ReactivePointInTimeIntegrationTests.java | 112 ++ .../core/ReactiveReindexIntegrationTests.java | 160 ++ ...eactiveSearchTemplateIntegrationTests.java | 150 ++ ...terOperationsReactiveIntegrationTests.java | 59 + .../ReactiveCallbackIntegrationTests.java | 143 ++ ...activeIndexOperationsIntegrationTests.java | 466 +++++ ...ReactiveIndexTemplateIntegrationTests.java | 445 +++++ ...veFieldNamingStrategyIntegrationTests.java | 104 + .../ReactiveSearchAfterIntegrationTests.java | 154 ++ .../core/routing/ReactiveRoutingTests.java | 173 ++ .../ReactiveSuggestIntegrationTests.java | 197 ++ .../testcontainers-opensearch.properties | 2 +- 44 files changed, 7923 insertions(+), 3 deletions(-) create mode 100644 spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveChildTemplate.java create mode 100644 spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveClusterTemplate.java create mode 100644 spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveIndicesTemplate.java create mode 100644 spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchClient.java create mode 100644 spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchClusterClient.java create mode 100644 spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchConfiguration.java create mode 100644 spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchIndicesClient.java create mode 100644 spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchTemplate.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/JUnit5SampleReactiveOSCTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/config/configuration/ReactiveOpenSearchConfigurationOSCTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/ReactiveElasticsearchOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/ReactivePointInTimeOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/ReactiveReindexOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/ReactiveSearchTemplateOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/cluster/ClusterOperationsReactiveOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/event/ReactiveCallbackOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/index/ReactiveIndexOperationsOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/index/ReactiveIndexTemplateOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/mapping/ReactiveFieldNamingStrategyOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/paginating/ReactiveSearchAfterOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/routing/ReactiveRoutingOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/core/suggest/ReactiveSuggestOSCIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/junit/jupiter/ReactiveOpenSearchTemplateConfiguration.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/osc/ReactiveIndicesTemplateTest.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/osc/RestClientsTest.java create mode 100644 spring-data-opensearch/src/test/java/org/opensearch/data/client/repository/config/ReactiveElasticsearchRepositoriesRegistrarTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ReactivePointInTimeIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ReactiveReindexIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ReactiveSearchTemplateIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/cluster/ClusterOperationsReactiveIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/event/ReactiveCallbackIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/index/ReactiveIndexOperationsIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/index/ReactiveIndexTemplateIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/mapping/ReactiveFieldNamingStrategyIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/paginating/ReactiveSearchAfterIntegrationTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/routing/ReactiveRoutingTests.java create mode 100644 spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/suggest/ReactiveSuggestIntegrationTests.java diff --git a/settings.gradle.kts b/settings.gradle.kts index 0d82c9fd..17139730 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -30,6 +30,7 @@ dependencyResolutionManagement { library("boot-test", "org.springframework.boot", "spring-boot-test").versionRef("spring-boot") library("boot-test-autoconfigure", "org.springframework.boot", "spring-boot-test-autoconfigure").versionRef("spring-boot") library("boot-testcontainers", "org.springframework.boot", "spring-boot-testcontainers").versionRef("spring-boot") + library("projectreactor", "io.projectreactor:reactor-test:3.7.5") plugin("spring-boot", "org.springframework.boot").versionRef("spring-boot") } diff --git a/spring-data-opensearch/build.gradle.kts b/spring-data-opensearch/build.gradle.kts index 7c890f8a..a7a86f67 100644 --- a/spring-data-opensearch/build.gradle.kts +++ b/spring-data-opensearch/build.gradle.kts @@ -38,6 +38,7 @@ dependencies { compileOnly(springLibs.web) compileOnly(opensearchLibs.java.client) + testImplementation(springLibs.projectreactor) testImplementation(opensearchLibs.java.client) testImplementation("jakarta.enterprise:jakarta.enterprise.cdi-api:3.0.0") testImplementation("org.slf4j:log4j-over-slf4j:2.0.17") diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchClients.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchClients.java index 0d11e22d..7133da83 100644 --- a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchClients.java +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchClients.java @@ -62,6 +62,7 @@ @SuppressWarnings("unused") public final class OpenSearchClients { public static final String IMPERATIVE_CLIENT = "imperative"; + public static final String REACTIVE_CLIENT = "reactive"; /** * Name of whose value can be used to correlate log messages for this request. @@ -282,6 +283,93 @@ public static OpenSearchTransport getOpenSearchTransport(RestClient restClient, } // endregion + // region reactive client + /** + * Creates a new {@link ReactiveOpenSearchClient} + * + * @param clientConfiguration configuration options, must not be {@literal null}. + * @return the {@link ReactiveOpenSearchClient} + */ + public static ReactiveOpenSearchClient createReactive(ClientConfiguration clientConfiguration) { + + Assert.notNull(clientConfiguration, "clientConfiguration must not be null"); + + return createReactive(getRestClient(clientConfiguration), null, DEFAULT_JSONP_MAPPER); + } + + /** + * Creates a new {@link ReactiveOpenSearchClient} + * + * @param clientConfiguration configuration options, must not be {@literal null}. + * @param transportOptions options to be added to each request. + * @return the {@link ReactiveOpenSearchClient} + */ + public static ReactiveOpenSearchClient createReactive(ClientConfiguration clientConfiguration, + @Nullable TransportOptions transportOptions) { + + Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!"); + + return createReactive(getRestClient(clientConfiguration), transportOptions, DEFAULT_JSONP_MAPPER); + } + + /** + * Creates a new {@link ReactiveOpenSearchClient} + * + * @param clientConfiguration configuration options, must not be {@literal null}. + * @param transportOptions options to be added to each request. + * @param jsonpMapper the JsonpMapper to use + * @return the {@link ReactiveOpenSearchClient} + */ + public static ReactiveOpenSearchClient createReactive(ClientConfiguration clientConfiguration, + @Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) { + + Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!"); + Assert.notNull(jsonpMapper, "jsonpMapper must not be null"); + + return createReactive(getRestClient(clientConfiguration), transportOptions, jsonpMapper); + } + + /** + * Creates a new {@link ReactiveOpenSearchClient}. + * + * @param restClient the underlying {@link RestClient} + * @return the {@link ReactiveOpenSearchClient} + */ + public static ReactiveOpenSearchClient createReactive(RestClient restClient) { + return createReactive(restClient, null, DEFAULT_JSONP_MAPPER); + } + + /** + * Creates a new {@link ReactiveOpenSearchClient}. + * + * @param restClient the underlying {@link RestClient} + * @param transportOptions options to be added to each request. + * @return the {@link ReactiveOpenSearchClient} + */ + public static ReactiveOpenSearchClient createReactive(RestClient restClient, + @Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) { + + Assert.notNull(restClient, "restClient must not be null"); + + var transport = getOpenSearchTransport(restClient, REACTIVE_CLIENT, transportOptions, jsonpMapper); + return createReactive(transport); + } + + /** + * Creates a new {@link ReactiveOpenSearchClient} that uses the given {@link OpenSearchTransport}. + * + * @param transport the transport to use + * @return the {@link ReactiveOpenSearchClient} + */ + public static ReactiveOpenSearchClient createReactive(OpenSearchTransport transport) { + + Assert.notNull(transport, "transport must not be null"); + + return new ReactiveOpenSearchClient(transport); + } + // endregion + + private static List formattedHosts(List hosts, boolean useSsl) { return hosts.stream().map(it -> (useSsl ? "https" : "http") + "://" + it.getHostString() + ':' + it.getPort()) .collect(Collectors.toList()); diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveChildTemplate.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveChildTemplate.java new file mode 100644 index 00000000..70ac14e3 --- /dev/null +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveChildTemplate.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.data.client.osc; + +import org.opensearch.client.ApiClient; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.transport.Transport; +import org.reactivestreams.Publisher; +import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; +import org.springframework.util.Assert; +import reactor.core.publisher.Flux; + +/** + * base class for a reactive template that uses on of the {@link ReactiveOpenSearchClient}'s child clients. + */ +public class ReactiveChildTemplate> { + protected final CLIENT client; + protected final ElasticsearchConverter elasticsearchConverter; + protected final RequestConverter requestConverter; + protected final ResponseConverter responseConverter; + protected final OpenSearchExceptionTranslator exceptionTranslator; + + public ReactiveChildTemplate(CLIENT client, ElasticsearchConverter elasticsearchConverter) { + this.client = client; + this.elasticsearchConverter = elasticsearchConverter; + JsonpMapper jsonpMapper = client._transport().jsonpMapper(); + requestConverter = new RequestConverter(elasticsearchConverter, jsonpMapper); + responseConverter = new ResponseConverter(jsonpMapper); + exceptionTranslator = new OpenSearchExceptionTranslator(jsonpMapper); + } + + /** + * Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on the client. + */ + @FunctionalInterface + public interface ClientCallback> { + RESULT doWithClient(CLIENT client); + } + + /** + * Execute a callback with the client and provide exception translation. + * + * @param callback the callback to execute, must not be {@literal null} + * @param the type returned from the callback + * @return the callback result + */ + public Publisher execute(ClientCallback> callback) { + + Assert.notNull(callback, "callback must not be null"); + + return Flux.defer(() -> callback.doWithClient(client)).onErrorMap(exceptionTranslator::translateException); + } + +} diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveClusterTemplate.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveClusterTemplate.java new file mode 100644 index 00000000..04ea4490 --- /dev/null +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveClusterTemplate.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.data.client.osc; + +import org.opensearch.client.opensearch.cluster.HealthRequest; +import org.opensearch.client.opensearch.cluster.HealthResponse; +import org.opensearch.client.transport.OpenSearchTransport; +import org.springframework.data.elasticsearch.core.cluster.ClusterHealth; +import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations; +import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; +import reactor.core.publisher.Mono; + +/** + * Reactive cluster template + */ +public class ReactiveClusterTemplate + extends ReactiveChildTemplate + implements ReactiveClusterOperations { + + public ReactiveClusterTemplate(ReactiveOpenSearchClusterClient client, + ElasticsearchConverter elasticsearchConverter) { + super(client, elasticsearchConverter); + } + + @Override + public Mono health() { + + HealthRequest healthRequest = requestConverter.clusterHealthRequest(); + Mono healthResponse = Mono.from(execute(client -> client.health(healthRequest))); + return healthResponse.map(responseConverter::clusterHealth); + } + +} diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveIndicesTemplate.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveIndicesTemplate.java new file mode 100644 index 00000000..9fed3374 --- /dev/null +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveIndicesTemplate.java @@ -0,0 +1,483 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.data.client.osc; + +import static org.springframework.util.StringUtils.hasText; + +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import org.opensearch.client.opensearch._types.AcknowledgedResponseBase; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.opensearch.indices.CreateIndexResponse; +import org.opensearch.client.opensearch.indices.DeleteIndexRequest; +import org.opensearch.client.opensearch.indices.DeleteIndexResponse; +import org.opensearch.client.opensearch.indices.DeleteTemplateResponse; +import org.opensearch.client.opensearch.indices.ExistsRequest; +import org.opensearch.client.opensearch.indices.GetAliasRequest; +import org.opensearch.client.opensearch.indices.GetAliasResponse; +import org.opensearch.client.opensearch.indices.GetIndexRequest; +import org.opensearch.client.opensearch.indices.GetIndicesSettingsRequest; +import org.opensearch.client.opensearch.indices.GetIndicesSettingsResponse; +import org.opensearch.client.opensearch.indices.GetMappingRequest; +import org.opensearch.client.opensearch.indices.GetMappingResponse; +import org.opensearch.client.opensearch.indices.GetTemplateResponse; +import org.opensearch.client.opensearch.indices.PutIndexTemplateResponse; +import org.opensearch.client.opensearch.indices.PutMappingResponse; +import org.opensearch.client.opensearch.indices.PutTemplateResponse; +import org.opensearch.client.opensearch.indices.RefreshRequest; +import org.opensearch.client.opensearch.indices.UpdateAliasesRequest; +import org.opensearch.client.opensearch.indices.UpdateAliasesResponse; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.dao.InvalidDataAccessApiUsageException; +import org.springframework.data.elasticsearch.NoSuchIndexException; +import org.springframework.data.elasticsearch.annotations.Mapping; +import org.springframework.data.elasticsearch.core.IndexInformation; +import org.springframework.data.elasticsearch.core.ReactiveIndexOperations; +import org.springframework.data.elasticsearch.core.ReactiveResourceUtil; +import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; +import org.springframework.data.elasticsearch.core.document.Document; +import org.springframework.data.elasticsearch.core.index.AliasActions; +import org.springframework.data.elasticsearch.core.index.AliasData; +import org.springframework.data.elasticsearch.core.index.DeleteComponentTemplateRequest; +import org.springframework.data.elasticsearch.core.index.DeleteIndexTemplateRequest; +import org.springframework.data.elasticsearch.core.index.DeleteTemplateRequest; +import org.springframework.data.elasticsearch.core.index.ExistsComponentTemplateRequest; +import org.springframework.data.elasticsearch.core.index.ExistsIndexTemplateRequest; +import org.springframework.data.elasticsearch.core.index.ExistsTemplateRequest; +import org.springframework.data.elasticsearch.core.index.GetComponentTemplateRequest; +import org.springframework.data.elasticsearch.core.index.GetIndexTemplateRequest; +import org.springframework.data.elasticsearch.core.index.GetTemplateRequest; +import org.springframework.data.elasticsearch.core.index.PutComponentTemplateRequest; +import org.springframework.data.elasticsearch.core.index.PutIndexTemplateRequest; +import org.springframework.data.elasticsearch.core.index.PutTemplateRequest; +import org.springframework.data.elasticsearch.core.index.ReactiveMappingBuilder; +import org.springframework.data.elasticsearch.core.index.Settings; +import org.springframework.data.elasticsearch.core.index.TemplateData; +import org.springframework.data.elasticsearch.core.index.TemplateResponse; +import org.springframework.data.elasticsearch.core.mapping.Alias; +import org.springframework.data.elasticsearch.core.mapping.CreateIndexSettings; +import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; +import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Reactive Indices Template + */ +public class ReactiveIndicesTemplate + extends ReactiveChildTemplate + implements ReactiveIndexOperations { + + // we need a cluster client as well because ES has put some methods from the indices API into the cluster client + // (component templates) + private final ReactiveClusterTemplate clusterTemplate; + + @Nullable private final Class boundClass; + private final IndexCoordinates boundIndexCoordinates; + + public ReactiveIndicesTemplate(ReactiveOpenSearchIndicesClient client, ReactiveClusterTemplate clusterTemplate, + ElasticsearchConverter elasticsearchConverter, IndexCoordinates index) { + + super(client, elasticsearchConverter); + + Assert.notNull(index, "index must not be null"); + Assert.notNull(clusterTemplate, "clusterTemplate must not be null"); + + this.clusterTemplate = clusterTemplate; + this.boundClass = null; + this.boundIndexCoordinates = index; + } + + public ReactiveIndicesTemplate(ReactiveOpenSearchIndicesClient client, ReactiveClusterTemplate clusterTemplate, + ElasticsearchConverter elasticsearchConverter, Class clazz) { + + super(client, elasticsearchConverter); + + Assert.notNull(clazz, "clazz must not be null"); + Assert.notNull(clusterTemplate, "clusterTemplate must not be null"); + + this.clusterTemplate = clusterTemplate; + this.boundClass = clazz; + this.boundIndexCoordinates = getIndexCoordinatesFor(clazz); + } + + @Override + public Mono create() { + + IndexCoordinates indexCoordinates = getIndexCoordinates(); + + if (boundClass != null) { + return createSettings(boundClass).flatMap(settings -> doCreate(indexCoordinates, settings, null)); + } else { + return doCreate(indexCoordinates, new Settings(), null); + } + } + + @Override + public Mono create(Map settings) { + + Assert.notNull(settings, "settings must not be null"); + + return doCreate(getIndexCoordinates(), settings, null); + } + + @Override + public Mono create(Map settings, Document mapping) { + + Assert.notNull(settings, "settings must not be null"); + Assert.notNull(mapping, "mapping must not be null"); + + return doCreate(getIndexCoordinates(), settings, mapping); + } + + @Override + public Mono createWithMapping() { + return createSettings() // + .flatMap(settings -> // + createMapping().flatMap(mapping -> // + doCreate(getIndexCoordinates(), settings, mapping))); // + } + + private Mono doCreate(IndexCoordinates indexCoordinates, Map settings, + @Nullable Document mapping) { + Set aliases = (boundClass != null) ? getAliasesFor(boundClass) : new HashSet<>(); + CreateIndexSettings indexSettings = CreateIndexSettings.builder(indexCoordinates) + .withAliases(aliases) + .withSettings(settings) + .withMapping(mapping) + .build(); + + CreateIndexRequest createIndexRequest = requestConverter.indicesCreateRequest(indexSettings); + Mono createIndexResponse = Mono.from(execute(client -> client.create(createIndexRequest))); + return createIndexResponse.map(CreateIndexResponse::acknowledged); + } + + @Override + public Mono delete() { + return exists().flatMap(exists -> { + + if (exists) { + DeleteIndexRequest deleteIndexRequest = requestConverter.indicesDeleteRequest(getIndexCoordinates()); + return Mono.from(execute(client -> client.delete(deleteIndexRequest))) // + .map(DeleteIndexResponse::acknowledged) // + .onErrorResume(NoSuchIndexException.class, e -> Mono.just(false)); + } else { + return Mono.just(false); + } + }); + + } + + @Override + public Mono exists() { + + ExistsRequest existsRequest = requestConverter.indicesExistsRequest(getIndexCoordinates()); + Mono existsResponse = Mono.from(execute(client -> client.exists(existsRequest))); + return existsResponse.map(BooleanResponse::value); + } + + @Override + public Mono refresh() { + RefreshRequest refreshRequest = requestConverter.indicesRefreshRequest(getIndexCoordinates()); + return Mono.from(execute(client -> client.refresh(refreshRequest))).then(); + } + + @Override + public Mono createMapping() { + return createMapping(checkForBoundClass()); + } + + @Override + public Mono createMapping(Class clazz) { + + Assert.notNull(clazz, "clazz must not be null"); + + Mapping mappingAnnotation = AnnotatedElementUtils.findMergedAnnotation(clazz, Mapping.class); + + if (mappingAnnotation != null) { + String mappingPath = mappingAnnotation.mappingPath(); + + if (hasText(mappingPath)) { + return ReactiveResourceUtil.loadDocument(mappingAnnotation.mappingPath(), "@Mapping"); + } + } + + return new ReactiveMappingBuilder(elasticsearchConverter).buildReactivePropertyMapping(clazz).map(Document::parse); + } + + @Override + public Mono putMapping(Mono mapping) { + + Assert.notNull(mapping, "mapping must not be null"); + + Mono putMappingResponse = mapping + .map(document -> requestConverter.indicesPutMappingRequest(getIndexCoordinates(), document)) // + .flatMap(putMappingRequest -> Mono.from(client.putMapping(putMappingRequest))); + return putMappingResponse.map(PutMappingResponse::acknowledged); + } + + @Override + public Mono getMapping() { + + IndexCoordinates indexCoordinates = getIndexCoordinates(); + GetMappingRequest getMappingRequest = requestConverter.indicesGetMappingRequest(indexCoordinates); + Mono getMappingResponse = Mono.from(execute(client -> client.getMapping(getMappingRequest))); + return getMappingResponse.map(response -> responseConverter.indicesGetMapping(response, indexCoordinates)); + } + + @Override + public Mono createSettings() { + return createSettings(checkForBoundClass()); + } + + @Override + public Mono createSettings(Class clazz) { + + Assert.notNull(clazz, "clazz must not be null"); + + ElasticsearchPersistentEntity persistentEntity = elasticsearchConverter.getMappingContext() + .getRequiredPersistentEntity(clazz); + String settingPath = persistentEntity.settingPath(); + return hasText(settingPath) // + ? ReactiveResourceUtil.loadDocument(settingPath, "@Setting") // + .map(Settings::new) // + : Mono.just(persistentEntity.getDefaultSettings()); + } + + @Override + public Mono getSettings(boolean includeDefaults) { + + GetIndicesSettingsRequest getSettingsRequest = requestConverter.indicesGetSettingsRequest(getIndexCoordinates(), + includeDefaults); + Mono getSettingsResponse = Mono + .from(execute(client -> client.getSettings(getSettingsRequest))); + return getSettingsResponse + .map(response -> responseConverter.indicesGetSettings(response, getIndexCoordinates().getIndexName())); + } + + @Override + public Mono alias(AliasActions aliasActions) { + + Assert.notNull(aliasActions, "aliasActions must not be null"); + + UpdateAliasesRequest updateAliasesRequest = requestConverter.indicesUpdateAliasesRequest(aliasActions); + Mono updateAliasesResponse = Mono + .from(execute(client -> client.updateAliases(updateAliasesRequest))); + return updateAliasesResponse.map(AcknowledgedResponseBase::acknowledged); + } + + @Override + public Mono>> getAliases(String... aliasNames) { + return getAliases(aliasNames, null); + } + + @Override + public Mono>> getAliasesForIndex(String... indexNames) { + return getAliases(null, indexNames); + } + + private Mono>> getAliases(@Nullable String[] aliasNames, @Nullable String[] indexNames) { + + GetAliasRequest getAliasRequest = requestConverter.indicesGetAliasRequest(aliasNames, indexNames); + Mono getAliasResponse = Mono.from(execute(client -> client.getAlias(getAliasRequest))); + return getAliasResponse.map(responseConverter::indicesGetAliasData); + } + + @Override + public Mono putTemplate(PutTemplateRequest putTemplateRequest) { + + Assert.notNull(putTemplateRequest, "putTemplateRequest must not be null"); + + org.opensearch.client.opensearch.indices.PutTemplateRequest putTemplateRequestES = requestConverter + .indicesPutTemplateRequest(putTemplateRequest); + Mono putTemplateResponse = Mono + .from(execute(client -> client.putTemplate(putTemplateRequestES))); + return putTemplateResponse.map(PutTemplateResponse::acknowledged); + } + + @Override + public Mono putComponentTemplate(PutComponentTemplateRequest putComponentTemplateRequest) { + + Assert.notNull(putComponentTemplateRequest, "putComponentTemplateRequest must not be null"); + + org.opensearch.client.opensearch.cluster.PutComponentTemplateRequest putComponentTemplateRequestES = requestConverter + .clusterPutComponentTemplateRequest(putComponentTemplateRequest); + return Mono.from(clusterTemplate.execute(client -> client.putComponentTemplate(putComponentTemplateRequestES))) + .map(AcknowledgedResponseBase::acknowledged); + } + + @Override + public Flux getComponentTemplate(GetComponentTemplateRequest getComponentTemplateRequest) { + + Assert.notNull(getComponentTemplateRequest, "getComponentTemplateRequest must not be null"); + + org.opensearch.client.opensearch.cluster.GetComponentTemplateRequest getComponentTemplateRequestES = requestConverter + .clusterGetComponentTemplateRequest(getComponentTemplateRequest); + return Flux.from(clusterTemplate.execute(client -> client.getComponentTemplate(getComponentTemplateRequestES))) + .flatMapIterable(responseConverter::clusterGetComponentTemplates); + } + + @Override + public Mono existsComponentTemplate(ExistsComponentTemplateRequest existsComponentTemplateRequest) { + + Assert.notNull(existsComponentTemplateRequest, "existsComponentTemplateRequest must not be null"); + + org.opensearch.client.opensearch.cluster.ExistsComponentTemplateRequest existsComponentTemplateRequestES = requestConverter + .clusterExistsComponentTemplateRequest(existsComponentTemplateRequest); + + return Mono + .from(clusterTemplate.execute(client -> client.existsComponentTemplate(existsComponentTemplateRequestES))) + .map(BooleanResponse::value); + } + + @Override + public Mono deleteComponentTemplate(DeleteComponentTemplateRequest deleteComponentTemplateRequest) { + + Assert.notNull(deleteComponentTemplateRequest, "deleteComponentTemplateRequest must not be null"); + + org.opensearch.client.opensearch.cluster.DeleteComponentTemplateRequest deleteComponentTemplateRequestES = requestConverter + .clusterDeleteComponentTemplateRequest(deleteComponentTemplateRequest); + return Mono + .from(clusterTemplate.execute(client -> client.deleteComponentTemplate(deleteComponentTemplateRequestES))) + .map(AcknowledgedResponseBase::acknowledged); + } + + @Override + public Mono putIndexTemplate(PutIndexTemplateRequest putIndexTemplateRequest) { + + Assert.notNull(putIndexTemplateRequest, "putIndexTemplateRequest must not be null"); + + org.opensearch.client.opensearch.indices.PutIndexTemplateRequest putIndexTemplateRequestES = requestConverter + .indicesPutIndexTemplateRequest(putIndexTemplateRequest); + + return Mono.from(execute(client -> client.putIndexTemplate(putIndexTemplateRequestES))) + .map(PutIndexTemplateResponse::acknowledged); + } + + @Override + public Mono existsIndexTemplate(ExistsIndexTemplateRequest existsIndexTemplateRequest) { + + Assert.notNull(existsIndexTemplateRequest, "existsIndexTemplateRequest must not be null"); + + org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest existsIndexTemplateRequestES = requestConverter + .indicesExistsIndexTemplateRequest(existsIndexTemplateRequest); + return Mono.from(execute(client -> client.existsIndexTemplate(existsIndexTemplateRequestES))) + .map(BooleanResponse::value); + } + + @Override + public Flux getIndexTemplate(GetIndexTemplateRequest getIndexTemplateRequest) { + + Assert.notNull(getIndexTemplateRequest, "getIndexTemplateRequest must not be null"); + + org.opensearch.client.opensearch.indices.GetIndexTemplateRequest getIndexTemplateRequestES = requestConverter + .indicesGetIndexTemplateRequest(getIndexTemplateRequest); + return Mono.from(execute(client -> client.getIndexTemplate(getIndexTemplateRequestES))) + .flatMapIterable(responseConverter::getIndexTemplates); + } + + @Override + public Mono deleteIndexTemplate(DeleteIndexTemplateRequest deleteIndexTemplateRequest) { + + Assert.notNull(deleteIndexTemplateRequest, "deleteIndexTemplateRequest must not be null"); + + org.opensearch.client.opensearch.indices.DeleteIndexTemplateRequest deleteIndexTemplateRequestES = requestConverter + .indicesDeleteIndexTemplateRequest(deleteIndexTemplateRequest); + return Mono.from(execute(client -> client.deleteIndexTemplate(deleteIndexTemplateRequestES))) + .map(AcknowledgedResponseBase::acknowledged); + } + + @Override + public Mono getTemplate(GetTemplateRequest getTemplateRequest) { + + Assert.notNull(getTemplateRequest, "getTemplateRequest must not be null"); + + org.opensearch.client.opensearch.indices.GetTemplateRequest getTemplateRequestES = requestConverter + .indicesGetTemplateRequest(getTemplateRequest); + Mono getTemplateResponse = Mono + .from(execute(client -> client.getTemplate(getTemplateRequestES))); + + return getTemplateResponse.flatMap(response -> { + if (response != null) { + TemplateData templateData = responseConverter.indicesGetTemplateData(response, + getTemplateRequest.getTemplateName()); + if (templateData != null) { + return Mono.just(templateData); + } + } + return Mono.empty(); + }); + } + + @Override + public Mono existsTemplate(ExistsTemplateRequest existsTemplateRequest) { + + Assert.notNull(existsTemplateRequest, "existsTemplateRequest must not be null"); + + org.opensearch.client.opensearch.indices.ExistsTemplateRequest existsTemplateRequestES = requestConverter + .indicesExistsTemplateRequest(existsTemplateRequest); + return Mono.from(execute(client -> client.existsTemplate(existsTemplateRequestES))).map(BooleanResponse::value); + } + + @Override + public Mono deleteTemplate(DeleteTemplateRequest deleteTemplateRequest) { + + Assert.notNull(deleteTemplateRequest, "deleteTemplateRequest must not be null"); + + org.opensearch.client.opensearch.indices.DeleteTemplateRequest deleteTemplateRequestES = requestConverter + .indicesDeleteTemplateRequest(deleteTemplateRequest); + return Mono.from(execute(client -> client.deleteTemplate(deleteTemplateRequestES))) + .map(DeleteTemplateResponse::acknowledged); + } + + @Override + public Flux getInformation(IndexCoordinates index) { + + GetIndexRequest request = requestConverter.indicesGetIndexRequest(index); + + return Mono.from(execute(client -> client.get(request))) // + .map(responseConverter::indicesGetIndexInformations) // + .flatMapMany(Flux::fromIterable); + } + + @Override + public IndexCoordinates getIndexCoordinates() { + return (boundClass != null) ? getIndexCoordinatesFor(boundClass) : Objects.requireNonNull(boundIndexCoordinates); + } + + // region helper functions + private IndexCoordinates getIndexCoordinatesFor(Class clazz) { + return elasticsearchConverter.getMappingContext().getRequiredPersistentEntity(clazz).getIndexCoordinates(); + } + + /** + * Get the {@link Alias} of the provided class. + * + * @param clazz provided class that can be used to extract aliases. + */ + private Set getAliasesFor(Class clazz) { + return elasticsearchConverter.getMappingContext().getRequiredPersistentEntity(clazz).getAliases(); + } + + private Class checkForBoundClass() { + if (boundClass == null) { + throw new InvalidDataAccessApiUsageException("IndexOperations are not bound"); + } + return boundClass; + } + // endregion + +} diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchClient.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchClient.java new file mode 100644 index 00000000..83873698 --- /dev/null +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchClient.java @@ -0,0 +1,455 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.data.client.osc; + +import java.io.IOException; +import java.util.function.Function; +import org.opensearch.client.ApiClient; +import org.opensearch.client.opensearch._types.ErrorResponse; +import org.opensearch.client.opensearch.core.BulkRequest; +import org.opensearch.client.opensearch.core.BulkResponse; +import org.opensearch.client.opensearch.core.ClearScrollRequest; +import org.opensearch.client.opensearch.core.ClearScrollResponse; +import org.opensearch.client.opensearch.core.CountRequest; +import org.opensearch.client.opensearch.core.CountResponse; +import org.opensearch.client.opensearch.core.DeleteByQueryRequest; +import org.opensearch.client.opensearch.core.DeleteByQueryResponse; +import org.opensearch.client.opensearch.core.DeleteRequest; +import org.opensearch.client.opensearch.core.DeleteResponse; +import org.opensearch.client.opensearch.core.DeleteScriptRequest; +import org.opensearch.client.opensearch.core.DeleteScriptResponse; +import org.opensearch.client.opensearch.core.ExistsRequest; +import org.opensearch.client.opensearch.core.GetRequest; +import org.opensearch.client.opensearch.core.GetResponse; +import org.opensearch.client.opensearch.core.GetScriptRequest; +import org.opensearch.client.opensearch.core.GetScriptResponse; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.core.IndexResponse; +import org.opensearch.client.opensearch.core.InfoRequest; +import org.opensearch.client.opensearch.core.InfoResponse; +import org.opensearch.client.opensearch.core.MgetRequest; +import org.opensearch.client.opensearch.core.MgetResponse; +import org.opensearch.client.opensearch.core.PingRequest; +import org.opensearch.client.opensearch.core.PutScriptRequest; +import org.opensearch.client.opensearch.core.PutScriptResponse; +import org.opensearch.client.opensearch.core.ReindexRequest; +import org.opensearch.client.opensearch.core.ReindexResponse; +import org.opensearch.client.opensearch.core.ScrollRequest; +import org.opensearch.client.opensearch.core.ScrollResponse; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.SearchResponse; +import org.opensearch.client.opensearch.core.SearchTemplateRequest; +import org.opensearch.client.opensearch.core.SearchTemplateResponse; +import org.opensearch.client.opensearch.core.UpdateRequest; +import org.opensearch.client.opensearch.core.UpdateResponse; +import org.opensearch.client.opensearch.core.pit.CreatePitRequest; +import org.opensearch.client.opensearch.core.pit.CreatePitResponse; +import org.opensearch.client.opensearch.core.pit.DeletePitRequest; +import org.opensearch.client.opensearch.core.pit.DeletePitResponse; +import org.opensearch.client.transport.JsonEndpoint; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.opensearch.client.transport.endpoints.EndpointWithResponseMapperAttr; +import org.opensearch.client.util.ObjectBuilder; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import reactor.core.publisher.Mono; + +/** + * Reactive version of {@link org.opensearch.client.opensearch.OpenSearchClient}. + */ +public class ReactiveOpenSearchClient extends ApiClient + implements AutoCloseable { + + public ReactiveOpenSearchClient(OpenSearchTransport transport) { + super(transport, null); + } + + public ReactiveOpenSearchClient(OpenSearchTransport transport, @Nullable TransportOptions transportOptions) { + super(transport, transportOptions); + } + + @Override + public ReactiveOpenSearchClient withTransportOptions(@Nullable TransportOptions transportOptions) { + return new ReactiveOpenSearchClient(transport, transportOptions); + } + + @Override + public void close() throws IOException { + } + + // region child clients + + public ReactiveOpenSearchClusterClient cluster() { + return new ReactiveOpenSearchClusterClient(transport, transportOptions); + } + + public ReactiveOpenSearchIndicesClient indices() { + return new ReactiveOpenSearchIndicesClient(transport, transportOptions); + } + + // endregion + // region info + + public Mono info() { + return Mono + .fromFuture(transport.performRequestAsync(InfoRequest._INSTANCE, InfoRequest._ENDPOINT, transportOptions)); + } + + public Mono ping() { + return Mono + .fromFuture(transport.performRequestAsync(PingRequest._INSTANCE, PingRequest._ENDPOINT, transportOptions)); + } + + // endregion + // region document + + public Mono index(IndexRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, IndexRequest._ENDPOINT, transportOptions)); + } + + public Mono index(Function, ObjectBuilder>> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return index(fn.apply(new IndexRequest.Builder<>()).build()); + } + + public Mono bulk(BulkRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, BulkRequest._ENDPOINT, transportOptions)); + } + + public Mono bulk(Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return bulk(fn.apply(new BulkRequest.Builder()).build()); + } + + public Mono> get(GetRequest request, Class tClass) { + + Assert.notNull(request, "request must not be null"); + + // code adapted from + // org.opensearch.client.opensearch.OpenSearchClient.get(org.opensearch.client.opensearch.core.GetRequest, + // java.lang.Class) + // noinspection unchecked + JsonEndpoint, ErrorResponse> endpoint = (JsonEndpoint, ErrorResponse>) GetRequest._ENDPOINT; + endpoint = new EndpointWithResponseMapperAttr<>(endpoint, + "org.opensearch.client:Deserializer:_global.get.TDocument", + getDeserializer(tClass)); + + return Mono.fromFuture(transport.performRequestAsync(request, endpoint, transportOptions)); + } + + public Mono exists(ExistsRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, ExistsRequest._ENDPOINT, transportOptions)); + } + + public Mono> update(UpdateRequest request, Class clazz) { + + Assert.notNull(request, "request must not be null"); + + // noinspection unchecked + JsonEndpoint, UpdateResponse, ErrorResponse> endpoint = new EndpointWithResponseMapperAttr( + UpdateRequest._ENDPOINT, "org.opensearch.client:Deserializer:_global.update.TDocument", + this.getDeserializer(clazz)); + return Mono.fromFuture(transport.performRequestAsync(request, endpoint, this.transportOptions)); + } + + public Mono> update( + Function, ObjectBuilder>> fn, Class clazz) { + + Assert.notNull(fn, "fn must not be null"); + + return update(fn.apply(new UpdateRequest.Builder<>()).build(), clazz); + } + + public Mono> get(Function> fn, Class tClass) { + Assert.notNull(fn, "fn must not be null"); + + return get(fn.apply(new GetRequest.Builder()).build(), tClass); + } + + public Mono> mget(MgetRequest request, Class clazz) { + + Assert.notNull(request, "request must not be null"); + Assert.notNull(clazz, "clazz must not be null"); + + // noinspection unchecked + JsonEndpoint, ErrorResponse> endpoint = (JsonEndpoint, ErrorResponse>) MgetRequest._ENDPOINT; + endpoint = new EndpointWithResponseMapperAttr<>(endpoint, + "org.opensearch.client:Deserializer:_global.mget.TDocument", + this.getDeserializer(clazz)); + + return Mono.fromFuture(transport.performRequestAsync(request, endpoint, transportOptions)); + } + + public Mono> mget(Function> fn, Class clazz) { + + Assert.notNull(fn, "fn must not be null"); + + return mget(fn.apply(new MgetRequest.Builder()).build(), clazz); + } + + public Mono reindex(ReindexRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, ReindexRequest._ENDPOINT, transportOptions)); + } + + public Mono reindex(Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return reindex(fn.apply(new ReindexRequest.Builder()).build()); + } + + public Mono delete(DeleteRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, DeleteRequest._ENDPOINT, transportOptions)); + } + + public Mono delete(Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return delete(fn.apply(new DeleteRequest.Builder()).build()); + } + + public Mono deleteByQuery(DeleteByQueryRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, DeleteByQueryRequest._ENDPOINT, transportOptions)); + } + + public Mono deleteByQuery( + Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return deleteByQuery(fn.apply(new DeleteByQueryRequest.Builder()).build()); + } + + /** + * @since 5.4 + */ + public Mono count(CountRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, CountRequest._ENDPOINT, transportOptions)); + } + + /** + * @since 5.4 + */ + public Mono count(Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return count(fn.apply(new CountRequest.Builder()).build()); + } + + // endregion + // region search + + public Mono> search(SearchRequest request, Class tDocumentClass) { + + Assert.notNull(request, "request must not be null"); + Assert.notNull(tDocumentClass, "tDocumentClass must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, + SearchRequest.createSearchEndpoint(this.getDeserializer(tDocumentClass)), transportOptions)); + } + + public Mono> search(Function> fn, + Class tDocumentClass) { + + Assert.notNull(fn, "fn must not be null"); + Assert.notNull(tDocumentClass, "tDocumentClass must not be null"); + + return search(fn.apply(new SearchRequest.Builder()).build(), tDocumentClass); + } + + /** + * @since 5.1 + */ + public Mono> searchTemplate(SearchTemplateRequest request, Class tDocumentClass) { + + Assert.notNull(request, "request must not be null"); + Assert.notNull(tDocumentClass, "tDocumentClass must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, + SearchTemplateRequest.createSearchTemplateEndpoint(this.getDeserializer(tDocumentClass)), transportOptions)); + } + + /** + * @since 5.1 + */ + public Mono> searchTemplate( + Function> fn, Class tDocumentClass) { + + Assert.notNull(fn, "fn must not be null"); + + return searchTemplate(fn.apply(new SearchTemplateRequest.Builder()).build(), tDocumentClass); + } + + public Mono> scroll(ScrollRequest request, Class tDocumentClass) { + + Assert.notNull(request, "request must not be null"); + Assert.notNull(tDocumentClass, "tDocumentClass must not be null"); + + // code adapted from + // org.opensearch.client.opensearch.OpenSearchClient.scroll(org.opensearch.client.opensearch.core.ScrollRequest, + // java.lang.Class) + // noinspection unchecked + JsonEndpoint, ErrorResponse> endpoint = (JsonEndpoint, ErrorResponse>) ScrollRequest._ENDPOINT; + endpoint = new EndpointWithResponseMapperAttr<>(endpoint, + "org.opensearch.client:Deserializer:_global.scroll.TDocument", getDeserializer(tDocumentClass)); + + return Mono.fromFuture(transport.performRequestAsync(request, endpoint, transportOptions)); + } + + public Mono> scroll(Function> fn, + Class tDocumentClass) { + + Assert.notNull(fn, "fn must not be null"); + Assert.notNull(tDocumentClass, "tDocumentClass must not be null"); + + return scroll(fn.apply(new ScrollRequest.Builder()).build(), tDocumentClass); + } + + public Mono clearScroll(ClearScrollRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, ClearScrollRequest._ENDPOINT, transportOptions)); + } + + public Mono clearScroll( + Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return clearScroll(fn.apply(new ClearScrollRequest.Builder()).build()); + } + + // endregion + + // region script api + /** + * @since 5.1 + */ + public Mono putScript(PutScriptRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, PutScriptRequest._ENDPOINT, transportOptions)); + } + + /** + * @since 5.1 + */ + public Mono putScript(Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return putScript(fn.apply(new PutScriptRequest.Builder()).build()); + } + + /** + * @since 5.1 + */ + public Mono getScript(GetScriptRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, GetScriptRequest._ENDPOINT, transportOptions)); + } + + /** + * @since 5.1 + */ + public Mono getScript(Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return getScript(fn.apply(new GetScriptRequest.Builder()).build()); + } + + /** + * @since 5.1 + */ + public Mono deleteScript(DeleteScriptRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, DeleteScriptRequest._ENDPOINT, transportOptions)); + } + + /** + * @since 5.1 + */ + public Mono deleteScript( + Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return deleteScript(fn.apply(new DeleteScriptRequest.Builder()).build()); + } + + /** + * @since 5.0 + */ + public Mono openPointInTime(CreatePitRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, CreatePitRequest._ENDPOINT, transportOptions)); + } + + /* + * @since 5.0 + */ + public Mono closePointInTime(DeletePitRequest request) { + + Assert.notNull(request, "request must not be null"); + + return Mono.fromFuture(transport.performRequestAsync(request, DeletePitRequest._ENDPOINT, transportOptions)); + } + + /** + * @since 5.0 + */ + public Mono closePointInTime( + Function> fn) { + + Assert.notNull(fn, "fn must not be null"); + + return closePointInTime(fn.apply(new DeletePitRequest.Builder()).build()); + } + + // endregion +} diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchClusterClient.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchClusterClient.java new file mode 100644 index 00000000..b1f731e9 --- /dev/null +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchClusterClient.java @@ -0,0 +1,96 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.data.client.osc; + +import java.util.function.Function; +import org.opensearch.client.ApiClient; +import org.opensearch.client.opensearch.cluster.DeleteComponentTemplateRequest; +import org.opensearch.client.opensearch.cluster.DeleteComponentTemplateResponse; +import org.opensearch.client.opensearch.cluster.ExistsComponentTemplateRequest; +import org.opensearch.client.opensearch.cluster.GetComponentTemplateRequest; +import org.opensearch.client.opensearch.cluster.GetComponentTemplateResponse; +import org.opensearch.client.opensearch.cluster.HealthRequest; +import org.opensearch.client.opensearch.cluster.HealthResponse; +import org.opensearch.client.opensearch.cluster.PutComponentTemplateRequest; +import org.opensearch.client.opensearch.cluster.PutComponentTemplateResponse; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.opensearch.client.util.ObjectBuilder; +import org.springframework.lang.Nullable; +import reactor.core.publisher.Mono; + +/** + * Reactive version of the {@link org.opensearch.client.opensearch.cluster.OpenSearchClusterClient} + */ +public class ReactiveOpenSearchClusterClient + extends ApiClient { + + public ReactiveOpenSearchClusterClient(OpenSearchTransport transport, + @Nullable TransportOptions transportOptions) { + super(transport, transportOptions); + } + + @Override + public ReactiveOpenSearchClusterClient withTransportOptions(@Nullable TransportOptions transportOptions) { + return new ReactiveOpenSearchClusterClient(transport, transportOptions); + } + + public Mono health(HealthRequest healthRequest) { + return Mono.fromFuture(transport.performRequestAsync(healthRequest, HealthRequest._ENDPOINT, transportOptions)); + } + + public Mono health(Function> fn) { + return health(fn.apply(new HealthRequest.Builder()).build()); + } + + public Mono putComponentTemplate( + PutComponentTemplateRequest putComponentTemplateRequest) { + return Mono.fromFuture(transport.performRequestAsync(putComponentTemplateRequest, + PutComponentTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono putComponentTemplate( + Function> fn) { + return putComponentTemplate(fn.apply(new PutComponentTemplateRequest.Builder()).build()); + } + + public Mono getComponentTemplate( + GetComponentTemplateRequest getComponentTemplateRequest) { + return Mono.fromFuture(transport.performRequestAsync(getComponentTemplateRequest, + GetComponentTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono getComponentTemplate( + Function> fn) { + return getComponentTemplate(fn.apply(new GetComponentTemplateRequest.Builder()).build()); + } + + public Mono existsComponentTemplate(ExistsComponentTemplateRequest existsComponentTemplateRequest) { + return Mono.fromFuture(transport.performRequestAsync(existsComponentTemplateRequest, + ExistsComponentTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono existsComponentTemplate( + Function> fn) { + return existsComponentTemplate(fn.apply(new ExistsComponentTemplateRequest.Builder()).build()); + } + + public Mono deleteComponentTemplate( + DeleteComponentTemplateRequest deleteComponentTemplateRequest) { + return Mono.fromFuture(transport.performRequestAsync(deleteComponentTemplateRequest, + DeleteComponentTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono deleteComponentTemplate( + Function> fn) { + return deleteComponentTemplate(fn.apply(new DeleteComponentTemplateRequest.Builder()).build()); + } +} diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchConfiguration.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchConfiguration.java new file mode 100644 index 00000000..ce853b43 --- /dev/null +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchConfiguration.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.data.client.osc; + +import org.opensearch.client.RequestOptions; +import org.opensearch.client.RestClient; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.rest_client.RestClientOptions; +import org.springframework.context.annotation.Bean; +import org.springframework.data.elasticsearch.client.ClientConfiguration; +import org.springframework.data.elasticsearch.config.ElasticsearchConfigurationSupport; +import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations; +import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; +import org.springframework.util.Assert; + +/** + * Base class for a @{@link org.springframework.context.annotation.Configuration} class to set up the OpenSearch + * connection using the {@link ReactiveOpenSearchClient}. This class exposes different parts of the setup as Spring + * beans. Deriving * classes must provide the {@link ClientConfiguration} to use. + */ +public abstract class ReactiveOpenSearchConfiguration extends ElasticsearchConfigurationSupport { + + /** + * Must be implemented by deriving classes to provide the {@link ClientConfiguration}. + * + * @return configuration, must not be {@literal null} + */ + @Bean(name = "elasticsearchClientConfiguration") + public abstract ClientConfiguration clientConfiguration(); + + /** + * Provides the underlying low level RestClient. + * + * @param clientConfiguration configuration for the client, must not be {@literal null} + * @return RestClient + */ + @Bean + public RestClient elasticsearchRestClient(ClientConfiguration clientConfiguration) { + + Assert.notNull(clientConfiguration, "clientConfiguration must not be null"); + + return OpenSearchClients.getRestClient(clientConfiguration); + } + + /** + * Provides the Elasticsearch transport to be used. The default implementation uses the {@link RestClient} bean and + * the {@link JsonpMapper} bean provided in this class. + * + * @return the {@link OpenSearchTransport} + * @since 5.2 + */ + @Bean + public OpenSearchTransport opensearchTransport(RestClient restClient, JsonpMapper jsonpMapper) { + + Assert.notNull(restClient, "restClient must not be null"); + Assert.notNull(jsonpMapper, "jsonpMapper must not be null"); + + return OpenSearchClients.getOpenSearchTransport(restClient, OpenSearchClients.REACTIVE_CLIENT, + transportOptions(), jsonpMapper); + } + + /** + * Provides the {@link ReactiveOpenSearchClient} instance used. + * + * @param transport the OpenSearchTransport to use + * @return ReactiveOpenSearchClient instance. + */ + @Bean + public ReactiveOpenSearchClient reactiveOpenSearchClient(OpenSearchTransport transport) { + + Assert.notNull(transport, "transport must not be null"); + + return OpenSearchClients.createReactive(transport); + } + + /** + * Creates {@link ReactiveElasticsearchOperations}. + * + * @return never {@literal null}. + */ + @Bean(name = { "reactiveElasticsearchOperations", "reactiveElasticsearchTemplate", "reactiveOpensearchOperations", "reactiveOpensearchTemplate" }) + public ReactiveElasticsearchOperations reactiveElasticsearchOperations(ElasticsearchConverter elasticsearchConverter, + ReactiveOpenSearchClient reactiveElasticsearchClient) { + + ReactiveOpenSearchTemplate template = new ReactiveOpenSearchTemplate(reactiveElasticsearchClient, + elasticsearchConverter); + template.setRefreshPolicy(refreshPolicy()); + + return template; + } + + /** + * Provides the JsonpMapper that is used in the {@link #opensearchTransport(RestClient, JsonpMapper)} method and + * exposes it as a bean. + * + * @return the {@link JsonpMapper} to use + * @since 5.2 + */ + @Bean + public JsonpMapper jsonpMapper() { + return new JacksonJsonpMapper(); + } + + /** + * @return the options that should be added to every request. Must not be {@literal null} + */ + public TransportOptions transportOptions() { + return new RestClientOptions(RequestOptions.DEFAULT).toBuilder().build(); + } +} diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchIndicesClient.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchIndicesClient.java new file mode 100644 index 00000000..00a30a6b --- /dev/null +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchIndicesClient.java @@ -0,0 +1,616 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.data.client.osc; + +import java.util.function.Function; +import org.opensearch.client.ApiClient; +import org.opensearch.client.opensearch.indices.AddBlockRequest; +import org.opensearch.client.opensearch.indices.AddBlockResponse; +import org.opensearch.client.opensearch.indices.AnalyzeRequest; +import org.opensearch.client.opensearch.indices.AnalyzeResponse; +import org.opensearch.client.opensearch.indices.ClearCacheRequest; +import org.opensearch.client.opensearch.indices.ClearCacheResponse; +import org.opensearch.client.opensearch.indices.CloneIndexRequest; +import org.opensearch.client.opensearch.indices.CloneIndexResponse; +import org.opensearch.client.opensearch.indices.CloseIndexRequest; +import org.opensearch.client.opensearch.indices.CloseIndexResponse; +import org.opensearch.client.opensearch.indices.CreateDataStreamRequest; +import org.opensearch.client.opensearch.indices.CreateDataStreamResponse; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.opensearch.indices.CreateIndexResponse; +import org.opensearch.client.opensearch.indices.DataStreamsStatsRequest; +import org.opensearch.client.opensearch.indices.DataStreamsStatsResponse; +import org.opensearch.client.opensearch.indices.DeleteAliasRequest; +import org.opensearch.client.opensearch.indices.DeleteAliasResponse; +import org.opensearch.client.opensearch.indices.DeleteDataStreamRequest; +import org.opensearch.client.opensearch.indices.DeleteDataStreamResponse; +import org.opensearch.client.opensearch.indices.DeleteIndexRequest; +import org.opensearch.client.opensearch.indices.DeleteIndexResponse; +import org.opensearch.client.opensearch.indices.DeleteIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.DeleteIndexTemplateResponse; +import org.opensearch.client.opensearch.indices.DeleteTemplateRequest; +import org.opensearch.client.opensearch.indices.DeleteTemplateResponse; +import org.opensearch.client.opensearch.indices.DiskUsageRequest; +import org.opensearch.client.opensearch.indices.DiskUsageResponse; +import org.opensearch.client.opensearch.indices.ExistsAliasRequest; +import org.opensearch.client.opensearch.indices.ExistsIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.ExistsRequest; +import org.opensearch.client.opensearch.indices.ExistsTemplateRequest; +import org.opensearch.client.opensearch.indices.FlushRequest; +import org.opensearch.client.opensearch.indices.FlushResponse; +import org.opensearch.client.opensearch.indices.ForcemergeRequest; +import org.opensearch.client.opensearch.indices.ForcemergeResponse; +import org.opensearch.client.opensearch.indices.GetAliasRequest; +import org.opensearch.client.opensearch.indices.GetAliasResponse; +import org.opensearch.client.opensearch.indices.GetDataStreamRequest; +import org.opensearch.client.opensearch.indices.GetDataStreamResponse; +import org.opensearch.client.opensearch.indices.GetFieldMappingRequest; +import org.opensearch.client.opensearch.indices.GetFieldMappingResponse; +import org.opensearch.client.opensearch.indices.GetIndexRequest; +import org.opensearch.client.opensearch.indices.GetIndexResponse; +import org.opensearch.client.opensearch.indices.GetIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.GetIndexTemplateResponse; +import org.opensearch.client.opensearch.indices.GetIndicesSettingsRequest; +import org.opensearch.client.opensearch.indices.GetIndicesSettingsResponse; +import org.opensearch.client.opensearch.indices.GetMappingRequest; +import org.opensearch.client.opensearch.indices.GetMappingResponse; +import org.opensearch.client.opensearch.indices.GetTemplateRequest; +import org.opensearch.client.opensearch.indices.GetTemplateResponse; +import org.opensearch.client.opensearch.indices.IndicesStatsRequest; +import org.opensearch.client.opensearch.indices.IndicesStatsResponse; +import org.opensearch.client.opensearch.indices.OpenRequest; +import org.opensearch.client.opensearch.indices.OpenResponse; +import org.opensearch.client.opensearch.indices.PutAliasRequest; +import org.opensearch.client.opensearch.indices.PutAliasResponse; +import org.opensearch.client.opensearch.indices.PutIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.PutIndexTemplateResponse; +import org.opensearch.client.opensearch.indices.PutIndicesSettingsRequest; +import org.opensearch.client.opensearch.indices.PutIndicesSettingsResponse; +import org.opensearch.client.opensearch.indices.PutMappingRequest; +import org.opensearch.client.opensearch.indices.PutMappingResponse; +import org.opensearch.client.opensearch.indices.PutTemplateRequest; +import org.opensearch.client.opensearch.indices.PutTemplateResponse; +import org.opensearch.client.opensearch.indices.RecoveryRequest; +import org.opensearch.client.opensearch.indices.RecoveryResponse; +import org.opensearch.client.opensearch.indices.RefreshRequest; +import org.opensearch.client.opensearch.indices.RefreshResponse; +import org.opensearch.client.opensearch.indices.ResolveIndexRequest; +import org.opensearch.client.opensearch.indices.ResolveIndexResponse; +import org.opensearch.client.opensearch.indices.RolloverRequest; +import org.opensearch.client.opensearch.indices.RolloverResponse; +import org.opensearch.client.opensearch.indices.SegmentsRequest; +import org.opensearch.client.opensearch.indices.SegmentsResponse; +import org.opensearch.client.opensearch.indices.ShardStoresRequest; +import org.opensearch.client.opensearch.indices.ShardStoresResponse; +import org.opensearch.client.opensearch.indices.ShrinkRequest; +import org.opensearch.client.opensearch.indices.ShrinkResponse; +import org.opensearch.client.opensearch.indices.SimulateIndexTemplateRequest; +import org.opensearch.client.opensearch.indices.SimulateIndexTemplateResponse; +import org.opensearch.client.opensearch.indices.SimulateTemplateRequest; +import org.opensearch.client.opensearch.indices.SimulateTemplateResponse; +import org.opensearch.client.opensearch.indices.SplitRequest; +import org.opensearch.client.opensearch.indices.SplitResponse; +import org.opensearch.client.opensearch.indices.UpdateAliasesRequest; +import org.opensearch.client.opensearch.indices.UpdateAliasesResponse; +import org.opensearch.client.opensearch.indices.ValidateQueryRequest; +import org.opensearch.client.opensearch.indices.ValidateQueryResponse; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.opensearch.client.util.ObjectBuilder; +import org.springframework.lang.Nullable; +import reactor.core.publisher.Mono; + +/** + * Reactive version of the {@link org.opensearch.client.opensearch.indices.OpenSearchIndicesClient} + */ +public class ReactiveOpenSearchIndicesClient + extends ApiClient { + + public ReactiveOpenSearchIndicesClient(OpenSearchTransport transport, + @Nullable TransportOptions transportOptions) { + super(transport, transportOptions); + } + + @Override + public ReactiveOpenSearchIndicesClient withTransportOptions(@Nullable TransportOptions transportOptions) { + return new ReactiveOpenSearchIndicesClient(transport, transportOptions); + } + + public Mono addBlock(AddBlockRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, AddBlockRequest._ENDPOINT, transportOptions)); + } + + public Mono addBlock(Function> fn) { + return addBlock(fn.apply(new AddBlockRequest.Builder()).build()); + } + + public Mono analyze(AnalyzeRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, AnalyzeRequest._ENDPOINT, transportOptions)); + } + + public Mono analyze(Function> fn) { + return analyze(fn.apply(new AnalyzeRequest.Builder()).build()); + } + + public Mono analyze() { + return analyze(builder -> builder); + } + + public Mono clearCache(ClearCacheRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, ClearCacheRequest._ENDPOINT, transportOptions)); + } + + public Mono clearCache(Function> fn) { + return clearCache(fn.apply(new ClearCacheRequest.Builder()).build()); + } + + public Mono clearCache() { + return clearCache(builder -> builder); + } + + public Mono clone(CloneIndexRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, CloneIndexRequest._ENDPOINT, transportOptions)); + } + + public Mono clone(Function> fn) { + return clone(fn.apply(new CloneIndexRequest.Builder()).build()); + } + + public Mono close(CloseIndexRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, CloseIndexRequest._ENDPOINT, transportOptions)); + } + + public Mono close(Function> fn) { + return close(fn.apply(new CloseIndexRequest.Builder()).build()); + } + + public Mono create(CreateIndexRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, CreateIndexRequest._ENDPOINT, transportOptions)); + } + + public Mono create(Function> fn) { + return create(fn.apply(new CreateIndexRequest.Builder()).build()); + } + + public Mono createDataStream(CreateDataStreamRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, CreateDataStreamRequest._ENDPOINT, transportOptions)); + } + + public Mono createDataStream( + Function> fn) { + return createDataStream(fn.apply(new CreateDataStreamRequest.Builder()).build()); + } + + public Mono dataStreamsStats(DataStreamsStatsRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, DataStreamsStatsRequest._ENDPOINT, transportOptions)); + } + + public Mono dataStreamsStats( + Function> fn) { + return dataStreamsStats(fn.apply(new DataStreamsStatsRequest.Builder()).build()); + } + + public Mono dataStreamsStats() { + return dataStreamsStats(builder -> builder); + } + + public Mono delete(DeleteIndexRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, DeleteIndexRequest._ENDPOINT, transportOptions)); + } + + public Mono delete(Function> fn) { + return delete(fn.apply(new DeleteIndexRequest.Builder()).build()); + } + + public Mono deleteAlias(DeleteAliasRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, DeleteAliasRequest._ENDPOINT, transportOptions)); + } + + public Mono deleteAlias( + Function> fn) { + return deleteAlias(fn.apply(new DeleteAliasRequest.Builder()).build()); + } + + public Mono deleteDataStream(DeleteDataStreamRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, DeleteDataStreamRequest._ENDPOINT, transportOptions)); + } + + public Mono deleteDataStream( + Function> fn) { + return deleteDataStream(fn.apply(new DeleteDataStreamRequest.Builder()).build()); + } + + public Mono deleteIndexTemplate(DeleteIndexTemplateRequest request) { + return Mono + .fromFuture(transport.performRequestAsync(request, DeleteIndexTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono deleteIndexTemplate( + Function> fn) { + return deleteIndexTemplate(fn.apply(new DeleteIndexTemplateRequest.Builder()).build()); + } + + public Mono deleteTemplate(DeleteTemplateRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, DeleteTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono deleteTemplate( + Function> fn) { + return deleteTemplate(fn.apply(new DeleteTemplateRequest.Builder()).build()); + } + + public Mono diskUsage(DiskUsageRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, DiskUsageRequest._ENDPOINT, transportOptions)); + } + + public Mono diskUsage(Function> fn) { + return diskUsage(fn.apply(new DiskUsageRequest.Builder()).build()); + } + + public Mono exists(ExistsRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, ExistsRequest._ENDPOINT, transportOptions)); + } + + public Mono exists(Function> fn) { + return exists(fn.apply(new ExistsRequest.Builder()).build()); + } + + public Mono existsAlias(ExistsAliasRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, ExistsAliasRequest._ENDPOINT, transportOptions)); + } + + public Mono existsAlias(Function> fn) { + return existsAlias(fn.apply(new ExistsAliasRequest.Builder()).build()); + } + + public Mono existsIndexTemplate(ExistsIndexTemplateRequest request) { + return Mono + .fromFuture(transport.performRequestAsync(request, ExistsIndexTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono existsIndexTemplate( + Function> fn) { + return existsIndexTemplate(fn.apply(new ExistsIndexTemplateRequest.Builder()).build()); + } + + public Mono existsTemplate(ExistsTemplateRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, ExistsTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono existsTemplate( + Function> fn) { + return existsTemplate(fn.apply(new ExistsTemplateRequest.Builder()).build()); + } + + public Mono flush(FlushRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, FlushRequest._ENDPOINT, transportOptions)); + } + + public Mono flush(Function> fn) { + return flush(fn.apply(new FlushRequest.Builder()).build()); + } + + public Mono flush() { + return flush(builder -> builder); + } + + @SuppressWarnings("SpellCheckingInspection") + public Mono forcemerge(ForcemergeRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, ForcemergeRequest._ENDPOINT, transportOptions)); + } + + @SuppressWarnings("SpellCheckingInspection") + public Mono forcemerge(Function> fn) { + return forcemerge(fn.apply(new ForcemergeRequest.Builder()).build()); + } + + @SuppressWarnings("SpellCheckingInspection") + public Mono forcemerge() { + return forcemerge(builder -> builder); + } + + public Mono get(GetIndexRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, GetIndexRequest._ENDPOINT, transportOptions)); + } + + public Mono get(Function> fn) { + return get(fn.apply(new GetIndexRequest.Builder()).build()); + } + + public Mono getAlias(GetAliasRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, GetAliasRequest._ENDPOINT, transportOptions)); + } + + public Mono getAlias(Function> fn) { + return getAlias(fn.apply(new GetAliasRequest.Builder()).build()); + } + + public Mono getAlias() { + return getAlias(builder -> builder); + } + + public Mono getDataStream(GetDataStreamRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, GetDataStreamRequest._ENDPOINT, transportOptions)); + } + + public Mono getDataStream( + Function> fn) { + return getDataStream(fn.apply(new GetDataStreamRequest.Builder()).build()); + } + + public Mono getDataStream() { + return getDataStream(builder -> builder); + } + + public Mono getFieldMapping(GetFieldMappingRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, GetFieldMappingRequest._ENDPOINT, transportOptions)); + } + + public Mono getFieldMapping( + Function> fn) { + return getFieldMapping(fn.apply(new GetFieldMappingRequest.Builder()).build()); + } + + public Mono getIndexTemplate(GetIndexTemplateRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, GetIndexTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono getIndexTemplate( + Function> fn) { + return getIndexTemplate(fn.apply(new GetIndexTemplateRequest.Builder()).build()); + } + + public Mono getIndexTemplate() { + return getIndexTemplate(builder -> builder); + } + + public Mono getMapping(GetMappingRequest getMappingRequest) { + return Mono + .fromFuture(transport.performRequestAsync(getMappingRequest, GetMappingRequest._ENDPOINT, transportOptions)); + } + + public Mono getMapping(Function> fn) { + return getMapping(fn.apply(new GetMappingRequest.Builder()).build()); + } + + public Mono getMapping() { + return getMapping(builder -> builder); + } + + public Mono getSettings(GetIndicesSettingsRequest request) { + return Mono + .fromFuture(transport.performRequestAsync(request, GetIndicesSettingsRequest._ENDPOINT, transportOptions)); + } + + public Mono getSettings( + Function> fn) { + return getSettings(fn.apply(new GetIndicesSettingsRequest.Builder()).build()); + } + + public Mono getSettings() { + return getSettings(builder -> builder); + } + + public Mono getTemplate(GetTemplateRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, GetTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono getTemplate( + Function> fn) { + return getTemplate(fn.apply(new GetTemplateRequest.Builder()).build()); + } + + public Mono getTemplate() { + return getTemplate(builder -> builder); + } + + public Mono open(OpenRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, OpenRequest._ENDPOINT, transportOptions)); + } + + public Mono open(Function> fn) { + return open(fn.apply(new OpenRequest.Builder()).build()); + } + + public Mono putAlias(PutAliasRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, PutAliasRequest._ENDPOINT, transportOptions)); + } + + public Mono putAlias(Function> fn) { + return putAlias(fn.apply(new PutAliasRequest.Builder()).build()); + } + + public Mono putIndexTemplate(PutIndexTemplateRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, PutIndexTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono putIndexTemplate( + Function> fn) { + return putIndexTemplate(fn.apply(new PutIndexTemplateRequest.Builder()).build()); + } + + public Mono putMapping(PutMappingRequest putMappingRequest) { + return Mono + .fromFuture(transport.performRequestAsync(putMappingRequest, PutMappingRequest._ENDPOINT, transportOptions)); + } + + public Mono putMapping(Function> fn) { + return putMapping(fn.apply(new PutMappingRequest.Builder()).build()); + } + + public Mono putSettings(PutIndicesSettingsRequest request) { + return Mono + .fromFuture(transport.performRequestAsync(request, PutIndicesSettingsRequest._ENDPOINT, transportOptions)); + } + + public Mono putSettings( + Function> fn) { + return putSettings(fn.apply(new PutIndicesSettingsRequest.Builder()).build()); + } + + public Mono putSettings() { + return putSettings(builder -> builder); + } + + public Mono putTemplate(PutTemplateRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, PutTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono putTemplate( + Function> fn) { + return putTemplate(fn.apply(new PutTemplateRequest.Builder()).build()); + } + + public Mono recovery(RecoveryRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, RecoveryRequest._ENDPOINT, transportOptions)); + } + + public Mono recovery(Function> fn) { + return recovery(fn.apply(new RecoveryRequest.Builder()).build()); + } + + public Mono recovery() { + return recovery(builder -> builder); + } + + public Mono refresh(RefreshRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, RefreshRequest._ENDPOINT, transportOptions)); + } + + public Mono refresh(Function> fn) { + return refresh(fn.apply(new RefreshRequest.Builder()).build()); + } + + public Mono refresh() { + return refresh(builder -> builder); + } + + public Mono resolveIndex(ResolveIndexRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, ResolveIndexRequest._ENDPOINT, transportOptions)); + } + + public Mono resolveIndex( + Function> fn) { + return resolveIndex(fn.apply(new ResolveIndexRequest.Builder()).build()); + } + + public Mono rollover(RolloverRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, RolloverRequest._ENDPOINT, transportOptions)); + } + + public Mono rollover(Function> fn) { + return rollover(fn.apply(new RolloverRequest.Builder()).build()); + } + + public Mono segments(SegmentsRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, SegmentsRequest._ENDPOINT, transportOptions)); + } + + public Mono segments(Function> fn) { + return segments(fn.apply(new SegmentsRequest.Builder()).build()); + } + + public Mono segments() { + return segments(builder -> builder); + } + + public Mono shardStores(ShardStoresRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, ShardStoresRequest._ENDPOINT, transportOptions)); + } + + public Mono shardStores( + Function> fn) { + return shardStores(fn.apply(new ShardStoresRequest.Builder()).build()); + } + + public Mono shardStores() { + return shardStores(builder -> builder); + } + + public Mono shrink(ShrinkRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, ShrinkRequest._ENDPOINT, transportOptions)); + } + + public Mono shrink(Function> fn) { + return shrink(fn.apply(new ShrinkRequest.Builder()).build()); + } + + public Mono simulateIndexTemplate(SimulateIndexTemplateRequest request) { + return Mono + .fromFuture(transport.performRequestAsync(request, SimulateIndexTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono simulateIndexTemplate( + Function> fn) { + return simulateIndexTemplate(fn.apply(new SimulateIndexTemplateRequest.Builder()).build()); + } + + public Mono simulateTemplate(SimulateTemplateRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, SimulateTemplateRequest._ENDPOINT, transportOptions)); + } + + public Mono simulateTemplate( + Function> fn) { + return simulateTemplate(fn.apply(new SimulateTemplateRequest.Builder()).build()); + } + + public Mono simulateTemplate() { + return simulateTemplate(builder -> builder); + } + + public Mono split(SplitRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, SplitRequest._ENDPOINT, transportOptions)); + } + + public Mono split(Function> fn) { + return split(fn.apply(new SplitRequest.Builder()).build()); + } + + public Mono stats(IndicesStatsRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, IndicesStatsRequest._ENDPOINT, transportOptions)); + } + + public Mono stats( + Function> fn) { + return stats(fn.apply(new IndicesStatsRequest.Builder()).build()); + } + + public Mono stats() { + return stats(builder -> builder); + } + + public Mono updateAliases(UpdateAliasesRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, UpdateAliasesRequest._ENDPOINT, transportOptions)); + } + + public Mono updateAliases( + Function> fn) { + return updateAliases(fn.apply(new UpdateAliasesRequest.Builder()).build()); + } + + public Mono updateAliases() { + return updateAliases(builder -> builder); + } + + public Mono validateQuery(ValidateQueryRequest request) { + return Mono.fromFuture(transport.performRequestAsync(request, ValidateQueryRequest._ENDPOINT, transportOptions)); + } + + public Mono validateQuery( + Function> fn) { + return validateQuery(fn.apply(new ValidateQueryRequest.Builder()).build()); + } + + public Mono validateQuery() { + return validateQuery(builder -> builder); + } + +} diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchTemplate.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchTemplate.java new file mode 100644 index 00000000..4c9691d7 --- /dev/null +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/ReactiveOpenSearchTemplate.java @@ -0,0 +1,674 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.data.client.osc; + +import static org.opensearch.client.util.ApiTypeHelper.*; +import static org.opensearch.data.client.osc.TypeUtils.*; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.opensearch._types.Result; +import org.opensearch.client.opensearch.core.*; +import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; +import org.opensearch.client.opensearch.core.pit.CreatePitResponse; +import org.opensearch.client.opensearch.core.pit.DeletePitRequest; +import org.opensearch.client.transport.Version; +import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.reactivestreams.Publisher; +import org.springframework.data.domain.Sort; +import org.springframework.data.elasticsearch.BulkFailureException; +import org.springframework.data.elasticsearch.NoSuchIndexException; +import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; +import org.springframework.data.elasticsearch.client.UnsupportedBackendOperation; +import org.springframework.data.elasticsearch.core.AbstractReactiveElasticsearchTemplate; +import org.springframework.data.elasticsearch.core.AggregationContainer; +import org.springframework.data.elasticsearch.core.IndexedObjectInformation; +import org.springframework.data.elasticsearch.core.MultiGetItem; +import org.springframework.data.elasticsearch.core.ReactiveIndexOperations; +import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations; +import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; +import org.springframework.data.elasticsearch.core.document.Document; +import org.springframework.data.elasticsearch.core.document.SearchDocument; +import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; +import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.*; +import org.springframework.data.elasticsearch.core.query.UpdateResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; +import org.springframework.data.elasticsearch.core.script.Script; +import org.springframework.data.elasticsearch.core.sql.SqlResponse; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; + +/** + * Implementation of {@link org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations} using the new + * OpenSearch client. + */ +public class ReactiveOpenSearchTemplate extends AbstractReactiveElasticsearchTemplate { + + private static final Log LOGGER = LogFactory.getLog(ReactiveOpenSearchTemplate.class); + + private final ReactiveOpenSearchClient client; + private final RequestConverter requestConverter; + private final ResponseConverter responseConverter; + private final JsonpMapper jsonpMapper; + private final OpenSearchExceptionTranslator exceptionTranslator; + + public ReactiveOpenSearchTemplate(ReactiveOpenSearchClient client, ElasticsearchConverter converter) { + super(converter); + + Assert.notNull(client, "client must not be null"); + + this.client = client; + this.jsonpMapper = client._transport().jsonpMapper(); + requestConverter = new RequestConverter(converter, jsonpMapper); + responseConverter = new ResponseConverter(jsonpMapper); + exceptionTranslator = new OpenSearchExceptionTranslator(jsonpMapper); + } + + // region Document operations + @Override + protected Mono> doIndex(T entity, IndexCoordinates index) { + + IndexRequest indexRequest = requestConverter.documentIndexRequest(getIndexQuery(entity), index, + getRefreshPolicy()); + return Mono.just(entity) // + .zipWith(// + Mono.from(execute(client -> client.index(indexRequest))) // + .map(indexResponse -> new IndexResponseMetaData(indexResponse.id(), // + indexResponse.index(), // + indexResponse.seqNo(), // + indexResponse.primaryTerm(), // + indexResponse.version() // + ))); + } + + @Override + public Flux saveAll(Mono> entitiesPublisher, IndexCoordinates index) { + + Assert.notNull(entitiesPublisher, "entitiesPublisher must not be null!"); + + return entitiesPublisher // + .flatMapMany(entities -> Flux.fromIterable(entities) // + .concatMap(entity -> maybeCallbackBeforeConvert(entity, index)) // + ).collectList() // + .map(Entities::new) // + .flatMapMany(entities -> { + + if (entities.isEmpty()) { + return Flux.empty(); + } + + return doBulkOperation(entities.indexQueries(), BulkOptions.defaultOptions(), index)// + .index() // + .flatMap(indexAndResponse -> { + T savedEntity = entities.entityAt(indexAndResponse.getT1()); + BulkResponseItem response = indexAndResponse.getT2(); + var updatedEntity = entityOperations.updateIndexedObject( + savedEntity, new IndexedObjectInformation( // + response.id(), // + response.index(), // + response.seqNo(), // + response.primaryTerm(), // + response.version()), + converter, + routingResolver); + return maybeCallbackAfterSave(updatedEntity, index); + }); + }); + } + + @Override + protected Mono doExists(String id, IndexCoordinates index) { + + Assert.notNull(id, "id must not be null"); + Assert.notNull(index, "index must not be null"); + + ExistsRequest existsRequest = requestConverter.documentExistsRequest(id, routingResolver.getRouting(), index); + + return Mono.from(execute( + ((ClientCallback>) client -> client.exists(existsRequest)))) + .map(BooleanResponse::value) // + .onErrorReturn(NoSuchIndexException.class, false); + } + + @Override + public Mono delete(DeleteQuery query, Class entityType, IndexCoordinates index) { + Assert.notNull(query, "query must not be null"); + + DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(), + entityType, index, getRefreshPolicy()); + return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse); + } + + @Override + public Mono delete(Query query, Class entityType, IndexCoordinates index) { + Assert.notNull(query, "query must not be null"); + + DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, routingResolver.getRouting(), + entityType, index, getRefreshPolicy()); + + return Mono.from(execute(client -> client.deleteByQuery(request))).map(responseConverter::byQueryResponse); + } + + @Override + public Mono get(String id, Class entityType, IndexCoordinates index) { + + Assert.notNull(id, "id must not be null"); + Assert.notNull(entityType, "entityType must not be null"); + Assert.notNull(index, "index must not be null"); + + GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index); + + Mono> getResponse = Mono + .from(execute(client -> client.get(getRequest, EntityAsMap.class))); + + ReadDocumentCallback callback = new ReadDocumentCallback<>(converter, entityType, index); + return getResponse.flatMap(response -> callback.toEntity(DocumentAdapters.from(response))); + } + + @Override + public Mono reindex(ReindexRequest reindexRequest) { + + Assert.notNull(reindexRequest, "reindexRequest must not be null"); + + org.opensearch.client.opensearch.core.ReindexRequest reindexRequestES = requestConverter.reindex(reindexRequest, + true); + + return Mono.from(execute( // + client -> client.reindex(reindexRequestES))).map(responseConverter::reindexResponse); + } + + @Override + public Mono submitReindex(ReindexRequest reindexRequest) { + + Assert.notNull(reindexRequest, "reindexRequest must not be null"); + + org.opensearch.client.opensearch.core.ReindexRequest reindexRequestES = requestConverter.reindex(reindexRequest, + false); + + return Mono.from(execute( // + client -> client.reindex(reindexRequestES))) + .flatMap(response -> (response.task() == null) + ? Mono.error( + new UnsupportedBackendOperation("ElasticsearchClient did not return a task id on submit request")) + : Mono.just(response.task())); + } + + @Override + public Mono update(UpdateQuery updateQuery, IndexCoordinates index) { + + Assert.notNull(updateQuery, "UpdateQuery must not be null"); + Assert.notNull(index, "Index must not be null"); + + UpdateRequest request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(), + routingResolver.getRouting()); + + return Mono.from(execute(client -> client.update(request, Document.class))).flatMap(response -> { + UpdateResponse.Result result = result(response.result()); + return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result)); + }); + } + + @Override + public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Mono bulkUpdate(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + + Assert.notNull(queries, "List of UpdateQuery must not be null"); + Assert.notNull(bulkOptions, "BulkOptions must not be null"); + Assert.notNull(index, "Index must not be null"); + + return doBulkOperation(queries, bulkOptions, index).then(); + } + + private Flux doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { + + BulkRequest bulkRequest = requestConverter.documentBulkRequest(queries, bulkOptions, index, getRefreshPolicy()); + return client.bulk(bulkRequest) + .onErrorMap(e -> new UncategorizedElasticsearchException("Error executing bulk request", e)) + .flatMap(this::checkForBulkOperationFailure) // + .flatMapMany(response -> Flux.fromIterable(response.items())); + + } + + private Mono checkForBulkOperationFailure(BulkResponse bulkResponse) { + + if (bulkResponse.errors()) { + Map failedDocuments = new HashMap<>(); + + for (BulkResponseItem item : bulkResponse.items()) { + + if (item.error() != null) { + failedDocuments.put(item.id(), new BulkFailureException.FailureDetails(item.status(), item.error().reason())); + } + } + BulkFailureException exception = new BulkFailureException( + "Bulk operation has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" + + failedDocuments + ']', + failedDocuments); + return Mono.error(exception); + } else { + return Mono.just(bulkResponse); + } + } + + @Override + protected Mono doDeleteById(String id, @Nullable String routing, IndexCoordinates index) { + + Assert.notNull(id, "id must not be null"); + Assert.notNull(index, "index must not be null"); + + return Mono.defer(() -> { + DeleteRequest deleteRequest = requestConverter.documentDeleteRequest(id, routing, index, getRefreshPolicy()); + return doDelete(deleteRequest); + }); + } + + private Mono doDelete(DeleteRequest request) { + + return Mono.from(execute(client -> client.delete(request))) // + .flatMap(deleteResponse -> { + if (deleteResponse.result() == Result.NotFound) { + return Mono.empty(); + } + return Mono.just(deleteResponse.id()); + }).onErrorResume(NoSuchIndexException.class, it -> Mono.empty()); + } + + @Override + public Flux> multiGet(Query query, Class clazz, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(clazz, "clazz must not be null"); + + MgetRequest request = requestConverter.documentMgetRequest(query, clazz, index); + + ReadDocumentCallback callback = new ReadDocumentCallback<>(converter, clazz, index); + + Publisher> response = execute(client -> client.mget(request, EntityAsMap.class)); + + return Mono.from(response)// + .flatMapMany(it -> Flux.fromIterable(DocumentAdapters.from(it))) // + .flatMap(multiGetItem -> { + if (multiGetItem.isFailed()) { + return Mono.just(MultiGetItem.of(null, multiGetItem.getFailure())); + } else { + return callback.toEntity(multiGetItem.getItem()) // + .map(t -> MultiGetItem.of(t, multiGetItem.getFailure())); + } + }); + } + + // endregion + + @Override + protected ReactiveOpenSearchTemplate doCopy() { + return new ReactiveOpenSearchTemplate(client, converter); + } + + // region search operations + + @Override + protected Flux doFind(Query query, Class clazz, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(clazz, "clazz must not be null"); + Assert.notNull(index, "index must not be null"); + + if (query instanceof SearchTemplateQuery searchTemplateQuery) { + return Flux.defer(() -> doSearch(searchTemplateQuery, clazz, index)); + } else { + return Flux.defer(() -> { + boolean queryIsUnbounded = !(query.getPageable().isPaged() || query.isLimiting()); + return queryIsUnbounded ? doFindUnbounded(query, clazz, index) : doFindBounded(query, clazz, index); + }); + } + } + + private Flux doFindUnbounded(Query query, Class clazz, IndexCoordinates index) { + + if (query instanceof BaseQuery baseQuery) { + var pitKeepAlive = Duration.ofMinutes(5); + // setup functions for Flux.usingWhen() + Mono resourceSupplier = openPointInTime(index, pitKeepAlive, true) + .map(pit -> new PitSearchAfter(baseQuery, pit)); + + Function> asyncComplete = this::cleanupPit; + + BiFunction> asyncError = (psa, ex) -> { + if (LOGGER.isErrorEnabled()) { + LOGGER.error("Error during pit/search_after", ex); + } + return cleanupPit(psa); + }; + + Function> asyncCancel = psa -> { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("pit/search_after was cancelled"); + } + return cleanupPit(psa); + }; + + Function>> resourceClosure = psa -> { + + baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive)); + + // only add _shard_doc if there is not a field_collapse and a sort with the same name + boolean addShardDoc = true; + + if (query instanceof NativeQuery nativeQuery && nativeQuery.getFieldCollapse() != null) { + var field = nativeQuery.getFieldCollapse().field(); + + if (nativeQuery.getSortOptions().stream() + .anyMatch(sortOptions -> sortOptions.isField() && sortOptions.field().field().equals(field))) { + addShardDoc = false; + } + + if (query.getSort() != null + && query.getSort().stream().anyMatch(order -> order.getProperty().equals(field))) { + addShardDoc = false; + } + } + + if (addShardDoc) { + baseQuery.addSort(Sort.by("_shard_doc")); + } + + SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, routingResolver.getRouting(), + clazz, index, false, true); + + return Mono.from(execute(client -> client.search(firstSearchRequest, EntityAsMap.class))) + .expand(entityAsMapSearchResponse -> { + + var hits = entityAsMapSearchResponse.hits().hits(); + if (CollectionUtils.isEmpty(hits)) { + return Mono.empty(); + } + + List sortOptions = hits.get(hits.size() - 1).sortVals().stream().map(TypeUtils::toObject) + .collect(Collectors.toList()); + baseQuery.setSearchAfter(sortOptions); + SearchRequest followSearchRequest = requestConverter.searchRequest(baseQuery, + routingResolver.getRouting(), clazz, index, false, true); + return Mono.from(execute(client -> client.search(followSearchRequest, EntityAsMap.class))); + }); + + }; + + Flux> searchResponses = Flux.usingWhen(resourceSupplier, resourceClosure, asyncComplete, + asyncError, asyncCancel); + return searchResponses.flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) + .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); + } else { + return Flux.error(new IllegalArgumentException("Query must be derived from BaseQuery")); + } + } + + private Publisher cleanupPit(PitSearchAfter psa) { + var baseQuery = psa.getBaseQuery(); + baseQuery.setPointInTime(null); + baseQuery.setSearchAfter(null); + baseQuery.setSort(psa.getSort()); + var pit = psa.getPit(); + return StringUtils.hasText(pit) ? closePointInTime(pit) : Mono.empty(); + } + + static private class PitSearchAfter { + private final BaseQuery baseQuery; + @Nullable private final Sort sort; + private final String pit; + + PitSearchAfter(BaseQuery baseQuery, String pit) { + this.baseQuery = baseQuery; + this.sort = baseQuery.getSort(); + this.pit = pit; + } + + public BaseQuery getBaseQuery() { + return baseQuery; + } + + @Nullable + public Sort getSort() { + return sort; + } + + public String getPit() { + return pit; + } + } + + @Override + protected Mono doCount(Query query, Class entityType, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(index, "index must not be null"); + + SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), entityType, index, + true); + + return Mono.from(execute(client -> client.search(searchRequest, EntityAsMap.class))) + .map(searchResponse -> searchResponse.hits().total() != null ? searchResponse.hits().total().value() : 0L); + } + + private Flux doFindBounded(Query query, Class clazz, IndexCoordinates index) { + + SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), clazz, index, + false, false); + + return Mono.from(execute(client -> client.search(searchRequest, EntityAsMap.class))) // + .flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) // + .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); + } + + private Flux doSearch(SearchTemplateQuery query, Class clazz, IndexCoordinates index) { + + var request = requestConverter.searchTemplate(query, routingResolver.getRouting(), index); + + return Mono.from(execute(client -> client.searchTemplate(request, EntityAsMap.class))) // + .flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) // + .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); + } + + @Override + protected Mono doFindForResponse(Query query, Class clazz, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(index, "index must not be null"); + + SearchRequest searchRequest = requestConverter.searchRequest(query, routingResolver.getRouting(), clazz, index, + false); + + // noinspection unchecked + SearchDocumentCallback callback = new ReadSearchDocumentCallback<>((Class) clazz, index); + SearchDocumentResponse.EntityCreator entityCreator = searchDocument -> callback.toEntity(searchDocument) + .toFuture(); + + return Mono.from(execute(client -> client.search(searchRequest, EntityAsMap.class))) + .map(searchResponse -> SearchDocumentResponseBuilder.from(searchResponse, entityCreator, jsonpMapper)); + } + + @Override + public Flux> aggregate(Query query, Class entityType, IndexCoordinates index) { + + return doFindForResponse(query, entityType, index).flatMapMany(searchDocumentResponse -> { + OpenSearchAggregations aggregations = (OpenSearchAggregations) searchDocumentResponse.getAggregations(); + return aggregations == null ? Flux.empty() : Flux.fromIterable(aggregations.aggregations()); + }); + } + + @Override + public Mono openPointInTime(IndexCoordinates index, Duration keepAlive, Boolean ignoreUnavailable) { + + Assert.notNull(index, "index must not be null"); + Assert.notNull(keepAlive, "keepAlive must not be null"); + Assert.notNull(ignoreUnavailable, "ignoreUnavailable must not be null"); + + var request = requestConverter.searchOpenPointInTimeRequest(index, keepAlive, ignoreUnavailable); + return Mono.from(execute(client -> client.openPointInTime(request))).map(CreatePitResponse::pitId); + } + + @Override + public Mono closePointInTime(String pit) { + + Assert.notNull(pit, "pit must not be null"); + + DeletePitRequest request = requestConverter.searchClosePointInTime(pit); + return Mono.from(execute(client -> client.closePointInTime(request))).map(r -> !r.pits().isEmpty()); + } + + // endregion + + // region script operations + @Override + public Mono putScript(Script script) { + + Assert.notNull(script, "script must not be null"); + + var request = requestConverter.scriptPut(script); + return Mono.from(execute(client -> client.putScript(request))).map(PutScriptResponse::acknowledged); + } + + @Override + public Mono