Skip to content

[FEATURE] Reactive version of the clients #447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencyResolutionManagement {
library("boot-test", "org.springframework.boot", "spring-boot-test").versionRef("spring-boot")
library("boot-test-autoconfigure", "org.springframework.boot", "spring-boot-test-autoconfigure").versionRef("spring-boot")
library("boot-testcontainers", "org.springframework.boot", "spring-boot-testcontainers").versionRef("spring-boot")
library("projectreactor", "io.projectreactor:reactor-test:3.7.5")
plugin("spring-boot", "org.springframework.boot").versionRef("spring-boot")
}

Expand Down
1 change: 1 addition & 0 deletions spring-data-opensearch/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies {
compileOnly(springLibs.web)
compileOnly(opensearchLibs.java.client)

testImplementation(springLibs.projectreactor)
testImplementation(opensearchLibs.java.client)
testImplementation("jakarta.enterprise:jakarta.enterprise.cdi-api:3.0.0")
testImplementation("org.slf4j:log4j-over-slf4j:2.0.17")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
@SuppressWarnings("unused")
public final class OpenSearchClients {
public static final String IMPERATIVE_CLIENT = "imperative";
public static final String REACTIVE_CLIENT = "reactive";

/**
* Name of whose value can be used to correlate log messages for this request.
Expand Down Expand Up @@ -282,6 +283,93 @@ public static OpenSearchTransport getOpenSearchTransport(RestClient restClient,
}
// endregion

// region reactive client
/**
* Creates a new {@link ReactiveOpenSearchClient}
*
* @param clientConfiguration configuration options, must not be {@literal null}.
* @return the {@link ReactiveOpenSearchClient}
*/
public static ReactiveOpenSearchClient createReactive(ClientConfiguration clientConfiguration) {

Assert.notNull(clientConfiguration, "clientConfiguration must not be null");

return createReactive(getRestClient(clientConfiguration), null, DEFAULT_JSONP_MAPPER);
}

/**
* Creates a new {@link ReactiveOpenSearchClient}
*
* @param clientConfiguration configuration options, must not be {@literal null}.
* @param transportOptions options to be added to each request.
* @return the {@link ReactiveOpenSearchClient}
*/
public static ReactiveOpenSearchClient createReactive(ClientConfiguration clientConfiguration,
@Nullable TransportOptions transportOptions) {

Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");

return createReactive(getRestClient(clientConfiguration), transportOptions, DEFAULT_JSONP_MAPPER);
}

/**
* Creates a new {@link ReactiveOpenSearchClient}
*
* @param clientConfiguration configuration options, must not be {@literal null}.
* @param transportOptions options to be added to each request.
* @param jsonpMapper the JsonpMapper to use
* @return the {@link ReactiveOpenSearchClient}
*/
public static ReactiveOpenSearchClient createReactive(ClientConfiguration clientConfiguration,
@Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) {

Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
Assert.notNull(jsonpMapper, "jsonpMapper must not be null");

return createReactive(getRestClient(clientConfiguration), transportOptions, jsonpMapper);
}

/**
* Creates a new {@link ReactiveOpenSearchClient}.
*
* @param restClient the underlying {@link RestClient}
* @return the {@link ReactiveOpenSearchClient}
*/
public static ReactiveOpenSearchClient createReactive(RestClient restClient) {
return createReactive(restClient, null, DEFAULT_JSONP_MAPPER);
}

/**
* Creates a new {@link ReactiveOpenSearchClient}.
*
* @param restClient the underlying {@link RestClient}
* @param transportOptions options to be added to each request.
* @return the {@link ReactiveOpenSearchClient}
*/
public static ReactiveOpenSearchClient createReactive(RestClient restClient,
@Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) {

Assert.notNull(restClient, "restClient must not be null");

var transport = getOpenSearchTransport(restClient, REACTIVE_CLIENT, transportOptions, jsonpMapper);
return createReactive(transport);
}

/**
* Creates a new {@link ReactiveOpenSearchClient} that uses the given {@link OpenSearchTransport}.
*
* @param transport the transport to use
* @return the {@link ReactiveOpenSearchClient}
*/
public static ReactiveOpenSearchClient createReactive(OpenSearchTransport transport) {

Assert.notNull(transport, "transport must not be null");

return new ReactiveOpenSearchClient(transport);
}
// endregion


private static List<String> formattedHosts(List<InetSocketAddress> hosts, boolean useSsl) {
return hosts.stream().map(it -> (useSsl ? "https" : "http") + "://" + it.getHostString() + ':' + it.getPort())
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.data.client.osc;

import org.opensearch.client.ApiClient;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.transport.Transport;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

/**
* base class for a reactive template that uses on of the {@link ReactiveOpenSearchClient}'s child clients.
*/
public class ReactiveChildTemplate<T extends Transport, CLIENT extends ApiClient<T, CLIENT>> {
protected final CLIENT client;
protected final ElasticsearchConverter elasticsearchConverter;
protected final RequestConverter requestConverter;
protected final ResponseConverter responseConverter;
protected final OpenSearchExceptionTranslator exceptionTranslator;

public ReactiveChildTemplate(CLIENT client, ElasticsearchConverter elasticsearchConverter) {
this.client = client;
this.elasticsearchConverter = elasticsearchConverter;
JsonpMapper jsonpMapper = client._transport().jsonpMapper();
requestConverter = new RequestConverter(elasticsearchConverter, jsonpMapper);
responseConverter = new ResponseConverter(jsonpMapper);
exceptionTranslator = new OpenSearchExceptionTranslator(jsonpMapper);
}

/**
* Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on the client.
*/
@FunctionalInterface
public interface ClientCallback<CLIENT, RESULT extends Publisher<?>> {
RESULT doWithClient(CLIENT client);
}

/**
* Execute a callback with the client and provide exception translation.
*
* @param callback the callback to execute, must not be {@literal null}
* @param <RESULT> the type returned from the callback
* @return the callback result
*/
public <RESULT> Publisher<RESULT> execute(ClientCallback<CLIENT, Publisher<RESULT>> callback) {

Assert.notNull(callback, "callback must not be null");

return Flux.defer(() -> callback.doWithClient(client)).onErrorMap(exceptionTranslator::translateException);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.data.client.osc;

import org.opensearch.client.opensearch.cluster.HealthRequest;
import org.opensearch.client.opensearch.cluster.HealthResponse;
import org.opensearch.client.transport.OpenSearchTransport;
import org.springframework.data.elasticsearch.core.cluster.ClusterHealth;
import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import reactor.core.publisher.Mono;

/**
* Reactive cluster template
*/
public class ReactiveClusterTemplate
extends ReactiveChildTemplate<OpenSearchTransport, ReactiveOpenSearchClusterClient>
implements ReactiveClusterOperations {

public ReactiveClusterTemplate(ReactiveOpenSearchClusterClient client,
ElasticsearchConverter elasticsearchConverter) {
super(client, elasticsearchConverter);
}

@Override
public Mono<ClusterHealth> health() {

HealthRequest healthRequest = requestConverter.clusterHealthRequest();
Mono<HealthResponse> healthResponse = Mono.from(execute(client -> client.health(healthRequest)));
return healthResponse.map(responseConverter::clusterHealth);
}

}
Loading
Loading