Skip to content

Commit 8dba579

Browse files
committed
chore: add workerstop
1 parent eed8b7c commit 8dba579

File tree

7 files changed

+324
-227
lines changed

7 files changed

+324
-227
lines changed

src/Queue/Adapter/Swoole.php

Lines changed: 49 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
namespace Utopia\Queue\Adapter;
44

55
use Swoole\Constant;
6+
use Swoole\Process;
67
use Swoole\Process\Pool;
7-
use Utopia\CLI\Console;
8+
use Utopia\Console;
89
use Utopia\Queue\Adapter;
910
use Utopia\Queue\Consumer;
1011

@@ -15,8 +16,12 @@ class Swoole extends Adapter
1516
/** @var callable */
1617
private $onStop;
1718

18-
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
19-
{
19+
public function __construct(
20+
Consumer $consumer,
21+
int $workerNum,
22+
string $queue,
23+
string $namespace = "utopia-queue",
24+
) {
2025
parent::__construct($workerNum, $queue, $namespace);
2126

2227
$this->consumer = $consumer;
@@ -25,25 +30,22 @@ public function __construct(Consumer $consumer, int $workerNum, string $queue, s
2530

2631
public function start(): self
2732
{
28-
$this->pool->set(['enable_coroutine' => true]);
29-
30-
// Register signal handlers in the main process before starting pool
31-
if (extension_loaded('pcntl')) {
32-
pcntl_signal(SIGTERM, function () {
33-
Console::info("[Swoole] Received SIGTERM, initiating graceful shutdown...");
34-
$this->stop();
35-
});
36-
37-
pcntl_signal(SIGINT, function () {
38-
Console::info("[Swoole] Received SIGINT, initiating graceful shutdown...");
39-
$this->stop();
40-
});
33+
$this->pool->set(["enable_coroutine" => true]);
34+
35+
// Register signal handlers
36+
Process::signal(SIGTERM, function () {
37+
Console::info(
38+
"[Swoole] Received SIGTERM, initiating graceful shutdown...",
39+
);
40+
$this->stop();
41+
});
4142

42-
// Enable async signals
43-
pcntl_async_signals(true);
44-
} else {
45-
Console::warning("[Swoole] pcntl extension is not loaded, worker will not shutdown gracefully.");
46-
}
43+
Process::signal(SIGINT, function () {
44+
Console::info(
45+
"[Swoole] Received SIGINT, initiating graceful shutdown...",
46+
);
47+
$this->stop();
48+
});
4749

4850
$this->pool->start();
4951
return $this;
@@ -52,7 +54,7 @@ public function start(): self
5254
public function stop(): self
5355
{
5456
if ($this->onStop) {
55-
call_user_func($this->onStop);
57+
\call_user_func($this->onStop);
5658
}
5759

5860
Console::info("[Swoole] Shutting down process pool...");
@@ -63,23 +65,26 @@ public function stop(): self
6365

6466
public function workerStart(callable $callback): self
6567
{
66-
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
67-
// Register signal handlers in each worker process for graceful shutdown
68-
if (extension_loaded('pcntl')) {
69-
pcntl_signal(SIGTERM, function () use ($workerId) {
70-
Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer...");
71-
$this->consumer->close();
72-
});
73-
74-
pcntl_signal(SIGINT, function () use ($workerId) {
75-
Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer...");
76-
$this->consumer->close();
77-
});
78-
79-
pcntl_async_signals(true);
80-
}
81-
82-
call_user_func($callback, $workerId);
68+
$this->pool->on(Constant::EVENT_WORKER_START, function (
69+
Pool $pool,
70+
string $workerId,
71+
) use ($callback) {
72+
// Register signal handlers in worker
73+
Process::signal(SIGTERM, function () {
74+
Console::info(
75+
"[Swoole] Received SIGTERM, initiating graceful shutdown...",
76+
);
77+
$this->stop();
78+
});
79+
80+
Process::signal(SIGINT, function () {
81+
Console::info(
82+
"[Swoole] Received SIGINT, initiating graceful shutdown...",
83+
);
84+
$this->stop();
85+
});
86+
87+
\call_user_func($callback, $workerId);
8388
});
8489

8590
return $this;
@@ -88,8 +93,11 @@ public function workerStart(callable $callback): self
8893
public function workerStop(callable $callback): self
8994
{
9095
$this->onStop = $callback;
91-
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
92-
call_user_func($callback, $workerId);
96+
$this->pool->on(Constant::EVENT_WORKER_STOP, function (
97+
Pool $pool,
98+
string $workerId,
99+
) use ($callback) {
100+
\call_user_func($callback, $workerId);
93101
});
94102

95103
return $this;

src/Queue/Broker/Pool.php

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
3030
return $this->delegatePublish(__FUNCTION__, \func_get_args());
3131
}
3232

33-
public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
34-
{
33+
public function consume(
34+
Queue $queue,
35+
callable $messageCallback,
36+
callable $successCallback,
37+
callable $errorCallback,
38+
): void {
3539
$this->delegateConsumer(__FUNCTION__, \func_get_args());
3640
}
3741

@@ -42,14 +46,20 @@ public function close(): void
4246

4347
protected function delegatePublish(string $method, array $args): mixed
4448
{
45-
return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) {
49+
return $this->publisher?->use(function (Publisher $adapter) use (
50+
$method,
51+
$args,
52+
) {
4653
return $adapter->$method(...$args);
4754
});
4855
}
4956

5057
protected function delegateConsumer(string $method, array $args): mixed
5158
{
52-
return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) {
59+
return $this->consumer?->use(function (Consumer $adapter) use (
60+
$method,
61+
$args,
62+
) {
5363
return $adapter->$method(...$args);
5464
});
5565
}

0 commit comments

Comments
 (0)