Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions experimental/stats/metricregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,22 @@ func (r *fakeMetricsRecorder) RecordInt64UpDownCount(handle *Int64UpDownCountHan
r.intValues[handle.Descriptor()] += incr
}

func (r *fakeMetricsRecorder) RecordInt64AsyncGauge(handle *Int64AsyncGaugeHandle, incr int64, labels ...string) {
func (r *fakeMetricsRecorder) RecordInt64AsyncGauge(handle *Int64AsyncGaugeHandle, val int64, labels ...string) {
verifyLabels(r.t, handle.Descriptor().Labels, handle.Descriptor().OptionalLabels, labels)
r.intValues[handle.Descriptor()] += incr
// Async gauges in OTel are "Observer" instruments; they report
// the current state of the world every cycle, they do not accumulate deltas.
r.intValues[handle.Descriptor()] = val
}

func (r *fakeMetricsRecorder) RegisterAsyncReporter(reporter AsyncMetricReporter, _ ...AsyncMetric) func() {
// We execute the reporter immediately.
// This allows the test to verify the metric value in r.intValues immediately
// after the component under test calls RegisterAsyncReporter.
err := reporter.Report(r)
if err != nil {
r.t.Logf("Async reporter returned error: %v", err)
}

// Return a no-op cleanup function
return func() {}
}
28 changes: 28 additions & 0 deletions experimental/stats/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,34 @@ type MetricsRecorder interface {
// RecordInt64UpDownCounter records the measurement alongside labels on the int
// count associated with the provided handle.
RecordInt64UpDownCount(handle *Int64UpDownCountHandle, incr int64, labels ...string)
// RegisterAsyncReporter registers a reporter to produce metric values for
// only the listed descriptors. The returned function must be called when
// the metrics are no longer needed, which will remove the reporter. The
// returned method needs to be idempotent and concurrent safe.
RegisterAsyncReporter(reporter AsyncMetricReporter, descriptors ...AsyncMetric) func()
}

// AsyncMetricReporter is an interface for types that record metrics asynchronously
// for the set of descriptors they are registered with. The AsyncMetricsRecorder
// parameter is used to record values for these metrics.
//
// Implementations must make unique recordings across all registered
// AsyncMetricReporters. Meaning, they should not report values for a metric with
// the same attributes as another AsyncMetricReporter will report.
//
// Implementations must be concurrent-safe.
type AsyncMetricReporter interface {
// Report records metric values using the provided recorder.
Report(AsyncMetricsRecorder) error
}

// AsyncMetricReporterFunc is an adapter to allow the use of ordinary functions as
// AsyncMetricReporters.
type AsyncMetricReporterFunc func(AsyncMetricsRecorder) error

// Report calls f(r).
func (f AsyncMetricReporterFunc) Report(r AsyncMetricsRecorder) error {
return f(r)
}

// AsyncMetricsRecorder records on asynchronous metrics derived from metric registry.
Expand Down
50 changes: 50 additions & 0 deletions internal/stats/metrics_recorder_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,53 @@ func (l *MetricsRecorderList) RecordInt64Gauge(handle *estats.Int64GaugeHandle,
metricRecorder.RecordInt64Gauge(handle, incr, labels...)
}
}

// RegisterAsyncReporter forwards the registration to all underlying metrics
// recorders.
//
// It returns a cleanup function that, when called, invokes the cleanup function
// returned by each underlying recorder, ensuring the reporter is unregistered
// from all of them.
func (l *MetricsRecorderList) RegisterAsyncReporter(reporter estats.AsyncMetricReporter, metrics ...estats.AsyncMetric) func() {
descriptorsMap := make(map[*estats.MetricDescriptor]bool, len(metrics))
for _, m := range metrics {
descriptorsMap[m.Descriptor()] = true
}
unregisterFns := make([]func(), 0, len(l.metricsRecorders))
for _, mr := range l.metricsRecorders {
// Wrap the AsyncMetricsRecorder to intercept calls to RecordInt64Gauge
// and validate the labels.
wrappedCallback := func(recorder estats.AsyncMetricsRecorder) error {
wrappedRecorder := &asyncRecorderWrapper{
delegate: recorder,
descriptors: descriptorsMap,
}
return reporter.Report(wrappedRecorder)
}
unregisterFns = append(unregisterFns, mr.RegisterAsyncReporter(estats.AsyncMetricReporterFunc(wrappedCallback), metrics...))
}
return func() {
for _, unregister := range unregisterFns {
unregister()
}
}
}

type asyncRecorderWrapper struct {
delegate estats.AsyncMetricsRecorder
descriptors map[*estats.MetricDescriptor]bool
}

// RecordIntAsync64Gauge records the measurement alongside labels on the int
// gauge associated with the provided handle.
func (w *asyncRecorderWrapper) RecordInt64AsyncGauge(handle *estats.Int64AsyncGaugeHandle, value int64, labels ...string) {
// Ensure only metrics for descriptors passed during callback registeration
// are emitted.
d := handle.Descriptor()
if _, ok := w.descriptors[d]; !ok {
return
}
// Validate labels and delegate.
verifyLabels(d, labels...)
w.delegate.RecordInt64AsyncGauge(handle, value, labels...)
}
81 changes: 80 additions & 1 deletion internal/stats/metrics_recorder_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
gstats "google.golang.org/grpc/stats"
)

