Skip to content

Commit ee6ebd2

Browse files
authored
Fixed inmemory queries batching (#148)
2 parents 423c108 + b9339ea commit ee6ebd2

File tree

13 files changed

+394
-106
lines changed

13 files changed

+394
-106
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,13 @@
77
import java.util.concurrent.atomic.AtomicReference;
88

99
import tech.ydb.core.Status;
10-
import tech.ydb.core.StatusCode;
1110
import tech.ydb.core.grpc.GrpcReadStream;
1211
import tech.ydb.jdbc.YdbConst;
1312
import tech.ydb.jdbc.YdbQueryResult;
1413
import tech.ydb.jdbc.YdbResultSet;
1514
import tech.ydb.jdbc.YdbStatement;
1615
import tech.ydb.jdbc.YdbTracer;
1716
import tech.ydb.jdbc.common.YdbTypes;
18-
import tech.ydb.jdbc.exception.YdbRetryableException;
1917
import tech.ydb.jdbc.impl.YdbQueryResultReader;
2018
import tech.ydb.jdbc.impl.YdbQueryResultStatic;
2119
import tech.ydb.jdbc.impl.YdbResultSetMemory;
@@ -89,24 +87,6 @@ public void ensureOpened() throws SQLException {
8987
}
9088
}
9189

92-
@Override
93-
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
94-
throws SQLException {
95-
boolean insideTx = isInsideTransaction();
96-
while (true) {
97-
try {
98-
return executeQueryImpl(statement, query, preparedYql, params);
99-
} catch (YdbRetryableException ex) {
100-
if (insideTx || ex.getStatus().getCode() != StatusCode.BAD_SESSION) {
101-
throw ex;
102-
}
103-
}
104-
}
105-
}
106-
107-
protected abstract YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql,
108-
Params params) throws SQLException;
109-
11090
@Override
11191
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
11292
ensureOpened();

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 69 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import tech.ydb.core.Status;
1414
import tech.ydb.jdbc.YdbConst;
1515
import tech.ydb.jdbc.YdbQueryResult;
16-
import tech.ydb.jdbc.YdbResultSet;
1716
import tech.ydb.jdbc.YdbStatement;
1817
import tech.ydb.jdbc.YdbTracer;
1918
import tech.ydb.jdbc.impl.YdbQueryResultExplain;
@@ -225,12 +224,11 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
225224
}
226225

