diff --git a/config/autoload/cli.global.php b/config/autoload/cli.global.php index f9b2d68..aefbeb9 100644 --- a/config/autoload/cli.global.php +++ b/config/autoload/cli.global.php @@ -5,6 +5,7 @@ use Dot\Cli\FileLockerInterface; use Queue\Swoole\Command\GetFailedMessagesCommand; use Queue\Swoole\Command\GetProcessedMessagesCommand; +use Queue\Swoole\Command\GetQueuedMessagesCommand; use Queue\Swoole\Command\StartCommand; use Queue\Swoole\Command\StopCommand; use Symfony\Component\Messenger\Command\ConsumeMessagesCommand; @@ -21,6 +22,7 @@ "messenger:debug" => DebugCommand::class, "processed" => GetProcessedMessagesCommand::class, "failed" => GetFailedMessagesCommand::class, + "inventory" => GetQueuedMessagesCommand::class, ], ], FileLockerInterface::class => [ diff --git a/docs/book/v1/commands.md b/docs/book/v1/commands.md index f0faaf5..94b428a 100644 --- a/docs/book/v1/commands.md +++ b/docs/book/v1/commands.md @@ -2,10 +2,11 @@ The commands available are: -1. `GetFailedMessagesCommand.php` - returns logs with messages that failed to process (levelName:error) -2. `GetProcessedMessagesCommand.php` - returns logs with messages that were successfully processed (levelName:info) +1. `GetFailedMessagesCommand.php (failed)` - returns logs with messages that failed to process (levelName:error) +2. `GetProcessedMessagesCommand.php (procecssed)` - returns logs with messages that were successfully processed (levelName:info) +3. `GetQueuedMessagesCommand (inventory)` - returns all queued messages from Redis stream 'messages' -Both commands are used to extract data that can be filtered by period from the log file. The commands can be run in two different ways: +The commands can be run in two different ways: ### CLI @@ -15,6 +16,8 @@ To run the commands via CLI, use the following syntax: `php bin/cli.php processed --start="yyyy-mm-dd" --end="yyyy-mm-dd" --limit=int` +`php bin/cli.php inventory` + ### TCP message To use commands using TCP messages the following messages can be used: @@ -29,4 +32,6 @@ In order to be able to test the `processed` command, by default when processing `echo "control" | socat -t1 - TCP:host:port` -Using `-t1` flag is not necessary but can be useful, it is used to set a timeout of n seconds for both reading and writing, after n second of inactivity, socat will terminate the connection. If the timeout is not set and the server does not respond or keep the connection open, the socat process could hang indefinitely. + > Using `-t1` flag is not necessary but can be useful, it is used to set a timeout of n seconds for both reading and writing, after n second of inactivity, socat will terminate the connection. If the timeout is not set and the server does not respond or keep the connection open, the socat process could hang indefinitely. + +`echo "inventory" | socat -t1 - TCP:host:port` diff --git a/src/Swoole/Command/GetQueuedMessagesCommand.php b/src/Swoole/Command/GetQueuedMessagesCommand.php new file mode 100644 index 0000000..a40227d --- /dev/null +++ b/src/Swoole/Command/GetQueuedMessagesCommand.php @@ -0,0 +1,68 @@ +redis = $redis; + } + + protected function configure(): void + { + $this->setDescription('Get all queued messages from Redis stream "messages"'); + } + + /** + * @throws RedisException + */ + protected function execute(InputInterface $input, OutputInterface $output): int + { + $entries = $this->redis->xRange('messages', '-', '+'); + + if (empty($entries)) { + $output->writeln('No messages queued found in Redis stream "messages".'); + return Command::SUCCESS; + } + + foreach ($entries as $id => $entry) { + $output->writeln("Message ID: $id"); + $output->writeln(json_encode($entry, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE)); + $output->writeln(str_repeat('-', 40)); + } + + $total = count($entries); + $output->writeln("Total queued messages in stream 'messages': $total"); + $output->writeln(str_repeat('-', 40)); + + return Command::SUCCESS; + } +} diff --git a/src/Swoole/ConfigProvider.php b/src/Swoole/ConfigProvider.php index 35784dd..a7976f8 100644 --- a/src/Swoole/ConfigProvider.php +++ b/src/Swoole/ConfigProvider.php @@ -9,6 +9,7 @@ use Queue\Swoole\Command\Factory\StopCommandFactory; use Queue\Swoole\Command\GetFailedMessagesCommand; use Queue\Swoole\Command\GetProcessedMessagesCommand; +use Queue\Swoole\Command\GetQueuedMessagesCommand; use Queue\Swoole\Command\StartCommand; use Queue\Swoole\Command\StopCommand; use Queue\Swoole\Delegators\TCPServerDelegator; @@ -36,6 +37,7 @@ public function getDependencies(): array StopCommand::class => StopCommandFactory::class, GetProcessedMessagesCommand::class => AttributedServiceFactory::class, GetFailedMessagesCommand::class => AttributedServiceFactory::class, + GetQueuedMessagesCommand::class => AttributedServiceFactory::class, ], "aliases" => [], ]; diff --git a/src/Swoole/Delegators/TCPServerDelegator.php b/src/Swoole/Delegators/TCPServerDelegator.php index bd9145a..e7353ec 100644 --- a/src/Swoole/Delegators/TCPServerDelegator.php +++ b/src/Swoole/Delegators/TCPServerDelegator.php @@ -8,6 +8,7 @@ use Queue\App\Message\ExampleMessage; use Queue\Swoole\Command\GetFailedMessagesCommand; use Queue\Swoole\Command\GetProcessedMessagesCommand; +use Queue\Swoole\Command\GetQueuedMessagesCommand; use Swoole\Server as TCPSwooleServer; use Symfony\Component\Console\Application; use Symfony\Component\Console\Input\ArrayInput; @@ -37,6 +38,7 @@ public function __invoke(ContainerInterface $container, string $serviceName, cal $commandMap = [ 'processed' => GetProcessedMessagesCommand::class, 'failed' => GetFailedMessagesCommand::class, + 'inventory' => GetQueuedMessagesCommand::class, ]; $server->on('Connect', function ($server, $fd) {