Message sending is now a job

This commit is contained in:
James Cole 2020-12-05 07:01:26 +01:00
parent deb829dfdb
commit 9576806765
No known key found for this signature in database
GPG Key ID: B5669F9493CDE38D
8 changed files with 105 additions and 98 deletions

View File

@ -104,7 +104,6 @@ class StandardMessageGenerator implements MessageGeneratorInterface
foreach ($this->webhooks as $webhook) { foreach ($this->webhooks as $webhook) {
$this->runWebhook($webhook); $this->runWebhook($webhook);
} }
//event(new RequestedSendWebhookMessages);
} }
/** /**

View File

@ -22,6 +22,7 @@ declare(strict_types=1);
namespace FireflyIII\Handlers\Events; namespace FireflyIII\Handlers\Events;
use FireflyIII\Events\RequestedSendWebhookMessages;
use FireflyIII\Events\StoredTransactionGroup; use FireflyIII\Events\StoredTransactionGroup;
use FireflyIII\Generator\Webhook\MessageGeneratorInterface; use FireflyIII\Generator\Webhook\MessageGeneratorInterface;
use FireflyIII\Models\TransactionJournal; use FireflyIII\Models\TransactionJournal;
@ -92,6 +93,9 @@ class StoredGroupEventHandler
$engine->setObjects(new Collection([$group])); $engine->setObjects(new Collection([$group]));
// tell the generator to generate the messages // tell the generator to generate the messages
$engine->generateMessages(); $engine->generateMessages();
// trigger event to send them:
event(new RequestedSendWebhookMessages);
} }
} }

View File

@ -22,6 +22,7 @@ declare(strict_types=1);
namespace FireflyIII\Handlers\Events; namespace FireflyIII\Handlers\Events;
use FireflyIII\Events\RequestedSendWebhookMessages;
use FireflyIII\Events\UpdatedTransactionGroup; use FireflyIII\Events\UpdatedTransactionGroup;
use FireflyIII\Generator\Webhook\MessageGeneratorInterface; use FireflyIII\Generator\Webhook\MessageGeneratorInterface;
use FireflyIII\Models\Account; use FireflyIII\Models\Account;
@ -125,8 +126,10 @@ class UpdatedGroupEventHandler
/** @var MessageGeneratorInterface $engine */ /** @var MessageGeneratorInterface $engine */
$engine = app(MessageGeneratorInterface::class); $engine = app(MessageGeneratorInterface::class);
$engine->setUser($user); $engine->setUser($user);
$engine->setTransactionGroups(new Collection([$group])); $engine->setObjects(new Collection([$group]));
$engine->setTrigger(Webhook::TRIGGER_UPDATE_TRANSACTION); $engine->setTrigger(Webhook::TRIGGER_UPDATE_TRANSACTION);
$engine->generateMessages(); $engine->generateMessages();
event(new RequestedSendWebhookMessages);
} }
} }

View File

@ -22,14 +22,8 @@
namespace FireflyIII\Handlers\Events; namespace FireflyIII\Handlers\Events;
use Exception; use FireflyIII\Jobs\SendWebhookMessage;
use FireflyIII\Helpers\Webhook\SignatureGeneratorInterface;
use FireflyIII\Models\WebhookAttempt;
use FireflyIII\Models\WebhookMessage; use FireflyIII\Models\WebhookMessage;
use FireflyIII\Services\Webhook\WebhookSenderInterface;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\ClientException;
use JsonException;
use Log; use Log;
/** /**
@ -42,6 +36,7 @@ class WebhookEventHandler
*/ */
public function sendWebhookMessages(): void public function sendWebhookMessages(): void
{ {
// kick offf the job!
$messages = WebhookMessage $messages = WebhookMessage
::where('webhook_messages.sent', 0) ::where('webhook_messages.sent', 0)
->where('webhook_messages.errored', 0) ->where('webhook_messages.errored', 0)
@ -52,10 +47,8 @@ class WebhookEventHandler
} }
)->splice(0, 3); )->splice(0, 3);
Log::debug(sprintf('Found %d webhook message(s) ready to be send.', $messages->count())); Log::debug(sprintf('Found %d webhook message(s) ready to be send.', $messages->count()));
foreach ($messages as $message) {
$sender =app(WebhookSenderInterface::class); SendWebhookMessage::dispatch($message)->afterResponse();
$sender->setMessages($messages); }
$sender->send();
} }
} }

View File

