vendor/3slab/vdm-library-bundle/EventSubscriber/StopWorker/StopWorkerOnEmptyMessageSubscriber.php line 109

Open in your IDE?
  1. <?php
  2. /**
  3.  * @package    3slab/VdmLibraryBundle
  4.  * @copyright  2020 Suez Smart Solutions 3S.lab
  5.  * @license    https://github.com/3slab/VdmLibraryBundle/blob/master/LICENSE
  6.  */
  7. namespace Vdm\Bundle\LibraryBundle\EventSubscriber\StopWorker;
  8. use Psr\Log\LoggerInterface;
  9. use Psr\Log\NullLogger;
  10. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  11. use Symfony\Component\Messenger\Event\AbstractWorkerMessageEvent;
  12. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  13. use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
  14. use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
  15. use Vdm\Bundle\LibraryBundle\Event\CollectWorkerMessageFailedEvent;
  16. use Vdm\Bundle\LibraryBundle\Event\CollectWorkerMessageHandledEvent;
  17. use Vdm\Bundle\LibraryBundle\Event\CollectWorkerMessageReceivedEvent;
  18. use Vdm\Bundle\LibraryBundle\Model\IsEmptyMessageInterface;
  19. use Vdm\Bundle\LibraryBundle\Model\Message;
  20. use Vdm\Bundle\LibraryBundle\Service\StopWorkerService;
  /**
   * Class StopWorkerOnEmptyMessageSubscriber
   *
   * Subscribes to the received / handled / failed worker events (the Symfony
   * Messenger ones and this bundle's Collect* counterparts). Whenever the
   * message carried by one of those events implements IsEmptyMessageInterface
   * and reports itself as empty, the flag of the StopWorkerService is raised
   * and, for "received" events, handling of the message is switched off.
   *
   * @package Vdm\Bundle\LibraryBundle\EventSubscriber\StopWorker
   */
  class StopWorkerOnEmptyMessageSubscriber implements EventSubscriberInterface
  {
      /**
       * Service whose flag is raised once an empty message has been seen.
       *
       * @var StopWorkerService $stopWorker
       */
      private $stopWorker;

      /**
       * Logger used for debug traces; never null (falls back to NullLogger).
       *
       * @var LoggerInterface
       */
      private $logger;

      /**
       * StopWorkerOnEmptyMessageSubscriber constructor.
       *
       * @param StopWorkerService    $stopWorker Service holding the stop flag.
       * @param LoggerInterface|null $vdmLogger  Optional logger; a NullLogger is used when omitted.
       */
      public function __construct(StopWorkerService $stopWorker, ?LoggerInterface $vdmLogger = null)
      {
          $this->stopWorker = $stopWorker;
          $this->logger = $vdmLogger ?? new NullLogger();
      }

      /**
       * Method executed on WorkerMessageFailedEvent event
       *
       * @param WorkerMessageFailedEvent $event
       */
      public function onWorkerMessageFailedEvent(WorkerMessageFailedEvent $event)
      {
          $this->isEmptyMessage($event, 'WorkerMessageFailedEvent');
      }

      /**
       * Method executed on WorkerMessageHandledEvent event
       *
       * @param WorkerMessageHandledEvent $event
       */
      public function onWorkerMessageHandledEvent(WorkerMessageHandledEvent $event)
      {
          // The label must name the event actually being processed so the
          // debug log is not misleading.
          $this->isEmptyMessage($event, 'WorkerMessageHandledEvent');
      }

      /**
       * Method executed on WorkerMessageReceivedEvent event
       *
       * @param WorkerMessageReceivedEvent $event
       */
      public function onWorkerMessageReceivedEvent(WorkerMessageReceivedEvent $event)
      {
          $this->isEmptyMessage($event, 'WorkerMessageReceivedEvent');
      }

      /**
       * Method executed on CollectWorkerMessageFailedEvent event
       *
       * @param CollectWorkerMessageFailedEvent $event
       */
      public function onCollectWorkerMessageFailedEvent(CollectWorkerMessageFailedEvent $event)
      {
          $this->isEmptyMessage($event, 'CollectWorkerMessageFailedEvent');
      }

      /**
       * Method executed on CollectWorkerMessageHandledEvent event
       *
       * @param CollectWorkerMessageHandledEvent $event
       */
      public function onCollectWorkerMessageHandledEvent(CollectWorkerMessageHandledEvent $event)
      {
          // Same as above: log the handled event under its own name.
          $this->isEmptyMessage($event, 'CollectWorkerMessageHandledEvent');
      }

      /**
       * Method executed on CollectWorkerMessageReceivedEvent event
       *
       * @param CollectWorkerMessageReceivedEvent $event
       */
      public function onCollectWorkerMessageReceivedEvent(CollectWorkerMessageReceivedEvent $event)
      {
          $this->isEmptyMessage($event, 'CollectWorkerMessageReceivedEvent');
      }

      /**
       * Inspects the message wrapped by the event and reacts when it is empty.
       *
       * Nothing happens unless the message implements IsEmptyMessageInterface
       * and its isEmpty() returns a truthy value. Otherwise the stop flag is set
       * on the StopWorkerService and, for received events only, shouldHandle is
       * set to false on the event.
       *
       * @param AbstractWorkerMessageEvent $event     Event carrying the envelope to inspect.
       * @param string                     $eventName Label of the event, used only in log context.
       */
      protected function isEmptyMessage(AbstractWorkerMessageEvent $event, string $eventName)
      {
          $message = $event->getEnvelope()->getMessage();

          // Messages that cannot report emptiness are ignored.
          if (!$message instanceof IsEmptyMessageInterface) {
              return;
          }

          if (!$message->isEmpty()) {
              return;
          }

          $this->logger->debug(
              'Empty message detected during event {eventName} so we schedule to stop the worker',
              ['eventName' => $eventName]
          );
          $this->stopWorker->setFlag(true);

          // Only the "received" events expose shouldHandle(); the handled and
          // failed events are left untouched beyond raising the stop flag.
          if ($event instanceof WorkerMessageReceivedEvent || $event instanceof CollectWorkerMessageReceivedEvent) {
              $event->shouldHandle(false);
              $this->logger->debug(
                  'Set ShouldHandle flag on {eventName} event to false as the message is empty',
                  ['eventName' => $eventName]
              );
          }
      }

      /**
       * {@inheritDoc}
       * @codeCoverageIgnore
       */
      public static function getSubscribedEvents(): array
      {
          return [
              CollectWorkerMessageFailedEvent::class => 'onCollectWorkerMessageFailedEvent',
              CollectWorkerMessageHandledEvent::class => 'onCollectWorkerMessageHandledEvent',
              CollectWorkerMessageReceivedEvent::class => 'onCollectWorkerMessageReceivedEvent',
              WorkerMessageFailedEvent::class => 'onWorkerMessageFailedEvent',
              WorkerMessageHandledEvent::class => 'onWorkerMessageHandledEvent',
              WorkerMessageReceivedEvent::class => 'onWorkerMessageReceivedEvent',
          ];
      }
  }