Skip to content

Commit dbf7a1e

Browse files
committed
Another fix using coroutine criteria executor with Hibernate (#2786)
(cherry picked from commit d162bed)
1 parent c3f6a89 commit dbf7a1e

File tree

16 files changed

+261
-13
lines changed

16 files changed

+261
-13
lines changed

data-hibernate-jpa/src/main/java/io/micronaut/data/hibernate/operations/HibernateJpaOperations.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import io.micronaut.data.runtime.operations.ExecutorAsyncOperations;
5252
import io.micronaut.data.runtime.operations.ExecutorAsyncOperationsSupportingCriteria;
5353
import io.micronaut.data.runtime.operations.ExecutorReactiveOperations;
54+
import io.micronaut.data.runtime.operations.ExecutorReactiveOperationsSupportingCriteria;
5455
import io.micronaut.transaction.TransactionOperations;
5556
import jakarta.inject.Named;
5657
import jakarta.persistence.EntityManager;
@@ -585,9 +586,9 @@ public ExecutorAsyncOperations async() {
585586
@Override
586587
public ReactiveRepositoryOperations reactive() {
587588
if (dataConversionService instanceof DataConversionService asDataConversionService) {
588-
return new ExecutorReactiveOperations(async(), asDataConversionService);
589+
return new ExecutorReactiveOperationsSupportingCriteria((ExecutorAsyncOperationsSupportingCriteria) async(), asDataConversionService);
589590
}
590-
return new ExecutorReactiveOperations(async(), null);
591+
return new ExecutorReactiveOperationsSupportingCriteria((ExecutorAsyncOperationsSupportingCriteria) async(), null);
591592
}
592593

593594
@NonNull
@@ -630,6 +631,20 @@ public <T> List<T> findAll(CriteriaQuery<T> query) {
630631
return executeRead(session -> session.createQuery(query).getResultList());
631632
}
632633

634+
@Override
635+
public <T> List<T> findAll(CriteriaQuery<T> query, int offset, int limit) {
636+
return executeRead(session -> {
637+
Query<T> sessionQuery = session.createQuery(query);
638+
if (offset != -1) {
639+
sessionQuery = sessionQuery.setFetchSize(offset);
640+
}
641+
if (limit != -1) {
642+
sessionQuery = sessionQuery.setMaxResults(limit);
643+
}
644+
return sessionQuery.getResultList();
645+
});
646+
}
647+
633648
@Override
634649
public Optional<Number> updateAll(CriteriaUpdate<Number> query) {
635650
return Optional.ofNullable(executeWrite(session -> session.createMutationQuery(query).executeUpdate()));

data-hibernate-reactive/src/main/java/io/micronaut/data/hibernate/reactive/operations/DefaultHibernateReactiveRepositoryOperations.java

+14
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,20 @@ public <T> Flux<T> findAll(CriteriaQuery<T> query) {
421421
.flatMapIterable(res -> res);
422422
}
423423

424+
@Override
425+
public <T> Flux<T> findAll(CriteriaQuery<T> query, int offset, int limit) {
426+
return withSession(session -> helper.monoFromCompletionStage(() -> {
427+
Stage.SelectionQuery<T> sessionQuery = session.createQuery(query);
428+
if (offset != -1) {
429+
sessionQuery = sessionQuery.setFirstResult(offset);
430+
}
431+
if (limit != -1) {
432+
sessionQuery = sessionQuery.setMaxResults(limit);
433+
}
434+
return sessionQuery.getResultList();
435+
})).flatMapIterable(res -> res);
436+
}
437+
424438
@Override
425439
public Mono<Number> updateAll(CriteriaUpdate<Number> query) {
426440
return withSession(session -> helper.monoFromCompletionStage(() -> session.createQuery(query).executeUpdate()).map(n -> n));

data-model/src/main/java/io/micronaut/data/operations/CriteriaRepositoryOperations.java

+11
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,17 @@ public interface CriteriaRepositoryOperations {
5858
@NonNull
5959
<T> List<T> findAll(@NonNull CriteriaQuery<T> query);
6060

61+
/**
62+
* Finds all results for the given query.
63+
* @param query The query
64+
* @param offset The offset
65+
* @param limit The limit
66+
* @param <T> The generic type
67+
* @return An iterable result
68+
*/
69+
@NonNull
70+
<T> List<T> findAll(@NonNull CriteriaQuery<T> query, int offset, int limit);
71+
6172
/**
6273
* Executes an update for the given query and parameter values. If it is possible to
6374
* return the number of objects updated, then do so.

data-model/src/main/java/io/micronaut/data/operations/async/AsyncCriteriaRepositoryOperations.java

+11
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,17 @@ default AsyncCriteriaRepositoryOperations async() {
6161
*/
6262
<T> CompletionStage<List<T>> findAll(@NonNull CriteriaQuery<T> query);
6363

64+
/**
65+
* Finds all results for the given query.
66+
*
67+
* @param query The query
68+
* @param offset The offset
69+
* @param limit The limit
70+
* @param <T> The generic type
71+
* @return An iterable result
72+
*/
73+
<T> CompletionStage<List<T>> findAll(@NonNull CriteriaQuery<T> query, int offset, int limit);
74+
6475
/**
6576
* Executes an update for the given query and parameter values. If it is possible to
6677
* return the number of objects updated, then do so.

data-model/src/main/java/io/micronaut/data/operations/reactive/BlockingReactorCriteriaRepositoryOperations.java

+9
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ default <T> List<T> findAll(@NonNull CriteriaQuery<T> query) {
6969
.orElseGet(List::of);
7070
}
7171

72+
@Override
73+
default <T> List<T> findAll(@NonNull CriteriaQuery<T> query, int limit, int offset) {
74+
return reactive().findAll(query, limit, offset)
75+
.collectList()
76+
.contextWrite(getContextView())
77+
.blockOptional()
78+
.orElseGet(List::of);
79+
}
80+
7281
@Override
7382
default Optional<Number> updateAll(@NonNull CriteriaUpdate<Number> query) {
7483
return reactive().updateAll(query)

data-model/src/main/java/io/micronaut/data/operations/reactive/ReactiveCriteriaRepositoryOperations.java

+11
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,17 @@ default ReactiveCriteriaRepositoryOperations reactive() {
6262
@NonNull
6363
<T> Publisher<T> findAll(@NonNull CriteriaQuery<T> query);
6464

65+
/**
66+
* Finds all results for the given query.
67+
* @param query The query
68+
* @param offset The offset
69+
* @param limit The limit
70+
* @param <T> The generic type
71+
* @return All result publisher
72+
*/
73+
@NonNull
74+
<T> Publisher<T> findAll(@NonNull CriteriaQuery<T> query, int offset, int limit);
75+
6576
/**
6677
* Executes an update for the given query and parameter values. If it is possible to
6778
* return the number of objects updated, then do so.

data-model/src/main/java/io/micronaut/data/operations/reactive/ReactorCriteriaRepositoryOperations.java

+3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public interface ReactorCriteriaRepositoryOperations extends ReactiveCriteriaRep
3838
@Override
3939
<T> Flux<T> findAll(@NonNull CriteriaQuery<T> query);
4040

41+
@Override
42+
<T> Flux<T> findAll(@NonNull CriteriaQuery<T> query, int offset, int limit);
43+
4144
@Override
4245
Mono<Number> updateAll(@NonNull CriteriaUpdate<Number> query);
4346

data-runtime/src/main/java/io/micronaut/data/runtime/intercept/criteria/AbstractSpecificationInterceptor.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,12 @@ public <E, K> PreparedQuery<E, K> decorate(PreparedQuery<E, K> preparedQuery) {
138138
protected final Iterable<?> findAll(RepositoryMethodKey methodKey, MethodInvocationContext<T, R> context, Type type) {
139139
Set<JoinPath> methodJoinPaths = getMethodJoinPaths(methodKey, context);
140140
if (criteriaRepositoryOperations != null) {
141-
return criteriaRepositoryOperations.findAll(buildQuery(context, type, methodJoinPaths));
141+
CriteriaQuery<Object> query = buildQuery(context, type, methodJoinPaths);
142+
Pageable pageable = getPageable(context);
143+
if (pageable != null) {
144+
return criteriaRepositoryOperations.findAll(query, (int) pageable.getOffset(), pageable.getSize());
145+
}
146+
return criteriaRepositoryOperations.findAll(query);
142147
}
143148
return operations.findAll(preparedQueryForCriteria(methodKey, context, type, methodJoinPaths));
144149
}
@@ -314,7 +319,7 @@ private <E> StoredQuery<E, Object> buildFind(RepositoryMethodKey methodKey,
314319
Type type,
315320
Set<JoinPath> methodJoinPaths) {
316321

317-
CriteriaQuery<Object> criteriaQuery = buildQuery(context, type, methodJoinPaths);
322+
CriteriaQuery<Object> criteriaQuery = buildInternalQuery(context, type, methodJoinPaths);
318323
QueryBuilder sqlQueryBuilder = getQueryBuilder(methodKey, context);
319324
QueryResultPersistentEntityCriteriaQuery queryModelCriteriaQuery = (QueryResultPersistentEntityCriteriaQuery) criteriaQuery;
320325
QueryModel queryModel = queryModelCriteriaQuery.getQueryModel();
@@ -331,7 +336,7 @@ private <E> StoredQuery<E, Object> buildFind(RepositoryMethodKey methodKey,
331336
criteriaQuery.getResultType(), !pageable.isUnpaged(), joinPaths);
332337
}
333338

334-
protected final <N> CriteriaQuery<N> buildQuery(MethodInvocationContext<T, R> context, Type type, Set<JoinPath> methodJoinPaths) {
339+
private <N> CriteriaQuery<N> buildInternalQuery(MethodInvocationContext<T, R> context, Type type, Set<JoinPath> methodJoinPaths) {
335340
CriteriaQueryBuilder<N> builder = getCriteriaQueryBuilder(context, methodJoinPaths);
336341
CriteriaQuery<N> criteriaQuery = builder.build(criteriaBuilder);
337342

@@ -350,6 +355,22 @@ protected final <N> CriteriaQuery<N> buildQuery(MethodInvocationContext<T, R> co
350355
return criteriaQuery;
351356
}
352357

358+
protected final <N> CriteriaQuery<N> buildQuery(MethodInvocationContext<T, R> context, Type type, Set<JoinPath> methodJoinPaths) {
359+
CriteriaQueryBuilder<N> builder = getCriteriaQueryBuilder(context, methodJoinPaths);
360+
CriteriaQuery<N> criteriaQuery = builder.build(criteriaBuilder);
361+
362+
for (Object param : context.getParameterValues()) {
363+
if (param instanceof Sort sort) {
364+
if (sort.isSorted()) {
365+
Root<?> root = criteriaQuery.getRoots().stream().findFirst().orElseThrow(() -> new IllegalStateException("The root not found!"));
366+
criteriaQuery.orderBy(getOrders(sort, root, criteriaBuilder));
367+
break;
368+
}
369+
}
370+
}
371+
return criteriaQuery;
372+
}
373+
353374
/**
354375
* Find {@link io.micronaut.data.repository.jpa.criteria.QuerySpecification} in context.
355376
*

data-runtime/src/main/java/io/micronaut/data/runtime/intercept/criteria/async/AbstractAsyncSpecificationInterceptor.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import io.micronaut.core.type.Argument;
2323
import io.micronaut.data.exceptions.DataAccessException;
2424
import io.micronaut.data.intercept.RepositoryMethodKey;
25+
import io.micronaut.data.model.Pageable;
2526
import io.micronaut.data.model.query.JoinPath;
2627
import io.micronaut.data.operations.RepositoryOperations;
2728
import io.micronaut.data.operations.async.AsyncCapableRepository;
2829
import io.micronaut.data.operations.async.AsyncCriteriaCapableRepository;
2930
import io.micronaut.data.operations.async.AsyncCriteriaRepositoryOperations;
3031
import io.micronaut.data.operations.async.AsyncRepositoryOperations;
3132
import io.micronaut.data.runtime.intercept.criteria.AbstractSpecificationInterceptor;
33+
import jakarta.persistence.criteria.CriteriaQuery;
3234

3335
import java.util.List;
3436
import java.util.Set;
@@ -76,7 +78,12 @@ protected AbstractAsyncSpecificationInterceptor(RepositoryOperations operations)
7678
protected final CompletionStage<Iterable<Object>> findAllAsync(RepositoryMethodKey methodKey, MethodInvocationContext<T, R> context, Type type) {
7779
Set<JoinPath> methodJoinPaths = getMethodJoinPaths(methodKey, context);
7880
if (asyncCriteriaOperations != null) {
79-
return asyncCriteriaOperations.findAll(buildQuery(context, type, methodJoinPaths)).thenApply(m -> m);
81+
CriteriaQuery<Object> criteriaQuery = buildQuery(context, type, methodJoinPaths);
82+
Pageable pageable = getPageable(context);
83+
if (pageable != null) {
84+
return asyncCriteriaOperations.findAll(criteriaQuery, (int) pageable.getOffset(), pageable.getSize()).thenApply(m -> m);
85+
}
86+
return asyncCriteriaOperations.findAll(criteriaQuery).thenApply(m -> m);
8087
}
8188
return asyncOperations.findAll(preparedQueryForCriteria(methodKey, context, type, methodJoinPaths));
8289
}

data-runtime/src/main/java/io/micronaut/data/runtime/intercept/criteria/reactive/AbstractReactiveSpecificationInterceptor.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import io.micronaut.core.annotation.NonNull;
2020
import io.micronaut.data.exceptions.DataAccessException;
2121
import io.micronaut.data.intercept.RepositoryMethodKey;
22+
import io.micronaut.data.model.Pageable;
2223
import io.micronaut.data.model.query.JoinPath;
2324
import io.micronaut.data.operations.RepositoryOperations;
2425
import io.micronaut.data.operations.reactive.ReactiveCapableRepository;
2526
import io.micronaut.data.operations.reactive.ReactiveCriteriaCapableRepository;
2627
import io.micronaut.data.operations.reactive.ReactiveCriteriaRepositoryOperations;
2728
import io.micronaut.data.operations.reactive.ReactiveRepositoryOperations;
2829
import io.micronaut.data.runtime.intercept.criteria.AbstractSpecificationInterceptor;
30+
import jakarta.persistence.criteria.CriteriaQuery;
2931
import org.reactivestreams.Publisher;
3032
import reactor.core.publisher.Mono;
3133

@@ -56,7 +58,9 @@ protected AbstractReactiveSpecificationInterceptor(RepositoryOperations operatio
5658
} else {
5759
throw new DataAccessException("Datastore of type [" + operations.getClass() + "] does not support reactive operations");
5860
}
59-
if (operations instanceof ReactiveCriteriaRepositoryOperations reactiveCriteriaRepositoryOperations) {
61+
if (reactiveOperations instanceof ReactiveCriteriaRepositoryOperations reactiveCriteriaRepositoryOperations) {
62+
reactiveCriteriaOperations = reactiveCriteriaRepositoryOperations;
63+
} else if (operations instanceof ReactiveCriteriaRepositoryOperations reactiveCriteriaRepositoryOperations) {
6064
reactiveCriteriaOperations = reactiveCriteriaRepositoryOperations;
6165
} else if (operations instanceof ReactiveCriteriaCapableRepository repository) {
6266
reactiveCriteriaOperations = repository.reactive();
@@ -69,7 +73,12 @@ protected AbstractReactiveSpecificationInterceptor(RepositoryOperations operatio
6973
protected final Publisher<Object> findAllReactive(RepositoryMethodKey methodKey, MethodInvocationContext<T, R> context, Type type) {
7074
Set<JoinPath> methodJoinPaths = getMethodJoinPaths(methodKey, context);
7175
if (reactiveCriteriaOperations != null) {
72-
return reactiveCriteriaOperations.findAll(buildQuery(context, type, methodJoinPaths));
76+
CriteriaQuery<Object> criteriaQuery = buildQuery(context, type, methodJoinPaths);
77+
Pageable pageable = getPageable(context);
78+
if (pageable != null) {
79+
return reactiveCriteriaOperations.findAll(criteriaQuery, (int) pageable.getOffset(), pageable.getSize());
80+
}
81+
return reactiveCriteriaOperations.findAll(criteriaQuery);
7382
}
7483
return reactiveOperations.findAll(preparedQueryForCriteria(methodKey, context, type, methodJoinPaths));
7584
}

data-runtime/src/main/java/io/micronaut/data/runtime/operations/ExecutorAsyncOperations.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public ExecutorAsyncOperations(@NonNull RepositoryOperations operations, @NonNul
6464
}
6565

6666
@Internal
67-
<T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
67+
final <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
6868
CompletableFuture<T> cf = new CompletableFuture<>();
6969
PropagatedContext propagatedContext = PropagatedContext.getOrEmpty();
7070
CompletableFuture.supplyAsync(PropagatedContext.wrapCurrent(supplier), executor).whenComplete((value, throwable) -> {

data-runtime/src/main/java/io/micronaut/data/runtime/operations/ExecutorAsyncOperationsSupportingCriteria.java

+5
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public <T> CompletionStage<List<T>> findAll(CriteriaQuery<T> query) {
6666
return supplyAsync(() -> criteriaRepositoryOperations.findAll(query));
6767
}
6868

69+
@Override
70+
public <T> CompletionStage<List<T>> findAll(CriteriaQuery<T> query, int offset, int limit) {
71+
return supplyAsync(() -> criteriaRepositoryOperations.findAll(query, offset, limit));
72+
}
73+
6974
@Override
7075
public CompletionStage<Number> updateAll(CriteriaUpdate<Number> query) {
7176
return supplyAsync(() -> criteriaRepositoryOperations.updateAll(query).orElse(null));

data-runtime/src/main/java/io/micronaut/data/runtime/operations/ExecutorReactiveOperations.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public <T> Publisher<Number> deleteAll(@NonNull DeleteBatchOperation<T> operatio
190190
.map(number -> convertNumberArgumentIfNecessary(number, operation.getResultArgument()));
191191
}
192192

193-
private <R> Mono<R> fromCompletableFuture(Supplier<CompletableFuture<R>> futureSupplier) {
193+
protected final <R> Mono<R> fromCompletableFuture(Supplier<CompletableFuture<R>> futureSupplier) {
194194
return Mono.fromCompletionStage(PropagatedContext.wrapCurrent(futureSupplier));
195195
}
196196

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2017-2024 original authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micronaut.data.runtime.operations;
17+
18+
import io.micronaut.core.annotation.Experimental;
19+
import io.micronaut.data.operations.reactive.ReactiveCriteriaRepositoryOperations;
20+
import io.micronaut.data.runtime.convert.DataConversionService;
21+
import jakarta.persistence.criteria.CriteriaBuilder;
22+
import jakarta.persistence.criteria.CriteriaDelete;
23+
import jakarta.persistence.criteria.CriteriaQuery;
24+
import jakarta.persistence.criteria.CriteriaUpdate;
25+
import org.reactivestreams.Publisher;
26+
27+
/**
28+
* A variation of {@link ExecutorReactiveOperations} that supports {@link ReactiveCriteriaRepositoryOperations}.
29+
* @author Denis Stepanov
30+
*/
31+
@Experimental
32+
public class ExecutorReactiveOperationsSupportingCriteria extends ExecutorReactiveOperations implements ReactiveCriteriaRepositoryOperations {
33+
34+
private final ExecutorAsyncOperationsSupportingCriteria asyncOperations;
35+
36+
public ExecutorReactiveOperationsSupportingCriteria(ExecutorAsyncOperationsSupportingCriteria asyncOperations,
37+
DataConversionService dataConversionService) {
38+
super(asyncOperations, dataConversionService);
39+
this.asyncOperations = asyncOperations;
40+
}
41+
42+
@Override
43+
public CriteriaBuilder getCriteriaBuilder() {
44+
return asyncOperations.getCriteriaBuilder();
45+
}
46+
47+
@Override
48+
public <R> Publisher<R> findOne(CriteriaQuery<R> query) {
49+
return fromCompletableFuture(() -> asyncOperations.findOne(query).toCompletableFuture());
50+
}
51+
52+
@Override
53+
public <T> Publisher<T> findAll(CriteriaQuery<T> query) {
54+
return fromCompletableFuture(() -> asyncOperations.findAll(query).toCompletableFuture()).flatMapIterable(list -> list);
55+
}
56+
57+
@Override
58+
public <T> Publisher<T> findAll(CriteriaQuery<T> query, int offset, int limit) {
59+
return fromCompletableFuture(() -> asyncOperations.findAll(query, offset, limit).toCompletableFuture()).flatMapIterable(list -> list);
60+
}
61+
62+
@Override
63+
public Publisher<Number> updateAll(CriteriaUpdate<Number> query) {
64+
return fromCompletableFuture(() -> asyncOperations.updateAll(query).toCompletableFuture());
65+
}
66+
67+
@Override
68+
public Publisher<Number> deleteAll(CriteriaDelete<Number> query) {
69+
return fromCompletableFuture(() -> asyncOperations.deleteAll(query).toCompletableFuture());
70+
}
71+
}

doc-examples/hibernate-example-kotlin/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ dependencies {
2121
implementation mnKotlin.micronaut.kotlin.runtime
2222
implementation mnValidation.micronaut.validation
2323
implementation libs.kotlin.coroutines
24+
implementation libs.kotlin.coroutines.reactive
2425

2526
kapt mnValidation.micronaut.validation
2627
kapt projects.micronautDataProcessor

0 commit comments

Comments
 (0)