diff --git a/README.md b/README.md index 4c344d0..0d0bbf0 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,8 @@ new Snidel([ 'logger' => $monolog, // Please refer to `Using custom queue` 'driver' => $driver, + // a polling duration(in seconds) of queueing + 'pollingDuration' => 1, ]); ``` diff --git a/src/Snidel.php b/src/Snidel.php index e25adc3..75673fd 100644 --- a/src/Snidel.php +++ b/src/Snidel.php @@ -1,5 +1,5 @@ 5, 'logger' => null, 'driver' => null, + // a polling duration(in seconds) of queueing + 'pollingDuration' => 1, ]; $this->params = array_merge($default, $params); diff --git a/src/Snidel/Fork/Container.php b/src/Snidel/Fork/Container.php index 92c17ca..27e5de0 100644 --- a/src/Snidel/Fork/Container.php +++ b/src/Snidel/Fork/Container.php @@ -1,5 +1,5 @@ config->get('driver')); + $worker = new Worker($process, $this->config->get('driver'), $this->config->get('pollingDuration')); if (getmypid() === $this->master->getPid()) { // master @@ -267,7 +267,7 @@ public function results() { for (; $this->queuedCount() > $this->dequeuedCount();) { for (;;) { - if ($envelope = $this->resultQueue->dequeue()) { + if ($envelope = $this->resultQueue->dequeue($this->config->get('pollingDuration'))) { $this->dequeuedCount++; break; } diff --git a/src/Snidel/Worker.php b/src/Snidel/Worker.php index 2568ca7..0a11070 100644 --- a/src/Snidel/Worker.php +++ b/src/Snidel/Worker.php @@ -1,10 +1,10 @@ pcntl = new Pcntl(); $this->process = $process; $this->factory = $this->createFactory($driver); - $router = new SimpleRouter(); - $router->add('Task', $this); - $this->consumer = $this->createConsumer($router); $this->producer = $this->createProducer($this->factory); + + /* + * Flat-file driver may causes E_WARNING (mkdir(): File exists) in race condition. + * Suppress the warning as it isn't matter and we should progress this task. + */ + if ($driver instanceof \Bernard\Driver\FlatFileDriver) { + $this->taskQueue = @$this->factory->create('task'); + } else { + $this->taskQueue = $this->factory->create('task'); + } + + $this->pollingDuration = $pollingDuration; } /** @@ -62,7 +75,15 @@ public function getPid() */ public function run() { - $this->consumer->consume($this->factory->create('task')); + while (true) { + if ($envelope = $this->taskQueue->dequeue($this->pollingDuration)) { + $this->task($envelope->getMessage()); + } + // We need to insert some statements here as condition expressions are not tickable. + // Worker process can't receive signals sent from Master if there's no statements here. + // @see http://jp2.php.net/manual/en/control-structures.declare.php#control-structures.declare.ticks + usleep(1); + } } /** diff --git a/tests/Snidel/ActiveWorkerSetTest.php b/tests/Snidel/ActiveWorkerSetTest.php index 4a7db80..ddc5047 100644 --- a/tests/Snidel/ActiveWorkerSetTest.php +++ b/tests/Snidel/ActiveWorkerSetTest.php @@ -70,7 +70,7 @@ public function terminate() { $driver = (new Config())->get('driver'); $worker1 = $this->getMockBuilder('\Ackintosh\Snidel\Worker') - ->setConstructorArgs([$this->makeProcess(1), $driver]) + ->setConstructorArgs([$this->makeProcess(1), $driver, 1]) ->setMethods(['terminate']) ->getMock(); $worker1->expects($this->once()) @@ -78,7 +78,7 @@ public function terminate() ->with(SIGTERM); $worker2 = $this->getMockBuilder('\Ackintosh\Snidel\Worker') - ->setConstructorArgs([$this->makeProcess(2), $driver]) + ->setConstructorArgs([$this->makeProcess(2), $driver, 1]) ->setMethods(['terminate']) ->getMock(); $worker2->expects($this->once()) diff --git a/tests/Snidel/SnidelTest.php b/tests/Snidel/SnidelTest.php index ca80ebe..6481b54 100644 --- a/tests/Snidel/SnidelTest.php +++ b/tests/Snidel/SnidelTest.php @@ -65,7 +65,11 @@ public function passMultipleArguments() */ public function concurrency() { - $snidel = new Snidel(['concurrency' => 3]); + $snidel = new Snidel([ + 'concurrency' => 3, + // in order to minify the delay time due to the issue of bernard's polling, specifying a small number. + 'pollingDuration' => 0.5, + ]); $start = time(); $snidel->process('sleepsTwoSeconds'); diff --git a/tests/TestCase.php b/tests/TestCase.php index dfacd59..2561a80 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -61,7 +61,8 @@ protected function makeWorker($pid = null) return new Worker( new Process($pid), - (new Config())->get('driver') + (new Config())->get('driver'), + (new Config())->get('pollingDuration') ); } }