@@ -335,7 +335,6 @@ ACTOR static Future<Void> toDocInfo(PlanCheckpoint* checkpoint,
335335 inputLock->release ();
336336 Void _ = wait (outputLock->take ());
337337 lastKey = Key (kv.key , kv.arena ());
338- // fprintf(stderr, "lastkey: %s\n", printable(lastKey).c_str());
339338 Standalone<StringRef> last (DataKey::decode_item_rev (kv.key , 0 ), kv.arena ());
340339 Reference<ScanReturnedContext> output (new ScanReturnedContext (base->getSubContext (last), scanID, lastKey));
341340 dis.send (output);
@@ -373,9 +372,6 @@ ACTOR static Future<bool> simpleWouldBeLast(Reference<ScanReturnedContext> doc,
373372 break ;
374373 }
375374 }
376- // fprintf(stderr, "SWBL last: %s\n", printable(last).c_str());
377- // fprintf(stderr, "SWBL scankey: %s\n", printable(doc->scanKey()).c_str());
378- // fprintf(stderr, "SWBL upperBound: %s\n", printable(indexUpperBound).c_str());
379375 if (doc->scanKey ().startsWith (last))
380376 return true ;
381377 }
@@ -416,9 +412,6 @@ ACTOR static Future<bool> compoundWouldBeLast(Reference<ScanReturnedContext> doc
416412 break ;
417413 }
418414 }
419- // fprintf(stderr, "CWBL last: %s\n", printable(last).c_str());
420- // fprintf(stderr, "CWBL scankey: %s\n", printable(doc->scanKey()).c_str());
421- // fprintf(stderr, "CWBL upperBound: %s\n", printable(indexUpperBound).c_str());
422415 if (doc->scanKey ().startsWith (last))
423416 return true ;
424417 }
@@ -558,7 +551,6 @@ ACTOR static Future<Void> doPKScan(PlanCheckpoint* checkpoint,
558551 lastKey = Key (kv.key , kv.arena ());
559552 }
560553 } catch (Error& e) {
561- // fprintf(stderr, "doPKScan: %s %d\n", e.what(), checkpoint->splitBoundWanted());
562554 if (e.code () == error_code_actor_cancelled) {
563555 if (checkpoint->splitBoundWanted ()) {
564556 DataKey splitKey = DataKey::decode_bytes (lastKey);
@@ -577,8 +569,8 @@ FutureStream<Reference<ScanReturnedContext>> PrimaryKeyLookupPlan::execute(PlanC
577569 Reference<CollectionContext> bcx = cx->bindCollectionContext (tr);
578570 if (begin.present () && end.present () && begin.get () == end.get ()) {
579571 PromiseStream<Reference<ScanReturnedContext>> p;
580- checkpoint-> addOperation ( doSinglePKLookup (checkpoint, p, bcx, begin. get (), scanID),
581- p); // ??? Can we skip this overhead?
572+ // ??? Can we skip this overhead?
573+ checkpoint-> addOperation ( doSinglePKLookup (checkpoint, p, bcx, begin. get (), scanID), p);
582574 return p.getFuture ();
583575 } else {
584576 PromiseStream<Reference<ScanReturnedContext>> p;
@@ -590,7 +582,6 @@ FutureStream<Reference<ScanReturnedContext>> PrimaryKeyLookupPlan::execute(PlanC
590582 Standalone<StringRef> endKey = std::max<Standalone<StringRef>>(
591583 beginKey, std::min (end.present () ? strinc (end.get ().encode_key_part ()) : LiteralStringRef (" \xff " ),
592584 checkpoint->getBounds (scanID).end ));
593- // fprintf(stderr, "PK scan executing from %s to %s\n", printable(beginKey).c_str(), printable(endKey).c_str());
594585
595586 GenFutureStream<KeyValue> kvs = bcx->cx ->getDescendants (beginKey, endKey, descendantFlowControlLock);
596587 checkpoint->addOperation (doPKScan (checkpoint, bcx, scanID, kvs, p, descendantFlowControlLock),
@@ -704,17 +695,14 @@ ACTOR static Future<Void> doNonIsolatedRO(PlanCheckpoint* outerCheckpoint,
704695 try {
705696 state uint64_t metadataVersion = wait (cx->bindCollectionContext (dtr)->getMetadataVersion ());
706697 loop {
707- // printf("Trying nonIsolatedRO with %d outputs and checkpoint '%s'-'%s'\n", nResults,
708- // printable(innerCheckpoint->getBounds(0).begin).c_str(),
709- // printable(innerCheckpoint->getBounds(0).end).c_str());
710698 state FutureStream<Reference<ScanReturnedContext>> docs = subPlan->execute (innerCheckpoint.getPtr (), dtr);
711699 state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock ();
712700 state bool first = true ;
713701 state Future<Void> timeout = delay (3.0 );
714702
715703 loop choose {
716- when (state Reference<ScanReturnedContext> doc =
717- waitNext (docs)) { // throws end_of_stream when totally finished
704+ when (state Reference<ScanReturnedContext> doc = waitNext (docs)) {
705+ // throws end_of_stream when totally finished
718706 Void _ = wait (outerLock->take ());
719707 innerLock->release ();
720708 output.send (doc);
@@ -723,7 +711,6 @@ ACTOR static Future<Void> doNonIsolatedRO(PlanCheckpoint* outerCheckpoint,
723711 timeout = delay (DOCLAYER_KNOBS->NONISOLATED_INTERNAL_TIMEOUT );
724712 first = false ;
725713 }
726- // if (oCount == 3) timeout = delay(0);
727714 }
728715 when (Void _ = wait (timeout)) { break ; }
729716 }
@@ -785,9 +772,10 @@ ACTOR static Future<Void> doNonIsolatedRW(PlanCheckpoint* outerCheckpoint,
785772 loop {
786773 if (bufferedDocs.size () + committingDocs.size () >=
787774 DOCLAYER_KNOBS->NONISOLATED_RW_INTERNAL_BUFFER_MAX )
788- timeout = delay (0 ); // We do this instead of breaking so that when stopAndCheckpoint() gets
789- // called below, the actor for the plan immediately inside us is never
790- // on the call stack, so gets its actor_cancelled delivered immediately.
775+ // We do this instead of breaking so that when stopAndCheckpoint() gets
776+ // called below, the actor for the plan immediately inside us is never
777+ // on the call stack, so gets its actor_cancelled delivered immediately.
778+ timeout = delay (0 );
791779 choose {
792780 when (state Reference<ScanReturnedContext> doc =
793781 waitNext (docs)) { // throws end_of_stream when totally finished
@@ -1045,10 +1033,10 @@ ACTOR static Future<Void> doFlushChanges(PlanCheckpoint* checkpoint,
10451033 try {
10461034 choose {
10471035 when (Reference<ScanReturnedContext> nextInput = waitNext (input)) {
1036+ // FIXME: this will be unsafe with unique indexes. Something has to happen here that doesn't
1037+ // kill performance.
10481038 futures.push_back (std::pair<Reference<ScanReturnedContext>, Future<Void>>(
1049- nextInput,
1050- nextInput->commitChanges ())); // FIXME: this will be unsafe with unique indexes. Something
1051- // has to happen here that doesn't kill performance.
1039+ nextInput, nextInput->commitChanges ()));
10521040 }
10531041 when (Void _ = wait (futures.empty () ? Never () : futures.front ().second )) {
10541042 output.send (futures.front ().first );
@@ -1127,7 +1115,6 @@ ACTOR static Future<Void> doUpdate(PlanCheckpoint* checkpoint,
11271115
11281116 throw end_of_stream ();
11291117 } catch (Error& e) {
1130- // printf("doUpdate: %s\n", e.what());
11311118 if (e.code () == error_code_actor_cancelled) {
11321119 if (checkpoint->splitBoundWanted ()) {
11331120 for (int i = futures.size () - 1 ; i >= 0 ; i--)
@@ -1172,18 +1159,15 @@ ACTOR static Future<Void> findAndModify(PlanCheckpoint* outerCheckpoint,
11721159 try {
11731160 state uint64_t metadataVersion = wait (cx->bindCollectionContext (dtr)->getMetadataVersion ());
11741161 loop {
1175- // printf("Trying nonIsolatedRO with %d outputs and checkpoint '%s'-'%s'\n", nResults,
1176- // printable(innerCheckpoint->getBounds(0).begin).c_str(),
1177- // printable(innerCheckpoint->getBounds(0).end).c_str());
11781162 state FutureStream<Reference<ScanReturnedContext>> docs = subPlan->execute (innerCheckpoint.getPtr (), dtr);
11791163 state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock ();
11801164 state Future<Void> timeout = delay (1.0 );
11811165 state bool done = false ;
11821166
11831167 try {
11841168 loop choose {
1185- when (state Reference<ScanReturnedContext> doc =
1186- waitNext (docs)) { // throws end_of_stream when totally finished
1169+ when (state Reference<ScanReturnedContext> doc = waitNext (docs)) {
1170+ // throws end_of_stream when totally finished
11871171 firstDoc = doc;
11881172 innerLock->release ();
11891173 done = true ;
@@ -1441,10 +1425,9 @@ ACTOR static Future<Void> doInsert(PlanCheckpoint* checkpoint,
14411425 Reference<MetadataManager> mm,
14421426 Namespace ns,
14431427 PromiseStream<Reference<ScanReturnedContext>> output) {
1444- // state int64_t& inserted = checkpoint->getIntState(0); <- This is broken for now.
14451428 state Deque<Future<Reference<IReadWriteContext>>> f;
14461429 state FlowLock* flowControlLock = checkpoint->getDocumentFinishedLock ();
1447- state int i = 0 ; // = inserted;
1430+ state int i = 0 ;
14481431
14491432 try {
14501433 state Reference<UnboundCollectionContext> ucx = wait (mm->getUnboundCollectionContext (tr, ns));
@@ -1459,15 +1442,13 @@ ACTOR static Future<Void> doInsert(PlanCheckpoint* checkpoint,
14591442 when (Reference<IReadWriteContext> doc = wait (f.empty () ? Never () : f.front ())) {
14601443 output.send (ref (new ScanReturnedContext (doc, -1 , Key ()))); // Are these the right scanId etc?
14611444 f.pop_front ();
1462- // inserted++;
14631445 }
14641446 }
14651447 }
14661448 state int j = 0 ;
14671449 for (; j < f.size (); j++) {
14681450 Reference<IReadWriteContext> doc = wait (f[j]);
14691451 output.send (ref (new ScanReturnedContext (doc, -1 , Key ()))); // Are these the right scanId etc?
1470- // inserted++;
14711452 }
14721453 throw end_of_stream ();
14731454 } catch (Error& e) {
@@ -1503,11 +1484,9 @@ ACTOR static Future<Void> doSort(PlanCheckpoint* outerCheckpoint,
15031484 loop {
15041485 try {
15051486 Reference<ScanReturnedContext> doc = waitNext (docs);
1506- returnProjections.push_back (
1507- doc->toDataValue ().get ().getPackedObject ().getOwned ()); // Note that this call to get() is safe here but
1508- // not in general, because we know that
1509- // doc is wrapping a BsonContext, which means
1510- // toDataValue() is synchronous.
1487+ // Note that this call to get() is safe here but not in general, because we know that doc is wrapping a
1488+ // BsonContext, which means toDataValue() is synchronous.
1489+ returnProjections.push_back (doc->toDataValue ().get ().getPackedObject ().getOwned ());
15111490 innerLock->release ();
15121491 } catch (Error& e) {
15131492 if (e.code () == error_code_end_of_stream) {
0 commit comments