Skip to content

Commit 5fca46d

Browse files
committed
[#2873] Execute a reactive flush before a native query execution
Sometimes, Hibernate ORM needs to flush the session before executing a native query. This commit makes sure that a reactive operation is executed instead of the blocking one from Hiberante ORM.
1 parent aa10ce5 commit 5fca46d

File tree

5 files changed

+82
-12
lines changed

5 files changed

+82
-12
lines changed

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/spi/ReactiveAbstractSelectionQuery.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
* Emulate {@link org.hibernate.query.spi.AbstractSelectionQuery}.
5151
* <p>
5252
* Hibernate Reactive implementations already extend another class,
53-
* they cannot extends {@link org.hibernate.query.spi.AbstractSelectionQuery too}.
53+
* they cannot extend {@link org.hibernate.query.spi.AbstractSelectionQuery too}.
5454
* This approach allows us to avoid duplicating code.
5555
* </p>
5656
* @param <R>
@@ -74,7 +74,7 @@ public class ReactiveAbstractSelectionQuery<R> {
7474

7575
private Set<String> fetchProfiles;
7676

77-
private final Runnable beforeQuery;
77+
private final Supplier<CompletionStage<Void>> beforeQuery;
7878

7979
private final Consumer<Boolean> afterQuery;
8080
private final Function<List<R>, R> uniqueElement;
@@ -93,7 +93,7 @@ public ReactiveAbstractSelectionQuery(
9393
Supplier<DomainParameterXref> getDomainParameterXref,
9494
Supplier<Class<?>> getResultType,
9595
Supplier<String> getQueryString,
96-
Runnable beforeQuery,
96+
Supplier<CompletionStage<Void>> beforeQuery,
9797
Consumer<Boolean> afterQuery,
9898
Function<List<R>, R> uniqueElement) {
9999
this(
@@ -121,7 +121,7 @@ public ReactiveAbstractSelectionQuery(
121121
Supplier<DomainParameterXref> getDomainParameterXref,
122122
Supplier<Class<?>> getResultType,
123123
Supplier<String> getQueryString,
124-
Runnable beforeQuery,
124+
Supplier<CompletionStage<Void>> beforeQuery,
125125
Consumer<Boolean> afterQuery,
126126
Function<List<R>, R> uniqueElement,
127127
InterpretationsKeySource interpretationsKeySource) {
@@ -200,8 +200,8 @@ private LockOptions getLockOptions() {
200200

201201
public CompletionStage<List<R>> reactiveList() {
202202
final Set<String> profiles = applyProfiles();
203-
beforeQuery.run();
204-
return doReactiveList()
203+
return beforeQuery.get()
204+
.thenCompose( v -> doReactiveList() )
205205
.handle( (list, error) -> {
206206
handleException( error );
207207
return list;

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sql/internal/ReactiveNativeQueryImpl.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.hibernate.reactive.query.sql.spi.ReactiveNativeQueryImplementor;
4343
import org.hibernate.reactive.query.sql.spi.ReactiveNonSelectQueryPlan;
4444
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
45+
import org.hibernate.reactive.session.ReactiveSession;
4546
import org.hibernate.sql.exec.spi.Callback;
4647
import org.hibernate.type.BasicTypeReference;
4748

@@ -52,8 +53,11 @@
5253
import jakarta.persistence.LockModeType;
5354
import jakarta.persistence.Parameter;
5455
import jakarta.persistence.TemporalType;
55-
import jakarta.persistence.metamodel.Type;
5656
import jakarta.persistence.metamodel.SingularAttribute;
57+
import jakarta.persistence.metamodel.Type;
58+
59+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
60+
5761

5862
public class ReactiveNativeQueryImpl<R> extends NativeQueryImpl<R>
5963
implements ReactiveNativeQueryImplementor<R> {
@@ -123,13 +127,46 @@ private ReactiveAbstractSelectionQuery<R> createSelectionQueryDelegate(SharedSes
123127
this::getNull,
124128
this::getNull,
125129
this::getQueryString,
126-
this::beforeQuery,
130+
this::reactiveBeforeQuery,
127131
this::afterQuery,
128132
AbstractSelectionQuery::uniqueElement,
129133
null
130134
);
131135
}
132136

137+
protected CompletionStage<Void> reactiveBeforeQuery() {
138+
getQueryParameterBindings().validate();
139+
140+
final var session = getSession();
141+
session.prepareForQueryExecution( requiresTxn( getQueryOptions().getLockOptions().getLockMode() ) );
142+
return reactivePrepareForExecution()
143+
.thenAccept( v -> {
144+
prepareSessionFlushMode( session );
145+
prepareSessionCacheMode( session );
146+
} );
147+
}
148+
149+
protected CompletionStage<Void> reactivePrepareForExecution() {
150+
final var spaces = getSynchronizedQuerySpaces();
151+
if ( spaces == null || spaces.isEmpty() ) {
152+
// We need to flush. The query itself is not required to execute in a
153+
// transaction; if there is no transaction, the flush would throw a
154+
// TransactionRequiredException which would potentially break existing
155+
// apps, so we only do the flush if a transaction is in progress.
156+
if ( shouldFlush() ) {
157+
return ( (ReactiveSession) getSession() )
158+
.reactiveFlush()
159+
.thenAccept( v -> resetCallback() );
160+
}
161+
// Reset the callback before every execution
162+
resetCallback();
163+
}
164+
// Otherwise, the application specified query spaces via the Hibernate
165+
// SynchronizeableQuery and so the query will already perform a partial
166+
// flush according to the defined query spaces - no need for a full flush.
167+
return voidFuture();
168+
}
169+
133170
private CompletionStage<List<R>> doReactiveList() {
134171
return reactiveSelectPlan().reactivePerformList( this );
135172
}

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveSqmQueryImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@
7272
import jakarta.persistence.TemporalType;
7373
import jakarta.persistence.metamodel.Type;
7474

75+
import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
76+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
77+
7578
/**
7679
* A reactive {@link SqmQueryImpl}
7780
*/
@@ -124,12 +127,22 @@ private ReactiveAbstractSelectionQuery<R> createSelectionQueryDelegate(SharedSes
124127
this::getDomainParameterXref,
125128
this::getResultType,
126129
this::getQueryString,
127-
this::beforeQuery,
130+
this::reactiveBeforeQuery,
128131
this::afterQuery,
129132
AbstractSelectionQuery::uniqueElement
130133
);
131134
}
132135

136+
private CompletionStage<Void> reactiveBeforeQuery() {
137+
try {
138+
beforeQuery();
139+
return voidFuture();
140+
}
141+
catch (Throwable e) {
142+
return failedFuture( e );
143+
}
144+
}
145+
133146
@Override
134147
public CompletionStage<R> reactiveUnique() {
135148
return selectionQueryDelegate.reactiveUnique();

hibernate-reactive-core/src/main/java/org/hibernate/reactive/query/sqm/internal/ReactiveSqmSelectionQueryImpl.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import java.util.stream.Stream;
4848

4949
import static org.hibernate.query.spi.SqlOmittingQueryOptions.omitSqlQueryOptions;
50+
import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
51+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
5052

5153
/**
5254
* A reactive {@link SqmSelectionQueryImpl}
@@ -83,15 +85,24 @@ private ReactiveAbstractSelectionQuery<R> createSelectionQueryDelegate(SharedSes
8385
this::getDomainParameterXref,
8486
this::getResultType,
8587
this::getQueryString,
86-
this::beforeQuery,
88+
this::reactiveBeforeQuery,
8789
this::afterQuery,
8890
AbstractSelectionQuery::uniqueElement
8991
);
9092
}
9193

94+
private CompletionStage<Void> reactiveBeforeQuery() {
95+
try {
96+
beforeQuery();
97+
return voidFuture();
98+
}
99+
catch (Throwable e) {
100+
return failedFuture( e );
101+
}
102+
}
103+
92104
private CompletionStage<List<R>> doReactiveList() {
93-
getSession().prepareForQueryExecution( requiresTxn( getQueryOptions().getLockOptions()
94-
.findGreatestLockMode() ) );
105+
getSession().prepareForQueryExecution( requiresTxn( getQueryOptions().getLockOptions().findGreatestLockMode() ) );
95106

96107
final SqmSelectStatement<?> sqmStatement = getSqmStatement();
97108
final boolean containsCollectionFetches = sqmStatement.containsCollectionFetches();

hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveStatelessSessionImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,11 @@ public class ReactiveStatelessSessionImpl extends StatelessSessionImpl implement
132132
private final ReactiveConnection reactiveConnection;
133133
private final ReactiveStatelessSessionImpl batchingHelperSession;
134134
private final PersistenceContext persistenceContext;
135+
private final boolean connectionProvided;
135136

136137
public ReactiveStatelessSessionImpl(SessionFactoryImpl factory, SessionCreationOptions options, ReactiveConnection connection) {
137138
super( factory, options );
139+
connectionProvided = options.getConnection() != null;
138140
reactiveConnection = connection;
139141
persistenceContext = new ReactivePersistenceContextAdapter( super.getPersistenceContext() );
140142
batchingHelperSession = new ReactiveStatelessSessionImpl( factory, options, reactiveConnection, persistenceContext );
@@ -150,6 +152,7 @@ private ReactiveStatelessSessionImpl(
150152
ReactiveConnection connection,
151153
PersistenceContext persistenceContext) {
152154
super( factory, options );
155+
connectionProvided = options.getConnection() != null;
153156
this.persistenceContext = persistenceContext;
154157
// StatelessSession should not allow JDBC batching, because that would change
155158
// its "immediate synchronous execution" model into something more like transactional
@@ -1019,6 +1022,12 @@ public void prepareForQueryExecution(boolean requiresTxn) {
10191022
// }
10201023
}
10211024

1025+
@Override
1026+
public boolean isTransactionInProgress() {
1027+
return connectionProvided || ( isOpenOrWaitingForAutoClose()
1028+
&& reactiveConnection.isTransactionInProgress() );
1029+
}
1030+
10221031
@Override
10231032
public <R> ReactiveSqmQueryImplementor<R> createReactiveQuery(String queryString, Class<R> expectedResultType) {
10241033
checkOpen();

0 commit comments

Comments
 (0)