var defaultTestTimeout = 5 * time.Second
Expand Down Expand Up @@ -114,7 +115,6 @@ func (recordingLoadBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer
intHistoHandle.Record(cc.MetricsRecorder(), 3, "int histo label val", "int histo optional label val")
floatHistoHandle.Record(cc.MetricsRecorder(), 4, "float histo label val", "float histo optional label val")
intGaugeHandle.Record(cc.MetricsRecorder(), 5, "int gauge label val", "int gauge optional label val")

return &recordingLoadBalancer{
Balancer: balancer.Get(pickfirst.Name).Build(cc, bOpts),
}
Expand Down Expand Up @@ -255,3 +255,82 @@ func (s) TestMetricRecorderListPanic(t *testing.T) {

intCountHandle.Record(mrl, 1, "only one label")
}

// TestMetricsRecorderList_RegisterAsyncReporter verifies that the list implementation
// correctly fans out registration calls to all underlying recorders and
// aggregates the cleanup calls.
func TestMetricsRecorderList_RegisterAsyncReporter(t *testing.T) {
spy1 := &spyMetricsRecorder{name: "spy1"}
spy2 := &spyMetricsRecorder{name: "spy2"}
spy3 := &spyMetricsRecorder{name: "spy3"}

list := istats.NewMetricsRecorderList([]gstats.Handler{spy1, spy2, spy3})

desc := &estats.MetricDescriptor{Name: "test_metric", Description: "test"}
mockMetric := &mockAsyncMetric{d: desc}

dummyReporter := estats.AsyncMetricReporterFunc(func(estats.AsyncMetricsRecorder) error {
return nil
})
cleanup := list.RegisterAsyncReporter(dummyReporter, mockMetric)

// Check that RegisterAsyncReporter was called exactly once on ALL spies
if spy1.registerCalledCount != 1 {
t.Errorf("spy1 register called %d times, want 1", spy1.registerCalledCount)
}
if spy2.registerCalledCount != 1 {
t.Errorf("spy2 register called %d times, want 1", spy2.registerCalledCount)
}
if spy3.registerCalledCount != 1 {
t.Errorf("spy3 register called %d times, want 1", spy3.registerCalledCount)
}

// Verify that cleanup has NOT been called yet
if spy1.cleanupCalledCount != 0 {
t.Error("spy1 cleanup called prematurely")
}

cleanup()

// Check that the cleanup function returned by the list actually triggers
// the cleanup logic on ALL underlying spies.
if spy1.cleanupCalledCount != 1 {
t.Errorf("spy1 cleanup called %d times, want 1", spy1.cleanupCalledCount)
}
if spy2.cleanupCalledCount != 1 {
t.Errorf("spy2 cleanup called %d times, want 1", spy2.cleanupCalledCount)
}
if spy3.cleanupCalledCount != 1 {
t.Errorf("spy3 cleanup called %d times, want 1", spy3.cleanupCalledCount)
}
}

// --- Helpers & Spies ---

// mockAsyncMetric implements estats.AsyncMetric
type mockAsyncMetric struct {
estats.AsyncMetric
d *estats.MetricDescriptor
}

func (m *mockAsyncMetric) Descriptor() *estats.MetricDescriptor {
return m.d
}

// spyMetricsRecorder implements estats.MetricsRecorder
type spyMetricsRecorder struct {
stats.TestMetricsRecorder
name string
registerCalledCount int
cleanupCalledCount int
}

// RegisterAsyncReporter implements the interface and tracks calls.
func (s *spyMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() {
s.registerCalledCount++

// Return a cleanup function that tracks if it was called
return func() {
s.cleanupCalledCount++
}
}
11 changes: 11 additions & 0 deletions internal/testutils/stats/test_metrics_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@ func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle,
r.data[handle.Name] = float64(incr)
}

// RegisterAsyncReporter is noop implementation, async gauge test recorders should
// provide their own implementation
func (r *TestMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() {
return func() {}
}

// To implement a stats.Handler, which allows it to be set as a dial option:

// TagRPC is TestMetricsRecorder's implementation of TagRPC.
Expand Down Expand Up @@ -316,3 +322,8 @@ func (r *NoopMetricsRecorder) RecordInt64Gauge(*estats.Int64GaugeHandle, int64,
// RecordInt64UpDownCount is a noop implementation of RecordInt64UpDownCount.
func (r *NoopMetricsRecorder) RecordInt64UpDownCount(*estats.Int64UpDownCountHandle, int64, ...string) {
}

// RegisterAsyncReporter is a noop implementation of RegisterAsyncReporter.
func (r *NoopMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() {
return func() {}
}
13 changes: 13 additions & 0 deletions stats/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,19 @@ func (rm *registryMetrics) RecordInt64Gauge(handle *estats.Int64GaugeHandle, inc
}
}

// RegisterAsyncReporter will register a callback with the underlying OpenTelemetry
// Meter for the provided descriptors.
//
// It will map the provided descriptors to their corresponding OTel Observable
// instruments. If no instruments match the descriptors, registration is
// skipped.
//
// The returned cleanup function unregisters the callback from the Meter.
func (rm *registryMetrics) RegisterAsyncReporter(_ estats.AsyncMetricReporter, _ ...estats.AsyncMetric) func() {
// TODO(@mbissa) - add implementation
return func() {}
}

// Users of this component should use these bucket boundaries as part of their
// SDK MeterProvider passed in. This component sends this as "advice" to the
// API, which works, however this stability is not guaranteed, so for safety the
Expand Down