Skip to content

Commit 498bbbe

Browse files
authored
Merge pull request #45 from utopia-php/PLA-2762-2
feat: add workerStop handling
2 parents 06b5ced + 9e20d42 commit 498bbbe

File tree

3 files changed

+61
-77
lines changed

3 files changed

+61
-77
lines changed

src/Queue/Adapter/Swoole.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Utopia\Queue\Adapter;
44

5+
use Swoole\Constant;
56
use Swoole\Process\Pool;
67
use Utopia\Queue\Adapter;
78
use Utopia\Queue\Consumer;
@@ -10,6 +11,9 @@ class Swoole extends Adapter
1011
{
1112
protected Pool $pool;
1213

14+
/** @var callable */
15+
private $onStop;
16+
1317
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
1418
{
1519
parent::__construct($workerNum, $queue, $namespace);
@@ -27,13 +31,16 @@ public function start(): self
2731

2832
public function stop(): self
2933
{
34+
if ($this->onStop) {
35+
call_user_func($this->onStop);
36+
}
3037
$this->pool->shutdown();
3138
return $this;
3239
}
3340

3441
public function workerStart(callable $callback): self
3542
{
36-
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
43+
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
3744
call_user_func($callback, $workerId);
3845
});
3946

@@ -42,7 +49,8 @@ public function workerStart(callable $callback): self
4249

4350
public function workerStop(callable $callback): self
4451
{
45-
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
52+
$this->onStop = $callback;
53+
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
4654
call_user_func($callback, $workerId);
4755
});
4856

src/Queue/Broker/AMQP.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
131131

132132
public function close(): void
133133
{
134+
$this->channel?->stopConsume();
134135
$this->channel?->getConnection()?->close();
135136
}
136137

src/Queue/Server.php

Lines changed: 50 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -217,45 +217,19 @@ public function start(): self
217217
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
218218
}
219219

