@@ -459,33 +459,45 @@ public async IAsyncEnumerable<ChangesFeedResponseResult<TSource>> GetContinuousC
459
459
request = request . ApplyQueryParametersOptions ( options ) ;
460
460
}
461
461
462
- await using Stream stream = filter == null
463
- ? await request . GetStreamAsync ( cancellationToken , HttpCompletionOption . ResponseHeadersRead )
464
- . ConfigureAwait ( false )
465
- : await request . QueryContinuousWithFilterAsync < TSource > ( _queryProvider , filter , cancellationToken )
466
- . ConfigureAwait ( false ) ;
467
-
468
- await foreach ( var line in stream . ReadLinesAsync ( cancellationToken ) )
462
+ do
469
463
{
470
- if ( string . IsNullOrEmpty ( line ) )
471
- {
472
- continue ;
473
- }
474
-
475
- MatchCollection matches = _feedChangeLineStartPattern . Matches ( line ) ;
476
- for ( var i = 0 ; i < matches . Count ; i ++ )
464
+ await using Stream stream = filter == null
465
+ ? await request . GetStreamAsync ( cancellationToken , HttpCompletionOption . ResponseHeadersRead )
466
+ . ConfigureAwait ( false )
467
+ : await request . QueryContinuousWithFilterAsync < TSource > ( _queryProvider , filter , cancellationToken )
468
+ . ConfigureAwait ( false ) ;
469
+
470
+ var lastSequence = options ? . Since ?? "0" ;
471
+
472
+ await foreach ( var line in stream . ReadLinesAsync ( cancellationToken ) )
477
473
{
478
- var startIndex = matches [ i ] . Index ;
479
- var endIndex = i < matches . Count - 1 ? matches [ i + 1 ] . Index : line . Length ;
480
- var lineLength = endIndex - startIndex ;
481
- var substring = line . Substring ( startIndex , lineLength ) ;
482
- ChangesFeedResponseResult < TSource > ? result = JsonConvert . DeserializeObject < ChangesFeedResponseResult < TSource > > ( substring ) ;
483
- if ( string . IsNullOrWhiteSpace ( _discriminator ) || result . Document . SplitDiscriminator == _discriminator )
474
+ if ( string . IsNullOrEmpty ( line ) )
475
+ {
476
+ continue ;
477
+ }
478
+
479
+ MatchCollection matches = _feedChangeLineStartPattern . Matches ( line ) ;
480
+ for ( var i = 0 ; i < matches . Count ; i ++ )
484
481
{
485
- yield return result ;
482
+ var startIndex = matches [ i ] . Index ;
483
+ var endIndex = i < matches . Count - 1 ? matches [ i + 1 ] . Index : line . Length ;
484
+ var lineLength = endIndex - startIndex ;
485
+ var substring = line . Substring ( startIndex , lineLength ) ;
486
+ ChangesFeedResponseResult < TSource > ? result =
487
+ JsonConvert . DeserializeObject < ChangesFeedResponseResult < TSource > > ( substring ) ;
488
+ if ( string . IsNullOrWhiteSpace ( _discriminator ) ||
489
+ result . Document . SplitDiscriminator == _discriminator )
490
+ {
491
+ lastSequence = result . Seq ;
492
+ yield return result ;
493
+ }
486
494
}
487
495
}
488
- }
496
+
497
+ // stream broke, pick up listening after last successful processed sequence
498
+ request = request . SetQueryParam ( "since" , lastSequence ) ;
499
+
500
+ } while ( ! cancellationToken . IsCancellationRequested ) ;
489
501
}
490
502
491
503
#endregion
0 commit comments