Skip to content

Commit 08ff4ab

Browse files
committed
Sync with master
2 parents 6cc21d0 + 225708d commit 08ff4ab

22 files changed

+521
-152
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,13 @@
77
import java.util.concurrent.atomic.AtomicReference;
88

99
import tech.ydb.core.Status;
10-
import tech.ydb.core.StatusCode;
1110
import tech.ydb.core.grpc.GrpcReadStream;
1211
import tech.ydb.jdbc.YdbConst;
1312
import tech.ydb.jdbc.YdbQueryResult;
1413
import tech.ydb.jdbc.YdbResultSet;
1514
import tech.ydb.jdbc.YdbStatement;
1615
import tech.ydb.jdbc.YdbTracer;
1716
import tech.ydb.jdbc.common.YdbTypes;
18-
import tech.ydb.jdbc.exception.YdbRetryableException;
1917
import tech.ydb.jdbc.impl.YdbQueryResultReader;
2018
import tech.ydb.jdbc.impl.YdbQueryResultStatic;
2119
import tech.ydb.jdbc.impl.YdbResultSetMemory;
@@ -89,24 +87,6 @@ public void ensureOpened() throws SQLException {
8987
}
9088
}
9189

92-
@Override
93-
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
94-
throws SQLException {
95-
boolean insideTx = isInsideTransaction();
96-
while (true) {
97-
try {
98-
return executeQueryImpl(statement, query, preparedYql, params);
99-
} catch (YdbRetryableException ex) {
100-
if (insideTx || ex.getStatus().getCode() != StatusCode.BAD_SESSION) {
101-
throw ex;
102-
}
103-
}
104-
}
105-
}
106-
107-
protected abstract YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql,
108-
Params params) throws SQLException;
109-
11090
@Override
11191
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
11292
ensureOpened();

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 69 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import tech.ydb.core.Status;
1414
import tech.ydb.jdbc.YdbConst;
1515
import tech.ydb.jdbc.YdbQueryResult;
16-
import tech.ydb.jdbc.YdbResultSet;
1716
import tech.ydb.jdbc.YdbStatement;
1817
import tech.ydb.jdbc.YdbTracer;
1918
import tech.ydb.jdbc.impl.YdbQueryResultExplain;
@@ -226,12 +225,11 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
226225
}
227226

228227
@Override
229-
protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
228+
public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String preparedYql, Params params)
230229
throws SQLException {
231230
ensureOpened();
232231

233232
YdbValidator validator = statement.getValidator();
234-
235233
int timeout = statement.getQueryTimeout();
236234
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
237235
settings = settings.withStatsMode(QueryStatsMode.valueOf(statement.getStatsCollectionMode().name()));
@@ -253,41 +251,6 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query
253251

254252
String yql = prefixPragma + preparedYql;
255253

256-
if (useStreamResultSet) {
257-
tracer.trace("--> stream query");
258-
tracer.query(yql);
259-
String msg = "STREAM_QUERY >>\n" + yql;
260-
261-
final YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) {
262-
@Override
263-
public void onClose(Status status, Throwable th) {
264-
if (th != null) {
265-
tracer.trace("<-- " + th.getMessage());
266-
}
267-
if (status != null) {
268-
validator.addStatusIssues(status);
269-
tracer.trace("<-- " + status.toString());
270-
}
271-
272-
if (localTx.isActive()) {
273-
tracer.setId(localTx.getId());
274-
} else {
275-
if (tx.compareAndSet(localTx, null)) {
276-
localTx.getSession().close();
277-
}
278-
tracer.close();
279-
}
280-
281-
super.onClose(status, th);
282-
}
283-
};
284-
285-
settings = settings.withGrpcFlowControl(reader);
286-
QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings.build());
287-
validator.execute(msg, tracer, () -> reader.load(validator, stream));
288-
return updateCurrentResult(reader);
289-
}
290-
291254
try {
292255
tracer.trace("--> data query");
293256
tracer.query(yql);
@@ -298,14 +261,13 @@ public void onClose(Status status, Throwable th) {
298261
);
299262
validator.addStatusIssues(result.getIssueList());
300263

301-
YdbResultSet[] readers = new YdbResultSet[result.getResultSetCount()];
264+
YdbResultSetMemory[] readers = new YdbResultSetMemory[result.getResultSetCount()];
302265
for (int idx = 0; idx < readers.length; idx++) {
303266
readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx));
304267
}
305268

