diff --git a/source/fundamentals/crud/read-operations/change-streams.txt b/source/fundamentals/crud/read-operations/change-streams.txt index f346ea26..d30f55a2 100644 --- a/source/fundamentals/crud/read-operations/change-streams.txt +++ b/source/fundamentals/crud/read-operations/change-streams.txt @@ -111,6 +111,7 @@ for only specified change events. Create the pipeline by using the You can specify the following aggregation stages in the ``pipeline`` parameter: - ``$addFields`` +- ``$changeStreamSplitLargeEvent`` - ``$match`` - ``$project`` - ``$replaceRoot`` @@ -119,9 +120,19 @@ You can specify the following aggregation stages in the ``pipeline`` parameter: - ``$set`` - ``$unset`` -To learn how to build an aggregation pipeline by using the -``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in -the Operations with Builders guide. +.. tip:: + + To learn how to build an aggregation pipeline by using the + ``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in + the Operations with Builders guide. + + To learn more about modifying your change stream output, see the + :manual:`Modify Change Stream Output + ` section in the {+mdb-server+} + manual. + +Monitor Update Events Example +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The following example uses the ``pipeline`` parameter to open a change stream that records only update operations. Select the :guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the @@ -145,10 +156,73 @@ corresponding code. :end-before: end-change-stream-pipeline :language: csharp -To learn more about modifying your change stream output, see the -:manual:`Modify Change Stream Output -` section in the {+mdb-server+} -manual. +Split Large Change Events Example +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If your application generates change events that exceed 16 MB in size, the +server returns a ``BSONObjectTooLarge`` error. To avoid this error, you can use +the ``$changeStreamSplitLargeEvent`` pipeline stage to split the events +into smaller fragments. The {+driver-short+} aggregation API includes the +``ChangeStreamSplitLargeEvent()`` method, which you can use to add the +``$changeStreamSplitLargeEvent`` stage to the change stream pipeline. + +This example instructs the driver to watch for changes and split +change events that exceed the 16 MB limit. The code prints the +change document for each event and calls helper methods to +reassemble any event fragments: + +.. tabs:: + + .. tab:: Asynchronous + :tabid: change-stream-split-async + + .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs + :start-after: start-split-change-event-async + :end-before: end-split-change-event-async + :language: csharp + + .. tab:: Synchronous + :tabid: change-stream-split-sync + + .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs + :start-after: start-split-change-event-sync + :end-before: end-split-change-event-sync + :language: csharp + +.. note:: + + We recommend reassembling change event fragments, as shown in the + preceding example, but this step is optional. You can use the same + logic to watch both split and complete change events. + +The preceding example uses the ``GetNextChangeStreamEvent()``, +``GetNextChangeStreamEventAsync()``, and ``MergeFragment()`` +methods to reassemble change event fragments into a single change stream document. +The following code defines these methods: + +.. tabs:: + + .. tab:: Asynchronous + :tabid: split-event-helpers-async + + .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs + :start-after: start-split-event-helpers-async + :end-before: end-split-event-helpers-async + :language: csharp + + .. tab:: Synchronous + :tabid: split-event-helpers-sync + + .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs + :start-after: start-split-event-helpers-sync + :end-before: end-split-event-helpers-sync + :language: csharp + +.. tip:: + + To learn more about splitting large change events, see + :manual:`$changeStreamSplitLargeEvent ` + in the {+mdb-server+} manual. Modify ``Watch()`` Behavior --------------------------- diff --git a/source/includes/code-examples/change-streams/change-streams.cs b/source/includes/code-examples/change-streams/change-streams.cs index edeaf709..b71be65a 100644 --- a/source/includes/code-examples/change-streams/change-streams.cs +++ b/source/includes/code-examples/change-streams/change-streams.cs @@ -42,7 +42,7 @@ await cursor.ForEachAsync(change => .Match(change => change.OperationType == ChangeStreamOperationType.Update); // Opens a change stream and prints the changes as they're received -using (var cursor = await _restaurantsCollection.WatchAsync(pipeline)) +using (var cursor = await collection.WatchAsync(pipeline)) { await cursor.ForEachAsync(change => { @@ -56,7 +56,7 @@ await cursor.ForEachAsync(change => .Match(change => change.OperationType == ChangeStreamOperationType.Update); // Opens a change streams and print the changes as they're received -using (var cursor = _restaurantsCollection.Watch(pipeline)) +using (var cursor = collection.Watch(pipeline)) { foreach (var change in cursor.ToEnumerable()) { @@ -65,6 +65,115 @@ await cursor.ForEachAsync(change => } // end-change-stream-pipeline +// start-split-event-helpers-sync +// Fetches the next complete change stream event +private static IEnumerable> GetNextChangeStreamEvent( +IEnumerator> changeStreamEnumerator) +{ + while (changeStreamEnumerator.MoveNext()) + { + var changeStreamEvent = changeStreamEnumerator.Current; + if (changeStreamEvent.SplitEvent != null) + { + var fragment = changeStreamEvent; + while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) + { + changeStreamEnumerator.MoveNext(); + fragment = changeStreamEnumerator.Current; + MergeFragment(changeStreamEvent, fragment); + } + } + yield return changeStreamEvent; + } +} + +// Merges a fragment into the base event +private static void MergeFragment( + ChangeStreamDocument changeStreamEvent, + ChangeStreamDocument fragment) +{ + foreach (var element in fragment.BackingDocument) + { + if (element.Name != "_id" && element.Name != "splitEvent") + { + changeStreamEvent.BackingDocument[element.Name] = element.Value; + } + } +} +// end-split-event-helpers-sync + +// start-split-event-helpers-async +// Fetches the next complete change stream event +private static async IAsyncEnumerable> GetNextChangeStreamEventAsync( + IAsyncCursor> changeStreamCursor) +{ + var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator(); + while (await changeStreamEnumerator.MoveNextAsync()) + { + var changeStreamEvent = changeStreamEnumerator.Current; + if (changeStreamEvent.SplitEvent != null) + { + var fragment = changeStreamEvent; + while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of) + { + await changeStreamEnumerator.MoveNextAsync(); + fragment = changeStreamEnumerator.Current; + MergeFragment(changeStreamEvent, fragment); + } + } + yield return changeStreamEvent; + } +} + +private static async IAsyncEnumerable> GetNextChangeStreamEventFragmentAsync( + IAsyncCursor> changeStreamCursor) +{ + while (await changeStreamCursor.MoveNextAsync()) + { + foreach (var changeStreamEvent in changeStreamCursor.Current) + { + yield return changeStreamEvent; + } + } +} + +// Merges a fragment into the base event +private static void MergeFragment( + ChangeStreamDocument changeStreamEvent, + ChangeStreamDocument fragment) +{ + foreach (var element in fragment.BackingDocument) + { + if (element.Name != "_id" && element.Name != "splitEvent") + { + changeStreamEvent.BackingDocument[element.Name] = element.Value; + } + } +} +// end-split-event-helpers-async + +// start-split-change-event-sync +var pipeline = new EmptyPipelineDefinition>() + .ChangeStreamSplitLargeEvent(); + +using var cursor = collection.Watch(pipeline); +foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator())) +{ + Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); +} +// end-split-change-event-sync + +// start-split-change-event-async +var pipeline = new EmptyPipelineDefinition>() + .ChangeStreamSplitLargeEvent(); + +using var cursor = await collection.WatchAsync(pipeline); +await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor)) +{ + Console.WriteLine("Received the following change: " + completeEvent.BackingDocument); +} +// end-split-change-event-async + // start-change-stream-post-image var pipeline = new EmptyPipelineDefinition>() .Match(change => change.OperationType == ChangeStreamOperationType.Update); @@ -74,7 +183,7 @@ await cursor.ForEachAsync(change => FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, }; -using (var cursor = _restaurantsCollection.Watch(pipeline, options)) +using (var cursor = collection.Watch(pipeline, options)) { foreach (var change in cursor.ToEnumerable()) { @@ -92,7 +201,7 @@ await cursor.ForEachAsync(change => FullDocument = ChangeStreamFullDocumentOption.UpdateLookup, }; -using var cursor = await _restaurantsCollection.WatchAsync(pipeline, options); +using var cursor = await collection.WatchAsync(pipeline, options); await cursor.ForEachAsync(change => { Console.WriteLine(change.FullDocument.ToBsonDocument());