Skip to content

DOCSP-34008: Split large change events #374

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 81 additions & 7 deletions source/fundamentals/crud/read-operations/change-streams.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ for only specified change events. Create the pipeline by using the
You can specify the following aggregation stages in the ``pipeline`` parameter:

- ``$addFields``
- ``$changeStreamSplitLargeEvent``
- ``$match``
- ``$project``
- ``$replaceRoot``
Expand All @@ -119,9 +120,19 @@ You can specify the following aggregation stages in the ``pipeline`` parameter:
- ``$set``
- ``$unset``

To learn how to build an aggregation pipeline by using the
``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in
the Operations with Builders guide.
.. tip::

To learn how to build an aggregation pipeline by using the
``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in
the Operations with Builders guide.

To learn more about modifying your change stream output, see the
:manual:`Modify Change Stream Output
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
manual.

Monitor Update Events Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The following example uses the ``pipeline`` parameter to open a change stream
that records only update operations. Select the :guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the
Expand All @@ -145,10 +156,73 @@ corresponding code.
:end-before: end-change-stream-pipeline
:language: csharp

To learn more about modifying your change stream output, see the
:manual:`Modify Change Stream Output
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
manual.
Split Large Change Events Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If your application generates change events that exceed 16 MB in size, the
server returns a ``BSONObjectTooLarge`` error. To avoid this error, you can use
the ``$changeStreamSplitLargeEvent`` pipeline stage to split the events
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we talk about the $changeStreamSplitLargeEvent, but we do not explicitly say that we have it as a step in the typed aggregation api, called ChangeStreamSplitLargeEvent. We mention this only in the code at the bottom of this section. I think it would be better to make this clear from the start

into smaller fragments. The {+driver-short+} aggregation API includes the
``ChangeStreamSplitLargeEvent()`` method, which you can use to add the
``$changeStreamSplitLargeEvent`` stage to the change stream pipeline.

This example instructs the driver to watch for changes and split
change events that exceed the 16 MB limit. The code prints the
change document for each event and calls helper methods to
reassemble any event fragments:

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should mention that fragments reassembling is optional (but probably needed), and the split events can be watched as usual events as they arrive.

.. tabs::

.. tab:: Asynchronous
:tabid: change-stream-split-async

.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-split-change-event-async
:end-before: end-split-change-event-async
:language: csharp

.. tab:: Synchronous
:tabid: change-stream-split-sync

.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-split-change-event-sync
:end-before: end-split-change-event-sync
:language: csharp

.. note::

We recommend reassembling change event fragments, as shown in the
preceding example, but this step is optional. You can use the same
logic to watch both split and complete change events.

The preceding example uses the ``GetNextChangeStreamEvent()``,
``GetNextChangeStreamEventAsync()``, and ``MergeFragment()``
methods to reassemble change event fragments into a single change stream document.
The following code defines these methods:

.. tabs::

.. tab:: Asynchronous
:tabid: split-event-helpers-async

.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-split-event-helpers-async
:end-before: end-split-event-helpers-async
:language: csharp

.. tab:: Synchronous
:tabid: split-event-helpers-sync

.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-split-event-helpers-sync
:end-before: end-split-event-helpers-sync
:language: csharp

.. tip::

To learn more about splitting large change events, see
:manual:`$changeStreamSplitLargeEvent </reference/operator/aggregation/changeStreamSplitLargeEvent/>`
in the {+mdb-server+} manual.

Modify ``Watch()`` Behavior
---------------------------
Expand Down
117 changes: 113 additions & 4 deletions source/includes/code-examples/change-streams/change-streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ await cursor.ForEachAsync(change =>
.Match(change => change.OperationType == ChangeStreamOperationType.Update);

// Opens a change stream and prints the changes as they're received
using (var cursor = await _restaurantsCollection.WatchAsync(pipeline))
using (var cursor = await collection.WatchAsync(pipeline))
{
await cursor.ForEachAsync(change =>
{
Expand All @@ -56,7 +56,7 @@ await cursor.ForEachAsync(change =>
.Match(change => change.OperationType == ChangeStreamOperationType.Update);

// Opens a change streams and print the changes as they're received
using (var cursor = _restaurantsCollection.Watch(pipeline))
using (var cursor = collection.Watch(pipeline))
{
foreach (var change in cursor.ToEnumerable())
{
Expand All @@ -65,6 +65,115 @@ await cursor.ForEachAsync(change =>
}
// end-change-stream-pipeline

// start-split-event-helpers-sync
// Fetches the next complete change stream event
private static IEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>(
IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator)
{
while (changeStreamEnumerator.MoveNext())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
changeStreamEnumerator.MoveNext();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}

// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}
// end-split-event-helpers-sync

// start-split-event-helpers-async
// Fetches the next complete change stream event
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator();
while (await changeStreamEnumerator.MoveNextAsync())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
await changeStreamEnumerator.MoveNextAsync();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}

private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventFragmentAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
while (await changeStreamCursor.MoveNextAsync())
{
foreach (var changeStreamEvent in changeStreamCursor.Current)
{
yield return changeStreamEvent;
}
}
}

// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}
// end-split-event-helpers-async

// start-split-change-event-sync
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();

using var cursor = collection.Watch(pipeline);
foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator()))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
// end-split-change-event-sync

// start-split-change-event-async
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();

using var cursor = await collection.WatchAsync(pipeline);
await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
// end-split-change-event-async

// start-change-stream-post-image
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
Expand All @@ -74,7 +183,7 @@ await cursor.ForEachAsync(change =>
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};

using (var cursor = _restaurantsCollection.Watch(pipeline, options))
using (var cursor = collection.Watch(pipeline, options))
{
foreach (var change in cursor.ToEnumerable())
{
Expand All @@ -92,7 +201,7 @@ await cursor.ForEachAsync(change =>
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};

using var cursor = await _restaurantsCollection.WatchAsync(pipeline, options);
using var cursor = await collection.WatchAsync(pipeline, options);
await cursor.ForEachAsync(change =>
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
Expand Down
Loading