227226
@Override
228-
protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
227+
public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String preparedYql, Params params)
229228
throws SQLException {
230229
ensureOpened();
231230

232231
YdbValidator validator = statement.getValidator();
233-
234232
int timeout = statement.getQueryTimeout();
235233
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
236234
if (timeout > 0) {
@@ -251,41 +249,6 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query
251249

252250
String yql = prefixPragma + preparedYql;
253251

254-
if (useStreamResultSet) {
255-
tracer.trace("--> stream query");
256-
tracer.query(yql);
257-
String msg = "STREAM_QUERY >>\n" + yql;
258-
259-
final YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) {
260-
@Override
261-
public void onClose(Status status, Throwable th) {
262-
if (th != null) {
263-
tracer.trace("<-- " + th.getMessage());
264-
}
265-
if (status != null) {
266-
validator.addStatusIssues(status);
267-
tracer.trace("<-- " + status.toString());
268-
}
269-
270-
if (localTx.isActive()) {
271-
tracer.setId(localTx.getId());
272-
} else {
273-
if (tx.compareAndSet(localTx, null)) {
274-
localTx.getSession().close();
275-
}
276-
tracer.close();
277-
}
278-
279-
super.onClose(status, th);
280-
}
281-
};
282-
283-
settings = settings.withGrpcFlowControl(reader);
284-
QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings.build());
285-
validator.execute(msg, tracer, () -> reader.load(validator, stream));
286-
return updateCurrentResult(reader);
287-
}
288-
289252
try {
290253
tracer.trace("--> data query");
291254
tracer.query(yql);
@@ -296,11 +259,12 @@ public void onClose(Status status, Throwable th) {
296259
);
297260
validator.addStatusIssues(result.getIssueList());
298261

299-
YdbResultSet[] readers = new YdbResultSet[result.getResultSetCount()];
262+
YdbResultSetMemory[] readers = new YdbResultSetMemory[result.getResultSetCount()];
300263
for (int idx = 0; idx < readers.length; idx++) {
301264
readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx));
302265
}
303-
return updateCurrentResult(new YdbQueryResultStatic(query, readers));
266+
267+
return readers;
304268
} finally {
305269
if (!localTx.isActive()) {
306270
if (tx.compareAndSet(localTx, null)) {
@@ -316,6 +280,71 @@ public void onClose(Status status, Throwable th) {
316280
}
317281
}
318282

283+
@Override
284+
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
285+
throws SQLException {
286+
ensureOpened();
287+
288+
if (!useStreamResultSet) {
289+
YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params);
290+
return updateCurrentResult(new YdbQueryResultStatic(query, readers));
291+
}
292+
293+
YdbValidator validator = statement.getValidator();
294+
int timeout = statement.getQueryTimeout();
295+
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
296+
if (timeout > 0) {
297+
settings = settings.withRequestTimeout(timeout, TimeUnit.SECONDS);
298+
}
299+
300+
QueryTransaction nextTx = tx.get();
301+
while (nextTx == null) {
302+
nextTx = createNewQuerySession(validator).createNewTransaction(txMode);
303+
if (!tx.compareAndSet(null, nextTx)) {
304+
nextTx.getSession().close();
305+
nextTx = tx.get();
306+
}
307+
}
308+
309+
final QueryTransaction localTx = nextTx;
310+
YdbTracer tracer = statement.getConnection().getCtx().getTracer();
311+
312+
String yql = prefixPragma + preparedYql;
313+
314+
tracer.trace("--> stream query");
315+
tracer.query(yql);
316+
String msg = "STREAM_QUERY >>\n" + yql;
317+
318+
YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) {
319+
@Override
320+
public void onClose(Status status, Throwable th) {
321+
if (th != null) {
322+
tracer.trace("<-- " + th.getMessage());
323+
}
324+
if (status != null) {
325+
validator.addStatusIssues(status);
326+
tracer.trace("<-- " + status.toString());
327+
}
328+
329+
if (localTx.isActive()) {
330+
tracer.setId(localTx.getId());
331+
} else {
332+
if (tx.compareAndSet(localTx, null)) {
333+
localTx.getSession().close();
334+
}
335+
tracer.close();
336+
}
337+
338+
super.onClose(status, th);
339+
}
340+
};
341+
342+
settings = settings.withGrpcFlowControl(reader);
343+
QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings.build());
344+
validator.execute(msg, tracer, () -> reader.load(validator, stream));
345+
return updateCurrentResult(reader);
346+
}
347+
319348
@Override
320349
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
321350
ensureOpened();

jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import tech.ydb.jdbc.YdbConst;
99
import tech.ydb.jdbc.YdbQueryResult;
10-
import tech.ydb.jdbc.YdbResultSet;
1110
import tech.ydb.jdbc.YdbStatement;
1211
import tech.ydb.jdbc.YdbTracer;
1312
import tech.ydb.jdbc.impl.YdbQueryResultExplain;
@@ -201,40 +200,53 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
201200
}
202201
}
203202

