Skip to content
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ new Snidel([
'logger' => $monolog,
// Please refer to `Using custom queue`
'driver' => $driver,
// a polling duration(in seconds) of queueing
'pollingDuration' => 1,
]);
```

Expand Down
2 changes: 1 addition & 1 deletion src/Snidel.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?php
declare(ticks = 1);
declare(ticks=1);

namespace Ackintosh;

Expand Down
2 changes: 2 additions & 0 deletions src/Snidel/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public function __construct($params = [])
'concurrency' => 5,
'logger' => null,
'driver' => null,
// a polling duration(in seconds) of queueing
'pollingDuration' => 1,
];

$this->params = array_merge($default, $params);
Expand Down
6 changes: 3 additions & 3 deletions src/Snidel/Fork/Container.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?php
declare(ticks = 1);
declare(ticks=1);
namespace Ackintosh\Snidel\Fork;

use Ackintosh\Snidel\ActiveWorkerSet;
Expand Down Expand Up @@ -186,7 +186,7 @@ private function forkWorker()
throw new \RuntimeException($message);
}

$worker = new Worker($process, $this->config->get('driver'));
$worker = new Worker($process, $this->config->get('driver'), $this->config->get('pollingDuration'));

if (getmypid() === $this->master->getPid()) {
// master
Expand Down Expand Up @@ -267,7 +267,7 @@ public function results()
{
for (; $this->queuedCount() > $this->dequeuedCount();) {
for (;;) {
if ($envelope = $this->resultQueue->dequeue()) {
if ($envelope = $this->resultQueue->dequeue($this->config->get('pollingDuration'))) {
$this->dequeuedCount++;
break;
}
Expand Down
39 changes: 30 additions & 9 deletions src/Snidel/Worker.php
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<?php
declare(ticks=1);
namespace Ackintosh\Snidel;

use Ackintosh\Snidel\Result\Result;
use Ackintosh\Snidel\Task\Task;
use Ackintosh\Snidel\Traits\Queueing;
use Bernard\Router\SimpleRouter;

class Worker
{
Expand All @@ -25,26 +25,39 @@ class Worker
/** @var \Bernard\QueueFactory\PersistentFactory */
private $factory;

/** @var \Bernard\Consumer */
private $consumer;

/** @var \Bernard\Producer */
private $producer;

/** @var \Bernard\Queue */
private $taskQueue;

/** @var int */
private $pollingDuration;

/**
* @param \Ackintosh\Snidel\Fork\Process $process
* @param \Bernard\Driver $driver
* @param int $pollingDuration
*/
public function __construct($process, $driver)
public function __construct($process, $driver, $pollingDuration)
{
$this->pcntl = new Pcntl();
$this->process = $process;

$this->factory = $this->createFactory($driver);
$router = new SimpleRouter();
$router->add('Task', $this);
$this->consumer = $this->createConsumer($router);
$this->producer = $this->createProducer($this->factory);

/*
* Flat-file driver may causes E_WARNING (mkdir(): File exists) in race condition.
* Suppress the warning as it isn't matter and we should progress this task.
*/
if ($driver instanceof \Bernard\Driver\FlatFileDriver) {
$this->taskQueue = @$this->factory->create('task');
} else {
$this->taskQueue = $this->factory->create('task');
}

$this->pollingDuration = $pollingDuration;
}

/**
Expand All @@ -62,7 +75,15 @@ public function getPid()
*/
public function run()
{
$this->consumer->consume($this->factory->create('task'));
while (true) {
if ($envelope = $this->taskQueue->dequeue($this->pollingDuration)) {
$this->task($envelope->getMessage());
}
// We need to insert some statements here as condition expressions are not tickable.
// Worker process can't receive signals sent from Master if there's no statements here.
// @see http://jp2.php.net/manual/en/control-structures.declare.php#control-structures.declare.ticks
usleep(1);
}
}

/**
Expand Down
4 changes: 2 additions & 2 deletions tests/Snidel/ActiveWorkerSetTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ public function terminate()
{
$driver = (new Config())->get('driver');
$worker1 = $this->getMockBuilder('\Ackintosh\Snidel\Worker')
->setConstructorArgs([$this->makeProcess(1), $driver])
->setConstructorArgs([$this->makeProcess(1), $driver, 1])
->setMethods(['terminate'])
->getMock();
$worker1->expects($this->once())
->method('terminate')
->with(SIGTERM);

$worker2 = $this->getMockBuilder('\Ackintosh\Snidel\Worker')
->setConstructorArgs([$this->makeProcess(2), $driver])
->setConstructorArgs([$this->makeProcess(2), $driver, 1])
->setMethods(['terminate'])
->getMock();
$worker2->expects($this->once())
Expand Down
6 changes: 5 additions & 1 deletion tests/Snidel/SnidelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ public function passMultipleArguments()
*/
public function concurrency()
{
$snidel = new Snidel(['concurrency' => 3]);
$snidel = new Snidel([
'concurrency' => 3,
// in order to minify the delay time due to the issue of bernard's polling, specifying a small number.
'pollingDuration' => 0.5,
]);

$start = time();
$snidel->process('sleepsTwoSeconds');
Expand Down
3 changes: 2 additions & 1 deletion tests/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ protected function makeWorker($pid = null)

return new Worker(
new Process($pid),
(new Config())->get('driver')
(new Config())->get('driver'),
(new Config())->get('pollingDuration')
);
}
}