Skip to content

Commit af8293d

Browse files
author
Alexander Lavrukov
committed
better-spliterator: Better spliterator
1 parent 89a53cc commit af8293d

File tree

16 files changed

+497
-58
lines changed

16 files changed

+497
-58
lines changed

repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ private static <ID extends Entity.Id<?>> ReadTableParams<ID> defaultReadTablePar
380380
}
381381

382382
private static <ID extends Entity.Id<?>> ReadTableParams.ReadTableParamsBuilder<ID> buildReadTableParamsNonLegacy() {
383-
return ReadTableParams.<ID>builder().useNewSpliterator(true);
383+
return ReadTableParams.<ID>builder().spliteratorType(ReadTableParams.SpliteratorType.EXPERIMENTAL);
384384
}
385385

386386
@Test

repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java

Lines changed: 72 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import tech.ydb.core.Result;
1616
import tech.ydb.core.Status;
1717
import tech.ydb.core.StatusCode;
18+
import tech.ydb.core.grpc.GrpcReadStream;
1819
import tech.ydb.proto.ValueProtos;
1920
import tech.ydb.table.Session;
2021
import tech.ydb.table.query.DataQueryResult;
@@ -25,6 +26,7 @@
2526
import tech.ydb.table.query.stats.QueryStats;
2627
import tech.ydb.table.query.stats.QueryStatsCollectionMode;
2728
import tech.ydb.table.query.stats.TableAccessStats;
29+
import tech.ydb.table.query.ReadTablePart;
2830
import tech.ydb.table.result.ResultSetReader;
2931
import tech.ydb.table.settings.BulkUpsertSettings;
3032
import tech.ydb.table.settings.CommitTxSettings;
@@ -70,6 +72,14 @@
7072
import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException;
7173
import tech.ydb.yoj.repository.ydb.merge.QueriesMerger;
7274
import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper;
75+
import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator;
76+
import tech.ydb.yoj.repository.ydb.spliterator.ResultSetIterator;
77+
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliterator;
78+
import tech.ydb.yoj.repository.ydb.spliterator.queue.YojQueue;
79+
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueueGrpcStreamAdapter;
80+
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbLegacySpliterator;
81+
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbNewLegacySpliterator;
82+
import tech.ydb.yoj.repository.ydb.spliterator.queue.YojQueueImpl;
7383
import tech.ydb.yoj.repository.ydb.statement.Statement;
7484
import tech.ydb.yoj.repository.ydb.table.YdbTable;
7585
import tech.ydb.yoj.util.lang.Interrupts;
@@ -78,6 +88,7 @@
7888
import javax.annotation.Nullable;
7989
import java.time.Duration;
8090
import java.util.ArrayList;
91+
import java.util.Iterator;
8192
import java.util.List;
8293
import java.util.Map;
8394
import java.util.concurrent.CompletableFuture;
@@ -101,7 +112,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
101112
private static final String PROP_TRACE_VERBOSE_OBJ_RESULTS = "tech.ydb.yoj.repository.ydb.trace.verboseObjResults";
102113

103114
private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
104-
private final List<YdbSpliterator<?>> spliterators = new ArrayList<>();
115+
private final List<ClosableSpliterator<?>> spliterators = new ArrayList<>();
105116

106117
@Getter
107118
private final TxOptions options;
@@ -127,8 +138,8 @@ public YdbRepositoryTransaction(REPO repo, TxOptions options) {
127138
this.tablespace = repo.getSchemaOperations().getTablespace();
128139
}
129140