204-
@Override
205-
protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
206-
throws SQLException {
207-
ensureOpened();
208-
209-
YdbValidator validator = statement.getValidator();
203+
private DataQueryResult executeTableQuery(YdbValidator validator, YdbTracer tracer, String yql,
204+
ExecuteDataQuerySettings settings, Params prms) throws SQLException {
210205
Session session = tx.getSession(validator);
211-
String yql = prefixPragma + preparedYql;
212-
YdbTracer tracer = statement.getConnection().getCtx().getTracer();
213-
tracer.trace("--> data query");
214-
tracer.query(yql);
215-
216206
try {
207+
tracer.trace("--> data query");
208+
tracer.query(yql);
209+
217210
DataQueryResult result = validator.call(
218-
QueryType.DATA_QUERY + " >>\n" + yql, tracer,
219-
() -> session.executeDataQuery(yql, tx.txControl(), params, dataQuerySettings(statement))
211+
QueryType.DATA_QUERY + " >>\n" + yql,
212+
tracer,
213+
() -> session.executeDataQuery(yql, tx.txControl(), prms, settings)
220214
);
221215
updateState(tx.withDataQuery(session, result.getTxId()));
222216

223-
YdbResultSet[] readers = new YdbResultSet[result.getResultSetCount()];
224-
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
225-
ResultSetReader rs = result.getResultSet(idx);
226-
if (failOnTruncatedResult && rs.isTruncated()) {
227-
String msg = String.format(YdbConst.RESULT_IS_TRUNCATED, idx, rs.getRowCount());
228-
throw new SQLException(msg);
217+
if (failOnTruncatedResult) {
218+
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
219+
ResultSetReader rs = result.getResultSet(idx);
220+
if (rs.isTruncated()) {
221+
String msg = String.format(YdbConst.RESULT_IS_TRUNCATED, idx, rs.getRowCount());
222+
throw new SQLException(msg);
223+
}
229224
}
230-
231-
readers[idx] = new YdbResultSetMemory(types, statement, rs);
232225
}
233226

234-
return updateCurrentResult(new YdbQueryResultStatic(query, readers));
227+
return result;
235228
} catch (SQLException | RuntimeException ex) {
236229
updateState(tx.withRollback(session));
237230
throw ex;
231+
}
232+
}
233+
234+
@Override
235+
public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String preparedYql, Params params)
236+
throws SQLException {
237+
ensureOpened();
238+
239+
YdbValidator validator = statement.getValidator();
240+
String yql = prefixPragma + preparedYql;
241+
YdbTracer tracer = statement.getConnection().getCtx().getTracer();
242+
243+
try {
244+
DataQueryResult result = executeTableQuery(validator, tracer, yql, dataQuerySettings(statement), params);
245+
YdbResultSetMemory[] readers = new YdbResultSetMemory[result.getResultSetCount()];
246+
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
247+
readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx));
248+
}
249+
return readers;
238250
} finally {
239251
if (tx.isInsideTransaction()) {
240252
tracer.setId(tx.txID());
@@ -244,6 +256,13 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query
244256
}
245257
}
246258

259+
@Override
260+
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
261+
throws SQLException {
262+
YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params);
263+
return updateCurrentResult(new YdbQueryResultStatic(query, readers));
264+
}
265+
247266
@Override
248267
public boolean isValid(YdbValidator validator, int timeout) throws SQLException {
249268
ensureOpened();

jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import tech.ydb.jdbc.YdbQueryResult;
66
import tech.ydb.jdbc.YdbStatement;
7+
import tech.ydb.jdbc.impl.YdbResultSetMemory;
78
import tech.ydb.jdbc.query.YdbQuery;
89
import tech.ydb.table.query.Params;
910
import tech.ydb.table.values.ListValue;
@@ -34,6 +35,7 @@ public interface YdbExecutor {
3435
YdbQueryResult executeExplainQuery(YdbStatement st, YdbQuery query) throws SQLException;
3536
YdbQueryResult executeScanQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException;
3637
YdbQueryResult executeDataQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException;
38+
YdbResultSetMemory[] executeInMemoryQuery(YdbStatement st, String yql, Params prms) throws SQLException;
3739

3840
void commit(YdbContext ctx, YdbValidator validator) throws SQLException;
3941
void rollback(YdbContext ctx, YdbValidator validator) throws SQLException;

jdbc/src/main/java/tech/ydb/jdbc/impl/YdbPreparedStatementImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.sql.Types;
2323
import java.util.Arrays;
2424
import java.util.Calendar;
25+
import java.util.List;
2526
import java.util.Objects;
2627
import java.util.logging.Logger;
2728

@@ -97,9 +98,14 @@ public int[] executeBatch() throws SQLException {
9798
YdbQueryResult newState = executeBulkUpsert(query, bulk.getTablePath(), bulk.getBatchedBulk());
9899
updateState(newState);
99100
} else {
100-
for (Params prm: prepared.getBatchParams()) {
101+
List<Params> prms = prepared.getBatchParams();
102+
if (prms.size() == 1) {
103+
Params prm = prms.get(0);
101104
YdbQueryResult newState = executeDataQuery(query, prepared.getBatchText(prm), prm);
102105
updateState(newState);
106+
} else {
107+
YdbQueryResult newState = executeBatchQuery(query, prepared::getBatchText, prms);
108+
updateState(newState);
103109
}
104110
}
105111
} finally {

jdbc/src/main/java/tech/ydb/jdbc/impl/YdbResultSetMemory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ public YdbResultSetMemory(YdbTypes types, YdbStatement statement, ResultSetReade
3434
this.totalCount = total;
3535
}
3636

37+
public ResultSetReader[] getResultSets() {
38+
return rs;
39+
}
40+
3741
@Override
3842
protected ValueReader getValue(int columnIndex) throws SQLException {
3943
if (!isRowIndexValid()) {

0 commit comments

Comments
 (0)