Added support to dispatch async event listeners as RoadRunner jobs

This commit is contained in:
Alejandro Celaya 2022-08-26 14:59:27 +02:00
parent 4882bec118
commit e9ec32b3c3
7 changed files with 61 additions and 20 deletions

View File

@ -22,4 +22,5 @@ infection*
**/test* **/test*
build* build*
**/.* **/.*
!config/roadrunner/.rr.yml
bin/helper bin/helper

View File

@ -7,25 +7,33 @@ use Psr\Container\ContainerInterface;
use Psr\Http\Message\ServerRequestFactoryInterface; use Psr\Http\Message\ServerRequestFactoryInterface;
use Psr\Http\Message\StreamFactoryInterface; use Psr\Http\Message\StreamFactoryInterface;
use Psr\Http\Message\UploadedFileFactoryInterface; use Psr\Http\Message\UploadedFileFactoryInterface;
use Shlinkio\Shlink\EventDispatcher\RoadRunner\RoadRunnerTaskConsumerToListener;
use Spiral\RoadRunner\Http\PSR7Worker; use Spiral\RoadRunner\Http\PSR7Worker;
use Spiral\RoadRunner\Worker; use Spiral\RoadRunner\Worker;
(static function (): void { (static function (): void {
$rrMode = getenv('RR_MODE');
/** @var ContainerInterface $container */ /** @var ContainerInterface $container */
$container = include __DIR__ . '/../config/container.php'; $container = include __DIR__ . '/../config/container.php';
$app = $container->get(Application::class);
$worker = new PSR7Worker(
Worker::create(),
$container->get(ServerRequestFactoryInterface::class),
$container->get(StreamFactoryInterface::class),
$container->get(UploadedFileFactoryInterface::class),
);
while ($req = $worker->waitRequest()) { if ($rrMode === 'http') {
try { // This was spin-up as a web worker
$worker->respond($app->handle($req)); $app = $container->get(Application::class);
} catch (Throwable $throwable) { $worker = new PSR7Worker(
$worker->getWorker()->error((string) $throwable); Worker::create(),
$container->get(ServerRequestFactoryInterface::class),
$container->get(StreamFactoryInterface::class),
$container->get(UploadedFileFactoryInterface::class),
);
while ($req = $worker->waitRequest()) {
try {
$worker->respond($app->handle($req));
} catch (Throwable $e) {
$worker->getWorker()->error((string) $e);
}
} }
} else {
$container->get(RoadRunnerTaskConsumerToListener::class)->listenForTasks();
} }
})(); })();

View File

@ -45,7 +45,7 @@
"ramsey/uuid": "^4.3", "ramsey/uuid": "^4.3",
"shlinkio/shlink-common": "^5.0", "shlinkio/shlink-common": "^5.0",
"shlinkio/shlink-config": "dev-main#24ccd64 as 2.1", "shlinkio/shlink-config": "dev-main#24ccd64 as 2.1",
"shlinkio/shlink-event-dispatcher": "^2.5", "shlinkio/shlink-event-dispatcher": "dev-feature/roadrunner-support",
"shlinkio/shlink-importer": "^4.0", "shlinkio/shlink-importer": "^4.0",
"shlinkio/shlink-installer": "^8.1", "shlinkio/shlink-installer": "^8.1",
"shlinkio/shlink-ip-geolocation": "^3.0", "shlinkio/shlink-ip-geolocation": "^3.0",

View File

@ -1,5 +1,8 @@
version: '2.7' version: '2.7'
rpc:
listen: tcp://127.0.0.1:6001
server: server:
command: 'php ../../bin/roadrunner-worker.php' command: 'php ../../bin/roadrunner-worker.php'
@ -11,7 +14,7 @@ http:
- .php - .php
- .htaccess - .htaccess
pool: pool:
num_workers: 1 num_workers: 3
supervisor: supervisor:
max_worker_memory: 100 max_worker_memory: 100
@ -19,15 +22,22 @@ jobs:
pool: pool:
num_workers: 2 num_workers: 2
max_worker_memory: 100 max_worker_memory: 100
consume: { } timeout: 300
consume: ['shlink']
pipelines:
shlink:
driver: memory
config:
priority: 10
prefetch: 10
logs: logs:
mode: development mode: development
channels: channels:
http: http:
level: debug # Log all http requests, set to info to disable level: debug
server: server:
level: debug # Everything written to worker stderr is logged level: debug
metrics: metrics:
level: debug level: debug

View File

@ -1,5 +1,8 @@
version: '2.7' version: '2.7'
rpc:
listen: tcp://127.0.0.1:6001
server: server:
command: 'php -dopcache.enable_cli=1 -dopcache.validate_timestamps=0 ../../bin/roadrunner-worker.php' command: 'php -dopcache.enable_cli=1 -dopcache.validate_timestamps=0 ../../bin/roadrunner-worker.php'
@ -16,10 +19,17 @@ http:
max_worker_memory: 100 max_worker_memory: 100
jobs: jobs:
timeout: 300
pool: pool:
num_workers: 16 # TODO Make configurable num_workers: 16 # TODO Make configurable
max_worker_memory: 100 max_worker_memory: 100
consume: { } consume: ['shlink']
pipelines:
shlink:
driver: memory
config:
priority: 10
prefetch: 10
logs: logs:
mode: production mode: production

View File

@ -5,8 +5,9 @@ declare(strict_types=1);
namespace Shlinkio\Shlink\Core\EventDispatcher\Event; namespace Shlinkio\Shlink\Core\EventDispatcher\Event;
use JsonSerializable; use JsonSerializable;
use Shlinkio\Shlink\EventDispatcher\Util\JsonUnserializable;
abstract class AbstractVisitEvent implements JsonSerializable abstract class AbstractVisitEvent implements JsonSerializable, JsonUnserializable
{ {
public function __construct(public readonly string $visitId) public function __construct(public readonly string $visitId)
{ {
@ -16,4 +17,9 @@ abstract class AbstractVisitEvent implements JsonSerializable
{ {
return ['visitId' => $this->visitId]; return ['visitId' => $this->visitId];
} }
public static function fromPayload(array $payload): self
{
return new static($payload['visitId'] ?? '');
}
} }

View File

@ -5,8 +5,9 @@ declare(strict_types=1);
namespace Shlinkio\Shlink\Core\EventDispatcher\Event; namespace Shlinkio\Shlink\Core\EventDispatcher\Event;
use JsonSerializable; use JsonSerializable;
use Shlinkio\Shlink\EventDispatcher\Util\JsonUnserializable;
final class ShortUrlCreated implements JsonSerializable final class ShortUrlCreated implements JsonSerializable, JsonUnserializable
{ {
public function __construct(public readonly string $shortUrlId) public function __construct(public readonly string $shortUrlId)
{ {
@ -18,4 +19,9 @@ final class ShortUrlCreated implements JsonSerializable
'shortUrlId' => $this->shortUrlId, 'shortUrlId' => $this->shortUrlId,
]; ];
} }
public static function fromPayload(array $payload): self
{
return new self($payload['shortUrlId'] ?? '');
}
} }