Skip to content

Commit a829d24

Browse files
authored
PHPLIB-1419 Encode Agg builder objects in Collection methods (#1383)
1 parent 8956fb5 commit a829d24

9 files changed

+227
-2
lines changed

psalm-baseline.xml

+34
Original file line numberDiff line numberDiff line change
@@ -191,13 +191,20 @@
191191
<file src="src/Client.php">
192192
<MixedArgument>
193193
<code><![CDATA[$driverOptions['driver'] ?? []]]></code>
194+
<code><![CDATA[$pipeline]]></code>
194195
</MixedArgument>
195196
<MixedAssignment>
196197
<code><![CDATA[$mergedDriver['platform']]]></code>
197198
</MixedAssignment>
198199
<MixedPropertyTypeCoercion>
199200
<code><![CDATA[$driverOptions['builderEncoder'] ?? new BuilderEncoder()]]></code>
200201
</MixedPropertyTypeCoercion>
202+
<NamedArgumentNotAllowed>
203+
<code><![CDATA[$pipeline]]></code>
204+
</NamedArgumentNotAllowed>
205+
<PossiblyInvalidArgument>
206+
<code><![CDATA[$pipeline]]></code>
207+
</PossiblyInvalidArgument>
201208
</file>
202209
<file src="src/Codec/EncodeIfSupported.php">
203210
<ArgumentTypeCoercion>
@@ -220,9 +227,22 @@
220227
</MixedArgumentTypeCoercion>
221228
</file>
222229
<file src="src/Collection.php">
230+
<MixedArgument>
231+
<code><![CDATA[$pipeline]]></code>
232+
<code><![CDATA[$pipeline]]></code>
233+
</MixedArgument>
223234
<MixedPropertyTypeCoercion>
224235
<code><![CDATA[$options['builderEncoder'] ?? new BuilderEncoder()]]></code>
225236
</MixedPropertyTypeCoercion>
237+
<NamedArgumentNotAllowed>
238+
<code><![CDATA[$pipeline]]></code>
239+
<code><![CDATA[$pipeline]]></code>
240+
</NamedArgumentNotAllowed>
241+
<PossiblyInvalidArgument>
242+
<code><![CDATA[$pipeline]]></code>
243+
<code><![CDATA[$pipeline]]></code>
244+
<code><![CDATA[$pipeline]]></code>
245+
</PossiblyInvalidArgument>
226246
</file>
227247
<file src="src/Command/ListCollections.php">
228248
<MixedAssignment>
@@ -237,9 +257,22 @@
237257
</MixedAssignment>
238258
</file>
239259
<file src="src/Database.php">
260+
<MixedArgument>
261+
<code><![CDATA[$pipeline]]></code>
262+
<code><![CDATA[$pipeline]]></code>
263+
</MixedArgument>
240264
<MixedPropertyTypeCoercion>
241265
<code><![CDATA[$options['builderEncoder'] ?? new BuilderEncoder()]]></code>
242266
</MixedPropertyTypeCoercion>
267+
<NamedArgumentNotAllowed>
268+
<code><![CDATA[$pipeline]]></code>
269+
<code><![CDATA[$pipeline]]></code>
270+
</NamedArgumentNotAllowed>
271+
<PossiblyInvalidArgument>
272+
<code><![CDATA[$pipeline]]></code>
273+
<code><![CDATA[$pipeline]]></code>
274+
<code><![CDATA[$pipeline]]></code>
275+
</PossiblyInvalidArgument>
243276
</file>
244277
<file src="src/GridFS/Bucket.php">
245278
<MixedArgument>
@@ -854,6 +887,7 @@
854887
<MixedAssignment>
855888
<code><![CDATA[$element[$key]]]></code>
856889
<code><![CDATA[$stage]]></code>
890+
<code><![CDATA[$stage]]></code>
857891
<code><![CDATA[$type]]></code>
858892
<code><![CDATA[$typeMap['fieldPaths'][$fieldPath . '.' . $existingFieldPath]]]></code>
859893
<code><![CDATA[$typeMap['fieldPaths'][$fieldPath]]]></code>

src/Client.php

+7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
use MongoDB\BSON\Document;
2323
use MongoDB\BSON\PackedArray;
2424
use MongoDB\Builder\BuilderEncoder;
25+
use MongoDB\Builder\Pipeline;
2526
use MongoDB\Codec\Encoder;
2627
use MongoDB\Driver\ClientEncryption;
2728
use MongoDB\Driver\Exception\InvalidArgumentException as DriverInvalidArgumentException;
@@ -391,6 +392,12 @@ public function startSession(array $options = [])
391392
*/
392393
public function watch(array $pipeline = [], array $options = [])
393394
{
395+
if (is_builder_pipeline($pipeline)) {
396+
$pipeline = new Pipeline(...$pipeline);
397+
}
398+
399+
$pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
400+
394401
if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
395402
$options['readPreference'] = $this->readPreference;
396403
}

src/Collection.php

+13
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
use MongoDB\BSON\JavascriptInterface;
2424
use MongoDB\BSON\PackedArray;
2525
use MongoDB\Builder\BuilderEncoder;
26+
use MongoDB\Builder\Pipeline;
2627
use MongoDB\Codec\DocumentCodec;
2728
use MongoDB\Codec\Encoder;
2829
use MongoDB\Driver\CursorInterface;
@@ -223,6 +224,12 @@ public function __toString()
223224
*/
224225
public function aggregate(array $pipeline, array $options = [])
225226
{
227+
if (is_builder_pipeline($pipeline)) {
228+
$pipeline = new Pipeline(...$pipeline);
229+
}
230+
231+
$pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
232+
226233
$hasWriteStage = is_last_pipeline_operator_write($pipeline);
227234

228235
$options = $this->inheritReadPreference($options);
@@ -1098,6 +1105,12 @@ public function updateSearchIndex(string $name, array|object $definition, array
10981105
*/
10991106
public function watch(array $pipeline = [], array $options = [])
11001107
{
1108+
if (is_builder_pipeline($pipeline)) {
1109+
$pipeline = new Pipeline(...$pipeline);
1110+
}
1111+
1112+
$pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
1113+
11011114
$options = $this->inheritReadOptions($options);
11021115
$options = $this->inheritCodecOrTypeMap($options);
11031116

src/Database.php

+13
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use MongoDB\BSON\Document;
2222
use MongoDB\BSON\PackedArray;
2323
use MongoDB\Builder\BuilderEncoder;
24+
use MongoDB\Builder\Pipeline;
2425
use MongoDB\Codec\Encoder;
2526
use MongoDB\Driver\ClientEncryption;
2627
use MongoDB\Driver\Cursor;
@@ -202,6 +203,12 @@ public function __toString()
202203
*/
203204
public function aggregate(array $pipeline, array $options = [])
204205
{
206+
if (is_builder_pipeline($pipeline)) {
207+
$pipeline = new Pipeline(...$pipeline);
208+
}
209+
210+
$pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
211+
205212
$hasWriteStage = is_last_pipeline_operator_write($pipeline);
206213

207214
if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
@@ -611,6 +618,12 @@ public function selectGridFSBucket(array $options = [])
611618
*/
612619
public function watch(array $pipeline = [], array $options = [])
613620
{
621+
if (is_builder_pipeline($pipeline)) {
622+
$pipeline = new Pipeline(...$pipeline);
623+
}
624+
625+
$pipeline = $this->builderEncoder->encodeIfSupported($pipeline);
626+
614627
if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
615628
$options['readPreference'] = $this->readPreference;
616629
}

src/functions.php

+22
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use MongoDB\BSON\Document;
2222
use MongoDB\BSON\PackedArray;
2323
use MongoDB\BSON\Serializable;
24+
use MongoDB\Builder\Type\StageInterface;
2425
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
2526
use MongoDB\Driver\Manager;
2627
use MongoDB\Driver\ReadPreference;
@@ -327,6 +328,27 @@ function is_pipeline(array|object $pipeline, bool $allowEmpty = false): bool
327328
return true;
328329
}
329330

331+
/**
332+
* Returns whether the argument is a list that contains at least one
333+
* {@see StageInterface} object.
334+
*
335+
* @internal
336+
*/
337+
function is_builder_pipeline(array $pipeline): bool
338+
{
339+
if (! $pipeline || ! array_is_list($pipeline)) {
340+
return false;
341+
}
342+
343+
foreach ($pipeline as $stage) {
344+
if (is_object($stage) && $stage instanceof StageInterface) {
345+
return true;
346+
}
347+
}
348+
349+
return false;
350+
}
351+
330352
/**
331353
* Returns whether we are currently in a transaction.
332354
*

tests/ClientFunctionalTest.php

+25
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
namespace MongoDB\Tests;
44

5+
use MongoDB\Builder\Pipeline;
6+
use MongoDB\Builder\Query;
7+
use MongoDB\Builder\Stage;
58
use MongoDB\Client;
69
use MongoDB\Driver\BulkWrite;
710
use MongoDB\Driver\Command;
@@ -13,6 +16,7 @@
1316

1417
use function call_user_func;
1518
use function is_callable;
19+
use function iterator_to_array;
1620
use function sprintf;
1721

1822
/**
@@ -137,4 +141,25 @@ public function testAddAndRemoveSubscriber(): void
137141

138142
$client->getManager()->executeCommand('admin', new Command(['ping' => 1]));
139143
}
144+
145+
public function testWatchWithBuilderPipeline(): void
146+
{
147+
$this->skipIfChangeStreamIsNotSupported();
148+
149+
if ($this->isShardedCluster()) {
150+
$this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
151+
}
152+
153+
$pipeline = new Pipeline(
154+
Stage::match(operationType: Query::eq('insert')),
155+
);
156+
// Extract the list of stages for arg type restriction
157+
$pipeline = iterator_to_array($pipeline);
158+
159+
$changeStream = $this->client->watch($pipeline);
160+
$this->client->selectCollection($this->getDatabaseName(), $this->getCollectionName())->insertOne(['x' => 3]);
161+
$changeStream->next();
162+
$this->assertTrue($changeStream->valid());
163+
$this->assertEquals('insert', $changeStream->current()->operationType);
164+
}
140165
}

tests/Collection/BuilderCollectionFunctionalTest.php

+32-2
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22

33
namespace MongoDB\Tests\Collection;
44

5+
use MongoDB\Builder\Expression;
56
use MongoDB\Builder\Pipeline;
67
use MongoDB\Builder\Query;
78
use MongoDB\Builder\Stage;
89

10+
use function iterator_to_array;
11+
912
class BuilderCollectionFunctionalTest extends FunctionalTestCase
1013
{
1114
public function setUp(): void
@@ -17,7 +20,18 @@ public function setUp(): void
1720

1821
public function testAggregate(): void
1922
{
20-
$this->markTestSkipped('Not supported yet');
23+
$this->collection->insertMany([['x' => 10], ['x' => 10], ['x' => 10]]);
24+
$pipeline = new Pipeline(
25+
Stage::bucketAuto(
26+
groupBy: Expression::intFieldPath('x'),
27+
buckets: 2,
28+
),
29+
);
30+
// Extract the list of stages for arg type restriction
31+
$pipeline = iterator_to_array($pipeline);
32+
33+
$results = $this->collection->aggregate($pipeline)->toArray();
34+
$this->assertCount(2, $results);
2135
}
2236

2337
public function testBulkWriteDeleteMany(): void
@@ -245,6 +259,22 @@ public function testUpdateManyWithPipeline(): void
245259

246260
public function testWatch(): void
247261
{
248-
$this->markTestSkipped('Not supported yet');
262+
$this->skipIfChangeStreamIsNotSupported();
263+
264+
if ($this->isShardedCluster()) {
265+
$this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
266+
}
267+
268+
$pipeline = new Pipeline(
269+
Stage::match(operationType: Query::eq('insert')),
270+
);
271+
// Extract the list of stages for arg type restriction
272+
$pipeline = iterator_to_array($pipeline);
273+
274+
$changeStream = $this->collection->watch($pipeline);
275+
$this->collection->insertOne(['x' => 3]);
276+
$changeStream->next();
277+
$this->assertTrue($changeStream->valid());
278+
$this->assertEquals('insert', $changeStream->current()->operationType);
249279
}
250280
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
<?php
2+
3+
namespace MongoDB\Tests\Database;
4+
5+
use MongoDB\Builder\Expression;
6+
use MongoDB\Builder\Pipeline;
7+
use MongoDB\Builder\Query;
8+
use MongoDB\Builder\Stage;
9+
10+
use function iterator_to_array;
11+
12+
class BuilderDatabaseFunctionalTest extends FunctionalTestCase
13+
{
14+
public function tearDown(): void
15+
{
16+
$this->dropCollection($this->getDatabaseName(), $this->getCollectionName());
17+
18+
parent::tearDown();
19+
}
20+
21+
public function testAggregate(): void
22+
{
23+
$this->skipIfServerVersion('<', '6.0.0', '$documents stage is not supported');
24+
25+
$pipeline = new Pipeline(
26+
Stage::documents([
27+
['x' => 1],
28+
['x' => 2],
29+
['x' => 3],
30+
]),
31+
Stage::bucketAuto(
32+
groupBy: Expression::intFieldPath('x'),
33+
buckets: 2,
34+
),
35+
);
36+
// Extract the list of stages for arg type restriction
37+
$pipeline = iterator_to_array($pipeline);
38+
39+
$results = $this->database->aggregate($pipeline)->toArray();
40+
$this->assertCount(2, $results);
41+
}
42+
43+
public function testWatch(): void
44+
{
45+
$this->skipIfChangeStreamIsNotSupported();
46+
47+
if ($this->isShardedCluster()) {
48+
$this->markTestSkipped('Test does not apply on sharded clusters: need more than a single getMore call on the change stream.');
49+
}
50+
51+
$pipeline = new Pipeline(
52+
Stage::match(operationType: Query::eq('insert')),
53+
);
54+
// Extract the list of stages for arg type restriction
55+
$pipeline = iterator_to_array($pipeline);
56+
57+
$changeStream = $this->database->watch($pipeline);
58+
$this->database->selectCollection($this->getCollectionName())->insertOne(['x' => 3]);
59+
$changeStream->next();
60+
$this->assertTrue($changeStream->valid());
61+
$this->assertEquals('insert', $changeStream->current()->operationType);
62+
}
63+
}

0 commit comments

Comments
 (0)