Skip to content

Commit 7b4886b

Browse files
committed
PHPLIB-1419 Support builder Pipeline in Client and Database watch
1 parent d3caa38 commit 7b4886b

7 files changed

+135
-2
lines changed

psalm-baseline.xml

+28
Original file line numberDiff line numberDiff line change
@@ -191,13 +191,20 @@
191191
<file src="src/Client.php">
192192
<MixedArgument>
193193
<code><![CDATA[$driverOptions['driver'] ?? []]]></code>
194+
<code><![CDATA[$pipeline]]></code>
194195
</MixedArgument>
195196
<MixedAssignment>
196197
<code><![CDATA[$mergedDriver['platform']]]></code>
197198
</MixedAssignment>
198199
<MixedPropertyTypeCoercion>
199200
<code><![CDATA[$driverOptions['builderEncoder'] ?? new BuilderEncoder()]]></code>
200201
</MixedPropertyTypeCoercion>
202+
<NamedArgumentNotAllowed>
203+
<code><![CDATA[$pipeline]]></code>
204+
</NamedArgumentNotAllowed>
205+
<PossiblyInvalidArgument>
206+
<code><![CDATA[$pipeline]]></code>
207+
</PossiblyInvalidArgument>
201208
</file>
202209
<file src="src/Codec/EncodeIfSupported.php">
203210
<ArgumentTypeCoercion>
@@ -220,9 +227,17 @@
220227
</MixedArgumentTypeCoercion>
221228
</file>
222229
<file src="src/Collection.php">
230+
<MixedArgument>
231+
<code><![CDATA[$pipeline]]></code>
232+
<code><![CDATA[$pipeline]]></code>
233+
</MixedArgument>
223234
<MixedPropertyTypeCoercion>
224235
<code><![CDATA[$options['builderEncoder'] ?? new BuilderEncoder()]]></code>
225236
</MixedPropertyTypeCoercion>
237+
<NamedArgumentNotAllowed>
238+
<code><![CDATA[$pipeline]]></code>
239+
<code><![CDATA[$pipeline]]></code>
240+
</NamedArgumentNotAllowed>
226241
<PossiblyInvalidArgument>
227242
<code><![CDATA[$pipeline]]></code>
228243
<code><![CDATA[$pipeline]]></code>
@@ -242,9 +257,22 @@
242257
</MixedAssignment>
243258
</file>
244259
<file src="src/Database.php">
260+
<MixedArgument>
261+
<code><![CDATA[$pipeline]]></code>
262+
<code><![CDATA[$pipeline]]></code>
263+
</MixedArgument>
245264
<MixedPropertyTypeCoercion>
246265
<code><![CDATA[$options['builderEncoder'] ?? new BuilderEncoder()]]></code>
247266
</MixedPropertyTypeCoercion>
267+
<NamedArgumentNotAllowed>
268+
<code><![CDATA[$pipeline]]></code>
269+
<code><![CDATA[$pipeline]]></code>
270+
</NamedArgumentNotAllowed>
271+
<PossiblyInvalidArgument>
272+
<code><![CDATA[$pipeline]]></code>
273+
<code><![CDATA[$pipeline]]></code>
274+
<code><![CDATA[$pipeline]]></code>
275+
</PossiblyInvalidArgument>
248276
</file>
249277
<file src="src/GridFS/Bucket.php">
250278
<MixedArgument>

src/Client.php

+7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
use MongoDB\BSON\Document;
2323
use MongoDB\BSON\PackedArray;
2424
use MongoDB\Builder\BuilderEncoder;
25+
use MongoDB\Builder\Pipeline;
2526
use MongoDB\Codec\Encoder;
2627
use MongoDB\Driver\ClientEncryption;
2728
use MongoDB\Driver\Exception\InvalidArgumentException as DriverInvalidArgumentException;
@@ -391,6 +392,12 @@ public function startSession(array $options = [])
391392
*/
392393
public function watch(array $pipeline = [], array $options = [])
393394
{
395+
if (is_builder_pipeline($pipeline)) {
396+
$pipeline = new Pipeline(...$pipeline);
397+
}
398+
399+
$pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
400+
394401
if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
395402
$options['readPreference'] = $this->readPreference;
396403
}

src/Collection.php

+1
Original file line numberDiff line numberDiff line change
@@ -1110,6 +1110,7 @@ public function watch(array $pipeline = [], array $options = [])
11101110
}
11111111

11121112
$pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
1113+
11131114
$options = $this->inheritReadOptions($options);
11141115
$options = $this->inheritCodecOrTypeMap($options);
11151116

src/Database.php

