Skip to content

Commit 1a0af09

Browse files
committed
Added Registry, and made some progress towards making persistent connections equivalent to their non-persistent counterparts.
1 parent 12ea824 commit 1a0af09

File tree

8 files changed

+369
-16
lines changed

8 files changed

+369
-16
lines changed

nbproject/private/private.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,13 @@
22
<project-private xmlns="http://www.netbeans.org/ns/project-private/1">
33
<coverage xmlns="http://www.netbeans.org/ns/code-coverage/1" enabled="true"/>
44
<editor-bookmarks xmlns="http://www.netbeans.org/ns/editor-bookmarks/1"/>
5+
<open-files xmlns="http://www.netbeans.org/ns/projectui-open-files/1">
6+
<file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/SocketException.php</file>
7+
<file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/Client.php</file>
8+
<file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/Communicator.php</file>
9+
<file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/Query.php</file>
10+
<file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/Request.php</file>
11+
<file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/Registry.php</file>
12+
<file>file:/D:/Projects/PEAR2_Net_RouterOS.git/src/PEAR2/Net/RouterOS/PersistentRegistry.php</file>
13+
</open-files>
514
</project-private>

src/PEAR2/Net/RouterOS/Client.php

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ class Client
8282
protected $callbacks = array();
8383

8484
/**
85-
* @var SHM Handler for shared response buffer. Remains NULL when the
86-
* connection is not a persistent one.
85+
* @var Registry A registry for the operations. Particularly helpful at
86+
* persistent connections.
8787
*/
88-
protected $shmHandler = null;
88+
protected $registry = null;
8989

9090
/**
9191
* @var bool Whether to stream future responses.
@@ -118,20 +118,21 @@ public function __construct($host, $username, $password = '', $port = 8728,
118118
$host, $port, $persist, $timeout, $username, $context
119119
);
120120
//Login the user if necessary
121-
if ($this->com->getTransmitter()->isFresh()) {
121+
if ((!$persist
122+
|| 0 == $this->com->getTransmitter()->lock(T\Stream::DIRECTION_ALL))
123+
&& $this->com->getTransmitter()->isFresh()
124+
) {
122125
if (!static::login($this->com, $username, $password)) {
123126
$this->com->close();
124127
throw new DataFlowException(
125128
'Invalid username or password supplied.', 10000
126129
);
127130
}
131+
$this->com->getTransmitter()->lock(T\Stream::DIRECTION_NONE, true);
128132
}
129133

130134
if ($persist) {
131-
$this->shmHandler = new SHM(
132-
'PEAR2\Net\RouterOS\Client tcp://' .
133-
"{$host}:{$port}/{$username}"
134-
);
135+
$this->registry = new Registry("{$host}:{$port}/{$username}");
135136
}
136137
}
137138

@@ -155,7 +156,7 @@ public function __invoke($arg = null)
155156
if (is_int($arg) || is_double($arg)) {
156157
return $this->loop($arg);
157158
} elseif ($arg instanceof Request) {
158-
return $arg->getTag() === null ? $this->sendSync($arg)
159+
return '' == $arg->getTag() ? $this->sendSync($arg)
159160
: $this->sendAsync($arg);
160161
} elseif (null === $arg) {
161162
return $this->completeRequest();
@@ -523,16 +524,37 @@ public function cancelRequest($tag = null)
523524
{
524525
$cancelRequest = new Request('/cancel');
525526
$hasTag = !('' == $tag);
527+
$hasReg = null !== $this->registry;
528+
if ($hasReg && !$hasTag) {
529+
$tags = array_merge(
530+
array_keys($this->responseBuffer), array_keys($this->callbacks)
531+
);
532+
foreach ($tags as $t) {
533+
$this->cancelRequest($t);
534+
}
535+
return $this;
536+
}
537+
526538
if ($hasTag) {
527539
if ($this->isRequestActive($tag)) {
528-
$cancelRequest->setArgument('tag', $tag);
540+
if ($hasReg) {
541+
$cancelRequest->setArgument(
542+
'tag', $this->registry->getOwnershipTag() . $tag
543+
);
544+
} else {
545+
$cancelRequest->setArgument('tag', $tag);
546+
}
529547
} else {
530548
throw new DataFlowException(
531549
'No such request. Canceling aborted.', 11200
532550
);
533551
}
534552
}
553+
554+
$regBackup = $this->registry;
555+
$this->registry = null;
535556
$this->sendSync($cancelRequest);
557+
$this->registry = $regBackup;
536558

537559
if ($hasTag) {
538560
if ($this->isRequestActive($tag, self::FILTER_BUFFER)) {
@@ -603,8 +625,21 @@ public function close()
603625
}
604626
$this->callbacks = array();
605627
$this->pendingRequestsCount = 0;
628+
if (null !== $this->registry) {
629+
$this->registry->close();
630+
}
606631
return $result;
607632
}
633+
634+
/**
635+
* Closes the connection, unless it's a persistent one.
636+
*/
637+
public function __destruct()
638+
{
639+
if (!$this->com->getTransmitter()->isPersistent()) {
640+
$this->close();
641+
}
642+
}
608643