306-
YdbQueryResultStatic queryResult = new YdbQueryResultStatic(query, readers);
307-
queryResult.setQueryStats(result.getQueryInfo().getStats());
308-
return updateCurrentResult(queryResult);
269+
// queryResult.setQueryStats(result.getQueryInfo().getStats());
270+
return readers;
309271
} finally {
310272
if (!localTx.isActive()) {
311273
if (tx.compareAndSet(localTx, null)) {
@@ -321,6 +283,71 @@ public void onClose(Status status, Throwable th) {
321283
}
322284
}
323285

286+
@Override
287+
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
288+
throws SQLException {
289+
ensureOpened();
290+
291+
if (!useStreamResultSet) {
292+
YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params);
293+
return updateCurrentResult(new YdbQueryResultStatic(query, readers));
294+
}
295+
296+
YdbValidator validator = statement.getValidator();
297+
int timeout = statement.getQueryTimeout();
298+
ExecuteQuerySettings.Builder settings = ExecuteQuerySettings.newBuilder();
299+
if (timeout > 0) {
300+
settings = settings.withRequestTimeout(timeout, TimeUnit.SECONDS);
301+
}
302+
303+
QueryTransaction nextTx = tx.get();
304+
while (nextTx == null) {
305+
nextTx = createNewQuerySession(validator).createNewTransaction(txMode);
306+
if (!tx.compareAndSet(null, nextTx)) {
307+
nextTx.getSession().close();
308+
nextTx = tx.get();
309+
}
310+
}
311+
312+
final QueryTransaction localTx = nextTx;
313+
YdbTracer tracer = statement.getConnection().getCtx().getTracer();
314+
315+
String yql = prefixPragma + preparedYql;
316+
317+
tracer.trace("--> stream query");
318+
tracer.query(yql);
319+
String msg = "STREAM_QUERY >>\n" + yql;
320+
321+
YdbQueryResultReader reader = new YdbQueryResultReader(types, statement, query) {
322+
@Override
323+
public void onClose(Status status, Throwable th) {
324+
if (th != null) {
325+
tracer.trace("<-- " + th.getMessage());
326+
}
327+
if (status != null) {
328+
validator.addStatusIssues(status);
329+
tracer.trace("<-- " + status.toString());
330+
}
331+
332+
if (localTx.isActive()) {
333+
tracer.setId(localTx.getId());
334+
} else {
335+
if (tx.compareAndSet(localTx, null)) {
336+
localTx.getSession().close();
337+
}
338+
tracer.close();
339+
}
340+
341+
super.onClose(status, th);
342+
}
343+
};
344+
345+
settings = settings.withGrpcFlowControl(reader);
346+
QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings.build());
347+
validator.execute(msg, tracer, () -> reader.load(validator, stream));
348+
return updateCurrentResult(reader);
349+
}
350+
324351
@Override
325352
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
326353
ensureOpened();

jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import tech.ydb.jdbc.YdbConst;
99
import tech.ydb.jdbc.YdbQueryResult;
10-
import tech.ydb.jdbc.YdbResultSet;
1110
import tech.ydb.jdbc.YdbStatement;
1211
import tech.ydb.jdbc.YdbTracer;
1312
import tech.ydb.jdbc.impl.YdbQueryResultExplain;
@@ -203,42 +202,54 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
203202
}
204203
}
205204

