1111import  tech .ydb .core .Result ;
1212import  tech .ydb .core .Status ;
1313import  tech .ydb .core .StatusCode ;
14+ import  tech .ydb .core .grpc .GrpcReadStream ;
1415import  tech .ydb .proto .ValueProtos ;
1516import  tech .ydb .table .Session ;
1617import  tech .ydb .table .query .DataQueryResult ;
1718import  tech .ydb .table .query .Params ;
19+ import  tech .ydb .table .query .ReadTablePart ;
1820import  tech .ydb .table .result .ResultSetReader ;
1921import  tech .ydb .table .settings .BulkUpsertSettings ;
2022import  tech .ydb .table .settings .CommitTxSettings ;
5456import  tech .ydb .yoj .repository .ydb .exception .YdbRepositoryException ;
5557import  tech .ydb .yoj .repository .ydb .merge .QueriesMerger ;
5658import  tech .ydb .yoj .repository .ydb .readtable .ReadTableMapper ;
59+ import  tech .ydb .yoj .repository .ydb .spliterator .ClosableSpliterator ;
60+ import  tech .ydb .yoj .repository .ydb .spliterator .ResultSetIterator ;
61+ import  tech .ydb .yoj .repository .ydb .spliterator .YdbSpliterator ;
62+ import  tech .ydb .yoj .repository .ydb .spliterator .YdbSpliteratorQueue ;
63+ import  tech .ydb .yoj .repository .ydb .spliterator .YdbSpliteratorQueueGrpcStreamAdapter ;
64+ import  tech .ydb .yoj .repository .ydb .spliterator .legacy .YdbLegacySpliterator ;
65+ import  tech .ydb .yoj .repository .ydb .spliterator .legacy .YdbNewLegacySpliterator ;
5766import  tech .ydb .yoj .repository .ydb .statement .Statement ;
5867import  tech .ydb .yoj .repository .ydb .table .YdbTable ;
5968import  tech .ydb .yoj .util .lang .Interrupts ;
6069
6170import  java .time .Duration ;
6271import  java .util .ArrayList ;
72+ import  java .util .Iterator ;
6373import  java .util .List ;
6474import  java .util .Map ;
75+ import  java .util .concurrent .CompletableFuture ;
6576import  java .util .concurrent .TimeUnit ;
6677import  java .util .function .Supplier ;
6778import  java .util .stream .Collectors ;
@@ -78,7 +89,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
7889    private  static  final  Logger  log  = LoggerFactory .getLogger (YdbRepositoryTransaction .class );
7990
8091    private  final  List <YdbRepository .Query <?>> pendingWrites  = new  ArrayList <>();
81-     private  final  List <YdbSpliterator <?>> spliterators  = new  ArrayList <>();
92+     private  final  List <ClosableSpliterator <?>> spliterators  = new  ArrayList <>();
8293
8394    @ Getter 
8495    private  final  TxOptions  options ;
@@ -102,8 +113,8 @@ public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
102113        this .cache  = options .isFirstLevelCache () ? new  RepositoryCacheImpl () : RepositoryCache .empty ();
103114    }
104115
105-     private  <V > YdbSpliterator <V > createSpliterator (String  request , boolean  isOrdered ) {
106-         YdbSpliterator <V > spliterator  = new  YdbSpliterator <>(request , isOrdered );
116+     private  <V > YdbNewLegacySpliterator <V > createSpliterator (String  request , boolean  isOrdered ) {
117+         YdbNewLegacySpliterator <V > spliterator  = new  YdbNewLegacySpliterator <>(request , isOrdered );
107118        spliterators .add (spliterator );
108119        return  spliterator ;
109120    }
@@ -153,7 +164,7 @@ private void doCommit() {
153164
154165    private  void  closeStreams () {
155166        Exception  summaryException  = null ;
156-         for  (YdbSpliterator <?> spliterator  : spliterators ) {
167+         for  (ClosableSpliterator <?> spliterator  : spliterators ) {
157168            try  {
158169                spliterator .close ();
159170            } catch  (Exception  e ) {
@@ -387,7 +398,7 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
387398        String  yql  = getYql (statement );
388399        Params  sdkParams  = getSdkParams (statement , params );
389400
390-         YdbSpliterator <RESULT > spliterator  = createSpliterator ("scanQuery: "  + yql , false );
401+         YdbNewLegacySpliterator <RESULT > spliterator  = createSpliterator ("scanQuery: "  + yql , false );
391402
392403        initSession ();
393404        session .executeScanQuery (
@@ -489,7 +500,7 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
489500        }
490501
491502        if  (params .isUseNewSpliterator ()) {
492-             YdbSpliterator <RESULT > spliterator  = createSpliterator ("readTable: "  + tableName , params .isOrdered ());
503+             YdbNewLegacySpliterator <RESULT > spliterator  = createSpliterator ("readTable: "  + tableName , params .isOrdered ());
493504
494505            initSession ();
495506            session .readTable (
@@ -500,6 +511,30 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
500511            return  spliterator .createStream ();
501512        }
502513
514+         if  (params .isUseNewSpliterator2 ()) {
515+             initSession ();
516+ 
517+             // TODO: configure stream timeout 
518+             YdbSpliteratorQueue <Iterator <RESULT >> queue  = new  YdbSpliteratorQueue <>(1 , Duration .ofMinutes (5 ));
519+ 
520+             var  adapter  = new  YdbSpliteratorQueueGrpcStreamAdapter <>("readTable: "  + tableName , queue );
521+             GrpcReadStream <ReadTablePart > grpcStream  = session .executeReadTable (tableName , settings .build ());
522+             CompletableFuture <Status > future  = grpcStream .start (readTablePart  -> {
523+                 ResultSetIterator <RESULT > iterator  = new  ResultSetIterator <>(
524+                         readTablePart .getResultSetReader (),
525+                         mapper ::mapResult 
526+                 );
527+                 adapter .onNext (iterator );
528+             });
529+             future .whenComplete (adapter ::onSupplierThreadComplete );
530+ 
531+             YdbSpliterator <RESULT > spliterator  = new  YdbSpliterator <>(queue , params .isOrdered ());
532+ 
533+             spliterators .add (spliterator );
534+ 
535+             return  spliterator .createStream ();
536+         }
537+ 
503538        try  {
504539            YdbLegacySpliterator <RESULT > spliterator  = new  YdbLegacySpliterator <>(params .isOrdered (), action  ->
505540                    doCall ("read table "  + mapper .getTableName ("" ), () -> {
0 commit comments