130-
private <V> YdbSpliterator<V> createSpliterator(String request, boolean isOrdered) {
131-
YdbSpliterator<V> spliterator = new YdbSpliterator<>(request, isOrdered);
141+
private <V> YdbNewLegacySpliterator<V> createSpliterator(String request, boolean isOrdered) {
142+
YdbNewLegacySpliterator<V> spliterator = new YdbNewLegacySpliterator<>(request, isOrdered);
132143
spliterators.add(spliterator);
133144
return spliterator;
134145
}
@@ -183,7 +194,7 @@ private void doCommit() {
183194

184195
private void closeStreams() {
185196
Exception summaryException = null;
186-
for (YdbSpliterator<?> spliterator : spliterators) {
197+
for (ClosableSpliterator<?> spliterator : spliterators) {
187198
try {
188199
spliterator.close();
189200
} catch (Exception e) {
@@ -451,7 +462,7 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
451462
String yql = getYql(statement);
452463
Params sdkParams = getSdkParams(statement, params);
453464

454-
YdbSpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
465+
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
455466

456467
initSession();
457468
session.executeScanQuery(
@@ -559,38 +570,65 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
559570
settings.toKey(TupleValue.of(values), params.isToInclusive());
560571
}
561572

562-
if (params.isUseNewSpliterator()) {
563-
YdbSpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
573+
return switch (params.getSpliteratorType()) {
574+
case LEGACY -> {
575+
try {
576+
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action ->
577+
doCall("read table " + mapper.getTableName(""), () -> {
578+
Status status = YdbOperations.safeJoin(
579+
session.readTable(
580+
tableName,
581+
settings.build(),
582+
rs -> new ResultSetConverter(rs).stream(mapper::mapResult).forEach(action)
583+
),
584+
params.getTimeout().plusMinutes(5)
585+
);
586+
validate("readTable", status.getCode(), status.toString());
587+
})
588+
);
589+
yield spliterator.makeStream();
590+
} catch (RepositoryException e) {
591+
throw e;
592+
} catch (Exception e) {
593+
throw new UnexpectedException("Could not read table " + tableName, e);
594+
}
595+
}
596+
case LEGACY_SLOW -> {
597+
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
564598

565-
initSession();
566-
session.readTable(
567-
tableName, settings.build(),
568-
resultSet -> new ResultSetConverter(resultSet).stream(mapper::mapResult).forEach(spliterator::onNext)
569-
).whenComplete(spliterator::onSupplierThreadComplete);
599+
initSession();
600+
session.readTable(
601+
tableName, settings.build(),
602+
resultSet -> new ResultSetConverter(resultSet).stream(mapper::mapResult).forEach(spliterator::onNext)
603+
).whenComplete(spliterator::onSupplierThreadComplete);
570604

571-
return spliterator.createStream();
572-
}
605+
yield spliterator.createStream();
606+
}
607+
case EXPERIMENTAL -> {
608+
initSession();
609+
610+
// TODO: configure stream timeout
611+
// TODO: configure batch count
612+
YojQueue<Iterator<RESULT>> queue = YojQueueImpl.create(0, Duration.ofMinutes(5));
613+
614+
var adapter = new YdbSpliteratorQueueGrpcStreamAdapter<>("readTable: " + tableName, queue);
615+
GrpcReadStream<ReadTablePart> grpcStream = session.executeReadTable(tableName, settings.build());
616+
CompletableFuture<Status> future = grpcStream.start(readTablePart -> {
617+
ResultSetIterator<RESULT> iterator = new ResultSetIterator<>(
618+
readTablePart.getResultSetReader(),
619+
mapper::mapResult
620+
);
621+
adapter.onNext(iterator);
622+
});
623+
future.whenComplete(adapter::onSupplierThreadComplete);
573624

574-
try {
575-
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action ->
576-
doCall("read table " + mapper.getTableName(""), () -> {
577-
Status status = YdbOperations.safeJoin(
578-
session.readTable(
579-
tableName,
580-
settings.build(),
581-
rs -> new ResultSetConverter(rs).stream(mapper::mapResult).forEach(action)
582-
),
583-
params.getTimeout().plusMinutes(5)
584-
);
585-
validate("readTable", status.getCode(), status.toString());
586-
})
587-
);
588-
return spliterator.makeStream();
589-
} catch (RepositoryException e) {
590-
throw e;
591-
} catch (Exception e) {
592-
throw new UnexpectedException("Could not read table " + tableName, e);
593-
}
625+
YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>(queue, params.isOrdered());
626+
627+
spliterators.add(spliterator);
628+
629+
yield spliterator.createStream();
630+
}
631+
};
594632
}
595633

596634
/**
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import java.util.Spliterator;
4+
5+
/**
 * A {@link Spliterator} that can be explicitly closed.
 *
 * <p>The interface is also an {@link AutoCloseable}, so an instance may be managed by a
 * try-with-resources statement. {@link #close()} is re-declared without a {@code throws} clause,
 * therefore callers are not forced to handle a checked exception.
 *
 * @param <V> type of the elements returned by the spliterator
 */
public interface ClosableSpliterator<V> extends Spliterator<V>, AutoCloseable {
    /**
     * Closes this spliterator. Declares no checked exceptions, narrowing {@link AutoCloseable#close()}.
     */
    @Override
    void close();
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.proto.ValueProtos;
4+
import tech.ydb.table.result.ResultSetReader;
5+
import tech.ydb.yoj.repository.ydb.client.YdbConverter;
6+
7+
import java.util.ArrayList;
8+
import java.util.Iterator;
9+
import java.util.List;
10+
import java.util.NoSuchElementException;
11+
12+
public final class ResultSetIterator<V> implements Iterator<V> {
13+
private final ResultSetReader resultSet;
14+
private final ResultConverter<V> converter;
15+
private final List<ValueProtos.Column> columns;
16+
17+
private int position = 0;
18+
19+
public ResultSetIterator(ResultSetReader resultSet, ResultConverter<V> converter) {
20+
List<ValueProtos.Column> columns;
21+
if (resultSet.getRowCount() > 0) {
22+
resultSet.setRowIndex(0);
23+
columns = getColumns(resultSet);
24+
} else {
25+
columns = new ArrayList<>();
26+
}
27+
28+
this.resultSet = resultSet;
29+
this.converter = converter;
30+
this.columns = columns;
31+
}
32+
33+
@Override
34+
public boolean hasNext() {
35+
return position < resultSet.getRowCount();
36+
}
37+
38+
@Override
39+
public V next() {
40+
if (!hasNext()) {
41+
throw new NoSuchElementException();
42+
}
43+
44+
ValueProtos.Value value = buildValue(position++);
45+
46+
return converter.convert(columns, value);
47+
}
48+
49+
private ValueProtos.Value buildValue(int rowIndex) {
50+
resultSet.setRowIndex(rowIndex);
51+
ValueProtos.Value.Builder value = ValueProtos.Value.newBuilder();
52+
for (int i = 0; i < columns.size(); i++) {
53+
value.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(i)));
54+
}
55+
return value.build();
56+
}
57+
58+
private static List<ValueProtos.Column> getColumns(ResultSetReader resultSet) {
59+
List<ValueProtos.Column> columns = new ArrayList<>();
60+
for (int i = 0; i < resultSet.getColumnCount(); i++) {
61+
columns.add(ValueProtos.Column.newBuilder()
62+
.setName(resultSet.getColumnName(i))
63+
.build()
64+
);
65+
}
66+
return columns;
67+
}
68+
69+
@FunctionalInterface
70+
public interface ResultConverter<V> {
71+
V convert(List<ValueProtos.Column> columns, ValueProtos.Value value);
72+
}
73+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package tech.ydb.yoj.repository.ydb.spliterator;
2+
3+
import tech.ydb.yoj.repository.ydb.spliterator.queue.YojQueue;
4+
import tech.ydb.yoj.repository.ydb.spliterator.queue.YojSpliteratorQueue;
5+
6+
import java.util.Collections;
7+
import java.util.Iterator;
8+
import java.util.Spliterator;
9+
import java.util.function.Consumer;
10+
import java.util.stream.Stream;
11+
import java.util.stream.StreamSupport;
12+
13+
public final class YdbSpliterator<V> implements ClosableSpliterator<V> {
14+
private final YojSpliteratorQueue<Iterator<V>> queue;
15+
private final int flags;
16+
17+
private Iterator<V> valueIterator = Collections.emptyIterator();
18+
19+
private boolean closed = false;
20+
21+
public YdbSpliterator(YojQueue<Iterator<V>> queue, boolean isOrdered) {
22+
this.queue = queue;
23+
this.flags = (isOrdered ? ORDERED : 0) | NONNULL;
24+
}
25+
26+
// Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak.
27+
public Stream<V> createStream() {
28+
return StreamSupport.stream(this, false).onClose(this::close);
29+
}
30+
31+
@Override
32+
public void close() {
33+
if (closed) {
34+
return;
35+
}
36+
closed = true;
37+
queue.close();
38+
}
39+
40+
@Override
41+
public boolean tryAdvance(Consumer<? super V> action) {
42+
if (closed) {
43+
return false;
44+
}
45+
46+
// queue could return empty iterator, we have to select one with elements
47+
while (!valueIterator.hasNext()) {
48+
valueIterator = queue.poll();
49+
if (valueIterator == null) {
50+
close();
51+
return false;
52+
}
53+
}
54+
55+
V value = valueIterator.next();
56+
57+
action.accept(value);
58+
59+
return true;
60+
}
61+
62+
@Override
63+
public Spliterator<V> trySplit() {
64+
return null;
65+
}
66+
67+
@Override
68+
public long estimateSize() {
69+
return Long.MAX_VALUE;
70+
}
71+
72+
@Override
73+
public long getExactSizeIfKnown() {
74+
return -1;
75+
}
76+
77+
@Override
78+
public int characteristics() {
79+
return flags;
80+
}
81+
}

0 commit comments

Comments
 (0)