diff --git a/config/autoload/cli.global.php b/config/autoload/cli.global.php index 08644ea..29b0bd0 100644 --- a/config/autoload/cli.global.php +++ b/config/autoload/cli.global.php @@ -3,6 +3,8 @@ declare(strict_types=1); use Dot\Cli\FileLockerInterface; +use Queue\Swoole\Command\GetFailedMessagesCommand; +use Queue\Swoole\Command\GetProcessedMessagesCommand; use Queue\Swoole\Command\StartCommand; use Queue\Swoole\Command\StopCommand; use Symfony\Component\Messenger\Command\ConsumeMessagesCommand; @@ -13,10 +15,12 @@ 'version' => '1.0.0', 'name' => 'DotKernel CLI', 'commands' => [ - "swoole:start" => StartCommand::class, - "swoole:stop" => StopCommand::class, - "messenger:start" => ConsumeMessagesCommand::class, - "messenger:debug" => DebugCommand::class, + "swoole:start" => StartCommand::class, + "swoole:stop" => StopCommand::class, + "messenger:start" => ConsumeMessagesCommand::class, + "messenger:debug" => DebugCommand::class, + "messenger:processed" => GetProcessedMessagesCommand::class, + "messenger:failed" => GetFailedMessagesCommand::class, ], ], FileLockerInterface::class => [ diff --git a/config/autoload/local.php.dist b/config/autoload/local.php.dist index 11fc0cb..429a793 100644 --- a/config/autoload/local.php.dist +++ b/config/autoload/local.php.dist @@ -46,4 +46,10 @@ return [ 'eof' => "\n", ], ], + //delay time until the message is added back to the queue if an error occurs during processing + 'fail-safe' => [ + 'first_retry' => 3600000, // 1h + 'second_retry' => 43200000, // 12h + 'third_retry' => 86400000, // 24h + ], ]; diff --git a/src/App/Message/ExampleMessageHandler.php b/src/App/Message/ExampleMessageHandler.php index e2414a0..2d7ddf6 100644 --- a/src/App/Message/ExampleMessageHandler.php +++ b/src/App/Message/ExampleMessageHandler.php @@ -6,14 +6,19 @@ use Dot\DependencyInjection\Attribute\Inject; use Dot\Log\Logger; +use Symfony\Component\Messenger\Exception\ExceptionInterface; +use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\DelayStamp; class ExampleMessageHandler { #[Inject( + MessageBusInterface::class, 'dot-log.queue-log', 'config', )] public function __construct( + protected MessageBusInterface $bus, protected Logger $logger, protected array $config, ) { @@ -21,6 +26,42 @@ public function __construct( public function __invoke(ExampleMessage $message): void { - $this->logger->info("message: " . $message->getPayload()['foo']); + try { + // Throwing an exception to satisfy PHPStan (replace with own code) + throw new \Exception("Failed to execute"); + } catch (\Throwable $exception) { + $payload = $message->getPayload(); + $this->logger->error($payload['foo'] . ' failed with message: ' + . $exception->getMessage() . ' after ' . ($payload['retry'] ?? 0) . ' retries'); + $this->retry($payload); + } + } + + /** + * @throws ExceptionInterface + */ + public function retry(array $payload): void + { + if (! isset($payload['retry'])) { + $this->bus->dispatch(new ExampleMessage(["foo" => $payload['foo'], 'retry' => 1]), [ + new DelayStamp($this->config['fail-safe']['first_retry']), + ]); + } else { + $retry = $payload['retry']; + switch ($retry) { + case 1: + $delay = $this->config['fail-safe']['second_retry']; + $this->bus->dispatch(new ExampleMessage(["foo" => $payload['foo'], 'retry' => ++$retry]), [ + new DelayStamp($delay), + ]); + break; + case 2: + $delay = $this->config['fail-safe']['third_retry']; + $this->bus->dispatch(new ExampleMessage(["foo" => $payload['foo'], 'retry' => ++$retry]), [ + new DelayStamp($delay), + ]); + break; + } + } } } diff --git a/src/Swoole/Command/GetFailedMessagesCommand.php b/src/Swoole/Command/GetFailedMessagesCommand.php new file mode 100644 index 0000000..afbc893 --- /dev/null +++ b/src/Swoole/Command/GetFailedMessagesCommand.php @@ -0,0 +1,95 @@ +setDescription('Get processing failure messages.') + ->addOption('start', null, InputOption::VALUE_OPTIONAL, 'Start timestamp (Y-m-d H:i:s)') + ->addOption('end', null, InputOption::VALUE_OPTIONAL, 'End timestamp (Y-m-d H:i:s)') + ->addOption('limit', null, InputOption::VALUE_OPTIONAL, 'Limit in days'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $start = $input->getOption('start'); + $end = $input->getOption('end'); + $limit = $input->getOption('limit'); + + if (! $end) { + $end = date('Y-m-d H:i:s'); + } elseif (! preg_match('/\d{2}:\d{2}:\d{2}/', $end)) { + $end .= ' 23:59:59'; + } + + if ($limit && is_numeric($limit) && ! $start) { + $start = date('Y-m-d H:i:s', strtotime("-{$limit} days", strtotime($end))); + } elseif ($start && ! preg_match('/\d{2}:\d{2}:\d{2}/', $start)) { + $start .= ' 00:00:00'; + } + + $startTimestamp = $start ? strtotime($start) : null; + $endTimestamp = $end ? strtotime($end) : null; + + $logPath = 'log/queue-log.log'; + $lines = file($logPath, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES); + + foreach ($lines as $line) { + $entry = json_decode($line, true); + + if (! $entry || ! isset($entry['levelName'], $entry['timestamp'])) { + continue; + } + + if (strtolower($entry['levelName']) !== 'error') { + continue; + } + + $logTimestamp = strtotime($entry['timestamp']); + if ( + ($startTimestamp && $logTimestamp < $startTimestamp) || + ($endTimestamp && $logTimestamp > $endTimestamp) + ) { + continue; + } + + $output->writeln($line); + } + + return Command::SUCCESS; + } +} diff --git a/src/Swoole/Command/GetProcessedMessagesCommand.php b/src/Swoole/Command/GetProcessedMessagesCommand.php new file mode 100644 index 0000000..0c559bf --- /dev/null +++ b/src/Swoole/Command/GetProcessedMessagesCommand.php @@ -0,0 +1,95 @@ +setDescription('Get successfully processed messages') + ->addOption('start', null, InputOption::VALUE_OPTIONAL, 'Start timestamp (Y-m-d H:i:s)') + ->addOption('end', null, InputOption::VALUE_OPTIONAL, 'End timestamp (Y-m-d H:i:s)') + ->addOption('limit', null, InputOption::VALUE_OPTIONAL, 'Limit in days'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $start = $input->getOption('start'); + $end = $input->getOption('end'); + $limit = $input->getOption('limit'); + + if (! $end) { + $end = date('Y-m-d H:i:s'); + } elseif (! preg_match('/\d{2}:\d{2}:\d{2}/', $end)) { + $end .= ' 23:59:59'; + } + + if ($limit && is_numeric($limit) && ! $start) { + $start = date('Y-m-d H:i:s', strtotime("-{$limit} days", strtotime($end))); + } elseif ($start && ! preg_match('/\d{2}:\d{2}:\d{2}/', $start)) { + $start .= ' 00:00:00'; + } + + $startTimestamp = $start ? strtotime($start) : null; + $endTimestamp = $end ? strtotime($end) : null; + + $logPath = 'log/queue-log.log'; + $lines = file($logPath, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES); + + foreach ($lines as $line) { + $entry = json_decode($line, true); + + if (! $entry || ! isset($entry['levelName'], $entry['timestamp'])) { + continue; + } + + if (strtolower($entry['levelName']) !== 'info') { + continue; + } + + $logTimestamp = strtotime($entry['timestamp']); + if ( + ($startTimestamp && $logTimestamp < $startTimestamp) || + ($endTimestamp && $logTimestamp > $endTimestamp) + ) { + continue; + } + + $output->writeln($line); + } + + return Command::SUCCESS; + } +} diff --git a/src/Swoole/ConfigProvider.php b/src/Swoole/ConfigProvider.php index a468298..35784dd 100644 --- a/src/Swoole/ConfigProvider.php +++ b/src/Swoole/ConfigProvider.php @@ -4,8 +4,11 @@ namespace Queue\Swoole; +use Dot\DependencyInjection\Factory\AttributedServiceFactory; use Queue\Swoole\Command\Factory\StartCommandFactory; use Queue\Swoole\Command\Factory\StopCommandFactory; +use Queue\Swoole\Command\GetFailedMessagesCommand; +use Queue\Swoole\Command\GetProcessedMessagesCommand; use Queue\Swoole\Command\StartCommand; use Queue\Swoole\Command\StopCommand; use Queue\Swoole\Delegators\TCPServerDelegator; @@ -27,10 +30,12 @@ public function getDependencies(): array TCPSwooleServer::class => [TCPServerDelegator::class], ], "factories" => [ - TCPSwooleServer::class => ServerFactory::class, - PidManager::class => PidManagerFactory::class, - StartCommand::class => StartCommandFactory::class, - StopCommand::class => StopCommandFactory::class, + TCPSwooleServer::class => ServerFactory::class, + PidManager::class => PidManagerFactory::class, + StartCommand::class => StartCommandFactory::class, + StopCommand::class => StopCommandFactory::class, + GetProcessedMessagesCommand::class => AttributedServiceFactory::class, + GetFailedMessagesCommand::class => AttributedServiceFactory::class, ], "aliases" => [], ];