609644
/**
610645
* Sends a request to RouterOS.
@@ -617,7 +652,7 @@ public function close()
617652
*/
618653
protected function send(Request $request)
619654
{
620-
$request->send($this->com);
655+
$request->send($this->com, $this->registry);
621656
$this->pendingRequestsCount++;
622657
return $this;
623658
}
@@ -632,7 +667,9 @@ protected function send(Request $request)
632667
*/
633668
protected function dispatchNextResponse()
634669
{
635-
$response = new Response($this->com, $this->_streamingResponses);
670+
$response = new Response(
671+
$this->com, $this->_streamingResponses, $this->registry
672+
);
636673
if ($response->getType() === Response::TYPE_FATAL) {
637674
$this->pendingRequestsCount = 0;
638675
$this->com->close();
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
<?php
2+
3+
/**
4+
* ~~summary~~
5+
*
6+
* ~~description~~
7+
*
8+
* PHP version 5
9+
*
10+
* @category Net
11+
* @package PEAR2_Net_RouterOS
12+
* @author Vasil Rangelov <boen.robot@gmail.com>
13+
* @copyright 2011 Vasil Rangelov
14+
* @license http://www.gnu.org/copyleft/lesser.html LGPL License 2.1
15+
* @version GIT: $Id$
16+
* @link http://pear2.php.net/PEAR2_Net_RouterOS
17+
*/
18+
/**
19+
* The namespace declaration.
20+
*/
21+
namespace PEAR2\Net\RouterOS;
22+
23+
/**
24+
* Uses shared memory to keep responses in.
25+
*/
26+
use PEAR2\Cache\SHM;
27+
28+
/**
29+
* A RouterOS registry.
30+
*
31+
* Provides functionality for managing the request/response flow. Particularly
32+
* useful in persistent connections.
33+
*
34+
* @category Net
35+
* @package PEAR2_Net_RouterOS
36+
* @author Vasil Rangelov <boen.robot@gmail.com>
37+
* @license http://www.gnu.org/copyleft/lesser.html LGPL License 2.1
38+
* @link http://pear2.php.net/PEAR2_Net_RouterOS
39+
*/
40+
class Registry
41+
{
42+
/**
43+
* @var SHM The storage.
44+
*/
45+
protected $shm;
46+
47+
/**
48+
* @var int ID of request. Populated at first instance in request.
49+
*/
50+
protected static $requestId = -1;
51+
52+
/**
53+
* @var int ID to be given to next instance, after incrementing it.
54+
*/
55+
protected static $instanceIdSeed = -1;
56+
57+
/**
58+
* @var int ID of instance within the request.
59+
*/
60+
protected $instanceId;
61+
62+
/**
63+
* Creates a registry.
64+
*
65+
* @param string $uri An URI to bind the registry to.
66+
*/
67+
public function __construct($uri)
68+
{
69+
$this->shm = new SHM('PEAR2\Net\RouterOS\Registry ' . $uri);
70+
if (-1 === self::$requestId) {
71+
self::$requestId = $this->shm->add('requestId', 0)
72+
? 0 : $this->shm->inc('requestId');
73+
}
74+
$this->instanceId = ++self::$instanceIdSeed;
75+
$this->shm->add('responseBuffer_' . $this->getOwnershipTag(), array());
76+
}
77+
78+
/**
79+
* Parses a tag.
80+
*
81+
* Parses a tag to reveal the ownership part of it, and the original tag.
82+
*
83+
* @param string $tag The tag (as received) to parse.
84+
*
85+
* @return array An array with the first member being the ownership tag, and
86+
* the second one being the original tag.
87+
*/
88+
public static function parseTag($tag)
89+
{
90+
if (null === $tag) {
91+
return array(null, null);
92+
}
93+
$result = explode('__', $tag, 2);
94+
$result[0] .= '__';
95+
if ('' === $result[1]) {
96+
$result[1] = null;
97+
}
98+
return $result;
99+
}
100+
101+
/**
102+
* Get the ownership tag for this instance.
103+
*
104+
* @return string The ownership tag for this registry instance.
105+
*/
106+
public function getOwnershipTag()
107+
{
108+
return self::$requestId . '_' . $this->instanceId . '__';
109+
}
110+
111+
/**
112+
* Add a response to the registry.
113+
*
114+
* @param Response $response The response to add. The caller of this
115+
* function is responsible for ensuring that the ownership tag and the
116+
* original tag are separated, so that only the original one remains in the
117+
* response.
118+
* @param string $ownershipTag The ownership tag that the response had.
119+
*
120+
* @return boolean TRUE if the request was added to its buffer, FALSE if
121+
* this instance owns the response, and therefore doesn't need to add the
122+
* response to its buffer.
123+
*/
124+
public function add(Response $response, $ownershipTag)
125+
{
126+
if ($this->getOwnershipTag() === $ownershipTag) {
127+
return false;
128+
}
129+
130+
if (null === $ownershipTag) {
131+
foreach ($this->shm->getIterator('/^(responseBuffer\_)/', true)
132+
as $targetBufferName) {
133+
$this->_add($response, $targetBufferName);
134+
}
135+
return true;
136+
}
137+
138+
$this->_add($response, 'responseBuffer_' . $ownershipTag);
139+
return true;
140+
}
141+
142+
/**
143+
* Adds a response to a buffer.
144+
*
145+
* @param Response $response The response to add.
146+
* @param type $targetBufferName The name of the buffer to add the
147+
* response to.
148+
*
149+
* @return void
150+
*/
151+
private function _add(Response $response, $targetBufferName)
152+
{
153+
if ($this->shm->lock($targetBufferName)) {
154+
$targetBuffer = $this->shm->get($targetBufferName);
155+
$targetBuffer[] = $response;
156+
$this->shm->set($targetBufferName, $targetBuffer);
157+
$this->shm->unlock($targetBufferName);
158+
}
159+
}
160+
161+
/**
162+
* Gets the next response from this instance's buffer.
163+
*
164+
* @return Response|null The next response, or NULL if there isn't one.
165+
*/
166+
public function getNextResponse()
167+
{
168+
$response = null;
169+
$targetBufferName = 'responseBuffer_' . $this->getOwnershipTag();
170+
if ($this->shm->exists($targetBufferName)
171+
&& $this->shm->lock($targetBufferName)
172+
) {
173+
$targetBuffer = $this->shm->get($targetBufferName);
174+
if (!empty($targetBuffer)) {
175+
$response = array_shift($targetBuffer);
176+
$this->shm->set($targetBufferName, $targetBuffer);
177+
}
178+
$this->shm->unlock($targetBufferName);
179+
}
180+
return $response;
181+
}
182+
183+
/**
184+
* Closes the registry.
185+
*
186+
* Closes the registry, meaning that all buffers are cleared.
187+
*
188+
* @return void
189+
*/
190+
public function close()
191+
{
192+
foreach ($this->shm->getIterator('/^(responseBuffer\_)/', true)
193+
as $targetBufferName) {
194+
$this->_close($targetBufferName);
195+
}
196+
}
197+
198+
/**
199+
* Removes a buffer.
200+
*
201+
* @param string $targetBufferName The buffer to remove.
202+
*
203+
* @return void
204+
*/
205+
private function _close($targetBufferName)
206+
{
207+
if ($this->shm->lock($targetBufferName)) {
208+
$this->shm->delete($targetBufferName);
209+
$this->shm->unlock($targetBufferName);
210+
}
211+
}
212+
213+
/**
214+
* Removes this instance's buffer.
215+
*/
216+
public function __destruct()
217+
{
218+
$this->_close('responseBuffer_' . $this->getOwnershipTag());
219+
}
220+
}

src/PEAR2/Net/RouterOS/Request.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,13 +237,21 @@ public function removeAllArguments()
237237
* Sends a request over a communicator.
238238
*
239239
* @param Communicator $com The communicator to send the request over.
240+
* @param Registry $reg An optional registry to sync the request with.
240241
*
241242
* @return int The number of bytes sent.
242243
* @see Client::sendSync()
243244
* @see Client::sendAsync()
244245
*/
245-
public function send(Communicator $com)
246+
public function send(Communicator $com, Registry $reg = null)
246247
{
248+
if (null !== $reg) {
249+
$originalTag = $this->getTag();
250+
$this->setTag($reg->getOwnershipTag() . $originalTag);
251+
$bytes = $this->send($com);
252+
$this->setTag($originalTag);
253+
return $bytes;
254+
}
247255
if ($com->getTransmitter()->isPersistent()) {
248256
$old = $com->getTransmitter()->lock(T\Stream::DIRECTION_SEND);
249257
$bytes = $this->_send($com);

0 commit comments

Comments
 (0)