3 changes: 2 additions & 1 deletion agent/Dockerfile
@@ -56,5 +56,6 @@ ENV MONITORING_RESOURCE_LABELS ""
ENV METRIC_DOMAIN ""
ENV FORCE_HTTP2 "false"
ENV REQUEST_FORWARDING_TIMEOUT "60s"
ENV STATS_ADDR ""

CMD ["/bin/sh", "-c", "/opt/bin/proxy-forwarding-agent --debug=${DEBUG} --proxy=${PROXY} --proxy-timeout=${PROXY_TIMEOUT} --request-forwarding-timeout=${REQUEST_FORWARDING_TIMEOUT} --backend=${BACKEND} --host=${HOSTNAME}:${PORT} --shim-websockets=${SHIM_WEBSOCKETS} --shim-path=${SHIM_PATH} --health-check-path=${HEALTH_CHECK_PATH} --health-check-interval-seconds=${HEALTH_CHECK_INTERVAL_SECONDS} --health-check-unhealthy-threshold=${HEALTH_CHECK_UNHEALTHY_THRESHOLD} --session-cookie-name=${SESSION_COOKIE_NAME} --forward-user-id=${FORWARD_USER_ID} --rewrite-websocket-host=${REWRITE_WEBSOCKET_HOST} --monitoring-project-id=${MONITORING_PROJECT_ID} --monitoring-resource-labels=${MONITORING_RESOURCE_LABELS} --metric-domain=${METRIC_DOMAIN} --force-http2=${FORCE_HTTP2}"]
CMD ["/bin/sh", "-c", "/opt/bin/proxy-forwarding-agent --debug=${DEBUG} --proxy=${PROXY} --proxy-timeout=${PROXY_TIMEOUT} --request-forwarding-timeout=${REQUEST_FORWARDING_TIMEOUT} --backend=${BACKEND} --host=${HOSTNAME}:${PORT} --shim-websockets=${SHIM_WEBSOCKETS} --shim-path=${SHIM_PATH} --health-check-path=${HEALTH_CHECK_PATH} --health-check-interval-seconds=${HEALTH_CHECK_INTERVAL_SECONDS} --health-check-unhealthy-threshold=${HEALTH_CHECK_UNHEALTHY_THRESHOLD} --session-cookie-name=${SESSION_COOKIE_NAME} --forward-user-id=${FORWARD_USER_ID} --rewrite-websocket-host=${REWRITE_WEBSOCKET_HOST} --monitoring-project-id=${MONITORING_PROJECT_ID} --monitoring-resource-labels=${MONITORING_RESOURCE_LABELS} --metric-domain=${METRIC_DOMAIN} --force-http2=${FORCE_HTTP2} --stats-addr=${STATS_ADDR}"]
15 changes: 14 additions & 1 deletion agent/agent.go
@@ -49,6 +49,7 @@ import (
"github.com/google/inverting-proxy/agent/banner"
"github.com/google/inverting-proxy/agent/metrics"
"github.com/google/inverting-proxy/agent/sessions"
"github.com/google/inverting-proxy/agent/stats"
"github.com/google/inverting-proxy/agent/utils"
"github.com/google/inverting-proxy/agent/websockets"
)
@@ -87,6 +88,7 @@ var (
sessionCookieCacheLimit = flag.Int("session-cookie-cache-limit", 1000, "Upper bound on the number of concurrent sessions that can be tracked by the agent")
rewriteWebsocketHost = flag.Bool("rewrite-websocket-host", false, "Whether to rewrite the Host header to the original request when shimming a websocket connection")
stripCredentials = flag.Bool("strip-credentials", false, "Whether to strip the Authorization header from all requests.")
statsAddr = flag.String("stats-addr", "", "If non-empty, the local address on which to serve the HTTP stats page (at /stats)")

projectID = flag.String("monitoring-project-id", "", "Name of the GCP project id")
metricDomain = flag.String("metric-domain", "", "Domain under which to write metrics eg. notebooks.googleapis.com")
@@ -162,9 +164,12 @@ func forwardRequest(client *http.Client, hostProxy http.Handler, request *utils.
return fmt.Errorf("failed to create the response forwarder: %v", err)
}
hostProxy.ServeHTTP(responseForwarder, httpRequest)
latency := time.Since(request.StartTime)
if *debug {
log.Printf("Backend latency for request %s: %s\n", request.RequestID, time.Since(request.StartTime).String())
log.Printf("Backend latency for request %s: %s\n", request.RequestID, latency.String())
}
// Always record for expvar metrics
metrics.RecordResponseTime(latency)
if err := responseForwarder.Close(); err != nil {
return fmt.Errorf("failed to close the response forwarder: %v", err)
}
@@ -332,6 +337,9 @@ func main() {
if *backendID == "" {
log.Fatal("You must specify a backend ID")
}
if *statsAddr != "" {
go stats.Start(*statsAddr, *backendID, *proxy)
}
if !strings.HasPrefix(*healthCheckPath, "/") {
*healthCheckPath = "/" + *healthCheckPath
}
@@ -344,6 +352,11 @@
log.Printf("Unable to create metric handler: %v", err)
}

// Start expvar metrics update goroutine only if cloud monitoring is disabled
if metricHandler == nil {
metrics.StartExpvarMetrics()
}

waitForHealthy()
go runHealthChecks()

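The new `--stats-addr` flag feeds `stats.Start(*statsAddr, *backendID, *proxy)`, but the `agent/stats` package itself is not included in this diff. As a minimal, hypothetical sketch (not the PR's actual implementation), a server matching that call site could dump the expvar values published by the metrics package on `/stats`:

```go
// Hypothetical sketch of agent/stats; the real package is not shown in this diff.
package stats

import (
	"expvar"
	"fmt"
	"log"
	"net/http"
)

// Start serves a JSON snapshot of all published expvar variables
// (response_codes, response_times, ...) on addr at /stats. The signature
// only mirrors the call site in agent.go: stats.Start(*statsAddr, *backendID, *proxy).
func Start(addr, backendID, proxy string) {
	mux := http.NewServeMux()
	mux.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json; charset=utf-8")
		fmt.Fprintf(w, "{\n%q: %q,\n%q: %q", "backend", backendID, "proxy", proxy)
		expvar.Do(func(kv expvar.KeyValue) {
			// expvar.Var.String() already returns a valid JSON value.
			fmt.Fprintf(w, ",\n%q: %s", kv.Key, kv.Value.String())
		})
		fmt.Fprint(w, "\n}\n")
	})
	if err := http.ListenAndServe(addr, mux); err != nil {
		log.Printf("stats server exited: %v", err)
	}
}
```

Because `main` launches `stats.Start` in a goroutine, a sketch like this logs a listen error rather than exiting, so a bad `--stats-addr` would not take down the agent.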
158 changes: 151 additions & 7 deletions agent/metrics/metrics.go
@@ -15,8 +15,10 @@ package metrics

import (
"context"
"expvar"
"fmt"
"log"
"sort"
"strings"
"sync"
"time"
@@ -41,7 +43,40 @@
}
)

var codeCount map[string]int64
var (
codeCount map[string]int64
responseCodes = expvar.NewMap("response_codes")
p50ResponseTime = new(expvar.Float)
p90ResponseTime = new(expvar.Float)
p99ResponseTime = new(expvar.Float)
responseTimesVar = new(expvar.Map)
latencies []time.Duration
latenciesMutex sync.Mutex
percentilesToCalc = []float64{50.0, 90.0, 99.0}
percentileToExpvar = map[float64]*expvar.Float{
50.0: p50ResponseTime,
90.0: p90ResponseTime,
99.0: p99ResponseTime,
}
)

func init() {
responseTimesVar.Set("p50", p50ResponseTime)
responseTimesVar.Set("p90", p90ResponseTime)
responseTimesVar.Set("p99", p99ResponseTime)
expvar.Publish("response_times", responseTimesVar)
}

// StartExpvarMetrics starts a goroutine that periodically updates expvar metrics
func StartExpvarMetrics() {
go func() {
ticker := time.NewTicker(samplePeriod)
defer ticker.Stop()
for range ticker.C {
updateExpvarPercentiles()
}
}()
}

// metricClient is a client for interacting with Cloud Monitoring API.
type metricClient interface {
@@ -89,10 +124,7 @@ func NewMetricHandler(ctx context.Context, projectID, resourceType, resourceKeyV
case <-handler.ctx.Done():
return
case <-ticker.C:
handler.emitResponseCodeMetric()
handler.mu.Lock()
codeCount = make(map[string]int64)
handler.mu.Unlock()
handler.emitMetrics()
}
}
}()
@@ -127,6 +159,7 @@ func newMetricHandlerHelper(ctx context.Context, projectID, resourceType, resour
}

codeCount = make(map[string]int64)
latencies = make([]time.Duration, 0)

return &MetricHandler{
projectID: projectID,
@@ -187,13 +220,19 @@ func (h *MetricHandler) GetResponseCountMetricType() string {
return fmt.Sprintf("%s/instance/proxy_agent/response_count", h.metricDomain)
}

// RecordResponseCode records a response code to expvar (always works, even without cloud monitoring)
func RecordResponseCode(statusCode int) {
responseCode := fmt.Sprintf("%v", statusCode)
responseCodes.Add(responseCode, 1)
}

// WriteResponseCodeMetric records observed response codes; emitResponseCodeMetric later writes them to Cloud Monitoring
func (h *MetricHandler) WriteResponseCodeMetric(statusCode int) error {
if h == nil {
return nil
}
responseCode := fmt.Sprintf("%v", statusCode)

responseCode := fmt.Sprintf("%v", statusCode)
// Update response code count for the current sample period
h.mu.Lock()
codeCount[responseCode]++
@@ -202,10 +241,42 @@ func (h *MetricHandler) WriteResponseCodeMetric(statusCode int) error {
return nil
}

// RecordResponseTime records observed response times for expvar metrics
func RecordResponseTime(latency time.Duration) {
latenciesMutex.Lock()
latencies = append(latencies, latency)
latenciesMutex.Unlock()
}

// WriteResponseTime will record observed response times
func (h *MetricHandler) WriteResponseTime(latency time.Duration) {
RecordResponseTime(latency)
}

func (h *MetricHandler) emitMetrics() {
h.emitResponseCodeMetric()
h.emitResponseTimeMetric()
h.mu.Lock()
codeCount = make(map[string]int64)
h.mu.Unlock()
latenciesMutex.Lock()
latencies = latencies[:0]
latenciesMutex.Unlock()
}

// emitResponseCodeMetric emits observed response codes to Cloud Monitoring once the sample period is over
func (h *MetricHandler) emitResponseCodeMetric() {
log.Printf("WriteResponseCodeMetric|attempting to write metrics at time: %v\n", time.Now())
for responseCode, count := range codeCount {

// Copy codeCount while holding the lock to avoid race conditions
h.mu.Lock()
counts := make(map[string]int64, len(codeCount))
for k, v := range codeCount {
counts[k] = v
}
h.mu.Unlock()

for responseCode, count := range counts {
responseClass := fmt.Sprintf("%sXX", responseCode[0:1])
metricLabels := map[string]string{
"response_code": responseCode,
@@ -225,6 +296,79 @@ }
}
}

// updateExpvarPercentiles calculates and updates expvar percentiles from recorded latencies
func updateExpvarPercentiles() {
latenciesMutex.Lock()
defer latenciesMutex.Unlock()
if len(latencies) == 0 {
return
}
// Make a copy and sort it
latenciesCopy := make([]time.Duration, len(latencies))
copy(latenciesCopy, latencies)
sort.Slice(latenciesCopy, func(i, j int) bool {
return latenciesCopy[i] < latenciesCopy[j]
})
for _, p := range percentilesToCalc {
percentileValue := calculatePercentile(p, latenciesCopy)
ev, ok := percentileToExpvar[p]
if !ok {
log.Printf("Unknown percentile value: %v", p)
continue
}
ev.Set(percentileValue)
}
}

func (h *MetricHandler) emitResponseTimeMetric() {
updateExpvarPercentiles()
}

func calculatePercentile(p float64, d []time.Duration) float64 {
if len(d) == 0 {
return 0.0
}
index := (p / 100.0) * float64(len(d)-1)
lower := int(index)
upper := lower + 1
if upper >= len(d) {
return float64(d[lower].Nanoseconds()) / 1e6
}
weight := index - float64(lower)
lowerVal := float64(d[lower].Nanoseconds()) / 1e6
upperVal := float64(d[upper].Nanoseconds()) / 1e6
return lowerVal*(1-weight) + upperVal*weight
}
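
`calculatePercentile` linearly interpolates between the two nearest samples and reports the result in milliseconds. A small, hypothetical test (in package metrics, since the function is unexported, and not part of this PR) illustrates the arithmetic:

```go
// Hypothetical test file, e.g. agent/metrics/metrics_test.go (not in this PR).
package metrics

import (
	"math"
	"testing"
	"time"
)

func TestCalculatePercentileInterpolates(t *testing.T) {
	d := []time.Duration{10 * time.Millisecond, 20 * time.Millisecond, 30 * time.Millisecond, 40 * time.Millisecond}
	// p90: index = 0.9 * (4-1) = 2.7, so the result is 0.3*30ms + 0.7*40ms = 37ms.
	if got := calculatePercentile(90.0, d); math.Abs(got-37.0) > 1e-9 {
		t.Errorf("calculatePercentile(90) = %v, want 37 (ms)", got)
	}
	// An empty slice yields 0.
	if got := calculatePercentile(50.0, nil); got != 0.0 {
		t.Errorf("calculatePercentile(50, nil) = %v, want 0", got)
	}
}
```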

// GetCurrentPercentiles calculates and returns the current percentiles from recorded latencies
func GetCurrentPercentiles() map[string]float64 {
latenciesMutex.Lock()
defer latenciesMutex.Unlock()

result := map[string]float64{
"p50": 0.0,
"p90": 0.0,
"p99": 0.0,
}

if len(latencies) == 0 {
return result
}

// Make a copy and sort it
latenciesCopy := make([]time.Duration, len(latencies))
copy(latenciesCopy, latencies)
sort.Slice(latenciesCopy, func(i, j int) bool {
return latenciesCopy[i] < latenciesCopy[j]
})

result["p50"] = calculatePercentile(50.0, latenciesCopy)
result["p90"] = calculatePercentile(90.0, latenciesCopy)
result["p99"] = calculatePercentile(99.0, latenciesCopy)

return result
}
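
`GetCurrentPercentiles` gives callers outside the expvar ticker (for example, a stats page handler) a point-in-time view of the same percentiles. A hypothetical call site, shown only to illustrate the returned map, might look like:

```go
// Hypothetical consumer of the metrics package; the keys ("p50", "p90", "p99")
// match the map returned by GetCurrentPercentiles, with values in milliseconds.
package main

import (
	"log"

	"github.com/google/inverting-proxy/agent/metrics"
)

func logPercentiles() {
	p := metrics.GetCurrentPercentiles()
	log.Printf("response times: p50=%.2fms p90=%.2fms p99=%.2fms", p["p50"], p["p90"], p["p99"])
}
```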

// newTimeSeries creates and returns a new time series
func (h *MetricHandler) newTimeSeries(metricType string, metricLabels map[string]string, dataPoint *monitoringpb.Point) *monitoringpb.TimeSeries {
return &monitoringpb.TimeSeries{