|
4 | 4 |
|
5 | 5 | use MongoDB\BSON\ObjectId;
|
6 | 6 | use MongoDB\BSON\UTCDateTime;
|
| 7 | +use MongoDB\Collection; |
7 | 8 | use MongoDB\Database;
|
8 | 9 | use MongoDB\Driver\Cursor;
|
| 10 | +use MongoDB\Driver\Exception\CommandException; |
9 | 11 | use MongoDB\Driver\Exception\Exception;
|
10 | 12 | use MongoDB\Driver\ReadPreference;
|
11 | 13 | use MongoDB\Driver\WriteConcern;
|
12 | 14 |
|
13 | 15 | use function in_array;
|
| 16 | +use function microtime; |
14 | 17 | use function ob_end_clean;
|
15 | 18 | use function ob_start;
|
16 | 19 | use function var_dump;
|
@@ -1571,6 +1574,119 @@ public function testCausalConsistency(): void
|
1571 | 1574 | ob_end_clean();
|
1572 | 1575 | }
|
1573 | 1576 |
|
| 1577 | + public function testSnapshotQueries(): void |
| 1578 | + { |
| 1579 | + if (version_compare($this->getServerVersion(), '5.0.0', '<')) { |
| 1580 | + $this->markTestSkipped('Snapshot queries outside of transactions are not supported'); |
| 1581 | + } |
| 1582 | + |
| 1583 | + if (! ($this->isReplicaSet() || $this->isShardedClusterUsingReplicasets())) { |
| 1584 | + $this->markTestSkipped('Snapshot read concern is only supported with replicasets'); |
| 1585 | + } |
| 1586 | + |
| 1587 | + $client = static::createTestClient(); |
| 1588 | + |
| 1589 | + $catsCollection = $client->selectCollection('pets', 'cats'); |
| 1590 | + $catsCollection->drop(); |
| 1591 | + $catsCollection->insertMany([ |
| 1592 | + ['name' => 'Whiskers', 'color' => 'white', 'adoptable' => true], |
| 1593 | + ['name' => 'Garfield', 'color' => 'orange', 'adoptable' => false], |
| 1594 | + ]); |
| 1595 | + |
| 1596 | + $dogsCollection = $client->selectCollection('pets', 'dogs'); |
| 1597 | + $dogsCollection->drop(); |
| 1598 | + $dogsCollection->insertMany([ |
| 1599 | + ['name' => 'Toto', 'color' => 'black', 'adoptable' => true], |
| 1600 | + ['name' => 'Milo', 'color' => 'black', 'adoptable' => false], |
| 1601 | + ['name' => 'Brian', 'color' => 'white', 'adoptable' => true], |
| 1602 | + ]); |
| 1603 | + |
| 1604 | + if ($this->isShardedCluster()) { |
| 1605 | + $this->preventStaleDbVersionError('pets', 'cats'); |
| 1606 | + $this->preventStaleDbVersionError('pets', 'dogs'); |
| 1607 | + } else { |
| 1608 | + $this->waitForSnapshot('pets', 'cats'); |
| 1609 | + $this->waitForSnapshot('pets', 'dogs'); |
| 1610 | + } |
| 1611 | + |
| 1612 | + ob_start(); |
| 1613 | + |
| 1614 | + // Start Snapshot Query Example 1 |
| 1615 | + $catsCollection = $client->selectCollection('pets', 'cats'); |
| 1616 | + $dogsCollection = $client->selectCollection('pets', 'dogs'); |
| 1617 | + |
| 1618 | + $session = $client->startSession(['snapshot' => true]); |
| 1619 | + |
| 1620 | + $adoptablePetsCount = $catsCollection->aggregate( |
| 1621 | + [ |
| 1622 | + ['$match' => ['adoptable' => true]], |
| 1623 | + ['$count' => 'adoptableCatsCount'], |
| 1624 | + ], |
| 1625 | + ['session' => $session] |
| 1626 | + )->toArray()[0]->adoptableCatsCount; |
| 1627 | + |
| 1628 | + $adoptablePetsCount += $dogsCollection->aggregate( |
| 1629 | + [ |
| 1630 | + ['$match' => ['adoptable' => true]], |
| 1631 | + ['$count' => 'adoptableDogsCount'], |
| 1632 | + ], |
| 1633 | + ['session' => $session] |
| 1634 | + )->toArray()[0]->adoptableDogsCount; |
| 1635 | + |
| 1636 | + var_dump($adoptablePetsCount); |
| 1637 | + // End Snapshot Query Example 1 |
| 1638 | + |
| 1639 | + ob_end_clean(); |
| 1640 | + |
| 1641 | + $this->assertSame(3, $adoptablePetsCount); |
| 1642 | + |
| 1643 | + $catsCollection->drop(); |
| 1644 | + $dogsCollection->drop(); |
| 1645 | + |
| 1646 | + $salesCollection = $client->selectCollection('retail', 'sales'); |
| 1647 | + $salesCollection->drop(); |
| 1648 | + $salesCollection->insertMany([ |
| 1649 | + ['shoeType' => 'boot', 'price' => 30, 'saleDate' => new UTCDateTime()], |
| 1650 | + ]); |
| 1651 | + |
| 1652 | + if ($this->isShardedCluster()) { |
| 1653 | + $this->preventStaleDbVersionError('retail', 'sales'); |
| 1654 | + } else { |
| 1655 | + $this->waitForSnapshot('retail', 'sales'); |
| 1656 | + } |
| 1657 | + |
| 1658 | + // Start Snapshot Query Example 2 |
| 1659 | + $salesCollection = $client->selectCollection('retail', 'sales'); |
| 1660 | + |
| 1661 | + $session = $client->startSession(['snapshot' => true]); |
| 1662 | + |
| 1663 | + $totalDailySales = $salesCollection->aggregate( |
| 1664 | + [ |
| 1665 | + [ |
| 1666 | + '$match' => [ |
| 1667 | + '$expr' => [ |
| 1668 | + '$gt' => ['$saleDate', [ |
| 1669 | + '$dateSubtract' => [ |
| 1670 | + 'startDate' => '$$NOW', |
| 1671 | + 'unit' => 'day', |
| 1672 | + 'amount' => 1, |
| 1673 | + ], |
| 1674 | + ], |
| 1675 | + ], |
| 1676 | + ], |
| 1677 | + ], |
| 1678 | + ], |
| 1679 | + ['$count' => 'totalDailySales'], |
| 1680 | + ], |
| 1681 | + ['session' => $session] |
| 1682 | + )->toArray()[0]->totalDailySales; |
| 1683 | + // End Snapshot Query Example 2 |
| 1684 | + |
| 1685 | + $this->assertSame(1, $totalDailySales); |
| 1686 | + |
| 1687 | + $salesCollection->drop(); |
| 1688 | + } |
| 1689 | + |
1574 | 1690 | /**
|
1575 | 1691 | * @doesNotPerformAssertions
|
1576 | 1692 | */
|
@@ -1747,4 +1863,42 @@ private function assertInventoryCount($count): void
|
1747 | 1863 | {
|
1748 | 1864 | $this->assertCollectionCount($this->getDatabaseName() . '.' . $this->getCollectionName(), $count);
|
1749 | 1865 | }
|
| 1866 | + |
| 1867 | + private function waitForSnapshot(string $databaseName, string $collectionName): void |
| 1868 | + { |
| 1869 | + $collection = new Collection($this->manager, $databaseName, $collectionName); |
| 1870 | + $session = $this->manager->startSession(['snapshot' => true]); |
| 1871 | + |
| 1872 | + /* Retry until a snapshot query succeeds or ten seconds elapse, |
| 1873 | + * whichwever comes first. |
| 1874 | + * |
| 1875 | + * TODO: use hrtime() once the library requires PHP 7.3+ */ |
| 1876 | + $retryUntil = microtime(true) + 10; |
| 1877 | + |
| 1878 | + do { |
| 1879 | + try { |
| 1880 | + $collection->aggregate( |
| 1881 | + [['$match' => ['_id' => ['$exists' => true]]]], |
| 1882 | + ['session' => $session] |
| 1883 | + ); |
| 1884 | + |
| 1885 | + break; |
| 1886 | + } catch (CommandException $e) { |
| 1887 | + if ($e->getCode() === 246 /* SnapshotUnavailable */) { |
| 1888 | + continue; |
| 1889 | + } |
| 1890 | + |
| 1891 | + throw $e; |
| 1892 | + } |
| 1893 | + } while (microtime(true) < $retryUntil); |
| 1894 | + } |
| 1895 | + |
| 1896 | + /** |
| 1897 | + * @see https://jira.mongodb.org/browse/SERVER-39704 |
| 1898 | + */ |
| 1899 | + private function preventStaleDbVersionError(string $databaseName, string $collectionName): void |
| 1900 | + { |
| 1901 | + $collection = new Collection($this->manager, $databaseName, $collectionName); |
| 1902 | + $collection->distinct('foo'); |
| 1903 | + } |
1750 | 1904 | }
|
0 commit comments