206-
@Override
207-
protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
208-
throws SQLException {
209-
ensureOpened();
210-
211-
YdbValidator validator = statement.getValidator();
205+
private DataQueryResult executeTableQuery(YdbValidator validator, YdbTracer tracer, String yql,
206+
ExecuteDataQuerySettings settings, Params prms) throws SQLException {
212207
Session session = tx.getSession(validator);
213-
String yql = prefixPragma + preparedYql;
214-
YdbTracer tracer = statement.getConnection().getCtx().getTracer();
215-
tracer.trace("--> data query");
216-
tracer.query(yql);
217-
218208
try {
209+
tracer.trace("--> data query");
210+
tracer.query(yql);
211+
219212
DataQueryResult result = validator.call(
220-
QueryType.DATA_QUERY + " >>\n" + yql, tracer,
221-
() -> session.executeDataQuery(yql, tx.txControl(), params, dataQuerySettings(statement))
213+
QueryType.DATA_QUERY + " >>\n" + yql,
214+
tracer,
215+
() -> session.executeDataQuery(yql, tx.txControl(), prms, settings)
222216
);
223217
updateState(tx.withDataQuery(session, result.getTxId()));
224218

225-
YdbResultSet[] readers = new YdbResultSet[result.getResultSetCount()];
226-
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
227-
ResultSetReader rs = result.getResultSet(idx);
228-
if (failOnTruncatedResult && rs.isTruncated()) {
229-
String msg = String.format(YdbConst.RESULT_IS_TRUNCATED, idx, rs.getRowCount());
230-
throw new SQLException(msg);
219+
if (failOnTruncatedResult) {
220+
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
221+
ResultSetReader rs = result.getResultSet(idx);
222+
if (rs.isTruncated()) {
223+
String msg = String.format(YdbConst.RESULT_IS_TRUNCATED, idx, rs.getRowCount());
224+
throw new SQLException(msg);
225+
}
231226
}
232-
233-
readers[idx] = new YdbResultSetMemory(types, statement, rs);
234227
}
235228

236-
YdbQueryResultStatic queryResult = new YdbQueryResultStatic(query, readers);
237-
queryResult.setQueryStats(result.getQueryStats());
238-
return updateCurrentResult(queryResult);
229+
return result;
239230
} catch (SQLException | RuntimeException ex) {
240231
updateState(tx.withRollback(session));
241232
throw ex;
233+
}
234+
}
235+
236+
@Override
237+
public YdbResultSetMemory[] executeInMemoryQuery(YdbStatement statement, String preparedYql, Params params)
238+
throws SQLException {
239+
ensureOpened();
240+
241+
YdbValidator validator = statement.getValidator();
242+
String yql = prefixPragma + preparedYql;
243+
YdbTracer tracer = statement.getConnection().getCtx().getTracer();
244+
245+
try {
246+
DataQueryResult result = executeTableQuery(validator, tracer, yql, dataQuerySettings(statement), params);
247+
YdbResultSetMemory[] readers = new YdbResultSetMemory[result.getResultSetCount()];
248+
for (int idx = 0; idx < result.getResultSetCount(); idx += 1) {
249+
readers[idx] = new YdbResultSetMemory(types, statement, result.getResultSet(idx));
250+
}
251+
// queryResult.setQueryStats(result.getQueryStats());
252+
return readers;
242253
} finally {
243254
if (tx.isInsideTransaction()) {
244255
tracer.setId(tx.txID());
@@ -248,6 +259,13 @@ protected YdbQueryResult executeQueryImpl(YdbStatement statement, YdbQuery query
248259
}
249260
}
250261

262+
@Override
263+
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params)
264+
throws SQLException {
265+
YdbResultSetMemory[] readers = executeInMemoryQuery(statement, preparedYql, params);
266+
return updateCurrentResult(new YdbQueryResultStatic(query, readers));
267+
}
268+
251269
@Override
252270
public boolean isValid(YdbValidator validator, int timeout) throws SQLException {
253271
ensureOpened();

jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import tech.ydb.jdbc.YdbTracer;
1818
import tech.ydb.jdbc.exception.ExceptionFactory;
1919
import tech.ydb.jdbc.exception.YdbConditionallyRetryableException;
20+
import tech.ydb.jdbc.exception.YdbUnavailbaleException;
2021
import tech.ydb.jdbc.query.YdbQuery;
2122
import tech.ydb.query.QueryStream;
2223
import tech.ydb.query.QueryTransaction;
@@ -128,7 +129,7 @@ protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransacti
128129
tracer.query(commitQuery);
129130
return query.execute();
130131
});
131-
} catch (YdbConditionallyRetryableException ex) {
132+
} catch (YdbConditionallyRetryableException | YdbUnavailbaleException ex) {
132133
Result<DataQueryResult> res = validateRetryCtx.supplyResult(
133134
session -> session.executeDataQuery(validateQuery, TxControl.snapshotRo(), params)
134135
).join();

jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import tech.ydb.core.grpc.GrpcTransport;
1414
import tech.ydb.core.grpc.GrpcTransportBuilder;
15+
import tech.ydb.core.impl.SingleChannelTransport;
1516
import tech.ydb.core.settings.BaseRequestSettings;
1617
import tech.ydb.jdbc.YdbPrepareMode;
1718
import tech.ydb.jdbc.YdbTracer;
@@ -250,7 +251,7 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
250251
});
251252
});
252253

