Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/autoload/cli.global.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Dot\Cli\FileLockerInterface;
use Queue\Swoole\Command\GetFailedMessagesCommand;
use Queue\Swoole\Command\GetProcessedMessagesCommand;
use Queue\Swoole\Command\GetQueuedMessagesCommand;
use Queue\Swoole\Command\StartCommand;
use Queue\Swoole\Command\StopCommand;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
Expand All @@ -21,6 +22,7 @@
"messenger:debug" => DebugCommand::class,
"processed" => GetProcessedMessagesCommand::class,
"failed" => GetFailedMessagesCommand::class,
"inventory" => GetQueuedMessagesCommand::class,
],
],
FileLockerInterface::class => [
Expand Down
13 changes: 9 additions & 4 deletions docs/book/v1/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

The commands available are:

1. `GetFailedMessagesCommand.php` - returns logs with messages that failed to process (levelName:error)
2. `GetProcessedMessagesCommand.php` - returns logs with messages that were successfully processed (levelName:info)
1. `GetFailedMessagesCommand.php (failed)` - returns logs with messages that failed to process (levelName:error)
2. `GetProcessedMessagesCommand.php (procecssed)` - returns logs with messages that were successfully processed (levelName:info)
3. `GetQueuedMessagesCommand (inventory)` - returns all queued messages from Redis stream 'messages'

Both commands are used to extract data that can be filtered by period from the log file. The commands can be run in two different ways:
The commands can be run in two different ways:

### CLI

Expand All @@ -15,6 +16,8 @@ To run the commands via CLI, use the following syntax:

`php bin/cli.php processed --start="yyyy-mm-dd" --end="yyyy-mm-dd" --limit=int`

`php bin/cli.php inventory`

### TCP message

To use commands using TCP messages the following messages can be used:
Expand All @@ -29,4 +32,6 @@ In order to be able to test the `processed` command, by default when processing

`echo "control" | socat -t1 - TCP:host:port`

Using `-t1` flag is not necessary but can be useful, it is used to set a timeout of n seconds for both reading and writing, after n second of inactivity, socat will terminate the connection. If the timeout is not set and the server does not respond or keep the connection open, the socat process could hang indefinitely.
> Using `-t1` flag is not necessary but can be useful, it is used to set a timeout of n seconds for both reading and writing, after n second of inactivity, socat will terminate the connection. If the timeout is not set and the server does not respond or keep the connection open, the socat process could hang indefinitely.

`echo "inventory" | socat -t1 - TCP:host:port`
68 changes: 68 additions & 0 deletions src/Swoole/Command/GetQueuedMessagesCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

declare(strict_types=1);

namespace Queue\Swoole\Command;

use Dot\DependencyInjection\Attribute\Inject;
use Redis;
use RedisException;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

use function count;
use function json_encode;
use function str_repeat;

use const JSON_PRETTY_PRINT;
use const JSON_UNESCAPED_UNICODE;

#[AsCommand(
name: 'inventory',
description: 'Get all queued messages from Redis stream "messages"',
)]
class GetQueuedMessagesCommand extends Command
{
protected static string $defaultName = 'inventory';

private Redis $redis;

#[Inject('redis')]
public function __construct(Redis $redis)
{
parent::__construct(self::$defaultName);
$this->redis = $redis;
}

protected function configure(): void
{
$this->setDescription('Get all queued messages from Redis stream "messages"');
}

/**
* @throws RedisException
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$entries = $this->redis->xRange('messages', '-', '+');

if (empty($entries)) {
$output->writeln('<info>No messages queued found in Redis stream "messages".</info>');
return Command::SUCCESS;
}

foreach ($entries as $id => $entry) {
$output->writeln("<info>Message ID:</info> $id");
$output->writeln(json_encode($entry, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));
$output->writeln(str_repeat('-', 40));
}

$total = count($entries);
$output->writeln("<info>Total queued messages in stream 'messages':</info> $total");
$output->writeln(str_repeat('-', 40));

return Command::SUCCESS;
}
}
2 changes: 2 additions & 0 deletions src/Swoole/ConfigProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Queue\Swoole\Command\Factory\StopCommandFactory;
use Queue\Swoole\Command\GetFailedMessagesCommand;
use Queue\Swoole\Command\GetProcessedMessagesCommand;
use Queue\Swoole\Command\GetQueuedMessagesCommand;
use Queue\Swoole\Command\StartCommand;
use Queue\Swoole\Command\StopCommand;
use Queue\Swoole\Delegators\TCPServerDelegator;
Expand Down Expand Up @@ -36,6 +37,7 @@ public function getDependencies(): array
StopCommand::class => StopCommandFactory::class,
GetProcessedMessagesCommand::class => AttributedServiceFactory::class,
GetFailedMessagesCommand::class => AttributedServiceFactory::class,
GetQueuedMessagesCommand::class => AttributedServiceFactory::class,
],
"aliases" => [],
];
Expand Down
2 changes: 2 additions & 0 deletions src/Swoole/Delegators/TCPServerDelegator.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Queue\App\Message\ExampleMessage;
use Queue\Swoole\Command\GetFailedMessagesCommand;
use Queue\Swoole\Command\GetProcessedMessagesCommand;
use Queue\Swoole\Command\GetQueuedMessagesCommand;
use Swoole\Server as TCPSwooleServer;
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Input\ArrayInput;
Expand Down Expand Up @@ -37,6 +38,7 @@ public function __invoke(ContainerInterface $container, string $serviceName, cal
$commandMap = [
'processed' => GetProcessedMessagesCommand::class,
'failed' => GetFailedMessagesCommand::class,
'inventory' => GetQueuedMessagesCommand::class,
];

$server->on('Connect', function ($server, $fd) {
Expand Down
Loading