Integrated PublishUpdatesGenerator in NotifyVisitToRabbitMq listener

This commit is contained in:
Alejandro Celaya 2022-07-27 17:41:48 +02:00
parent dada6aa3d1
commit da6aa1d697
3 changed files with 105 additions and 94 deletions

View File

@ -105,10 +105,10 @@ return [
],
EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => [
RabbitMqPublishingHelper::class,
EventDispatcher\PublishingUpdatesGenerator::class,
'em',
'Logger_Shlink',
Visit\Transformer\OrphanVisitDataTransformer::class,
ShortUrl\Transformer\ShortUrlDataTransformer::class,
Options\RabbitMqOptions::class,
],
EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => [

View File

@ -11,6 +11,7 @@ use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
use Shlinkio\Shlink\Core\Entity\Visit;
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
use Shlinkio\Shlink\Core\Options\RabbitMqOptions;
use Throwable;
@ -21,10 +22,10 @@ class NotifyVisitToRabbitMq
{
public function __construct(
private readonly PublishingHelperInterface $rabbitMqHelper,
private readonly PublishingUpdatesGeneratorInterface $updatesGenerator,
private readonly EntityManagerInterface $em,
private readonly LoggerInterface $logger,
private readonly DataTransformerInterface $orphanVisitTransformer,
private readonly DataTransformerInterface $shortUrlTransformer,
private readonly RabbitMqOptions $options,
) {
}
@ -45,50 +46,45 @@ class NotifyVisitToRabbitMq
return;
}
$queues = $this->determineQueuesToPublishTo($visit);
$payload = $this->visitToPayload($visit);
$updates = $this->determineUpdatesForVisit($visit);
try {
each($queues, fn (string $queue) => $this->rabbitMqHelper->publishUpdate(
Update::forTopicAndPayload($queue, $payload),
));
each($updates, fn (Update $update) => $this->rabbitMqHelper->publishUpdate($update));
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]);
}
}
/**
* @return string[]
* @return Update[]
*/
private function determineQueuesToPublishTo(Visit $visit): array
private function determineUpdatesForVisit(Visit $visit): array
{
if ($visit->isOrphan()) {
return [Topic::NEW_ORPHAN_VISIT->value];
}
return match (true) {
// This was defined incorrectly.
// According to the spec, both the visit and the short URL it belongs to, should be published.
// The shape should be ['visit' => [...], 'shortUrl' => ?[...]]
// However, this would be a breaking change, so we need a flag that determines the shape of the payload.
$this->options->legacyVisitsPublishing() && $visit->isOrphan() => [
Update::forTopicAndPayload(
Topic::NEW_ORPHAN_VISIT->value,
$this->orphanVisitTransformer->transform($visit),
),
],
$this->options->legacyVisitsPublishing() && ! $visit->isOrphan() => [
Update::forTopicAndPayload(Topic::NEW_VISIT->value, $visit->jsonSerialize()),
Update::forTopicAndPayload(
Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()),
$visit->jsonSerialize(),
),
],
return [
Topic::NEW_VISIT->value,
Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()),
];
}
private function visitToPayload(Visit $visit): array
{
// This was defined incorrectly.
// According to the spec, both the visit and the short URL it belongs to, should be published.
// The shape should be ['visit' => [...], 'shortUrl' => ?[...]]
// However, this would be a breaking change, so we need a flag that determines the shape of the payload.
if ($this->options->legacyVisitsPublishing()) {
return ! $visit->isOrphan() ? $visit->jsonSerialize() : $this->orphanVisitTransformer->transform($visit);
}
if ($visit->isOrphan()) {
return ['visit' => $this->orphanVisitTransformer->transform($visit)];
}
return [
'visit' => $visit->jsonSerialize(),
'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()),
];
// Once the two deprecated cases above have been remove, replace this with a simple "if" and early return.
$visit->isOrphan() => [$this->updatesGenerator->newOrphanVisitUpdate($visit)],
default => [
$this->updatesGenerator->newShortUrlVisitUpdate($visit),
$this->updatesGenerator->newVisitUpdate($visit),
],
};
}
}

View File

