From e4a6b91f68ac105200981d4f27785d0902a48e2d Mon Sep 17 00:00:00 2001 From: karitra Date: Thu, 21 Dec 2017 17:05:10 +0300 Subject: [PATCH 01/21] feat: few additional error types --- isolate/errors.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/isolate/errors.go b/isolate/errors.go index 8d65061..d5c8200 100644 --- a/isolate/errors.go +++ b/isolate/errors.go @@ -20,6 +20,8 @@ const ( codeOutputError codeKillError codeSpoolCancellationError + codeContainerMetricsFailed + codeMarshallingError ) var ( @@ -31,6 +33,8 @@ var ( errOutputError = [2]int{isolateErrCategory, codeOutputError} errKillError = [2]int{isolateErrCategory, codeKillError} errSpoolCancellationError = [2]int{isolateErrCategory, codeSpoolCancellationError} + errContainerMetricsFailed = [2]int{isolateErrCategory, codeContainerMetricsFailed} + errMarshallingError = [2]int{isolateErrCategory, codeMarshallingError} errSpawnEAGAIN = [2]int{systemCategory, codeSpawnEAGAIN} ) From 4c9565bdc20f8b0d96870b458e18236349f5c204 Mon Sep 17 00:00:00 2001 From: karitra Date: Thu, 21 Dec 2017 17:26:33 +0300 Subject: [PATCH 02/21] refact: Box/ResponseStream interface update --- isolate/isolate.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/isolate/isolate.go b/isolate/isolate.go index c7b7c7e..e565031 100644 --- a/isolate/isolate.go +++ b/isolate/isolate.go @@ -27,10 +27,14 @@ type ( Spawn(ctx context.Context, config SpawnConfig, output io.Writer) (Process, error) Inspect(ctx context.Context, workerid string) ([]byte, error) Close() error + + QueryMetrics(uuids []string) []MarkedContainerMetrics } ResponseStream interface { Write(ctx context.Context, num uint64, data []byte) error + // packedPayload - MassagePacked data byte stream + WriteMessage(ctx context.Context, num uint64, packedPayload []byte) error Error(ctx context.Context, num uint64, code [2]int, msg string) error Close(ctx context.Context, num uint64) error } From 236a170b9b437eb3a42eef98c040f235137f452d Mon Sep 17 00:00:00 2001 From: karitra Date: Thu, 21 Dec 2017 17:41:07 +0300 Subject: [PATCH 03/21] feat(conn_handler): WriteMessage for messagepack payload --- isolate/conn_handler.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/isolate/conn_handler.go b/isolate/conn_handler.go index 1856637..642f5c9 100644 --- a/isolate/conn_handler.go +++ b/isolate/conn_handler.go @@ -220,7 +220,8 @@ func (r *responseStream) close(ctx context.Context) { } } -func (r *responseStream) Write(ctx context.Context, num uint64, data []byte) error { +// Writes messagepacked payload `as is` as a packet of Cocaine +func (r *responseStream) WriteMessage(ctx context.Context, num uint64, packedPayload []byte) error { r.Lock() defer r.Unlock() @@ -238,7 +239,7 @@ func (r *responseStream) Write(ctx context.Context, num uint64, data []byte) err p = msgp.AppendUint64(p, num) p = msgp.AppendArrayHeader(p, 1) - p = msgp.AppendStringFromBytes(p, data) + p = append(p, packedPayload...) if _, err := r.wr.Write(p); err != nil { log.G(r.ctx).WithError(err).Error("responseStream.Write") @@ -247,6 +248,15 @@ func (r *responseStream) Write(ctx context.Context, num uint64, data []byte) err return nil } +// IMHO: should be named WriteString, really. 
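+//
+// A minimal caller sketch (hypothetical names, not part of this patch):
+// Write msgpack-frames its argument as a string, while WriteMessage
+// above forwards an already msgpack-encoded payload untouched:
+//
+//	stream.Write(ctx, 0, []byte("worker output"))  // framed as a msgpack string
+//	stream.WriteMessage(ctx, 0, buf.Bytes())       // buf holds msgpack-encoded data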
+func (r *responseStream) Write(ctx context.Context, num uint64, str []byte) error { + p := msgpackBytePool.Get().([]byte)[:0] + defer msgpackBytePool.Put(p) + + p = msgp.AppendStringFromBytes(p, str) + return r.WriteMessage(ctx, num, p) +} + func (r *responseStream) Error(ctx context.Context, num uint64, code [2]int, msg string) error { r.Lock() defer r.Unlock() From 40113789c90a9b0a13529e9aca11c0e53aa11086 Mon Sep 17 00:00:00 2001 From: karitra Date: Thu, 21 Dec 2017 17:50:27 +0300 Subject: [PATCH 04/21] feat(initialdispatch): metrics requests --- isolate/initialdispatch.go | 103 +++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/isolate/initialdispatch.go b/isolate/initialdispatch.go index 5510e37..ef3958d 100644 --- a/isolate/initialdispatch.go +++ b/isolate/initialdispatch.go @@ -1,6 +1,7 @@ package isolate import ( + "bytes" "errors" "fmt" "reflect" @@ -24,13 +25,22 @@ const ( replySpawnWrite = 0 replySpawnError = 1 replySpawnClose = 2 + + containersMetrics = 2 + + replyMetricsOk = 0 + replyMetricsError = 1 + replyMetricsClose = 2 ) +const expectedUuidsCount = 32 + var ( // ErrInvalidArgsNum should be returned if number of arguments is wrong ErrInvalidArgsNum = errors.New("invalid arguments number") _onSpoolArgsNum = uint32(reflect.TypeOf(new(initialDispatch).onSpool).NumIn()) _onSpawnArgsNum = uint32(reflect.TypeOf(new(initialDispatch).onSpawn).NumIn()) + _onMetricsArgsNum = uint32(reflect.TypeOf(new(initialDispatch).onContainersMetrics).NumIn()) ) func checkSize(num uint32, r *msgp.Reader) error { @@ -46,6 +56,27 @@ func checkSize(num uint32, r *msgp.Reader) error { return nil } + +func readSliceString(r *msgp.Reader) (uuids []string, err error) { + var sz uint32 + + sz, err = r.ReadArrayHeader() + if err != nil { + return nil, err + } + + for i := uint32(0); i < sz; i++ { + var u string + if u, err = r.ReadString(); err == nil { + uuids = append(uuids, u) + } else { + return nil, err + } + } + + return +} + func readMapStrStr(r *msgp.Reader, mp map[string]string) (err error) { var sz uint32 sz, err = r.ReadMapHeader() @@ -84,6 +115,7 @@ func newInitialDispatch(ctx context.Context, stream ResponseStream) Dispatcher { func (d *initialDispatch) Handle(id uint64, r *msgp.Reader) (Dispatcher, error) { var err error + switch id { case spool: var rawProfile = newCocaineProfile() @@ -155,6 +187,20 @@ func (d *initialDispatch) Handle(id uint64, r *msgp.Reader) (Dispatcher, error) } return d.onSpawn(rawProfile, name, executable, args, env) + case containersMetrics: + if err = checkSize(_onMetricsArgsNum, r); err != nil { + log.G(d.ctx).Errorf("wrong args count for slot %d", id) + return nil, err + } + + uuids := make([]string, 0, expectedUuidsCount) + uuids, err = readSliceString(r) + if err != nil { + log.G(d.ctx).Errorf("wrong containersMetrics request framing: %v", err) + return nil, err + } + + return d.onContainersMetrics(uuids) default: return nil, fmt.Errorf("unknown transition id: %d", id) } @@ -268,6 +314,63 @@ func (d *initialDispatch) onSpawn(opts *cocaineProfile, name, executable string, return newSpawnDispatch(d.ctx, cancelSpawn, prCh, &flagKilled, d.stream), nil } +func (d *initialDispatch) onContainersMetrics(uuidsQuery []string) (Dispatcher, error) { + + log.G(d.ctx).Debugf("onContainersMetrics() Uuids query: %b", uuidsQuery) + + sendMetricsFunc := func(metrics MetricsResponse) { + var ( + buf bytes.Buffer + err error + ) + + if d == nil { + log.G(d.ctx).Error("strange: dispatch is `nil`") + return + } + + if err = msgp.Encode(&buf, 
&metrics); err != nil { + log.G(d.ctx).WithError(err).Error("unable to encode containers metrics response") + d.stream.Error(d.ctx, replyMetricsError, errMarshallingError, err.Error()) + } + + if err = d.stream.WriteMessage(d.ctx, replyMetricsOk, buf.Bytes()); err != nil { + log.G(d.ctx).WithError(err).Error("unable to send containers metrics") + d.stream.Error(d.ctx, replyMetricsError, errContainerMetricsFailed, err.Error()) + } + + log.G(d.ctx).Debug("containers metrics have been sent to runtime") + } + + go func() { + // + // TODO: + // - reduce complexity + // - log execution time + // + boxes := getBoxes(d.ctx) + boxesSize := len(boxes) + metricsResponse := make(MetricsResponse, len(uuidsQuery)) + queryResCh := make(chan []MarkedContainerMetrics) + + for _, b := range boxes { + go func(b Box) { + queryResCh <- b.QueryMetrics(uuidsQuery) + }(b) + } + + for i := 0; i < boxesSize; i++ { + for _, m := range <- queryResCh { + metricsResponse[m.uuid] = m.m + } + } + + sendMetricsFunc(metricsResponse) + }() + + return nil, nil +} + type OutputCollector struct { ctx context.Context From 4abae051fbd9a2b6460e5a5fb13562e3115c4d6c Mon Sep 17 00:00:00 2001 From: karitra Date: Thu, 21 Dec 2017 18:12:14 +0300 Subject: [PATCH 05/21] feat(Porto): metrics collector --- isolate/docker/box.go | 4 + isolate/porto/box.go | 50 ++++++++ isolate/porto/metrics_gatherer.go | 202 ++++++++++++++++++++++++++++++ isolate/process/box.go | 6 +- 4 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 isolate/porto/metrics_gatherer.go diff --git a/isolate/docker/box.go b/isolate/docker/box.go index 9bdc702..794d4c9 100644 --- a/isolate/docker/box.go +++ b/isolate/docker/box.go @@ -254,6 +254,10 @@ func (b *Box) Inspect(ctx context.Context, workeruuid string) ([]byte, error) { return []byte("{}"), nil } +func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedContainerMetrics) { + return +} + // Spool spools an image with a tag latest func (b *Box) Spool(ctx context.Context, name string, opts isolate.RawProfile) (err error) { profile, err := decodeProfile(opts) diff --git a/isolate/porto/box.go b/isolate/porto/box.go index ce5b40b..35ff6ef 100644 --- a/isolate/porto/box.go +++ b/isolate/porto/box.go @@ -33,6 +33,8 @@ import ( portorpc "github.com/yandex/porto/src/api/go/rpc" ) +const expectedContainersCount = 1000 + type portoBoxConfig struct { // Directory where volumes per app are placed Layers string `json:"layers"` @@ -80,6 +82,10 @@ type Box struct { onClose context.CancelFunc containerPropertiesAndData []string + + // mappig uuid -> metrics + muMetrics sync.Mutex + containersMetrics map[string]*isolate.ContainerMetrics } const defaultVolumeBackend = "overlay" @@ -183,6 +189,8 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta rootPrefix: rootPrefix, blobRepo: blobRepo, + + containersMetrics: make(map[string]*isolate.ContainerMetrics), } body, err := json.Marshal(config) @@ -558,6 +566,48 @@ func (b *Box) Inspect(ctx context.Context, workeruuid string) ([]byte, error) { return []byte(""), nil } +func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedContainerMetrics) { + r = make([]isolate.MarkedContainerMetrics, 0, len(uuids)) + + mm := b.GetMetricsMapping() + for _, uuid := range uuids { + if met, ok := mm[uuid]; ok { + r = append(r, isolate.NewMarkedMetrics(uuid, met)) + } + } + + return +} + +func (b *Box) getIdUuidMapping() map[string]string { + result := make(map[string]string, expectedContainersCount) + + b.muContainers.Lock() + defer 
b.muContainers.Unlock() + + for _, c := range b.containers { + result[c.containerID] = c.uuid + } + + return result +} + +func (b *Box) SetMetricsMapping(m map[string]*isolate.ContainerMetrics) { + b.muMetrics.Lock() + defer b.muMetrics.Unlock() + + b.containersMetrics = m +} + +func (b *Box) GetMetricsMapping() (m map[string]*isolate.ContainerMetrics) { + b.muMetrics.Lock() + defer b.muMetrics.Unlock() + + return b.containersMetrics +} + + + // Close releases all resources such as idle connections from http.Transport func (b *Box) Close() error { b.transport.CloseIdleConnections() diff --git a/isolate/porto/metrics_gatherer.go b/isolate/porto/metrics_gatherer.go new file mode 100644 index 0000000..41ea578 --- /dev/null +++ b/isolate/porto/metrics_gatherer.go @@ -0,0 +1,202 @@ +package porto + +import ( + "fmt" + "golang.org/x/net/context" + + "regexp" + "time" + "strconv" + "strings" + + "github.com/noxiouz/stout/isolate" + "github.com/noxiouz/stout/pkg/log" + + porto "github.com/yandex/porto/src/api/go" +) + +var ( + spacesRegexp, _ = regexp.Compile("[ ]+") + metricsNames = []string{ + "cpu_usage", + "time", + "memory_usage", + "net_tx_bytes", + "net_rx_bytes", + } +) + +const ( + pairName = iota + pairVal + pairLen +) + +type portoResponse map[string]map[string]porto.TPortoGetResponse +type rawMetrics map[string]porto.TPortoGetResponse + +type netIfStat struct { + name string + bytesCount uint64 +} + +func parseStrUIntPair(eth string) (nstat netIfStat, err error) { + pair := strings.Split(eth, ": ") + if len(pair) == pairLen { + var v uint64 + v, err = strconv.ParseUint(pair[pairVal], 10, 64) + if err != nil { + return + } + + name := strings.Trim(pair[pairName], " ") + name = spacesRegexp.ReplaceAllString(name, "_") + + nstat = netIfStat{ + name: name, + bytesCount: v, + } + + } else { + err = fmt.Errorf("Failed to parse net record") + } + + return +} + +// TODO: check property Error/ErrorMsg fields +func parseNetValues(val porto.TPortoGetResponse) (ifs []netIfStat) { + for _, eth := range strings.Split(val.Value, ";") { + nf, err := parseStrUIntPair(eth) + if err == nil { + ifs = append(ifs, nf) + } + } + + return +} + +// TODO: check property Error/ErrorMsg fields +func parseUintProp(raw rawMetrics, propName string) (v uint64, err error) { + s, ok := raw[propName] + if !ok { + return 0, fmt.Errorf("no such prop in Porto: %s", propName) + } + + v, err = strconv.ParseUint(s.Value, 10, 64) + return +} + +func setUintField(field *uint64, raw rawMetrics, propName string) (err error) { + var v uint64 + if v, err = parseUintProp(raw, propName); err == nil { + *field = v + } + + return +} + +func makeMetricsFromMap(raw rawMetrics) (m isolate.ContainerMetrics, err error) { + + m = isolate.NewContainerMetrics() + + if err = setUintField(&m.CpuUsageNs, raw, "cpu_usage"); err != nil { + return + } + + if err = setUintField(&m.UptimeSec, raw, "time"); err != nil { + return + } + + if err = setUintField(&m.Mem, raw, "memory_usage"); err != nil { + return + } + + + for _, netIf := range parseNetValues(raw["net_tx_bytes"]) { + v := m.Net[netIf.name] + v.TxBytes += netIf.bytesCount + m.Net[netIf.name] = v + } + + for _, netIf := range parseNetValues(raw["net_rx_bytes"]) { + v := m.Net[netIf.name] + v.RxBytes += netIf.bytesCount + m.Net[netIf.name] = v + } + + if m.UptimeSec > 0 { + cpu_usage_sec := float64(m.CpuUsageNs >> 10) + m.CpuLoad = cpu_usage_sec / float64(m.UptimeSec) + } + + return +} + +func parseMetrics(ctx context.Context, props portoResponse, idToUuid map[string]string) 
map[string]*isolate.ContainerMetrics { + + metrics := make(map[string]*isolate.ContainerMetrics, len(props)) + + for id, rawMetrics := range props { + uuid, ok := idToUuid[id] + if !ok { + continue + } + + if m, err := makeMetricsFromMap(rawMetrics); err != nil { + log.G(ctx).WithError(err).Error("Failed to parse raw metrics") + continue + } else { + metrics[uuid] = &m + } + } + + return metrics +} + +func (box *Box) gatherMetrics(ctx context.Context) { + log.G(ctx).Debug("Initializing Porto metrics gather loop") + + idToUuid := box.getIdUuidMapping() + + portoApi, err := portoConnect() + if err != nil { + log.G(ctx).WithError(err).Error("Failed to connect to Porto service") + return + } + defer func() { + if err := portoApi.Close(); err != nil { + log.G(ctx).WithError(err).Error("Failed close connection to Porto service") + } + }() + + ids := make([]string, 0, len(idToUuid)) + for id, _ := range idToUuid { + ids = append(ids, id) + } + + var props portoResponse + props, err = portoApi.Get(ids, metricsNames) + if err != nil { + log.G(ctx).WithError(err).Error("Failed to connect to Porto service") + return + } + + metrics := parseMetrics(ctx, props, idToUuid) + box.SetMetricsMapping(metrics) +} + +func (box *Box) gatherLoop(ctx context.Context, interval time.Duration) { + log.G(ctx).Debug("Initializing Porto metrics gather loop") + + t := time.NewTicker(interval) + for { + select { + case <- ctx.Done(): + return + case <- t.C: + log.G(ctx).Debug("Porto metrics gather iteration") + box.gatherMetrics(ctx) + } + } +} diff --git a/isolate/process/box.go b/isolate/process/box.go index bac73f3..ec029eb 100644 --- a/isolate/process/box.go +++ b/isolate/process/box.go @@ -243,7 +243,7 @@ func (b *Box) Spawn(ctx context.Context, config isolate.SpawnConfig, output io.W } b.children[pr.cmd.Process.Pid] = workerInfo{ Cmd: pr.cmd, - uuid: "", + uuid: config.Args["--uuid"], } b.mu.Unlock() @@ -294,6 +294,10 @@ func (b *Box) Inspect(ctx context.Context, worker string) ([]byte, error) { return []byte("{}"), nil } +func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedContainerMetrics) { + return +} + func (b *Box) fetch(ctx context.Context, appname string) ([]byte, error) { return b.storage.Spool(ctx, appname) } From 815aed32d15921b28a268bde070efedd5c302423 Mon Sep 17 00:00:00 2001 From: karitra Date: Thu, 21 Dec 2017 18:36:27 +0300 Subject: [PATCH 06/21] feat: container metrics structs --- isolate/container_metrics.go | 37 ++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 isolate/container_metrics.go diff --git a/isolate/container_metrics.go b/isolate/container_metrics.go new file mode 100644 index 0000000..427f037 --- /dev/null +++ b/isolate/container_metrics.go @@ -0,0 +1,37 @@ +//go:generate msgp --tests=false +//msgp:ignore isolate.MarkedContainerMetrics +package isolate + +type ( + NetStat struct { + RxBytes, TxBytes uint64 + } + + ContainerMetrics struct { + UptimeSec uint64 + + CpuUsageNs uint64 + CpuLoad float64 + + Mem uint64 + + // iface -> net stat + Net map[string]NetStat + } + + MetricsResponse map[string]*ContainerMetrics + + MarkedContainerMetrics struct { + uuid string + m *ContainerMetrics + } +) + +func NewContainerMetrics() (c ContainerMetrics) { + c.Net = make(map[string]NetStat) + return +} + +func NewMarkedMetrics(uuid string, cm *ContainerMetrics) MarkedContainerMetrics { + return MarkedContainerMetrics{uuid: uuid, m: cm} +} From b458a1b11a58a91c2f8d0ef4f028693359b9b37f Mon Sep 17 00:00:00 2001 From: karitra Date: Wed, 27 Dec 2017 
20:54:37 +0300 Subject: [PATCH 07/21] chore: update Makefile with msgp generator step --- Makefile | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index e55f485..c6883bc 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ LDFLAGS=-ldflags "-X github.com/noxiouz/stout/version.GitTag=${TAG} -X github.co .DEFAULT: all -.PHONY: fmt vet test +.PHONY: fmt vet test gen_msgp PKGS := $(shell go list ./... | grep -v ^github.com/noxiouz/stout/vendor/ | grep -v ^github.com/noxiouz/stout/version) @@ -32,11 +32,14 @@ test: cat profile.out >> coverage.txt; rm profile.out; \ fi done; \ -build: +build: gen_msgp @echo "+ $@" go build ${LDFLAGS} -o ${NAME} github.com/noxiouz/stout/cmd/stout -build_travis_release: +build_travis_release: gen_msgp @echo "+ $@" env GOOS="linux" go build ${LDFLAGS} -o ${NAME} github.com/noxiouz/stout/cmd/stout env GOOS="darwin" go build ${LDFLAGS} -o ${NAME}_osx github.com/noxiouz/stout/cmd/stout + +gen_msgp: + @cd ./isolate; go generate; cd .. From 30afd3dbb3791bb09a955ad63a7f4d42293e645f Mon Sep 17 00:00:00 2001 From: karitra Date: Wed, 27 Dec 2017 20:55:55 +0300 Subject: [PATCH 08/21] feat: add msgpack tags to metrics structs fields + show uuids metrics query in debug log --- isolate/container_metrics.go | 13 +++++++------ isolate/initialdispatch.go | 3 ++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/isolate/container_metrics.go b/isolate/container_metrics.go index 427f037..a6c87e5 100644 --- a/isolate/container_metrics.go +++ b/isolate/container_metrics.go @@ -4,19 +4,20 @@ package isolate type ( NetStat struct { - RxBytes, TxBytes uint64 + RxBytes uint64 `msg:"rx_bytes"` + TxBytes uint64 `msg:"tx_bytes"` } ContainerMetrics struct { - UptimeSec uint64 + UptimeSec uint64 `msg:"uptime"` + CpuUsageSec uint64 `msg:"cpu_usage"` - CpuUsageNs uint64 - CpuLoad float64 + CpuLoad float64 `msg:"cpu_load"` - Mem uint64 + Mem uint64 `msg:"mem"` // iface -> net stat - Net map[string]NetStat + Net map[string]NetStat `msg:"net"` } MetricsResponse map[string]*ContainerMetrics diff --git a/isolate/initialdispatch.go b/isolate/initialdispatch.go index ef3958d..5d4bece 100644 --- a/isolate/initialdispatch.go +++ b/isolate/initialdispatch.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "reflect" + "strings" "sync/atomic" "syscall" @@ -316,7 +317,7 @@ func (d *initialDispatch) onSpawn(opts *cocaineProfile, name, executable string, func (d *initialDispatch) onContainersMetrics(uuidsQuery []string) (Dispatcher, error) { - log.G(d.ctx).Debugf("onContainersMetrics() Uuids query: %b", uuidsQuery) + log.G(d.ctx).Debugf("onContainersMetrics() Uuids query: %s", strings.Join(uuidsQuery, ", ")) sendMetricsFunc := func(metrics MetricsResponse) { var ( From a278f9d60b038087277b4d92e459a678a1c7162f Mon Sep 17 00:00:00 2001 From: karitra Date: Wed, 27 Dec 2017 20:59:54 +0300 Subject: [PATCH 09/21] feat: metrics gatherer for legacy process subsys --- isolate/process/box.go | 79 +++++++++++++++++ isolate/process/metrics_gatherer.go | 132 ++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 isolate/process/metrics_gatherer.go diff --git a/isolate/process/box.go b/isolate/process/box.go index ec029eb..17c8de9 100644 --- a/isolate/process/box.go +++ b/isolate/process/box.go @@ -18,11 +18,14 @@ import ( "github.com/noxiouz/stout/pkg/log" "github.com/noxiouz/stout/pkg/semaphore" + "github.com/mitchellh/mapstructure" + apexlog "github.com/apex/log" ) const ( defaultSpoolPath = "/var/spool/cocaine" + expectedWorkersCount = 512 ) 
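// A config fragment that getMetricsPollConf below is meant to decode
// (outer key nesting assumed; the MetricsPollConfig struct itself
// arrives in the next commit of this series):
//
//	"process": {
//	    "workersmetrics": { "period_sec": 30 }
//	}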
var ( @@ -41,6 +44,12 @@ type codeStorage interface { type workerInfo struct { *exec.Cmd uuid string + startTime time.Time +} + +type taskInfo struct { + uuid string + startTime time.Time } type Box struct { @@ -57,6 +66,28 @@ type Box struct { wg sync.WaitGroup spawnSm semaphore.Semaphore + + muMetrics sync.Mutex + containersMetrics map[string]*isolate.ContainerMetrics +} + +func getMetricsPollConf(cfg interface{}) (metricsConf isolate.MetricsPollConfig, err error) { + decoderConfig := mapstructure.DecoderConfig{ + WeaklyTypedInput: true, + Result: &metricsConf, + TagName: "json", + } + + var decoder *mapstructure.Decoder + + decoder, err = mapstructure.NewDecoder(&decoderConfig) + if err != nil { + return + } + + err = decoder.Decode(cfg) + + return } func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalState) (isolate.Box, error) { @@ -98,6 +129,16 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta box.sigchldHandler() }() + wrkMetricsConf, ok := cfg["workersmetrics"] + if ok { + // TODO: increase box working group count? + if metConf, err := getMetricsPollConf(wrkMetricsConf); err != nil { + log.G(ctx).Infof("Failed to read `workersmetrics` field, using defaults. Err: %v", err) + } else { + go box.gatherLoopEvery(ctx, time.Duration(metConf.PollPeriod) * time.Second) + } + } + return box, nil } @@ -244,6 +285,7 @@ func (b *Box) Spawn(ctx context.Context, config isolate.SpawnConfig, output io.W b.children[pr.cmd.Process.Pid] = workerInfo{ Cmd: pr.cmd, uuid: config.Args["--uuid"], + startTime: newProcStarted, } b.mu.Unlock() @@ -295,9 +337,46 @@ func (b *Box) Inspect(ctx context.Context, worker string) ([]byte, error) { } func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedContainerMetrics) { + r = make([]isolate.MarkedContainerMetrics, 0, len(uuids)) + + mm := b.getMetricsMapping() + for _, uuid := range uuids { + if met, ok := mm[uuid]; ok { + r = append(r, isolate.NewMarkedMetrics(uuid, met)) + } + } + + return +} + +func (b *Box) getIdUuidMapping() (result map[int]taskInfo) { + // TODO: is len(b.children) safe to use as `expectedWorkersCount` + result = make(map[int]taskInfo, expectedWorkersCount) + + b.mu.Lock() + defer b.mu.Unlock() + + for pid, kid := range b.children { + result[pid] = taskInfo{uuid: kid.uuid, startTime: kid.startTime} + } + return } +func (b *Box) setMetricsMapping(m map[string]*isolate.ContainerMetrics) { + b.muMetrics.Lock() + defer b.muMetrics.Unlock() + + b.containersMetrics = m +} + +func (b *Box) getMetricsMapping() (m map[string]*isolate.ContainerMetrics) { + b.muMetrics.Lock() + defer b.muMetrics.Unlock() + + return b.containersMetrics +} + func (b *Box) fetch(ctx context.Context, appname string) ([]byte, error) { return b.storage.Spool(ctx, appname) } diff --git a/isolate/process/metrics_gatherer.go b/isolate/process/metrics_gatherer.go new file mode 100644 index 0000000..4e17609 --- /dev/null +++ b/isolate/process/metrics_gatherer.go @@ -0,0 +1,132 @@ +// TODO: +// - log timings +// +package process + +import ( + "context" + "regexp" + "syscall" + "time" + + "github.com/noxiouz/stout/isolate" + "github.com/noxiouz/stout/pkg/log" + + gopsutil "github.com/shirou/gopsutil/process" + gopsnet "github.com/shirou/gopsutil/net" +) + +const eachIface = true + +var ( + pageSize = uint64(syscall.Getpagesize()) + spacesRegexp, _ = regexp.Compile("[ ]+") +) + +func makeNiceName(name string) string { + return spacesRegexp.ReplaceAllString(name, "_") +} + +func generateNetStat(net 
[]gopsnet.IOCountersStat) (out map[string]isolate.NetStat) { + out = make(map[string]isolate.NetStat, len(net)) + + for _, c := range net { + out[c.Name] = isolate.NetStat{ + RxBytes: c.BytesRecv, + TxBytes: c.BytesSent, + } + } + + return +} + +func readProcStat(pid int, startTime time.Time, now time.Time) (isolate.ContainerMetrics, error) { + uptime := now.Sub(startTime).Seconds() + + var ( + process *gopsutil.Process + + cpuload float64 + // netstat []gopsnet.IOCountersStat + memstat *gopsutil.MemoryInfoStat + + errStub isolate.ContainerMetrics + err error + ) + + if process, err = gopsutil.NewProcess(int32(pid)); err != nil { + return errStub, err + } + + if cpuload, err = process.CPUPercent(); err != nil { + return errStub, err + } + + if memstat, err = process.MemoryInfo(); err != nil { + return errStub, err + } + + // + // TODO: + // There is no per process network stat yet in gopsutil, + // Per process view of system stat is in `netstat` slice. + // + // Most commonly used (the only?) way to take per process network + // stats is by libpcap. + // + // if netstat, err = process.NetIOCounters(eachIface); err != nil { + // + if _, err = process.NetIOCounters(eachIface); err != nil { + return errStub, err + } + + return isolate.ContainerMetrics{ + UptimeSec: uint64(uptime), + // CpuUsageSec: + + CpuLoad: cpuload, + Mem: memstat.VMS, + // Per process net io stat is unimplemented. + // Net: generateNetStat(netstat), + }, nil +} + +func (b *Box) gatherMetrics(ctx context.Context) { + ids := b.getIdUuidMapping() + metrics := make(map[string]*isolate.ContainerMetrics, len(ids)) + + now := time.Now() + + for pid, taskInfo := range ids { + state, err := readProcStat(pid, taskInfo.startTime, now) + if err != nil { + log.G(ctx).Errorf("Failed to read stat for process with pid %d", pid) + continue + } + + metrics[taskInfo.uuid] = &state + } + + b.setMetricsMapping(metrics) +} + +func (b *Box) gatherLoopEvery(ctx context.Context, interval time.Duration) { + + if interval == 0 { + log.G(ctx).Info("Process metrics gatherer disabled (use config to setup)") + return + } + + log.G(ctx).Infof("Initializing Process metrics gather loop with %v duration", interval) + + for { + select { + case <- ctx.Done(): + return + case <-time.After(interval): + b.gatherMetrics(ctx) + } + } + + log.G(ctx).Info("Cancelling Process metrics loop") +} From a73df5d9540a7bdc7e0dc806314b77658e7c1623 Mon Sep 17 00:00:00 2001 From: karitra Date: Wed, 27 Dec 2017 21:04:26 +0300 Subject: [PATCH 10/21] feat(isolate): metrics poll config struct --- isolate/isolate.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/isolate/isolate.go b/isolate/isolate.go index e565031..022f4d0 100644 --- a/isolate/isolate.go +++ b/isolate/isolate.go @@ -78,6 +78,11 @@ type ( Headers map[string]string `json:"headers,omitempty"` } `json:"mtn,omitempty"` } + + MetricsPollConfig struct { + PollPeriod uint `json:"period_sec"` + Args json.RawMessage `json:"args"` + } ) func (d *JSONEncodedDuration) UnmarshalJSON(b []byte) error { From 8966316cf13775a26ee41b11e58f282973660730 Mon Sep 17 00:00:00 2001 From: karitra Date: Wed, 27 Dec 2017 21:08:17 +0300 Subject: [PATCH 11/21] refact: porto metrics collector --- isolate/porto/box.go | 13 ++++--- isolate/porto/metrics_gatherer.go | 63 ++++++++++++++++++++++--------- 2 files changed, 53 insertions(+), 23 deletions(-) diff --git a/isolate/porto/box.go b/isolate/porto/box.go index 35ff6ef..ad03d5c 100644 --- a/isolate/porto/box.go +++ b/isolate/porto/box.go @@ -51,6 +51,8 @@ type portoBoxConfig struct { 
WeakEnabled bool `json:"weakenabled"` DefaultUlimits string `json:"defaultulimits"` VolumeBackend string `json:"volumebackend"` + + WorkersMetrics isolate.MetricsPollConfig `json:"workersmetrics"` } func (c *portoBoxConfig) String() string { @@ -99,6 +101,8 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta CleanupEnabled: true, WeakEnabled: false, + + WorkersMetrics: isolate.MetricsPollConfig{}, } decoderConfig := mapstructure.DecoderConfig{ WeaklyTypedInput: true, @@ -215,6 +219,7 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta go box.waitLoop(ctx) go box.dumpJournalEvery(ctx, time.Minute) + go box.gatherLoopEvery(ctx, time.Duration(config.WorkersMetrics.PollPeriod) * time.Second) return box, nil } @@ -569,7 +574,7 @@ func (b *Box) Inspect(ctx context.Context, workeruuid string) ([]byte, error) { func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedContainerMetrics) { r = make([]isolate.MarkedContainerMetrics, 0, len(uuids)) - mm := b.GetMetricsMapping() + mm := b.getMetricsMapping() for _, uuid := range uuids { if met, ok := mm[uuid]; ok { r = append(r, isolate.NewMarkedMetrics(uuid, met)) @@ -592,22 +597,20 @@ func (b *Box) getIdUuidMapping() map[string]string { return result } -func (b *Box) SetMetricsMapping(m map[string]*isolate.ContainerMetrics) { +func (b *Box) setMetricsMapping(m map[string]*isolate.ContainerMetrics) { b.muMetrics.Lock() defer b.muMetrics.Unlock() b.containersMetrics = m } -func (b *Box) GetMetricsMapping() (m map[string]*isolate.ContainerMetrics) { +func (b *Box) getMetricsMapping() (m map[string]*isolate.ContainerMetrics) { b.muMetrics.Lock() defer b.muMetrics.Unlock() return b.containersMetrics } - - // Close releases all resources such as idle connections from http.Transport func (b *Box) Close() error { b.transport.CloseIdleConnections() diff --git a/isolate/porto/metrics_gatherer.go b/isolate/porto/metrics_gatherer.go index 41ea578..cd6a8f7 100644 --- a/isolate/porto/metrics_gatherer.go +++ b/isolate/porto/metrics_gatherer.go @@ -1,3 +1,6 @@ +// TODO: +// - log timings +// package porto import ( @@ -8,6 +11,7 @@ import ( "time" "strconv" "strings" + "syscall" "github.com/noxiouz/stout/isolate" "github.com/noxiouz/stout/pkg/log" @@ -16,6 +20,7 @@ import ( ) var ( + pageSize = uint64(syscall.Getpagesize()) spacesRegexp, _ = regexp.Compile("[ ]+") metricsNames = []string{ "cpu_usage", @@ -26,6 +31,10 @@ var ( } ) +const ( + nanosPerSecond = 1000000000 +) + const ( pairName = iota pairVal @@ -100,10 +109,13 @@ func makeMetricsFromMap(raw rawMetrics) (m isolate.ContainerMetrics, err error) m = isolate.NewContainerMetrics() - if err = setUintField(&m.CpuUsageNs, raw, "cpu_usage"); err != nil { + if err = setUintField(&m.CpuUsageSec, raw, "cpu_usage"); err != nil { return } + // Porto's `cpu_usage` is in nanoseconds, seconds in metrics are used. 
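+	// Worked example (values assumed): cpu_usage = 2_500_000_000 ns over a
+	// 10 s uptime becomes CpuUsageSec = 2 after the integer division below,
+	// so further down CpuLoad = 2 / 10 = 0.2, i.e. about 20% of one core.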
+ m.CpuUsageSec /= nanosPerSecond + if err = setUintField(&m.UptimeSec, raw, "time"); err != nil { return } @@ -111,6 +123,7 @@ func makeMetricsFromMap(raw rawMetrics) (m isolate.ContainerMetrics, err error) if err = setUintField(&m.Mem, raw, "memory_usage"); err != nil { return } + m.Mem *= pageSize for _, netIf := range parseNetValues(raw["net_tx_bytes"]) { @@ -126,7 +139,7 @@ func makeMetricsFromMap(raw rawMetrics) (m isolate.ContainerMetrics, err error) } if m.UptimeSec > 0 { - cpu_usage_sec := float64(m.CpuUsageNs >> 10) + cpu_usage_sec := float64(m.CpuUsageSec) m.CpuLoad = cpu_usage_sec / float64(m.UptimeSec) } @@ -154,6 +167,21 @@ func parseMetrics(ctx context.Context, props portoResponse, idToUuid map[string] return metrics } +func makeIdsSlice(idToUuid map[string]string) (ids []string) { + ids = make([]string, 0, len(idToUuid)) + for id, _ := range idToUuid { + ids = append(ids, id) + } + + return +} + +func closeApiWithLog(ctx context.Context, portoApi porto.API) { + if err := portoApi.Close(); err != nil { + log.G(ctx).WithError(err).Error("Failed to close connection to Porto service") + } +} + func (box *Box) gatherMetrics(ctx context.Context) { log.G(ctx).Debug("Initializing Porto metrics gather loop") @@ -161,19 +189,12 @@ func (box *Box) gatherMetrics(ctx context.Context) { portoApi, err := portoConnect() if err != nil { - log.G(ctx).WithError(err).Error("Failed to connect to Porto service") + log.G(ctx).WithError(err).Error("Failed to connect to Porto service for workers metrics collection") return } - defer func() { - if err := portoApi.Close(); err != nil { - log.G(ctx).WithError(err).Error("Failed close connection to Porto service") - } - }() + defer closeApiWithLog(ctx, portoApi) - ids := make([]string, 0, len(idToUuid)) - for id, _ := range idToUuid { - ids = append(ids, id) - } + ids := makeIdsSlice(idToUuid) var props portoResponse props, err = portoApi.Get(ids, metricsNames) @@ -183,20 +204,26 @@ func (box *Box) gatherMetrics(ctx context.Context) { } metrics := parseMetrics(ctx, props, idToUuid) - box.SetMetricsMapping(metrics) + box.setMetricsMapping(metrics) } -func (box *Box) gatherLoop(ctx context.Context, interval time.Duration) { - log.G(ctx).Debug("Initializing Porto metrics gather loop") +func (box *Box) gatherLoopEvery(ctx context.Context, interval time.Duration) { + + if interval == 0 { + log.G(ctx).Info("Porto metrics gatherer disabled (use config to setup)") + return + } + + log.G(ctx).Info("Initializing Porto metrics gather loop") - t := time.NewTicker(interval) for { select { case <- ctx.Done(): return - case <- t.C: - log.G(ctx).Debug("Porto metrics gather iteration") + case <-time.After(interval): box.gatherMetrics(ctx) } } + + log.G(ctx).Info("Porto metrics gather loop canceled") } From 79e4d6cb91a746b32ffec2bf688f42963952d19a Mon Sep 17 00:00:00 2001 From: karitra Date: Fri, 12 Jan 2018 15:41:20 +0300 Subject: [PATCH 12/21] feat: metrics processing time in debug log --- isolate/initialdispatch.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/isolate/initialdispatch.go b/isolate/initialdispatch.go index 5d4bece..0bed24c 100644 --- a/isolate/initialdispatch.go +++ b/isolate/initialdispatch.go @@ -8,6 +8,7 @@ import ( "strings" "sync/atomic" "syscall" + "time" "golang.org/x/net/context" @@ -317,7 +318,9 @@ func (d *initialDispatch) onSpawn(opts *cocaineProfile, name, executable string, func (d *initialDispatch) onContainersMetrics(uuidsQuery []string) (Dispatcher, error) { - log.G(d.ctx).Debugf("onContainersMetrics() Uuids 
query: %s", strings.Join(uuidsQuery, ", ")) + log.G(d.ctx).Debugf("onContainersMetrics() Uuids query (len %d): %s", len(uuidsQuery), strings.Join(uuidsQuery, ", ")) + + startTime := time.Now() sendMetricsFunc := func(metrics MetricsResponse) { var ( @@ -340,7 +343,7 @@ func (d *initialDispatch) onContainersMetrics(uuidsQuery []string) (Dispatcher, d.stream.Error(d.ctx, replyMetricsError, errContainerMetricsFailed, err.Error()) } - log.G(d.ctx).Debug("containers metrics have been sent to runtime") + log.G(d.ctx).WithField("time", time.Since(startTime)).Debugf("Containers metrics have been sent to runtime, response length %d", len(metrics)) } go func() { From 1dcca8518b56a89a885ccb1ccfb6ff2cefd24389 Mon Sep 17 00:00:00 2001 From: karitra Date: Wed, 27 Dec 2017 21:13:38 +0300 Subject: [PATCH 13/21] feat: use fancy string period in config --- isolate/isolate.go | 2 +- isolate/porto/box.go | 4 +++- isolate/process/box.go | 8 ++++---- isolate/process/metrics_gatherer.go | 1 + 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/isolate/isolate.go b/isolate/isolate.go index 022f4d0..dfcbf0e 100644 --- a/isolate/isolate.go +++ b/isolate/isolate.go @@ -80,7 +80,7 @@ type ( } MetricsPollConfig struct { - PollPeriod uint `json:"period_sec"` + PollPeriod string `json:"period"` Args json.RawMessage `json:"args"` } ) diff --git a/isolate/porto/box.go b/isolate/porto/box.go index ad03d5c..6c25bad 100644 --- a/isolate/porto/box.go +++ b/isolate/porto/box.go @@ -217,9 +217,11 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta journalContent.Set(box.journal.String()) + pollDuration, _ := time.ParseDuration(config.WorkersMetrics.PollPeriod) + go box.waitLoop(ctx) go box.dumpJournalEvery(ctx, time.Minute) - go box.gatherLoopEvery(ctx, time.Duration(config.WorkersMetrics.PollPeriod) * time.Second) + go box.gatherLoopEvery(ctx, pollDuration) return box, nil } diff --git a/isolate/process/box.go b/isolate/process/box.go index 17c8de9..bb3a3f6 100644 --- a/isolate/process/box.go +++ b/isolate/process/box.go @@ -129,13 +129,13 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta box.sigchldHandler() }() - wrkMetricsConf, ok := cfg["workersmetrics"] - if ok { - // TODO: increase box working group count? + // TODO: increase box working group count? + if wrkMetricsConf, ok := cfg["workersmetrics"]; ok { if metConf, err := getMetricsPollConf(wrkMetricsConf); err != nil { log.G(ctx).Infof("Failed to read `workersmetrics` field, using defaults. Err: %v", err) } else { - go box.gatherLoopEvery(ctx, time.Duration(metConf.PollPeriod) * time.Second) + duration, _ := time.ParseDuration(metConf.PollPeriod) + go box.gatherLoopEvery(ctx, duration) } } diff --git a/isolate/process/metrics_gatherer.go b/isolate/process/metrics_gatherer.go index 4e17609..6595ac1 100644 --- a/isolate/process/metrics_gatherer.go +++ b/isolate/process/metrics_gatherer.go @@ -86,6 +86,7 @@ func readProcStat(pid int, startTime time.Time, now time.Time) (isolate.Containe CpuLoad: cpuload, Mem: memstat.VMS, + // Per process net io stat is unimplemented. 
// Net: generateNetStat(netstat), }, nil From a8d499702e6462ab8a017d0894c7fbff9aaba68d Mon Sep 17 00:00:00 2001 From: karitra Date: Fri, 12 Jan 2018 16:05:55 +0300 Subject: [PATCH 14/21] refactor: bunch of updates - (porto) cpu load calculation - remove redundant slice make - rename container metrics -> worker metrics - rename gatherLoop -> gatherMetricsEvery --- isolate/container_metrics.go | 16 +++---- isolate/docker/box.go | 3 +- isolate/errors.go | 4 +- isolate/initialdispatch.go | 35 ++++++++------- isolate/isolate.go | 8 ++-- isolate/porto/box.go | 14 +++--- isolate/porto/metrics_gatherer.go | 66 +++++++++++++++-------------- isolate/process/box.go | 12 +++--- isolate/process/metrics_gatherer.go | 24 +++++------ 9 files changed, 90 insertions(+), 92 deletions(-) diff --git a/isolate/container_metrics.go b/isolate/container_metrics.go index a6c87e5..410af53 100644 --- a/isolate/container_metrics.go +++ b/isolate/container_metrics.go @@ -1,5 +1,5 @@ //go:generate msgp --tests=false -//msgp:ignore isolate.MarkedContainerMetrics +//msgp:ignore isolate.MarkedWorkerMetrics package isolate type ( @@ -8,7 +8,7 @@ type ( TxBytes uint64 `msg:"tx_bytes"` } - ContainerMetrics struct { + WorkerMetrics struct { UptimeSec uint64 `msg:"uptime"` CpuUsageSec uint64 `msg:"cpu_usage"` @@ -20,19 +20,19 @@ type ( Net map[string]NetStat `msg:"net"` } - MetricsResponse map[string]*ContainerMetrics + MetricsResponse map[string]*WorkerMetrics - MarkedContainerMetrics struct { + MarkedWorkerMetrics struct { uuid string - m *ContainerMetrics + m *WorkerMetrics } ) -func NewContainerMetrics() (c ContainerMetrics) { +func NewWorkerMetrics() (c WorkerMetrics) { c.Net = make(map[string]NetStat) return } -func NewMarkedMetrics(uuid string, cm *ContainerMetrics) MarkedContainerMetrics { - return MarkedContainerMetrics{uuid: uuid, m: cm} +func NewMarkedMetrics(uuid string, cm *WorkerMetrics) MarkedWorkerMetrics { + return MarkedWorkerMetrics{uuid: uuid, m: cm} } diff --git a/isolate/docker/box.go b/isolate/docker/box.go index 794d4c9..9fd94b7 100644 --- a/isolate/docker/box.go +++ b/isolate/docker/box.go @@ -254,7 +254,8 @@ func (b *Box) Inspect(ctx context.Context, workeruuid string) ([]byte, error) { return []byte("{}"), nil } -func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedContainerMetrics) { +func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedWorkerMetrics) { + // Not implemented yet return } diff --git a/isolate/errors.go b/isolate/errors.go index d5c8200..ea94752 100644 --- a/isolate/errors.go +++ b/isolate/errors.go @@ -20,7 +20,7 @@ const ( codeOutputError codeKillError codeSpoolCancellationError - codeContainerMetricsFailed + codeWorkerMetricsFailed codeMarshallingError ) @@ -33,7 +33,7 @@ var ( errOutputError = [2]int{isolateErrCategory, codeOutputError} errKillError = [2]int{isolateErrCategory, codeKillError} errSpoolCancellationError = [2]int{isolateErrCategory, codeSpoolCancellationError} - errContainerMetricsFailed = [2]int{isolateErrCategory, codeContainerMetricsFailed} + errWorkerMetricsFailed = [2]int{isolateErrCategory, codeWorkerMetricsFailed} errMarshallingError = [2]int{isolateErrCategory, codeMarshallingError} errSpawnEAGAIN = [2]int{systemCategory, codeSpawnEAGAIN} ) diff --git a/isolate/initialdispatch.go b/isolate/initialdispatch.go index 0bed24c..1e39d41 100644 --- a/isolate/initialdispatch.go +++ b/isolate/initialdispatch.go @@ -28,7 +28,7 @@ const ( replySpawnError = 1 replySpawnClose = 2 - containersMetrics = 2 + workersMetrics = 2 replyMetricsOk = 0 
replyMetricsError = 1 @@ -42,7 +42,7 @@ var ( ErrInvalidArgsNum = errors.New("invalid arguments number") _onSpoolArgsNum = uint32(reflect.TypeOf(new(initialDispatch).onSpool).NumIn()) _onSpawnArgsNum = uint32(reflect.TypeOf(new(initialDispatch).onSpawn).NumIn()) - _onMetricsArgsNum = uint32(reflect.TypeOf(new(initialDispatch).onContainersMetrics).NumIn()) + _onMetricsArgsNum = uint32(reflect.TypeOf(new(initialDispatch).onWorkersMetrics).NumIn()) ) func checkSize(num uint32, r *msgp.Reader) error { @@ -58,13 +58,12 @@ func checkSize(num uint32, r *msgp.Reader) error { return nil } - -func readSliceString(r *msgp.Reader) (uuids []string, err error) { +func readStringsSlice(r *msgp.Reader) (uuids []string, err error) { var sz uint32 sz, err = r.ReadArrayHeader() if err != nil { - return nil, err + return uuids, err } for i := uint32(0); i < sz; i++ { @@ -72,7 +71,7 @@ func readSliceString(r *msgp.Reader) (uuids []string, err error) { if u, err = r.ReadString(); err == nil { uuids = append(uuids, u) } else { - return nil, err + return uuids, err } } @@ -189,20 +188,19 @@ func (d *initialDispatch) Handle(id uint64, r *msgp.Reader) (Dispatcher, error) } return d.onSpawn(rawProfile, name, executable, args, env) - case containersMetrics: + case workersMetrics: if err = checkSize(_onMetricsArgsNum, r); err != nil { log.G(d.ctx).Errorf("wrong args count for slot %d", id) return nil, err } - uuids := make([]string, 0, expectedUuidsCount) - uuids, err = readSliceString(r) - if err != nil { - log.G(d.ctx).Errorf("wrong containersMetrics request framing: %v", err) + var uuids []string + if uuids, err = readStringsSlice(r); err != nil { + log.G(d.ctx).Errorf("wrong workersMetrics request framing: %v", err) return nil, err } - return d.onContainersMetrics(uuids) + return d.onWorkersMetrics(uuids) default: return nil, fmt.Errorf("unknown transition id: %d", id) } @@ -316,9 +314,9 @@ func (d *initialDispatch) onSpawn(opts *cocaineProfile, name, executable string, return newSpawnDispatch(d.ctx, cancelSpawn, prCh, &flagKilled, d.stream), nil } -func (d *initialDispatch) onContainersMetrics(uuidsQuery []string) (Dispatcher, error) { +func (d *initialDispatch) onWorkersMetrics(uuidsQuery []string) (Dispatcher, error) { - log.G(d.ctx).Debugf("onContainersMetrics() Uuids query (len %d): %s", len(uuidsQuery), strings.Join(uuidsQuery, ", ")) + log.G(d.ctx).Debugf("onWorkersMetrics() Uuids query (len %d): %s", len(uuidsQuery), strings.Join(uuidsQuery, ", ")) startTime := time.Now() @@ -334,13 +332,13 @@ func (d *initialDispatch) onContainersMetrics(uuidsQuery []string) (Dispatcher, } if err = msgp.Encode(&buf, &metrics); err != nil { - log.G(d.ctx).WithError(err).Error("unable to encode containers metrics response") + log.G(d.ctx).WithError(err).Errorf("unable to encode containers metrics response: %v", err) d.stream.Error(d.ctx, replyMetricsError, errMarshallingError, err.Error()) } if err = d.stream.WriteMessage(d.ctx, replyMetricsOk, buf.Bytes()); err != nil { - log.G(d.ctx).WithError(err).Error("unable to send containers metrics") - d.stream.Error(d.ctx, replyMetricsError, errContainerMetricsFailed, err.Error()) + log.G(d.ctx).WithError(err).Errorf("unable to send containers metrics: %v", err) + d.stream.Error(d.ctx, replyMetricsError, errWorkerMetricsFailed, err.Error()) } log.G(d.ctx).WithField("time", time.Since(startTime)).Debugf("Containers metrics have been sent to runtime, response length %d", len(metrics)) @@ -350,12 +348,13 @@ func (d *initialDispatch) onContainersMetrics(uuidsQuery []string) 
(Dispatcher, // // TODO: // - reduce complexity + // DONE: // - log execution time // boxes := getBoxes(d.ctx) boxesSize := len(boxes) metricsResponse := make(MetricsResponse, len(uuidsQuery)) - queryResCh := make(chan []MarkedContainerMetrics) + queryResCh := make(chan []MarkedWorkerMetrics) for _, b := range boxes { go func(b Box) { diff --git a/isolate/isolate.go b/isolate/isolate.go index dfcbf0e..d6ae609 100644 --- a/isolate/isolate.go +++ b/isolate/isolate.go @@ -28,12 +28,12 @@ type ( Inspect(ctx context.Context, workerid string) ([]byte, error) Close() error - QueryMetrics(uuids []string) []MarkedContainerMetrics + QueryMetrics(uuids []string) []MarkedWorkerMetrics } ResponseStream interface { Write(ctx context.Context, num uint64, data []byte) error - // packedPayload - MassagePacked data byte stream + // packedPayload - MessagePacked data byte stream WriteMessage(ctx context.Context, num uint64, packedPayload []byte) error Error(ctx context.Context, num uint64, code [2]int, msg string) error Close(ctx context.Context, num uint64) error @@ -80,8 +80,8 @@ type ( } MetricsPollConfig struct { - PollPeriod string `json:"period"` - Args json.RawMessage `json:"args"` + PollPeriod string `json:"period"` + Args json.RawMessage `json:"args"` } ) diff --git a/isolate/porto/box.go b/isolate/porto/box.go index 6c25bad..3bc7b9d 100644 --- a/isolate/porto/box.go +++ b/isolate/porto/box.go @@ -87,7 +87,7 @@ type Box struct { // mappig uuid -> metrics muMetrics sync.Mutex - containersMetrics map[string]*isolate.ContainerMetrics + containersMetrics map[string]*isolate.WorkerMetrics } const defaultVolumeBackend = "overlay" @@ -194,7 +194,7 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta blobRepo: blobRepo, - containersMetrics: make(map[string]*isolate.ContainerMetrics), + containersMetrics: make(map[string]*isolate.WorkerMetrics), } body, err := json.Marshal(config) @@ -221,7 +221,7 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta go box.waitLoop(ctx) go box.dumpJournalEvery(ctx, time.Minute) - go box.gatherLoopEvery(ctx, pollDuration) + go box.gatherMetricsEvery(ctx, pollDuration) return box, nil } @@ -573,9 +573,7 @@ func (b *Box) Inspect(ctx context.Context, workeruuid string) ([]byte, error) { return []byte(""), nil } -func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedContainerMetrics) { - r = make([]isolate.MarkedContainerMetrics, 0, len(uuids)) - +func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedWorkerMetrics) { mm := b.getMetricsMapping() for _, uuid := range uuids { if met, ok := mm[uuid]; ok { @@ -599,14 +597,14 @@ func (b *Box) getIdUuidMapping() map[string]string { return result } -func (b *Box) setMetricsMapping(m map[string]*isolate.ContainerMetrics) { +func (b *Box) setMetricsMapping(m map[string]*isolate.WorkerMetrics) { b.muMetrics.Lock() defer b.muMetrics.Unlock() b.containersMetrics = m } -func (b *Box) getMetricsMapping() (m map[string]*isolate.ContainerMetrics) { +func (b *Box) getMetricsMapping() (m map[string]*isolate.WorkerMetrics) { b.muMetrics.Lock() defer b.muMetrics.Unlock() diff --git a/isolate/porto/metrics_gatherer.go b/isolate/porto/metrics_gatherer.go index cd6a8f7..1a79d7a 100644 --- a/isolate/porto/metrics_gatherer.go +++ b/isolate/porto/metrics_gatherer.go @@ -11,7 +11,6 @@ import ( "time" "strconv" "strings" - "syscall" "github.com/noxiouz/stout/isolate" "github.com/noxiouz/stout/pkg/log" @@ -20,7 +19,6 @@ import ( ) var ( - pageSize = uint64(syscall.Getpagesize()) 
spacesRegexp, _ = regexp.Compile("[ ]+") metricsNames = []string{ "cpu_usage", @@ -49,11 +47,15 @@ type netIfStat struct { bytesCount uint64 } -func parseStrUIntPair(eth string) (nstat netIfStat, err error) { +// +// Parses string in format `w(lan) interface: bytes count` +// +func parseNetPair(eth string) (nstat netIfStat, err error) { pair := strings.Split(eth, ": ") if len(pair) == pairLen { var v uint64 - v, err = strconv.ParseUint(pair[pairVal], 10, 64) + trimmedStr := strings.Trim(pair[pairVal], " ") + v, err = strconv.ParseUint(trimmedStr, 10, 64) if err != nil { return } @@ -65,7 +67,6 @@ func parseStrUIntPair(eth string) (nstat netIfStat, err error) { name: name, bytesCount: v, } - } else { err = fmt.Errorf("Failed to parse net record") } @@ -76,8 +77,7 @@ func parseStrUIntPair(eth string) (nstat netIfStat, err error) { // TODO: check property Error/ErrorMsg fields func parseNetValues(val porto.TPortoGetResponse) (ifs []netIfStat) { for _, eth := range strings.Split(val.Value, ";") { - nf, err := parseStrUIntPair(eth) - if err == nil { + if nf, err := parseNetPair(eth); err == nil { ifs = append(ifs, nf) } } @@ -92,8 +92,11 @@ func parseUintProp(raw rawMetrics, propName string) (v uint64, err error) { return 0, fmt.Errorf("no such prop in Porto: %s", propName) } - v, err = strconv.ParseUint(s.Value, 10, 64) - return + if len(s.Value) == 0 { + return v, fmt.Errorf("property is mpty string") + } + + return strconv.ParseUint(s.Value, 10, 64) } func setUintField(field *uint64, raw rawMetrics, propName string) (err error) { @@ -105,26 +108,29 @@ func setUintField(field *uint64, raw rawMetrics, propName string) (err error) { return } -func makeMetricsFromMap(raw rawMetrics) (m isolate.ContainerMetrics, err error) { +func makeMetricsFromMap(raw rawMetrics) (m isolate.WorkerMetrics, err error) { + m = isolate.NewWorkerMetrics() - m = isolate.NewContainerMetrics() + if err = setUintField(&m.UptimeSec, raw, "time"); err != nil { + return + } if err = setUintField(&m.CpuUsageSec, raw, "cpu_usage"); err != nil { return } + if m.UptimeSec > 0 { + m.CpuLoad = float64(m.CpuUsageSec) / float64(nanosPerSecond) / float64(m.UptimeSec) + } + // Porto's `cpu_usage` is in nanoseconds, seconds in metrics are used. 
m.CpuUsageSec /= nanosPerSecond - if err = setUintField(&m.UptimeSec, raw, "time"); err != nil { - return - } - if err = setUintField(&m.Mem, raw, "memory_usage"); err != nil { return } - m.Mem *= pageSize - + // `memory_usage` is in bytes, not in pages + // m.Mem *= pageSize for _, netIf := range parseNetValues(raw["net_tx_bytes"]) { v := m.Net[netIf.name] @@ -138,18 +144,13 @@ func makeMetricsFromMap(raw rawMetrics) (m isolate.ContainerMetrics, err error) m.Net[netIf.name] = v } - if m.UptimeSec > 0 { - cpu_usage_sec := float64(m.CpuUsageSec) - m.CpuLoad = cpu_usage_sec / float64(m.UptimeSec) - } - return } -func parseMetrics(ctx context.Context, props portoResponse, idToUuid map[string]string) map[string]*isolate.ContainerMetrics { - - metrics := make(map[string]*isolate.ContainerMetrics, len(props)) +func parseMetrics(ctx context.Context, props portoResponse, idToUuid map[string]string) map[string]*isolate.WorkerMetrics { + var parse_errors []string + metrics := make(map[string]*isolate.WorkerMetrics, len(props)) for id, rawMetrics := range props { uuid, ok := idToUuid[id] if !ok { @@ -157,22 +158,25 @@ func parseMetrics(ctx context.Context, props portoResponse, idToUuid map[string] } if m, err := makeMetricsFromMap(rawMetrics); err != nil { - log.G(ctx).WithError(err).Error("Failed to parse raw metrics") + parse_errors = append(parse_errors, err.Error()) continue } else { metrics[uuid] = &m } + + } + + if len(parse_errors) != 0 { + log.G(ctx).Errorf("Failed to parse raw metrics with error %s", strings.Join(parse_errors, ", ")); } return metrics } func makeIdsSlice(idToUuid map[string]string) (ids []string) { - ids = make([]string, 0, len(idToUuid)) for id, _ := range idToUuid { ids = append(ids, id) } - return } @@ -183,8 +187,6 @@ func closeApiWithLog(ctx context.Context, portoApi porto.API) { } func (box *Box) gatherMetrics(ctx context.Context) { - log.G(ctx).Debug("Initializing Porto metrics gather loop") - idToUuid := box.getIdUuidMapping() portoApi, err := portoConnect() @@ -207,14 +209,14 @@ func (box *Box) gatherMetrics(ctx context.Context) { box.setMetricsMapping(metrics) } -func (box *Box) gatherLoopEvery(ctx context.Context, interval time.Duration) { +func (box *Box) gatherMetricsEvery(ctx context.Context, interval time.Duration) { if interval == 0 { log.G(ctx).Info("Porto metrics gatherer disabled (use config to setup)") return } - log.G(ctx).Info("Initializing Porto metrics gather loop") + log.G(ctx).Infof("Initializing Porto metrics gather loop with %v duration", interval) for { select { diff --git a/isolate/process/box.go b/isolate/process/box.go index bb3a3f6..22da8fe 100644 --- a/isolate/process/box.go +++ b/isolate/process/box.go @@ -68,7 +68,7 @@ type Box struct { spawnSm semaphore.Semaphore muMetrics sync.Mutex - containersMetrics map[string]*isolate.ContainerMetrics + containersMetrics map[string]*isolate.WorkerMetrics } func getMetricsPollConf(cfg interface{}) (metricsConf isolate.MetricsPollConfig, err error) { @@ -135,7 +135,7 @@ func NewBox(ctx context.Context, cfg isolate.BoxConfig, gstate isolate.GlobalSta log.G(ctx).Infof("Failed to read `workersmetrics` field, using defaults. 
Err: %v", err) } else { duration, _ := time.ParseDuration(metConf.PollPeriod) - go box.gatherLoopEvery(ctx, duration) + go box.gatherMetricsEvery(ctx, duration) } } @@ -336,9 +336,7 @@ func (b *Box) Inspect(ctx context.Context, worker string) ([]byte, error) { return []byte("{}"), nil } -func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedContainerMetrics) { - r = make([]isolate.MarkedContainerMetrics, 0, len(uuids)) - +func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedWorkerMetrics) { mm := b.getMetricsMapping() for _, uuid := range uuids { if met, ok := mm[uuid]; ok { @@ -363,14 +361,14 @@ func (b *Box) getIdUuidMapping() (result map[int]taskInfo) { return } -func (b *Box) setMetricsMapping(m map[string]*isolate.ContainerMetrics) { +func (b *Box) setMetricsMapping(m map[string]*isolate.WorkerMetrics) { b.muMetrics.Lock() defer b.muMetrics.Unlock() b.containersMetrics = m } -func (b *Box) getMetricsMapping() (m map[string]*isolate.ContainerMetrics) { +func (b *Box) getMetricsMapping() (m map[string]*isolate.WorkerMetrics) { b.muMetrics.Lock() defer b.muMetrics.Unlock() diff --git a/isolate/process/metrics_gatherer.go b/isolate/process/metrics_gatherer.go index 6595ac1..7f8f744 100644 --- a/isolate/process/metrics_gatherer.go +++ b/isolate/process/metrics_gatherer.go @@ -40,8 +40,7 @@ func generateNetStat(net []gopsnet.IOCountersStat) (out map[string]isolate.NetSt return } -func readProcStat(pid int, startTime time.Time, now time.Time) (isolate.ContainerMetrics, error) { - uptime := now.Sub(startTime).Seconds() +func readProcStat(pid int, uptimeSeconds uint64) (isolate.WorkerMetrics, error) { var ( process *gopsutil.Process @@ -50,7 +49,7 @@ func readProcStat(pid int, startTime time.Time, now time.Time) (isolate.Containe // netstat []gopsnet.IOCountersStat memstat *gopsutil.MemoryInfoStat - errStub isolate.ContainerMetrics + errStub isolate.WorkerMetrics err error ) @@ -80,8 +79,8 @@ func readProcStat(pid int, startTime time.Time, now time.Time) (isolate.Containe return errStub, err } - return isolate.ContainerMetrics{ - UptimeSec: uint64(uptime), + return isolate.WorkerMetrics{ + UptimeSec: uptimeSeconds, // CpuUsageSec: CpuLoad: cpuload, @@ -94,24 +93,25 @@ func readProcStat(pid int, startTime time.Time, now time.Time) (isolate.Containe func (b *Box) gatherMetrics(ctx context.Context) { ids := b.getIdUuidMapping() - metrics := make(map[string]*isolate.ContainerMetrics, len(ids)) + metrics := make(map[string]*isolate.WorkerMetrics, len(ids)) now := time.Now() for pid, taskInfo := range ids { - state, err := readProcStat(pid, taskInfo.startTime, now) - if err != nil { + uptimeSeconds := uint64(now.Sub(taskInfo.startTime).Seconds()) + + if state, err := readProcStat(pid, uptimeSeconds); err != nil { log.G(ctx).Errorf("Failed to read stat for process with pid %d", pid) continue + } else { + metrics[taskInfo.uuid] = &state } - - metrics[taskInfo.uuid] = &state - } + } // for each taskInfo b.setMetricsMapping(metrics) } -func (b *Box) gatherLoopEvery(ctx context.Context, interval time.Duration) { +func (b *Box) gatherMetricsEvery(ctx context.Context, interval time.Duration) { if interval == 0 { log.G(ctx).Info("Process metrics gatherer disabled (use config to setup)") From 0bb4c696f8bdcd560ff786a157b682d9115a0fc6 Mon Sep 17 00:00:00 2001 From: karitra Date: Tue, 16 Jan 2018 18:14:23 +0300 Subject: [PATCH 15/21] chore: make gofmt a bit happier --- isolate/container_metrics.go | 4 +- isolate/initialdispatch.go | 4 +- isolate/isolate.go | 4 +- isolate/porto/box.go | 2 +- 
isolate/porto/metrics_gatherer.go | 316 ++++++++++++++-------------- isolate/process/box.go | 14 +- isolate/process/metrics_gatherer.go | 186 ++++++++-------- 7 files changed, 265 insertions(+), 265 deletions(-) diff --git a/isolate/container_metrics.go b/isolate/container_metrics.go index 410af53..c85b03e 100644 --- a/isolate/container_metrics.go +++ b/isolate/container_metrics.go @@ -9,7 +9,7 @@ type ( } WorkerMetrics struct { - UptimeSec uint64 `msg:"uptime"` + UptimeSec uint64 `msg:"uptime"` CpuUsageSec uint64 `msg:"cpu_usage"` CpuLoad float64 `msg:"cpu_load"` @@ -24,7 +24,7 @@ type ( MarkedWorkerMetrics struct { uuid string - m *WorkerMetrics + m *WorkerMetrics } ) diff --git a/isolate/initialdispatch.go b/isolate/initialdispatch.go index 1e39d41..67dc1e0 100644 --- a/isolate/initialdispatch.go +++ b/isolate/initialdispatch.go @@ -30,7 +30,7 @@ const ( workersMetrics = 2 - replyMetricsOk = 0 + replyMetricsOk = 0 replyMetricsError = 1 replyMetricsClose = 2 ) @@ -363,7 +363,7 @@ func (d *initialDispatch) onWorkersMetrics(uuidsQuery []string) (Dispatcher, err } for i := 0; i < boxesSize; i++ { - for _, m := range <- queryResCh { + for _, m := range <-queryResCh { metricsResponse[m.uuid] = m.m } } diff --git a/isolate/isolate.go b/isolate/isolate.go index d6ae609..2bc13b1 100644 --- a/isolate/isolate.go +++ b/isolate/isolate.go @@ -80,8 +80,8 @@ type ( } MetricsPollConfig struct { - PollPeriod string `json:"period"` - Args json.RawMessage `json:"args"` + PollPeriod string `json:"period"` + Args json.RawMessage `json:"args"` } ) diff --git a/isolate/porto/box.go b/isolate/porto/box.go index 3bc7b9d..70889a6 100644 --- a/isolate/porto/box.go +++ b/isolate/porto/box.go @@ -86,7 +86,7 @@ type Box struct { containerPropertiesAndData []string // mappig uuid -> metrics - muMetrics sync.Mutex + muMetrics sync.Mutex containersMetrics map[string]*isolate.WorkerMetrics } diff --git a/isolate/porto/metrics_gatherer.go b/isolate/porto/metrics_gatherer.go index 1a79d7a..6522cd7 100644 --- a/isolate/porto/metrics_gatherer.go +++ b/isolate/porto/metrics_gatherer.go @@ -4,228 +4,228 @@ package porto import ( - "fmt" - "golang.org/x/net/context" + "fmt" + "golang.org/x/net/context" - "regexp" - "time" - "strconv" - "strings" + "regexp" + "strconv" + "strings" + "time" - "github.com/noxiouz/stout/isolate" - "github.com/noxiouz/stout/pkg/log" + "github.com/noxiouz/stout/isolate" + "github.com/noxiouz/stout/pkg/log" - porto "github.com/yandex/porto/src/api/go" + porto "github.com/yandex/porto/src/api/go" ) var ( - spacesRegexp, _ = regexp.Compile("[ ]+") - metricsNames = []string{ - "cpu_usage", - "time", - "memory_usage", - "net_tx_bytes", - "net_rx_bytes", - } + spacesRegexp, _ = regexp.Compile("[ ]+") + metricsNames = []string{ + "cpu_usage", + "time", + "memory_usage", + "net_tx_bytes", + "net_rx_bytes", + } ) const ( - nanosPerSecond = 1000000000 + nanosPerSecond = 1000000000 ) const ( - pairName = iota - pairVal - pairLen + pairName = iota + pairVal + pairLen ) type portoResponse map[string]map[string]porto.TPortoGetResponse type rawMetrics map[string]porto.TPortoGetResponse type netIfStat struct { - name string - bytesCount uint64 + name string + bytesCount uint64 } // // Parses string in format `w(lan) interface: bytes count` // func parseNetPair(eth string) (nstat netIfStat, err error) { - pair := strings.Split(eth, ": ") - if len(pair) == pairLen { - var v uint64 - trimmedStr := strings.Trim(pair[pairVal], " ") - v, err = strconv.ParseUint(trimmedStr, 10, 64) - if err != nil { - return - } - - name 
:= strings.Trim(pair[pairName], " ") - name = spacesRegexp.ReplaceAllString(name, "_") - - nstat = netIfStat{ - name: name, - bytesCount: v, - } - } else { - err = fmt.Errorf("Failed to parse net record") - } - - return + pair := strings.Split(eth, ": ") + if len(pair) == pairLen { + var v uint64 + trimmedStr := strings.Trim(pair[pairVal], " ") + v, err = strconv.ParseUint(trimmedStr, 10, 64) + if err != nil { + return + } + + name := strings.Trim(pair[pairName], " ") + name = spacesRegexp.ReplaceAllString(name, "_") + + nstat = netIfStat{ + name: name, + bytesCount: v, + } + } else { + err = fmt.Errorf("Failed to parse net record") + } + + return } // TODO: check property Error/ErrorMsg fields func parseNetValues(val porto.TPortoGetResponse) (ifs []netIfStat) { - for _, eth := range strings.Split(val.Value, ";") { - if nf, err := parseNetPair(eth); err == nil { - ifs = append(ifs, nf) - } - } + for _, eth := range strings.Split(val.Value, ";") { + if nf, err := parseNetPair(eth); err == nil { + ifs = append(ifs, nf) + } + } - return + return } // TODO: check property Error/ErrorMsg fields func parseUintProp(raw rawMetrics, propName string) (v uint64, err error) { - s, ok := raw[propName] - if !ok { - return 0, fmt.Errorf("no such prop in Porto: %s", propName) - } + s, ok := raw[propName] + if !ok { + return 0, fmt.Errorf("no such prop in Porto: %s", propName) + } - if len(s.Value) == 0 { - return v, fmt.Errorf("property is mpty string") - } + if len(s.Value) == 0 { + return v, fmt.Errorf("property is empty string") + } - return strconv.ParseUint(s.Value, 10, 64) + return strconv.ParseUint(s.Value, 10, 64) } func setUintField(field *uint64, raw rawMetrics, propName string) (err error) { - var v uint64 - if v, err = parseUintProp(raw, propName); err == nil { - *field = v - } + var v uint64 + if v, err = parseUintProp(raw, propName); err == nil { + *field = v + } - return + return } func makeMetricsFromMap(raw rawMetrics) (m isolate.WorkerMetrics, err error) { - m = isolate.NewWorkerMetrics() - - if err = setUintField(&m.UptimeSec, raw, "time"); err != nil { - return - } - - if err = setUintField(&m.CpuUsageSec, raw, "cpu_usage"); err != nil { - return - } - - if m.UptimeSec > 0 { - m.CpuLoad = float64(m.CpuUsageSec) / float64(nanosPerSecond) / float64(m.UptimeSec) - } - - // Porto's `cpu_usage` is in nanoseconds, seconds in metrics are used. - m.CpuUsageSec /= nanosPerSecond - - if err = setUintField(&m.Mem, raw, "memory_usage"); err != nil { - return - } - // `memory_usage` is in bytes, not in pages - // m.Mem *= pageSize - - for _, netIf := range parseNetValues(raw["net_tx_bytes"]) { - v := m.Net[netIf.name] - v.TxBytes += netIf.bytesCount - m.Net[netIf.name] = v - } - - for _, netIf := range parseNetValues(raw["net_rx_bytes"]) { - v := m.Net[netIf.name] - v.RxBytes += netIf.bytesCount - m.Net[netIf.name] = v - } - - return + m = isolate.NewWorkerMetrics() + + if err = setUintField(&m.UptimeSec, raw, "time"); err != nil { + return + } + + if err = setUintField(&m.CpuUsageSec, raw, "cpu_usage"); err != nil { + return + } + + if m.UptimeSec > 0 { + m.CpuLoad = float64(m.CpuUsageSec) / float64(nanosPerSecond) / float64(m.UptimeSec) + } + + // Porto's `cpu_usage` is in nanoseconds, seconds in metrics are used. 
+ m.CpuUsageSec /= nanosPerSecond + + if err = setUintField(&m.Mem, raw, "memory_usage"); err != nil { + return + } + // `memory_usage` is in bytes, not in pages + // m.Mem *= pageSize + + for _, netIf := range parseNetValues(raw["net_tx_bytes"]) { + v := m.Net[netIf.name] + v.TxBytes += netIf.bytesCount + m.Net[netIf.name] = v + } + + for _, netIf := range parseNetValues(raw["net_rx_bytes"]) { + v := m.Net[netIf.name] + v.RxBytes += netIf.bytesCount + m.Net[netIf.name] = v + } + + return } func parseMetrics(ctx context.Context, props portoResponse, idToUuid map[string]string) map[string]*isolate.WorkerMetrics { - var parse_errors []string + var parse_errors []string - metrics := make(map[string]*isolate.WorkerMetrics, len(props)) - for id, rawMetrics := range props { - uuid, ok := idToUuid[id] - if !ok { - continue - } + metrics := make(map[string]*isolate.WorkerMetrics, len(props)) + for id, rawMetrics := range props { + uuid, ok := idToUuid[id] + if !ok { + continue + } - if m, err := makeMetricsFromMap(rawMetrics); err != nil { - parse_errors = append(parse_errors, err.Error()) - continue - } else { - metrics[uuid] = &m - } + if m, err := makeMetricsFromMap(rawMetrics); err != nil { + parse_errors = append(parse_errors, err.Error()) + continue + } else { + metrics[uuid] = &m + } - } + } - if len(parse_errors) != 0 { - log.G(ctx).Errorf("Failed to parse raw metrics with error %s", strings.Join(parse_errors, ", ")); - } + if len(parse_errors) != 0 { + log.G(ctx).Errorf("Failed to parse raw metrics with error %s", strings.Join(parse_errors, ", ")) + } - return metrics + return metrics } func makeIdsSlice(idToUuid map[string]string) (ids []string) { - for id, _ := range idToUuid { - ids = append(ids, id) - } - return + for id := range idToUuid { + ids = append(ids, id) + } + return } func closeApiWithLog(ctx context.Context, portoApi porto.API) { - if err := portoApi.Close(); err != nil { - log.G(ctx).WithError(err).Error("Failed to close connection to Porto service") - } + if err := portoApi.Close(); err != nil { + log.G(ctx).WithError(err).Error("Failed to close connection to Porto service") + } } func (box *Box) gatherMetrics(ctx context.Context) { - idToUuid := box.getIdUuidMapping() - - portoApi, err := portoConnect() - if err != nil { - log.G(ctx).WithError(err).Error("Failed to connect to Porto service for workers metrics collection") - return - } - defer closeApiWithLog(ctx, portoApi) - - ids := makeIdsSlice(idToUuid) - - var props portoResponse - props, err = portoApi.Get(ids, metricsNames) - if err != nil { - log.G(ctx).WithError(err).Error("Failed to connect to Porto service") - return - } - - metrics := parseMetrics(ctx, props, idToUuid) - box.setMetricsMapping(metrics) + idToUuid := box.getIdUuidMapping() + + portoApi, err := portoConnect() + if err != nil { + log.G(ctx).WithError(err).Error("Failed to connect to Porto service for workers metrics collection") + return + } + defer closeApiWithLog(ctx, portoApi) + + ids := makeIdsSlice(idToUuid) + + var props portoResponse + props, err = portoApi.Get(ids, metricsNames) + if err != nil { + log.G(ctx).WithError(err).Error("Failed to connect to Porto service") + return + } + + metrics := parseMetrics(ctx, props, idToUuid) + box.setMetricsMapping(metrics) } func (box *Box) gatherMetricsEvery(ctx context.Context, interval time.Duration) { - if interval == 0 { - log.G(ctx).Info("Porto metrics gatherer disabled (use config to setup)") - return - } + if interval == 0 { + log.G(ctx).Info("Porto metrics gatherer disabled (use config to 
setup)") + return + } - log.G(ctx).Infof("Initializing Porto metrics gather loop with %v duration", interval) + log.G(ctx).Infof("Initializing Porto metrics gather loop with %v duration", interval) - for { - select { - case <- ctx.Done(): - return - case <-time.After(interval): - box.gatherMetrics(ctx) - } - } + for { + select { + case <-ctx.Done(): + return + case <-time.After(interval): + box.gatherMetrics(ctx) + } + } - log.G(ctx).Info("Porto metrics gather loop canceled") + log.G(ctx).Info("Porto metrics gather loop canceled") } diff --git a/isolate/process/box.go b/isolate/process/box.go index 22da8fe..c4a69bc 100644 --- a/isolate/process/box.go +++ b/isolate/process/box.go @@ -24,7 +24,7 @@ import ( ) const ( - defaultSpoolPath = "/var/spool/cocaine" + defaultSpoolPath = "/var/spool/cocaine" expectedWorkersCount = 512 ) @@ -43,12 +43,12 @@ type codeStorage interface { type workerInfo struct { *exec.Cmd - uuid string + uuid string startTime time.Time } type taskInfo struct { - uuid string + uuid string startTime time.Time } @@ -59,7 +59,7 @@ type Box struct { spoolPath string storage codeStorage - state isolate.GlobalState + state isolate.GlobalState mu sync.Mutex children map[int]workerInfo @@ -67,7 +67,7 @@ type Box struct { spawnSm semaphore.Semaphore - muMetrics sync.Mutex + muMetrics sync.Mutex containersMetrics map[string]*isolate.WorkerMetrics } @@ -283,8 +283,8 @@ func (b *Box) Spawn(ctx context.Context, config isolate.SpawnConfig, output io.W return nil, err } b.children[pr.cmd.Process.Pid] = workerInfo{ - Cmd: pr.cmd, - uuid: config.Args["--uuid"], + Cmd: pr.cmd, + uuid: config.Args["--uuid"], startTime: newProcStarted, } b.mu.Unlock() diff --git a/isolate/process/metrics_gatherer.go b/isolate/process/metrics_gatherer.go index 7f8f744..1168fe5 100644 --- a/isolate/process/metrics_gatherer.go +++ b/isolate/process/metrics_gatherer.go @@ -4,130 +4,130 @@ package process import ( - "context" - "regexp" - "syscall" - "time" + "context" + "regexp" + "syscall" + "time" - "github.com/noxiouz/stout/isolate" - "github.com/noxiouz/stout/pkg/log" + "github.com/noxiouz/stout/isolate" + "github.com/noxiouz/stout/pkg/log" - gopsutil "github.com/shirou/gopsutil/process" - gopsnet "github.com/shirou/gopsutil/net" + gopsnet "github.com/shirou/gopsutil/net" + gopsutil "github.com/shirou/gopsutil/process" ) const eachIface = true var ( - pageSize = uint64(syscall.Getpagesize()) - spacesRegexp, _ = regexp.Compile("[ ]+") + pageSize = uint64(syscall.Getpagesize()) + spacesRegexp, _ = regexp.Compile("[ ]+") ) func makeNiceName(name string) string { - return spacesRegexp.ReplaceAllString(name, "_") + return spacesRegexp.ReplaceAllString(name, "_") } func generateNetStat(net []gopsnet.IOCountersStat) (out map[string]isolate.NetStat) { - out = make(map[string]isolate.NetStat, len(net)) + out = make(map[string]isolate.NetStat, len(net)) - for _, c := range net { - out[c.Name] = isolate.NetStat{ - RxBytes: c.BytesRecv, - TxBytes: c.BytesSent, - } - } + for _, c := range net { + out[c.Name] = isolate.NetStat{ + RxBytes: c.BytesRecv, + TxBytes: c.BytesSent, + } + } - return + return } func readProcStat(pid int, uptimeSeconds uint64) (isolate.WorkerMetrics, error) { - var ( - process *gopsutil.Process - - cpuload float64 - // netstat []gopsnet.IOCountersStat - memstat *gopsutil.MemoryInfoStat - - errStub isolate.WorkerMetrics - err error - ) - - if process, err = gopsutil.NewProcess(int32(pid)); err != nil { - return errStub, err - } - - if cpuload, err = process.CPUPercent(); err != nil { - return 
errStub, err - } - - if memstat, err = process.MemoryInfo(); err != nil { - return errStub, err - } - - // - // TODO: - // There is no per process network stat yet in gopsutil, - // Per process view of system stat is in `netstat` slice. - // - // Most commonly used (the only?) way to take per process network - // stats is by libpcap. - // - // if netstat, err = process.NetIOCounters(eachIface); err != nil { - // - if _, err = process.NetIOCounters(eachIface); err != nil { - return errStub, err - } - - return isolate.WorkerMetrics{ - UptimeSec: uptimeSeconds, - // CpuUsageSec: - - CpuLoad: cpuload, - Mem: memstat.VMS, - - // Per process net io stat is unimplemented. - // Net: generateNetStat(netstat), - }, nil + var ( + process *gopsutil.Process + + cpuload float64 + // netstat []gopsnet.IOCountersStat + memstat *gopsutil.MemoryInfoStat + + errStub isolate.WorkerMetrics + err error + ) + + if process, err = gopsutil.NewProcess(int32(pid)); err != nil { + return errStub, err + } + + if cpuload, err = process.CPUPercent(); err != nil { + return errStub, err + } + + if memstat, err = process.MemoryInfo(); err != nil { + return errStub, err + } + + // + // TODO: + // There is no per process network stat yet in gopsutil, + // Per process view of system stat is in `netstat` slice. + // + // Most commonly used (the only?) way to take per process network + // stats is by libpcap. + // + // if netstat, err = process.NetIOCounters(eachIface); err != nil { + // + if _, err = process.NetIOCounters(eachIface); err != nil { + return errStub, err + } + + return isolate.WorkerMetrics{ + UptimeSec: uptimeSeconds, + // CpuUsageSec: + + CpuLoad: cpuload, + Mem: memstat.VMS, + + // Per process net io stat is unimplemented. + // Net: generateNetStat(netstat), + }, nil } func (b *Box) gatherMetrics(ctx context.Context) { - ids := b.getIdUuidMapping() - metrics := make(map[string]*isolate.WorkerMetrics, len(ids)) + ids := b.getIdUuidMapping() + metrics := make(map[string]*isolate.WorkerMetrics, len(ids)) - now := time.Now() + now := time.Now() - for pid, taskInfo := range ids { - uptimeSeconds := uint64(now.Sub(taskInfo.startTime).Seconds()) + for pid, taskInfo := range ids { + uptimeSeconds := uint64(now.Sub(taskInfo.startTime).Seconds()) - if state, err := readProcStat(pid, uptimeSeconds); err != nil { - log.G(ctx).Errorf("Failed to read stat for process with pid %d", pid) - continue - } else { - metrics[taskInfo.uuid] = &state - } - } // for each taskInfo + if state, err := readProcStat(pid, uptimeSeconds); err != nil { + log.G(ctx).Errorf("Failed to read stat for process with pid %d", pid) + continue + } else { + metrics[taskInfo.uuid] = &state + } + } // for each taskInfo - b.setMetricsMapping(metrics) + b.setMetricsMapping(metrics) } func (b *Box) gatherMetricsEvery(ctx context.Context, interval time.Duration) { - if interval == 0 { - log.G(ctx).Info("Process metrics gatherer disabled (use config to setup)") - return - } + if interval == 0 { + log.G(ctx).Info("Process metrics gatherer disabled (use config to setup)") + return + } - log.G(ctx).Infof("Initializing Process metrics gather loop with %v duration", interval) + log.G(ctx).Infof("Initializing Process metrics gather loop with %v duration", interval) - for { - select { - case <- ctx.Done(): - return - case <-time.After(interval): - b.gatherMetrics(ctx) - } - } + for { + select { + case <-ctx.Done(): + return + case <-time.After(interval): + b.gatherMetrics(ctx) + } + } - log.G(ctx).Info("Cancelling Process metrics loop") + log.G(ctx).Info("Cancelling 
Process metrics loop") } From 7632da4dd22c03adfa8d5e1991a1a16721258b53 Mon Sep 17 00:00:00 2001 From: karitra Date: Thu, 18 Jan 2018 16:01:21 +0300 Subject: [PATCH 16/21] refact: post review changes --- isolate/initialdispatch.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/isolate/initialdispatch.go b/isolate/initialdispatch.go index 67dc1e0..afb9bd0 100644 --- a/isolate/initialdispatch.go +++ b/isolate/initialdispatch.go @@ -63,7 +63,7 @@ func readStringsSlice(r *msgp.Reader) (uuids []string, err error) { sz, err = r.ReadArrayHeader() if err != nil { - return uuids, err + return } for i := uint32(0); i < sz; i++ { @@ -71,7 +71,7 @@ func readStringsSlice(r *msgp.Reader) (uuids []string, err error) { if u, err = r.ReadString(); err == nil { uuids = append(uuids, u) } else { - return uuids, err + return } } @@ -326,11 +326,6 @@ func (d *initialDispatch) onWorkersMetrics(uuidsQuery []string) (Dispatcher, err err error ) - if d == nil { - log.G(d.ctx).Error("strange: dispatch is `nil`") - return - } - if err = msgp.Encode(&buf, &metrics); err != nil { log.G(d.ctx).WithError(err).Errorf("unable to encode containers metrics response: %v", err) d.stream.Error(d.ctx, replyMetricsError, errMarshallingError, err.Error()) From a1f520987ca4a7d7ba9b955411df3e870074984f Mon Sep 17 00:00:00 2001 From: karitra Date: Thu, 25 Jan 2018 14:31:27 +0300 Subject: [PATCH 17/21] refact: get rid of `go generate msgp` --- Makefile | 9 +- isolate/container_metrics_gen.go | 665 +++++++++++++++++++++++++++++++ 2 files changed, 668 insertions(+), 6 deletions(-) create mode 100644 isolate/container_metrics_gen.go diff --git a/Makefile b/Makefile index c6883bc..e55f485 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ LDFLAGS=-ldflags "-X github.com/noxiouz/stout/version.GitTag=${TAG} -X github.co .DEFAULT: all -.PHONY: fmt vet test gen_msgp +.PHONY: fmt vet test PKGS := $(shell go list ./... | grep -v ^github.com/noxiouz/stout/vendor/ | grep -v ^github.com/noxiouz/stout/version) @@ -32,14 +32,11 @@ test: cat profile.out >> coverage.txt; rm profile.out; \ fi done; \ -build: gen_msgp +build: @echo "+ $@" go build ${LDFLAGS} -o ${NAME} github.com/noxiouz/stout/cmd/stout -build_travis_release: gen_msgp +build_travis_release: @echo "+ $@" env GOOS="linux" go build ${LDFLAGS} -o ${NAME} github.com/noxiouz/stout/cmd/stout env GOOS="darwin" go build ${LDFLAGS} -o ${NAME}_osx github.com/noxiouz/stout/cmd/stout - -gen_msgp: - @cd ./isolate; go generate; cd .. 
diff --git a/isolate/container_metrics_gen.go b/isolate/container_metrics_gen.go new file mode 100644 index 0000000..4c6af20 --- /dev/null +++ b/isolate/container_metrics_gen.go @@ -0,0 +1,665 @@ +package isolate + +// NOTE: THIS FILE WAS PRODUCED BY THE +// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp) +// DO NOT EDIT + +import ( + "github.com/tinylib/msgp/msgp" +) + +// DecodeMsg implements msgp.Decodable +func (z *MarkedWorkerMetrics) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + return + } + switch msgp.UnsafeString(field) { + default: + err = dc.Skip() + if err != nil { + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z MarkedWorkerMetrics) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 0 + err = en.Append(0x80) + if err != nil { + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z MarkedWorkerMetrics) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 0 + o = append(o, 0x80) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *MarkedWorkerMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + return + } + switch msgp.UnsafeString(field) { + default: + bts, err = msgp.Skip(bts) + if err != nil { + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z MarkedWorkerMetrics) Msgsize() (s int) { + s = 1 + return +} + +// DecodeMsg implements msgp.Decodable +func (z *MetricsResponse) DecodeMsg(dc *msgp.Reader) (err error) { + var zb0003 uint32 + zb0003, err = dc.ReadMapHeader() + if err != nil { + return + } + if (*z) == nil && zb0003 > 0 { + (*z) = make(MetricsResponse, zb0003) + } else if len((*z)) > 0 { + for key, _ := range *z { + delete((*z), key) + } + } + for zb0003 > 0 { + zb0003-- + var zb0001 string + var zb0002 *WorkerMetrics + zb0001, err = dc.ReadString() + if err != nil { + return + } + if dc.IsNil() { + err = dc.ReadNil() + if err != nil { + return + } + zb0002 = nil + } else { + if zb0002 == nil { + zb0002 = new(WorkerMetrics) + } + err = zb0002.DecodeMsg(dc) + if err != nil { + return + } + } + (*z)[zb0001] = zb0002 + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z MetricsResponse) EncodeMsg(en *msgp.Writer) (err error) { + err = en.WriteMapHeader(uint32(len(z))) + if err != nil { + return + } + for zb0004, zb0005 := range z { + err = en.WriteString(zb0004) + if err != nil { + return + } + if zb0005 == nil { + err = en.WriteNil() + if err != nil { + return + } + } else { + err = zb0005.EncodeMsg(en) + if err != nil { + return + } + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z MetricsResponse) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendMapHeader(o, uint32(len(z))) + for zb0004, zb0005 := range z { + o = msgp.AppendString(o, zb0004) + if zb0005 == nil { + o = msgp.AppendNil(o) + } else { + o, err = zb0005.MarshalMsg(o) + if err != nil { + return + } + } + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z 
*MetricsResponse) UnmarshalMsg(bts []byte) (o []byte, err error) { + var zb0003 uint32 + zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + if (*z) == nil && zb0003 > 0 { + (*z) = make(MetricsResponse, zb0003) + } else if len((*z)) > 0 { + for key, _ := range *z { + delete((*z), key) + } + } + for zb0003 > 0 { + var zb0001 string + var zb0002 *WorkerMetrics + zb0003-- + zb0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + return + } + if msgp.IsNil(bts) { + bts, err = msgp.ReadNilBytes(bts) + if err != nil { + return + } + zb0002 = nil + } else { + if zb0002 == nil { + zb0002 = new(WorkerMetrics) + } + bts, err = zb0002.UnmarshalMsg(bts) + if err != nil { + return + } + } + (*z)[zb0001] = zb0002 + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z MetricsResponse) Msgsize() (s int) { + s = msgp.MapHeaderSize + if z != nil { + for zb0004, zb0005 := range z { + _ = zb0005 + s += msgp.StringPrefixSize + len(zb0004) + if zb0005 == nil { + s += msgp.NilSize + } else { + s += zb0005.Msgsize() + } + } + } + return +} + +// DecodeMsg implements msgp.Decodable +func (z *NetStat) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "rx_bytes": + z.RxBytes, err = dc.ReadUint64() + if err != nil { + return + } + case "tx_bytes": + z.TxBytes, err = dc.ReadUint64() + if err != nil { + return + } + default: + err = dc.Skip() + if err != nil { + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z NetStat) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 2 + // write "rx_bytes" + err = en.Append(0x82, 0xa8, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73) + if err != nil { + return + } + err = en.WriteUint64(z.RxBytes) + if err != nil { + return + } + // write "tx_bytes" + err = en.Append(0xa8, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73) + if err != nil { + return + } + err = en.WriteUint64(z.TxBytes) + if err != nil { + return + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z NetStat) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 2 + // string "rx_bytes" + o = append(o, 0x82, 0xa8, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73) + o = msgp.AppendUint64(o, z.RxBytes) + // string "tx_bytes" + o = append(o, 0xa8, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73) + o = msgp.AppendUint64(o, z.TxBytes) + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *NetStat) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "rx_bytes": + z.RxBytes, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + return + } + case "tx_bytes": + z.TxBytes, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z NetStat) Msgsize() (s int) { + s 
= 1 + 9 + msgp.Uint64Size + 9 + msgp.Uint64Size + return +} + +// DecodeMsg implements msgp.Decodable +func (z *WorkerMetrics) DecodeMsg(dc *msgp.Reader) (err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, err = dc.ReadMapHeader() + if err != nil { + return + } + for zb0001 > 0 { + zb0001-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "uptime": + z.UptimeSec, err = dc.ReadUint64() + if err != nil { + return + } + case "cpu_usage": + z.CpuUsageSec, err = dc.ReadUint64() + if err != nil { + return + } + case "cpu_load": + z.CpuLoad, err = dc.ReadFloat32() + if err != nil { + return + } + case "mem": + z.Mem, err = dc.ReadUint64() + if err != nil { + return + } + case "net": + var zb0002 uint32 + zb0002, err = dc.ReadMapHeader() + if err != nil { + return + } + if z.Net == nil && zb0002 > 0 { + z.Net = make(map[string]NetStat, zb0002) + } else if len(z.Net) > 0 { + for key, _ := range z.Net { + delete(z.Net, key) + } + } + for zb0002 > 0 { + zb0002-- + var za0001 string + var za0002 NetStat + za0001, err = dc.ReadString() + if err != nil { + return + } + var zb0003 uint32 + zb0003, err = dc.ReadMapHeader() + if err != nil { + return + } + for zb0003 > 0 { + zb0003-- + field, err = dc.ReadMapKeyPtr() + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "rx_bytes": + za0002.RxBytes, err = dc.ReadUint64() + if err != nil { + return + } + case "tx_bytes": + za0002.TxBytes, err = dc.ReadUint64() + if err != nil { + return + } + default: + err = dc.Skip() + if err != nil { + return + } + } + } + z.Net[za0001] = za0002 + } + default: + err = dc.Skip() + if err != nil { + return + } + } + } + return +} + +// EncodeMsg implements msgp.Encodable +func (z *WorkerMetrics) EncodeMsg(en *msgp.Writer) (err error) { + // map header, size 5 + // write "uptime" + err = en.Append(0x85, 0xa6, 0x75, 0x70, 0x74, 0x69, 0x6d, 0x65) + if err != nil { + return + } + err = en.WriteUint64(z.UptimeSec) + if err != nil { + return + } + // write "cpu_usage" + err = en.Append(0xa9, 0x63, 0x70, 0x75, 0x5f, 0x75, 0x73, 0x61, 0x67, 0x65) + if err != nil { + return + } + err = en.WriteUint64(z.CpuUsageSec) + if err != nil { + return + } + // write "cpu_load" + err = en.Append(0xa8, 0x63, 0x70, 0x75, 0x5f, 0x6c, 0x6f, 0x61, 0x64) + if err != nil { + return + } + err = en.WriteFloat32(z.CpuLoad) + if err != nil { + return + } + // write "mem" + err = en.Append(0xa3, 0x6d, 0x65, 0x6d) + if err != nil { + return + } + err = en.WriteUint64(z.Mem) + if err != nil { + return + } + // write "net" + err = en.Append(0xa3, 0x6e, 0x65, 0x74) + if err != nil { + return + } + err = en.WriteMapHeader(uint32(len(z.Net))) + if err != nil { + return + } + for za0001, za0002 := range z.Net { + err = en.WriteString(za0001) + if err != nil { + return + } + // map header, size 2 + // write "rx_bytes" + err = en.Append(0x82, 0xa8, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73) + if err != nil { + return + } + err = en.WriteUint64(za0002.RxBytes) + if err != nil { + return + } + // write "tx_bytes" + err = en.Append(0xa8, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73) + if err != nil { + return + } + err = en.WriteUint64(za0002.TxBytes) + if err != nil { + return + } + } + return +} + +// MarshalMsg implements msgp.Marshaler +func (z *WorkerMetrics) MarshalMsg(b []byte) (o []byte, err error) { + o = msgp.Require(b, z.Msgsize()) + // map header, size 5 + // string "uptime" + o = append(o, 0x85, 0xa6, 0x75, 0x70, 0x74, 0x69, 0x6d, 0x65) + o = 
msgp.AppendUint64(o, z.UptimeSec) + // string "cpu_usage" + o = append(o, 0xa9, 0x63, 0x70, 0x75, 0x5f, 0x75, 0x73, 0x61, 0x67, 0x65) + o = msgp.AppendUint64(o, z.CpuUsageSec) + // string "cpu_load" + o = append(o, 0xa8, 0x63, 0x70, 0x75, 0x5f, 0x6c, 0x6f, 0x61, 0x64) + o = msgp.AppendFloat32(o, z.CpuLoad) + // string "mem" + o = append(o, 0xa3, 0x6d, 0x65, 0x6d) + o = msgp.AppendUint64(o, z.Mem) + // string "net" + o = append(o, 0xa3, 0x6e, 0x65, 0x74) + o = msgp.AppendMapHeader(o, uint32(len(z.Net))) + for za0001, za0002 := range z.Net { + o = msgp.AppendString(o, za0001) + // map header, size 2 + // string "rx_bytes" + o = append(o, 0x82, 0xa8, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73) + o = msgp.AppendUint64(o, za0002.RxBytes) + // string "tx_bytes" + o = append(o, 0xa8, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73) + o = msgp.AppendUint64(o, za0002.TxBytes) + } + return +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *WorkerMetrics) UnmarshalMsg(bts []byte) (o []byte, err error) { + var field []byte + _ = field + var zb0001 uint32 + zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + for zb0001 > 0 { + zb0001-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "uptime": + z.UptimeSec, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + return + } + case "cpu_usage": + z.CpuUsageSec, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + return + } + case "cpu_load": + z.CpuLoad, bts, err = msgp.ReadFloat32Bytes(bts) + if err != nil { + return + } + case "mem": + z.Mem, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + return + } + case "net": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + if z.Net == nil && zb0002 > 0 { + z.Net = make(map[string]NetStat, zb0002) + } else if len(z.Net) > 0 { + for key, _ := range z.Net { + delete(z.Net, key) + } + } + for zb0002 > 0 { + var za0001 string + var za0002 NetStat + zb0002-- + za0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + return + } + var zb0003 uint32 + zb0003, bts, err = msgp.ReadMapHeaderBytes(bts) + if err != nil { + return + } + for zb0003 > 0 { + zb0003-- + field, bts, err = msgp.ReadMapKeyZC(bts) + if err != nil { + return + } + switch msgp.UnsafeString(field) { + case "rx_bytes": + za0002.RxBytes, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + return + } + case "tx_bytes": + za0002.TxBytes, bts, err = msgp.ReadUint64Bytes(bts) + if err != nil { + return + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + return + } + } + } + z.Net[za0001] = za0002 + } + default: + bts, err = msgp.Skip(bts) + if err != nil { + return + } + } + } + o = bts + return +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z *WorkerMetrics) Msgsize() (s int) { + s = 1 + 7 + msgp.Uint64Size + 10 + msgp.Uint64Size + 9 + msgp.Float32Size + 4 + msgp.Uint64Size + 4 + msgp.MapHeaderSize + if z.Net != nil { + for za0001, za0002 := range z.Net { + _ = za0002 + s += msgp.StringPrefixSize + len(za0001) + 1 + 9 + msgp.Uint64Size + 9 + msgp.Uint64Size + } + } + return +} From cf5aa806ce77f44e6cfd9bd6e1dd6161939b402c Mon Sep 17 00:00:00 2001 From: karitra Date: Thu, 25 Jan 2018 14:33:23 +0300 Subject: [PATCH 18/21] refact(porto): display less errors --- isolate/container_metrics.go | 2 +- isolate/porto/metrics_gatherer.go | 49 ++++++++++++++++--------------- 2 files changed, 27 
insertions(+), 24 deletions(-) diff --git a/isolate/container_metrics.go b/isolate/container_metrics.go index c85b03e..636e399 100644 --- a/isolate/container_metrics.go +++ b/isolate/container_metrics.go @@ -12,7 +12,7 @@ type ( UptimeSec uint64 `msg:"uptime"` CpuUsageSec uint64 `msg:"cpu_usage"` - CpuLoad float64 `msg:"cpu_load"` + CpuLoad float32 `msg:"cpu_load"` Mem uint64 `msg:"mem"` diff --git a/isolate/porto/metrics_gatherer.go b/isolate/porto/metrics_gatherer.go index 6522cd7..e87f161 100644 --- a/isolate/porto/metrics_gatherer.go +++ b/isolate/porto/metrics_gatherer.go @@ -31,6 +31,7 @@ var ( const ( nanosPerSecond = 1000000000 + errorsToDisplay = 4 ) const ( @@ -52,23 +53,24 @@ type netIfStat struct { // func parseNetPair(eth string) (nstat netIfStat, err error) { pair := strings.Split(eth, ": ") - if len(pair) == pairLen { - var v uint64 - trimmedStr := strings.Trim(pair[pairVal], " ") - v, err = strconv.ParseUint(trimmedStr, 10, 64) - if err != nil { - return - } + if len(pair) != pairLen { + err = fmt.Errorf("wrong fields count") + return + } - name := strings.Trim(pair[pairName], " ") - name = spacesRegexp.ReplaceAllString(name, "_") + var v uint64 + trimmedStr := strings.Trim(pair[pairVal], " ") + v, err = strconv.ParseUint(trimmedStr, 10, 64) + if err != nil { + return + } - nstat = netIfStat{ - name: name, - bytesCount: v, - } - } else { - err = fmt.Errorf("Failed to parse net record") + name := strings.Trim(pair[pairName], " ") + name = spacesRegexp.ReplaceAllString(name, "_") + + nstat = netIfStat{ + name: name, + bytesCount: v, } return @@ -93,7 +95,7 @@ func parseUintProp(raw rawMetrics, propName string) (v uint64, err error) { } if len(s.Value) == 0 { - return v, fmt.Errorf("property is empty string") + return v, fmt.Errorf("property [%s] is empty string", propName) } return strconv.ParseUint(s.Value, 10, 64) @@ -120,7 +122,7 @@ func makeMetricsFromMap(raw rawMetrics) (m isolate.WorkerMetrics, err error) { } if m.UptimeSec > 0 { - m.CpuLoad = float64(m.CpuUsageSec) / float64(nanosPerSecond) / float64(m.UptimeSec) + m.CpuLoad = float32(m.CpuUsageSec) / float32(nanosPerSecond) / float32(m.UptimeSec) } // Porto's `cpu_usage` is in nanoseconds, seconds in metrics are used. 
@@ -157,17 +159,18 @@ func parseMetrics(ctx context.Context, props portoResponse, idToUuid map[string] continue } - if m, err := makeMetricsFromMap(rawMetrics); err != nil { - parse_errors = append(parse_errors, err.Error()) - continue - } else { + if m, err := makeMetricsFromMap(rawMetrics); err == nil { metrics[uuid] = &m + } else { + parse_errors = append(parse_errors, err.Error()) } - } if len(parse_errors) != 0 { - log.G(ctx).Errorf("Failed to parse raw metrics with error %s", strings.Join(parse_errors, ", ")) + if len(parse_errors) > errorsToDisplay { + parse_errors = parse_errors[:errorsToDisplay] + } + log.G(ctx).Errorf("Failed to parse raw metrics with errors: %s", strings.Join(parse_errors, ", ")) } return metrics From a2e0be3d440ea31052740b4821e7611df024200a Mon Sep 17 00:00:00 2001 From: karitra Date: Thu, 25 Jan 2018 15:15:47 +0300 Subject: [PATCH 19/21] refact(process): drop gopsutil (it is fat) --- isolate/process/box.go | 13 +- isolate/process/metrics_gatherer.go | 229 +++++++++++++++++++++------- 2 files changed, 178 insertions(+), 64 deletions(-) diff --git a/isolate/process/box.go b/isolate/process/box.go index c4a69bc..503d46c 100644 --- a/isolate/process/box.go +++ b/isolate/process/box.go @@ -44,12 +44,6 @@ type codeStorage interface { type workerInfo struct { *exec.Cmd uuid string - startTime time.Time -} - -type taskInfo struct { - uuid string - startTime time.Time } type Box struct { @@ -285,7 +279,6 @@ func (b *Box) Spawn(ctx context.Context, config isolate.SpawnConfig, output io.W b.children[pr.cmd.Process.Pid] = workerInfo{ Cmd: pr.cmd, uuid: config.Args["--uuid"], - startTime: newProcStarted, } b.mu.Unlock() @@ -347,15 +340,15 @@ func (b *Box) QueryMetrics(uuids []string) (r []isolate.MarkedWorkerMetrics) { return } -func (b *Box) getIdUuidMapping() (result map[int]taskInfo) { +func (b *Box) getIdUuidMapping() (result map[int]string) { // TODO: is len(b.children) safe to use as `expectedWorkersCount` - result = make(map[int]taskInfo, expectedWorkersCount) + result = make(map[int]string, expectedWorkersCount) b.mu.Lock() defer b.mu.Unlock() for pid, kid := range b.children { - result[pid] = taskInfo{uuid: kid.uuid, startTime: kid.startTime} + result[pid] = kid.uuid } return diff --git a/isolate/process/metrics_gatherer.go b/isolate/process/metrics_gatherer.go index 1168fe5..42e961e 100644 --- a/isolate/process/metrics_gatherer.go +++ b/isolate/process/metrics_gatherer.go @@ -4,107 +4,223 @@ package process import ( + "bytes" + "bufio" "context" + "fmt" + "io/ioutil" "regexp" + "strconv" + "strings" "syscall" "time" "github.com/noxiouz/stout/isolate" "github.com/noxiouz/stout/pkg/log" - - gopsnet "github.com/shirou/gopsutil/net" - gopsutil "github.com/shirou/gopsutil/process" ) -const eachIface = true +const clockTicks = 100 // sysconf(_SC_CLK_TCK) var ( pageSize = uint64(syscall.Getpagesize()) spacesRegexp, _ = regexp.Compile("[ ]+") ) -func makeNiceName(name string) string { - return spacesRegexp.ReplaceAllString(name, "_") +type ( + memStat struct { + vms uint64 + rss uint64 + } +) + +// /proc//statm fields (see `man proc` for details) +const ( + statmVMS = iota + statmRSS + + statmShare + statmText + statmLib + statmData + statmDt + + statmFieldsCount +) + +const ( + statUtime = 13 + statStime = 14 + + statStartTime = 21 + + statFieldsCount = 44 +) + +const ( + pairKey = iota + pairVal + pairLen +) + +func readLines(b []byte) (text []string) { + reader := bytes.NewBuffer(b) + scanner := bufio.NewScanner(reader) + + for scanner.Scan() { + text = append(text, 
scanner.Text())
+ }
+
+ return
}

+func loadSysBootTime() (bt uint64, err error) {
+ var b []byte
+ if b, err = ioutil.ReadFile("/proc/stat"); err != nil {
+ return
+ }
+
+ for _, ln := range readLines(b) {
+ if strings.HasPrefix(ln, "btime") {
+ fields := strings.Fields(ln)
+ if len(fields) < pairLen {
+ return bt, fmt.Errorf("incorrect count of fields in `btime` record: %d", len(fields))
+ }

- for _, c := range net {
- out[c.Name] = isolate.NetStat{
- RxBytes: c.BytesRecv,
- TxBytes: c.BytesSent,
+ return strconv.ParseUint(fields[pairVal], 10, 64)
 }
 }

 return
}

-func readProcStat(pid int, uptimeSeconds uint64) (isolate.WorkerMetrics, error) {
+func getProcPath(pid int, file string) string {
+ return fmt.Sprintf("/proc/%d/%s", pid, file)
+}

- var (
- process *gopsutil.Process
+func getProcContent(pid int, file string) (content string, err error) {
+ var b []byte

- cpuload float64
- // netstat []gopsnet.IOCountersStat
- memstat *gopsutil.MemoryInfoStat
+ if b, err = ioutil.ReadFile(getProcPath(pid, file)); err != nil {
+ return
+ }

- errStub isolate.WorkerMetrics
- err error
- )
+ content = string(b)
+ return
+}

- if process, err = gopsutil.NewProcess(int32(pid)); err != nil {
- return errStub, err
+func readMemStat(pid int) (mstat memStat, err error) {
+ var content string
+ if content, err = getProcContent(pid, "statm"); err != nil {
+ return
 }

- if cpuload, err = process.CPUPercent(); err != nil {
- return errStub, err
+ fields := strings.Fields(content)
+ if len(fields) < statmFieldsCount {
+ err = fmt.Errorf("wrong number of fields in `statm` file: %d, but should be greater than or equal to %d", len(fields), statmFieldsCount)
+ return
 }

- if memstat, err = process.MemoryInfo(); err != nil {
- return errStub, err
+ var vms, rss uint64
+ vms, err = strconv.ParseUint(fields[statmVMS], 10, 64)
+ if err != nil {
+ return
+ }
+
+ rss, err = strconv.ParseUint(fields[statmRSS], 10, 64)
+ if err != nil {
+ return
+ }
+
+ mstat = memStat{
+ vms: vms * pageSize,
+ rss: rss * pageSize,
 }

- //
- // TODO:
- // There is no per process network stat yet in gopsutil,
- // Per process view of system stat is in `netstat` slice.
- //
- // Most commonly used (the only?) way to take per process network
- // stats is by libpcap. 
- //
- // if netstat, err = process.NetIOCounters(eachIface); err != nil {
- //
- if _, err = process.NetIOCounters(eachIface); err != nil {
- return errStub, err
+ return
+}
+
+func readCPUPercent(pid int, bootTime uint64) (cpu float32, uptime uint64, err error) {
+ var content string
+ if content, err = getProcContent(pid, "stat"); err != nil {
+ return
 }

- return isolate.WorkerMetrics{
+ fields := strings.Fields(content)
+ if len(fields) < statFieldsCount {
+ err = fmt.Errorf("wrong number of fields in `stat` file: %d, but should be greater than or equal to %d", len(fields), statFieldsCount)
+ return
+ }
+
+ var utime, stime, startedAt uint64
+ if utime, err = strconv.ParseUint(fields[statUtime], 10, 64); err != nil {
+ return
+ }
+
+ if stime, err = strconv.ParseUint(fields[statStime], 10, 64); err != nil {
+ return
+ }
+
+ if startedAt, err = strconv.ParseUint(fields[statStartTime], 10, 64); err != nil {
+ return
+ }
+
+ utimeSec := float64(utime) / clockTicks
+ stimeSec := float64(stime) / clockTicks
+
+ startedAt = bootTime + startedAt / clockTicks
+ created := time.Unix(0, int64(startedAt * uint64(time.Second)))
+
+ total := float64(utimeSec + stimeSec)
+ if runtime := time.Since(created).Seconds(); runtime > 0 {
+ uptime = uint64(runtime)
+ cpu = float32(100 * total / runtime)
+ }
+
+ return
+}
+
+func makeNiceName(name string) string {
+ return spacesRegexp.ReplaceAllString(name, "_")
+}
+
+func readProcStat(pid int, bootTime uint64) (stat isolate.WorkerMetrics, err error) {
+ var (
+ cpuload float32
+ uptimeSeconds uint64
+ memstat memStat
+ )
+
+ if cpuload, uptimeSeconds, err = readCPUPercent(pid, bootTime); err != nil {
+ return
+ }
+
+ if memstat, err = readMemStat(pid); err != nil {
+ return
+ }
+
+ stat = isolate.WorkerMetrics{
 UptimeSec: uptimeSeconds,
 // CpuUsageSec:

 CpuLoad: cpuload,
- Mem: memstat.VMS,
+ Mem: memstat.vms,

 // Per process net io stat is unimplemented. 
// Net: generateNetStat(netstat), - }, nil + } + + return } -func (b *Box) gatherMetrics(ctx context.Context) { +func (b *Box) gatherMetrics(ctx context.Context, bootTime uint64) { ids := b.getIdUuidMapping() metrics := make(map[string]*isolate.WorkerMetrics, len(ids)) - now := time.Now() - - for pid, taskInfo := range ids { - uptimeSeconds := uint64(now.Sub(taskInfo.startTime).Seconds()) - - if state, err := readProcStat(pid, uptimeSeconds); err != nil { - log.G(ctx).Errorf("Failed to read stat for process with pid %d", pid) - continue + for pid, uuid := range ids { + if stat, err := readProcStat(pid, bootTime); err == nil { + metrics[uuid] = &stat } else { - metrics[taskInfo.uuid] = &state + log.G(ctx).Errorf("Failed to read stat, pid: %d, err: %v", pid, err) } } // for each taskInfo @@ -112,7 +228,6 @@ func (b *Box) gatherMetrics(ctx context.Context) { } func (b *Box) gatherMetricsEvery(ctx context.Context, interval time.Duration) { - if interval == 0 { log.G(ctx).Info("Process metrics gatherer disabled (use config to setup)") return @@ -120,12 +235,18 @@ func (b *Box) gatherMetricsEvery(ctx context.Context, interval time.Duration) { log.G(ctx).Infof("Initializing Process metrics gather loop with %v duration", interval) + bootTime, err := loadSysBootTime() + if err != nil { + log.G(ctx).Errorf("Error while reading system boot time %v", err) + return + } + for { select { case <-ctx.Done(): return case <-time.After(interval): - b.gatherMetrics(ctx) + b.gatherMetrics(ctx, bootTime) } } From e9a04419013314c1ede7f90f04d090b3d6fde4a3 Mon Sep 17 00:00:00 2001 From: karitra Date: Thu, 25 Jan 2018 15:19:50 +0300 Subject: [PATCH 20/21] test: QueryMetrics, WriteMessage stubs --- isolate/d_test.go | 8 ++++++++ isolate/porto/metrics_gatherer.go | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/isolate/d_test.go b/isolate/d_test.go index 66a15e5..4f7ce0f 100644 --- a/isolate/d_test.go +++ b/isolate/d_test.go @@ -36,6 +36,10 @@ func (t *testDownstream) Write(ctx context.Context, code uint64, data []byte) er return nil } +func (t *testDownstream) WriteMessage(ctx context.Context, num uint64, packedPayload []byte) error { + return nil +} + func (t *testDownstream) Error(ctx context.Context, code uint64, errorcode [2]int, msg string) error { t.ch <- testDownstreamItem{code, []interface{}{errorcode, msg}} return nil @@ -72,6 +76,10 @@ func (b *testBox) Close() error { return nil } +func (b *testBox) QueryMetrics(uuids []string) (response []MarkedWorkerMetrics) { + return +} + type testProcess struct { ctx context.Context killed chan struct{} diff --git a/isolate/porto/metrics_gatherer.go b/isolate/porto/metrics_gatherer.go index e87f161..74df85c 100644 --- a/isolate/porto/metrics_gatherer.go +++ b/isolate/porto/metrics_gatherer.go @@ -12,6 +12,8 @@ import ( "strings" "time" + apexlog "github.com/apex/log" + "github.com/noxiouz/stout/isolate" "github.com/noxiouz/stout/pkg/log" @@ -190,6 +192,7 @@ func closeApiWithLog(ctx context.Context, portoApi porto.API) { } func (box *Box) gatherMetrics(ctx context.Context) { + startTime := time.Now() idToUuid := box.getIdUuidMapping() portoApi, err := portoConnect() @@ -210,6 +213,8 @@ func (box *Box) gatherMetrics(ctx context.Context) { metrics := parseMetrics(ctx, props, idToUuid) box.setMetricsMapping(metrics) + + log.G(ctx).WithFields(apexlog.Fields{"time": time.Since(startTime), "boxes": len(ids)}).Debug("Finished metrics gather iteration") } func (box *Box) gatherMetricsEvery(ctx context.Context, interval time.Duration) { From 
e27c3b4d4d064897294a0ec372dc1ea07a978052 Mon Sep 17 00:00:00 2001
From: karitra
Date: Fri, 26 Jan 2018 14:22:32 +0300
Subject: [PATCH 21/21] refact: post `pernicious` ghost review updates

---
 isolate/porto/box.go | 2 +-
 isolate/porto/metrics_gatherer.go | 24 ++++++++++++++++--------
 isolate/process/metrics_gatherer.go | 16 +++++++++++-----
 3 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/isolate/porto/box.go b/isolate/porto/box.go
index 70889a6..e799ac9 100644
--- a/isolate/porto/box.go
+++ b/isolate/porto/box.go
@@ -604,7 +604,7 @@ func (b *Box) setMetricsMapping(m map[string]*isolate.WorkerMetrics) {
 b.containersMetrics = m
 }

-func (b *Box) getMetricsMapping() (m map[string]*isolate.WorkerMetrics) {
+func (b *Box) getMetricsMapping() map[string]*isolate.WorkerMetrics {
 b.muMetrics.Lock()
 defer b.muMetrics.Unlock()

diff --git a/isolate/porto/metrics_gatherer.go b/isolate/porto/metrics_gatherer.go
index 74df85c..19e2312 100644
--- a/isolate/porto/metrics_gatherer.go
+++ b/isolate/porto/metrics_gatherer.go
@@ -89,15 +89,18 @@ func parseNetValues(val porto.TPortoGetResponse) (ifs []netIfStat) {
 return
 }

-// TODO: check property Error/ErrorMsg fields
+func isBroken(r porto.TPortoGetResponse) bool {
+ return len(r.Value) == 0 || r.Error != 0 || len(r.ErrorMsg) > 0
+}
+
 func parseUintProp(raw rawMetrics, propName string) (v uint64, err error) {
 s, ok := raw[propName]
 if !ok {
- return 0, fmt.Errorf("no such prop in Porto: %s", propName)
+ return 0, fmt.Errorf("no such prop in Porto response: %s", propName)
 }

- if isBroken(s) {
- return v, fmt.Errorf("property [%s] is empty string", propName)
+ if isBroken(s) {
+ return v, fmt.Errorf("property record [%s] is broken, val [%s], err code %d, err msg %s", propName, s.Value, s.Error, s.ErrorMsg)
 }

 return strconv.ParseUint(s.Value, 10, 64)
@@ -207,7 +210,7 @@ func (box *Box) gatherMetrics(ctx context.Context) {
 var props portoResponse
 props, err = portoApi.Get(ids, metricsNames)
 if err != nil {
- log.G(ctx).WithError(err).Error("Failed to connect to Porto service")
+ log.G(ctx).WithError(err).Error("Failed to get metrics from Porto service")
 return
 }

@@ -226,14 +229,19 @@ func (box *Box) gatherMetricsEvery(ctx context.Context, interval time.Duration)

 log.G(ctx).Infof("Initializing Porto metrics gather loop with %v duration", interval)

+ // Note that `ctx` would already be done (cancelled) at this point,
+ // but the internal logger should still be available. 
+ defer log.G(ctx).Info("Porto metrics gather loop cancelled")
+
+ tick := time.NewTicker(interval)
+ defer tick.Stop()
+
 for {
 select {
 case <-ctx.Done():
 return
- case <-time.After(interval):
+ case <-tick.C:
 box.gatherMetrics(ctx)
 }
 }
-
- log.G(ctx).Info("Porto metrics gather loop canceled")
 }
diff --git a/isolate/process/metrics_gatherer.go b/isolate/process/metrics_gatherer.go
index 42e961e..5383d75 100644
--- a/isolate/process/metrics_gatherer.go
+++ b/isolate/process/metrics_gatherer.go
@@ -73,6 +73,7 @@ func readLines(b []byte) (text []string) {
 return
 }

+// `bt` in seconds, Unix time since epoch, as usual
 func loadSysBootTime() (bt uint64, err error) {
 var b []byte
 if b, err = ioutil.ReadFile("/proc/stat"); err != nil {
@@ -235,20 +236,25 @@ func (b *Box) gatherMetricsEvery(ctx context.Context, interval time.Duration) {

 log.G(ctx).Infof("Initializing Process metrics gather loop with %v duration", interval)

- bootTime, err := loadSysBootTime()
+ bootTimeSec, err := loadSysBootTime()
 if err != nil {
 log.G(ctx).Errorf("Error while reading system boot time %v", err)
 return
 }

+ // Note that `ctx` would already be done (cancelled) at this point,
+ // but the internal logger should still be available.
+ defer log.G(ctx).Info("Process metrics polling has been cancelled")
+
+ tick := time.NewTicker(interval)
+ defer tick.Stop()
+
 for {
 select {
 case <-ctx.Done():
 return
- case <-time.After(interval):
+ case <-tick.C:
 b.gatherMetrics(ctx, bootTimeSec)
 }
 }
-
- log.G(ctx).Info("Cancelling Process metrics loop")
 }
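With PATCH 21 both gatherers share the same polling shape: one reusable Ticker driven by the configured interval, a context for cancellation, and the final log message moved into a defer so it actually runs once ctx is done. For reference, a self-contained sketch of that pattern (package, function, and parameter names here are illustrative, not from the tree):

package poller

import (
	"context"
	"time"
)

// pollEvery invokes gather at a fixed interval until ctx is cancelled.
// A single Ticker is preferred over time.After inside the loop: time.After
// allocates a fresh timer on every iteration that is not released until it
// fires, while the Ticker is stopped exactly once via defer.
func pollEvery(ctx context.Context, interval time.Duration, gather func(context.Context)) {
	if interval == 0 {
		// Mirrors the patches: a zero interval disables polling.
		return
	}

	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			// Deferred cleanup (and any final logging) runs after this
			// return, i.e. when ctx is already cancelled.
			return
		case <-tick.C:
			gather(ctx)
		}
	}
}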