diff --git a/composer.lock b/composer.lock
index 4a34a36..5615653 100644
--- a/composer.lock
+++ b/composer.lock
@@ -107,16 +107,16 @@
},
{
"name": "utopia-php/framework",
- "version": "1.0.0",
+ "version": "1.0.2",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/http.git",
- "reference": "cc880ec41f7f163d4f9956fec26cc6be51b412cf"
+ "reference": "fc63ec61c720190a5ea5bb484c615145850951e6"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/utopia-php/http/zipball/cc880ec41f7f163d4f9956fec26cc6be51b412cf",
- "reference": "cc880ec41f7f163d4f9956fec26cc6be51b412cf",
+ "url": "https://api.github.com/repos/utopia-php/http/zipball/fc63ec61c720190a5ea5bb484c615145850951e6",
+ "reference": "fc63ec61c720190a5ea5bb484c615145850951e6",
"shasum": ""
},
"require": {
@@ -151,22 +151,22 @@
],
"support": {
"issues": "https://github.com/utopia-php/http/issues",
- "source": "https://github.com/utopia-php/http/tree/1.0.0"
+ "source": "https://github.com/utopia-php/http/tree/1.0.2"
},
- "time": "2024-09-05T15:38:08+00:00"
+ "time": "2024-09-10T09:04:19+00:00"
},
{
"name": "utopia-php/servers",
- "version": "0.1.0",
+ "version": "0.1.1",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/servers.git",
- "reference": "7d9e4f364fb1ab1889fb89ca96eb9946467cb09c"
+ "reference": "fd5c8d32778f265256c1936372a071b944f5ba8a"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/utopia-php/servers/zipball/7d9e4f364fb1ab1889fb89ca96eb9946467cb09c",
- "reference": "7d9e4f364fb1ab1889fb89ca96eb9946467cb09c",
+ "url": "https://api.github.com/repos/utopia-php/servers/zipball/fd5c8d32778f265256c1936372a071b944f5ba8a",
+ "reference": "fd5c8d32778f265256c1936372a071b944f5ba8a",
"shasum": ""
},
"require": {
@@ -204,9 +204,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/servers/issues",
- "source": "https://github.com/utopia-php/servers/tree/0.1.0"
+ "source": "https://github.com/utopia-php/servers/tree/0.1.1"
},
- "time": "2024-08-08T14:31:39+00:00"
+ "time": "2024-09-06T02:25:56+00:00"
}
],
"packages-dev": [
@@ -408,16 +408,16 @@
},
{
"name": "nikic/php-parser",
- "version": "v5.1.0",
+ "version": "v5.3.1",
"source": {
"type": "git",
"url": "https://github.com/nikic/PHP-Parser.git",
- "reference": "683130c2ff8c2739f4822ff7ac5c873ec529abd1"
+ "reference": "8eea230464783aa9671db8eea6f8c6ac5285794b"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/683130c2ff8c2739f4822ff7ac5c873ec529abd1",
- "reference": "683130c2ff8c2739f4822ff7ac5c873ec529abd1",
+ "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/8eea230464783aa9671db8eea6f8c6ac5285794b",
+ "reference": "8eea230464783aa9671db8eea6f8c6ac5285794b",
"shasum": ""
},
"require": {
@@ -460,9 +460,9 @@
],
"support": {
"issues": "https://github.com/nikic/PHP-Parser/issues",
- "source": "https://github.com/nikic/PHP-Parser/tree/v5.1.0"
+ "source": "https://github.com/nikic/PHP-Parser/tree/v5.3.1"
},
- "time": "2024-07-01T20:03:41+00:00"
+ "time": "2024-10-08T18:51:32+00:00"
},
{
"name": "phar-io/manifest",
@@ -584,16 +584,16 @@
},
{
"name": "phpstan/phpstan",
- "version": "1.12.2",
+ "version": "1.12.7",
"source": {
"type": "git",
"url": "https://github.com/phpstan/phpstan.git",
- "reference": "0ca1c7bb55fca8fe6448f16fff0f311ccec960a1"
+ "reference": "dc2b9976bd8b0f84ec9b0e50cc35378551de7af0"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/phpstan/phpstan/zipball/0ca1c7bb55fca8fe6448f16fff0f311ccec960a1",
- "reference": "0ca1c7bb55fca8fe6448f16fff0f311ccec960a1",
+ "url": "https://api.github.com/repos/phpstan/phpstan/zipball/dc2b9976bd8b0f84ec9b0e50cc35378551de7af0",
+ "reference": "dc2b9976bd8b0f84ec9b0e50cc35378551de7af0",
"shasum": ""
},
"require": {
@@ -638,7 +638,7 @@
"type": "github"
}
],
- "time": "2024-09-05T16:09:28+00:00"
+ "time": "2024-10-18T11:12:07+00:00"
},
{
"name": "phpunit/php-code-coverage",
@@ -961,16 +961,16 @@
},
{
"name": "phpunit/phpunit",
- "version": "9.6.20",
+ "version": "9.6.21",
"source": {
"type": "git",
"url": "https://github.com/sebastianbergmann/phpunit.git",
- "reference": "49d7820565836236411f5dc002d16dd689cde42f"
+ "reference": "de6abf3b6f8dd955fac3caad3af7a9504e8c2ffa"
},
"dist": {
"type": "zip",
- "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/49d7820565836236411f5dc002d16dd689cde42f",
- "reference": "49d7820565836236411f5dc002d16dd689cde42f",
+ "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/de6abf3b6f8dd955fac3caad3af7a9504e8c2ffa",
+ "reference": "de6abf3b6f8dd955fac3caad3af7a9504e8c2ffa",
"shasum": ""
},
"require": {
@@ -985,7 +985,7 @@
"phar-io/manifest": "^2.0.4",
"phar-io/version": "^3.2.1",
"php": ">=7.3",
- "phpunit/php-code-coverage": "^9.2.31",
+ "phpunit/php-code-coverage": "^9.2.32",
"phpunit/php-file-iterator": "^3.0.6",
"phpunit/php-invoker": "^3.1.1",
"phpunit/php-text-template": "^2.0.4",
@@ -1044,7 +1044,7 @@
"support": {
"issues": "https://github.com/sebastianbergmann/phpunit/issues",
"security": "https://github.com/sebastianbergmann/phpunit/security/policy",
- "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.20"
+ "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.21"
},
"funding": [
{
@@ -1060,7 +1060,7 @@
"type": "tidelift"
}
],
- "time": "2024-07-10T11:45:39+00:00"
+ "time": "2024-09-19T10:50:18+00:00"
},
{
"name": "sebastian/cli-parser",
diff --git a/phpunit.xml b/phpunit.xml
index 1b8f40d..e004d23 100644
--- a/phpunit.xml
+++ b/phpunit.xml
@@ -12,5 +12,8 @@
./tests/Queue/E2E/Adapter
+
+ ./tests/Queue/Unit
+
\ No newline at end of file
diff --git a/src/Queue/Adapter/Swoole/Redis.php b/src/Queue/Adapter/Swoole/Redis.php
index a9a43e5..d54a0ae 100644
--- a/src/Queue/Adapter/Swoole/Redis.php
+++ b/src/Queue/Adapter/Swoole/Redis.php
@@ -5,8 +5,9 @@
use Swoole\Database\RedisConfig;
use Swoole\Database\RedisPool;
use Utopia\Queue\Connection\Redis as ConnectionRedis;
+use Utopia\Queue\Concurrency\Adapter;
-class Redis extends ConnectionRedis
+class Redis extends ConnectionRedis implements Adapter
{
protected RedisPool $pool;
diff --git a/src/Queue/Concurrency/Adapter.php b/src/Queue/Concurrency/Adapter.php
new file mode 100644
index 0000000..87bf1ff
--- /dev/null
+++ b/src/Queue/Concurrency/Adapter.php
@@ -0,0 +1,11 @@
+getConcurrencyKey($message);
+ $value = $this->adapter->get($key);
+ if ($value === null) {
+ $this->adapter->set($key, "0");
+ $value = 0;
+ }
+ return \intval($value) < $this->limit;
+ }
+
+ public function startJob(Message $message): void
+ {
+ $key = $this->getConcurrencyKey($message);
+ $this->adapter->increment($key);
+ }
+
+ public function finishJob(Message $message): void
+ {
+ $key = $this->getConcurrencyKey($message);
+ $this->adapter->decrement($key);
+ }
+
+ abstract public function getConcurrencyKey(Message $message): string;
+}
diff --git a/src/Queue/Worker.php b/src/Queue/Worker.php
index 6072986..a11818f 100644
--- a/src/Queue/Worker.php
+++ b/src/Queue/Worker.php
@@ -8,6 +8,7 @@
use Utopia\DI\Container;
use Utopia\DI\Dependency;
use Utopia\Servers\Base;
+use Utopia\Queue\Concurrency\Manager;
class Worker extends Base
{
@@ -146,6 +147,20 @@ public function start(): self
return $this;
}
+ protected ?Manager $concurrencyManager = null;
+
+ /**
+ * Set the concurrency manager
+ *
+ * @param Manager $manager
+ * @return self
+ */
+ public function setConcurrencyManager(Manager $manager): self
+ {
+ $this->concurrencyManager = $manager;
+ return $this;
+ }
+
protected function lifecycle(Job $job, Message $message, array $nextMessage, Container $context, Connection $connection): static
{
Console::info("[Job] Received Job ({$message->getPid()}).");
@@ -154,21 +169,15 @@ protected function lifecycle(Job $job, Message $message, array $nextMessage, Con
$connection->getConnection();
- /**
- * Move Job to Jobs and it's PID to the processing list.
- */
- $connection->setArray("{$this->adapter->namespace}.jobs.{$this->adapter->queue}.{$message->getPid()}", $nextMessage);
- $connection->leftPush("{$this->adapter->namespace}.processing.{$this->adapter->queue}", $message->getPid());
-
- /**
- * Increment Total Jobs Received from Stats.
- */
- $connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.total");
-
- /**
- * Increment Processing Jobs from Stats.
- */
- $connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.processing");
+ // if ($this->concurrencyManager) {
+ // if (!$this->concurrencyManager->canProcessJob($message)) {
+ // // If we can't process the job due to concurrency limits, put it back in the queue
+ // $connection->leftPush("{$this->adapter->namespace}.queue.{$this->adapter->queue}", json_encode($nextMessage));
+ // Console::info("[Job] ({$message->getPid()}) postponed due to concurrency limits.");
+ // return $this;
+ // }
+ // $this->concurrencyManager->startJob($message);
+ // }
try {
foreach (self::$init as $hook) { // Global init hooks
@@ -257,6 +266,10 @@ protected function lifecycle(Job $job, Message $message, array $nextMessage, Con
}
}
} finally {
+ // if ($this->concurrencyManager) {
+ // $this->concurrencyManager->finishJob($message);
+ // }
+
/**
* Remove Job from Processing.
*/
diff --git a/tests/Queue/E2E/Concurrency/ManagerTest.php b/tests/Queue/E2E/Concurrency/ManagerTest.php
new file mode 100644
index 0000000..58883fb
--- /dev/null
+++ b/tests/Queue/E2E/Concurrency/ManagerTest.php
@@ -0,0 +1,165 @@
+connection = new RedisConnection(
+ host: 'redis', // Docker service name
+ port: 6379,
+ user: '',
+ password: ''
+ );
+
+ // Create Redis adapter
+ $this->redisAdapter = new Redis('redis', 6379);
+
+ // Create Manager with Redis adapter
+ $this->manager = new class($this->redisAdapter, 2) extends Manager {
+ public function getConcurrencyKey(Message $message): string
+ {
+ return "test_concurrent_{$message->getQueue()}";
+ }
+ };
+ }
+
+ private function createMessage(int $id = 1, string $queue = 'default'): Message
+ {
+ return new Message([
+ 'pid' => "test-pid-{$id}",
+ 'queue' => $queue,
+ 'timestamp' => time(),
+ 'payload' => ['id' => $id]
+ ]);
+ }
+
+ public function testConcurrentJobProcessing(): void
+ {
+ $message1 = $this->createMessage(1);
+ $message2 = $this->createMessage(2);
+ $message3 = $this->createMessage(3);
+
+ // First job should be allowed
+ $this->assertTrue($this->manager->canProcessJob($message1));
+ $this->manager->startJob($message1);
+
+ // Second job should be allowed
+ $this->assertTrue($this->manager->canProcessJob($message2));
+ $this->manager->startJob($message2);
+
+ // Third job should be rejected (at limit)
+ $this->assertFalse($this->manager->canProcessJob($message3));
+
+ // Finish first job
+ $this->manager->finishJob($message1);
+
+ // Now third job should be allowed
+ $this->assertTrue($this->manager->canProcessJob($message3));
+
+ // Clean up
+ $this->manager->finishJob($message2);
+ }
+
+ public function testMultipleQueuesIndependentConcurrency(): void
+ {
+ $queue1Message1 = $this->createMessage(1, 'queue1');
+ $queue1Message2 = $this->createMessage(2, 'queue1');
+ $queue2Message1 = $this->createMessage(3, 'queue2');
+ $queue2Message2 = $this->createMessage(4, 'queue2');
+
+ // Queue 1 should allow two jobs
+ $this->assertTrue($this->manager->canProcessJob($queue1Message1));
+ $this->manager->startJob($queue1Message1);
+ $this->assertTrue($this->manager->canProcessJob($queue1Message2));
+ $this->manager->startJob($queue1Message2);
+
+ // Queue 2 should also allow two jobs (independent limit)
+ $this->assertTrue($this->manager->canProcessJob($queue2Message1));
+ $this->manager->startJob($queue2Message1);
+ $this->assertTrue($this->manager->canProcessJob($queue2Message2));
+ $this->manager->startJob($queue2Message2);
+
+ // Clean up
+ $this->manager->finishJob($queue1Message1);
+ $this->manager->finishJob($queue1Message2);
+ $this->manager->finishJob($queue2Message1);
+ $this->manager->finishJob($queue2Message2);
+ }
+
+ public function testConcurrencyCounterRecovery(): void
+ {
+ $message = $this->createMessage();
+
+ // Simulate a crash by starting jobs without finishing them
+ $this->manager->startJob($message);
+ $this->manager->startJob($message);
+
+ // Create new manager instance (simulating process restart)
+ $newManager = new class($this->redisAdapter, 2) extends Manager {
+ public function getConcurrencyKey(Message $message): string
+ {
+ return "test_concurrent_{$message->getQueue()}";
+ }
+ };
+
+ // Counter should still be at 2
+ $this->assertFalse($newManager->canProcessJob($message));
+
+ // Reset counter (simulating manual intervention)
+ $this->redisAdapter->set("test_concurrent_{$message->getQueue()}", "0");
+
+ // Should now allow new jobs
+ $this->assertTrue($newManager->canProcessJob($message));
+ }
+
+ public function testHighConcurrencyScenario(): void
+ {
+ // Create manager with higher limit
+ $highConcurrencyManager = new class($this->redisAdapter, 10) extends Manager {
+ public function getConcurrencyKey(Message $message): string
+ {
+ return "test_concurrent_{$message->getQueue()}";
+ }
+ };
+
+ $processed = 0;
+ $messages = [];
+
+ // Simulate processing 20 jobs with a limit of 10 concurrent jobs
+ for ($i = 0; $i < 20; $i++) {
+ $message = $this->createMessage($i);
+ $messages[] = $message;
+
+ if ($highConcurrencyManager->canProcessJob($message)) {
+ $highConcurrencyManager->startJob($message);
+ $processed++;
+
+ // Simulate completing some jobs to make room for others
+ if ($processed > 5) {
+ for ($j = 0; $j < 3; $j++) {
+ $highConcurrencyManager->finishJob($messages[$j]);
+ }
+ $processed -= 3;
+ }
+ }
+ }
+
+ // Clean up remaining jobs
+ foreach ($messages as $message) {
+ $highConcurrencyManager->finishJob($message);
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Queue/Unit/Concurrency/ManagerTest.php b/tests/Queue/Unit/Concurrency/ManagerTest.php
new file mode 100644
index 0000000..9599db1
--- /dev/null
+++ b/tests/Queue/Unit/Concurrency/ManagerTest.php
@@ -0,0 +1,158 @@
+mockAdapter = $this->createMock(Adapter::class);
+ $this->manager = new class($this->mockAdapter, 2) extends Manager {
+ public function getConcurrencyKey(Message $message): string
+ {
+ return 'test_key';
+ }
+ };
+ }
+
+ private function createMessage(int $id = 1): Message
+ {
+ return new Message([
+ 'pid' => "test-pid-{$id}",
+ 'queue' => 'test-queue',
+ 'timestamp' => time(),
+ 'payload' => ['id' => $id]
+ ]);
+ }
+
+ public function testCanProcessJobWhenNoJobsRunning(): void
+ {
+ $message = $this->createMessage();
+
+ $this->mockAdapter
+ ->expects($this->once())
+ ->method('get')
+ ->with('test_key')
+ ->willReturn(null);
+
+ $this->mockAdapter
+ ->expects($this->once())
+ ->method('set')
+ ->with('test_key', '0');
+
+ $this->assertTrue($this->manager->canProcessJob($message));
+ }
+
+ public function testCanProcessJobWhenBelowLimit(): void
+ {
+ $message = $this->createMessage();
+
+ $this->mockAdapter
+ ->expects($this->once())
+ ->method('get')
+ ->with('test_key')
+ ->willReturn('1');
+
+ $this->assertTrue($this->manager->canProcessJob($message));
+ }
+
+ public function testCannotProcessJobWhenAtLimit(): void
+ {
+ $message = $this->createMessage();
+
+ $this->mockAdapter
+ ->expects($this->once())
+ ->method('get')
+ ->with('test_key')
+ ->willReturn('2');
+
+ $this->assertFalse($this->manager->canProcessJob($message));
+ }
+
+ public function testStartJobIncrementsCounter(): void
+ {
+ $message = $this->createMessage();
+
+ $this->mockAdapter
+ ->expects($this->once())
+ ->method('increment')
+ ->with('test_key');
+
+ $this->manager->startJob($message);
+ }
+
+ public function testFinishJobDecrementsCounter(): void
+ {
+ $message = $this->createMessage();
+
+ $this->mockAdapter
+ ->expects($this->once())
+ ->method('decrement')
+ ->with('test_key');
+
+ $this->manager->finishJob($message);
+ }
+
+ public function testFullConcurrencyFlow(): void
+ {
+ $message1 = $this->createMessage(1);
+ $message2 = $this->createMessage(2);
+ $message3 = $this->createMessage(3);
+
+ // Initial state - no jobs running
+ $this->mockAdapter
+ ->method('get')
+ ->willReturnOnConsecutiveCalls(null, '1', '2', '1');
+
+ $this->mockAdapter
+ ->expects($this->once())
+ ->method('set')
+ ->with('test_key', '0');
+
+ // Should allow first job
+ $this->assertTrue($this->manager->canProcessJob($message1));
+ $this->manager->startJob($message1);
+
+ // Should allow second job
+ $this->assertTrue($this->manager->canProcessJob($message2));
+ $this->manager->startJob($message2);
+
+ // Should reject third job (at limit)
+ $this->assertFalse($this->manager->canProcessJob($message3));
+
+ // Complete first job
+ $this->manager->finishJob($message1);
+
+ // Should now allow third job
+ $this->assertTrue($this->manager->canProcessJob($message3));
+ }
+
+ public function testWithCustomLimit(): void
+ {
+ // Create manager with limit of 3
+ $manager = new class($this->mockAdapter, 3) extends Manager {
+ public function getConcurrencyKey(Message $message): string
+ {
+ return 'test_key';
+ }
+ };
+
+ $message = $this->createMessage();
+
+ $this->mockAdapter
+ ->method('get')
+ ->willReturn('2');
+
+ // Should allow job when count is 2 and limit is 3
+ $this->assertTrue($manager->canProcessJob($message));
+ }
+}