From dc7d42deedcdb155a94c0a018499fdf49338a504 Mon Sep 17 00:00:00 2001 From: bota Date: Thu, 24 Jul 2025 15:23:37 +0300 Subject: [PATCH 1/2] run commands using tcp messages Signed-off-by: bota --- src/Swoole/Delegators/TCPServerDelegator.php | 76 ++++++++++++++++---- 1 file changed, 63 insertions(+), 13 deletions(-) diff --git a/src/Swoole/Delegators/TCPServerDelegator.php b/src/Swoole/Delegators/TCPServerDelegator.php index a5e6a48..a588572 100644 --- a/src/Swoole/Delegators/TCPServerDelegator.php +++ b/src/Swoole/Delegators/TCPServerDelegator.php @@ -6,10 +6,23 @@ use Psr\Container\ContainerInterface; use Queue\App\Message\ExampleMessage; +use Queue\Swoole\Command\GetFailedMessagesCommand; +// Import your commands +use Queue\Swoole\Command\GetProcessedMessagesCommand; use Swoole\Server as TCPSwooleServer; +use Symfony\Component\Console\Application; +use Symfony\Component\Console\Input\ArrayInput; +use Symfony\Component\Console\Output\BufferedOutput; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Stamp\DelayStamp; +use function array_merge; +use function array_shift; +use function explode; +use function ltrim; +use function str_starts_with; +use function trim; + class TCPServerDelegator { public function __invoke(ContainerInterface $container, string $serviceName, callable $callback): TCPSwooleServer @@ -22,25 +35,62 @@ public function __invoke(ContainerInterface $container, string $serviceName, cal $logger = $container->get("dot-log.queue-log"); + $commandMap = [ + 'processed' => GetProcessedMessagesCommand::class, + 'failed' => GetFailedMessagesCommand::class, + ]; + $server->on('Connect', function ($server, $fd) { echo "Client: Connect.\n"; }); - // Register the function for the event `receive` - $server->on('receive', function ($server, $fd, $fromId, $data) use ($logger, $bus) { - $bus->dispatch(new ExampleMessage(["foo" => $data])); - $bus->dispatch(new ExampleMessage(["foo" => "with 5 seconds delay"]), [ - new DelayStamp(5000), - ]); - - $server->send($fd, "Server: {$data}"); - $logger->notice("Request received on receive", [ - 'fd' => $fd, - 'from_id' => $fromId, - ]); + $server->on('receive', function ($server, $fd, $fromId, $data) use ($logger, $bus, $commandMap, $container) { + $message = trim($data); + $response = ''; + + $args = explode(' ', $message); + $commandName = array_shift($args); + + if (isset($commandMap[$commandName])) { + $commandClass = $commandMap[$commandName]; + $application = new Application(); + $commandInstance = $container->get($commandClass); + $application->add($commandInstance); + + $parsedOptions = []; + foreach ($args as $arg) { + if (str_starts_with($arg, '--')) { + [$key, $value] = explode('=', ltrim($arg, '-'), 2) + [null, null]; + $parsedOptions["--$key"] = $value; + } + } + + $inputData = array_merge(['command' => $commandName], $parsedOptions); + $input = new ArrayInput($inputData); + $output = new BufferedOutput(); + + try { + $application->setAutoExit(false); + $application->run($input, $output); + $response = $output->fetch(); + $server->send($fd, $response); + } catch (\Throwable $e) { + $logger->error("Error running command: " . $e->getMessage()); + } + } else { + $bus->dispatch(new ExampleMessage(["foo" => $data])); + $bus->dispatch(new ExampleMessage(["foo" => "with 5 seconds delay"]), [ + new DelayStamp(5000), + ]); + + $logger->notice("TCP request received", [ + 'fd' => $fd, + 'from_id' => $fromId, + 'data' => $data, + ]); + } }); - // Listen for the 'Close' event. $server->on('Close', function ($server, $fd) { echo "Client: Close.\n"; }); From 9d2f9fc463539b3d4169698889eb323dba0ca96c Mon Sep 17 00:00:00 2001 From: bota Date: Thu, 24 Jul 2025 15:25:24 +0300 Subject: [PATCH 2/2] removed com Signed-off-by: bota --- src/Swoole/Delegators/TCPServerDelegator.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Swoole/Delegators/TCPServerDelegator.php b/src/Swoole/Delegators/TCPServerDelegator.php index a588572..0953a97 100644 --- a/src/Swoole/Delegators/TCPServerDelegator.php +++ b/src/Swoole/Delegators/TCPServerDelegator.php @@ -7,7 +7,6 @@ use Psr\Container\ContainerInterface; use Queue\App\Message\ExampleMessage; use Queue\Swoole\Command\GetFailedMessagesCommand; -// Import your commands use Queue\Swoole\Command\GetProcessedMessagesCommand; use Swoole\Server as TCPSwooleServer; use Symfony\Component\Console\Application;