Skip to content

Commit 06b5ced

Browse files
authored
Merge pull request #46 from utopia-php/feat-timeout-overrides
Add timeout overrides
2 parents 0eccc55 + fe8d6dd commit 06b5ced

File tree

1 file changed

+15
-6
lines changed

1 file changed

+15
-6
lines changed

src/Queue/Broker/AMQP.php

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public function __construct(
4242
private readonly ?string $password = null,
4343
private readonly string $vhost = '/',
4444
private readonly int $heartbeat = 0,
45+
private readonly float $connectTimeout = 3.0,
46+
private readonly float $readWriteTimeout = 3.0,
4547
) {
4648
}
4749

@@ -94,7 +96,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
9496
$amqpMessage->nack(requeue: true);
9597
$errorCallback($message ?? null, $e);
9698
} catch (\Throwable $th) {
97-
$amqpMessage->nack(requeue: false);
99+
$amqpMessage->nack();
98100
$errorCallback($message ?? null, $th);
99101
}
100102
};
@@ -129,9 +131,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
129131

130132
public function close(): void
131133
{
132-
if ($this->channel) {
133-
$this->channel->getConnection()?->close();
134-
}
134+
$this->channel?->getConnection()?->close();
135135
}
136136

137137
public function enqueue(Queue $queue, array $payload): bool
@@ -184,7 +184,16 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
184184
private function withChannel(callable $callback): void
185185
{
186186
$createChannel = function (): AMQPChannel {
187-
$connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost, heartbeat: $this->heartbeat);
187+
$connection = new AMQPStreamConnection(
188+
$this->host,
189+
$this->port,
190+
$this->user,
191+
$this->password,
192+
$this->vhost,
193+
connection_timeout: $this->connectTimeout,
194+
read_write_timeout: $this->readWriteTimeout,
195+
heartbeat: $this->heartbeat,
196+
);
188197
if (is_callable($this->connectionConfigHook)) {
189198
call_user_func($this->connectionConfigHook, $connection);
190199
}
@@ -201,7 +210,7 @@ private function withChannel(callable $callback): void
201210

202211
try {
203212
$callback($this->channel);
204-
} catch (\Throwable $th) {
213+
} catch (\Throwable) {
205214
// createChannel() might throw, in that case set the channel to `null` first.
206215
$this->channel = null;
207216
// try creating a new connection once, if this still fails, throw the error

0 commit comments

Comments
 (0)