diff --git a/docker-compose.yml b/docker-compose.yml index bf0089d..6b230a3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,8 +3,9 @@ services: container_name: tests build: . volumes: - - ./src:/usr/local/src/src - - ./tests:/usr/local/src/tests + - ./vendor:/usr/src/code/vendor + - ./src:/usr/src/code/src + - ./tests:/usr/src/code/tests depends_on: - swoole - swoole-amqp @@ -16,8 +17,9 @@ services: build: ./tests/Queue/servers/Swoole/. command: php /usr/src/code/tests/Queue/servers/Swoole/worker.php volumes: - - ./src:/usr/local/src/src - - ./tests:/usr/local/src/tests + - ./vendor:/usr/src/code/vendor + - ./src:/usr/src/code/src + - ./tests:/usr/src/code/tests depends_on: - redis @@ -26,8 +28,9 @@ services: build: ./tests/Queue/servers/SwooleRedisCluster/. command: php /usr/src/code/tests/Queue/servers/SwooleRedisCluster/worker.php volumes: - - ./src:/usr/local/src/src - - ./tests:/usr/local/src/tests + - ./vendor:/usr/src/code/vendor + - ./src:/usr/src/code/src + - ./tests:/usr/src/code/tests depends_on: redis-cluster-0: condition: service_healthy @@ -37,8 +40,9 @@ services: build: ./tests/Queue/servers/AMQP/. command: php /usr/src/code/tests/Queue/servers/AMQP/worker.php volumes: - - ./src:/usr/local/src/src - - ./tests:/usr/local/src/tests + - ./vendor:/usr/src/code/vendor + - ./src:/usr/src/code/src + - ./tests:/usr/src/code/tests depends_on: amqp: condition: service_healthy @@ -48,8 +52,9 @@ services: build: ./tests/Queue/servers/Workerman/. command: php /usr/src/code/tests/Queue/servers/Workerman/worker.php start volumes: - - ./src:/usr/local/src/src - - ./tests:/usr/local/src/tests + - ./vendor:/usr/src/code/vendor + - ./src:/usr/src/code/src + - ./tests:/usr/src/code/tests depends_on: - redis diff --git a/pint.json b/pint.json index c781933..dc0de5d 100644 --- a/pint.json +++ b/pint.json @@ -1,3 +1,6 @@ { - "preset": "psr12" + "preset": "psr12", + "rules": { + "single_quote": true + } } \ No newline at end of file diff --git a/src/Queue/Adapter/Swoole.php b/src/Queue/Adapter/Swoole.php index fef30da..e22b650 100644 --- a/src/Queue/Adapter/Swoole.php +++ b/src/Queue/Adapter/Swoole.php @@ -2,16 +2,26 @@ namespace Utopia\Queue\Adapter; +use Swoole\Constant; +use Swoole\Process; use Swoole\Process\Pool; + +use Utopia\Console; use Utopia\Queue\Adapter; use Utopia\Queue\Consumer; +use function Swoole\Coroutine\go; + class Swoole extends Adapter { protected Pool $pool; - public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') - { + public function __construct( + Consumer $consumer, + int $workerNum, + string $queue, + string $namespace = 'utopia-queue', + ) { parent::__construct($workerNum, $queue, $namespace); $this->consumer = $consumer; @@ -20,21 +30,47 @@ public function __construct(Consumer $consumer, int $workerNum, string $queue, s public function start(): self { + // Enable coroutine hooks for Redis and other extensions $this->pool->set(['enable_coroutine' => true]); + $this->pool->start(); return $this; } public function stop(): self { + Console::info('[Swoole] Shutting down process pool...'); $this->pool->shutdown(); + Console::success('[Swoole] Process pool stopped.'); return $this; } public function workerStart(callable $callback): self { - $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { - call_user_func($callback, $workerId); + $this->pool->on(Constant::EVENT_WORKER_START, function ( + Pool $pool, + string $workerId, + ) use ($callback) { + // Register signal handlers for graceful shutdown + Process::signal(SIGTERM, function () use ($workerId) { + Console::info( + "[Swoole] Worker {$workerId} received SIGTERM, stopping consumer...", + ); + $this->consumer->close(); + }); + + Process::signal(SIGINT, function () use ($workerId) { + Console::info( + "[Swoole] Worker {$workerId} received SIGINT, stopping consumer...", + ); + $this->consumer->close(); + }); + + // Run consume loop in a coroutine to allow event loop to process signals + // The coroutine container waits for all child coroutines before worker exits + go(function () use ($callback, $workerId) { + \call_user_func($callback, $workerId); + }); }); return $this; @@ -42,8 +78,11 @@ public function workerStart(callable $callback): self public function workerStop(callable $callback): self { - $this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) { - call_user_func($callback, $workerId); + $this->pool->on(Constant::EVENT_WORKER_STOP, function ( + Pool $pool, + string $workerId, + ) use ($callback) { + \call_user_func($callback, $workerId); }); return $this; diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 21825b1..62b2774 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -111,7 +111,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments)); // 2. Declare the working queue and configure the DLX for receiving rejected messages. - $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"]))); + $channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ['x-dead-letter-exchange' => "{$queue->namespace}.failed"]))); $channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name); // 3. Declare the dead-letter-queue and bind it to the DLX. @@ -131,6 +131,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe public function close(): void { + $this->channel?->stopConsume(); $this->channel?->getConnection()?->close(); } @@ -161,7 +162,7 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int { $queueName = $queue->name; if ($failedJobs) { - $queueName = $queueName . ".failed"; + $queueName = $queueName . '.failed'; } $client = new Client(); diff --git a/src/Queue/Broker/Pool.php b/src/Queue/Broker/Pool.php index 8fcf5f0..aa7cf92 100644 --- a/src/Queue/Broker/Pool.php +++ b/src/Queue/Broker/Pool.php @@ -30,26 +30,36 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int return $this->delegatePublish(__FUNCTION__, \func_get_args()); } - public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void - { + public function consume( + Queue $queue, + callable $messageCallback, + callable $successCallback, + callable $errorCallback, + ): void { $this->delegateConsumer(__FUNCTION__, \func_get_args()); } public function close(): void { - $this->delegateConsumer(__FUNCTION__, \func_get_args()); + // TODO: Implement closing all connections in the pool } protected function delegatePublish(string $method, array $args): mixed { - return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) { + return $this->publisher?->use(function (Publisher $adapter) use ( + $method, + $args, + ) { return $adapter->$method(...$args); }); } protected function delegateConsumer(string $method, array $args): mixed { - return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) { + return $this->consumer?->use(function (Consumer $adapter) use ( + $method, + $args, + ) { return $adapter->$method(...$args); }); } diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 0e3beb7..759b655 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -92,6 +92,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe public function close(): void { $this->closed = true; + $this->connection->close(); } public function enqueue(Queue $queue, array $payload): bool diff --git a/src/Queue/Connection.php b/src/Queue/Connection.php index 6f37505..c1310b7 100644 --- a/src/Queue/Connection.php +++ b/src/Queue/Connection.php @@ -25,4 +25,5 @@ public function setArray(string $key, array $value): bool; public function increment(string $key): int; public function decrement(string $key): int; public function ping(): bool; + public function close(): void; } diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 4536bb3..7418e43 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -169,6 +169,12 @@ public function ping(): bool } } + public function close(): void + { + $this->redis?->close(); + $this->redis = null; + } + protected function getRedis(): \Redis { if ($this->redis) { diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 50a2b46..476b735 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -166,6 +166,12 @@ public function ping(): bool } } + public function close(): void + { + $this->redis?->close(); + $this->redis = null; + } + protected function getRedis(): \RedisCluster { if ($this->redis) { diff --git a/src/Queue/Queue.php b/src/Queue/Queue.php index d6c9846..d22d971 100644 --- a/src/Queue/Queue.php +++ b/src/Queue/Queue.php @@ -9,7 +9,7 @@ public function __construct( public string $namespace = 'utopia-queue', ) { if (empty($this->name)) { - throw new \InvalidArgumentException("Cannot create queue with empty name."); + throw new \InvalidArgumentException('Cannot create queue with empty name.'); } } } diff --git a/src/Queue/Server.php b/src/Queue/Server.php index 3ee64c4..48ffa33 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -30,28 +30,37 @@ class Server /** * Hooks that will run when error occur * - * @var array + * @var array */ protected array $errorHooks = []; /** * Hooks that will run before running job * - * @var array + * @var array */ protected array $initHooks = []; /** * Hooks that will run after running job * - * @var array + * @var array */ protected array $shutdownHooks = []; /** - * Hook that is called when worker starts + * Hooks that will run when worker starts + * + * @var array + */ + protected array $workerStartHooks = []; + + /** + * Hooks that will run when worker stops + * + * @var array */ - protected Hook $workerStartHook; + protected array $workerStopHooks = []; /** * @var array @@ -94,14 +103,20 @@ public function job(): Job */ public function getResource(string $name, bool $fresh = false): mixed { - if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) { + if ( + !\array_key_exists($name, $this->resources) || + $fresh || + self::$resourcesCallbacks[$name]['reset'] + ) { if (!\array_key_exists($name, self::$resourcesCallbacks)) { - throw new Exception('Failed to find resource: "' . $name . '"'); + throw new Exception("Failed to find resource: $name"); } $this->resources[$name] = \call_user_func_array( self::$resourcesCallbacks[$name]['callback'], - $this->getResources(self::$resourcesCallbacks[$name]['injections']) + $this->getResources( + self::$resourcesCallbacks[$name]['injections'], + ), ); } @@ -138,9 +153,16 @@ public function getResources(array $list): array * * @return void */ - public static function setResource(string $name, callable $callback, array $injections = []): void - { - self::$resourcesCallbacks[$name] = ['callback' => $callback, 'injections' => $injections, 'reset' => true]; + public static function setResource( + string $name, + callable $callback, + array $injections = [], + ): void { + self::$resourcesCallbacks[$name] = [ + 'callback' => $callback, + 'injections' => $injections, + 'reset' => true, + ]; } public function setTelemetry(Telemetry $telemetry): void @@ -149,7 +171,24 @@ public function setTelemetry(Telemetry $telemetry): void 'messaging.process.wait.duration', 's', null, - ['ExplicitBucketBoundaries' => [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]] + [ + 'ExplicitBucketBoundaries' => [ + 0.005, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.25, + 0.5, + 0.75, + 1, + 2.5, + 5, + 7.5, + 10, + ], + ], ); // https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/#metric-messagingprocessduration @@ -157,7 +196,24 @@ public function setTelemetry(Telemetry $telemetry): void 'messaging.process.duration', 's', null, - ['ExplicitBucketBoundaries' => [0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]] + [ + 'ExplicitBucketBoundaries' => [ + 0.005, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.25, + 0.5, + 0.75, + 1, + 2.5, + 5, + 7.5, + 10, + ], + ], ); } @@ -184,7 +240,7 @@ public function stop(): self } catch (Throwable $error) { self::setResource('error', fn () => $error); foreach ($this->errorHooks as $hook) { - call_user_func_array($hook->getAction(), $this->getArguments($hook)); + $hook->getAction()(...$this->getArguments($hook)); } } return $this; @@ -213,76 +269,131 @@ public function start(): self $this->adapter->workerStart(function (string $workerId) { Console::success("[Worker] Worker {$workerId} is ready!"); self::setResource('workerId', fn () => $workerId); - if (!is_null($this->workerStartHook)) { - call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); + + foreach ($this->workerStartHooks as $hook) { + $hook->getAction()(...$this->getArguments($hook)); } - while (true) { - $this->adapter->consumer->consume( - $this->adapter->queue, - function (Message $message) { - $receivedAtTimestamp = microtime(true); - Console::info("[Job] Received Job ({$message->getPid()})."); - try { - $waitDuration = microtime(true) - $message->getTimestamp(); - $this->jobWaitTime->record($waitDuration); - - $this->resources = []; - self::setResource('message', fn () => $message); - if ($this->job->getHook()) { - foreach ($this->initHooks as $hook) { // Global init hooks - if (in_array('*', $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } + $this->adapter->consumer->consume( + $this->adapter->queue, + function (Message $message) { + $receivedAtTimestamp = microtime(true); + Console::info( + "[Job] Received Job ({$message->getPid()}).", + ); + try { + $waitDuration = + microtime(true) - $message->getTimestamp(); + $this->jobWaitTime->record($waitDuration); + + $this->resources = []; + self::setResource('message', fn () => $message); + if ($this->job->getHook()) { + foreach ($this->initHooks as $hook) { + // Global init hooks + if (\in_array('*', $hook->getGroups())) { + $arguments = $this->getArguments( + $hook, + $message->getPayload(), + ); + $hook->getAction()(...$arguments); } } + } - foreach ($this->job->getGroups() as $group) { - foreach ($this->initHooks as $hook) { // Group init hooks - if (in_array($group, $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } + foreach ($this->job->getGroups() as $group) { + foreach ($this->initHooks as $hook) { + // Group init hooks + if (\in_array($group, $hook->getGroups())) { + $arguments = $this->getArguments( + $hook, + $message->getPayload(), + ); + $hook->getAction()(...$arguments); } } - - return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); - } finally { - $processDuration = microtime(true) - $receivedAtTimestamp; - $this->processDuration->record($processDuration); } - }, - function (Message $message) { - if ($this->job->getHook()) { - foreach ($this->shutdownHooks as $hook) { // Global init hooks - if (in_array('*', $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } + + return \call_user_func_array( + $this->job->getAction(), + $this->getArguments( + $this->job, + $message->getPayload(), + ), + ); + } finally { + $processDuration = + microtime(true) - $receivedAtTimestamp; + $this->processDuration->record($processDuration); + } + }, + function (Message $message) { + if ($this->job->getHook()) { + foreach ($this->shutdownHooks as $hook) { + // Global init hooks + if (\in_array('*', $hook->getGroups())) { + $arguments = $this->getArguments( + $hook, + $message->getPayload(), + ); + $hook->getAction()(...$arguments); } } - - foreach ($this->job->getGroups() as $group) { - foreach ($this->shutdownHooks as $hook) { // Group init hooks - if (in_array($group, $hook->getGroups())) { - $arguments = $this->getArguments($hook, $message->getPayload()); - \call_user_func_array($hook->getAction(), $arguments); - } + } + + foreach ($this->job->getGroups() as $group) { + foreach ($this->shutdownHooks as $hook) { + // Group init hooks + if (\in_array($group, $hook->getGroups())) { + $arguments = $this->getArguments( + $hook, + $message->getPayload(), + ); + $hook->getAction()(...$arguments); } } - Console::success("[Job] ({$message->getPid()}) successfully run."); - }, - function (?Message $message, Throwable $th) { - Console::error("[Job] ({$message?->getPid()}) failed to run."); - Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}"); + } + Console::success( + "[Job] ({$message->getPid()}) successfully run.", + ); + }, + function (?Message $message, Throwable $th) { + Console::error( + "[Job] ({$message?->getPid()}) failed to run.", + ); + Console::error( + "[Job] ({$message?->getPid()}) {$th->getMessage()}", + ); + + self::setResource('error', fn () => $th); + + foreach ($this->errorHooks as $hook) { + $hook->getAction()(...$this->getArguments($hook)); + } + }, + ); + }); - self::setResource('error', fn () => $th); + $this->adapter->workerStop(function (string $workerId) { + Console::info("[Worker] Worker {$workerId} stopping..."); + self::setResource('workerId', fn () => $workerId); - foreach ($this->errorHooks as $hook) { - ($hook->getAction())(...$this->getArguments($hook)); - } - }, + try { + // Call user-defined workerStop hooks + foreach ($this->workerStopHooks as $hook) { + try { + $hook->getAction()(...$this->getArguments($hook)); + } catch (Throwable $e) { + Console::error( + "[Worker] Worker {$workerId} workerStop hook failed: {$e->getMessage()}", + ); + } + } + } finally { + // Always close consumer connection, even if hooks throw + $this->adapter->consumer->close(); + Console::success( + "[Worker] Worker {$workerId} stopped gracefully.", ); } }); @@ -291,7 +402,7 @@ function (?Message $message, Throwable $th) { } catch (Throwable $error) { self::setResource('error', fn () => $error); foreach ($this->errorHooks as $hook) { - call_user_func_array($hook->getAction(), $this->getArguments($hook)); + $hook->getAction()(...$this->getArguments($hook)); } } return $this; @@ -305,42 +416,38 @@ public function workerStart(): Hook { $hook = new Hook(); $hook->groups(['*']); - $this->workerStartHook = $hook; + $this->workerStartHooks[] = $hook; return $hook; } /** - * Returns Worker starts hook. - * @return Hook - */ - public function getWorkerStart(): Hook + * Returns Worker starts hooks. + * @return array + */ + public function getWorkerStart(): array { - return $this->workerStartHook; + return $this->workerStartHooks; } /** * Is called when a Worker stops. - * @param callable|null $callback - * @return self - * @throws Exception + * @return Hook */ - public function workerStop(?callable $callback = null): self + public function workerStop(): Hook { - try { - $this->adapter->workerStop(function (string $workerId) use ($callback) { - Console::success("[Worker] Worker {$workerId} is ready!"); - if (!is_null($callback)) { - call_user_func($callback); - } - }); - } catch (Throwable $error) { - self::setResource('error', fn () => $error); - foreach ($this->errorHooks as $hook) { - call_user_func_array($hook->getAction(), $this->getArguments($hook)); - } - } + $hook = new Hook(); + $hook->groups(['*']); + $this->workerStopHooks[] = $hook; + return $hook; + } - return $this; + /** + * Returns Worker stops hooks. + * @return array + */ + public function getWorkerStop(): array + { + return $this->workerStopHooks; } /** @@ -353,9 +460,11 @@ public function workerStop(?callable $callback = null): self protected function getArguments(Hook $hook, array $payload = []): array { $arguments = []; - foreach ($hook->getParams() as $key => $param) { // Get value from route or request object + foreach ($hook->getParams() as $key => $param) { + // Get value from route or request object $value = $payload[$key] ?? $param['default']; - $value = ($value === '' || is_null($value)) ? $param['default'] : $value; + $value = + $value === '' || $value === null ? $param['default'] : $value; $this->validate($key, $param, $value); $hook->setParamValue($key, $value); @@ -363,7 +472,9 @@ protected function getArguments(Hook $hook, array $payload = []): array } foreach ($hook->getInjections() as $key => $injection) { - $arguments[$injection['order']] = $this->getResource($injection['name']); + $arguments[$injection['order']] = $this->getResource( + $injection['name'], + ); } return $arguments; @@ -384,22 +495,32 @@ protected function getArguments(Hook $hook, array $payload = []): array */ protected function validate(string $key, array $param, mixed $value): void { - if ('' !== $value && !is_null($value)) { + if ('' !== $value && $value !== null) { $validator = $param['validator']; // checking whether the class exists if (\is_callable($validator)) { - $validator = \call_user_func_array($validator, $this->getResources($param['injections'])); + $validator = \call_user_func_array( + $validator, + $this->getResources($param['injections']), + ); } - if (!$validator instanceof Validator) { // is the validator object an instance of the Validator class - throw new Exception('Validator object is not an instance of the Validator class', 500); + if (!$validator instanceof Validator) { + // is the validator object an instance of the Validator class + throw new Exception( + 'Validator object is not an instance of the Validator class', + 500, + ); } if (!$validator->isValid($value)) { - throw new Exception('Invalid ' .$key . ': ' . $validator->getDescription(), 400); + throw new Exception( + 'Invalid ' . $key . ': ' . $validator->getDescription(), + 400, + ); } } elseif (!$param['optional']) { - throw new Exception('Param "' . $key . '" is not optional.', 400); + throw new Exception("Param $key is not optional.", 400); } } diff --git a/tests/Queue/servers/AMQP/Dockerfile b/tests/Queue/servers/AMQP/Dockerfile index 8643629..65460fa 100644 --- a/tests/Queue/servers/AMQP/Dockerfile +++ b/tests/Queue/servers/AMQP/Dockerfile @@ -1,3 +1,3 @@ FROM phpswoole/swoole:php8.3-alpine -RUN apk add autoconf build-base \ No newline at end of file +RUN apk add autoconf build-base diff --git a/tests/Queue/servers/AMQP/worker.php b/tests/Queue/servers/AMQP/worker.php index d590d46..9f4e286 100644 --- a/tests/Queue/servers/AMQP/worker.php +++ b/tests/Queue/servers/AMQP/worker.php @@ -3,18 +3,15 @@ require_once __DIR__ . '/../../../../vendor/autoload.php'; require_once __DIR__ . '/../tests.php'; -use Utopia\Queue; -use Utopia\Queue\Message; +use Utopia\Queue\Broker\AMQP; +use Utopia\Queue\Adapter\Swoole; +use Utopia\Queue\Server; -$consumer = new Queue\Broker\AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp'); -$adapter = new Queue\Adapter\Swoole($consumer, 12, 'amqp'); -$server = new Queue\Server($adapter); +$consumer = new AMQP(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp'); +$adapter = new Swoole($consumer, 12, 'amqp'); +$server = new Server($adapter); -$server->job() - ->inject('message') - ->action(function (Message $message) { - handleRequest($message); - }); +$server->job()->inject('message')->action(handleRequest(...)); $server ->error() @@ -23,10 +20,12 @@ echo $th->getMessage() . PHP_EOL; }); -$server - ->workerStart() - ->action(function () { - echo "Worker Started" . PHP_EOL; - }); +$server->workerStart()->action(function () { + echo 'Worker Started' . PHP_EOL; +}); + +$server->workerStop()->action(function () { + echo 'Worker Stopped' . PHP_EOL; +}); $server->start(); diff --git a/tests/Queue/servers/Swoole/Dockerfile b/tests/Queue/servers/Swoole/Dockerfile index 7857498..eb30cec 100644 --- a/tests/Queue/servers/Swoole/Dockerfile +++ b/tests/Queue/servers/Swoole/Dockerfile @@ -2,4 +2,4 @@ FROM phpswoole/swoole:php8.3-alpine RUN apk add autoconf build-base -RUN docker-php-ext-enable redis \ No newline at end of file +RUN docker-php-ext-enable redis diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index 3645a1d..833ecf2 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -3,19 +3,16 @@ require_once __DIR__ . '/../../../../vendor/autoload.php'; require_once __DIR__ . '/../tests.php'; -use Utopia\Queue; -use Utopia\Queue\Message; - -$connection = new Queue\Connection\Redis('redis'); -$consumer = new Queue\Broker\Redis($connection); -$adapter = new Queue\Adapter\Swoole($consumer, 12, 'swoole'); -$server = new Queue\Server($adapter); - -$server->job() - ->inject('message') - ->action(function (Message $message) { - handleRequest($message); - }); +use Utopia\Queue\Server; +use Utopia\Queue\Adapter\Swoole; +use Utopia\Queue\Connection\Redis as RedisConnection; +use Utopia\Queue\Broker\Redis; + +$consumer = new Redis(new RedisConnection('redis')); +$adapter = new Swoole($consumer, 12, 'swoole'); +$server = new Server($adapter); + +$server->job()->inject('message')->action(handleRequest(...)); $server ->error() @@ -24,10 +21,12 @@ echo $th->getMessage() . PHP_EOL; }); -$server - ->workerStart() - ->action(function () { - echo "Worker Started" . PHP_EOL; - }); +$server->workerStart()->action(function () { + echo 'Worker Started' . PHP_EOL; +}); + +$server->workerStop()->action(function () { + echo 'Worker Stopped' . PHP_EOL; +}); $server->start(); diff --git a/tests/Queue/servers/SwooleRedisCluster/Dockerfile b/tests/Queue/servers/SwooleRedisCluster/Dockerfile index 7857498..eb30cec 100644 --- a/tests/Queue/servers/SwooleRedisCluster/Dockerfile +++ b/tests/Queue/servers/SwooleRedisCluster/Dockerfile @@ -2,4 +2,4 @@ FROM phpswoole/swoole:php8.3-alpine RUN apk add autoconf build-base -RUN docker-php-ext-enable redis \ No newline at end of file +RUN docker-php-ext-enable redis diff --git a/tests/Queue/servers/SwooleRedisCluster/worker.php b/tests/Queue/servers/SwooleRedisCluster/worker.php index d120b24..e12b8ae 100644 --- a/tests/Queue/servers/SwooleRedisCluster/worker.php +++ b/tests/Queue/servers/SwooleRedisCluster/worker.php @@ -3,19 +3,22 @@ require_once __DIR__ . '/../../../../vendor/autoload.php'; require_once __DIR__ . '/../tests.php'; -use Utopia\Queue; -use Utopia\Queue\Message; - -$connection = new Queue\Connection\RedisCluster(['redis-cluster-0:6379', 'redis-cluster-1:6379', 'redis-cluster-2:6379']); -$consumer = new Queue\Broker\Redis($connection); -$adapter = new Queue\Adapter\Swoole($consumer, 12, 'swoole-redis-cluster'); -$server = new Queue\Server($adapter); - -$server->job() - ->inject('message') - ->action(function (Message $message) { - handleRequest($message); - }); +use Utopia\Queue\Broker\Redis; +use Utopia\Queue\Connection\RedisCluster; +use Utopia\Queue\Adapter\Swoole; +use Utopia\Queue\Server; + +$consumer = new Redis( + new RedisCluster([ + 'redis-cluster-0:6379', + 'redis-cluster-1:6379', + 'redis-cluster-2:6379', + ]), +); +$adapter = new Swoole($consumer, 12, 'swoole-redis-cluster'); +$server = new Server($adapter); + +$server->job()->inject('message')->action(handleRequest(...)); $server ->error() @@ -24,10 +27,12 @@ echo $th->getMessage() . PHP_EOL; }); -$server - ->workerStart() - ->action(function () { - echo "Worker Started" . PHP_EOL; - }); +$server->workerStart()->action(function () { + echo 'Worker Started' . PHP_EOL; +}); + +$server->workerStop()->action(function () { + echo 'Worker Stopped' . PHP_EOL; +}); $server->start(); diff --git a/tests/Queue/servers/Workerman/Dockerfile b/tests/Queue/servers/Workerman/Dockerfile index 6dd16ab..1704dd1 100644 --- a/tests/Queue/servers/Workerman/Dockerfile +++ b/tests/Queue/servers/Workerman/Dockerfile @@ -8,4 +8,4 @@ ADD https://github.com/mlocati/docker-php-extension-installer/releases/latest/do RUN docker-php-ext-configure pcntl --enable-pcntl -RUN docker-php-ext-install pcntl \ No newline at end of file +RUN docker-php-ext-install pcntl diff --git a/tests/Queue/servers/Workerman/worker.php b/tests/Queue/servers/Workerman/worker.php index 5a093ec..f3558fb 100644 --- a/tests/Queue/servers/Workerman/worker.php +++ b/tests/Queue/servers/Workerman/worker.php @@ -4,17 +4,15 @@ require_once __DIR__ . '/../tests.php'; use Utopia\Queue; -use Utopia\Queue\Message; +use Utopia\Queue\Adapter\Workerman; +use Utopia\Queue\Connection\Redis as RedisConnection; +use Utopia\Queue\Broker\Redis; -$connection = new Queue\Connection\Redis('redis'); -$consumer = new Queue\Broker\Redis($connection); -$adapter = new Queue\Adapter\Workerman($consumer, 12, 'wokerman'); +$consumer = new Redis(new RedisConnection('redis')); +$adapter = new Workerman($consumer, 12, 'wokerman'); $server = new Queue\Server($adapter); -$server->job() - ->inject('message') - ->action(function (Message $message) { - handleRequest($message); - }); + +$server->job()->inject('message')->action(handleRequest(...)); $server ->error() @@ -23,10 +21,12 @@ echo $th->getMessage() . PHP_EOL; }); -$server - ->workerStart() - ->action(function () { - echo "Worker Started" . PHP_EOL; - }); +$server->workerStart()->action(function () { + echo 'Worker Started' . PHP_EOL; +}); + +$server->workerStop()->action(function () { + echo 'Worker Stopped' . PHP_EOL; +}); $server->start();