From e9ec32b3c3ba2e33c21d8c4e823323b6b74a196d Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Fri, 26 Aug 2022 14:59:27 +0200 Subject: [PATCH] Added support to dispatch async event listeners as RoadRunner jobs --- .dockerignore | 1 + bin/roadrunner-worker.php | 32 ++++++++++++------- composer.json | 2 +- config/roadrunner/.rr.dev.yml | 18 ++++++++--- config/roadrunner/.rr.yml | 12 ++++++- .../Event/AbstractVisitEvent.php | 8 ++++- .../EventDispatcher/Event/ShortUrlCreated.php | 8 ++++- 7 files changed, 61 insertions(+), 20 deletions(-) diff --git a/.dockerignore b/.dockerignore index 870f3610..ac530ed3 100644 --- a/.dockerignore +++ b/.dockerignore @@ -22,4 +22,5 @@ infection* **/test* build* **/.* +!config/roadrunner/.rr.yml bin/helper diff --git a/bin/roadrunner-worker.php b/bin/roadrunner-worker.php index a8d3506a..7d65f250 100644 --- a/bin/roadrunner-worker.php +++ b/bin/roadrunner-worker.php @@ -7,25 +7,33 @@ use Psr\Container\ContainerInterface; use Psr\Http\Message\ServerRequestFactoryInterface; use Psr\Http\Message\StreamFactoryInterface; use Psr\Http\Message\UploadedFileFactoryInterface; +use Shlinkio\Shlink\EventDispatcher\RoadRunner\RoadRunnerTaskConsumerToListener; use Spiral\RoadRunner\Http\PSR7Worker; use Spiral\RoadRunner\Worker; (static function (): void { + $rrMode = getenv('RR_MODE'); /** @var ContainerInterface $container */ $container = include __DIR__ . '/../config/container.php'; - $app = $container->get(Application::class); - $worker = new PSR7Worker( - Worker::create(), - $container->get(ServerRequestFactoryInterface::class), - $container->get(StreamFactoryInterface::class), - $container->get(UploadedFileFactoryInterface::class), - ); - while ($req = $worker->waitRequest()) { - try { - $worker->respond($app->handle($req)); - } catch (Throwable $throwable) { - $worker->getWorker()->error((string) $throwable); + if ($rrMode === 'http') { + // This was spin-up as a web worker + $app = $container->get(Application::class); + $worker = new PSR7Worker( + Worker::create(), + $container->get(ServerRequestFactoryInterface::class), + $container->get(StreamFactoryInterface::class), + $container->get(UploadedFileFactoryInterface::class), + ); + + while ($req = $worker->waitRequest()) { + try { + $worker->respond($app->handle($req)); + } catch (Throwable $e) { + $worker->getWorker()->error((string) $e); + } } + } else { + $container->get(RoadRunnerTaskConsumerToListener::class)->listenForTasks(); } })(); diff --git a/composer.json b/composer.json index f9d3e27a..e549a8f6 100644 --- a/composer.json +++ b/composer.json @@ -45,7 +45,7 @@ "ramsey/uuid": "^4.3", "shlinkio/shlink-common": "^5.0", "shlinkio/shlink-config": "dev-main#24ccd64 as 2.1", - "shlinkio/shlink-event-dispatcher": "^2.5", + "shlinkio/shlink-event-dispatcher": "dev-feature/roadrunner-support", "shlinkio/shlink-importer": "^4.0", "shlinkio/shlink-installer": "^8.1", "shlinkio/shlink-ip-geolocation": "^3.0", diff --git a/config/roadrunner/.rr.dev.yml b/config/roadrunner/.rr.dev.yml index af667e0a..389ed003 100644 --- a/config/roadrunner/.rr.dev.yml +++ b/config/roadrunner/.rr.dev.yml @@ -1,5 +1,8 @@ version: '2.7' +rpc: + listen: tcp://127.0.0.1:6001 + server: command: 'php ../../bin/roadrunner-worker.php' @@ -11,7 +14,7 @@ http: - .php - .htaccess pool: - num_workers: 1 + num_workers: 3 supervisor: max_worker_memory: 100 @@ -19,15 +22,22 @@ jobs: pool: num_workers: 2 max_worker_memory: 100 - consume: { } + timeout: 300 + consume: ['shlink'] + pipelines: + shlink: + driver: memory + config: + priority: 10 + prefetch: 10 logs: mode: development channels: http: - level: debug # Log all http requests, set to info to disable + level: debug server: - level: debug # Everything written to worker stderr is logged + level: debug metrics: level: debug diff --git a/config/roadrunner/.rr.yml b/config/roadrunner/.rr.yml index ebfe9673..032989b9 100644 --- a/config/roadrunner/.rr.yml +++ b/config/roadrunner/.rr.yml @@ -1,5 +1,8 @@ version: '2.7' +rpc: + listen: tcp://127.0.0.1:6001 + server: command: 'php -dopcache.enable_cli=1 -dopcache.validate_timestamps=0 ../../bin/roadrunner-worker.php' @@ -16,10 +19,17 @@ http: max_worker_memory: 100 jobs: + timeout: 300 pool: num_workers: 16 # TODO Make configurable max_worker_memory: 100 - consume: { } + consume: ['shlink'] + pipelines: + shlink: + driver: memory + config: + priority: 10 + prefetch: 10 logs: mode: production diff --git a/module/Core/src/EventDispatcher/Event/AbstractVisitEvent.php b/module/Core/src/EventDispatcher/Event/AbstractVisitEvent.php index 6fadaa5d..0d41b7d1 100644 --- a/module/Core/src/EventDispatcher/Event/AbstractVisitEvent.php +++ b/module/Core/src/EventDispatcher/Event/AbstractVisitEvent.php @@ -5,8 +5,9 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\EventDispatcher\Event; use JsonSerializable; +use Shlinkio\Shlink\EventDispatcher\Util\JsonUnserializable; -abstract class AbstractVisitEvent implements JsonSerializable +abstract class AbstractVisitEvent implements JsonSerializable, JsonUnserializable { public function __construct(public readonly string $visitId) { @@ -16,4 +17,9 @@ abstract class AbstractVisitEvent implements JsonSerializable { return ['visitId' => $this->visitId]; } + + public static function fromPayload(array $payload): self + { + return new static($payload['visitId'] ?? ''); + } } diff --git a/module/Core/src/EventDispatcher/Event/ShortUrlCreated.php b/module/Core/src/EventDispatcher/Event/ShortUrlCreated.php index 9786808f..b6ab1a0c 100644 --- a/module/Core/src/EventDispatcher/Event/ShortUrlCreated.php +++ b/module/Core/src/EventDispatcher/Event/ShortUrlCreated.php @@ -5,8 +5,9 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\EventDispatcher\Event; use JsonSerializable; +use Shlinkio\Shlink\EventDispatcher\Util\JsonUnserializable; -final class ShortUrlCreated implements JsonSerializable +final class ShortUrlCreated implements JsonSerializable, JsonUnserializable { public function __construct(public readonly string $shortUrlId) { @@ -18,4 +19,9 @@ final class ShortUrlCreated implements JsonSerializable 'shortUrlId' => $this->shortUrlId, ]; } + + public static function fromPayload(array $payload): self + { + return new self($payload['shortUrlId'] ?? ''); + } }