@ -0,0 +1,44 @@
<?php
namespace FireflyIII\Jobs;
use FireflyIII\Models\WebhookMessage;
use FireflyIII\Services\Webhook\WebhookSenderInterface;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
/**
* Class SendWebhookMessage
*/
class SendWebhookMessage implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
private WebhookMessage $message;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct(WebhookMessage $message)
{
$this->message = $message;
}
/**
* Execute the job.
*
* @return void
*/
public function handle(): void
{
// send job!
$sender = app(WebhookSenderInterface::class);
$sender->setMessage($this->message);
$sender->send();
}
}

View File

@ -21,21 +21,19 @@
namespace FireflyIII\Services\Webhook; namespace FireflyIII\Services\Webhook;
use FireflyIII\Exceptions\FireflyException;
use FireflyIII\Helpers\Webhook\SignatureGeneratorInterface; use FireflyIII\Helpers\Webhook\SignatureGeneratorInterface;
use FireflyIII\Models\WebhookAttempt;
use FireflyIII\Models\WebhookMessage; use FireflyIII\Models\WebhookMessage;
use GuzzleHttp\Client; use GuzzleHttp\Client;
use GuzzleHttp\Exception\ClientException; use GuzzleHttp\Exception\ClientException;
use Illuminate\Support\Collection;
use Log;
use JsonException; use JsonException;
use Log;
/** /**
* Class StandardWebhookSender * Class StandardWebhookSender
*/ */
class StandardWebhookSender implements WebhookSenderInterface class StandardWebhookSender implements WebhookSenderInterface
{ {
private Collection $messages; private WebhookMessage $message;
private int $version = 1; private int $version = 1;
/** /**
@ -49,49 +47,32 @@ class StandardWebhookSender implements WebhookSenderInterface
/** /**
* @inheritDoc * @inheritDoc
*/ */
public function setMessages(Collection $messages): void public function setMessage(WebhookMessage $message): void
{ {
$this->messages = $messages; $this->message = $message;
} }
/** /**
* @inheritDoc * @inheritDoc
*/ */
public function send(): void public function send(): void
{
/** @var WebhookMessage $message */
foreach ($this->messages as $message) {
try {
$this->sendMessage($message);
} catch (FireflyException $e) {
// TODO log attempt and make WebhookAttempt
}
}
}
/**
* @param WebhookMessage $message
*
* @throws \GuzzleHttp\Exception\GuzzleException
*/
private function sendMessage(WebhookMessage $message): void
{ {
// have the signature generator generate a signature. If it fails, the error thrown will // have the signature generator generate a signature. If it fails, the error thrown will
// end up in send() to be caught. // end up in send() to be caught.
$signatureGenerator = app(SignatureGeneratorInterface::class); $signatureGenerator = app(SignatureGeneratorInterface::class);
$signature = $signatureGenerator->generate($message); $signature = $signatureGenerator->generate($this->message);
Log::debug(sprintf('Trying to send webhook message #%d', $message->id)); Log::debug(sprintf('Trying to send webhook message #%d', $this->message->id));
try { try {
$json = json_encode($message->message, JSON_THROW_ON_ERROR); $json = json_encode($this->message->message, JSON_THROW_ON_ERROR);
} catch (JsonException $e) { } catch (JsonException $e) {
// TODO throw Firefly Exception // TODO throw Firefly Exception
// $attempt = new WebhookAttempt; // $attempt = new WebhookAttempt;
// $attempt->webhookMessage()->associate($message); // $attempt->webhookMessage()->associate($this->message);
// $attempt->status_code = 0; // $attempt->status_code = 0;
// $attempt->logs = sprintf('Json error: %s', $e->getMessage()); // $attempt->logs = sprintf('Json error: %s', $e->getMessage());
// $attempt->save(); // $attempt->save();
return; return;
} }
@ -107,32 +88,32 @@ class StandardWebhookSender implements WebhookSenderInterface
], ],
]; ];
$client = new Client; $client = new Client;
//$logs = $message->logs ?? []; //$logs = $this->message->logs ?? [];
try { try {
$res = $client->request('POST', $message->webhook->url, $options); $res = $client->request('POST', $this->message->webhook->url, $options);
$message->sent = true; $this->message->sent = true;
} catch (ClientException|Exception $e) { } catch (ClientException | Exception $e) {
Log::error($e->getMessage()); Log::error($e->getMessage());
Log::error($e->getTraceAsString()); Log::error($e->getTraceAsString());
//$logs[] = sprintf('%s: %s', date('Y-m-d H:i:s'), $e->getMessage()); //$logs[] = sprintf('%s: %s', date('Y-m-d H:i:s'), $e->getMessage());
$message->errored = true; $this->message->errored = true;
$message->sent = false; $this->message->sent = false;
} }
$message->save(); $this->message->save();
// $attempt = new WebhookAttempt; // $attempt = new WebhookAttempt;
// $attempt->webhookMessage()->associate($message); // $attempt->webhookMessage()->associate($this->message);
// $attempt->status_code = $res->getStatusCode(); // $attempt->status_code = $res->getStatusCode();
// $attempt->logs = ''; // $attempt->logs = '';
// $attempt->response = (string)$res->getBody(); // $attempt->response = (string)$res->getBody();
// $attempt->save(); // $attempt->save();
Log::debug(sprintf('Webhook message #%d was sent. Status code %d', $message->id, $res->getStatusCode())); Log::debug(sprintf('Webhook message #%d was sent. Status code %d', $this->message->id, $res->getStatusCode()));
Log::debug(sprintf('Webhook request body size: %d bytes', strlen($json))); Log::debug(sprintf('Webhook request body size: %d bytes', strlen($json)));
Log::debug(sprintf('Response body: %s', $res->getBody())); Log::debug(sprintf('Response body: %s', $res->getBody()));
//$sender //$sender
//$this->sendMessageV0($message); //$this->sendMessageV0($this->message);
} }
} }