253-
grpcTransport = builder.build();
254+
grpcTransport = config.isUseDiscovery() ? builder.build() : new SingleChannelTransport(builder);
254255

255256
PooledTableClient.Builder tableClient = PooledTableClient.newClient(
256257
GrpcTableRpc.useTransport(grpcTransport)

jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import tech.ydb.jdbc.YdbQueryResult;
66
import tech.ydb.jdbc.YdbStatement;
7+
import tech.ydb.jdbc.impl.YdbResultSetMemory;
78
import tech.ydb.jdbc.query.YdbQuery;
89
import tech.ydb.table.query.Params;
910
import tech.ydb.table.values.ListValue;
@@ -34,6 +35,7 @@ public interface YdbExecutor {
3435
YdbQueryResult executeExplainQuery(YdbStatement st, YdbQuery query) throws SQLException;
3536
YdbQueryResult executeScanQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException;
3637
YdbQueryResult executeDataQuery(YdbStatement st, YdbQuery query, String yql, Params prms) throws SQLException;
38+
YdbResultSetMemory[] executeInMemoryQuery(YdbStatement st, String yql, Params prms) throws SQLException;
3739

3840
void commit(YdbContext ctx, YdbValidator validator) throws SQLException;
3941
void rollback(YdbContext ctx, YdbValidator validator) throws SQLException;

jdbc/src/main/java/tech/ydb/jdbc/exception/ExceptionFactory.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,17 @@ public static SQLException createException(String message, UnexpectedResultExcep
3232
}
3333

3434
// transport problems are translated to SQLTransientConnectionException
35-
if (code == StatusCode.TRANSPORT_UNAVAILABLE || code == StatusCode.UNAVAILABLE) {
35+
if (code == StatusCode.TRANSPORT_UNAVAILABLE) {
3636
return new YdbUnavailbaleException(message, sqlState, vendorCode, cause);
3737
}
3838

3939
// timeouts are translated to SQLTimeoutException
40-
if (code == StatusCode.TIMEOUT ||
41-
code == StatusCode.CLIENT_DEADLINE_EXPIRED ||
42-
code == StatusCode.CLIENT_DEADLINE_EXCEEDED) {
40+
if (code == StatusCode.CLIENT_DEADLINE_EXPIRED || code == StatusCode.CLIENT_DEADLINE_EXCEEDED) {
4341
return new YdbTimeoutException(message, sqlState, vendorCode, cause);
4442
}
4543

4644
// all others transient problems are translated to base SQLTransientException
47-
if (code.isRetryable(true)) {
45+
if (code.isRetryable(true) || code == StatusCode.TIMEOUT) {
4846
return new YdbConditionallyRetryableException(message, sqlState, vendorCode, cause);
4947
}
5048

jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,10 +255,8 @@ private YdbPreparedStatement prepareStatement(QueryKey key, int resultSetType, Y
255255
throws SQLException {
256256

257257
validator.clearWarnings();
258-
ctx.getTracer().trace("prepare statement");
259258
YdbQuery query = ctx.parseYdbQuery(key);
260259
YdbPreparedQuery params = ctx.prepareYdbQuery(query, mode);
261-
ctx.getTracer().trace("create prepared statement");
262260
return new YdbPreparedStatementImpl(this, query, params, resultSetType);
263261
}
264262

0 commit comments

Comments
 (0)