Skip to content

Commit a9b8daf

Browse files
committed
[FEATURE] Reactive version of the clients
Signed-off-by: Andriy Redko <[email protected]>
1 parent fc9fd94 commit a9b8daf

32 files changed

+6906
-2
lines changed

settings.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ dependencyResolutionManagement {
3030
library("boot-test", "org.springframework.boot", "spring-boot-test").versionRef("spring-boot")
3131
library("boot-test-autoconfigure", "org.springframework.boot", "spring-boot-test-autoconfigure").versionRef("spring-boot")
3232
library("boot-testcontainers", "org.springframework.boot", "spring-boot-testcontainers").versionRef("spring-boot")
33+
library("projectreactor", "io.projectreactor:reactor-test:3.7.5")
3334
plugin("spring-boot", "org.springframework.boot").versionRef("spring-boot")
3435
}
3536

spring-data-opensearch/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dependencies {
3838
compileOnly(springLibs.web)
3939
compileOnly(opensearchLibs.java.client)
4040

41+
testImplementation(springLibs.projectreactor)
4142
testImplementation(opensearchLibs.java.client)
4243
testImplementation("jakarta.enterprise:jakarta.enterprise.cdi-api:3.0.0")
4344
testImplementation("org.slf4j:log4j-over-slf4j:2.0.17")

spring-data-opensearch/src/main/java/org/opensearch/data/client/osc/OpenSearchClients.java

+88
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
@SuppressWarnings("unused")
6363
public final class OpenSearchClients {
6464
public static final String IMPERATIVE_CLIENT = "imperative";
65+
public static final String REACTIVE_CLIENT = "reactive";
6566

6667
/**
6768
* Name of whose value can be used to correlate log messages for this request.
@@ -282,6 +283,93 @@ public static OpenSearchTransport getOpenSearchTransport(RestClient restClient,
282283
}
283284
// endregion
284285

286+
// region reactive client
287+
/**
288+
* Creates a new {@link ReactiveOpenSearchClient}
289+
*
290+
* @param clientConfiguration configuration options, must not be {@literal null}.
291+
* @return the {@link ReactiveOpenSearchClient}
292+
*/
293+
public static ReactiveOpenSearchClient createReactive(ClientConfiguration clientConfiguration) {
294+
295+
Assert.notNull(clientConfiguration, "clientConfiguration must not be null");
296+
297+
return createReactive(getRestClient(clientConfiguration), null, DEFAULT_JSONP_MAPPER);
298+
}
299+
300+
/**
301+
* Creates a new {@link ReactiveOpenSearchClient}
302+
*
303+
* @param clientConfiguration configuration options, must not be {@literal null}.
304+
* @param transportOptions options to be added to each request.
305+
* @return the {@link ReactiveOpenSearchClient}
306+
*/
307+
public static ReactiveOpenSearchClient createReactive(ClientConfiguration clientConfiguration,
308+
@Nullable TransportOptions transportOptions) {
309+
310+
Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
311+
312+
return createReactive(getRestClient(clientConfiguration), transportOptions, DEFAULT_JSONP_MAPPER);
313+
}
314+
315+
/**
316+
* Creates a new {@link ReactiveOpenSearchClient}
317+
*
318+
* @param clientConfiguration configuration options, must not be {@literal null}.
319+
* @param transportOptions options to be added to each request.
320+
* @param jsonpMapper the JsonpMapper to use
321+
* @return the {@link ReactiveOpenSearchClient}
322+
*/
323+
public static ReactiveOpenSearchClient createReactive(ClientConfiguration clientConfiguration,
324+
@Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) {
325+
326+
Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
327+
Assert.notNull(jsonpMapper, "jsonpMapper must not be null");
328+
329+
return createReactive(getRestClient(clientConfiguration), transportOptions, jsonpMapper);
330+
}
331+
332+
/**
333+
* Creates a new {@link ReactiveOpenSearchClient}.
334+
*
335+
* @param restClient the underlying {@link RestClient}
336+
* @return the {@link ReactiveOpenSearchClient}
337+
*/
338+
public static ReactiveOpenSearchClient createReactive(RestClient restClient) {
339+
return createReactive(restClient, null, DEFAULT_JSONP_MAPPER);
340+
}
341+
342+
/**
343+
* Creates a new {@link ReactiveOpenSearchClient}.
344+
*
345+
* @param restClient the underlying {@link RestClient}
346+
* @param transportOptions options to be added to each request.
347+
* @return the {@link ReactiveOpenSearchClient}
348+
*/
349+
public static ReactiveOpenSearchClient createReactive(RestClient restClient,
350+
@Nullable TransportOptions transportOptions, JsonpMapper jsonpMapper) {
351+
352+
Assert.notNull(restClient, "restClient must not be null");
353+
354+
var transport = getOpenSearchTransport(restClient, REACTIVE_CLIENT, transportOptions, jsonpMapper);
355+
return createReactive(transport);
356+
}
357+
358+
/**
359+
* Creates a new {@link ReactiveOpenSearchClient} that uses the given {@link OpenSearchTransport}.
360+
*
361+
* @param transport the transport to use
362+
* @return the {@link ReactiveOpenSearchClient}
363+
*/
364+
public static ReactiveOpenSearchClient createReactive(OpenSearchTransport transport) {
365+
366+
Assert.notNull(transport, "transport must not be null");
367+
368+
return new ReactiveOpenSearchClient(transport);
369+
}
370+
// endregion
371+
372+
285373
private static List<String> formattedHosts(List<InetSocketAddress> hosts, boolean useSsl) {
286374
return hosts.stream().map(it -> (useSsl ? "https" : "http") + "://" + it.getHostString() + ':' + it.getPort())
287375
.collect(Collectors.toList());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.data.client.osc;
11+
12+
import org.opensearch.client.ApiClient;
13+
import org.opensearch.client.json.JsonpMapper;
14+
import org.opensearch.client.transport.Transport;
15+
import org.reactivestreams.Publisher;
16+
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
17+
import org.springframework.util.Assert;
18+
import reactor.core.publisher.Flux;
19+
20+
/**
21+
* base class for a reactive template that uses on of the {@link ReactiveOpenSearchClient}'s child clients.
22+
*/
23+
public class ReactiveChildTemplate<T extends Transport, CLIENT extends ApiClient<T, CLIENT>> {
24+
protected final CLIENT client;
25+
protected final ElasticsearchConverter elasticsearchConverter;
26+
protected final RequestConverter requestConverter;
27+
protected final ResponseConverter responseConverter;
28+
protected final OpenSearchExceptionTranslator exceptionTranslator;
29+
30+
public ReactiveChildTemplate(CLIENT client, ElasticsearchConverter elasticsearchConverter) {
31+
this.client = client;
32+
this.elasticsearchConverter = elasticsearchConverter;
33+
JsonpMapper jsonpMapper = client._transport().jsonpMapper();
34+
requestConverter = new RequestConverter(elasticsearchConverter, jsonpMapper);
35+
responseConverter = new ResponseConverter(jsonpMapper);
36+
exceptionTranslator = new OpenSearchExceptionTranslator(jsonpMapper);
37+
}
38+
39+
/**
40+
* Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on the client.
41+
*/
42+
@FunctionalInterface
43+
public interface ClientCallback<CLIENT, RESULT extends Publisher<?>> {
44+
RESULT doWithClient(CLIENT client);
45+
}
46+
47+
/**
48+
* Execute a callback with the client and provide exception translation.
49+
*
50+
* @param callback the callback to execute, must not be {@literal null}
51+
* @param <RESULT> the type returned from the callback
52+
* @return the callback result
53+
*/
54+
public <RESULT> Publisher<RESULT> execute(ClientCallback<CLIENT, Publisher<RESULT>> callback) {
55+
56+
Assert.notNull(callback, "callback must not be null");
57+
58+
return Flux.defer(() -> callback.doWithClient(client)).onErrorMap(exceptionTranslator::translateException);
59+
}
60+
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.data.client.osc;
11+
12+
import org.opensearch.client.opensearch.cluster.HealthRequest;
13+
import org.opensearch.client.opensearch.cluster.HealthResponse;
14+
import org.opensearch.client.transport.OpenSearchTransport;
15+
import org.springframework.data.elasticsearch.core.cluster.ClusterHealth;
16+
import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations;
17+
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
18+
import reactor.core.publisher.Mono;
19+
20+
/**
21+
* Reactive cluster template
22+
*/
23+
public class ReactiveClusterTemplate
24+
extends ReactiveChildTemplate<OpenSearchTransport, ReactiveOpenSearchClusterClient>
25+
implements ReactiveClusterOperations {
26+
27+
public ReactiveClusterTemplate(ReactiveOpenSearchClusterClient client,
28+
ElasticsearchConverter elasticsearchConverter) {
29+
super(client, elasticsearchConverter);
30+
}
31+
32+
@Override
33+
public Mono<ClusterHealth> health() {
34+
35+
HealthRequest healthRequest = requestConverter.clusterHealthRequest();
36+
Mono<HealthResponse> healthResponse = Mono.from(execute(client -> client.health(healthRequest)));
37+
return healthResponse.map(responseConverter::clusterHealth);
38+
}
39+
40+
}

0 commit comments

Comments
 (0)