220-
while (true) {
221-
$this->adapter->consumer->consume(
222-
$this->adapter->queue,
223-
function (Message $message) {
224-
$receivedAtTimestamp = microtime(true);
225-
Console::info("[Job] Received Job ({$message->getPid()}).");
226-
try {
227-
$waitDuration = microtime(true) - $message->getTimestamp();
228-
$this->jobWaitTime->record($waitDuration);
229-
230-
$this->resources = [];
231-
self::setResource('message', fn () => $message);
232-
if ($this->job->getHook()) {
233-
foreach ($this->initHooks as $hook) { // Global init hooks
234-
if (in_array('*', $hook->getGroups())) {
235-
$arguments = $this->getArguments($hook, $message->getPayload());
236-
\call_user_func_array($hook->getAction(), $arguments);
237-
}
238-
}
239-
}
240-
241-
foreach ($this->job->getGroups() as $group) {
242-
foreach ($this->initHooks as $hook) { // Group init hooks
243-
if (in_array($group, $hook->getGroups())) {
244-
$arguments = $this->getArguments($hook, $message->getPayload());
245-
\call_user_func_array($hook->getAction(), $arguments);
246-
}
247-
}
248-
}
249-
250-
return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
251-
} finally {
252-
$processDuration = microtime(true) - $receivedAtTimestamp;
253-
$this->processDuration->record($processDuration);
254-
}
255-
},
256-
function (Message $message) {
220+
$this->adapter->consumer->consume(
221+
$this->adapter->queue,
222+
function (Message $message) {
223+
$receivedAtTimestamp = microtime(true);
224+
Console::info("[Job] Received Job ({$message->getPid()}).");
225+
try {
226+
$waitDuration = microtime(true) - $message->getTimestamp();
227+
$this->jobWaitTime->record($waitDuration);
228+
229+
$this->resources = [];
230+
self::setResource('message', fn () => $message);
257231
if ($this->job->getHook()) {
258-
foreach ($this->shutdownHooks as $hook) { // Global init hooks
232+
foreach ($this->initHooks as $hook) { // Global init hooks
259233
if (in_array('*', $hook->getGroups())) {
260234
$arguments = $this->getArguments($hook, $message->getPayload());
261235
\call_user_func_array($hook->getAction(), $arguments);
@@ -264,29 +238,55 @@ function (Message $message) {
264238
}
265239

266240
foreach ($this->job->getGroups() as $group) {
267-
foreach ($this->shutdownHooks as $hook) { // Group init hooks
241+
foreach ($this->initHooks as $hook) { // Group init hooks
268242
if (in_array($group, $hook->getGroups())) {
269243
$arguments = $this->getArguments($hook, $message->getPayload());
270244
\call_user_func_array($hook->getAction(), $arguments);
271245
}
272246
}
273247
}
274-
Console::success("[Job] ({$message->getPid()}) successfully run.");
275-
},
276-
function (?Message $message, Throwable $th) {
277-
Console::error("[Job] ({$message?->getPid()}) failed to run.");
278-
Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}");
279248

280-
self::setResource('error', fn () => $th);
249+
return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
250+
} finally {
251+
$processDuration = microtime(true) - $receivedAtTimestamp;
252+
$this->processDuration->record($processDuration);
253+
}
254+
},
255+
function (Message $message) {
256+
if ($this->job->getHook()) {
257+
foreach ($this->shutdownHooks as $hook) { // Global init hooks
258+
if (in_array('*', $hook->getGroups())) {
259+
$arguments = $this->getArguments($hook, $message->getPayload());
260+
\call_user_func_array($hook->getAction(), $arguments);
261+
}
262+
}
263+
}
281264

282-
foreach ($this->errorHooks as $hook) {
283-
($hook->getAction())(...$this->getArguments($hook));
265+
foreach ($this->job->getGroups() as $group) {
266+
foreach ($this->shutdownHooks as $hook) { // Group init hooks
267+
if (in_array($group, $hook->getGroups())) {
268+
$arguments = $this->getArguments($hook, $message->getPayload());
269+
\call_user_func_array($hook->getAction(), $arguments);
270+
}
284271
}
285-
},
286-
);
287-
}
272+
}
273+
Console::success("[Job] ({$message->getPid()}) successfully run.");
274+
},
275+
function (?Message $message, Throwable $th) {
276+
Console::error("[Job] ({$message?->getPid()}) failed to run.");
277+
Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}");
278+
279+
self::setResource('error', fn () => $th);
280+
281+
foreach ($this->errorHooks as $hook) {
282+
($hook->getAction())(...$this->getArguments($hook));
283+
}
284+
},
285+
);
288286
});
289287

288+
$this->adapter->workerStop(fn () => $this->adapter->consumer->close());
289+
290290
$this->adapter->start();
291291
} catch (Throwable $error) {
292292
self::setResource('error', fn () => $error);
@@ -318,31 +318,6 @@ public function getWorkerStart(): Hook
318318
return $this->workerStartHook;
319319
}
320320

321-
/**
322-
* Is called when a Worker stops.
323-
* @param callable|null $callback
324-
* @return self
325-
* @throws Exception
326-
*/
327-
public function workerStop(?callable $callback = null): self
328-
{
329-
try {
330-
$this->adapter->workerStop(function (string $workerId) use ($callback) {
331-
Console::success("[Worker] Worker {$workerId} is ready!");
332-
if (!is_null($callback)) {
333-
call_user_func($callback);
334-
}
335-
});
336-
} catch (Throwable $error) {
337-
self::setResource('error', fn () => $error);
338-
foreach ($this->errorHooks as $hook) {
339-
call_user_func_array($hook->getAction(), $this->getArguments($hook));
340-
}
341-
}
342-
343-
return $this;
344-
}
345-
346321
/**
347322
* Get Arguments
348323
*

0 commit comments

Comments
 (0)