From ec07feda9c92466c381933e1b717e2c7c8762069 Mon Sep 17 00:00:00 2001 From: aumel Date: Mon, 8 Jul 2019 20:33:17 +0200 Subject: [PATCH] Invoke sendUpdateIndexMessage on PostFlush --- .../SyncIndexWithObjectChangeListener.php | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/Doctrine/SyncIndexWithObjectChangeListener.php b/Doctrine/SyncIndexWithObjectChangeListener.php index d286a94..5485979 100644 --- a/Doctrine/SyncIndexWithObjectChangeListener.php +++ b/Doctrine/SyncIndexWithObjectChangeListener.php @@ -2,6 +2,7 @@ namespace Enqueue\ElasticaBundle\Doctrine; use Doctrine\Common\Persistence\Event\LifecycleEventArgs; +use Doctrine\ORM\Event\PostFlushEventArgs; use Enqueue\ElasticaBundle\Doctrine\Queue\Commands; use Enqueue\ElasticaBundle\Doctrine\Queue\SyncIndexWithObjectChangeProcessor as SyncProcessor; use Enqueue\Util\JSON; @@ -17,6 +18,11 @@ final class SyncIndexWithObjectChangeListener implements EventSubscriber */ private $modelClass; + /** + * @var array + */ + private $scheduledForUpdateIndex = []; + /** * @var array */ @@ -31,23 +37,33 @@ public function __construct(Context $context, $modelClass, array $config) public function postUpdate(LifecycleEventArgs $args) { - if ($args->getObject() instanceof $this->modelClass) { - $this->sendUpdateIndexMessage(SyncProcessor::UPDATE_ACTION, $args); + $this->scheduledForUpdateIndex[] = ['action' => SyncProcessor::UPDATE_ACTION, 'args' => $args]; } } public function postPersist(LifecycleEventArgs $args) { if ($args->getObject() instanceof $this->modelClass) { - $this->sendUpdateIndexMessage(SyncProcessor::INSERT_ACTION, $args); + $this->scheduledForUpdateIndex[] = ['action' => SyncProcessor::INSERT_ACTION, 'args' => $args]; } } public function preRemove(LifecycleEventArgs $args) { if ($args->getObject() instanceof $this->modelClass) { - $this->sendUpdateIndexMessage(SyncProcessor::REMOVE_ACTION, $args); + $this->scheduledForUpdateIndex[] = ['action' => SyncProcessor::REMOVE_ACTION, 'args' => $args]; + } + } + + public function postFlush(PostFlushEventArgs $event) + { + if (count($this->scheduledForUpdateIndex)) { + foreach ($this->scheduledForUpdateIndex as $updateIndex) { + $this->sendUpdateIndexMessage($updateIndex['action'], $updateIndex['args']); + } + + $this->scheduledForUpdateIndex = []; } } @@ -57,6 +73,7 @@ public function getSubscribedEvents() 'postPersist', 'postUpdate', 'preRemove', + 'postFlush' ]; }