From 77ba6691d687b89db39d816b3db5a2a6751fe62d Mon Sep 17 00:00:00 2001 From: Qingming Qu Date: Thu, 7 Dec 2023 08:17:26 -0800 Subject: [PATCH] Make StdOut buffer size configurable --- client.go | 28 +++++++++++++++++----------- vssh.go | 30 ++++++++++++++++++++++-------- 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/client.go b/client.go index 6e57585..7d466a2 100644 --- a/client.go +++ b/client.go @@ -48,17 +48,18 @@ type clientStats struct { // clientAttr represents client attributes type clientAttr struct { - addr string - labels map[string]string - config *ssh.ClientConfig - client *ssh.Client - logger *log.Logger - maxSessions uint8 - curSessions uint8 - lastUpdate time.Time - pty pty - stats clientStats - err error + addr string + labels map[string]string + config *ssh.ClientConfig + client *ssh.Client + logger *log.Logger + maxSessions uint8 + curSessions uint8 + lastUpdate time.Time + pty pty + stats clientStats + err error + stdOutBufferSize int sync.RWMutex } @@ -297,6 +298,11 @@ func (c *clientAttr) getScanners(s *ssh.Session, lOut, lErr int64) (*bufio.Scann scanOut = bufio.NewScanner(readerOut) } + if c.stdOutBufferSize != 0 { + buf := make([]byte, c.stdOutBufferSize) + scanOut.Buffer(buf, c.stdOutBufferSize) + } + if lErr > 0 { lReaderErr := io.LimitReader(readerErr, lErr) scanErr = bufio.NewScanner(lReaderErr) diff --git a/vssh.go b/vssh.go index 572ea9a..886b920 100644 --- a/vssh.go +++ b/vssh.go @@ -171,6 +171,14 @@ func SetLabels(labels map[string]string) ClientOption { } } +// SetStdOutBufferSize sets stdout buffer size of client +// This should be set properly if expected output size is larger than 64kb +func SetStdOutBufferSize(n int) ClientOption { + return func(c *clientAttr) { + c.stdOutBufferSize = n + } +} + func clientValidation(c *clientAttr) error { if c.config == nil { return errSSHConfig @@ -186,6 +194,7 @@ func clientValidation(c *clientAttr) error { // Start starts vSSH, including action queue and re-connect procedures. // You can construct and start the vssh like below: +// // vs := vssh.New().Start() func (v *VSSH) Start() *VSSH { ctx := context.Background() @@ -271,11 +280,14 @@ func (v *VSSH) CurrentProc() uint64 { // SetInitNumProc sets the initial number of processes / workers. // // You need to set this number right after creating vssh. +// // vs := vssh.New() // vs.SetInitNumProc(200) // vs.Start() +// // There are two other methods in case you need to change // the settings in the middle of your code. +// // IncreaseProc(n int) // DecreaseProc(n int) func (v *VSSH) SetInitNumProc(n int) { @@ -307,14 +319,15 @@ func (v *VSSH) Run(ctx context.Context, cmd string, timeout time.Duration, opts // RunWithLabel runs the command on the specific clients which // they matched with given query statement. -// labels := map[string]string { -// "POP" : "LAX", -// "OS" : "JUNOS", -// } -// // sets labels to a client -// vs.AddClient(addr, config, vssh.SetLabels(labels)) -// // run the command with label -// vs.RunWithLabel(ctx, cmd, timeout, "POP == LAX || POP == DCA) && OS == JUNOS") +// +// labels := map[string]string { +// "POP" : "LAX", +// "OS" : "JUNOS", +// } +// // sets labels to a client +// vs.AddClient(addr, config, vssh.SetLabels(labels)) +// // run the command with label +// vs.RunWithLabel(ctx, cmd, timeout, "POP == LAX || POP == DCA) && OS == JUNOS") func (v *VSSH) RunWithLabel(ctx context.Context, cmd, queryStmt string, timeout time.Duration, opts ...RunOption) (chan *Response, error) { vis, err := parseExpr(queryStmt) if err != nil { @@ -342,6 +355,7 @@ func (v *VSSH) RunWithLabel(ctx context.Context, cmd, queryStmt string, timeout } // SetLimitReaderStdout sets limit for stdout reader. +// // respChan := vs.Run(ctx, cmd, timeout, vssh.SetLimitReaderStdout(1024)) func SetLimitReaderStdout(n int64) RunOption { return func(q *query) {