@@ -438,7 +438,7 @@ public function completeRequest($tag = null)
438438 */
439439 public function extractNewResponses ($ tag = null )
440440 {
441- if ('' == $ tag ) {
441+ if (null = == $ tag ) {
442442 $ result = array ();
443443 foreach (array_keys ($ this ->responseBuffer ) as $ tag ) {
444444 $ result = array_merge (
@@ -529,40 +529,43 @@ public function cancelRequest($tag = null)
529529 $ tags = array_merge (
530530 array_keys ($ this ->responseBuffer ), array_keys ($ this ->callbacks )
531531 );
532+ $ this ->registry ->setTaglessMode (true );
532533 foreach ($ tags as $ t ) {
533- $ this ->cancelRequest ($ t );
534+ $ cancelRequest ->setArgument (
535+ 'tag ' , $ this ->registry ->getOwnershipTag () . $ t
536+ );
537+ $ this ->sendSync ($ cancelRequest );
534538 }
535- return $ this ;
536- }
537-
538- if ($ hasTag ) {
539- if ($ this ->isRequestActive ($ tag )) {
540- if ($ hasReg ) {
541- $ cancelRequest ->setArgument (
542- 'tag ' , $ this ->registry ->getOwnershipTag () . $ tag
543- );
539+ $ this ->registry ->setTaglessMode (false );
540+ } else {
541+ if ($ hasTag ) {
542+ if ($ this ->isRequestActive ($ tag )) {
543+ if ($ hasReg ) {
544+ $ this ->registry ->setTaglessMode (true );
545+ $ cancelRequest ->setArgument (
546+ 'tag ' , $ this ->registry ->getOwnershipTag () . $ tag
547+ );
548+ } else {
549+ $ cancelRequest ->setArgument ('tag ' , $ tag );
550+ }
544551 } else {
545- $ cancelRequest ->setArgument ('tag ' , $ tag );
552+ throw new DataFlowException (
553+ 'No such request. Canceling aborted. ' , 11200
554+ );
546555 }
547- } else {
548- throw new DataFlowException (
549- ' No such request. Canceling aborted. ' , 11200
550- );
556+ }
557+ $ this -> sendSync ( $ cancelRequest );
558+ if ( $ hasReg ) {
559+ $ this -> registry -> setTaglessMode ( false );
551560 }
552561 }
553-
554- $ regBackup = $ this ->registry ;
555- $ this ->registry = null ;
556- $ this ->sendSync ($ cancelRequest );
557- $ this ->registry = $ regBackup ;
558562
559563 if ($ hasTag ) {
560564 if ($ this ->isRequestActive ($ tag , self ::FILTER_BUFFER )) {
561- unset( $ this ->responseBuffer [$ tag ]);
562- } elseif ( $ this -> isRequestActive ( $ tag , self :: FILTER_CALLBACK )) {
563- unset( $ this ->callbacks [ $ tag] );
565+ $ this ->responseBuffer [$ tag ] = $ this -> completeRequest ( $ tag );
566+ } else {
567+ $ this ->completeRequest ( $ tag );
564568 }
565- $ this ->pendingRequestsCount --;
566569 } else {
567570 $ this ->responseBuffer = array ();
568571 $ this ->callbacks = array ();
@@ -615,19 +618,27 @@ public function isStreamingResponses()
615618 */
616619 public function close ()
617620 {
618- $ result = false ;
621+ $ result = true ;
619622 try {
620- $ response = $ this ->sendSync (new Request ('/quit ' ));
621- $ result = $ this ->com ->close ()
622- && $ response ->getType () === Response::TYPE_FATAL ;
623+ if (0 !== $ this ->pendingRequestsCount ) {
624+ if (null !== $ this ->registry ) {
625+ $ this ->registry ->setTaglessMode (true );
626+ }
627+ $ response = $ this ->sendSync (new Request ('/quit ' ));
628+ if (null !== $ this ->registry ) {
629+ $ this ->registry ->setTaglessMode (false );
630+ }
631+ $ result = $ response ->getType () === Response::TYPE_FATAL ;
632+ }
633+ $ result = $ result && $ this ->com ->close ();
623634 } catch (SocketException $ e ) {
624635 $ result = $ e ->getCode () === 205 ;
625636 }
626637 $ this ->callbacks = array ();
627638 $ this ->pendingRequestsCount = 0 ;
628- if (null !== $ this ->registry ) {
629- $ this ->registry ->close ();
630- }
639+ // if (null !== $this->registry) {
640+ // $this->registry->close();
641+ // }
631642 return $ result ;
632643 }
633644
@@ -636,7 +647,11 @@ public function close()
636647 */
637648 public function __destruct ()
638649 {
639- if (!$ this ->com ->getTransmitter ()->isPersistent ()) {
650+ if ($ this ->com ->getTransmitter ()->isPersistent ()) {
651+ if (0 !== $ this ->pendingRequestsCount ) {
652+ $ this ->cancelRequest ();
653+ }
654+ } else {
640655 $ this ->close ();
641656 }
642657 }
0 commit comments