Migrated rabbit integration to RabbitMqPublishingHelper from shlink-common

This commit is contained in:
Alejandro Celaya 2022-07-24 10:12:26 +02:00
parent f832c56adb
commit 67d91d5fc5
7 changed files with 104 additions and 111 deletions

View File

@ -40,12 +40,11 @@
"mlocati/ip-lib": "^1.17", "mlocati/ip-lib": "^1.17",
"ocramius/proxy-manager": "^2.11", "ocramius/proxy-manager": "^2.11",
"pagerfanta/core": "^3.5", "pagerfanta/core": "^3.5",
"php-amqplib/php-amqplib": "^3.1",
"php-middleware/request-id": "^4.1", "php-middleware/request-id": "^4.1",
"predis/predis": "^1.1", "predis/predis": "^1.1",
"pugx/shortid-php": "^1.0", "pugx/shortid-php": "^1.0",
"ramsey/uuid": "^4.2", "ramsey/uuid": "^4.2",
"shlinkio/shlink-common": "dev-main#3244088 as 4.5", "shlinkio/shlink-common": "dev-main#0396706 as 4.5",
"shlinkio/shlink-config": "^1.6", "shlinkio/shlink-config": "^1.6",
"shlinkio/shlink-event-dispatcher": "^2.4", "shlinkio/shlink-event-dispatcher": "^2.4",
"shlinkio/shlink-importer": "^3.0", "shlinkio/shlink-importer": "^3.0",

View File

@ -2,9 +2,6 @@
declare(strict_types=1); declare(strict_types=1);
use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory;
use Laminas\ServiceManager\Proxy\LazyServiceFactory;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Shlinkio\Shlink\Core\Config\EnvVars; use Shlinkio\Shlink\Core\Config\EnvVars;
return [ return [
@ -18,30 +15,4 @@ return [
'vhost' => EnvVars::RABBITMQ_VHOST->loadFromEnv('/'), 'vhost' => EnvVars::RABBITMQ_VHOST->loadFromEnv('/'),
], ],
'dependencies' => [
'factories' => [
AMQPStreamConnection::class => ConfigAbstractFactory::class,
],
'delegators' => [
AMQPStreamConnection::class => [
LazyServiceFactory::class,
],
],
'lazy_services' => [
'class_map' => [
AMQPStreamConnection::class => AMQPStreamConnection::class,
],
],
],
ConfigAbstractFactory::class => [
AMQPStreamConnection::class => [
'config.rabbitmq.host',
'config.rabbitmq.port',
'config.rabbitmq.user',
'config.rabbitmq.password',
'config.rabbitmq.vhost',
],
],
]; ];

View File

