-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathMongo.watch.txt
296 lines (184 loc) · 8.1 KB
/
Mongo.watch.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
=============
Mongo.watch()
=============
.. default-domain:: mongodb
.. contents:: On this page
:local:
:backlinks: none
:depth: 2
:class: singlecol
Definition
----------
.. method:: Mongo.watch( pipeline, options )
*For replica sets and sharded clusters only*
Opens a :ref:`change stream cursor <changeStreams>` for a replica
set or a sharded cluster to report on all its non-``system``
collections across its databases, with the exception of the
``admin``, ``local``, and the ``config`` databases.
.. list-table::
:header-rows: 1
:widths: 20 20 80
* - Parameter
- Type
- Description
* - ``pipeline``
- array
- Optional. An :ref:`aggregation-pipeline` consisting of one or
more of the following aggregation stages:
.. include:: /includes/extracts/changestream-available-pipeline-stages.rst
Specify a pipeline to filter/modify the change events output.
.. include:: /includes/extracts/4.2-changes-change-stream-modification-error.rst
* - ``options``
- document
- Optional. Additional options that modify the behavior of
:method:`Mongo.watch()`.
The ``options`` document can contain the following fields and values:
.. list-table::
:header-rows: 1
:widths: 20 20 80
* - Field
- Type
- Description
* - ``resumeAfter``
- document
- Optional. Directs :method:`Mongo.watch()` to attempt resuming
notifications starting after the operation specified in the resume
token.
Each change stream event document includes a resume token as the
``_id`` field. Pass the *entire* ``_id`` field of the change event
document that represents the operation you want to resume after.
``resumeAfter`` is mutually exclusive with ``startAfter`` and
``startAtOperationTime``.
* - ``startAfter``
- document
- Optional. Directs :method:`Mongo.watch()` to attempt starting a new change
stream after the operation specified in the resume token. Allows
notifications to resume after an invalidate event.
Each change stream event document includes a resume token as the
``_id`` field. Pass the *entire* ``_id`` field of the change event
document that represents the operation you want to resume after.
``startAfter`` is mutually exclusive with ``resumeAfter`` and
``startAtOperationTime``.
* - ``fullDocument``
- string
- Optional. By default, :method:`Mongo.watch()` returns the delta of
those fields modified by an update operation, instead of the entire
updated document.
Set ``fullDocument`` to ``"updateLookup"`` to direct
:method:`Mongo.watch()` to look up the most current
majority-committed version of the updated document.
:method:`Mongo.watch()` returns a ``fullDocument`` field with
the document lookup in addition to the ``updateDescription`` delta.
* - ``batchSize``
- int
- Optional. Specifies the maximum number of change events to return in each
batch of the response from the MongoDB cluster.
Has the same functionality as :method:`cursor.batchSize()`.
* - ``maxAwaitTimeMS``
- int
- Optional. The maximum amount of time in milliseconds the server waits for new
data changes to report to the change stream cursor before returning an
empty batch.
Defaults to ``1000`` milliseconds.
* - ``collation``
- document
- Optional. Pass a :ref:`collation document <collation-document-fields>`
to specify collation for the change stream cursor.
If omitted, defaults to ``simple`` binary comparison.
* - ``startAtOperationTime``
- Timestamp
- Optional. The starting point for the change stream. If the specified starting
point is in the past, it must be in the time range of the oplog. To
check the time range of the oplog, see
:method:`rs.printReplicationInfo()`.
``startAtOperationTime`` is mutually exclusive with ``resumeAfter``
and ``startAfter``.
:returns:
A :term:`cursor` over the change event documents.
See :doc:`/reference/change-events` for examples of change
event documents.
.. seealso::
:method:`db.collection.watch()` and :method:`db.watch()`
Compatibility
-------------
This method is available in deployments hosted in the following environments:
.. include:: /includes/fact-environments-atlas-only.rst
.. include:: /includes/fact-environments-onprem-only.rst
Availability
------------
Deployment
~~~~~~~~~~
:method:`Mongo.watch()` is available for replica sets and sharded
clusters:
- For a replica set, you can issue :method:`Mongo.watch()` on any
data-bearing member.
- For a sharded cluster, you must issue :method:`Mongo.watch()` on a
:binary:`~bin.mongos` instance.
Storage Engine
~~~~~~~~~~~~~~
You can only use :method:`Mongo.watch()` with the :ref:`Wired Tiger
storage engine <storage-wiredtiger>`.
Read Concern ``majority`` Support
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. include:: /includes/extracts/changestream-rc-majority-4.2.rst
Behavior
--------
- :method:`Mongo.watch()` only notifies on data changes that have
persisted to a majority of data-bearing members.
- .. include:: /includes/extracts/changestream-cursor-open.rst
Resumability
~~~~~~~~~~~~
.. include:: /includes/extracts/changestream-resume.rst
.. |watchmethod| replace:: :method:`Mongo.watch()`
.. note:: Resume Token
.. include:: /includes/extracts/changestream-resume-token-versions-4.2-greater.rst
Hex Encoded Tokens
``````````````````
.. include:: /includes/extracts/changestream-resume-token-hex-change.rst
Decode Resume Tokens
````````````````````
.. include:: /includes/note-decode-resume-tokens.rst
Full Document Lookup of Update Operations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. include:: /includes/extracts/changestream-full-document-lookup.rst
Availability
~~~~~~~~~~~~
.. include:: /includes/extracts/changestream-rc-majority-4.2.rst
Access Control
--------------
When running with access control, the user must have the
:authaction:`find` and :authaction:`changeStream` privilege actions on
:ref:`all non-systems collections across all
databases<resource-all-but-system-collections>`.. That is, a user must
have a :ref:`role <roles>` that grants the following :ref:`privilege
<privileges>`:
.. code-block:: javascript
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
The built-in :authrole:`read` role provides the appropriate
privileges.
Cursor Iteration
----------------
.. include:: /includes/fact-multiple-cursor-monitors.rst
Example
-------
The following operation in :binary:`~bin.mongosh` opens a
change stream cursor on a replica set. The returned cursor reports on
data changes to all the non-``system`` collections across all databases
except for the ``admin``, ``local``, and the ``config`` databases.
.. code-block:: javascript
watchCursor = db.getMongo().watch()
Iterate the cursor to check for new events. Use the
:method:`cursor.isClosed()` method with the :method:`cursor.tryNext()`
method to ensure the loop only exits if the change stream cursor is
closed *and* there are no objects remaining in the latest batch:
.. code-block:: javascript
while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}
For complete documentation on change stream output, see
:ref:`change-stream-output`.
.. include:: /includes/isExhausted-no-change-streams.rst