From 67d91d5fc52c9b63a6e2118c10569d776074f174 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Sun, 24 Jul 2022 10:12:26 +0200 Subject: [PATCH] Migrated rabbit integration to RabbitMqPublishingHelper from shlink-common --- composer.json | 3 +- config/autoload/rabbit.global.php | 29 -------- .../Core/config/event_dispatcher.config.php | 12 +-- .../Mercure/NotifyNewShortUrlToMercure.php | 2 +- .../RabbitMq/NotifyNewShortUrlToRabbitMq.php | 73 ++++++++++++++++++- .../RabbitMq/NotifyVisitToRabbitMq.php | 44 +++-------- .../RabbitMq/NotifyVisitToRabbitMqTest.php | 52 ++++--------- 7 files changed, 104 insertions(+), 111 deletions(-) diff --git a/composer.json b/composer.json index 02a22264..f85c1a83 100644 --- a/composer.json +++ b/composer.json @@ -40,12 +40,11 @@ "mlocati/ip-lib": "^1.17", "ocramius/proxy-manager": "^2.11", "pagerfanta/core": "^3.5", - "php-amqplib/php-amqplib": "^3.1", "php-middleware/request-id": "^4.1", "predis/predis": "^1.1", "pugx/shortid-php": "^1.0", "ramsey/uuid": "^4.2", - "shlinkio/shlink-common": "dev-main#3244088 as 4.5", + "shlinkio/shlink-common": "dev-main#0396706 as 4.5", "shlinkio/shlink-config": "^1.6", "shlinkio/shlink-event-dispatcher": "^2.4", "shlinkio/shlink-importer": "^3.0", diff --git a/config/autoload/rabbit.global.php b/config/autoload/rabbit.global.php index a9764c8c..6f63eca6 100644 --- a/config/autoload/rabbit.global.php +++ b/config/autoload/rabbit.global.php @@ -2,9 +2,6 @@ declare(strict_types=1); -use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory; -use Laminas\ServiceManager\Proxy\LazyServiceFactory; -use PhpAmqpLib\Connection\AMQPStreamConnection; use Shlinkio\Shlink\Core\Config\EnvVars; return [ @@ -18,30 +15,4 @@ return [ 'vhost' => EnvVars::RABBITMQ_VHOST->loadFromEnv('/'), ], - 'dependencies' => [ - 'factories' => [ - AMQPStreamConnection::class => ConfigAbstractFactory::class, - ], - 'delegators' => [ - AMQPStreamConnection::class => [ - LazyServiceFactory::class, - ], - ], - 'lazy_services' => [ - 'class_map' => [ - AMQPStreamConnection::class => AMQPStreamConnection::class, - ], - ], - ], - - ConfigAbstractFactory::class => [ - AMQPStreamConnection::class => [ - 'config.rabbitmq.host', - 'config.rabbitmq.port', - 'config.rabbitmq.user', - 'config.rabbitmq.password', - 'config.rabbitmq.vhost', - ], - ], - ]; diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index e1342ebf..be4872e2 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -5,9 +5,9 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core; use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory; -use PhpAmqpLib\Connection\AMQPStreamConnection; use Psr\EventDispatcher\EventDispatcherInterface; use Shlinkio\Shlink\CLI\Util\GeolocationDbUpdater; +use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelper; use Shlinkio\Shlink\IpGeolocation\GeoLite2\DbUpdater; use Shlinkio\Shlink\IpGeolocation\Resolver\IpLocationResolverInterface; use Symfony\Component\Mercure\Hub; @@ -27,10 +27,10 @@ return [ EventDispatcher\NotifyVisitToWebHooks::class, EventDispatcher\UpdateGeoLiteDb::class, ], -// EventDispatcher\Event\ShortUrlCreated::class => [ -// EventDispatcher\Mercure\NotifyNewShortUrlToMercure::class, -// EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class, -// ], + EventDispatcher\Event\ShortUrlCreated::class => [ + EventDispatcher\Mercure\NotifyNewShortUrlToMercure::class, + EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class, + ], ], ], @@ -79,7 +79,7 @@ return [ 'Logger_Shlink', ], EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => [ - AMQPStreamConnection::class, + RabbitMqPublishingHelper::class, 'em', 'Logger_Shlink', Visit\Transformer\OrphanVisitDataTransformer::class, diff --git a/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php b/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php index 51fa3b58..4ebb8536 100644 --- a/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php +++ b/module/Core/src/EventDispatcher/Mercure/NotifyNewShortUrlToMercure.php @@ -8,7 +8,7 @@ use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; class NotifyNewShortUrlToMercure { - public function __invoke(ShortUrlCreated $shortUrlCreated) + public function __invoke(ShortUrlCreated $shortUrlCreated): void { // TODO: Implement __invoke() method. } diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php index dfce8419..ef38c356 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php @@ -4,12 +4,81 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq; +use Doctrine\ORM\EntityManagerInterface; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Exchange\AMQPExchangeType; +use PhpAmqpLib\Message\AMQPMessage; +use Psr\Log\LoggerInterface; +use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; +use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; +use Throwable; + +use function Shlinkio\Shlink\Common\json_encode; class NotifyNewShortUrlToRabbitMq { - public function __invoke(ShortUrlCreated $shortUrlCreated) + private const NEW_SHORT_URL_QUEUE = 'https://shlink.io/new-short-url'; + + public function __construct( + private readonly AMQPStreamConnection $connection, + private readonly EntityManagerInterface $em, + private readonly LoggerInterface $logger, + private readonly DataTransformerInterface $shortUrlTransformer, + private readonly bool $isEnabled, + ) { + } + + public function __invoke(ShortUrlCreated $shortUrlCreated): void { - // TODO: Implement __invoke() method. + if (! $this->isEnabled) { + return; + } + + $shortUrlId = $shortUrlCreated->shortUrlId; + $shortUrl = $this->em->find(ShortUrl::class, $shortUrlId); + + if ($shortUrl === null) { + $this->logger->warning( + 'Tried to notify RabbitMQ for new short URL with id "{shortUrlId}", but it does not exist.', + ['shortUrlId' => $shortUrlId], + ); + return; + } + + if (! $this->connection->isConnected()) { + $this->connection->reconnect(); + } + + $queue = self::NEW_SHORT_URL_QUEUE; + $message = $this->shortUrlToMessage($shortUrl); + + try { + $channel = $this->connection->channel(); + + // Declare an exchange and a queue that will persist server restarts + $exchange = $queue; // We use the same name for the exchange and the queue + $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); + $channel->queue_declare($queue, false, true, false, false); + + // Bind the exchange and the queue together, and publish the message + $channel->queue_bind($queue, $exchange); + $channel->basic_publish($message, $exchange); + + $channel->close(); + } catch (Throwable $e) { + $this->logger->debug('Error while trying to notify RabbitMQ with new short URL. {e}', ['e' => $e]); + } finally { + $this->connection->close(); + } + } + + private function shortUrlToMessage(ShortUrl $shortUrl): AMQPMessage + { + $messageBody = json_encode($this->shortUrlTransformer->transform($shortUrl)); + return new AMQPMessage($messageBody, [ + 'content_type' => 'application/json', + 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, + ]); } } diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php index bec1ef94..929e37c0 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyVisitToRabbitMq.php @@ -5,16 +5,13 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq; use Doctrine\ORM\EntityManagerInterface; -use PhpAmqpLib\Connection\AMQPStreamConnection; -use PhpAmqpLib\Exchange\AMQPExchangeType; -use PhpAmqpLib\Message\AMQPMessage; use Psr\Log\LoggerInterface; +use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface; use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; use Throwable; -use function Shlinkio\Shlink\Common\json_encode; use function sprintf; class NotifyVisitToRabbitMq @@ -23,11 +20,11 @@ class NotifyVisitToRabbitMq private const NEW_ORPHAN_VISIT_QUEUE = 'https://shlink.io/new-orphan-visit'; public function __construct( - private AMQPStreamConnection $connection, - private EntityManagerInterface $em, - private LoggerInterface $logger, - private DataTransformerInterface $orphanVisitTransformer, - private bool $isEnabled, + private readonly RabbitMqPublishingHelperInterface $rabbitMqHelper, + private readonly EntityManagerInterface $em, + private readonly LoggerInterface $logger, + private readonly DataTransformerInterface $orphanVisitTransformer, + private readonly bool $isEnabled, ) { } @@ -47,32 +44,15 @@ class NotifyVisitToRabbitMq return; } - if (! $this->connection->isConnected()) { - $this->connection->reconnect(); - } - $queues = $this->determineQueuesToPublishTo($visit); - $message = $this->visitToMessage($visit); + $payload = $this->visitToPayload($visit); try { - $channel = $this->connection->channel(); - foreach ($queues as $queue) { - // Declare an exchange and a queue that will persist server restarts - $exchange = $queue; // We use the same name for the exchange and the queue - $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); - $channel->queue_declare($queue, false, true, false, false); - - // Bind the exchange and the queue together, and publish the message - $channel->queue_bind($queue, $exchange); - $channel->basic_publish($message, $exchange); + $this->rabbitMqHelper->publishPayloadInQueue($payload, $queue); } - - $channel->close(); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]); - } finally { - $this->connection->close(); } } @@ -91,12 +71,8 @@ class NotifyVisitToRabbitMq ]; } - private function visitToMessage(Visit $visit): AMQPMessage + private function visitToPayload(Visit $visit): array { - $messageBody = json_encode(! $visit->isOrphan() ? $visit : $this->orphanVisitTransformer->transform($visit)); - return new AMQPMessage($messageBody, [ - 'content_type' => 'application/json', - 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, - ]); + return ! $visit->isOrphan() ? $visit->jsonSerialize() : $this->orphanVisitTransformer->transform($visit); } } diff --git a/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php b/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php index c39407f8..942a7bdc 100644 --- a/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php +++ b/module/Core/test/EventDispatcher/RabbitMq/NotifyVisitToRabbitMqTest.php @@ -7,14 +7,13 @@ namespace ShlinkioTest\Shlink\Core\EventDispatcher\RabbitMq; use Doctrine\ORM\EntityManagerInterface; use DomainException; use Exception; -use PhpAmqpLib\Channel\AMQPChannel; -use PhpAmqpLib\Connection\AMQPStreamConnection; use PHPUnit\Framework\TestCase; use Prophecy\Argument; use Prophecy\PhpUnit\ProphecyTrait; use Prophecy\Prophecy\ObjectProphecy; use Psr\Log\LoggerInterface; use RuntimeException; +use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; @@ -32,25 +31,19 @@ class NotifyVisitToRabbitMqTest extends TestCase use ProphecyTrait; private NotifyVisitToRabbitMq $listener; - private ObjectProphecy $connection; + private ObjectProphecy $helper; private ObjectProphecy $em; private ObjectProphecy $logger; private ObjectProphecy $orphanVisitTransformer; - private ObjectProphecy $channel; protected function setUp(): void { - $this->channel = $this->prophesize(AMQPChannel::class); - - $this->connection = $this->prophesize(AMQPStreamConnection::class); - $this->connection->isConnected()->willReturn(false); - $this->connection->channel()->willReturn($this->channel->reveal()); - + $this->helper = $this->prophesize(RabbitMqPublishingHelperInterface::class); $this->em = $this->prophesize(EntityManagerInterface::class); $this->logger = $this->prophesize(LoggerInterface::class); $this->listener = new NotifyVisitToRabbitMq( - $this->connection->reveal(), + $this->helper->reveal(), $this->em->reveal(), $this->logger->reveal(), new OrphanVisitDataTransformer(), @@ -61,8 +54,8 @@ class NotifyVisitToRabbitMqTest extends TestCase /** @test */ public function doesNothingWhenTheFeatureIsNotEnabled(): void { - $listener = new \Shlinkio\Shlink\Core\EventDispatcher\RabbitMq\NotifyVisitToRabbitMq( - $this->connection->reveal(), + $listener = new NotifyVisitToRabbitMq( + $this->helper->reveal(), $this->em->reveal(), $this->logger->reveal(), new OrphanVisitDataTransformer(), @@ -74,8 +67,7 @@ class NotifyVisitToRabbitMqTest extends TestCase $this->em->find(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); - $this->connection->isConnected()->shouldNotHaveBeenCalled(); - $this->connection->close()->shouldNotHaveBeenCalled(); + $this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled(); } /** @test */ @@ -93,8 +85,7 @@ class NotifyVisitToRabbitMqTest extends TestCase $findVisit->shouldHaveBeenCalledOnce(); $logWarning->shouldHaveBeenCalledOnce(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); - $this->connection->isConnected()->shouldNotHaveBeenCalled(); - $this->connection->close()->shouldNotHaveBeenCalled(); + $this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled(); } /** @@ -105,27 +96,17 @@ class NotifyVisitToRabbitMqTest extends TestCase { $visitId = '123'; $findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit); - $argumentWithExpectedChannel = Argument::that(fn (string $channel) => contains($expectedChannels, $channel)); + $argumentWithExpectedChannels = Argument::that( + static fn (string $channel) => contains($expectedChannels, $channel), + ); ($this->listener)(new VisitLocated($visitId)); $findVisit->shouldHaveBeenCalledOnce(); - $this->channel->exchange_declare($argumentWithExpectedChannel, Argument::cetera())->shouldHaveBeenCalledTimes( - count($expectedChannels), - ); - $this->channel->queue_declare($argumentWithExpectedChannel, Argument::cetera())->shouldHaveBeenCalledTimes( - count($expectedChannels), - ); - $this->channel->queue_bind( - $argumentWithExpectedChannel, - $argumentWithExpectedChannel, + $this->helper->publishPayloadInQueue( + Argument::type('array'), + $argumentWithExpectedChannels, )->shouldHaveBeenCalledTimes(count($expectedChannels)); - $this->channel->basic_publish(Argument::any(), $argumentWithExpectedChannel)->shouldHaveBeenCalledTimes( - count($expectedChannels), - ); - $this->channel->close()->shouldHaveBeenCalledOnce(); - $this->connection->reconnect()->shouldHaveBeenCalledOnce(); - $this->connection->close()->shouldHaveBeenCalledOnce(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); } @@ -154,7 +135,7 @@ class NotifyVisitToRabbitMqTest extends TestCase { $visitId = '123'; $findVisit = $this->em->find(Visit::class, $visitId)->willReturn(Visit::forBasePath(Visitor::emptyInstance())); - $channel = $this->connection->channel()->willThrow($e); + $channel = $this->helper->publishPayloadInQueue(Argument::cetera())->willThrow($e); ($this->listener)(new VisitLocated($visitId)); @@ -162,11 +143,8 @@ class NotifyVisitToRabbitMqTest extends TestCase 'Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e], )->shouldHaveBeenCalledOnce(); - $this->connection->close()->shouldHaveBeenCalledOnce(); - $this->connection->reconnect()->shouldHaveBeenCalledOnce(); $findVisit->shouldHaveBeenCalledOnce(); $channel->shouldHaveBeenCalledOnce(); - $this->channel->close()->shouldNotHaveBeenCalled(); } public function provideExceptions(): iterable