diff --git a/composer.lock b/composer.lock index 4a34a36..5615653 100644 --- a/composer.lock +++ b/composer.lock @@ -107,16 +107,16 @@ }, { "name": "utopia-php/framework", - "version": "1.0.0", + "version": "1.0.2", "source": { "type": "git", "url": "https://github.com/utopia-php/http.git", - "reference": "cc880ec41f7f163d4f9956fec26cc6be51b412cf" + "reference": "fc63ec61c720190a5ea5bb484c615145850951e6" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/http/zipball/cc880ec41f7f163d4f9956fec26cc6be51b412cf", - "reference": "cc880ec41f7f163d4f9956fec26cc6be51b412cf", + "url": "https://api.github.com/repos/utopia-php/http/zipball/fc63ec61c720190a5ea5bb484c615145850951e6", + "reference": "fc63ec61c720190a5ea5bb484c615145850951e6", "shasum": "" }, "require": { @@ -151,22 +151,22 @@ ], "support": { "issues": "https://github.com/utopia-php/http/issues", - "source": "https://github.com/utopia-php/http/tree/1.0.0" + "source": "https://github.com/utopia-php/http/tree/1.0.2" }, - "time": "2024-09-05T15:38:08+00:00" + "time": "2024-09-10T09:04:19+00:00" }, { "name": "utopia-php/servers", - "version": "0.1.0", + "version": "0.1.1", "source": { "type": "git", "url": "https://github.com/utopia-php/servers.git", - "reference": "7d9e4f364fb1ab1889fb89ca96eb9946467cb09c" + "reference": "fd5c8d32778f265256c1936372a071b944f5ba8a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/servers/zipball/7d9e4f364fb1ab1889fb89ca96eb9946467cb09c", - "reference": "7d9e4f364fb1ab1889fb89ca96eb9946467cb09c", + "url": "https://api.github.com/repos/utopia-php/servers/zipball/fd5c8d32778f265256c1936372a071b944f5ba8a", + "reference": "fd5c8d32778f265256c1936372a071b944f5ba8a", "shasum": "" }, "require": { @@ -204,9 +204,9 @@ ], "support": { "issues": "https://github.com/utopia-php/servers/issues", - "source": "https://github.com/utopia-php/servers/tree/0.1.0" + "source": "https://github.com/utopia-php/servers/tree/0.1.1" }, - "time": "2024-08-08T14:31:39+00:00" + "time": "2024-09-06T02:25:56+00:00" } ], "packages-dev": [ @@ -408,16 +408,16 @@ }, { "name": "nikic/php-parser", - "version": "v5.1.0", + "version": "v5.3.1", "source": { "type": "git", "url": "https://github.com/nikic/PHP-Parser.git", - "reference": "683130c2ff8c2739f4822ff7ac5c873ec529abd1" + "reference": "8eea230464783aa9671db8eea6f8c6ac5285794b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/683130c2ff8c2739f4822ff7ac5c873ec529abd1", - "reference": "683130c2ff8c2739f4822ff7ac5c873ec529abd1", + "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/8eea230464783aa9671db8eea6f8c6ac5285794b", + "reference": "8eea230464783aa9671db8eea6f8c6ac5285794b", "shasum": "" }, "require": { @@ -460,9 +460,9 @@ ], "support": { "issues": "https://github.com/nikic/PHP-Parser/issues", - "source": "https://github.com/nikic/PHP-Parser/tree/v5.1.0" + "source": "https://github.com/nikic/PHP-Parser/tree/v5.3.1" }, - "time": "2024-07-01T20:03:41+00:00" + "time": "2024-10-08T18:51:32+00:00" }, { "name": "phar-io/manifest", @@ -584,16 +584,16 @@ }, { "name": "phpstan/phpstan", - "version": "1.12.2", + "version": "1.12.7", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "0ca1c7bb55fca8fe6448f16fff0f311ccec960a1" + "reference": "dc2b9976bd8b0f84ec9b0e50cc35378551de7af0" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/0ca1c7bb55fca8fe6448f16fff0f311ccec960a1", - "reference": "0ca1c7bb55fca8fe6448f16fff0f311ccec960a1", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/dc2b9976bd8b0f84ec9b0e50cc35378551de7af0", + "reference": "dc2b9976bd8b0f84ec9b0e50cc35378551de7af0", "shasum": "" }, "require": { @@ -638,7 +638,7 @@ "type": "github" } ], - "time": "2024-09-05T16:09:28+00:00" + "time": "2024-10-18T11:12:07+00:00" }, { "name": "phpunit/php-code-coverage", @@ -961,16 +961,16 @@ }, { "name": "phpunit/phpunit", - "version": "9.6.20", + "version": "9.6.21", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "49d7820565836236411f5dc002d16dd689cde42f" + "reference": "de6abf3b6f8dd955fac3caad3af7a9504e8c2ffa" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/49d7820565836236411f5dc002d16dd689cde42f", - "reference": "49d7820565836236411f5dc002d16dd689cde42f", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/de6abf3b6f8dd955fac3caad3af7a9504e8c2ffa", + "reference": "de6abf3b6f8dd955fac3caad3af7a9504e8c2ffa", "shasum": "" }, "require": { @@ -985,7 +985,7 @@ "phar-io/manifest": "^2.0.4", "phar-io/version": "^3.2.1", "php": ">=7.3", - "phpunit/php-code-coverage": "^9.2.31", + "phpunit/php-code-coverage": "^9.2.32", "phpunit/php-file-iterator": "^3.0.6", "phpunit/php-invoker": "^3.1.1", "phpunit/php-text-template": "^2.0.4", @@ -1044,7 +1044,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/phpunit/issues", "security": "https://github.com/sebastianbergmann/phpunit/security/policy", - "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.20" + "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.21" }, "funding": [ { @@ -1060,7 +1060,7 @@ "type": "tidelift" } ], - "time": "2024-07-10T11:45:39+00:00" + "time": "2024-09-19T10:50:18+00:00" }, { "name": "sebastian/cli-parser", diff --git a/phpunit.xml b/phpunit.xml index 1b8f40d..e004d23 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -12,5 +12,8 @@ ./tests/Queue/E2E/Adapter + + ./tests/Queue/Unit + \ No newline at end of file diff --git a/src/Queue/Adapter/Swoole/Redis.php b/src/Queue/Adapter/Swoole/Redis.php index a9a43e5..d54a0ae 100644 --- a/src/Queue/Adapter/Swoole/Redis.php +++ b/src/Queue/Adapter/Swoole/Redis.php @@ -5,8 +5,9 @@ use Swoole\Database\RedisConfig; use Swoole\Database\RedisPool; use Utopia\Queue\Connection\Redis as ConnectionRedis; +use Utopia\Queue\Concurrency\Adapter; -class Redis extends ConnectionRedis +class Redis extends ConnectionRedis implements Adapter { protected RedisPool $pool; diff --git a/src/Queue/Concurrency/Adapter.php b/src/Queue/Concurrency/Adapter.php new file mode 100644 index 0000000..87bf1ff --- /dev/null +++ b/src/Queue/Concurrency/Adapter.php @@ -0,0 +1,11 @@ +getConcurrencyKey($message); + $value = $this->adapter->get($key); + if ($value === null) { + $this->adapter->set($key, "0"); + $value = 0; + } + return \intval($value) < $this->limit; + } + + public function startJob(Message $message): void + { + $key = $this->getConcurrencyKey($message); + $this->adapter->increment($key); + } + + public function finishJob(Message $message): void + { + $key = $this->getConcurrencyKey($message); + $this->adapter->decrement($key); + } + + abstract public function getConcurrencyKey(Message $message): string; +} diff --git a/src/Queue/Worker.php b/src/Queue/Worker.php index 6072986..a11818f 100644 --- a/src/Queue/Worker.php +++ b/src/Queue/Worker.php @@ -8,6 +8,7 @@ use Utopia\DI\Container; use Utopia\DI\Dependency; use Utopia\Servers\Base; +use Utopia\Queue\Concurrency\Manager; class Worker extends Base { @@ -146,6 +147,20 @@ public function start(): self return $this; } + protected ?Manager $concurrencyManager = null; + + /** + * Set the concurrency manager + * + * @param Manager $manager + * @return self + */ + public function setConcurrencyManager(Manager $manager): self + { + $this->concurrencyManager = $manager; + return $this; + } + protected function lifecycle(Job $job, Message $message, array $nextMessage, Container $context, Connection $connection): static { Console::info("[Job] Received Job ({$message->getPid()})."); @@ -154,21 +169,15 @@ protected function lifecycle(Job $job, Message $message, array $nextMessage, Con $connection->getConnection(); - /** - * Move Job to Jobs and it's PID to the processing list. - */ - $connection->setArray("{$this->adapter->namespace}.jobs.{$this->adapter->queue}.{$message->getPid()}", $nextMessage); - $connection->leftPush("{$this->adapter->namespace}.processing.{$this->adapter->queue}", $message->getPid()); - - /** - * Increment Total Jobs Received from Stats. - */ - $connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.total"); - - /** - * Increment Processing Jobs from Stats. - */ - $connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.processing"); + // if ($this->concurrencyManager) { + // if (!$this->concurrencyManager->canProcessJob($message)) { + // // If we can't process the job due to concurrency limits, put it back in the queue + // $connection->leftPush("{$this->adapter->namespace}.queue.{$this->adapter->queue}", json_encode($nextMessage)); + // Console::info("[Job] ({$message->getPid()}) postponed due to concurrency limits."); + // return $this; + // } + // $this->concurrencyManager->startJob($message); + // } try { foreach (self::$init as $hook) { // Global init hooks @@ -257,6 +266,10 @@ protected function lifecycle(Job $job, Message $message, array $nextMessage, Con } } } finally { + // if ($this->concurrencyManager) { + // $this->concurrencyManager->finishJob($message); + // } + /** * Remove Job from Processing. */ diff --git a/tests/Queue/E2E/Concurrency/ManagerTest.php b/tests/Queue/E2E/Concurrency/ManagerTest.php new file mode 100644 index 0000000..58883fb --- /dev/null +++ b/tests/Queue/E2E/Concurrency/ManagerTest.php @@ -0,0 +1,165 @@ +connection = new RedisConnection( + host: 'redis', // Docker service name + port: 6379, + user: '', + password: '' + ); + + // Create Redis adapter + $this->redisAdapter = new Redis('redis', 6379); + + // Create Manager with Redis adapter + $this->manager = new class($this->redisAdapter, 2) extends Manager { + public function getConcurrencyKey(Message $message): string + { + return "test_concurrent_{$message->getQueue()}"; + } + }; + } + + private function createMessage(int $id = 1, string $queue = 'default'): Message + { + return new Message([ + 'pid' => "test-pid-{$id}", + 'queue' => $queue, + 'timestamp' => time(), + 'payload' => ['id' => $id] + ]); + } + + public function testConcurrentJobProcessing(): void + { + $message1 = $this->createMessage(1); + $message2 = $this->createMessage(2); + $message3 = $this->createMessage(3); + + // First job should be allowed + $this->assertTrue($this->manager->canProcessJob($message1)); + $this->manager->startJob($message1); + + // Second job should be allowed + $this->assertTrue($this->manager->canProcessJob($message2)); + $this->manager->startJob($message2); + + // Third job should be rejected (at limit) + $this->assertFalse($this->manager->canProcessJob($message3)); + + // Finish first job + $this->manager->finishJob($message1); + + // Now third job should be allowed + $this->assertTrue($this->manager->canProcessJob($message3)); + + // Clean up + $this->manager->finishJob($message2); + } + + public function testMultipleQueuesIndependentConcurrency(): void + { + $queue1Message1 = $this->createMessage(1, 'queue1'); + $queue1Message2 = $this->createMessage(2, 'queue1'); + $queue2Message1 = $this->createMessage(3, 'queue2'); + $queue2Message2 = $this->createMessage(4, 'queue2'); + + // Queue 1 should allow two jobs + $this->assertTrue($this->manager->canProcessJob($queue1Message1)); + $this->manager->startJob($queue1Message1); + $this->assertTrue($this->manager->canProcessJob($queue1Message2)); + $this->manager->startJob($queue1Message2); + + // Queue 2 should also allow two jobs (independent limit) + $this->assertTrue($this->manager->canProcessJob($queue2Message1)); + $this->manager->startJob($queue2Message1); + $this->assertTrue($this->manager->canProcessJob($queue2Message2)); + $this->manager->startJob($queue2Message2); + + // Clean up + $this->manager->finishJob($queue1Message1); + $this->manager->finishJob($queue1Message2); + $this->manager->finishJob($queue2Message1); + $this->manager->finishJob($queue2Message2); + } + + public function testConcurrencyCounterRecovery(): void + { + $message = $this->createMessage(); + + // Simulate a crash by starting jobs without finishing them + $this->manager->startJob($message); + $this->manager->startJob($message); + + // Create new manager instance (simulating process restart) + $newManager = new class($this->redisAdapter, 2) extends Manager { + public function getConcurrencyKey(Message $message): string + { + return "test_concurrent_{$message->getQueue()}"; + } + }; + + // Counter should still be at 2 + $this->assertFalse($newManager->canProcessJob($message)); + + // Reset counter (simulating manual intervention) + $this->redisAdapter->set("test_concurrent_{$message->getQueue()}", "0"); + + // Should now allow new jobs + $this->assertTrue($newManager->canProcessJob($message)); + } + + public function testHighConcurrencyScenario(): void + { + // Create manager with higher limit + $highConcurrencyManager = new class($this->redisAdapter, 10) extends Manager { + public function getConcurrencyKey(Message $message): string + { + return "test_concurrent_{$message->getQueue()}"; + } + }; + + $processed = 0; + $messages = []; + + // Simulate processing 20 jobs with a limit of 10 concurrent jobs + for ($i = 0; $i < 20; $i++) { + $message = $this->createMessage($i); + $messages[] = $message; + + if ($highConcurrencyManager->canProcessJob($message)) { + $highConcurrencyManager->startJob($message); + $processed++; + + // Simulate completing some jobs to make room for others + if ($processed > 5) { + for ($j = 0; $j < 3; $j++) { + $highConcurrencyManager->finishJob($messages[$j]); + } + $processed -= 3; + } + } + } + + // Clean up remaining jobs + foreach ($messages as $message) { + $highConcurrencyManager->finishJob($message); + } + } +} \ No newline at end of file diff --git a/tests/Queue/Unit/Concurrency/ManagerTest.php b/tests/Queue/Unit/Concurrency/ManagerTest.php new file mode 100644 index 0000000..9599db1 --- /dev/null +++ b/tests/Queue/Unit/Concurrency/ManagerTest.php @@ -0,0 +1,158 @@ +mockAdapter = $this->createMock(Adapter::class); + $this->manager = new class($this->mockAdapter, 2) extends Manager { + public function getConcurrencyKey(Message $message): string + { + return 'test_key'; + } + }; + } + + private function createMessage(int $id = 1): Message + { + return new Message([ + 'pid' => "test-pid-{$id}", + 'queue' => 'test-queue', + 'timestamp' => time(), + 'payload' => ['id' => $id] + ]); + } + + public function testCanProcessJobWhenNoJobsRunning(): void + { + $message = $this->createMessage(); + + $this->mockAdapter + ->expects($this->once()) + ->method('get') + ->with('test_key') + ->willReturn(null); + + $this->mockAdapter + ->expects($this->once()) + ->method('set') + ->with('test_key', '0'); + + $this->assertTrue($this->manager->canProcessJob($message)); + } + + public function testCanProcessJobWhenBelowLimit(): void + { + $message = $this->createMessage(); + + $this->mockAdapter + ->expects($this->once()) + ->method('get') + ->with('test_key') + ->willReturn('1'); + + $this->assertTrue($this->manager->canProcessJob($message)); + } + + public function testCannotProcessJobWhenAtLimit(): void + { + $message = $this->createMessage(); + + $this->mockAdapter + ->expects($this->once()) + ->method('get') + ->with('test_key') + ->willReturn('2'); + + $this->assertFalse($this->manager->canProcessJob($message)); + } + + public function testStartJobIncrementsCounter(): void + { + $message = $this->createMessage(); + + $this->mockAdapter + ->expects($this->once()) + ->method('increment') + ->with('test_key'); + + $this->manager->startJob($message); + } + + public function testFinishJobDecrementsCounter(): void + { + $message = $this->createMessage(); + + $this->mockAdapter + ->expects($this->once()) + ->method('decrement') + ->with('test_key'); + + $this->manager->finishJob($message); + } + + public function testFullConcurrencyFlow(): void + { + $message1 = $this->createMessage(1); + $message2 = $this->createMessage(2); + $message3 = $this->createMessage(3); + + // Initial state - no jobs running + $this->mockAdapter + ->method('get') + ->willReturnOnConsecutiveCalls(null, '1', '2', '1'); + + $this->mockAdapter + ->expects($this->once()) + ->method('set') + ->with('test_key', '0'); + + // Should allow first job + $this->assertTrue($this->manager->canProcessJob($message1)); + $this->manager->startJob($message1); + + // Should allow second job + $this->assertTrue($this->manager->canProcessJob($message2)); + $this->manager->startJob($message2); + + // Should reject third job (at limit) + $this->assertFalse($this->manager->canProcessJob($message3)); + + // Complete first job + $this->manager->finishJob($message1); + + // Should now allow third job + $this->assertTrue($this->manager->canProcessJob($message3)); + } + + public function testWithCustomLimit(): void + { + // Create manager with limit of 3 + $manager = new class($this->mockAdapter, 3) extends Manager { + public function getConcurrencyKey(Message $message): string + { + return 'test_key'; + } + }; + + $message = $this->createMessage(); + + $this->mockAdapter + ->method('get') + ->willReturn('2'); + + // Should allow job when count is 2 and limit is 3 + $this->assertTrue($manager->canProcessJob($message)); + } +}