Skip to content

Commit f1f77d3

Browse files
committed
PHPLIB-1617 Accept a Pipeline instance in aggregate and watch methods
1 parent 044a422 commit f1f77d3

File tree

4 files changed

+46
-28
lines changed

4 files changed

+46
-28
lines changed

src/Collection.php

+8-8
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,16 @@ public function __toString(): string
209209
* Executes an aggregation framework pipeline on the collection.
210210
*
211211
* @see Aggregate::__construct() for supported options
212-
* @param array $pipeline Aggregation pipeline
213-
* @param array $options Command options
212+
* @param array|Pipeline $pipeline Aggregation pipeline
213+
* @param array $options Command options
214214
* @throws UnexpectedValueException if the command response was malformed
215215
* @throws UnsupportedException if options are not supported by the selected server
216216
* @throws InvalidArgumentException for parameter/option parsing errors
217217
* @throws DriverRuntimeException for other driver errors (e.g. connection errors)
218218
*/
219-
public function aggregate(array $pipeline, array $options = []): CursorInterface
219+
public function aggregate(array|Pipeline $pipeline, array $options = []): CursorInterface
220220
{
221-
if (is_builder_pipeline($pipeline)) {
221+
if (is_array($pipeline) && is_builder_pipeline($pipeline)) {
222222
$pipeline = new Pipeline(...$pipeline);
223223
}
224224

@@ -1014,13 +1014,13 @@ public function updateSearchIndex(string $name, array|object $definition, array
10141014
* Create a change stream for watching changes to the collection.
10151015
*
10161016
* @see Watch::__construct() for supported options
1017-
* @param array $pipeline Aggregation pipeline
1018-
* @param array $options Command options
1017+
* @param array|Pipeline $pipeline Aggregation pipeline
1018+
* @param array $options Command options
10191019
* @throws InvalidArgumentException for parameter/option parsing errors
10201020
*/
1021-
public function watch(array $pipeline = [], array $options = []): ChangeStream
1021+
public function watch(array|Pipeline $pipeline = [], array $options = []): ChangeStream
10221022
{
1023-
if (is_builder_pipeline($pipeline)) {
1023+
if (is_array($pipeline) && is_builder_pipeline($pipeline)) {
10241024
$pipeline = new Pipeline(...$pipeline);
10251025
}
10261026

src/Database.php

+8-8
Original file line numberDiff line numberDiff line change
@@ -188,16 +188,16 @@ public function __toString(): string
188188
* and $listLocalSessions. Requires MongoDB >= 3.6
189189
*
190190
* @see Aggregate::__construct() for supported options
191-
* @param array $pipeline Aggregation pipeline
192-
* @param array $options Command options
191+
* @param array|Pipeline $pipeline Aggregation pipeline
192+
* @param array $options Command options
193193
* @throws UnexpectedValueException if the command response was malformed
194194
* @throws UnsupportedException if options are not supported by the selected server
195195
* @throws InvalidArgumentException for parameter/option parsing errors
196196
* @throws DriverRuntimeException for other driver errors (e.g. connection errors)
197197
*/
198-
public function aggregate(array $pipeline, array $options = []): CursorInterface
198+
public function aggregate(array|Pipeline $pipeline, array $options = []): CursorInterface
199199
{
200-
if (is_builder_pipeline($pipeline)) {
200+
if (is_array($pipeline) && is_builder_pipeline($pipeline)) {
201201
$pipeline = new Pipeline(...$pipeline);
202202
}
203203

@@ -582,13 +582,13 @@ public function selectGridFSBucket(array $options = []): Bucket
582582
* Create a change stream for watching changes to the database.
583583
*
584584
* @see Watch::__construct() for supported options
585-
* @param array $pipeline Aggregation pipeline
586-
* @param array $options Command options
585+
* @param array|Pipeline $pipeline Aggregation pipeline
586+
* @param array $options Command options
587587
* @throws InvalidArgumentException for parameter/option parsing errors
588588
*/
589-
public function watch(array $pipeline = [], array $options = []): ChangeStream
589+
public function watch(array|Pipeline $pipeline = [], array $options = []): ChangeStream
590590
{
591-
if (is_builder_pipeline($pipeline)) {
591+
if (is_array($pipeline) && is_builder_pipeline($pipeline)) {
592592
$pipeline = new Pipeline(...$pipeline);
593593
}
594594

tests/Collection/BuilderCollectionFunctionalTest.php

+15-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use MongoDB\Builder\Pipeline;
77
use MongoDB\Builder\Query;
88
use MongoDB\Builder\Stage;
9+
use PHPUnit\Framework\Attributes\TestWith;
910

1011
use function iterator_to_array;
1112

@@ -18,7 +19,9 @@ public function setUp(): void
1819
$this->collection->insertMany([['x' => 1], ['x' => 2], ['x' => 2]]);
1920
}
2021

21-
public function testAggregate(): void
22+
#[TestWith([true])]
23+
#[TestWith([false])]
24+
public function testAggregate(bool $pipelineAsArray): void
2225
{
2326
$this->collection->insertMany([['x' => 10], ['x' => 10], ['x' => 10]]);
2427
$pipeline = new Pipeline(
@@ -27,8 +30,10 @@ public function testAggregate(): void
2730
buckets: 2,
2831
),
2932
);
30-
// Extract the list of stages for arg type restriction
31-
$pipeline = iterator_to_array($pipeline);
33+
34+
if ($pipelineAsArray) {
35+
$pipeline = iterator_to_array($pipeline);
36+
}
3237

3338
$results = $this->collection->aggregate($pipeline)->toArray();
3439
$this->assertCount(2, $results);
@@ -257,7 +262,9 @@ public function testUpdateManyWithPipeline(): void
257262
$this->assertEquals(3, $result[0]->x);
258263
}
259264

260-
public function testWatch(): void
265+
#[TestWith([true])]
266+
#[TestWith([false])]
267+
public function testWatch(bool $pipelineAsArray): void
261268
{
262269
$this->skipIfChangeStreamIsNotSupported();
263270

@@ -268,8 +275,10 @@ public function testWatch(): void
268275
$pipeline = new Pipeline(
269276
Stage::match(operationType: Query::eq('insert')),
270277
);
271-
// Extract the list of stages for arg type restriction
272-
$pipeline = iterator_to_array($pipeline);
278+
279+
if ($pipelineAsArray) {
280+
$pipeline = iterator_to_array($pipeline);
281+
}
273282

274283
$changeStream = $this->collection->watch($pipeline);
275284
$this->collection->insertOne(['x' => 3]);

tests/Database/BuilderDatabaseFunctionalTest.php

+15-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use MongoDB\Builder\Pipeline;
77
use MongoDB\Builder\Query;
88
use MongoDB\Builder\Stage;
9+
use PHPUnit\Framework\Attributes\TestWith;
910

1011
use function iterator_to_array;
1112

@@ -18,7 +19,9 @@ public function tearDown(): void
1819
parent::tearDown();
1920
}
2021

21-
public function testAggregate(): void
22+
#[TestWith([true])]
23+
#[TestWith([false])]
24+
public function testAggregate(bool $pipelineAsArray): void
2225
{
2326
$this->skipIfServerVersion('<', '6.0.0', '$documents stage is not supported');
2427

@@ -33,14 +36,18 @@ public function testAggregate(): void
3336
buckets: 2,
3437
),
3538
);
36-
// Extract the list of stages for arg type restriction
37-
$pipeline = iterator_to_array($pipeline);
39+
40+
if ($pipelineAsArray) {
41+
$pipeline = iterator_to_array($pipeline);
42+
}
3843

3944
$results = $this->database->aggregate($pipeline)->toArray();
4045
$this->assertCount(2, $results);
4146
}
4247

43-
public function testWatch(): void
48+
#[TestWith([true])]
49+
#[TestWith([false])]
50+
public function testWatch(bool $pipelineAsArray): void
4451
{
4552
$this->skipIfChangeStreamIsNotSupported();
4653

@@ -51,8 +58,10 @@ public function testWatch(): void
5158
$pipeline = new Pipeline(
5259
Stage::match(operationType: Query::eq('insert')),
5360
);
54-
// Extract the list of stages for arg type restriction
55-
$pipeline = iterator_to_array($pipeline);
61+
62+
if ($pipelineAsArray) {
63+
$pipeline = iterator_to_array($pipeline);
64+
}
5665

5766
$changeStream = $this->database->watch($pipeline);
5867
$this->database->selectCollection($this->getCollectionName())->insertOne(['x' => 3]);

0 commit comments

Comments
 (0)