@ -19,17 +19,17 @@ use Shlinkio\Shlink\Common\UpdatePublishing\Update;
use Shlinkio\Shlink\Core\Entity\ShortUrl;
use Shlinkio\Shlink\Core\Entity\Visit;
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
use Shlinkio\Shlink\Core\EventDispatcher\RabbitMq\NotifyVisitToRabbitMq;
use Shlinkio\Shlink\Core\Model\ShortUrlMeta;
use Shlinkio\Shlink\Core\Model\Visitor;
use Shlinkio\Shlink\Core\Options\RabbitMqOptions;
use Shlinkio\Shlink\Core\ShortUrl\Helper\ShortUrlStringifier;
use Shlinkio\Shlink\Core\ShortUrl\Transformer\ShortUrlDataTransformer;
use Shlinkio\Shlink\Core\Visit\Transformer\OrphanVisitDataTransformer;
use Throwable;
use function count;
use function Functional\contains;
use function Functional\each;
use function Functional\noop;
class NotifyVisitToRabbitMqTest extends TestCase
{
@ -37,6 +37,7 @@ class NotifyVisitToRabbitMqTest extends TestCase
private NotifyVisitToRabbitMq $listener;
private ObjectProphecy $helper;
private ObjectProphecy $updatesGenerator;
private ObjectProphecy $em;
private ObjectProphecy $logger;
private RabbitMqOptions $options;
@ -44,16 +45,17 @@ class NotifyVisitToRabbitMqTest extends TestCase
protected function setUp(): void
{
$this->helper = $this->prophesize(PublishingHelperInterface::class);
$this->updatesGenerator = $this->prophesize(PublishingUpdatesGeneratorInterface::class);
$this->em = $this->prophesize(EntityManagerInterface::class);
$this->logger = $this->prophesize(LoggerInterface::class);
$this->options = new RabbitMqOptions(['enabled' => true, 'legacy_visits_publishing' => true]);
$this->options = new RabbitMqOptions(['enabled' => true, 'legacy_visits_publishing' => false]);
$this->listener = new NotifyVisitToRabbitMq(
$this->helper->reveal(),
$this->updatesGenerator->reveal(),
$this->em->reveal(),
$this->logger->reveal(),
new OrphanVisitDataTransformer(),
new ShortUrlDataTransformer(new ShortUrlStringifier([])),
$this->options,
);
}
@ -97,14 +99,16 @@ class NotifyVisitToRabbitMqTest extends TestCase
{
$visitId = '123';
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit);
$argumentWithExpectedChannels = Argument::that(
static fn (Update $update) => contains($expectedChannels, $update->topic),
);
each($expectedChannels, function (string $method): void {
$this->updatesGenerator->{$method}(Argument::type(Visit::class))->willReturn(
Update::forTopicAndPayload('', []),
)->shouldBeCalledOnce();
});
($this->listener)(new VisitLocated($visitId));
$findVisit->shouldHaveBeenCalledOnce();
$this->helper->publishUpdate($argumentWithExpectedChannels)->shouldHaveBeenCalledTimes(
$this->helper->publishUpdate(Argument::type(Update::class))->shouldHaveBeenCalledTimes(
count($expectedChannels),
);
$this->logger->debug(Argument::cetera())->shouldNotHaveBeenCalled();
@ -114,7 +118,7 @@ class NotifyVisitToRabbitMqTest extends TestCase
{
$visitor = Visitor::emptyInstance();
yield 'orphan visit' => [Visit::forBasePath($visitor), ['https://shlink.io/new-orphan-visit']];
yield 'orphan visit' => [Visit::forBasePath($visitor), ['newOrphanVisitUpdate']];
yield 'non-orphan visit' => [
Visit::forValidShortUrl(
ShortUrl::fromMeta(ShortUrlMeta::fromRawData([
@ -123,7 +127,7 @@ class NotifyVisitToRabbitMqTest extends TestCase
])),
$visitor,
),
['https://shlink.io/new-visit', 'https://shlink.io/new-visit/bar'],
['newShortUrlVisitUpdate', 'newVisitUpdate'],
];
}
@ -135,6 +139,9 @@ class NotifyVisitToRabbitMqTest extends TestCase
{
$visitId = '123';
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn(Visit::forBasePath(Visitor::emptyInstance()));
$generateUpdate = $this->updatesGenerator->newOrphanVisitUpdate(Argument::type(Visit::class))->willReturn(
Update::forTopicAndPayload('', []),
);
$publish = $this->helper->publishUpdate(Argument::cetera())->willThrow($e);
($this->listener)(new VisitLocated($visitId));
@ -144,6 +151,7 @@ class NotifyVisitToRabbitMqTest extends TestCase
['e' => $e],
)->shouldHaveBeenCalledOnce();
$findVisit->shouldHaveBeenCalledOnce();
$generateUpdate->shouldHaveBeenCalledOnce();
$publish->shouldHaveBeenCalledOnce();
}
@ -161,74 +169,81 @@ class NotifyVisitToRabbitMqTest extends TestCase
public function expectedPayloadIsPublishedDependingOnConfig(
bool $legacy,
Visit $visit,
callable $assertPayload,
callable $assert,
callable $setup,
): void {
$this->options->legacyVisitsPublishing = $legacy;
$visitId = '123';
$findVisit = $this->em->find(Visit::class, $visitId)->willReturn($visit);
$setup($this->updatesGenerator);
($this->listener)(new VisitLocated($visitId));
$findVisit->shouldHaveBeenCalledOnce();
$this->helper->publishUpdate(Argument::that($assertPayload))
->shouldHaveBeenCalled();
$assert($this->helper, $this->updatesGenerator);
}
public function provideLegacyPayloads(): iterable
{
yield 'non-legacy non-orphan visit' => [
yield 'legacy non-orphan visit' => [
true,
$visit = Visit::forValidShortUrl(ShortUrl::withLongUrl(''), Visitor::emptyInstance()),
function (Update $update) use ($visit): bool {
$payload = $update->payload;
Assert::assertEquals($payload, $visit->jsonSerialize());
Assert::assertArrayNotHasKey('visitedUrl', $payload);
Assert::assertArrayNotHasKey('type', $payload);
Assert::assertArrayNotHasKey('visit', $payload);
Assert::assertArrayNotHasKey('shortUrl', $payload);
function (ObjectProphecy|PublishingHelperInterface $helper) use ($visit): void {
$helper->publishUpdate(Argument::that(function (Update $update) use ($visit): bool {
$payload = $update->payload;
Assert::assertEquals($payload, $visit->jsonSerialize());
Assert::assertArrayNotHasKey('visitedUrl', $payload);
Assert::assertArrayNotHasKey('type', $payload);
Assert::assertArrayNotHasKey('visit', $payload);
Assert::assertArrayNotHasKey('shortUrl', $payload);
return true;
return true;
}));
},
noop(...),
];
yield 'legacy orphan visit' => [
true,
Visit::forBasePath(Visitor::emptyInstance()),
function (ObjectProphecy|PublishingHelperInterface $helper): void {
$helper->publishUpdate(Argument::that(function (Update $update): bool {
$payload = $update->payload;
Assert::assertArrayHasKey('visitedUrl', $payload);
Assert::assertArrayHasKey('type', $payload);
return true;
}));
},
noop(...),
];
yield 'non-legacy non-orphan visit' => [
false,
Visit::forValidShortUrl(ShortUrl::withLongUrl(''), Visitor::emptyInstance()),
function (ObjectProphecy|PublishingHelperInterface $helper): void {
$helper->publishUpdate(Argument::type(Update::class))->shouldHaveBeenCalledTimes(2);
},
function (ObjectProphecy|PublishingUpdatesGeneratorInterface $updatesGenerator): void {
$update = Update::forTopicAndPayload('', []);
$updatesGenerator->newOrphanVisitUpdate(Argument::cetera())->shouldNotBeCalled();
$updatesGenerator->newVisitUpdate(Argument::cetera())->willReturn($update)
->shouldBeCalledOnce();
$updatesGenerator->newShortUrlVisitUpdate(Argument::cetera())->willReturn($update)
->shouldBeCalledOnce();
},
];
yield 'non-legacy orphan visit' => [
true,
Visit::forBasePath(Visitor::emptyInstance()),
function (Update $update): bool {
$payload = $update->payload;
Assert::assertArrayHasKey('visitedUrl', $payload);
Assert::assertArrayHasKey('type', $payload);
return true;
},
];
yield 'legacy non-orphan visit' => [
false,
$visit = Visit::forValidShortUrl(ShortUrl::withLongUrl(''), Visitor::emptyInstance()),
function (Update $update) use ($visit): bool {
$payload = $update->payload;
Assert::assertArrayHasKey('visit', $payload);
Assert::assertArrayHasKey('shortUrl', $payload);
Assert::assertIsArray($payload['visit']);
Assert::assertEquals($payload['visit'], $visit->jsonSerialize());
Assert::assertArrayNotHasKey('visitedUrl', ['visit']);
Assert::assertArrayNotHasKey('type', ['visit']);
return true;
},
];
yield 'legacy orphan visit' => [
false,
Visit::forBasePath(Visitor::emptyInstance()),
function (Update $update): bool {
$payload = $update->payload;
Assert::assertArrayHasKey('visit', $payload);
Assert::assertArrayNotHasKey('shortUrl', $payload);
Assert::assertIsArray($payload['visit']);
Assert::assertArrayHasKey('visitedUrl', $payload['visit']);
Assert::assertArrayHasKey('type', $payload['visit']);
return true;
function (ObjectProphecy|PublishingHelperInterface $helper): void {
$helper->publishUpdate(Argument::type(Update::class))->shouldHaveBeenCalledOnce();
},
function (ObjectProphecy|PublishingUpdatesGeneratorInterface $updatesGenerator): void {
$update = Update::forTopicAndPayload('', []);
$updatesGenerator->newOrphanVisitUpdate(Argument::cetera())->willReturn($update)
->shouldBeCalledOnce();
$updatesGenerator->newVisitUpdate(Argument::cetera())->shouldNotBeCalled();
$updatesGenerator->newShortUrlVisitUpdate(Argument::cetera())->shouldNotBeCalled();
},
];
}