From 7ebf97306668cd6e849ac036404e619de33dd986 Mon Sep 17 00:00:00 2001 From: bota Date: Wed, 30 Jul 2025 16:21:13 +0300 Subject: [PATCH 1/2] command to get all queued messages in messages stream Signed-off-by: bota --- composer.json | 1 + config/autoload/cli.global.php | 2 + docs/book/v1/commands.md | 13 ++-- .../Command/GetQueuedMessagesCommand.php | 68 +++++++++++++++++++ src/Swoole/ConfigProvider.php | 2 + src/Swoole/Delegators/TCPServerDelegator.php | 2 + 6 files changed, 84 insertions(+), 4 deletions(-) create mode 100644 src/Swoole/Command/GetQueuedMessagesCommand.php diff --git a/composer.json b/composer.json index c0c1ab6..95947c0 100644 --- a/composer.json +++ b/composer.json @@ -44,6 +44,7 @@ }, "require": { "php": "~8.2.0 || ~8.3.0 || ~8.4", + "ext-redis": "*", "dotkernel/dot-cli": "^3.9", "dotkernel/dot-dependency-injection": "^1.2", "dotkernel/dot-errorhandler": "4.2.1", diff --git a/config/autoload/cli.global.php b/config/autoload/cli.global.php index f9b2d68..aefbeb9 100644 --- a/config/autoload/cli.global.php +++ b/config/autoload/cli.global.php @@ -5,6 +5,7 @@ use Dot\Cli\FileLockerInterface; use Queue\Swoole\Command\GetFailedMessagesCommand; use Queue\Swoole\Command\GetProcessedMessagesCommand; +use Queue\Swoole\Command\GetQueuedMessagesCommand; use Queue\Swoole\Command\StartCommand; use Queue\Swoole\Command\StopCommand; use Symfony\Component\Messenger\Command\ConsumeMessagesCommand; @@ -21,6 +22,7 @@ "messenger:debug" => DebugCommand::class, "processed" => GetProcessedMessagesCommand::class, "failed" => GetFailedMessagesCommand::class, + "inventory" => GetQueuedMessagesCommand::class, ], ], FileLockerInterface::class => [ diff --git a/docs/book/v1/commands.md b/docs/book/v1/commands.md index f0faaf5..94b428a 100644 --- a/docs/book/v1/commands.md +++ b/docs/book/v1/commands.md @@ -2,10 +2,11 @@ The commands available are: -1. `GetFailedMessagesCommand.php` - returns logs with messages that failed to process (levelName:error) -2. `GetProcessedMessagesCommand.php` - returns logs with messages that were successfully processed (levelName:info) +1. `GetFailedMessagesCommand.php (failed)` - returns logs with messages that failed to process (levelName:error) +2. `GetProcessedMessagesCommand.php (procecssed)` - returns logs with messages that were successfully processed (levelName:info) +3. `GetQueuedMessagesCommand (inventory)` - returns all queued messages from Redis stream 'messages' -Both commands are used to extract data that can be filtered by period from the log file. The commands can be run in two different ways: +The commands can be run in two different ways: ### CLI @@ -15,6 +16,8 @@ To run the commands via CLI, use the following syntax: `php bin/cli.php processed --start="yyyy-mm-dd" --end="yyyy-mm-dd" --limit=int` +`php bin/cli.php inventory` + ### TCP message To use commands using TCP messages the following messages can be used: @@ -29,4 +32,6 @@ In order to be able to test the `processed` command, by default when processing `echo "control" | socat -t1 - TCP:host:port` -Using `-t1` flag is not necessary but can be useful, it is used to set a timeout of n seconds for both reading and writing, after n second of inactivity, socat will terminate the connection. If the timeout is not set and the server does not respond or keep the connection open, the socat process could hang indefinitely. + > Using `-t1` flag is not necessary but can be useful, it is used to set a timeout of n seconds for both reading and writing, after n second of inactivity, socat will terminate the connection. If the timeout is not set and the server does not respond or keep the connection open, the socat process could hang indefinitely. + +`echo "inventory" | socat -t1 - TCP:host:port` diff --git a/src/Swoole/Command/GetQueuedMessagesCommand.php b/src/Swoole/Command/GetQueuedMessagesCommand.php new file mode 100644 index 0000000..a40227d --- /dev/null +++ b/src/Swoole/Command/GetQueuedMessagesCommand.php @@ -0,0 +1,68 @@ +redis = $redis; + } + + protected function configure(): void + { + $this->setDescription('Get all queued messages from Redis stream "messages"'); + } + + /** + * @throws RedisException + */ + protected function execute(InputInterface $input, OutputInterface $output): int + { + $entries = $this->redis->xRange('messages', '-', '+'); + + if (empty($entries)) { + $output->writeln('No messages queued found in Redis stream "messages".'); + return Command::SUCCESS; + } + + foreach ($entries as $id => $entry) { + $output->writeln("Message ID: $id"); + $output->writeln(json_encode($entry, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE)); + $output->writeln(str_repeat('-', 40)); + } + + $total = count($entries); + $output->writeln("Total queued messages in stream 'messages': $total"); + $output->writeln(str_repeat('-', 40)); + + return Command::SUCCESS; + } +} diff --git a/src/Swoole/ConfigProvider.php b/src/Swoole/ConfigProvider.php index 35784dd..a7976f8 100644 --- a/src/Swoole/ConfigProvider.php +++ b/src/Swoole/ConfigProvider.php @@ -9,6 +9,7 @@ use Queue\Swoole\Command\Factory\StopCommandFactory; use Queue\Swoole\Command\GetFailedMessagesCommand; use Queue\Swoole\Command\GetProcessedMessagesCommand; +use Queue\Swoole\Command\GetQueuedMessagesCommand; use Queue\Swoole\Command\StartCommand; use Queue\Swoole\Command\StopCommand; use Queue\Swoole\Delegators\TCPServerDelegator; @@ -36,6 +37,7 @@ public function getDependencies(): array StopCommand::class => StopCommandFactory::class, GetProcessedMessagesCommand::class => AttributedServiceFactory::class, GetFailedMessagesCommand::class => AttributedServiceFactory::class, + GetQueuedMessagesCommand::class => AttributedServiceFactory::class, ], "aliases" => [], ]; diff --git a/src/Swoole/Delegators/TCPServerDelegator.php b/src/Swoole/Delegators/TCPServerDelegator.php index bd9145a..e7353ec 100644 --- a/src/Swoole/Delegators/TCPServerDelegator.php +++ b/src/Swoole/Delegators/TCPServerDelegator.php @@ -8,6 +8,7 @@ use Queue\App\Message\ExampleMessage; use Queue\Swoole\Command\GetFailedMessagesCommand; use Queue\Swoole\Command\GetProcessedMessagesCommand; +use Queue\Swoole\Command\GetQueuedMessagesCommand; use Swoole\Server as TCPSwooleServer; use Symfony\Component\Console\Application; use Symfony\Component\Console\Input\ArrayInput; @@ -37,6 +38,7 @@ public function __invoke(ContainerInterface $container, string $serviceName, cal $commandMap = [ 'processed' => GetProcessedMessagesCommand::class, 'failed' => GetFailedMessagesCommand::class, + 'inventory' => GetQueuedMessagesCommand::class, ]; $server->on('Connect', function ($server, $fd) { From c82f2e3cbf20d5073fff9ce692397b848c7d0286 Mon Sep 17 00:00:00 2001 From: bota Date: Wed, 30 Jul 2025 21:14:16 +0300 Subject: [PATCH 2/2] removed ext-redis from composer.json Signed-off-by: bota --- composer.json | 1 - 1 file changed, 1 deletion(-) diff --git a/composer.json b/composer.json index 95947c0..c0c1ab6 100644 --- a/composer.json +++ b/composer.json @@ -44,7 +44,6 @@ }, "require": { "php": "~8.2.0 || ~8.3.0 || ~8.4", - "ext-redis": "*", "dotkernel/dot-cli": "^3.9", "dotkernel/dot-dependency-injection": "^1.2", "dotkernel/dot-errorhandler": "4.2.1",