Skip to content

Commit 6a7d673

Browse files
basertloks0n
authored andcommitted
feat: add workerStop handling
1 parent 9f74bfc commit 6a7d673

File tree

11 files changed

+176
-85
lines changed

11 files changed

+176
-85
lines changed

docker-compose.yml

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ services:
33
container_name: tests
44
build: .
55
volumes:
6-
- ./src:/usr/local/src/src
7-
- ./tests:/usr/local/src/tests
6+
- ./vendor:/usr/src/code/vendor
7+
- ./src:/usr/src/code/src
8+
- ./tests:/usr/src/code/tests
89
depends_on:
910
- swoole
1011
- swoole-amqp
@@ -16,8 +17,9 @@ services:
1617
build: ./tests/Queue/servers/Swoole/.
1718
command: php /usr/src/code/tests/Queue/servers/Swoole/worker.php
1819
volumes:
19-
- ./src:/usr/local/src/src
20-
- ./tests:/usr/local/src/tests
20+
- ./vendor:/usr/src/code/vendor
21+
- ./src:/usr/src/code/src
22+
- ./tests:/usr/src/code/tests
2123
depends_on:
2224
- redis
2325

@@ -26,8 +28,9 @@ services:
2628
build: ./tests/Queue/servers/SwooleRedisCluster/.
2729
command: php /usr/src/code/tests/Queue/servers/SwooleRedisCluster/worker.php
2830
volumes:
29-
- ./src:/usr/local/src/src
30-
- ./tests:/usr/local/src/tests
31+
- ./vendor:/usr/src/code/vendor
32+
- ./src:/usr/src/code/src
33+
- ./tests:/usr/src/code/tests
3134
depends_on:
3235
redis-cluster-0:
3336
condition: service_healthy
@@ -37,8 +40,9 @@ services:
3740
build: ./tests/Queue/servers/AMQP/.
3841
command: php /usr/src/code/tests/Queue/servers/AMQP/worker.php
3942
volumes:
40-
- ./src:/usr/local/src/src
41-
- ./tests:/usr/local/src/tests
43+
- ./vendor:/usr/src/code/vendor
44+
- ./src:/usr/src/code/src
45+
- ./tests:/usr/src/code/tests
4246
depends_on:
4347
amqp:
4448
condition: service_healthy
@@ -48,8 +52,9 @@ services:
4852
build: ./tests/Queue/servers/Workerman/.
4953
command: php /usr/src/code/tests/Queue/servers/Workerman/worker.php start
5054
volumes:
51-
- ./src:/usr/local/src/src
52-
- ./tests:/usr/local/src/tests
55+
- ./vendor:/usr/src/code/vendor
56+
- ./src:/usr/src/code/src
57+
- ./tests:/usr/src/code/tests
5358
depends_on:
5459
- redis
5560

src/Queue/Adapter/Swoole.php

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@
22

33
namespace Utopia\Queue\Adapter;
44

5+
use Swoole\Constant;
56
use Swoole\Process\Pool;
7+
use Utopia\CLI\Console;
68
use Utopia\Queue\Adapter;
79
use Utopia\Queue\Consumer;
810

