vendor/shopware/core/Framework/DataAbstractionLayer/Indexing/EntityIndexerRegistry.php line 69

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\DataAbstractionLayer\Indexing;
  3. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  4. use Shopware\Core\Framework\DataAbstractionLayer\Indexing\MessageQueue\IterateEntityIndexerMessage;
  5. use Shopware\Core\Framework\MessageQueue\Handler\AbstractMessageHandler;
  6. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  7. use Symfony\Component\Messenger\MessageBusInterface;
  8. class EntityIndexerRegistry extends AbstractMessageHandler implements EventSubscriberInterface
  9. {
  10.     public const USE_INDEXING_QUEUE 'use-queue-indexing';
  11.     public const DISABLE_INDEXING 'disable-indexing';
  12.     /**
  13.      * @var EntityIndexer[]
  14.      */
  15.     private $indexer;
  16.     /**
  17.      * @var MessageBusInterface
  18.      */
  19.     private $messageBus;
  20.     /**
  21.      * @var bool
  22.      */
  23.     private $working false;
  24.     public function __construct(iterable $indexerMessageBusInterface $messageBus)
  25.     {
  26.         $this->indexer $indexer;
  27.         $this->messageBus $messageBus;
  28.     }
  29.     public static function getSubscribedEvents(): array
  30.     {
  31.         return [
  32.             EntityWrittenContainerEvent::class => [
  33.                 ['refresh'1000],
  34.             ],
  35.         ];
  36.     }
  37.     public static function getHandledMessages(): iterable
  38.     {
  39.         return [
  40.             EntityIndexingMessage::class,
  41.             IterateEntityIndexerMessage::class,
  42.         ];
  43.     }
  44.     public function index(bool $useQueue): void
  45.     {
  46.         foreach ($this->indexer as $indexer) {
  47.             $offset null;
  48.             while ($message $indexer->iterate($offset)) {
  49.                 $message->setIndexer($indexer->getName());
  50.                 $this->sendOrHandle($message$useQueue);
  51.                 $offset $message->getOffset();
  52.             }
  53.         }
  54.     }
  55.     public function refresh(EntityWrittenContainerEvent $event): void
  56.     {
  57.         if ($this->working) {
  58.             return;
  59.         }
  60.         $this->working true;
  61.         if ($event->getContext()->hasExtension(self::DISABLE_INDEXING)) {
  62.             return;
  63.         }
  64.         $useQueue $event->getContext()->hasExtension(self::USE_INDEXING_QUEUE);
  65.         foreach ($this->indexer as $indexer) {
  66.             $message $indexer->update($event);
  67.             if (!$message) {
  68.                 continue;
  69.             }
  70.             $message->setIndexer($indexer->getName());
  71.             $this->sendOrHandle($message$useQueue);
  72.         }
  73.         $this->working false;
  74.     }
  75.     public function handle($message): void
  76.     {
  77.         if ($message instanceof EntityIndexingMessage) {
  78.             $indexer $this->getIndexer($message->getIndexer());
  79.             if ($indexer) {
  80.                 $indexer->handle($message);
  81.             }
  82.             return;
  83.         }
  84.         if ($message instanceof IterateEntityIndexerMessage) {
  85.             $next $this->iterateIndexer($message->getIndexer(), $message->getOffset(), true);
  86.             if (!$next) {
  87.                 return;
  88.             }
  89.             $this->messageBus->dispatch(new IterateEntityIndexerMessage($message->getIndexer(), $next->getOffset()));
  90.             return;
  91.         }
  92.     }
  93.     public function sendIndexingMessage(array $indexer = []): void
  94.     {
  95.         if (empty($indexer)) {
  96.             $indexer = [];
  97.             foreach ($this->indexer as $loop) {
  98.                 $indexer[] = $loop->getName();
  99.             }
  100.         }
  101.         if (empty($indexer)) {
  102.             return;
  103.         }
  104.         foreach ($indexer as $name) {
  105.             $this->messageBus->dispatch(new IterateEntityIndexerMessage($namenull));
  106.         }
  107.     }
  108.     public function has(string $name): bool
  109.     {
  110.         return $this->getIndexer($name) !== null;
  111.     }
  112.     private function sendOrHandle(EntityIndexingMessage $messagebool $useQueue): void
  113.     {
  114.         if ($useQueue || $message->forceQueue()) {
  115.             $this->messageBus->dispatch($message);
  116.             return;
  117.         }
  118.         $this->handle($message);
  119.     }
  120.     private function getIndexer(string $name): ?EntityIndexer
  121.     {
  122.         foreach ($this->indexer as $indexer) {
  123.             if ($indexer->getName() === $name) {
  124.                 return $indexer;
  125.             }
  126.         }
  127.         return null;
  128.     }
  129.     private function iterateIndexer(string $name$offsetbool $useQueue): ?EntityIndexingMessage
  130.     {
  131.         $indexer $this->getIndexer($name);
  132.         if (!$indexer instanceof EntityIndexer) {
  133.             throw new \RuntimeException(sprintf('Entity indexer with name %s not found'$name));
  134.         }
  135.         $message $indexer->iterate($offset);
  136.         if (!$message) {
  137.             return null;
  138.         }
  139.         $message->setIndexer($indexer->getName());
  140.         $this->sendOrHandle($message$useQueue);
  141.         return $message;
  142.     }
  143. }