View File

@ -22,7 +22,6 @@
namespace FireflyIII\Services\Webhook; namespace FireflyIII\Services\Webhook;
use FireflyIII\Models\WebhookMessage; use FireflyIII\Models\WebhookMessage;
use Illuminate\Support\Collection;
/** /**
* Interface WebhookSenderInterface * Interface WebhookSenderInterface
@ -35,9 +34,9 @@ interface WebhookSenderInterface
public function getVersion(): int; public function getVersion(): int;
/** /**
* @param Collection $messages * @param WebhookMessage $message
*/ */
public function setMessages(Collection $messages): void; public function setMessage(WebhookMessage $message): void;
/** /**
* *

View File

@ -1,41 +1,19 @@
<?php <?php
/**
* queue.php
* Copyright (c) 2019 james@firefly-iii.org.
*
* This file is part of Firefly III (https://github.com/firefly-iii).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
declare(strict_types=1);
return [ return [
/* /*
|-------------------------------------------------------------------------- |--------------------------------------------------------------------------
| Default Queue Driver | Default Queue Connection Name
|-------------------------------------------------------------------------- |--------------------------------------------------------------------------
| |
| Laravel's queue API supports an assortment of back-ends via a single | Laravel's queue API supports an assortment of back-ends via a single
| API, giving you convenient access to each back-end using the same | API, giving you convenient access to each back-end using the same
| syntax for each one. Here you may set the default queue driver. | syntax for every one. Here you may define a default connection.
|
| Supported: "sync", "database", "beanstalkd", "sqs", "redis", "null"
| |
*/ */
'default' => envNonEmpty('QUEUE_DRIVER', 'sync'), 'default' => env('QUEUE_CONNECTION', 'sync'),
/* /*
|-------------------------------------------------------------------------- |--------------------------------------------------------------------------
@ -46,6 +24,8 @@ return [
| is used by your application. A default configuration has been added | is used by your application. A default configuration has been added
| for each back-end shipped with Laravel. You are free to add more. | for each back-end shipped with Laravel. You are free to add more.
| |
| Drivers: "sync", "database", "beanstalkd", "sqs", "redis", "null"
|
*/ */
'connections' => [ 'connections' => [
@ -66,22 +46,25 @@ return [
'host' => 'localhost', 'host' => 'localhost',
'queue' => 'default', 'queue' => 'default',
'retry_after' => 90, 'retry_after' => 90,
'block_for' => 0,
], ],
'sqs' => [ 'sqs' => [
'driver' => 'sqs', 'driver' => 'sqs',
'key' => 'your-public-key', 'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => 'your-secret-key', 'secret' => env('AWS_SECRET_ACCESS_KEY'),
'prefix' => 'https://sqs.us-east-1.amazonaws.com/your-account-id', 'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
'queue' => 'your-queue-name', 'queue' => env('SQS_QUEUE', 'your-queue-name'),
'region' => 'us-east-1', 'suffix' => env('SQS_SUFFIX'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
], ],
'redis' => [ 'redis' => [
'driver' => 'redis', 'driver' => 'redis',
'connection' => 'default', 'connection' => 'default',
'queue' => 'default', 'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => 90, 'retry_after' => 90,
'block_for' => null,
], ],
], ],
@ -98,7 +81,8 @@ return [
*/ */
'failed' => [ 'failed' => [
'database' => envNonEmpty('DB_CONNECTION', 'pgsql'), 'driver' => env('QUEUE_FAILED_DRIVER', 'database-uuids'),
'database' => env('DB_CONNECTION', 'mysql'),
'table' => 'failed_jobs', 'table' => 'failed_jobs',
], ],