2
2
3
3
namespace MongoDB \Tests \Collection ;
4
4
5
+ use MongoDB \Builder \Expression ;
5
6
use MongoDB \Builder \Pipeline ;
6
7
use MongoDB \Builder \Query ;
7
8
use MongoDB \Builder \Stage ;
8
9
10
+ use function iterator_to_array ;
11
+
9
12
class BuilderCollectionFunctionalTest extends FunctionalTestCase
10
13
{
11
14
public function setUp (): void
@@ -17,7 +20,18 @@ public function setUp(): void
17
20
18
21
public function testAggregate (): void
19
22
{
20
- $ this ->markTestSkipped ('Not supported yet ' );
23
+ $ this ->collection ->insertMany ([['x ' => 10 ], ['x ' => 10 ], ['x ' => 10 ]]);
24
+ $ pipeline = new Pipeline (
25
+ Stage::bucketAuto (
26
+ groupBy: Expression::intFieldPath ('x ' ),
27
+ buckets: 2 ,
28
+ ),
29
+ );
30
+ // Extract the list of stages for arg type restriction
31
+ $ pipeline = iterator_to_array ($ pipeline );
32
+
33
+ $ results = $ this ->collection ->aggregate ($ pipeline )->toArray ();
34
+ $ this ->assertCount (2 , $ results );
21
35
}
22
36
23
37
public function testBulkWriteDeleteMany (): void
@@ -245,6 +259,24 @@ public function testUpdateManyWithPipeline(): void
245
259
246
260
public function testWatch (): void
247
261
{
248
- $ this ->markTestSkipped ('Not supported yet ' );
262
+ $ this ->skipIfChangeStreamIsNotSupported ();
263
+
264
+ if ($ this ->isShardedCluster ()) {
265
+ $ this ->markTestSkipped ('Test does not apply on sharded clusters: need more than a single getMore call on the change stream. ' );
266
+ }
267
+
268
+ $ pipeline = new Pipeline (
269
+ Stage::match (operationType: Query::eq ('insert ' )),
270
+ );
271
+ // Extract the list of stages for arg type restriction
272
+ $ pipeline = iterator_to_array ($ pipeline );
273
+
274
+ $ changeStream = $ this ->collection ->watch ($ pipeline );
275
+ $ changeStream ->rewind ();
276
+ $ this ->assertNull ($ changeStream ->current ());
277
+ $ this ->collection ->insertOne (['x ' => 3 ]);
278
+ $ changeStream ->next ();
279
+ $ this ->assertTrue ($ changeStream ->valid ());
280
+ $ this ->assertEquals ('insert ' , $ changeStream ->current ()->operationType );
249
281
}
250
282
}
0 commit comments