@ -5,9 +5,9 @@ declare(strict_types=1);
namespace Shlinkio\Shlink\Core; namespace Shlinkio\Shlink\Core;
use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory; use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Psr\EventDispatcher\EventDispatcherInterface; use Psr\EventDispatcher\EventDispatcherInterface;
use Shlinkio\Shlink\CLI\Util\GeolocationDbUpdater; use Shlinkio\Shlink\CLI\Util\GeolocationDbUpdater;
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelper;
use Shlinkio\Shlink\IpGeolocation\GeoLite2\DbUpdater; use Shlinkio\Shlink\IpGeolocation\GeoLite2\DbUpdater;
use Shlinkio\Shlink\IpGeolocation\Resolver\IpLocationResolverInterface; use Shlinkio\Shlink\IpGeolocation\Resolver\IpLocationResolverInterface;
use Symfony\Component\Mercure\Hub; use Symfony\Component\Mercure\Hub;
@ -27,10 +27,10 @@ return [
EventDispatcher\NotifyVisitToWebHooks::class, EventDispatcher\NotifyVisitToWebHooks::class,
EventDispatcher\UpdateGeoLiteDb::class, EventDispatcher\UpdateGeoLiteDb::class,
], ],
// EventDispatcher\Event\ShortUrlCreated::class => [ EventDispatcher\Event\ShortUrlCreated::class => [
// EventDispatcher\Mercure\NotifyNewShortUrlToMercure::class, EventDispatcher\Mercure\NotifyNewShortUrlToMercure::class,
// EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class, EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class,
// ], ],
], ],
], ],
@ -79,7 +79,7 @@ return [
'Logger_Shlink', 'Logger_Shlink',
], ],
EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => [ EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => [
AMQPStreamConnection::class, RabbitMqPublishingHelper::class,
'em', 'em',
'Logger_Shlink', 'Logger_Shlink',
Visit\Transformer\OrphanVisitDataTransformer::class, Visit\Transformer\OrphanVisitDataTransformer::class,

View File

@ -8,7 +8,7 @@ use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
class NotifyNewShortUrlToMercure class NotifyNewShortUrlToMercure
{ {
public function __invoke(ShortUrlCreated $shortUrlCreated) public function __invoke(ShortUrlCreated $shortUrlCreated): void
{ {
// TODO: Implement __invoke() method. // TODO: Implement __invoke() method.
} }

View File

@ -4,12 +4,81 @@ declare(strict_types=1);
namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq; namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq;
use Doctrine\ORM\EntityManagerInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Core\Entity\ShortUrl;
use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
use Throwable;
use function Shlinkio\Shlink\Common\json_encode;
class NotifyNewShortUrlToRabbitMq class NotifyNewShortUrlToRabbitMq
{ {
public function __invoke(ShortUrlCreated $shortUrlCreated) private const NEW_SHORT_URL_QUEUE = 'https://shlink.io/new-short-url';
public function __construct(
private readonly AMQPStreamConnection $connection,
private readonly EntityManagerInterface $em,
private readonly LoggerInterface $logger,
private readonly DataTransformerInterface $shortUrlTransformer,
private readonly bool $isEnabled,
) {
}
public function __invoke(ShortUrlCreated $shortUrlCreated): void
{ {
// TODO: Implement __invoke() method. if (! $this->isEnabled) {
return;
}
$shortUrlId = $shortUrlCreated->shortUrlId;
$shortUrl = $this->em->find(ShortUrl::class, $shortUrlId);
if ($shortUrl === null) {
$this->logger->warning(
'Tried to notify RabbitMQ for new short URL with id "{shortUrlId}", but it does not exist.',
['shortUrlId' => $shortUrlId],
);
return;
}
if (! $this->connection->isConnected()) {
$this->connection->reconnect();
}
$queue = self::NEW_SHORT_URL_QUEUE;
$message = $this->shortUrlToMessage($shortUrl);
try {
$channel = $this->connection->channel();
// Declare an exchange and a queue that will persist server restarts
$exchange = $queue; // We use the same name for the exchange and the queue
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_declare($queue, false, true, false, false);
// Bind the exchange and the queue together, and publish the message
$channel->queue_bind($queue, $exchange);
$channel->basic_publish($message, $exchange);
$channel->close();
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify RabbitMQ with new short URL. {e}', ['e' => $e]);
} finally {
$this->connection->close();
}
}
private function shortUrlToMessage(ShortUrl $shortUrl): AMQPMessage
{
$messageBody = json_encode($this->shortUrlTransformer->transform($shortUrl));
return new AMQPMessage($messageBody, [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);
} }
} }

View File

@ -5,16 +5,13 @@ declare(strict_types=1);
namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq; namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq;
use Doctrine\ORM\EntityManagerInterface; use Doctrine\ORM\EntityManagerInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface;
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\Entity\Visit;
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
use Throwable; use Throwable;
use function Shlinkio\Shlink\Common\json_encode;
use function sprintf; use function sprintf;
class NotifyVisitToRabbitMq class NotifyVisitToRabbitMq
@ -23,11 +20,11 @@ class NotifyVisitToRabbitMq
private const NEW_ORPHAN_VISIT_QUEUE = 'https://shlink.io/new-orphan-visit'; private const NEW_ORPHAN_VISIT_QUEUE = 'https://shlink.io/new-orphan-visit';
public function __construct( public function __construct(
private AMQPStreamConnection $connection, private readonly RabbitMqPublishingHelperInterface $rabbitMqHelper,
private EntityManagerInterface $em, private readonly EntityManagerInterface $em,
private LoggerInterface $logger, private readonly LoggerInterface $logger,
private DataTransformerInterface $orphanVisitTransformer, private readonly DataTransformerInterface $orphanVisitTransformer,
private bool $isEnabled, private readonly bool $isEnabled,
) { ) {
} }
@ -47,32 +44,15 @@ class NotifyVisitToRabbitMq
return; return;
} }
if (! $this->connection->isConnected()) {
$this->connection->reconnect();
}
$queues = $this->determineQueuesToPublishTo($visit); $queues = $this->determineQueuesToPublishTo($visit);
$message = $this->visitToMessage($visit); $payload = $this->visitToPayload($visit);
try { try {
$channel = $this->connection->channel();
foreach ($queues as $queue) { foreach ($queues as $queue) {
// Declare an exchange and a queue that will persist server restarts $this->rabbitMqHelper->publishPayloadInQueue($payload, $queue);
$exchange = $queue; // We use the same name for the exchange and the queue
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_declare($queue, false, true, false, false);
// Bind the exchange and the queue together, and publish the message
$channel->queue_bind($queue, $exchange);
$channel->basic_publish($message, $exchange);
} }
$channel->close();
} catch (Throwable $e) { } catch (Throwable $e) {
$this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]); $this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]);
} finally {
$this->connection->close();
} }
} }
@ -91,12 +71,8 @@ class NotifyVisitToRabbitMq
]; ];
} }
private function visitToMessage(Visit $visit): AMQPMessage private function visitToPayload(Visit $visit): array
{ {
$messageBody = json_encode(! $visit->isOrphan() ? $visit : $this->orphanVisitTransformer->transform($visit)); return ! $visit->isOrphan() ? $visit->jsonSerialize() : $this->orphanVisitTransformer->transform($visit);
return new AMQPMessage($messageBody, [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);
} }
} }

