@@ -82,7 +82,7 @@ class Communicator
8282 protected $ charsets = array ();
8383
8484 /**
85- * @var TcpClient The transmitter for the connection.
85+ * @var T\ TcpClient The transmitter for the connection.
8686 */
8787 protected $ trans ;
8888
@@ -317,6 +317,12 @@ public function sendWord($word)
317317 }
318318 $ length = strlen ($ word );
319319 static ::verifyLengthSupport ($ length );
320+ if ($ this ->trans ->isPersistent ()) {
321+ $ old = $ this ->trans ->lock (T \Stream::DIRECTION_SEND );
322+ $ bytes = $ this ->trans ->send (self ::encodeLength ($ length ) . $ word );
323+ $ this ->trans ->lock ($ old , true );
324+ return $ bytes ;
325+ }
320326 return $ this ->trans ->send (self ::encodeLength ($ length ) . $ word );
321327 }
322328
@@ -440,7 +446,18 @@ public static function encodeLength($length)
440446 */
441447 public function getNextWord ()
442448 {
443- $ word = $ this ->trans ->receive (self ::decodeLength ($ this ->trans ), 'word ' );
449+ if ($ this ->trans ->isPersistent ()) {
450+ $ old = $ this ->trans ->lock (T \Stream::DIRECTION_RECEIVE );
451+ $ word = $ this ->trans ->receive (
452+ self ::decodeLength ($ this ->trans ), 'word '
453+ );
454+ $ this ->trans ->lock ($ old , true );
455+ } else {
456+ $ word = $ this ->trans ->receive (
457+ self ::decodeLength ($ this ->trans ), 'word '
458+ );
459+ }
460+
444461 if (null !== ($ remoteCharset = $ this ->getCharset (self ::CHARSET_REMOTE ))
445462 && null !== ($ localCharset = $ this ->getCharset (self ::CHARSET_LOCAL ))
446463 ) {
@@ -450,6 +467,7 @@ public function getNextWord()
450467 $ word
451468 );
452469 }
470+
453471 return $ word ;
454472 }
455473
@@ -473,9 +491,19 @@ public function getNextWordAsStream()
473491 $ remoteCharset . '. ' . $ localCharset . '//IGNORE//TRANSLIT '
474492 );
475493 }
476- $ stream = $ this ->trans ->receiveStream (
477- self ::decodeLength ($ this ->trans ), $ filters , 'stream word '
478- );
494+
495+ if ($ this ->trans ->isPersistent ()) {
496+ $ old = $ this ->trans ->lock (T \Stream::DIRECTION_RECEIVE );
497+ $ stream = $ this ->trans ->receiveStream (
498+ self ::decodeLength ($ this ->trans ), $ filters , 'stream word '
499+ );
500+ $ this ->trans ->lock ($ old , true );
501+ } else {
502+ $ stream = $ this ->trans ->receiveStream (
503+ self ::decodeLength ($ this ->trans ), $ filters , 'stream word '
504+ );
505+ }
506+
479507 return $ stream ;
480508 }
481509
@@ -491,6 +519,31 @@ public function getNextWordAsStream()
491519 * @return int The decoded length
492520 */
493521 public static function decodeLength (T \Stream $ trans )
522+ {
523+ if ($ trans ->isPersistent ()) {
524+ $ old = $ trans ->lock ($ trans ::DIRECTION_RECEIVE );
525+ $ length = self ::_decodeLength ($ trans );
526+ $ trans ->lock ($ old , true );
527+ return $ length ;
528+ }
529+ return self ::_decodeLength ($ trans );
530+ }
531+
532+ /**
533+ * Decodes the lenght of the incoming message.
534+ *
535+ * Decodes the lenght of the incoming message, as specified by the RouterOS
536+ * API.
537+ *
538+ * Difference with the non private function is that this one doesn't perform
539+ * locking if the connection is a persistent one.
540+ *
541+ * @param T\Stream $trans The transmitter from which to decode the length of
542+ * the incoming message.
543+ *
544+ * @return int The decoded length
545+ */
546+ private static function _decodeLength (T \Stream $ trans )
494547 {
495548 $ byte = ord ($ trans ->receive (1 , 'initial length byte ' ));
496549 if ($ byte & 0x80 ) {
@@ -508,7 +561,7 @@ public static function decodeLength(T\Stream $trans)
508561 + (double ) sprintf ('%u ' , $ rem ['~ ' ]);
509562 }
510563 throw new NotSupportedException (
511- 'Unknown control byte encountered. ' , 1600 , null , $ byte
564+ 'Unknown control byte encountered. ' , 1601 , null , $ byte
512565 );
513566 } else {
514567 return $ byte ;
0 commit comments