DOCSP-34008: Split large change events #374

Merged
merged 13 commits into from
Dec 27, 2024
79 changes: 72 additions & 7 deletions source/fundamentals/crud/read-operations/change-streams.txt
Expand Up @@ -111,6 +111,7 @@ for only specified change events. Create the pipeline by using the
You can specify the following aggregation stages in the ``pipeline`` parameter:

- ``$addFields``
- ``$changeStreamSplitLargeEvent``
- ``$match``
- ``$project``
- ``$replaceRoot``
Expand All @@ -119,9 +120,19 @@ You can specify the following aggregation stages in the ``pipeline`` parameter:
- ``$set``
- ``$unset``

To learn how to build an aggregation pipeline by using the
``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in
the Operations with Builders guide.
.. tip::

   To learn how to build an aggregation pipeline by using the
   ``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in
   the Operations with Builders guide.

To learn more about modifying your change stream output, see the
:manual:`Modify Change Stream Output
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
manual.

Monitor Update Events Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The following example uses the ``pipeline`` parameter to open a change stream
that records only update operations. Select the :guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the
Expand All @@ -145,10 +156,64 @@ corresponding code.
:end-before: end-change-stream-pipeline
:language: csharp

To learn more about modifying your change stream output, see the
:manual:`Modify Change Stream Output
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
manual.
Split Large Change Events Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If your application generates change events that exceed 16 MB in size, the
{+driver-short+} returns an error. To avoid this error, you can use

Review comment (Collaborator):
Nit: Do we know the type of error it returns?

Reply (Collaborator Author):
The server error is BSONObjectTooLarge, I'll include that

the ``$changeStreamSplitLargeEvent`` pipeline stage to split the events

Review comment:
Here we talk about the $changeStreamSplitLargeEvent, but we do not explicitly say that we have it as a step in the typed aggregation api, called ChangeStreamSplitLargeEvent. We mention this only in the code at the bottom of this section. I think it would be better to make this clear from the start

into smaller fragments.

After you receive the change stream event fragments, you can use the

Review comment:
I think it would be better to show how to use ChangeStreamSplitLargeEvent first (as done in the last code section) and then show how to merge fragments, as this is a secondary aspect.

Reply (Collaborator Author):
Since I use the helper methods in the ChangeStreamSplitLargeEvent example, I decided to introduce the helper methods first - but the actual split event code example is the focus of the section so it does make sense to include it first. Updated!

following helper methods to reassemble the fragments into a single
change stream document:

Review comment (Collaborator):
We should probably mention that reassembling fragments is optional (though probably needed), and that split events can be watched like ordinary events as they arrive.
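
To illustrate the point above, here is a minimal sketch of consuming split events as they arrive, without reassembling them. The connection string, the database and collection names, and the `Match` filter are placeholders chosen for illustration and do not come from this PR. The sketch assumes a deployment that supports `$changeStreamSplitLargeEvent` and uses the typed `ChangeStreamSplitLargeEvent()` builder step that appears in the code examples of this diff.

```csharp
using System;
using MongoDB.Bson;
using MongoDB.Driver;

public static class WatchFragmentsSketch
{
    public static void Main()
    {
        // Placeholder connection details (assumptions, not taken from the PR)
        var client = new MongoClient("mongodb://localhost:27017");
        var collection = client.GetDatabase("sample_restaurants")
            .GetCollection<BsonDocument>("restaurants");

        // The split stage is placed last; the server requires
        // $changeStreamSplitLargeEvent to be the final pipeline stage
        var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
            .Match(change => change.OperationType == ChangeStreamOperationType.Update)
            .ChangeStreamSplitLargeEvent();

        using (var cursor = collection.Watch(pipeline))
        {
            foreach (var change in cursor.ToEnumerable())
            {
                if (change.SplitEvent == null)
                {
                    // The event fit within the size limit and arrives whole
                    Console.WriteLine("Complete event: " + change.BackingDocument);
                }
                else
                {
                    // Each fragment is delivered as its own change stream document
                    Console.WriteLine(
                        $"Fragment {change.SplitEvent.Fragment} of {change.SplitEvent.Of}: "
                        + change.BackingDocument);
                }
            }
        }
    }
}
```

An application that only needs a few fields could stop here and read them from whichever fragment carries them; merging is needed only when the full event document is required.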

.. tabs::

   .. tab:: Asynchronous
      :tabid: split-event-helpers-async

      .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
         :start-after: start-split-event-helpers-async
         :end-before: end-split-event-helpers-async
         :language: csharp

   .. tab:: Synchronous
      :tabid: split-event-helpers-sync

      .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
         :start-after: start-split-event-helpers-sync
         :end-before: end-split-event-helpers-sync
         :language: csharp

This example instructs the driver to watch for changes and split
change events that exceed the 16 MB limit. The code prints the
change document for each event and calls the preceding helper methods to
reassemble any event fragments:

.. tabs::

   .. tab:: Asynchronous
      :tabid: change-stream-split-async

      .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
         :start-after: start-split-change-event-async
         :end-before: end-split-change-event-async
         :language: csharp

   .. tab:: Synchronous
      :tabid: change-stream-split-sync

      .. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
         :start-after: start-split-change-event-sync
         :end-before: end-split-change-event-sync
         :language: csharp

.. tip::

   To learn more about splitting large change events, see the
   :manual:`$changeStreamSplitLargeEvent </reference/operator/aggregation/changeStreamSplitLargeEvent/>`
   page in the {+mdb-server+} manual.

Review comment (Collaborator):
Nit based on how other Server doc links on this page word this.

Suggested change

Current text:
To learn more about splitting large change events, see the
:manual:`$changeStreamSplitLargeEvent </reference/operator/aggregation/changeStreamSplitLargeEvent/>`
page in the {+mdb-server+} manual.

Proposed text:
To learn more about splitting large change events, see
:manual:`$changeStreamSplitLargeEvent </reference/operator/aggregation/changeStreamSplitLargeEvent/>`
in the {+mdb-server+} manual.


Modify ``Watch()`` Behavior
---------------------------
109 changes: 109 additions & 0 deletions source/includes/code-examples/change-streams/change-streams.cs
Expand Up @@ -65,6 +65,115 @@ await cursor.ForEachAsync(change =>
}
// end-change-stream-pipeline

// start-split-event-helpers-sync
// Fetches the next complete change stream event
private static ChangeStreamDocument<TDocument> GetNextChangeStreamEvent<TDocument>(

Review comment:
@BorisDog, can you take a look at the code examples? It seems you've worked on the implementation of this so you can better spot errors

    IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator)
{
    var changeStreamEvent = changeStreamEnumerator.Current;

    // Reassembles change event fragments if the event is split
    if (changeStreamEvent.SplitEvent != null)
    {
        var fragment = changeStreamEvent;
        while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
        {
            changeStreamEnumerator.MoveNext();
            fragment = changeStreamEnumerator.Current;
            MergeFragment(changeStreamEvent, fragment);
        }
    }
    return changeStreamEvent;
}

// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
    ChangeStreamDocument<TDocument> changeStreamEvent,
    ChangeStreamDocument<TDocument> fragment)
{
    foreach (var element in fragment.BackingDocument)
    {
        if (element.Name != "_id" && element.Name != "splitEvent")
        {
            changeStreamEvent.BackingDocument[element.Name] = element.Value;
        }
    }
}
// end-split-event-helpers-sync

// start-split-event-helpers-async
// Fetches the next complete change stream event
private static async Task<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>(
    IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
    var changeStreamEvent = changeStreamCursor.Current.First();

Review comment (Collaborator):
The async path needs to be enumerated differently: the whole batch returned by Current needs to be inspected. There are a few alternatives to do so; I'll share an example later if you don't find one beforehand.

Reply (Collaborator):
Shared the code sample in Slack.


    // Reassembles change event fragments if the event is split
    if (changeStreamEvent.SplitEvent != null)
    {
        var fragment = changeStreamEvent;
        while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
        {
            if (!await changeStreamCursor.MoveNextAsync())
            {
                throw new InvalidOperationException("Incomplete split event fragments.");
            }
            fragment = changeStreamCursor.Current.First();
            MergeFragment(changeStreamEvent, fragment);
        }
    }
    return changeStreamEvent;
}

// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
    ChangeStreamDocument<TDocument> changeStreamEvent,
    ChangeStreamDocument<TDocument> fragment)
{
    foreach (var element in fragment.BackingDocument)
    {
        if (element.Name != "_id" && element.Name != "splitEvent")
        {
            changeStreamEvent.BackingDocument[element.Name] = element.Value;
        }
    }
}
// end-split-event-helpers-async
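
The sample mentioned in the review thread on the async helper is not reproduced on this page. As one possible way to inspect the whole batch returned by `Current`, and not necessarily the approach that was shared, the sketch below walks every document in each batch and keeps a partially merged event across batch boundaries. The class name `SplitEventAsyncSketch` and the method name `ReadCompleteEventsAsync` are hypothetical; the sketch assumes C# 8 or later for `IAsyncEnumerable` and reuses the `MergeFragment` logic from this diff unchanged.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MongoDB.Driver;

public static class SplitEventAsyncSketch
{
    // Hypothetical helper: yields complete change events, merging fragments
    // even when they are spread over more than one cursor batch
    public static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> ReadCompleteEventsAsync<TDocument>(
        IAsyncCursor<ChangeStreamDocument<TDocument>> cursor)
    {
        ChangeStreamDocument<TDocument> pending = null;

        while (await cursor.MoveNextAsync())
        {
            // Inspect every document in the batch, not only the first one
            foreach (var change in cursor.Current)
            {
                if (change.SplitEvent == null)
                {
                    // Event was not split, so pass it through as-is
                    yield return change;
                    continue;
                }

                if (pending == null)
                {
                    // First fragment becomes the base event
                    pending = change;
                }
                else
                {
                    MergeFragment(pending, change);
                }

                // Fragment numbers are 1-based, so Fragment == Of marks the last one
                if (change.SplitEvent.Fragment == change.SplitEvent.Of)
                {
                    yield return pending;
                    pending = null;
                }
            }
        }

        if (pending != null)
        {
            throw new InvalidOperationException("Incomplete split event fragments.");
        }
    }

    // Same merge logic as the helper in this diff
    private static void MergeFragment<TDocument>(
        ChangeStreamDocument<TDocument> changeStreamEvent,
        ChangeStreamDocument<TDocument> fragment)
    {
        foreach (var element in fragment.BackingDocument)
        {
            if (element.Name != "_id" && element.Name != "splitEvent")
            {
                changeStreamEvent.BackingDocument[element.Name] = element.Value;
            }
        }
    }
}
```

A caller could then write `await foreach (var completeEvent in SplitEventAsyncSketch.ReadCompleteEventsAsync(cursor))` inside the `using` block that opens the change stream, which avoids calling `First()` on a batch that may hold several events.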

// start-split-change-event-async
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .ChangeStreamSplitLargeEvent();

using (var cursor = await _restaurantsCollection.WatchAsync(pipeline))

Review comment (Collaborator):
Nit: Not limited to this PR but it looks like some examples on this page use collection and some use _restaurantsCollection – should we standardize the name?

{
    while (await cursor.MoveNextAsync())
    {
        foreach (var changeStreamEvent in cursor.Current)
        {
            var completeEvent = await GetNextChangeStreamEventAsync(cursor);
            Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
        }
    }
}
// end-split-change-event-async

// start-split-change-event-sync
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .ChangeStreamSplitLargeEvent();

using (var cursor = _restaurantsCollection.Watch(pipeline))
{
    using (var enumerator = cursor.ToEnumerable().GetEnumerator())

Review comment:
@BorisDog do we need to do all of this here?

Reply (Collaborator):
Short answer: unfortunately yes.
We just demonstrate how to merge the events if the user chooses to. Our spec does not prescribe any merging functionality, so we did not implement a shortcut for that in our API. We also didn't have any requests for that yet.

    {
        while (enumerator.MoveNext())
        {
            var completeEvent = GetNextChangeStreamEvent(enumerator);
            Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
        }
    }
}
// end-split-change-event-sync

// start-change-stream-post-image
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
    .Match(change => change.OperationType == ChangeStreamOperationType.Update);