View File

@ -7,14 +7,13 @@ namespace ShlinkioTest\Shlink\Core\EventDispatcher\RabbitMq;
use Doctrine\ORM\EntityManagerInterface; use Doctrine\ORM\EntityManagerInterface;
use DomainException; use DomainException;
use Exception; use Exception;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PHPUnit\Framework\TestCase; use PHPUnit\Framework\TestCase;
use Prophecy\Argument; use Prophecy\Argument;
use Prophecy\PhpUnit\ProphecyTrait; use Prophecy\PhpUnit\ProphecyTrait;
use Prophecy\Prophecy\ObjectProphecy; use Prophecy\Prophecy\ObjectProphecy;
use Psr\Log\LoggerInterface; use Psr\Log\LoggerInterface;
use RuntimeException; use RuntimeException;
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface;
use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\Entity\ShortUrl;
use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\Entity\Visit;
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated; use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
@ -32,25 +31,19 @@ class NotifyVisitToRabbitMqTest extends TestCase
use ProphecyTrait; use ProphecyTrait;
private NotifyVisitToRabbitMq $listener; private NotifyVisitToRabbitMq $listener;
private ObjectProphecy $connection; private ObjectProphecy $helper;
private ObjectProphecy $em; private ObjectProphecy $em;
private ObjectProphecy $logger; private ObjectProphecy $logger;
private ObjectProphecy $orphanVisitTransformer; private ObjectProphecy $orphanVisitTransformer;
private ObjectProphecy $channel;
protected function setUp(): void protected function setUp(): void
{ {
$this->channel = $this->prophesize(AMQPChannel::class); $this->helper = $this->prophesize(RabbitMqPublishingHelperInterface::class);
$this->connection = $this->prophesize(AMQPStreamConnection::class);
$this->connection->isConnected()->willReturn(false);
$this->connection->channel()->willReturn($this->channel->reveal());
$this->em = $this->prophesize(EntityManagerInterface::class); $this->em = $this->prophesize(EntityManagerInterface::class);
$this->logger = $this->prophesize(LoggerInterface::class); $this->logger = $this->prophesize(LoggerInterface::class);
$this->listener = new NotifyVisitToRabbitMq( $this->listener = new NotifyVisitToRabbitMq(
$this->connection->reveal(), $this->helper->reveal(),
$this->em->reveal(), $this->em->reveal(),
$this->logger->reveal(), $this->logger->reveal(),
new OrphanVisitDataTransformer(), new OrphanVisitDataTransformer(),
@ -61,8 +54,8 @@ class NotifyVisitToRabbitMqTest extends TestCase
/** @test */ /** @test */
public function doesNothingWhenTheFeatureIsNotEnabled(): void public function doesNothingWhenTheFeatureIsNotEnabled(): void
{ {
$listener = new \Shlinkio\Shlink\Core\EventDispatcher\RabbitMq\NotifyVisitToRabbitMq( $listener = new NotifyVisitToRabbitMq(
$this->connection->reveal(), $this->helper->reveal(),
$this->em->reveal(), $this->em->reveal(),
$this->logger->reveal(), $this->logger->reveal(),
new OrphanVisitDataTransformer(), new OrphanVisitDataTransformer(),
@ -74,8 +67,7 @@ class NotifyVisitToRabbitMqTest extends TestCase
$this->em->find(Argument::cetera())->shouldNotHaveBeenCalled(); $this->em->find(Argument::cetera())->shouldNotHaveBeenCalled();
$this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->warning(Argument::cetera())->shouldNotHaveBeenCalled();
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
$this->connection->isConnected()->shouldNotHaveBeenCalled(); $this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled();
$this->connection->close()->shouldNotHaveBeenCalled();
} }
/** @test */ /** @test */
@ -93,8 +85,7 @@ class NotifyVisitToRabbitMqTest extends TestCase
$findVisit->shouldHaveBeenCalledOnce(); $findVisit->shouldHaveBeenCalledOnce();
$logWarning->shouldHaveBeenCalledOnce(); $logWarning->shouldHaveBeenCalledOnce();
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
$this->connection->isConnected()->shouldNotHaveBeenCalled(); $this->helper->publishPayloadInQueue(Argument::cetera())->shouldNotHaveBeenCalled();
$this->connection->close()->shouldNotHaveBeenCalled();
} }
/** /**
@ -105,27 +96,17 @@ class NotifyVisitToRabbitMqTest extends TestCase
{ {
$visitId = '123'; $visitId = '123';
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit); $findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit);
$argumentWithExpectedChannel = Argument::that(fn (string $channel) => contains($expectedChannels, $channel)); $argumentWithExpectedChannels = Argument::that(
static fn (string $channel) => contains($expectedChannels, $channel),
);
($this->listener)(new VisitLocated($visitId)); ($this->listener)(new VisitLocated($visitId));
$findVisit->shouldHaveBeenCalledOnce(); $findVisit->shouldHaveBeenCalledOnce();
$this->channel->exchange_declare($argumentWithExpectedChannel, Argument::cetera())->shouldHaveBeenCalledTimes( $this->helper->publishPayloadInQueue(
count($expectedChannels), Argument::type('array'),
); $argumentWithExpectedChannels,
$this->channel->queue_declare($argumentWithExpectedChannel, Argument::cetera())->shouldHaveBeenCalledTimes(
count($expectedChannels),
);
$this->channel->queue_bind(
$argumentWithExpectedChannel,
$argumentWithExpectedChannel,
)->shouldHaveBeenCalledTimes(count($expectedChannels)); )->shouldHaveBeenCalledTimes(count($expectedChannels));
$this->channel->basic_publish(Argument::any(), $argumentWithExpectedChannel)->shouldHaveBeenCalledTimes(
count($expectedChannels),
);
$this->channel->close()->shouldHaveBeenCalledOnce();
$this->connection->reconnect()->shouldHaveBeenCalledOnce();
$this->connection->close()->shouldHaveBeenCalledOnce();
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled(); $this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
} }
@ -154,7 +135,7 @@ class NotifyVisitToRabbitMqTest extends TestCase
{ {
$visitId = '123'; $visitId = '123';
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn(Visit::forBasePath(Visitor::emptyInstance())); $findVisit = $this->em->find(Visit::class, $visitId)->willReturn(Visit::forBasePath(Visitor::emptyInstance()));
$channel = $this->connection->channel()->willThrow($e); $channel = $this->helper->publishPayloadInQueue(Argument::cetera())->willThrow($e);
($this->listener)(new VisitLocated($visitId)); ($this->listener)(new VisitLocated($visitId));
@ -162,11 +143,8 @@ class NotifyVisitToRabbitMqTest extends TestCase
'Error while trying to notify RabbitMQ with new visit. {e}', 'Error while trying to notify RabbitMQ with new visit. {e}',
['e' => $e], ['e' => $e],
)->shouldHaveBeenCalledOnce(); )->shouldHaveBeenCalledOnce();
$this->connection->close()->shouldHaveBeenCalledOnce();
$this->connection->reconnect()->shouldHaveBeenCalledOnce();
$findVisit->shouldHaveBeenCalledOnce(); $findVisit->shouldHaveBeenCalledOnce();
$channel->shouldHaveBeenCalledOnce(); $channel->shouldHaveBeenCalledOnce();
$this->channel->close()->shouldNotHaveBeenCalled();
} }
public function provideExceptions(): iterable public function provideExceptions(): iterable