@@ -687,38 +687,59 @@ ACTOR static Future<Void> doNonIsolatedRO(PlanCheckpoint* outerCheckpoint,
687687 Reference<MetadataManager> mm) {
688688 if (!dtr)
689689 dtr = self->newTransaction ();
690- state double startt = now ();
691690 state Reference<PlanCheckpoint> innerCheckpoint (new PlanCheckpoint);
692691 state int nTransactions = 1 ;
693692 state int64_t nResults = 0 ;
694693 state FlowLock* outerLock = outerCheckpoint->getDocumentFinishedLock ();
694+ state Deque<std::pair<Reference<ScanReturnedContext>, Future<Void>>> bufferedDocs;
695+
695696 try {
696697 state uint64_t metadataVersion = wait (cx->bindCollectionContext (dtr)->getMetadataVersion ());
697698 loop {
698699 state FutureStream<Reference<ScanReturnedContext>> docs = subPlan->execute (innerCheckpoint.getPtr (), dtr);
699700 state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock ();
700701 state bool first = true ;
701- state Future<Void> timeout = delay (3.0 );
702+ state Future<Void> timeout = delay (3.0 , g_network->getCurrentTask () + 1 );
703+ state bool finished = false ;
702704
703- loop choose {
704- when (state Reference<ScanReturnedContext> doc = waitNext (docs)) {
705- // throws end_of_stream when totally finished
706- Void _ = wait (outerLock->take ());
707- innerLock->release ();
708- output.send (doc);
709- ++nResults;
710- if (first) {
711- timeout = delay (DOCLAYER_KNOBS->NONISOLATED_INTERNAL_TIMEOUT );
712- first = false ;
705+ try {
706+ loop choose {
707+ when (Reference<ScanReturnedContext> doc = waitNext (docs)) {
708+ // throws end_of_stream when totally finished
709+ bufferedDocs.push_back (std::make_pair (doc, outerLock->take (g_network->getCurrentTask () + 1 )));
710+ }
711+ when (Void _ = wait (bufferedDocs.empty () ? Never () : bufferedDocs.front ().second )) {
712+ innerLock->release ();
713+ output.send (bufferedDocs.front ().first );
714+ bufferedDocs.pop_front ();
715+ ++nResults;
716+ if (first) {
717+ timeout =
718+ delay (DOCLAYER_KNOBS->NONISOLATED_INTERNAL_TIMEOUT , g_network->getCurrentTask () + 1 );
719+ first = false ;
720+ }
713721 }
722+ when (Void _ = wait (timeout)) { break ; }
714723 }
715- when (Void _ = wait (timeout)) { break ; }
724+ ASSERT (!docs.isReady ());
725+ } catch (Error& e) {
726+ if (e.code () != error_code_end_of_stream)
727+ throw ;
728+ finished = true ;
716729 }
717730
718- ASSERT (!docs.isReady ());
719-
720731 innerCheckpoint = innerCheckpoint->stopAndCheckpoint ();
721732
733+ while (!bufferedDocs.empty ()) {
734+ Void _ = wait (bufferedDocs.front ().second );
735+ output.send (bufferedDocs.front ().first );
736+ bufferedDocs.pop_front ();
737+ ++nResults;
738+ }
739+
740+ if (finished)
741+ throw end_of_stream ();
742+
722743 dtr = self->newTransaction ();
723744 state uint64_t newMetadataVersion = wait (cx->bindCollectionContext (dtr)->getMetadataVersion ());
724745 if (newMetadataVersion != metadataVersion) {
@@ -763,7 +784,7 @@ ACTOR static Future<Void> doNonIsolatedRW(PlanCheckpoint* outerCheckpoint,
763784 state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock ();
764785 state bool first = true ;
765786 state bool finished = false ;
766- state Future<Void> timeout = delay (3.0 );
787+ state Future<Void> timeout = delay (3.0 , g_network-> getCurrentTask () + 1 );
767788 state Deque<std::pair<Reference<ScanReturnedContext>, Future<Void>>> committingDocs;
768789 state Deque<Reference<ScanReturnedContext>> bufferedDocs;
769790
@@ -781,7 +802,8 @@ ACTOR static Future<Void> doNonIsolatedRW(PlanCheckpoint* outerCheckpoint,
781802 waitNext (docs)) { // throws end_of_stream when totally finished
782803 committingDocs.push_back (std::make_pair (doc, doc->commitChanges ()));
783804 if (first) {
784- timeout = delay (DOCLAYER_KNOBS->NONISOLATED_INTERNAL_TIMEOUT );
805+ timeout = delay (DOCLAYER_KNOBS->NONISOLATED_INTERNAL_TIMEOUT ,
806+ g_network->getCurrentTask () + 1 );
785807 first = false ;
786808 }
787809 }
@@ -1148,10 +1170,8 @@ ACTOR static Future<Void> findAndModify(PlanCheckpoint* outerCheckpoint,
11481170 PromiseStream<Reference<ScanReturnedContext>> output) {
11491171 if (!dtr)
11501172 dtr = NonIsolatedPlan::newTransaction (database);
1151- state double startt = now ();
11521173 state Reference<PlanCheckpoint> innerCheckpoint (new PlanCheckpoint);
11531174 state int nTransactions = 1 ;
1154- state int64_t nResults = 0 ;
11551175 state FlowLock* outerLock = outerCheckpoint->getDocumentFinishedLock ();
11561176 state Reference<ScanReturnedContext> firstDoc;
11571177 state bool any = false ;
0 commit comments