diff --git a/experimental/stats/metricregistry_test.go b/experimental/stats/metricregistry_test.go index 8bf50abbe162..95440f53f821 100644 --- a/experimental/stats/metricregistry_test.go +++ b/experimental/stats/metricregistry_test.go @@ -309,7 +309,22 @@ func (r *fakeMetricsRecorder) RecordInt64UpDownCount(handle *Int64UpDownCountHan r.intValues[handle.Descriptor()] += incr } -func (r *fakeMetricsRecorder) RecordInt64AsyncGauge(handle *Int64AsyncGaugeHandle, incr int64, labels ...string) { +func (r *fakeMetricsRecorder) RecordInt64AsyncGauge(handle *Int64AsyncGaugeHandle, val int64, labels ...string) { verifyLabels(r.t, handle.Descriptor().Labels, handle.Descriptor().OptionalLabels, labels) - r.intValues[handle.Descriptor()] += incr + // Async gauges in OTel are "Observer" instruments; they report + // the current state of the world every cycle, they do not accumulate deltas. + r.intValues[handle.Descriptor()] = val +} + +func (r *fakeMetricsRecorder) RegisterAsyncReporter(reporter AsyncMetricReporter, _ ...AsyncMetric) func() { + // We execute the reporter immediately. + // This allows the test to verify the metric value in r.intValues immediately + // after the component under test calls RegisterAsyncReporter. + err := reporter.Report(r) + if err != nil { + r.t.Logf("Async reporter returned error: %v", err) + } + + // Return a no-op cleanup function + return func() {} } diff --git a/experimental/stats/metrics.go b/experimental/stats/metrics.go index d7d404cbe438..1d2dc0167a61 100644 --- a/experimental/stats/metrics.go +++ b/experimental/stats/metrics.go @@ -41,6 +41,34 @@ type MetricsRecorder interface { // RecordInt64UpDownCounter records the measurement alongside labels on the int // count associated with the provided handle. RecordInt64UpDownCount(handle *Int64UpDownCountHandle, incr int64, labels ...string) + // RegisterAsyncReporter registers a reporter to produce metric values for + // only the listed descriptors. The returned function must be called when + // the metrics are no longer needed, which will remove the reporter. The + // returned method needs to be idempotent and concurrent safe. + RegisterAsyncReporter(reporter AsyncMetricReporter, descriptors ...AsyncMetric) func() +} + +// AsyncMetricReporter is an interface for types that record metrics asynchronously +// for the set of descriptors they are registered with. The AsyncMetricsRecorder +// parameter is used to record values for these metrics. +// +// Implementations must make unique recordings across all registered +// AsyncMetricReporters. Meaning, they should not report values for a metric with +// the same attributes as another AsyncMetricReporter will report. +// +// Implementations must be concurrent-safe. +type AsyncMetricReporter interface { + // Report records metric values using the provided recorder. + Report(AsyncMetricsRecorder) error +} + +// AsyncMetricReporterFunc is an adapter to allow the use of ordinary functions as +// AsyncMetricReporters. +type AsyncMetricReporterFunc func(AsyncMetricsRecorder) error + +// Report calls f(r). +func (f AsyncMetricReporterFunc) Report(r AsyncMetricsRecorder) error { + return f(r) } // AsyncMetricsRecorder records on asynchronous metrics derived from metric registry. diff --git a/internal/stats/metrics_recorder_list.go b/internal/stats/metrics_recorder_list.go index d5f7e4d62dd1..4a9fc0127f75 100644 --- a/internal/stats/metrics_recorder_list.go +++ b/internal/stats/metrics_recorder_list.go @@ -113,3 +113,53 @@ func (l *MetricsRecorderList) RecordInt64Gauge(handle *estats.Int64GaugeHandle, metricRecorder.RecordInt64Gauge(handle, incr, labels...) } } + +// RegisterAsyncReporter forwards the registration to all underlying metrics +// recorders. +// +// It returns a cleanup function that, when called, invokes the cleanup function +// returned by each underlying recorder, ensuring the reporter is unregistered +// from all of them. +func (l *MetricsRecorderList) RegisterAsyncReporter(reporter estats.AsyncMetricReporter, metrics ...estats.AsyncMetric) func() { + descriptorsMap := make(map[*estats.MetricDescriptor]bool, len(metrics)) + for _, m := range metrics { + descriptorsMap[m.Descriptor()] = true + } + unregisterFns := make([]func(), 0, len(l.metricsRecorders)) + for _, mr := range l.metricsRecorders { + // Wrap the AsyncMetricsRecorder to intercept calls to RecordInt64Gauge + // and validate the labels. + wrappedCallback := func(recorder estats.AsyncMetricsRecorder) error { + wrappedRecorder := &asyncRecorderWrapper{ + delegate: recorder, + descriptors: descriptorsMap, + } + return reporter.Report(wrappedRecorder) + } + unregisterFns = append(unregisterFns, mr.RegisterAsyncReporter(estats.AsyncMetricReporterFunc(wrappedCallback), metrics...)) + } + return func() { + for _, unregister := range unregisterFns { + unregister() + } + } +} + +type asyncRecorderWrapper struct { + delegate estats.AsyncMetricsRecorder + descriptors map[*estats.MetricDescriptor]bool +} + +// RecordIntAsync64Gauge records the measurement alongside labels on the int +// gauge associated with the provided handle. +func (w *asyncRecorderWrapper) RecordInt64AsyncGauge(handle *estats.Int64AsyncGaugeHandle, value int64, labels ...string) { + // Ensure only metrics for descriptors passed during callback registeration + // are emitted. + d := handle.Descriptor() + if _, ok := w.descriptors[d]; !ok { + return + } + // Validate labels and delegate. + verifyLabels(d, labels...) + w.delegate.RecordInt64AsyncGauge(handle, value, labels...) +} diff --git a/internal/stats/metrics_recorder_list_test.go b/internal/stats/metrics_recorder_list_test.go index 38f9472f926a..f94f85713377 100644 --- a/internal/stats/metrics_recorder_list_test.go +++ b/internal/stats/metrics_recorder_list_test.go @@ -41,6 +41,7 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/serviceconfig" + gstats "google.golang.org/grpc/stats" ) var defaultTestTimeout = 5 * time.Second @@ -114,7 +115,6 @@ func (recordingLoadBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer intHistoHandle.Record(cc.MetricsRecorder(), 3, "int histo label val", "int histo optional label val") floatHistoHandle.Record(cc.MetricsRecorder(), 4, "float histo label val", "float histo optional label val") intGaugeHandle.Record(cc.MetricsRecorder(), 5, "int gauge label val", "int gauge optional label val") - return &recordingLoadBalancer{ Balancer: balancer.Get(pickfirst.Name).Build(cc, bOpts), } @@ -255,3 +255,82 @@ func (s) TestMetricRecorderListPanic(t *testing.T) { intCountHandle.Record(mrl, 1, "only one label") } + +// TestMetricsRecorderList_RegisterAsyncReporter verifies that the list implementation +// correctly fans out registration calls to all underlying recorders and +// aggregates the cleanup calls. +func TestMetricsRecorderList_RegisterAsyncReporter(t *testing.T) { + spy1 := &spyMetricsRecorder{name: "spy1"} + spy2 := &spyMetricsRecorder{name: "spy2"} + spy3 := &spyMetricsRecorder{name: "spy3"} + + list := istats.NewMetricsRecorderList([]gstats.Handler{spy1, spy2, spy3}) + + desc := &estats.MetricDescriptor{Name: "test_metric", Description: "test"} + mockMetric := &mockAsyncMetric{d: desc} + + dummyReporter := estats.AsyncMetricReporterFunc(func(estats.AsyncMetricsRecorder) error { + return nil + }) + cleanup := list.RegisterAsyncReporter(dummyReporter, mockMetric) + + // Check that RegisterAsyncReporter was called exactly once on ALL spies + if spy1.registerCalledCount != 1 { + t.Errorf("spy1 register called %d times, want 1", spy1.registerCalledCount) + } + if spy2.registerCalledCount != 1 { + t.Errorf("spy2 register called %d times, want 1", spy2.registerCalledCount) + } + if spy3.registerCalledCount != 1 { + t.Errorf("spy3 register called %d times, want 1", spy3.registerCalledCount) + } + + // Verify that cleanup has NOT been called yet + if spy1.cleanupCalledCount != 0 { + t.Error("spy1 cleanup called prematurely") + } + + cleanup() + + // Check that the cleanup function returned by the list actually triggers + // the cleanup logic on ALL underlying spies. + if spy1.cleanupCalledCount != 1 { + t.Errorf("spy1 cleanup called %d times, want 1", spy1.cleanupCalledCount) + } + if spy2.cleanupCalledCount != 1 { + t.Errorf("spy2 cleanup called %d times, want 1", spy2.cleanupCalledCount) + } + if spy3.cleanupCalledCount != 1 { + t.Errorf("spy3 cleanup called %d times, want 1", spy3.cleanupCalledCount) + } +} + +// --- Helpers & Spies --- + +// mockAsyncMetric implements estats.AsyncMetric +type mockAsyncMetric struct { + estats.AsyncMetric + d *estats.MetricDescriptor +} + +func (m *mockAsyncMetric) Descriptor() *estats.MetricDescriptor { + return m.d +} + +// spyMetricsRecorder implements estats.MetricsRecorder +type spyMetricsRecorder struct { + stats.TestMetricsRecorder + name string + registerCalledCount int + cleanupCalledCount int +} + +// RegisterAsyncReporter implements the interface and tracks calls. +func (s *spyMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() { + s.registerCalledCount++ + + // Return a cleanup function that tracks if it was called + return func() { + s.cleanupCalledCount++ + } +} diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go index be1a06117a2f..40fc9b7b274a 100644 --- a/internal/testutils/stats/test_metrics_recorder.go +++ b/internal/testutils/stats/test_metrics_recorder.go @@ -276,6 +276,12 @@ func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle, r.data[handle.Name] = float64(incr) } +// RegisterAsyncReporter is noop implementation, async gauge test recorders should +// provide their own implementation +func (r *TestMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() { + return func() {} +} + // To implement a stats.Handler, which allows it to be set as a dial option: // TagRPC is TestMetricsRecorder's implementation of TagRPC. @@ -316,3 +322,8 @@ func (r *NoopMetricsRecorder) RecordInt64Gauge(*estats.Int64GaugeHandle, int64, // RecordInt64UpDownCount is a noop implementation of RecordInt64UpDownCount. func (r *NoopMetricsRecorder) RecordInt64UpDownCount(*estats.Int64UpDownCountHandle, int64, ...string) { } + +// RegisterAsyncReporter is a noop implementation of RegisterAsyncReporter. +func (r *NoopMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() { + return func() {} +} diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index 2a9cb5e57d77..db7207527f52 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -453,6 +453,19 @@ func (rm *registryMetrics) RecordInt64Gauge(handle *estats.Int64GaugeHandle, inc } } +// RegisterAsyncReporter will register a callback with the underlying OpenTelemetry +// Meter for the provided descriptors. +// +// It will map the provided descriptors to their corresponding OTel Observable +// instruments. If no instruments match the descriptors, registration is +// skipped. +// +// The returned cleanup function unregisters the callback from the Meter. +func (rm *registryMetrics) RegisterAsyncReporter(_ estats.AsyncMetricReporter, _ ...estats.AsyncMetric) func() { + // TODO(@mbissa) - add implementation + return func() {} +} + // Users of this component should use these bucket boundaries as part of their // SDK MeterProvider passed in. This component sends this as "advice" to the // API, which works, however this stability is not guaranteed, so for safety the