self::FULL_DOCUMENT_DEFAULT, 'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY), ]; if (isset($options['fullDocument']) && ! is_string($options['fullDocument'])) { throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string'); } if (isset($options['resumeAfter']) && ! is_array($options['resumeAfter']) && ! is_object($options['resumeAfter'])) { throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object'); } if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) { throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class); } /* In the absence of an explicit session, create one to ensure that the * initial aggregation and any resume attempts can use the same session * ("implicit from the user's perspective" per PHPLIB-342). Since this * is filling in for an implicit session, we default "causalConsistency" * to false. */ if ( ! isset($options['session'])) { try { $options['session'] = $manager->startSession(['causalConsistency' => false]); } catch (RuntimeException $e) { /* We can ignore the exception, as libmongoc likely cannot * create its own session and there is no risk of a mismatch. */ } } $this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]); $this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAtOperationTime' => 1]); // Null database name implies a cluster-wide change stream if ($databaseName === null) { $databaseName = 'admin'; $this->changeStreamOptions['allChangesForCluster'] = true; } $this->databaseName = (string) $databaseName; $this->collectionName = isset($collectionName) ? 
(string) $collectionName : null;
        $this->pipeline = $pipeline;

        $this->aggregate = $this->createAggregate();
        $this->resumeCallable = $this->createResumeCallable($manager);
    }

    /**
     * Command monitoring callback for failed commands.
     *
     * Intentionally a no-op: only successful "aggregate" replies are of
     * interest (see commandSucceeded()).
     *
     * @internal
     * @param CommandFailedEvent $event
     */
    final public function commandFailed(CommandFailedEvent $event)
    {
    }

    /**
     * Command monitoring callback for started commands.
     *
     * Intentionally a no-op: only successful "aggregate" replies are of
     * interest (see commandSucceeded()).
     *
     * @internal
     * @param CommandStartedEvent $event
     */
    final public function commandStarted(CommandStartedEvent $event)
    {
    }

    /**
     * Command monitoring callback for succeeded commands.
     *
     * Records the "operationTime" field of a successful "aggregate" reply so
     * that a later resume attempt can start from that point in time. Replies
     * to other commands, and replies whose "operationTime" is missing or is
     * not a TimestampInterface, are ignored.
     *
     * @internal
     * @param CommandSucceededEvent $event
     */
    final public function commandSucceeded(CommandSucceededEvent $event)
    {
        if ($event->getCommandName() !== 'aggregate') {
            return;
        }

        $reply = $event->getReply();

        if (isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
            $this->operationTime = $reply->operationTime;
        }
    }

    /**
     * Execute the operation.
     *
     * The returned ChangeStream is given the resume callable created in the
     * constructor, alongside the cursor of the executed aggregate command.
     *
     * @see Executable::execute()
     * @param Server $server
     * @return ChangeStream
     * @throws UnsupportedException if collation or read concern is used and unsupported
     * @throws RuntimeException for other driver errors (e.g. connection errors)
     */
    public function execute(Server $server)
    {
        return new ChangeStream($this->executeAggregate($server), $this->resumeCallable);
    }

    /**
     * Create the aggregate command for creating a change stream.
     *
     * This method is also used to recreate the aggregate command when resuming,
     * so it always reads the current change stream options.
     *
     * @return Aggregate
     */
    private function createAggregate()
    {
        /* Work on a copy so that the user-supplied pipeline stored on this
         * object never accumulates "$changeStream" stages across resumes. */
        $pipeline = $this->pipeline;
        array_unshift($pipeline, ['$changeStream' => (object) $this->changeStreamOptions]);

        return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions);
    }

    /**
     * Create the callable that is handed to the ChangeStream for resuming.
     *
     * The callable accepts an optional resume token, rebuilds the aggregate
     * command with updated change stream options, selects a server using the
     * configured read preference and returns the result of execute() on it.
     *
     * @param Manager $manager Manager used to select a server when resuming
     * @return callable
     */
    private function createResumeCallable(Manager $manager)
    {
        return function($resumeToken = null) use ($manager) {
            /* If a resume token was provided, update the "resumeAfter" option
             * and ensure that "startAtOperationTime" is no longer set. */
            if ($resumeToken !== null) {
                $this->changeStreamOptions['resumeAfter'] = $resumeToken;
                unset($this->changeStreamOptions['startAtOperationTime']);
            }

            /* If we captured an operation time from the first aggregate command
             * and there is neither a "resumeAfter" nor a "startAtOperationTime"
             * option, set "startAtOperationTime" so that we can resume from the
             * original aggregate's time. An existing "startAtOperationTime"
             * (e.g. one supplied by the user) must be preserved: replacing it
             * with the later, captured time would skip any changes that
             * occurred between the two timestamps. */
            if ($this->operationTime !== null
                && ! isset($this->changeStreamOptions['resumeAfter'])
                && ! isset($this->changeStreamOptions['startAtOperationTime'])
            ) {
                $this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
            }

            $this->aggregate = $this->createAggregate();

            /* Select a new server using the read preference, execute this
             * operation on it, and return the new ChangeStream. */
            $server = $manager->selectServer($this->aggregateOptions['readPreference']);

            return $this->execute($server);
        };
    }

    /**
     * Execute the aggregate command and optionally capture its operation time.
     *
     * @param Server $server
     * @return Cursor
     */
    private function executeAggregate(Server $server)
    {
        /* If we've already captured an operation time or the server does not
         * support returning an operation time (e.g. MongoDB 3.6), execute the
         * aggregation directly and return its cursor. */
        if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForOperationTime)) {
            return $this->aggregate->execute($server);
        }

        /* Otherwise, execute the aggregation using command monitoring so that
         * we can capture its operation time with commandSucceeded(). */
        \MongoDB\Driver\Monitoring\addSubscriber($this);

        try {
            return $this->aggregate->execute($server);
        } finally {
            /* Always unregister, even if the aggregate throws, so this object
             * does not keep receiving events for unrelated commands. */
            \MongoDB\Driver\Monitoring\removeSubscriber($this);
        }
    }
}