911
class Swoole extends Adapter
1012
{
1113
protected Pool $pool;
1214

15+
/** @var callable */
16+
private $onStop;
17+
1318
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
1419
{
1520
parent::__construct($workerNum, $queue, $namespace);
@@ -21,19 +26,57 @@ public function __construct(Consumer $consumer, int $workerNum, string $queue, s
2126
public function start(): self
2227
{
2328
$this->pool->set(['enable_coroutine' => true]);
29+
30+
// Register signal handlers in the main process before starting pool
31+
if (extension_loaded('pcntl')) {
32+
pcntl_signal(SIGTERM, function () {
33+
Console::info("[Swoole] Received SIGTERM, initiating graceful shutdown...");
34+
$this->stop();
35+
});
36+
37+
pcntl_signal(SIGINT, function () {
38+
Console::info("[Swoole] Received SIGINT, initiating graceful shutdown...");
39+
$this->stop();
40+
});
41+
42+
// Enable async signals
43+
pcntl_async_signals(true);
44+
}
45+
2446
$this->pool->start();
2547
return $this;
2648
}
2749

2850
public function stop(): self
2951
{
52+
if ($this->onStop) {
53+
call_user_func($this->onStop);
54+
}
55+
56+
Console::info("[Swoole] Shutting down process pool...");
3057
$this->pool->shutdown();
58+
Console::success("[Swoole] Process pool stopped.");
3159
return $this;
3260
}
3361

3462
public function workerStart(callable $callback): self
3563
{
36-
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
64+
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
65+
// Register signal handlers in each worker process for graceful shutdown
66+
if (extension_loaded('pcntl')) {
67+
pcntl_signal(SIGTERM, function () use ($workerId) {
68+
Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer...");
69+
$this->consumer->close();
70+
});
71+
72+
pcntl_signal(SIGINT, function () use ($workerId) {
73+
Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer...");
74+
$this->consumer->close();
75+
});
76+
77+
pcntl_async_signals(true);
78+
}
79+
3780
call_user_func($callback, $workerId);
3881
});
3982

@@ -42,7 +85,8 @@ public function workerStart(callable $callback): self
4285

4386
public function workerStop(callable $callback): self
4487
{
45-
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
88+
$this->onStop = $callback;
89+
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
4690
call_user_func($callback, $workerId);
4791
});
4892

src/Queue/Broker/AMQP.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
131131

132132
public function close(): void
133133
{
134+
$this->channel?->stopConsume();
134135
$this->channel?->getConnection()?->close();
135136
}
136137

src/Queue/Server.php

Lines changed: 85 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ class Server
5353
*/
5454
protected Hook $workerStartHook;
5555

56+
/**
57+
* Hook that is called when worker stops
58+
*/
59+
protected Hook $workerStopHook;
60+
5661
/**
5762
* @var array
5863
*/
@@ -96,7 +101,7 @@ public function getResource(string $name, bool $fresh = false): mixed
96101
{
97102
if (!\array_key_exists($name, $this->resources) || $fresh || self::$resourcesCallbacks[$name]['reset']) {
98103
if (!\array_key_exists($name, self::$resourcesCallbacks)) {
99-
throw new Exception('Failed to find resource: "' . $name . '"');
104+
throw new Exception("Failed to find resource: $name");
100105
}
101106

102107
$this->resources[$name] = \call_user_func_array(
@@ -213,49 +218,23 @@ public function start(): self
213218
$this->adapter->workerStart(function (string $workerId) {
214219
Console::success("[Worker] Worker {$workerId} is ready!");
215220
self::setResource('workerId', fn () => $workerId);
216-
if (!is_null($this->workerStartHook)) {
221+
if ($this->workerStartHook !== null) {
217222
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
218223
}
219224

220-
while (true) {
221-
$this->adapter->consumer->consume(
222-
$this->adapter->queue,
223-
function (Message $message) {
224-
$receivedAtTimestamp = microtime(true);
225-
Console::info("[Job] Received Job ({$message->getPid()}).");
226-
try {
227-
$waitDuration = microtime(true) - $message->getTimestamp();
228-
$this->jobWaitTime->record($waitDuration);
229-
230-
$this->resources = [];
231-
self::setResource('message', fn () => $message);
232-
if ($this->job->getHook()) {
233-
foreach ($this->initHooks as $hook) { // Global init hooks
234-
if (in_array('*', $hook->getGroups())) {
235-
$arguments = $this->getArguments($hook, $message->getPayload());
236-
\call_user_func_array($hook->getAction(), $arguments);
237-
}
238-
}
239-
}
240-
241-
foreach ($this->job->getGroups() as $group) {
242-
foreach ($this->initHooks as $hook) { // Group init hooks
243-
if (in_array($group, $hook->getGroups())) {
244-
$arguments = $this->getArguments($hook, $message->getPayload());
245-
\call_user_func_array($hook->getAction(), $arguments);
246-
}
247-
}
248-
}
249-
250-
return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
251-
} finally {
252-
$processDuration = microtime(true) - $receivedAtTimestamp;
253-
$this->processDuration->record($processDuration);
254-
}
255-
},
256-
function (Message $message) {
225+
$this->adapter->consumer->consume(
226+
$this->adapter->queue,
227+
function (Message $message) {
228+
$receivedAtTimestamp = microtime(true);
229+
Console::info("[Job] Received Job ({$message->getPid()}).");
230+
try {
231+
$waitDuration = microtime(true) - $message->getTimestamp();
232+
$this->jobWaitTime->record($waitDuration);
233+
234+
$this->resources = [];
235+
self::setResource('message', fn () => $message);
257236
if ($this->job->getHook()) {
258-
foreach ($this->shutdownHooks as $hook) { // Global init hooks
237+
foreach ($this->initHooks as $hook) { // Global init hooks
259238
if (in_array('*', $hook->getGroups())) {
260239
$arguments = $this->getArguments($hook, $message->getPayload());
261240
\call_user_func_array($hook->getAction(), $arguments);
@@ -264,27 +243,65 @@ function (Message $message) {
264243
}
265244

266245
foreach ($this->job->getGroups() as $group) {
267-
foreach ($this->shutdownHooks as $hook) { // Group init hooks
246+
foreach ($this->initHooks as $hook) { // Group init hooks
268247
if (in_array($group, $hook->getGroups())) {
269248
$arguments = $this->getArguments($hook, $message->getPayload());
270249
\call_user_func_array($hook->getAction(), $arguments);
271250
}
272251
}
273252
}
274-
Console::success("[Job] ({$message->getPid()}) successfully run.");
275-
},
276-
function (?Message $message, Throwable $th) {
277-
Console::error("[Job] ({$message?->getPid()}) failed to run.");
278-
Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}");
279253

280-
self::setResource('error', fn () => $th);
254+
return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
255+
} finally {
256+
$processDuration = microtime(true) - $receivedAtTimestamp;
257+
$this->processDuration->record($processDuration);
258+
}
259+
},
260+
function (Message $message) {
261+
if ($this->job->getHook()) {
262+
foreach ($this->shutdownHooks as $hook) { // Global init hooks
263+
if (in_array('*', $hook->getGroups())) {
264+
$arguments = $this->getArguments($hook, $message->getPayload());
265+
\call_user_func_array($hook->getAction(), $arguments);
266+
}
267+
}
268+
}
281269

282-
foreach ($this->errorHooks as $hook) {
283-
($hook->getAction())(...$this->getArguments($hook));
270+
foreach ($this->job->getGroups() as $group) {
271+
foreach ($this->shutdownHooks as $hook) { // Group init hooks
272+
if (in_array($group, $hook->getGroups())) {
273+
$arguments = $this->getArguments($hook, $message->getPayload());
274+
\call_user_func_array($hook->getAction(), $arguments);
275+
}
284276
}
285-
},
286-
);
277+
}
278+
Console::success("[Job] ({$message->getPid()}) successfully run.");
279+
},
280+
function (?Message $message, Throwable $th) {
281+
Console::error("[Job] ({$message?->getPid()}) failed to run.");
282+
Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}");
283+
284+
self::setResource('error', fn () => $th);
285+
286+
foreach ($this->errorHooks as $hook) {
287+
$hook->getAction()(...$this->getArguments($hook));
288+
}
289+
},
290+
);
291+
});
292+
293+
$this->adapter->workerStop(function ($workerId) {
294+
Console::info("[Worker] Worker {$workerId} stopping...");
295+
self::setResource('workerId', fn () => $workerId);
296+
297+
// Call user-defined workerStop hook if set
298+
if ($this->workerStopHook !== null) {
299+
call_user_func_array($this->workerStopHook->getAction(), $this->getArguments($this->workerStopHook));
287300
}
301+
302+
// Close consumer connection
303+
$this->adapter->consumer->close();
304+
Console::success("[Worker] Worker {$workerId} stopped gracefully.");
288305
});
289306

290307
$this->adapter->start();
@@ -320,27 +337,23 @@ public function getWorkerStart(): Hook
320337

321338
/**
322339
* Is called when a Worker stops.
323-
* @param callable|null $callback
324-
* @return self
325-
* @throws Exception
340+
* @return Hook
326341
*/
327-
public function workerStop(?callable $callback = null): self
342+
public function workerStop(): Hook
328343
{
329-
try {
330-
$this->adapter->workerStop(function (string $workerId) use ($callback) {
331-
Console::success("[Worker] Worker {$workerId} is ready!");
332-
if (!is_null($callback)) {
333-
call_user_func($callback);
334-
}
335-
});
336-
} catch (Throwable $error) {
337-
self::setResource('error', fn () => $error);
338-
foreach ($this->errorHooks as $hook) {
339-
call_user_func_array($hook->getAction(), $this->getArguments($hook));
340-
}
341-
}
344+
$hook = new Hook();
345+
$hook->groups(['*']);
346+
$this->workerStopHook = $hook;
347+
return $hook;
348+
}
342349

343-
return $this;
350+
/**
351+
* Returns Worker stops hook.
352+
* @return ?Hook
353+
*/
354+
public function getWorkerStop(): ?Hook
355+
{
356+
return $this->workerStopHook;
344357
}
345358

346359
/**
@@ -355,7 +368,7 @@ protected function getArguments(Hook $hook, array $payload = []): array
355368
$arguments = [];
356369
foreach ($hook->getParams() as $key => $param) { // Get value from route or request object
357370
$value = $payload[$key] ?? $param['default'];
358-
$value = ($value === '' || is_null($value)) ? $param['default'] : $value;
371+
$value = ($value === '' || $value === null) ? $param['default'] : $value;
359372

360373
$this->validate($key, $param, $value);
361374
$hook->setParamValue($key, $value);
@@ -384,7 +397,7 @@ protected function getArguments(Hook $hook, array $payload = []): array
384397
*/
385398
protected function validate(string $key, array $param, mixed $value): void
386399
{
387-
if ('' !== $value && !is_null($value)) {
400+
if ('' !== $value && $value !== null) {
388401
$validator = $param['validator']; // checking whether the class exists
389402

390403
if (\is_callable($validator)) {
@@ -399,7 +412,7 @@ protected function validate(string $key, array $param, mixed $value): void
399412
throw new Exception('Invalid ' .$key . ': ' . $validator->getDescription(), 400);
400413
}
401414
} elseif (!$param['optional']) {
402-
throw new Exception('Param "' . $key . '" is not optional.', 400);
415+
throw new Exception("Param $key is not optional.", 400);
403416
}
404417
}
405418

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
FROM phpswoole/swoole:php8.3-alpine
22

3-
RUN apk add autoconf build-base
3+
RUN apk add autoconf build-base
4+
5+
RUN docker-php-ext-install pcntl

0 commit comments

Comments
 (0)