+13
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use MongoDB\BSON\Document;
2222
use MongoDB\BSON\PackedArray;
2323
use MongoDB\Builder\BuilderEncoder;
24+
use MongoDB\Builder\Pipeline;
2425
use MongoDB\Codec\Encoder;
2526
use MongoDB\Driver\ClientEncryption;
2627
use MongoDB\Driver\Cursor;
@@ -202,6 +203,12 @@ public function __toString()
202203
*/
203204
public function aggregate(array $pipeline, array $options = [])
204205
{
206+
if (is_builder_pipeline($pipeline)) {
207+
$pipeline = new Pipeline(...$pipeline);
208+
}
209+
210+
$pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
211+
205212
$hasWriteStage = is_last_pipeline_operator_write($pipeline);
206213

207214
if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
@@ -611,6 +618,12 @@ public function selectGridFSBucket(array $options = [])
611618
*/
612619
public function watch(array $pipeline = [], array $options = [])
613620
{
621+
if (is_builder_pipeline($pipeline)) {
622+
$pipeline = new Pipeline(...$pipeline);
623+
}
624+
625+
$pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
626+
614627
if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
615628
$options['readPreference'] = $this->readPreference;
616629
}

tests/ClientFunctionalTest.php

+25
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
namespace MongoDB\Tests;
44

5+
use MongoDB\Builder\Pipeline;
6+
use MongoDB\Builder\Query;
7+
use MongoDB\Builder\Stage;
58
use MongoDB\Client;
69
use MongoDB\Driver\BulkWrite;
710
use MongoDB\Driver\Command;
@@ -13,6 +16,7 @@
1316

1417
use function call_user_func;
1518
use function is_callable;
19+
use function iterator_to_array;
1620
use function sprintf;
1721

1822
/**
@@ -137,4 +141,25 @@ public function testAddAndRemoveSubscriber(): void
137141

138142
$client->getManager()->executeCommand('admin', new Command(['ping' => 1]));
139143
}
144+
145+
public function testWatchWithBuilderPipeline(): void
146+
{
147+
$this->skipIfChangeStreamIsNotSupported();
148+
149+
if ($this->isShardedCluster()) {
150+
$this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
151+
}
152+
153+
$pipeline = new Pipeline(
154+
Stage::match(operationType: Query::eq('insert')),
155+
);
156+
// Extract the list of stages for arg type restriction
157+
$pipeline = iterator_to_array($pipeline);
158+
159+
$changeStream = $this->client->watch($pipeline);
160+
$this->client->selectCollection($this->getDatabaseName(), $this->getCollectionName())->insertOne(['x' => 3]);
161+
$changeStream->next();
162+
$this->assertTrue($changeStream->valid());
163+
$this->assertEquals('insert', $changeStream->current()->operationType);
164+
}
140165
}

tests/Collection/BuilderCollectionFunctionalTest.php

-2
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,6 @@ public function testWatch(): void
272272
$pipeline = iterator_to_array($pipeline);
273273

274274
$changeStream = $this->collection->watch($pipeline);
275-
$changeStream->rewind();
276-
$this->assertNull($changeStream->current());
277275
$this->collection->insertOne(['x' => 3]);
278276
$changeStream->next();
279277
$this->assertTrue($changeStream->valid());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?php
2+
3+
namespace MongoDB\Tests\Database;
4+
5+
use MongoDB\Builder\Expression;
6+
use MongoDB\Builder\Pipeline;
7+
use MongoDB\Builder\Query;
8+
use MongoDB\Builder\Stage;
9+
10+
use function iterator_to_array;
11+
12+
class BuilderDatabaseFunctionalTest extends FunctionalTestCase
13+
{
14+
public function tearDown(): void
15+
{
16+
$this->dropCollection($this->getDatabaseName(), $this->getCollectionName());
17+
18+
parent::tearDown();
19+
}
20+
21+
public function testAggregate(): void
22+
{
23+
$pipeline = new Pipeline(
24+
Stage::documents([
25+
['x' => 1],
26+
['x' => 2],
27+
['x' => 3],
28+
]),
29+
Stage::bucketAuto(
30+
groupBy: Expression::intFieldPath('x'),
31+
buckets: 2,
32+
),
33+
);
34+
// Extract the list of stages for arg type restriction
35+
$pipeline = iterator_to_array($pipeline);
36+
37+
$results = $this->database->aggregate($pipeline)->toArray();
38+
$this->assertCount(2, $results);
39+
}
40+
41+
public function testWatch(): void
42+
{
43+
$this->skipIfChangeStreamIsNotSupported();
44+
45+
if ($this->isShardedCluster()) {
46+
$this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
47+
}
48+
49+
$pipeline = new Pipeline(
50+
Stage::match(operationType: Query::eq('insert')),
51+
);
52+
// Extract the list of stages for arg type restriction
53+
$pipeline = iterator_to_array($pipeline);
54+
55+
$changeStream = $this->database->watch($pipeline);
56+
$this->database->selectCollection($this->getCollectionName())->insertOne(['x' => 3]);
57+
$changeStream->next();
58+
$this->assertTrue($changeStream->valid());
59+
$this->assertEquals('insert', $changeStream->current()->operationType);
60+
}
61+
}

0 commit comments

Comments
 (0)