⚝
One Hat Cyber Team
⚝
Your IP:
216.73.216.1
Server IP:
185.238.29.86
Server:
Linux server2 6.8.12-6-pve #1 SMP PREEMPT_DYNAMIC PMX 6.8.12-6 (2024-12-19T19:05Z) x86_64
Server Software:
nginx/1.18.0
PHP Version:
8.1.31
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
var
/
www
/
shopware
/
vendor
/
enqueue
/
dbal
/
View File Name :
DbalSubscriptionConsumer.php
<?php declare(strict_types=1); namespace Enqueue\Dbal; use Doctrine\DBAL\Connection; use Interop\Queue\Consumer; use Interop\Queue\SubscriptionConsumer; class DbalSubscriptionConsumer implements SubscriptionConsumer { use DbalConsumerHelperTrait; /** * @var DbalContext */ private $context; /** * an item contains an array: [DbalConsumer $consumer, callable $callback];. * * @var array */ private $subscribers; /** * @var \Doctrine\DBAL\Connection */ private $dbal; /** * Default 20 minutes in milliseconds. * * @var int */ private $redeliveryDelay; /** * Time to wait between subscription requests in milliseconds. * * @var int */ private $pollingInterval = 200; public function __construct(DbalContext $context) { $this->context = $context; $this->dbal = $this->context->getDbalConnection(); $this->subscribers = []; $this->redeliveryDelay = 1200000; } /** * Get interval between retrying failed messages in milliseconds. */ public function getRedeliveryDelay(): int { return $this->redeliveryDelay; } public function setRedeliveryDelay(int $redeliveryDelay): self { $this->redeliveryDelay = $redeliveryDelay; return $this; } public function getPollingInterval(): int { return $this->pollingInterval; } public function setPollingInterval(int $msec): self { $this->pollingInterval = $msec; return $this; } public function consume(int $timeout = 0): void { if (empty($this->subscribers)) { throw new \LogicException('No subscribers'); } $queueNames = []; foreach (array_keys($this->subscribers) as $queueName) { $queueNames[$queueName] = $queueName; } $timeout /= 1000; $now = time(); $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds $currentQueueNames = []; $queueConsumed = false; while (true) { if (empty($currentQueueNames)) { $currentQueueNames = $queueNames; $queueConsumed = false; } $this->removeExpiredMessages(); $this->redeliverMessages(); if ($message = $this->fetchMessage($currentQueueNames, $redeliveryDelay)) { $queueConsumed = true; /** * @var DbalConsumer $consumer * @var callable $callback */ [$consumer, $callback] = $this->subscribers[$message->getQueue()]; if (false === call_user_func($callback, $message, $consumer)) { return; } unset($currentQueueNames[$message->getQueue()]); } else { $currentQueueNames = []; if (!$queueConsumed) { usleep($this->getPollingInterval() * 1000); } } if ($timeout && microtime(true) >= $now + $timeout) { return; } } } /** * @param DbalConsumer $consumer */ public function subscribe(Consumer $consumer, callable $callback): void { if (false == $consumer instanceof DbalConsumer) { throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', DbalConsumer::class, get_class($consumer))); } $queueName = $consumer->getQueue()->getQueueName(); if (array_key_exists($queueName, $this->subscribers)) { if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) { return; } throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName)); } $this->subscribers[$queueName] = [$consumer, $callback]; } /** * @param DbalConsumer $consumer */ public function unsubscribe(Consumer $consumer): void { if (false == $consumer instanceof DbalConsumer) { throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', DbalConsumer::class, get_class($consumer))); } $queueName = $consumer->getQueue()->getQueueName(); if (false == array_key_exists($queueName, $this->subscribers)) { return; } if ($this->subscribers[$queueName][0] !== $consumer) { return; } unset($this->subscribers[$queueName]); } public function unsubscribeAll(): void { $this->subscribers = []; } protected function getContext(): DbalContext { return $this->context; } protected function getConnection(): Connection { return $this->dbal; } }