diff --git a/client.go b/client.go index 6e57585..0612b5f 100644 --- a/client.go +++ b/client.go @@ -74,6 +74,7 @@ type Response struct { toCancel chan struct{} session *ssh.Session + streaming bool exitStatus int err error } @@ -157,8 +158,8 @@ func (c *clientAttr) run(q *query) { sigChan: rcSig, session: session, + streaming: false, } - resp.setTimeout(q.respTimeout) q.respChan <- resp @@ -166,30 +167,49 @@ func (c *clientAttr) run(q *query) { wg.Add(1) go func() { + var chunk []byte + defer close(done) defer wg.Done() for scanOut.Scan() { - select { - case rcOut <- scanOut.Bytes(): - default: - c.logger.Println("msg stdout has been dropped") + chunk = scanOut.Bytes() + chunk2 := make([]byte, len(chunk)) + copy(chunk2, chunk) + if resp.streaming { + select { + case rcOut <- chunk2: + } + } else { + select { + case rcOut <- chunk2: + default: + c.logger.Println("msg stdout has been dropped") + } } } if err := scanOut.Err(); err != nil { c.logger.Println(err) } - - close(done) }() wg.Add(1) go func() { + var chunk []byte defer wg.Done() for scanErr.Scan() { - select { - case rcErr <- scanErr.Bytes(): - default: - c.logger.Println("msg stderr has been dropped") + chunk = scanErr.Bytes() + chunk2 := make([]byte, len(chunk)) + copy(chunk2, chunk) + if resp.streaming { + select { + case rcErr <- chunk2: + } + } else { + select { + case rcErr <- chunk2: + default: + c.logger.Println("msg stderr has been dropped") + } } } @@ -484,6 +504,7 @@ func (r *Response) GetStream() *Stream { return nil } + r.streaming = true return &Stream{ r: r, } @@ -554,6 +575,7 @@ func (s *Stream) Close() error { s.done = true + s.r.streaming = false s.r.session.Close() s.r.cancelTimeout() diff --git a/go.mod b/go.mod index dbe6b90..371060a 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/yahoo/vssh +module github.com/jmptbl/vssh go 1.14