From 9f1382d27731f65ce608925ce82c6dccd261cf14 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Tue, 9 Dec 2025 21:22:21 -0500 Subject: [PATCH 1/4] add support for faas metrics in telemetryapi receiver --- .../receiver/telemetryapireceiver/factory.go | 14 + .../telemetryapireceiver/metric_builder.go | 225 ++++++ .../metric_builder_test.go | 639 ++++++++++++++++++ .../telemetryapireceiver/receiver_test.go | 2 +- 4 files changed, 879 insertions(+), 1 deletion(-) create mode 100644 collector/receiver/telemetryapireceiver/metric_builder.go create mode 100644 collector/receiver/telemetryapireceiver/metric_builder_test.go diff --git a/collector/receiver/telemetryapireceiver/factory.go b/collector/receiver/telemetryapireceiver/factory.go index b5ea6a3b13..3f4f4f177d 100644 --- a/collector/receiver/telemetryapireceiver/factory.go +++ b/collector/receiver/telemetryapireceiver/factory.go @@ -50,6 +50,7 @@ func NewFactory(extensionID string) receiver.Factory { } }, receiver.WithTraces(createTracesReceiver, stability), + receiver.WithMetrics(createMetricsReceiver, stability), receiver.WithLogs(createLogsReceiver, stability)) } @@ -66,6 +67,19 @@ func createTracesReceiver(ctx context.Context, params receiver.Settings, rConf c return r, nil } +func createMetricsReceiver(ctx context.Context, params receiver.Settings, rConf component.Config, next consumer.Metrics) (receiver.Metrics, error) { + cfg, ok := rConf.(*Config) + if !ok { + return nil, errConfigNotTelemetryAPI + } + r := receivers.GetOrAdd(cfg, func() component.Component { + t, _ := newTelemetryAPIReceiver(cfg, params) + return t + }) + r.Unwrap().(*telemetryAPIReceiver).registerMetricsConsumer(next) + return r, nil +} + func createLogsReceiver(ctx context.Context, params receiver.Settings, rConf component.Config, next consumer.Logs) (receiver.Logs, error) { cfg, ok := rConf.(*Config) if !ok { diff --git a/collector/receiver/telemetryapireceiver/metric_builder.go b/collector/receiver/telemetryapireceiver/metric_builder.go new file mode 100644 index 0000000000..55eb90ab47 --- /dev/null +++ b/collector/receiver/telemetryapireceiver/metric_builder.go @@ -0,0 +1,225 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetryapireceiver + +import ( + "sort" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + semconv2 "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +var DefaultHistogramBounds = []float64{0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0, 7500.0, 10000.0} +var DurationHistogramBounds = []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10} + +type HistogramMetricBuilder struct { + name string + description string + unit string + bounds []float64 + counts []uint64 + total uint64 + sum float64 + startTime pcommon.Timestamp + temporality pmetric.AggregationTemporality +} + +func NewHistogramMetricBuilder(name string, description string, unit string, bounds []float64, startTime pcommon.Timestamp) *HistogramMetricBuilder { + b := bounds + if bounds == nil { + b = DefaultHistogramBounds + } + + counts := make([]uint64, len(b)+1) + return &HistogramMetricBuilder{ + name: name, + description: description, + unit: unit, + bounds: b, + counts: counts, + startTime: startTime, + temporality: pmetric.AggregationTemporalityCumulative, + } +} + +func (h *HistogramMetricBuilder) AppendDataPoint(scopeMetrics pmetric.ScopeMetrics, timestamp pcommon.Timestamp, value float64) { + metric := scopeMetrics.Metrics().AppendEmpty() + metric.SetName(h.name) + metric.SetDescription(h.description) + metric.SetUnit(h.unit) + + hist := metric.SetEmptyHistogram() + hist.SetAggregationTemporality(h.temporality) + + h.sum += value + h.total++ + h.counts[sort.SearchFloat64s(h.bounds, value)]++ + + dp := hist.DataPoints().AppendEmpty() + dp.SetStartTimestamp(h.startTime) + dp.SetTimestamp(timestamp) + dp.SetSum(h.sum) + dp.SetCount(h.total) + + dp.BucketCounts().FromRaw(h.counts) + dp.ExplicitBounds().FromRaw(h.bounds) +} + +func (h *HistogramMetricBuilder) Reset(timestamp pcommon.Timestamp) { + h.startTime = timestamp + h.sum = 0 + h.total = 0 + + for i := range h.counts { + h.counts[i] = 0 + } +} + +type CounterMetricBuilder struct { + name string + description string + unit string + total int64 + isMonotonic bool + temporality pmetric.AggregationTemporality + startTime pcommon.Timestamp +} + +func NewCounterMetricBuilder(name string, description string, unit string, isMonotonic bool, startTime pcommon.Timestamp) *CounterMetricBuilder { + return &CounterMetricBuilder{ + name: name, + description: description, + unit: unit, + isMonotonic: isMonotonic, + temporality: pmetric.AggregationTemporalityCumulative, + startTime: startTime, + } +} + +func (c *CounterMetricBuilder) AppendDataPoint(scopeMetrics pmetric.ScopeMetrics, timestamp pcommon.Timestamp, value int64) { + metric := scopeMetrics.Metrics().AppendEmpty() + metric.SetName(c.name) + metric.SetDescription(c.description) + metric.SetUnit(c.unit) + + sum := metric.SetEmptySum() + sum.SetAggregationTemporality(c.temporality) + sum.SetIsMonotonic(c.isMonotonic) + + c.total += value + + dp := sum.DataPoints().AppendEmpty() + dp.SetStartTimestamp(c.startTime) + dp.SetTimestamp(timestamp) + dp.SetIntValue(c.total) +} + +func (c *CounterMetricBuilder) Reset(timestamp pcommon.Timestamp) { + c.startTime = timestamp + c.total = 0 +} + +func NewFasSInvokeDurationMetricBuilder(startTime pcommon.Timestamp) *HistogramMetricBuilder { + return NewHistogramMetricBuilder( + semconv2.FaaSInvokeDurationName, + semconv2.FaaSInvokeDurationDescription, + semconv2.FaaSInvokeDurationUnit, + DurationHistogramBounds, + startTime, + ) +} + +func NewFasSInitDurationMetricBuilder(startTime pcommon.Timestamp) *HistogramMetricBuilder { + return NewHistogramMetricBuilder( + semconv2.FaaSInitDurationName, + semconv2.FaaSInitDurationDescription, + semconv2.FaaSInitDurationUnit, + DurationHistogramBounds, + startTime, + ) +} + +func NewFaaSMemUsageMetricBuilder(startTime pcommon.Timestamp) *HistogramMetricBuilder { + return NewHistogramMetricBuilder( + semconv2.FaaSMemUsageName, + semconv2.FaaSMemUsageDescription, + semconv2.FaaSMemUsageUnit, + DefaultHistogramBounds, + startTime, + ) +} + +func NewFaaSColdstartsMetricBuilder(startTime pcommon.Timestamp) *CounterMetricBuilder { + return NewCounterMetricBuilder( + semconv2.FaaSColdstartsName, + semconv2.FaaSColdstartsDescription, + semconv2.FaaSColdstartsUnit, + true, + startTime, + ) +} + +func NewFaaSErrorsMetricBuilder(startTime pcommon.Timestamp) *CounterMetricBuilder { + return NewCounterMetricBuilder( + semconv2.FaaSErrorsName, + semconv2.FaaSErrorsDescription, + semconv2.FaaSErrorsUnit, + true, + startTime, + ) +} + +func NewFaaSInvocationsMetricBuilder(startTime pcommon.Timestamp) *CounterMetricBuilder { + return NewCounterMetricBuilder( + semconv2.FaaSInvocationsName, + semconv2.FaaSInvocationsDescription, + semconv2.FaaSInvocationsUnit, + true, + startTime, + ) +} + +func NewFaaSTimeoutsMetricBuilder(startTime pcommon.Timestamp) *CounterMetricBuilder { + return NewCounterMetricBuilder( + semconv2.FaaSTimeoutsName, + semconv2.FaaSTimeoutsDescription, + semconv2.FaaSTimeoutsUnit, + true, + startTime, + ) +} + +type FaaSMetricBuilders struct { + invokeDurationMetric *HistogramMetricBuilder + initDurationMetric *HistogramMetricBuilder + memUsageMetric *HistogramMetricBuilder + coldstartsMetric *CounterMetricBuilder + errorsMetric *CounterMetricBuilder + invocationsMetric *CounterMetricBuilder + timeoutsMetric *CounterMetricBuilder +} + +func NewFaaSMetricBuilders(startTime pcommon.Timestamp) *FaaSMetricBuilders { + return &FaaSMetricBuilders{ + invokeDurationMetric: NewFasSInvokeDurationMetricBuilder(startTime), + initDurationMetric: NewFasSInitDurationMetricBuilder(startTime), + memUsageMetric: NewFaaSMemUsageMetricBuilder(startTime), + coldstartsMetric: NewFaaSColdstartsMetricBuilder(startTime), + errorsMetric: NewFaaSErrorsMetricBuilder(startTime), + invocationsMetric: NewFaaSInvocationsMetricBuilder(startTime), + timeoutsMetric: NewFaaSTimeoutsMetricBuilder(startTime), + } +} diff --git a/collector/receiver/telemetryapireceiver/metric_builder_test.go b/collector/receiver/telemetryapireceiver/metric_builder_test.go new file mode 100644 index 0000000000..330c474b8f --- /dev/null +++ b/collector/receiver/telemetryapireceiver/metric_builder_test.go @@ -0,0 +1,639 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetryapireceiver + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + semconv2 "go.opentelemetry.io/otel/semconv/v1.24.0" +) + +func TestHistogramMetricBuilder_AppendDataPoint(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + tests := []struct { + name string + builder *HistogramMetricBuilder + value float64 + expectedBucket int + }{ + { + name: "FaaS invoke duration - small value", + builder: NewFasSInvokeDurationMetricBuilder(startTime), + value: 0.007, + expectedBucket: 1, + }, + { + name: "FaaS invoke duration - middle value", + builder: NewFasSInvokeDurationMetricBuilder(startTime), + value: 0.5, + expectedBucket: 7, + }, + { + name: "FaaS invoke duration - large value", + builder: NewFasSInvokeDurationMetricBuilder(startTime), + value: 15.0, + expectedBucket: 14, + }, + { + name: "Default bounds - boundary value", + builder: NewHistogramMetricBuilder("test.histogram", "Test with default bounds", "By", DefaultHistogramBounds, startTime), + value: 100.0, + expectedBucket: 6, + }, + { + name: "Default bounds - zero value", + builder: NewHistogramMetricBuilder("test.histogram", "Test zero value", "By", nil, startTime), + value: 0.0, + expectedBucket: 0, + }, + { + name: "Memory usage histogram", + builder: NewFaaSMemUsageMetricBuilder(startTime), + value: 256.0, + expectedBucket: 8, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + tt.builder.AppendDataPoint(scopeMetrics, timestamp, tt.value) + + require.Equal(t, 1, scopeMetrics.Metrics().Len()) + + metric := scopeMetrics.Metrics().At(0) + assert.Equal(t, tt.builder.name, metric.Name()) + assert.Equal(t, tt.builder.description, metric.Description()) + assert.Equal(t, tt.builder.unit, metric.Unit()) + + assert.Equal(t, pmetric.MetricTypeHistogram, metric.Type()) + + hist := metric.Histogram() + assert.Equal(t, tt.builder.temporality, hist.AggregationTemporality()) + + require.Equal(t, 1, hist.DataPoints().Len()) + dp := hist.DataPoints().At(0) + assert.Equal(t, startTime, dp.StartTimestamp()) + assert.Equal(t, timestamp, dp.Timestamp()) + assert.Equal(t, uint64(1), dp.Count()) + assert.Equal(t, tt.value, dp.Sum()) + + assert.Equal(t, len(tt.builder.bounds), dp.ExplicitBounds().Len()) + assert.Equal(t, len(tt.builder.bounds)+1, dp.BucketCounts().Len()) + + bucketCounts := dp.BucketCounts().AsRaw() + for i, count := range bucketCounts { + if i == tt.expectedBucket { + assert.Equal(t, uint64(1), count, "expected value in bucket %d", i) + } else { + assert.Equal(t, uint64(0), count, "expected no value in bucket %d", i) + } + } + }) + } +} + +func TestCounterMetricBuilder_AppendDataPoint(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + tests := []struct { + name string + builder *CounterMetricBuilder + value int64 + isMonotonic bool + }{ + { + name: "FaaS coldstarts counter", + builder: NewFaaSColdstartsMetricBuilder(startTime), + value: 1, + isMonotonic: true, + }, + { + name: "FaaS errors counter", + builder: NewFaaSErrorsMetricBuilder(startTime), + value: 5, + isMonotonic: true, + }, + { + name: "FaaS invocations counter", + builder: NewFaaSInvocationsMetricBuilder(startTime), + value: 100, + isMonotonic: true, + }, + { + name: "FaaS timeouts counter", + builder: NewFaaSTimeoutsMetricBuilder(startTime), + value: 0, + isMonotonic: true, + }, + { + name: "Non-monotonic Counter", + builder: NewCounterMetricBuilder("test.counter", "Test non-monotonic counter", "{count}", false, startTime), + value: -10, + isMonotonic: false, + }, + { + name: "Counter with large value", + builder: NewCounterMetricBuilder("test.large_counter", "Test large counter value", "{count}", true, startTime), + value: 9223372036854775807, // max int64 + isMonotonic: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + tt.builder.AppendDataPoint(scopeMetrics, timestamp, tt.value) + + require.Equal(t, 1, scopeMetrics.Metrics().Len()) + + metric := scopeMetrics.Metrics().At(0) + assert.Equal(t, tt.builder.name, metric.Name()) + assert.Equal(t, tt.builder.description, metric.Description()) + assert.Equal(t, tt.builder.unit, metric.Unit()) + + assert.Equal(t, pmetric.MetricTypeSum, metric.Type()) + + sum := metric.Sum() + assert.Equal(t, tt.builder.temporality, sum.AggregationTemporality()) + assert.Equal(t, tt.isMonotonic, sum.IsMonotonic()) + + require.Equal(t, 1, sum.DataPoints().Len()) + dp := sum.DataPoints().At(0) + assert.Equal(t, startTime, dp.StartTimestamp()) + assert.Equal(t, timestamp, dp.Timestamp()) + assert.Equal(t, tt.value, dp.IntValue()) + }) + } +} + +func TestFaaSMetricBuilderFactories(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now()) + + t.Run("NewFasSInvokeDurationMetricBuilder", func(t *testing.T) { + builder := NewFasSInvokeDurationMetricBuilder(startTime) + assert.Equal(t, semconv2.FaaSInvokeDurationName, builder.name) + assert.Equal(t, semconv2.FaaSInvokeDurationDescription, builder.description) + assert.Equal(t, semconv2.FaaSInvokeDurationUnit, builder.unit) + assert.Equal(t, DurationHistogramBounds, builder.bounds) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, builder.temporality) + assert.Equal(t, startTime, builder.startTime) + }) + + t.Run("NewFasSInitDurationMetricBuilder", func(t *testing.T) { + builder := NewFasSInitDurationMetricBuilder(startTime) + assert.Equal(t, semconv2.FaaSInitDurationName, builder.name) + assert.Equal(t, semconv2.FaaSInitDurationDescription, builder.description) + assert.Equal(t, semconv2.FaaSInitDurationUnit, builder.unit) + assert.Equal(t, DurationHistogramBounds, builder.bounds) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, builder.temporality) + assert.Equal(t, startTime, builder.startTime) + }) + + t.Run("NewFaaSMemUsageMetricBuilder", func(t *testing.T) { + builder := NewFaaSMemUsageMetricBuilder(startTime) + assert.Equal(t, semconv2.FaaSMemUsageName, builder.name) + assert.Equal(t, semconv2.FaaSMemUsageDescription, builder.description) + assert.Equal(t, semconv2.FaaSMemUsageUnit, builder.unit) + assert.Equal(t, DefaultHistogramBounds, builder.bounds) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, builder.temporality) + assert.Equal(t, startTime, builder.startTime) + }) + + t.Run("NewFaaSColdstartsMetricBuilder", func(t *testing.T) { + builder := NewFaaSColdstartsMetricBuilder(startTime) + assert.Equal(t, semconv2.FaaSColdstartsName, builder.name) + assert.Equal(t, semconv2.FaaSColdstartsDescription, builder.description) + assert.Equal(t, semconv2.FaaSColdstartsUnit, builder.unit) + assert.True(t, builder.isMonotonic) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, builder.temporality) + assert.Equal(t, startTime, builder.startTime) + }) + + t.Run("NewFaaSErrorsMetricBuilder", func(t *testing.T) { + builder := NewFaaSErrorsMetricBuilder(startTime) + assert.Equal(t, semconv2.FaaSErrorsName, builder.name) + assert.Equal(t, semconv2.FaaSErrorsDescription, builder.description) + assert.Equal(t, semconv2.FaaSErrorsUnit, builder.unit) + assert.True(t, builder.isMonotonic) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, builder.temporality) + assert.Equal(t, startTime, builder.startTime) + }) + + t.Run("NewFaaSInvocationsMetricBuilder", func(t *testing.T) { + builder := NewFaaSInvocationsMetricBuilder(startTime) + assert.Equal(t, semconv2.FaaSInvocationsName, builder.name) + assert.Equal(t, semconv2.FaaSInvocationsDescription, builder.description) + assert.Equal(t, semconv2.FaaSInvocationsUnit, builder.unit) + assert.True(t, builder.isMonotonic) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, builder.temporality) + assert.Equal(t, startTime, builder.startTime) + }) + + t.Run("NewFaaSTimeoutsMetricBuilder", func(t *testing.T) { + builder := NewFaaSTimeoutsMetricBuilder(startTime) + assert.Equal(t, semconv2.FaaSTimeoutsName, builder.name) + assert.Equal(t, semconv2.FaaSTimeoutsDescription, builder.description) + assert.Equal(t, semconv2.FaaSTimeoutsUnit, builder.unit) + assert.True(t, builder.isMonotonic) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, builder.temporality) + assert.Equal(t, startTime, builder.startTime) + }) +} + +func TestNewFaaSMetricBuilders(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now()) + builders := NewFaaSMetricBuilders(startTime) + + require.NotNil(t, builders) + require.NotNil(t, builders.invokeDurationMetric) + require.NotNil(t, builders.initDurationMetric) + require.NotNil(t, builders.memUsageMetric) + require.NotNil(t, builders.coldstartsMetric) + require.NotNil(t, builders.errorsMetric) + require.NotNil(t, builders.invocationsMetric) + require.NotNil(t, builders.timeoutsMetric) + + assert.Equal(t, semconv2.FaaSInvokeDurationName, builders.invokeDurationMetric.name) + assert.Equal(t, semconv2.FaaSInitDurationName, builders.initDurationMetric.name) + assert.Equal(t, semconv2.FaaSMemUsageName, builders.memUsageMetric.name) + assert.Equal(t, semconv2.FaaSColdstartsName, builders.coldstartsMetric.name) + assert.Equal(t, semconv2.FaaSErrorsName, builders.errorsMetric.name) + assert.Equal(t, semconv2.FaaSInvocationsName, builders.invocationsMetric.name) + assert.Equal(t, semconv2.FaaSTimeoutsName, builders.timeoutsMetric.name) +} + +func TestDefaultHistogramBounds(t *testing.T) { + expected := []float64{0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0, 7500.0, 10000.0} + assert.Equal(t, expected, DefaultHistogramBounds) + assert.Len(t, DefaultHistogramBounds, 15) +} + +func TestDurationHistogramBounds(t *testing.T) { + expected := []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10} + assert.Equal(t, expected, DurationHistogramBounds) + assert.Len(t, DurationHistogramBounds, 14) +} + +func TestHistogramBucketPlacement(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + tests := []struct { + name string + bounds []float64 + value float64 + expectedBucket int + }{ + { + name: "value less than first bound", + bounds: []float64{1.0, 5.0, 10.0}, + value: 0.5, + expectedBucket: 0, + }, + { + name: "value equals first bound", + bounds: []float64{1.0, 5.0, 10.0}, + value: 1.0, + expectedBucket: 0, + }, + { + name: "value between bounds", + bounds: []float64{1.0, 5.0, 10.0}, + value: 3.0, + expectedBucket: 1, + }, + { + name: "value equals middle bound", + bounds: []float64{1.0, 5.0, 10.0}, + value: 5.0, + expectedBucket: 1, + }, + { + name: "value greater than all bounds", + bounds: []float64{1.0, 5.0, 10.0}, + value: 15.0, + expectedBucket: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test bucket placement", + "1", + tt.bounds, + startTime, + ) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + timestamp := pcommon.NewTimestampFromTime(time.Now()) + builder.AppendDataPoint(scopeMetrics, timestamp, tt.value) + + dp := scopeMetrics.Metrics().At(0).Histogram().DataPoints().At(0) + bucketCounts := dp.BucketCounts().AsRaw() + + for i, count := range bucketCounts { + if i == tt.expectedBucket { + assert.Equal(t, uint64(1), count, "expected value in bucket %d for value %f", i, tt.value) + } else { + assert.Equal(t, uint64(0), count, "expected no value in bucket %d for value %f", i, tt.value) + } + } + }) + } +} + +func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + tests := []struct { + name string + builderFn func() *HistogramMetricBuilder + values []float64 + expectedCount uint64 + expectedSum float64 + expectedBucketIndex int + expectedBucketCount uint64 + checkAllBuckets bool + }{ + { + name: "two data points accumulate correctly", + builderFn: func() *HistogramMetricBuilder { + return NewFasSInvokeDurationMetricBuilder(startTime) + }, + values: []float64{0.1, 0.2}, + expectedCount: 2, + expectedSum: 0.3, + }, + { + name: "multiple data points across different buckets", + builderFn: func() *HistogramMetricBuilder { + return NewFasSInvokeDurationMetricBuilder(startTime) + }, + values: []float64{0.001, 0.05, 0.5, 1.0, 5.0}, + expectedCount: 5, + expectedSum: 6.551, + }, + { + name: "same bucket receives multiple values", + builderFn: func() *HistogramMetricBuilder { + return NewFasSInvokeDurationMetricBuilder(startTime) + }, + values: []float64{0.3, 0.35, 0.4, 0.45}, + expectedCount: 4, + expectedSum: 1.5, + expectedBucketIndex: 7, + expectedBucketCount: 4, + checkAllBuckets: true, + }, + { + name: "zero values accumulate correctly", + builderFn: func() *HistogramMetricBuilder { + return NewFasSInvokeDurationMetricBuilder(startTime) + }, + values: []float64{0.0, 0.0, 0.0, 0.0, 0.0}, + expectedCount: 5, + expectedSum: 0.0, + expectedBucketIndex: 0, + expectedBucketCount: 5, + checkAllBuckets: true, + }, + { + name: "large values in overflow bucket", + builderFn: func() *HistogramMetricBuilder { + return NewFasSInvokeDurationMetricBuilder(startTime) + }, + values: []float64{15.0, 20.0, 100.0}, + expectedCount: 3, + expectedSum: 135.0, + expectedBucketIndex: 14, + expectedBucketCount: 3, + checkAllBuckets: true, + }, + { + name: "memory usage histogram with realistic values", + builderFn: func() *HistogramMetricBuilder { + return NewFaaSMemUsageMetricBuilder(startTime) + }, + values: []float64{128.0, 256.0, 512.0, 384.0, 192.0}, + expectedCount: 5, + expectedSum: 1472.0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + builder := tt.builderFn() + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + baseTime := time.Now() + for i, v := range tt.values { + ts := pcommon.NewTimestampFromTime(baseTime.Add(time.Duration(i) * time.Second)) + builder.AppendDataPoint(scopeMetrics, ts, v) + } + + require.Equal(t, len(tt.values), scopeMetrics.Metrics().Len()) + + lastDp := scopeMetrics.Metrics().At(len(tt.values) - 1).Histogram().DataPoints().At(0) + assert.Equal(t, tt.expectedCount, lastDp.Count()) + assert.InDelta(t, tt.expectedSum, lastDp.Sum(), 0.0001) + + if tt.checkAllBuckets { + bucketCounts := lastDp.BucketCounts().AsRaw() + for i, count := range bucketCounts { + if i == tt.expectedBucketIndex { + assert.Equal(t, tt.expectedBucketCount, count, "expected %d values in bucket %d", tt.expectedBucketCount, i) + } else { + assert.Equal(t, uint64(0), count, "bucket %d should be empty", i) + } + } + } + }) + } + + t.Run("start timestamp remains constant across data points", func(t *testing.T) { + builder := NewFasSInvokeDurationMetricBuilder(startTime) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + baseTime := time.Now() + for i := 0; i < 3; i++ { + ts := pcommon.NewTimestampFromTime(baseTime.Add(time.Duration(i) * time.Second)) + builder.AppendDataPoint(scopeMetrics, ts, float64(i)*0.1) + } + + for i := 0; i < 3; i++ { + dp := scopeMetrics.Metrics().At(i).Histogram().DataPoints().At(0) + assert.Equal(t, startTime, dp.StartTimestamp()) + } + }) +} + +func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + tests := []struct { + name string + builderFn func() *CounterMetricBuilder + values []int64 + expectedTotal int64 + }{ + { + name: "two data points accumulate correctly", + builderFn: func() *CounterMetricBuilder { + return NewFaaSInvocationsMetricBuilder(startTime) + }, + values: []int64{5, 3}, + expectedTotal: 8, + }, + { + name: "multiple increments accumulate correctly", + builderFn: func() *CounterMetricBuilder { + return NewFaaSInvocationsMetricBuilder(startTime) + }, + values: []int64{1, 2, 3, 4, 5}, + expectedTotal: 15, + }, + { + name: "zero increments do not change total", + builderFn: func() *CounterMetricBuilder { + return NewFaaSInvocationsMetricBuilder(startTime) + }, + values: []int64{10, 0, 0, 5}, + expectedTotal: 15, + }, + { + name: "coldstarts counter increments by one", + builderFn: func() *CounterMetricBuilder { + return NewFaaSColdstartsMetricBuilder(startTime) + }, + values: []int64{1, 1, 1}, + expectedTotal: 3, + }, + { + name: "errors counter accumulates", + builderFn: func() *CounterMetricBuilder { + return NewFaaSErrorsMetricBuilder(startTime) + }, + values: []int64{2, 0, 1, 5, 0, 3}, + expectedTotal: 11, + }, + { + name: "timeouts counter accumulates", + builderFn: func() *CounterMetricBuilder { + return NewFaaSTimeoutsMetricBuilder(startTime) + }, + values: []int64{1, 1}, + expectedTotal: 2, + }, + { + name: "large values accumulate without overflow", + builderFn: func() *CounterMetricBuilder { + return NewFaaSInvocationsMetricBuilder(startTime) + }, + values: []int64{1000000000, 1000000000, 1000000000}, + expectedTotal: 3000000000, + }, + { + name: "non-monotonic counter allows negative deltas", + builderFn: func() *CounterMetricBuilder { + return NewCounterMetricBuilder("test.gauge", "Test gauge", "{count}", false, startTime) + }, + values: []int64{10, -3, 5, -7}, + expectedTotal: 5, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + builder := tt.builderFn() + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + baseTime := time.Now() + for i, v := range tt.values { + ts := pcommon.NewTimestampFromTime(baseTime.Add(time.Duration(i) * time.Second)) + builder.AppendDataPoint(scopeMetrics, ts, v) + } + + require.Equal(t, len(tt.values), scopeMetrics.Metrics().Len()) + + lastDp := scopeMetrics.Metrics().At(len(tt.values) - 1).Sum().DataPoints().At(0) + assert.Equal(t, tt.expectedTotal, lastDp.IntValue()) + }) + } + + t.Run("start timestamp remains constant across data points", func(t *testing.T) { + builder := NewFaaSInvocationsMetricBuilder(startTime) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + baseTime := time.Now() + for i := 0; i < 3; i++ { + ts := pcommon.NewTimestampFromTime(baseTime.Add(time.Duration(i) * time.Second)) + builder.AppendDataPoint(scopeMetrics, ts, int64(i+1)) + } + + for i := 0; i < 3; i++ { + dp := scopeMetrics.Metrics().At(i).Sum().DataPoints().At(0) + assert.Equal(t, startTime, dp.StartTimestamp()) + } + }) + + t.Run("monotonic property is set correctly", func(t *testing.T) { + monotonicBuilder := NewFaaSInvocationsMetricBuilder(startTime) + nonMonotonicBuilder := NewCounterMetricBuilder("test.gauge", "Test gauge", "{count}", false, startTime) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + ts := pcommon.NewTimestampFromTime(time.Now()) + monotonicBuilder.AppendDataPoint(scopeMetrics, ts, 1) + nonMonotonicBuilder.AppendDataPoint(scopeMetrics, ts, 1) + + assert.True(t, scopeMetrics.Metrics().At(0).Sum().IsMonotonic()) + assert.False(t, scopeMetrics.Metrics().At(1).Sum().IsMonotonic()) + }) +} diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index b3beafd2bd..d21bf01dbf 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -161,7 +161,7 @@ func TestCreatePlatformInitSpan(t *testing.T) { receivertest.NewNopSettings(Type), ) require.NoError(t, err) - td, err := r.createPlatformInitSpan(tc.start, tc.end) + td, err := r.createPlatformInitSpan(make(map[string]any), tc.start, tc.end) if tc.expectError { require.Error(t, err) } else { From 6ed9e6c3fb19d16e7c58abe91f50003adcaa8a3b Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Wed, 10 Dec 2025 15:34:11 -0500 Subject: [PATCH 2/4] add support for delta metric temporality --- .../receiver/telemetryapireceiver/config.go | 16 +- .../telemetryapireceiver/metric_builder.go | 114 +++++--- .../metric_builder_test.go | 267 ++++++++++++++---- 3 files changed, 303 insertions(+), 94 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/config.go b/collector/receiver/telemetryapireceiver/config.go index 246b8dde39..153f3f488a 100644 --- a/collector/receiver/telemetryapireceiver/config.go +++ b/collector/receiver/telemetryapireceiver/config.go @@ -16,14 +16,16 @@ package telemetryapireceiver // import "github.com/open-telemetry/opentelemetry- import ( "fmt" + "strings" ) // Config defines the configuration for the various elements of the receiver agent. type Config struct { - extensionID string - Port int `mapstructure:"port"` - Types []string `mapstructure:"types"` - LogReport bool `mapstructure:"log_report"` + extensionID string + Port int `mapstructure:"port"` + Types []string `mapstructure:"types"` + LogReport bool `mapstructure:"log_report"` + MetricsTemporality string `mapstructure:"metrics_temporality"` } // Validate validates the configuration by checking for missing or invalid fields @@ -33,5 +35,11 @@ func (cfg *Config) Validate() error { return fmt.Errorf("unknown extension type: %s", t) } } + if cfg.MetricsTemporality != "" { + temporality := strings.ToLower(cfg.MetricsTemporality) + if temporality != "delta" && temporality != "cumulative" { + return fmt.Errorf("unknown metrics temporality: %s", cfg.MetricsTemporality) + } + } return nil } diff --git a/collector/receiver/telemetryapireceiver/metric_builder.go b/collector/receiver/telemetryapireceiver/metric_builder.go index 55eb90ab47..f5a32d62f0 100644 --- a/collector/receiver/telemetryapireceiver/metric_builder.go +++ b/collector/receiver/telemetryapireceiver/metric_builder.go @@ -22,8 +22,12 @@ import ( semconv2 "go.opentelemetry.io/otel/semconv/v1.24.0" ) +const MiB = float64(1 << 20) +const GiB = float64(1 << 30) + var DefaultHistogramBounds = []float64{0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0, 7500.0, 10000.0} var DurationHistogramBounds = []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10} +var MemUsageHistogramBounds = []float64{16 * MiB, 32 * MiB, 64 * MiB, 128 * MiB, 256 * MiB, 512 * MiB, 768 * MiB, 1 * GiB, 2 * GiB, 3 * GiB, 4 * GiB, 6 * GiB, 8 * GiB} type HistogramMetricBuilder struct { name string @@ -37,12 +41,17 @@ type HistogramMetricBuilder struct { temporality pmetric.AggregationTemporality } -func NewHistogramMetricBuilder(name string, description string, unit string, bounds []float64, startTime pcommon.Timestamp) *HistogramMetricBuilder { +func NewHistogramMetricBuilder(name string, description string, unit string, bounds []float64, startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *HistogramMetricBuilder { b := bounds if bounds == nil { b = DefaultHistogramBounds } + temp := temporality + if temporality == pmetric.AggregationTemporalityUnspecified { + temp = pmetric.AggregationTemporalityCumulative + } + counts := make([]uint64, len(b)+1) return &HistogramMetricBuilder{ name: name, @@ -51,11 +60,27 @@ func NewHistogramMetricBuilder(name string, description string, unit string, bou bounds: b, counts: counts, startTime: startTime, - temporality: pmetric.AggregationTemporalityCumulative, + temporality: temp, + } +} + +func (h *HistogramMetricBuilder) Record(value float64) { + h.sum += value + h.total++ + h.counts[sort.SearchFloat64s(h.bounds, value)]++ +} + +func (h *HistogramMetricBuilder) Reset(timestamp pcommon.Timestamp) { + h.startTime = timestamp + h.sum = 0 + h.total = 0 + + for i := range h.counts { + h.counts[i] = 0 } } -func (h *HistogramMetricBuilder) AppendDataPoint(scopeMetrics pmetric.ScopeMetrics, timestamp pcommon.Timestamp, value float64) { +func (h *HistogramMetricBuilder) AppendDataPoints(scopeMetrics pmetric.ScopeMetrics, timestamp pcommon.Timestamp) { metric := scopeMetrics.Metrics().AppendEmpty() metric.SetName(h.name) metric.SetDescription(h.description) @@ -64,11 +89,8 @@ func (h *HistogramMetricBuilder) AppendDataPoint(scopeMetrics pmetric.ScopeMetri hist := metric.SetEmptyHistogram() hist.SetAggregationTemporality(h.temporality) - h.sum += value - h.total++ - h.counts[sort.SearchFloat64s(h.bounds, value)]++ - dp := hist.DataPoints().AppendEmpty() + dp.Attributes() dp.SetStartTimestamp(h.startTime) dp.SetTimestamp(timestamp) dp.SetSum(h.sum) @@ -76,15 +98,9 @@ func (h *HistogramMetricBuilder) AppendDataPoint(scopeMetrics pmetric.ScopeMetri dp.BucketCounts().FromRaw(h.counts) dp.ExplicitBounds().FromRaw(h.bounds) -} -func (h *HistogramMetricBuilder) Reset(timestamp pcommon.Timestamp) { - h.startTime = timestamp - h.sum = 0 - h.total = 0 - - for i := range h.counts { - h.counts[i] = 0 + if h.temporality == pmetric.AggregationTemporalityDelta { + h.Reset(timestamp) } } @@ -98,18 +114,32 @@ type CounterMetricBuilder struct { startTime pcommon.Timestamp } -func NewCounterMetricBuilder(name string, description string, unit string, isMonotonic bool, startTime pcommon.Timestamp) *CounterMetricBuilder { +func NewCounterMetricBuilder(name string, description string, unit string, isMonotonic bool, startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *CounterMetricBuilder { + temp := temporality + if temporality == pmetric.AggregationTemporalityUnspecified { + temp = pmetric.AggregationTemporalityCumulative + } + return &CounterMetricBuilder{ name: name, description: description, unit: unit, isMonotonic: isMonotonic, - temporality: pmetric.AggregationTemporalityCumulative, + temporality: temp, startTime: startTime, } } -func (c *CounterMetricBuilder) AppendDataPoint(scopeMetrics pmetric.ScopeMetrics, timestamp pcommon.Timestamp, value int64) { +func (c *CounterMetricBuilder) Add(value int64) { + c.total += value +} + +func (c *CounterMetricBuilder) Reset(timestamp pcommon.Timestamp) { + c.startTime = timestamp + c.total = 0 +} + +func (c *CounterMetricBuilder) AppendDataPoints(scopeMetrics pmetric.ScopeMetrics, timestamp pcommon.Timestamp) { metric := scopeMetrics.Metrics().AppendEmpty() metric.SetName(c.name) metric.SetDescription(c.description) @@ -119,86 +149,90 @@ func (c *CounterMetricBuilder) AppendDataPoint(scopeMetrics pmetric.ScopeMetrics sum.SetAggregationTemporality(c.temporality) sum.SetIsMonotonic(c.isMonotonic) - c.total += value - dp := sum.DataPoints().AppendEmpty() dp.SetStartTimestamp(c.startTime) dp.SetTimestamp(timestamp) dp.SetIntValue(c.total) -} -func (c *CounterMetricBuilder) Reset(timestamp pcommon.Timestamp) { - c.startTime = timestamp - c.total = 0 + if c.temporality == pmetric.AggregationTemporalityDelta { + c.Reset(timestamp) + } } -func NewFasSInvokeDurationMetricBuilder(startTime pcommon.Timestamp) *HistogramMetricBuilder { +func NewFasSInvokeDurationMetricBuilder(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *HistogramMetricBuilder { return NewHistogramMetricBuilder( semconv2.FaaSInvokeDurationName, semconv2.FaaSInvokeDurationDescription, semconv2.FaaSInvokeDurationUnit, DurationHistogramBounds, startTime, + temporality, ) } -func NewFasSInitDurationMetricBuilder(startTime pcommon.Timestamp) *HistogramMetricBuilder { +func NewFasSInitDurationMetricBuilder(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *HistogramMetricBuilder { return NewHistogramMetricBuilder( semconv2.FaaSInitDurationName, semconv2.FaaSInitDurationDescription, semconv2.FaaSInitDurationUnit, DurationHistogramBounds, startTime, + temporality, ) } -func NewFaaSMemUsageMetricBuilder(startTime pcommon.Timestamp) *HistogramMetricBuilder { +func NewFaaSMemUsageMetricBuilder(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *HistogramMetricBuilder { return NewHistogramMetricBuilder( semconv2.FaaSMemUsageName, semconv2.FaaSMemUsageDescription, semconv2.FaaSMemUsageUnit, - DefaultHistogramBounds, + MemUsageHistogramBounds, startTime, + temporality, ) } -func NewFaaSColdstartsMetricBuilder(startTime pcommon.Timestamp) *CounterMetricBuilder { +func NewFaaSColdstartsMetricBuilder(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *CounterMetricBuilder { return NewCounterMetricBuilder( semconv2.FaaSColdstartsName, semconv2.FaaSColdstartsDescription, semconv2.FaaSColdstartsUnit, true, startTime, + temporality, ) } -func NewFaaSErrorsMetricBuilder(startTime pcommon.Timestamp) *CounterMetricBuilder { +func NewFaaSErrorsMetricBuilder(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *CounterMetricBuilder { return NewCounterMetricBuilder( semconv2.FaaSErrorsName, semconv2.FaaSErrorsDescription, semconv2.FaaSErrorsUnit, true, startTime, + temporality, ) } -func NewFaaSInvocationsMetricBuilder(startTime pcommon.Timestamp) *CounterMetricBuilder { +func NewFaaSInvocationsMetricBuilder(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *CounterMetricBuilder { return NewCounterMetricBuilder( semconv2.FaaSInvocationsName, semconv2.FaaSInvocationsDescription, semconv2.FaaSInvocationsUnit, true, startTime, + temporality, ) } -func NewFaaSTimeoutsMetricBuilder(startTime pcommon.Timestamp) *CounterMetricBuilder { +func NewFaaSTimeoutsMetricBuilder(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *CounterMetricBuilder { return NewCounterMetricBuilder( semconv2.FaaSTimeoutsName, semconv2.FaaSTimeoutsDescription, semconv2.FaaSTimeoutsUnit, true, startTime, + temporality, ) } @@ -212,14 +246,14 @@ type FaaSMetricBuilders struct { timeoutsMetric *CounterMetricBuilder } -func NewFaaSMetricBuilders(startTime pcommon.Timestamp) *FaaSMetricBuilders { +func NewFaaSMetricBuilders(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *FaaSMetricBuilders { return &FaaSMetricBuilders{ - invokeDurationMetric: NewFasSInvokeDurationMetricBuilder(startTime), - initDurationMetric: NewFasSInitDurationMetricBuilder(startTime), - memUsageMetric: NewFaaSMemUsageMetricBuilder(startTime), - coldstartsMetric: NewFaaSColdstartsMetricBuilder(startTime), - errorsMetric: NewFaaSErrorsMetricBuilder(startTime), - invocationsMetric: NewFaaSInvocationsMetricBuilder(startTime), - timeoutsMetric: NewFaaSTimeoutsMetricBuilder(startTime), + invokeDurationMetric: NewFasSInvokeDurationMetricBuilder(startTime, temporality), + initDurationMetric: NewFasSInitDurationMetricBuilder(startTime, temporality), + memUsageMetric: NewFaaSMemUsageMetricBuilder(startTime, temporality), + coldstartsMetric: NewFaaSColdstartsMetricBuilder(startTime, temporality), + errorsMetric: NewFaaSErrorsMetricBuilder(startTime, temporality), + invocationsMetric: NewFaaSInvocationsMetricBuilder(startTime, temporality), + timeoutsMetric: NewFaaSTimeoutsMetricBuilder(startTime, temporality), } } diff --git a/collector/receiver/telemetryapireceiver/metric_builder_test.go b/collector/receiver/telemetryapireceiver/metric_builder_test.go index 330c474b8f..6ee781c280 100644 --- a/collector/receiver/telemetryapireceiver/metric_builder_test.go +++ b/collector/receiver/telemetryapireceiver/metric_builder_test.go @@ -36,39 +36,39 @@ func TestHistogramMetricBuilder_AppendDataPoint(t *testing.T) { }{ { name: "FaaS invoke duration - small value", - builder: NewFasSInvokeDurationMetricBuilder(startTime), + builder: NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), value: 0.007, expectedBucket: 1, }, { name: "FaaS invoke duration - middle value", - builder: NewFasSInvokeDurationMetricBuilder(startTime), + builder: NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), value: 0.5, expectedBucket: 7, }, { name: "FaaS invoke duration - large value", - builder: NewFasSInvokeDurationMetricBuilder(startTime), + builder: NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), value: 15.0, expectedBucket: 14, }, { name: "Default bounds - boundary value", - builder: NewHistogramMetricBuilder("test.histogram", "Test with default bounds", "By", DefaultHistogramBounds, startTime), + builder: NewHistogramMetricBuilder("test.histogram", "Test with default bounds", "By", DefaultHistogramBounds, startTime, pmetric.AggregationTemporalityCumulative), value: 100.0, expectedBucket: 6, }, { name: "Default bounds - zero value", - builder: NewHistogramMetricBuilder("test.histogram", "Test zero value", "By", nil, startTime), + builder: NewHistogramMetricBuilder("test.histogram", "Test zero value", "By", nil, startTime, pmetric.AggregationTemporalityCumulative), value: 0.0, expectedBucket: 0, }, { name: "Memory usage histogram", - builder: NewFaaSMemUsageMetricBuilder(startTime), - value: 256.0, - expectedBucket: 8, + builder: NewFaaSMemUsageMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), + value: 256.0 * 1024 * 1024, + expectedBucket: 4, }, } @@ -80,7 +80,8 @@ func TestHistogramMetricBuilder_AppendDataPoint(t *testing.T) { timestamp := pcommon.NewTimestampFromTime(time.Now()) - tt.builder.AppendDataPoint(scopeMetrics, timestamp, tt.value) + tt.builder.Record(tt.value) + tt.builder.AppendDataPoints(scopeMetrics, timestamp) require.Equal(t, 1, scopeMetrics.Metrics().Len()) @@ -127,37 +128,37 @@ func TestCounterMetricBuilder_AppendDataPoint(t *testing.T) { }{ { name: "FaaS coldstarts counter", - builder: NewFaaSColdstartsMetricBuilder(startTime), + builder: NewFaaSColdstartsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), value: 1, isMonotonic: true, }, { name: "FaaS errors counter", - builder: NewFaaSErrorsMetricBuilder(startTime), + builder: NewFaaSErrorsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), value: 5, isMonotonic: true, }, { name: "FaaS invocations counter", - builder: NewFaaSInvocationsMetricBuilder(startTime), + builder: NewFaaSInvocationsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), value: 100, isMonotonic: true, }, { name: "FaaS timeouts counter", - builder: NewFaaSTimeoutsMetricBuilder(startTime), + builder: NewFaaSTimeoutsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), value: 0, isMonotonic: true, }, { name: "Non-monotonic Counter", - builder: NewCounterMetricBuilder("test.counter", "Test non-monotonic counter", "{count}", false, startTime), + builder: NewCounterMetricBuilder("test.counter", "Test non-monotonic counter", "{count}", false, startTime, pmetric.AggregationTemporalityCumulative), value: -10, isMonotonic: false, }, { name: "Counter with large value", - builder: NewCounterMetricBuilder("test.large_counter", "Test large counter value", "{count}", true, startTime), + builder: NewCounterMetricBuilder("test.large_counter", "Test large counter value", "{count}", true, startTime, pmetric.AggregationTemporalityCumulative), value: 9223372036854775807, // max int64 isMonotonic: true, }, @@ -171,7 +172,8 @@ func TestCounterMetricBuilder_AppendDataPoint(t *testing.T) { timestamp := pcommon.NewTimestampFromTime(time.Now()) - tt.builder.AppendDataPoint(scopeMetrics, timestamp, tt.value) + tt.builder.Add(tt.value) + tt.builder.AppendDataPoints(scopeMetrics, timestamp) require.Equal(t, 1, scopeMetrics.Metrics().Len()) @@ -199,7 +201,7 @@ func TestFaaSMetricBuilderFactories(t *testing.T) { startTime := pcommon.NewTimestampFromTime(time.Now()) t.Run("NewFasSInvokeDurationMetricBuilder", func(t *testing.T) { - builder := NewFasSInvokeDurationMetricBuilder(startTime) + builder := NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) assert.Equal(t, semconv2.FaaSInvokeDurationName, builder.name) assert.Equal(t, semconv2.FaaSInvokeDurationDescription, builder.description) assert.Equal(t, semconv2.FaaSInvokeDurationUnit, builder.unit) @@ -209,7 +211,7 @@ func TestFaaSMetricBuilderFactories(t *testing.T) { }) t.Run("NewFasSInitDurationMetricBuilder", func(t *testing.T) { - builder := NewFasSInitDurationMetricBuilder(startTime) + builder := NewFasSInitDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) assert.Equal(t, semconv2.FaaSInitDurationName, builder.name) assert.Equal(t, semconv2.FaaSInitDurationDescription, builder.description) assert.Equal(t, semconv2.FaaSInitDurationUnit, builder.unit) @@ -219,17 +221,17 @@ func TestFaaSMetricBuilderFactories(t *testing.T) { }) t.Run("NewFaaSMemUsageMetricBuilder", func(t *testing.T) { - builder := NewFaaSMemUsageMetricBuilder(startTime) + builder := NewFaaSMemUsageMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) assert.Equal(t, semconv2.FaaSMemUsageName, builder.name) assert.Equal(t, semconv2.FaaSMemUsageDescription, builder.description) assert.Equal(t, semconv2.FaaSMemUsageUnit, builder.unit) - assert.Equal(t, DefaultHistogramBounds, builder.bounds) + assert.Equal(t, MemUsageHistogramBounds, builder.bounds) assert.Equal(t, pmetric.AggregationTemporalityCumulative, builder.temporality) assert.Equal(t, startTime, builder.startTime) }) t.Run("NewFaaSColdstartsMetricBuilder", func(t *testing.T) { - builder := NewFaaSColdstartsMetricBuilder(startTime) + builder := NewFaaSColdstartsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) assert.Equal(t, semconv2.FaaSColdstartsName, builder.name) assert.Equal(t, semconv2.FaaSColdstartsDescription, builder.description) assert.Equal(t, semconv2.FaaSColdstartsUnit, builder.unit) @@ -239,7 +241,7 @@ func TestFaaSMetricBuilderFactories(t *testing.T) { }) t.Run("NewFaaSErrorsMetricBuilder", func(t *testing.T) { - builder := NewFaaSErrorsMetricBuilder(startTime) + builder := NewFaaSErrorsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) assert.Equal(t, semconv2.FaaSErrorsName, builder.name) assert.Equal(t, semconv2.FaaSErrorsDescription, builder.description) assert.Equal(t, semconv2.FaaSErrorsUnit, builder.unit) @@ -249,7 +251,7 @@ func TestFaaSMetricBuilderFactories(t *testing.T) { }) t.Run("NewFaaSInvocationsMetricBuilder", func(t *testing.T) { - builder := NewFaaSInvocationsMetricBuilder(startTime) + builder := NewFaaSInvocationsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) assert.Equal(t, semconv2.FaaSInvocationsName, builder.name) assert.Equal(t, semconv2.FaaSInvocationsDescription, builder.description) assert.Equal(t, semconv2.FaaSInvocationsUnit, builder.unit) @@ -259,7 +261,7 @@ func TestFaaSMetricBuilderFactories(t *testing.T) { }) t.Run("NewFaaSTimeoutsMetricBuilder", func(t *testing.T) { - builder := NewFaaSTimeoutsMetricBuilder(startTime) + builder := NewFaaSTimeoutsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) assert.Equal(t, semconv2.FaaSTimeoutsName, builder.name) assert.Equal(t, semconv2.FaaSTimeoutsDescription, builder.description) assert.Equal(t, semconv2.FaaSTimeoutsUnit, builder.unit) @@ -271,7 +273,7 @@ func TestFaaSMetricBuilderFactories(t *testing.T) { func TestNewFaaSMetricBuilders(t *testing.T) { startTime := pcommon.NewTimestampFromTime(time.Now()) - builders := NewFaaSMetricBuilders(startTime) + builders := NewFaaSMetricBuilders(startTime, pmetric.AggregationTemporalityCumulative) require.NotNil(t, builders) require.NotNil(t, builders.invokeDurationMetric) @@ -352,6 +354,7 @@ func TestHistogramBucketPlacement(t *testing.T) { "1", tt.bounds, startTime, + pmetric.AggregationTemporalityCumulative, ) metrics := pmetric.NewMetrics() @@ -359,7 +362,8 @@ func TestHistogramBucketPlacement(t *testing.T) { scopeMetrics := rm.ScopeMetrics().AppendEmpty() timestamp := pcommon.NewTimestampFromTime(time.Now()) - builder.AppendDataPoint(scopeMetrics, timestamp, tt.value) + builder.Record(tt.value) + builder.AppendDataPoints(scopeMetrics, timestamp) dp := scopeMetrics.Metrics().At(0).Histogram().DataPoints().At(0) bucketCounts := dp.BucketCounts().AsRaw() @@ -391,7 +395,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "two data points accumulate correctly", builderFn: func() *HistogramMetricBuilder { - return NewFasSInvokeDurationMetricBuilder(startTime) + return NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []float64{0.1, 0.2}, expectedCount: 2, @@ -400,7 +404,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "multiple data points across different buckets", builderFn: func() *HistogramMetricBuilder { - return NewFasSInvokeDurationMetricBuilder(startTime) + return NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []float64{0.001, 0.05, 0.5, 1.0, 5.0}, expectedCount: 5, @@ -409,7 +413,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "same bucket receives multiple values", builderFn: func() *HistogramMetricBuilder { - return NewFasSInvokeDurationMetricBuilder(startTime) + return NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []float64{0.3, 0.35, 0.4, 0.45}, expectedCount: 4, @@ -421,7 +425,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "zero values accumulate correctly", builderFn: func() *HistogramMetricBuilder { - return NewFasSInvokeDurationMetricBuilder(startTime) + return NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []float64{0.0, 0.0, 0.0, 0.0, 0.0}, expectedCount: 5, @@ -433,7 +437,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "large values in overflow bucket", builderFn: func() *HistogramMetricBuilder { - return NewFasSInvokeDurationMetricBuilder(startTime) + return NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []float64{15.0, 20.0, 100.0}, expectedCount: 3, @@ -445,7 +449,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "memory usage histogram with realistic values", builderFn: func() *HistogramMetricBuilder { - return NewFaaSMemUsageMetricBuilder(startTime) + return NewFaaSMemUsageMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []float64{128.0, 256.0, 512.0, 384.0, 192.0}, expectedCount: 5, @@ -464,7 +468,8 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { baseTime := time.Now() for i, v := range tt.values { ts := pcommon.NewTimestampFromTime(baseTime.Add(time.Duration(i) * time.Second)) - builder.AppendDataPoint(scopeMetrics, ts, v) + builder.Record(v) + builder.AppendDataPoints(scopeMetrics, ts) } require.Equal(t, len(tt.values), scopeMetrics.Metrics().Len()) @@ -487,7 +492,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { } t.Run("start timestamp remains constant across data points", func(t *testing.T) { - builder := NewFasSInvokeDurationMetricBuilder(startTime) + builder := NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) metrics := pmetric.NewMetrics() rm := metrics.ResourceMetrics().AppendEmpty() @@ -496,7 +501,8 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { baseTime := time.Now() for i := 0; i < 3; i++ { ts := pcommon.NewTimestampFromTime(baseTime.Add(time.Duration(i) * time.Second)) - builder.AppendDataPoint(scopeMetrics, ts, float64(i)*0.1) + builder.Record(float64(i) * 0.1) + builder.AppendDataPoints(scopeMetrics, ts) } for i := 0; i < 3; i++ { @@ -518,7 +524,7 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "two data points accumulate correctly", builderFn: func() *CounterMetricBuilder { - return NewFaaSInvocationsMetricBuilder(startTime) + return NewFaaSInvocationsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []int64{5, 3}, expectedTotal: 8, @@ -526,7 +532,7 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "multiple increments accumulate correctly", builderFn: func() *CounterMetricBuilder { - return NewFaaSInvocationsMetricBuilder(startTime) + return NewFaaSInvocationsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []int64{1, 2, 3, 4, 5}, expectedTotal: 15, @@ -534,7 +540,7 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "zero increments do not change total", builderFn: func() *CounterMetricBuilder { - return NewFaaSInvocationsMetricBuilder(startTime) + return NewFaaSInvocationsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []int64{10, 0, 0, 5}, expectedTotal: 15, @@ -542,7 +548,7 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "coldstarts counter increments by one", builderFn: func() *CounterMetricBuilder { - return NewFaaSColdstartsMetricBuilder(startTime) + return NewFaaSColdstartsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []int64{1, 1, 1}, expectedTotal: 3, @@ -550,7 +556,7 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "errors counter accumulates", builderFn: func() *CounterMetricBuilder { - return NewFaaSErrorsMetricBuilder(startTime) + return NewFaaSErrorsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []int64{2, 0, 1, 5, 0, 3}, expectedTotal: 11, @@ -558,7 +564,7 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "timeouts counter accumulates", builderFn: func() *CounterMetricBuilder { - return NewFaaSTimeoutsMetricBuilder(startTime) + return NewFaaSTimeoutsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []int64{1, 1}, expectedTotal: 2, @@ -566,7 +572,7 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "large values accumulate without overflow", builderFn: func() *CounterMetricBuilder { - return NewFaaSInvocationsMetricBuilder(startTime) + return NewFaaSInvocationsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []int64{1000000000, 1000000000, 1000000000}, expectedTotal: 3000000000, @@ -574,7 +580,7 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "non-monotonic counter allows negative deltas", builderFn: func() *CounterMetricBuilder { - return NewCounterMetricBuilder("test.gauge", "Test gauge", "{count}", false, startTime) + return NewCounterMetricBuilder("test.gauge", "Test gauge", "{count}", false, startTime, pmetric.AggregationTemporalityCumulative) }, values: []int64{10, -3, 5, -7}, expectedTotal: 5, @@ -592,7 +598,8 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { baseTime := time.Now() for i, v := range tt.values { ts := pcommon.NewTimestampFromTime(baseTime.Add(time.Duration(i) * time.Second)) - builder.AppendDataPoint(scopeMetrics, ts, v) + builder.Add(v) + builder.AppendDataPoints(scopeMetrics, ts) } require.Equal(t, len(tt.values), scopeMetrics.Metrics().Len()) @@ -603,7 +610,7 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { } t.Run("start timestamp remains constant across data points", func(t *testing.T) { - builder := NewFaaSInvocationsMetricBuilder(startTime) + builder := NewFaaSInvocationsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) metrics := pmetric.NewMetrics() rm := metrics.ResourceMetrics().AppendEmpty() @@ -612,7 +619,8 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { baseTime := time.Now() for i := 0; i < 3; i++ { ts := pcommon.NewTimestampFromTime(baseTime.Add(time.Duration(i) * time.Second)) - builder.AppendDataPoint(scopeMetrics, ts, int64(i+1)) + builder.Add(int64(i + 1)) + builder.AppendDataPoints(scopeMetrics, ts) } for i := 0; i < 3; i++ { @@ -622,18 +630,177 @@ func TestCounterMetricBuilder_CumulativeDataPoints(t *testing.T) { }) t.Run("monotonic property is set correctly", func(t *testing.T) { - monotonicBuilder := NewFaaSInvocationsMetricBuilder(startTime) - nonMonotonicBuilder := NewCounterMetricBuilder("test.gauge", "Test gauge", "{count}", false, startTime) + monotonicBuilder := NewFaaSInvocationsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) + nonMonotonicBuilder := NewCounterMetricBuilder("test.gauge", "Test gauge", "{count}", false, startTime, pmetric.AggregationTemporalityCumulative) metrics := pmetric.NewMetrics() rm := metrics.ResourceMetrics().AppendEmpty() scopeMetrics := rm.ScopeMetrics().AppendEmpty() ts := pcommon.NewTimestampFromTime(time.Now()) - monotonicBuilder.AppendDataPoint(scopeMetrics, ts, 1) - nonMonotonicBuilder.AppendDataPoint(scopeMetrics, ts, 1) + monotonicBuilder.Add(1) + nonMonotonicBuilder.Add(1) + monotonicBuilder.AppendDataPoints(scopeMetrics, ts) + nonMonotonicBuilder.AppendDataPoints(scopeMetrics, ts) assert.True(t, scopeMetrics.Metrics().At(0).Sum().IsMonotonic()) assert.False(t, scopeMetrics.Metrics().At(1).Sum().IsMonotonic()) }) } + +func TestHistogramMetricBuilder_AggregationTemporality(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + t.Run("unspecified temporality defaults to cumulative", func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test histogram", + "ms", + nil, + startTime, + pmetric.AggregationTemporalityUnspecified, + ) + + assert.Equal(t, pmetric.AggregationTemporalityCumulative, builder.temporality) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + timestamp := pcommon.NewTimestampFromTime(time.Now()) + builder.Record(1.0) + builder.AppendDataPoints(scopeMetrics, timestamp) + + hist := scopeMetrics.Metrics().At(0).Histogram() + assert.Equal(t, pmetric.AggregationTemporalityCumulative, hist.AggregationTemporality()) + }) + + t.Run("cumulative temporality accumulates values", func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test histogram", + "ms", + []float64{1.0, 5.0, 10.0}, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + baseTime := time.Now() + values := []float64{2.0, 3.0, 7.0} + + for i, v := range values { + ts := pcommon.NewTimestampFromTime(baseTime.Add(time.Duration(i) * time.Second)) + builder.Record(v) + builder.AppendDataPoints(scopeMetrics, ts) + } + + require.Equal(t, 3, scopeMetrics.Metrics().Len()) + + dp1 := scopeMetrics.Metrics().At(0).Histogram().DataPoints().At(0) + assert.Equal(t, uint64(1), dp1.Count()) + assert.Equal(t, 2.0, dp1.Sum()) + assert.Equal(t, startTime, dp1.StartTimestamp()) + + dp2 := scopeMetrics.Metrics().At(1).Histogram().DataPoints().At(0) + assert.Equal(t, uint64(2), dp2.Count()) + assert.Equal(t, 5.0, dp2.Sum()) + assert.Equal(t, startTime, dp2.StartTimestamp()) + + dp3 := scopeMetrics.Metrics().At(2).Histogram().DataPoints().At(0) + assert.Equal(t, uint64(3), dp3.Count()) + assert.Equal(t, 12.0, dp3.Sum()) + assert.Equal(t, startTime, dp3.StartTimestamp()) + }) + + t.Run("delta temporality resets after each append", func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test histogram", + "ms", + []float64{1.0, 5.0, 10.0}, + startTime, + pmetric.AggregationTemporalityDelta, + ) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + baseTime := time.Now() + ts1 := pcommon.NewTimestampFromTime(baseTime) + ts2 := pcommon.NewTimestampFromTime(baseTime.Add(time.Second)) + ts3 := pcommon.NewTimestampFromTime(baseTime.Add(2 * time.Second)) + + builder.Record(2.0) + builder.Record(3.0) + builder.AppendDataPoints(scopeMetrics, ts1) + + builder.Record(7.0) + builder.AppendDataPoints(scopeMetrics, ts2) + + builder.Record(1.5) + builder.Record(8.0) + builder.AppendDataPoints(scopeMetrics, ts3) + + require.Equal(t, 3, scopeMetrics.Metrics().Len()) + + dp1 := scopeMetrics.Metrics().At(0).Histogram().DataPoints().At(0) + assert.Equal(t, uint64(2), dp1.Count()) + assert.Equal(t, 5.0, dp1.Sum()) + assert.Equal(t, startTime, dp1.StartTimestamp()) + + dp2 := scopeMetrics.Metrics().At(1).Histogram().DataPoints().At(0) + assert.Equal(t, uint64(1), dp2.Count()) + assert.Equal(t, 7.0, dp2.Sum()) + assert.Equal(t, ts1, dp2.StartTimestamp()) + + dp3 := scopeMetrics.Metrics().At(2).Histogram().DataPoints().At(0) + assert.Equal(t, uint64(2), dp3.Count()) + assert.Equal(t, 9.5, dp3.Sum()) + assert.Equal(t, ts2, dp3.StartTimestamp()) + }) + + t.Run("delta temporality resets bucket counts", func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test histogram", + "ms", + []float64{5.0, 10.0}, + startTime, + pmetric.AggregationTemporalityDelta, + ) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + baseTime := time.Now() + ts1 := pcommon.NewTimestampFromTime(baseTime) + ts2 := pcommon.NewTimestampFromTime(baseTime.Add(time.Second)) + + builder.Record(1.0) + builder.Record(2.0) + builder.Record(3.0) + builder.AppendDataPoints(scopeMetrics, ts1) + + builder.Record(15.0) + builder.Record(20.0) + builder.AppendDataPoints(scopeMetrics, ts2) + + dp1 := scopeMetrics.Metrics().At(0).Histogram().DataPoints().At(0) + buckets1 := dp1.BucketCounts().AsRaw() + assert.Equal(t, uint64(3), buckets1[0]) + assert.Equal(t, uint64(0), buckets1[1]) + assert.Equal(t, uint64(0), buckets1[2]) + + dp2 := scopeMetrics.Metrics().At(1).Histogram().DataPoints().At(0) + buckets2 := dp2.BucketCounts().AsRaw() + assert.Equal(t, uint64(0), buckets2[0]) + assert.Equal(t, uint64(0), buckets2[1]) + assert.Equal(t, uint64(2), buckets2[2]) + }) +} From 4767c1f5993bfc53a6bbc3f4403586da84307319 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Wed, 10 Dec 2025 23:07:16 -0500 Subject: [PATCH 3/4] add ability to include metric attributes --- .../telemetryapireceiver/metric_builder.go | 192 +++++- .../metric_builder_attributes_test.go | 618 ++++++++++++++++++ .../metric_builder_test.go | 22 +- 3 files changed, 787 insertions(+), 45 deletions(-) create mode 100644 collector/receiver/telemetryapireceiver/metric_builder_attributes_test.go diff --git a/collector/receiver/telemetryapireceiver/metric_builder.go b/collector/receiver/telemetryapireceiver/metric_builder.go index f5a32d62f0..60ab4ffa06 100644 --- a/collector/receiver/telemetryapireceiver/metric_builder.go +++ b/collector/receiver/telemetryapireceiver/metric_builder.go @@ -17,6 +17,7 @@ package telemetryapireceiver import ( "sort" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" semconv2 "go.opentelemetry.io/otel/semconv/v1.24.0" @@ -29,16 +30,33 @@ var DefaultHistogramBounds = []float64{0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, var DurationHistogramBounds = []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10} var MemUsageHistogramBounds = []float64{16 * MiB, 32 * MiB, 64 * MiB, 128 * MiB, 256 * MiB, 512 * MiB, 768 * MiB, 1 * GiB, 2 * GiB, 3 * GiB, 4 * GiB, 6 * GiB, 8 * GiB} +type histogramDataPoint struct { + attributes pcommon.Map + counts []uint64 + total uint64 + sum float64 + startTime pcommon.Timestamp + lastUpdated uint64 // epoch when this data point was last updated +} + +func newHistogramDataPoint(attrs pcommon.Map, numBuckets int, startTime pcommon.Timestamp, epoch uint64) *histogramDataPoint { + return &histogramDataPoint{ + attributes: attrs, + counts: make([]uint64, numBuckets), + startTime: startTime, + lastUpdated: epoch, + } +} + type HistogramMetricBuilder struct { name string description string unit string bounds []float64 - counts []uint64 - total uint64 - sum float64 + dataPoints map[[16]byte]*histogramDataPoint startTime pcommon.Timestamp temporality pmetric.AggregationTemporality + epoch uint64 // current epoch counter } func NewHistogramMetricBuilder(name string, description string, unit string, bounds []float64, startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *HistogramMetricBuilder { @@ -52,35 +70,67 @@ func NewHistogramMetricBuilder(name string, description string, unit string, bou temp = pmetric.AggregationTemporalityCumulative } - counts := make([]uint64, len(b)+1) return &HistogramMetricBuilder{ name: name, description: description, unit: unit, bounds: b, - counts: counts, + dataPoints: make(map[[16]byte]*histogramDataPoint), startTime: startTime, temporality: temp, + epoch: 0, } } func (h *HistogramMetricBuilder) Record(value float64) { - h.sum += value - h.total++ - h.counts[sort.SearchFloat64s(h.bounds, value)]++ + h.RecordWithAttributes(value, pcommon.NewMap()) } -func (h *HistogramMetricBuilder) Reset(timestamp pcommon.Timestamp) { - h.startTime = timestamp - h.sum = 0 - h.total = 0 +func (h *HistogramMetricBuilder) RecordWithAttributes(value float64, attrs pcommon.Map) { + key := pdatautil.MapHash(attrs) + dp, exists := h.dataPoints[key] + if !exists { + dp = newHistogramDataPoint(attrs, len(h.bounds)+1, h.startTime, h.epoch) + h.dataPoints[key] = dp + } + + dp.sum += value + dp.total++ + dp.counts[sort.SearchFloat64s(h.bounds, value)]++ + dp.lastUpdated = h.epoch +} - for i := range h.counts { - h.counts[i] = 0 +func (h *HistogramMetricBuilder) RecordWithMap(value float64, attrs map[string]any) error { + m := pcommon.NewMap() + err := m.FromRaw(attrs) + if err != nil { + return err } + h.RecordWithAttributes(value, m) + return nil +} + +func (h *HistogramMetricBuilder) Reset(timestamp pcommon.Timestamp) { + h.startTime = timestamp + clear(h.dataPoints) + h.epoch = 0 } func (h *HistogramMetricBuilder) AppendDataPoints(scopeMetrics pmetric.ScopeMetrics, timestamp pcommon.Timestamp) { + export := h.temporality == pmetric.AggregationTemporalityDelta + if !export { + for _, hdp := range h.dataPoints { + if hdp.lastUpdated == h.epoch { + export = true + break + } + } + } + + if !export { + return + } + metric := scopeMetrics.Metrics().AppendEmpty() metric.SetName(h.name) metric.SetDescription(h.description) @@ -89,18 +139,42 @@ func (h *HistogramMetricBuilder) AppendDataPoints(scopeMetrics pmetric.ScopeMetr hist := metric.SetEmptyHistogram() hist.SetAggregationTemporality(h.temporality) - dp := hist.DataPoints().AppendEmpty() - dp.Attributes() - dp.SetStartTimestamp(h.startTime) - dp.SetTimestamp(timestamp) - dp.SetSum(h.sum) - dp.SetCount(h.total) - - dp.BucketCounts().FromRaw(h.counts) - dp.ExplicitBounds().FromRaw(h.bounds) + for _, hdp := range h.dataPoints { + // For cumulative: only export if updated in current epoch + if h.temporality == pmetric.AggregationTemporalityCumulative && hdp.lastUpdated != h.epoch { + continue + } + + dp := hist.DataPoints().AppendEmpty() + hdp.attributes.CopyTo(dp.Attributes()) + dp.SetStartTimestamp(hdp.startTime) + dp.SetTimestamp(timestamp) + dp.SetSum(hdp.sum) + dp.SetCount(hdp.total) + dp.BucketCounts().FromRaw(hdp.counts) + dp.ExplicitBounds().FromRaw(h.bounds) + } if h.temporality == pmetric.AggregationTemporalityDelta { h.Reset(timestamp) + } else { + // For cumulative, increment epoch for next collection cycle + h.epoch++ + } +} + +type counterDataPoint struct { + attributes pcommon.Map + total int64 + startTime pcommon.Timestamp + lastUpdated uint64 // epoch when this data point was last updated +} + +func newCounterDataPoint(attrs pcommon.Map, startTime pcommon.Timestamp, epoch uint64) *counterDataPoint { + return &counterDataPoint{ + attributes: attrs, + startTime: startTime, + lastUpdated: epoch, } } @@ -108,10 +182,11 @@ type CounterMetricBuilder struct { name string description string unit string - total int64 + dataPoints map[[16]byte]*counterDataPoint isMonotonic bool temporality pmetric.AggregationTemporality startTime pcommon.Timestamp + epoch uint64 // current epoch counter } func NewCounterMetricBuilder(name string, description string, unit string, isMonotonic bool, startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *CounterMetricBuilder { @@ -124,22 +199,60 @@ func NewCounterMetricBuilder(name string, description string, unit string, isMon name: name, description: description, unit: unit, + dataPoints: make(map[[16]byte]*counterDataPoint), isMonotonic: isMonotonic, temporality: temp, startTime: startTime, + epoch: 0, } } func (c *CounterMetricBuilder) Add(value int64) { - c.total += value + c.AddWithAttributes(value, pcommon.NewMap()) +} + +func (c *CounterMetricBuilder) AddWithAttributes(value int64, attrs pcommon.Map) { + key := pdatautil.MapHash(attrs) + dp, exists := c.dataPoints[key] + if !exists { + dp = newCounterDataPoint(attrs, c.startTime, c.epoch) + c.dataPoints[key] = dp + } + dp.total += value + dp.lastUpdated = c.epoch +} + +func (c *CounterMetricBuilder) AddWithMap(value int64, attrs map[string]any) error { + m := pcommon.NewMap() + err := m.FromRaw(attrs) + if err != nil { + return err + } + c.AddWithAttributes(value, m) + return nil } func (c *CounterMetricBuilder) Reset(timestamp pcommon.Timestamp) { c.startTime = timestamp - c.total = 0 + clear(c.dataPoints) + c.epoch = 0 } func (c *CounterMetricBuilder) AppendDataPoints(scopeMetrics pmetric.ScopeMetrics, timestamp pcommon.Timestamp) { + export := c.temporality == pmetric.AggregationTemporalityDelta + if !export { + for _, cdp := range c.dataPoints { + if cdp.lastUpdated == c.epoch { + export = true + break + } + } + } + + if !export { + return + } + metric := scopeMetrics.Metrics().AppendEmpty() metric.SetName(c.name) metric.SetDescription(c.description) @@ -149,17 +262,28 @@ func (c *CounterMetricBuilder) AppendDataPoints(scopeMetrics pmetric.ScopeMetric sum.SetAggregationTemporality(c.temporality) sum.SetIsMonotonic(c.isMonotonic) - dp := sum.DataPoints().AppendEmpty() - dp.SetStartTimestamp(c.startTime) - dp.SetTimestamp(timestamp) - dp.SetIntValue(c.total) + for _, cdp := range c.dataPoints { + // For cumulative: only export if updated in current epoch + if c.temporality == pmetric.AggregationTemporalityCumulative && cdp.lastUpdated != c.epoch { + continue + } + + dp := sum.DataPoints().AppendEmpty() + cdp.attributes.CopyTo(dp.Attributes()) + dp.SetStartTimestamp(cdp.startTime) + dp.SetTimestamp(timestamp) + dp.SetIntValue(cdp.total) + } if c.temporality == pmetric.AggregationTemporalityDelta { c.Reset(timestamp) + } else { + // For cumulative, increment epoch for next collection cycle + c.epoch++ } } -func NewFasSInvokeDurationMetricBuilder(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *HistogramMetricBuilder { +func NewFaaSInvokeDurationMetricBuilder(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *HistogramMetricBuilder { return NewHistogramMetricBuilder( semconv2.FaaSInvokeDurationName, semconv2.FaaSInvokeDurationDescription, @@ -170,7 +294,7 @@ func NewFasSInvokeDurationMetricBuilder(startTime pcommon.Timestamp, temporality ) } -func NewFasSInitDurationMetricBuilder(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *HistogramMetricBuilder { +func NewFaaSInitDurationMetricBuilder(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *HistogramMetricBuilder { return NewHistogramMetricBuilder( semconv2.FaaSInitDurationName, semconv2.FaaSInitDurationDescription, @@ -248,8 +372,8 @@ type FaaSMetricBuilders struct { func NewFaaSMetricBuilders(startTime pcommon.Timestamp, temporality pmetric.AggregationTemporality) *FaaSMetricBuilders { return &FaaSMetricBuilders{ - invokeDurationMetric: NewFasSInvokeDurationMetricBuilder(startTime, temporality), - initDurationMetric: NewFasSInitDurationMetricBuilder(startTime, temporality), + invokeDurationMetric: NewFaaSInvokeDurationMetricBuilder(startTime, temporality), + initDurationMetric: NewFaaSInitDurationMetricBuilder(startTime, temporality), memUsageMetric: NewFaaSMemUsageMetricBuilder(startTime, temporality), coldstartsMetric: NewFaaSColdstartsMetricBuilder(startTime, temporality), errorsMetric: NewFaaSErrorsMetricBuilder(startTime, temporality), diff --git a/collector/receiver/telemetryapireceiver/metric_builder_attributes_test.go b/collector/receiver/telemetryapireceiver/metric_builder_attributes_test.go new file mode 100644 index 0000000000..26ab238e3c --- /dev/null +++ b/collector/receiver/telemetryapireceiver/metric_builder_attributes_test.go @@ -0,0 +1,618 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetryapireceiver + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func TestHistogramMetricBuilder_RecordWithAttributes(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + t.Run("single data point with attributes", func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test histogram with attributes", + "ms", + []float64{1.0, 5.0, 10.0}, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + attrs := pcommon.NewMap() + attrs.PutStr("service.name", "test-service") + attrs.PutStr("region", "us-east-1") + attrs.PutInt("instance.id", 42) + + builder.RecordWithAttributes(3.5, attrs) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + builder.AppendDataPoints(scopeMetrics, timestamp) + + require.Equal(t, 1, scopeMetrics.Metrics().Len()) + hist := scopeMetrics.Metrics().At(0).Histogram() + require.Equal(t, 1, hist.DataPoints().Len()) + + dp := hist.DataPoints().At(0) + assert.Equal(t, 3, dp.Attributes().Len()) + + val, ok := dp.Attributes().Get("service.name") + require.True(t, ok) + assert.Equal(t, "test-service", val.Str()) + + val, ok = dp.Attributes().Get("region") + require.True(t, ok) + assert.Equal(t, "us-east-1", val.Str()) + + val, ok = dp.Attributes().Get("instance.id") + require.True(t, ok) + assert.Equal(t, int64(42), val.Int()) + }) + + t.Run("multiple data points with different attributes create separate series", func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test histogram with different attributes", + "ms", + []float64{1.0, 5.0, 10.0}, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + attrs1 := pcommon.NewMap() + attrs1.PutStr("function", "handler1") + + attrs2 := pcommon.NewMap() + attrs2.PutStr("function", "handler2") + + builder.RecordWithAttributes(2.0, attrs1) + builder.RecordWithAttributes(7.0, attrs2) + builder.RecordWithAttributes(3.0, attrs1) // Same attributes as first, should aggregate + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + builder.AppendDataPoints(scopeMetrics, timestamp) + + require.Equal(t, 1, scopeMetrics.Metrics().Len()) + hist := scopeMetrics.Metrics().At(0).Histogram() + require.Equal(t, 2, hist.DataPoints().Len()) + + var handler1Dp, handler2Dp pmetric.HistogramDataPoint + for i := 0; i < hist.DataPoints().Len(); i++ { + dp := hist.DataPoints().At(i) + val, _ := dp.Attributes().Get("function") + if val.Str() == "handler1" { + handler1Dp = dp + } else if val.Str() == "handler2" { + handler2Dp = dp + } + } + + assert.Equal(t, uint64(2), handler1Dp.Count()) + assert.Equal(t, 5.0, handler1Dp.Sum()) + + assert.Equal(t, uint64(1), handler2Dp.Count()) + assert.Equal(t, 7.0, handler2Dp.Sum()) + }) + + t.Run("empty attributes creates separate series from attributed data", func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test with empty and non-empty attributes", + "ms", + []float64{1.0, 5.0, 10.0}, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + attrs := pcommon.NewMap() + attrs.PutStr("key", "value") + + builder.Record(1.0) + builder.RecordWithAttributes(2.0, attrs) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + builder.AppendDataPoints(scopeMetrics, timestamp) + + require.Equal(t, 1, scopeMetrics.Metrics().Len()) + hist := scopeMetrics.Metrics().At(0).Histogram() + require.Equal(t, 2, hist.DataPoints().Len()) + }) + + t.Run("same attributes with different values aggregate correctly", func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test aggregation with same attributes", + "ms", + []float64{1.0, 5.0, 10.0}, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + for i := 0; i < 5; i++ { + attrs := pcommon.NewMap() + attrs.PutStr("operation", "process") + builder.RecordWithAttributes(float64(i+1), attrs) + } + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + builder.AppendDataPoints(scopeMetrics, timestamp) + + hist := scopeMetrics.Metrics().At(0).Histogram() + require.Equal(t, 1, hist.DataPoints().Len()) + + dp := hist.DataPoints().At(0) + assert.Equal(t, uint64(5), dp.Count()) + assert.Equal(t, 15.0, dp.Sum()) // 1+2+3+4+5 = 15 + }) +} + +func TestCounterMetricBuilder_AddWithAttributes(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + t.Run("single data point with attributes", func(t *testing.T) { + builder := NewCounterMetricBuilder( + "test.counter", + "Test counter with attributes", + "{count}", + true, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + attrs := pcommon.NewMap() + attrs.PutStr("error.type", "timeout") + attrs.PutStr("service.name", "payment-service") + + builder.AddWithAttributes(5, attrs) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + builder.AppendDataPoints(scopeMetrics, timestamp) + + require.Equal(t, 1, scopeMetrics.Metrics().Len()) + sum := scopeMetrics.Metrics().At(0).Sum() + require.Equal(t, 1, sum.DataPoints().Len()) + + dp := sum.DataPoints().At(0) + assert.Equal(t, 2, dp.Attributes().Len()) + assert.Equal(t, int64(5), dp.IntValue()) + + val, ok := dp.Attributes().Get("error.type") + require.True(t, ok) + assert.Equal(t, "timeout", val.Str()) + }) + + t.Run("multiple data points with different attributes create separate series", func(t *testing.T) { + builder := NewCounterMetricBuilder( + "test.counter", + "Test counter with different attributes", + "{count}", + true, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + attrs1 := pcommon.NewMap() + attrs1.PutStr("status_code", "200") + + attrs2 := pcommon.NewMap() + attrs2.PutStr("status_code", "500") + + attrs3 := pcommon.NewMap() + attrs3.PutStr("status_code", "404") + + builder.AddWithAttributes(100, attrs1) + builder.AddWithAttributes(5, attrs2) + builder.AddWithAttributes(10, attrs3) + builder.AddWithAttributes(50, attrs1) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + builder.AppendDataPoints(scopeMetrics, timestamp) + + sum := scopeMetrics.Metrics().At(0).Sum() + require.Equal(t, 3, sum.DataPoints().Len()) + + valuesByStatus := make(map[string]int64) + for i := 0; i < sum.DataPoints().Len(); i++ { + dp := sum.DataPoints().At(i) + val, _ := dp.Attributes().Get("status_code") + valuesByStatus[val.Str()] = dp.IntValue() + } + + assert.Equal(t, int64(150), valuesByStatus["200"]) + assert.Equal(t, int64(5), valuesByStatus["500"]) + assert.Equal(t, int64(10), valuesByStatus["404"]) + }) + + t.Run("FaaS errors counter with trigger attribute", func(t *testing.T) { + builder := NewFaaSErrorsMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) + + httpTrigger := pcommon.NewMap() + httpTrigger.PutStr("faas.trigger", "http") + + sqsTrigger := pcommon.NewMap() + sqsTrigger.PutStr("faas.trigger", "pubsub") + + builder.AddWithAttributes(3, httpTrigger) + builder.AddWithAttributes(1, sqsTrigger) + builder.AddWithAttributes(2, httpTrigger) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + builder.AppendDataPoints(scopeMetrics, timestamp) + + sum := scopeMetrics.Metrics().At(0).Sum() + require.Equal(t, 2, sum.DataPoints().Len()) + + var httpCount, pubsubCount int64 + for i := 0; i < sum.DataPoints().Len(); i++ { + dp := sum.DataPoints().At(i) + val, _ := dp.Attributes().Get("faas.trigger") + if val.Str() == "http" { + httpCount = dp.IntValue() + } else if val.Str() == "pubsub" { + pubsubCount = dp.IntValue() + } + } + + assert.Equal(t, int64(5), httpCount) + assert.Equal(t, int64(1), pubsubCount) + }) + + t.Run("multiple attributes create unique series", func(t *testing.T) { + builder := NewCounterMetricBuilder( + "test.counter", + "Test with multiple attributes", + "{count}", + true, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + attrs1 := pcommon.NewMap() + attrs1.PutStr("method", "GET") + attrs1.PutStr("path", "/api/users") + + attrs2 := pcommon.NewMap() + attrs2.PutStr("method", "GET") + attrs2.PutStr("path", "/api/orders") + + attrs3 := pcommon.NewMap() + attrs3.PutStr("method", "POST") + attrs3.PutStr("path", "/api/users") + + builder.AddWithAttributes(10, attrs1) + builder.AddWithAttributes(20, attrs2) + builder.AddWithAttributes(5, attrs3) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + builder.AppendDataPoints(scopeMetrics, timestamp) + + sum := scopeMetrics.Metrics().At(0).Sum() + require.Equal(t, 3, sum.DataPoints().Len()) + }) +} + +func TestMetricBuilder_AttributesWithDeltaTemporality(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + t.Run("histogram delta resets attributed data points", func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test delta histogram with attributes", + "ms", + []float64{1.0, 5.0, 10.0}, + startTime, + pmetric.AggregationTemporalityDelta, + ) + + attrs := pcommon.NewMap() + attrs.PutStr("operation", "query") + + builder.RecordWithAttributes(2.0, attrs) + builder.RecordWithAttributes(3.0, attrs) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + ts1 := pcommon.NewTimestampFromTime(time.Now()) + builder.AppendDataPoints(scopeMetrics, ts1) + + hist1 := scopeMetrics.Metrics().At(0).Histogram() + dp1 := hist1.DataPoints().At(0) + assert.Equal(t, uint64(2), dp1.Count()) + assert.Equal(t, 5.0, dp1.Sum()) + + builder.RecordWithAttributes(7.0, attrs) + + ts2 := pcommon.NewTimestampFromTime(time.Now().Add(time.Second)) + builder.AppendDataPoints(scopeMetrics, ts2) + + hist2 := scopeMetrics.Metrics().At(1).Histogram() + dp2 := hist2.DataPoints().At(0) + assert.Equal(t, uint64(1), dp2.Count()) + assert.Equal(t, 7.0, dp2.Sum()) + }) + + t.Run("counter delta resets attributed data points", func(t *testing.T) { + builder := NewCounterMetricBuilder( + "test.counter", + "Test delta counter with attributes", + "{count}", + true, + startTime, + pmetric.AggregationTemporalityDelta, + ) + + attrs := pcommon.NewMap() + attrs.PutStr("status", "success") + + builder.AddWithAttributes(10, attrs) + builder.AddWithAttributes(5, attrs) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + + ts1 := pcommon.NewTimestampFromTime(time.Now()) + builder.AppendDataPoints(scopeMetrics, ts1) + + sum1 := scopeMetrics.Metrics().At(0).Sum() + dp1 := sum1.DataPoints().At(0) + assert.Equal(t, int64(15), dp1.IntValue()) + + builder.AddWithAttributes(3, attrs) + + ts2 := pcommon.NewTimestampFromTime(time.Now().Add(time.Second)) + builder.AppendDataPoints(scopeMetrics, ts2) + + sum2 := scopeMetrics.Metrics().At(1).Sum() + dp2 := sum2.DataPoints().At(0) + assert.Equal(t, int64(3), dp2.IntValue()) + }) +} + +func TestMetricBuilder_ResetClearsAttributes(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + t.Run("histogram reset clears all attributed data points", func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test reset clears attributes", + "ms", + []float64{1.0, 5.0, 10.0}, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + attrs1 := pcommon.NewMap() + attrs1.PutStr("region", "us-east") + + attrs2 := pcommon.NewMap() + attrs2.PutStr("region", "eu-west") + + builder.RecordWithAttributes(1.0, attrs1) + builder.RecordWithAttributes(2.0, attrs2) + + newStartTime := pcommon.NewTimestampFromTime(time.Now()) + builder.Reset(newStartTime) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.Now().Add(time.Second)) + + builder.AppendDataPoints(scopeMetrics, timestamp) + + assert.Equal(t, 0, scopeMetrics.Metrics().Len()) + }) + + t.Run("counter reset clears all attributed data points", func(t *testing.T) { + builder := NewCounterMetricBuilder( + "test.counter", + "Test reset clears attributes", + "{count}", + true, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + attrs1 := pcommon.NewMap() + attrs1.PutStr("tier", "free") + + attrs2 := pcommon.NewMap() + attrs2.PutStr("tier", "premium") + + builder.AddWithAttributes(10, attrs1) + builder.AddWithAttributes(100, attrs2) + + newStartTime := pcommon.NewTimestampFromTime(time.Now()) + builder.Reset(newStartTime) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.Now().Add(time.Second)) + + builder.AppendDataPoints(scopeMetrics, timestamp) + + assert.Equal(t, 0, scopeMetrics.Metrics().Len()) + }) +} + +func TestMetricBuilder_AttributeOrdering(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + t.Run("attribute order does not affect aggregation", func(t *testing.T) { + builder := NewCounterMetricBuilder( + "test.counter", + "Test attribute ordering", + "{count}", + true, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + attrs1 := pcommon.NewMap() + attrs1.PutStr("a", "1") + attrs1.PutStr("b", "2") + attrs1.PutStr("c", "3") + + attrs2 := pcommon.NewMap() + attrs2.PutStr("c", "3") + attrs2.PutStr("a", "1") + attrs2.PutStr("b", "2") + + builder.AddWithAttributes(10, attrs1) + builder.AddWithAttributes(20, attrs2) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + timestamp := pcommon.NewTimestampFromTime(time.Now()) + + builder.AppendDataPoints(scopeMetrics, timestamp) + + sum := scopeMetrics.Metrics().At(0).Sum() + require.Equal(t, 1, sum.DataPoints().Len()) + assert.Equal(t, int64(30), sum.DataPoints().At(0).IntValue()) + }) +} + +func TestMetricBuilder_CumulativeEpochWithAttributes(t *testing.T) { + startTime := pcommon.NewTimestampFromTime(time.Now().Add(-time.Hour)) + + t.Run("cumulative histogram only exports updated attributes in epoch", func(t *testing.T) { + builder := NewHistogramMetricBuilder( + "test.histogram", + "Test epoch tracking", + "ms", + []float64{1.0, 5.0, 10.0}, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + attrs1 := pcommon.NewMap() + attrs1.PutStr("key", "value1") + + attrs2 := pcommon.NewMap() + attrs2.PutStr("key", "value2") + + builder.RecordWithAttributes(1.0, attrs1) + builder.RecordWithAttributes(2.0, attrs2) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + ts1 := pcommon.NewTimestampFromTime(time.Now()) + + builder.AppendDataPoints(scopeMetrics, ts1) + require.Equal(t, 1, scopeMetrics.Metrics().Len()) + require.Equal(t, 2, scopeMetrics.Metrics().At(0).Histogram().DataPoints().Len()) + + builder.RecordWithAttributes(3.0, attrs1) + + ts2 := pcommon.NewTimestampFromTime(time.Now().Add(time.Second)) + builder.AppendDataPoints(scopeMetrics, ts2) + + require.Equal(t, 2, scopeMetrics.Metrics().Len()) + hist2 := scopeMetrics.Metrics().At(1).Histogram() + require.Equal(t, 1, hist2.DataPoints().Len()) + + dp := hist2.DataPoints().At(0) + val, _ := dp.Attributes().Get("key") + assert.Equal(t, "value1", val.Str()) + assert.Equal(t, uint64(2), dp.Count()) + assert.Equal(t, 4.0, dp.Sum()) + }) + + t.Run("cumulative counter only exports updated attributes in epoch", func(t *testing.T) { + builder := NewCounterMetricBuilder( + "test.counter", + "Test epoch tracking", + "{count}", + true, + startTime, + pmetric.AggregationTemporalityCumulative, + ) + + attrs1 := pcommon.NewMap() + attrs1.PutStr("service", "api") + + attrs2 := pcommon.NewMap() + attrs2.PutStr("service", "worker") + + builder.AddWithAttributes(10, attrs1) + builder.AddWithAttributes(20, attrs2) + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + scopeMetrics := rm.ScopeMetrics().AppendEmpty() + ts1 := pcommon.NewTimestampFromTime(time.Now()) + + builder.AppendDataPoints(scopeMetrics, ts1) + require.Equal(t, 2, scopeMetrics.Metrics().At(0).Sum().DataPoints().Len()) + + builder.AddWithAttributes(5, attrs2) + + ts2 := pcommon.NewTimestampFromTime(time.Now().Add(time.Second)) + builder.AppendDataPoints(scopeMetrics, ts2) + + sum2 := scopeMetrics.Metrics().At(1).Sum() + require.Equal(t, 1, sum2.DataPoints().Len()) + + dp := sum2.DataPoints().At(0) + val, _ := dp.Attributes().Get("service") + assert.Equal(t, "worker", val.Str()) + assert.Equal(t, int64(25), dp.IntValue()) + }) +} diff --git a/collector/receiver/telemetryapireceiver/metric_builder_test.go b/collector/receiver/telemetryapireceiver/metric_builder_test.go index 6ee781c280..8c44512734 100644 --- a/collector/receiver/telemetryapireceiver/metric_builder_test.go +++ b/collector/receiver/telemetryapireceiver/metric_builder_test.go @@ -36,19 +36,19 @@ func TestHistogramMetricBuilder_AppendDataPoint(t *testing.T) { }{ { name: "FaaS invoke duration - small value", - builder: NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), + builder: NewFaaSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), value: 0.007, expectedBucket: 1, }, { name: "FaaS invoke duration - middle value", - builder: NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), + builder: NewFaaSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), value: 0.5, expectedBucket: 7, }, { name: "FaaS invoke duration - large value", - builder: NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), + builder: NewFaaSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative), value: 15.0, expectedBucket: 14, }, @@ -201,7 +201,7 @@ func TestFaaSMetricBuilderFactories(t *testing.T) { startTime := pcommon.NewTimestampFromTime(time.Now()) t.Run("NewFasSInvokeDurationMetricBuilder", func(t *testing.T) { - builder := NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) + builder := NewFaaSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) assert.Equal(t, semconv2.FaaSInvokeDurationName, builder.name) assert.Equal(t, semconv2.FaaSInvokeDurationDescription, builder.description) assert.Equal(t, semconv2.FaaSInvokeDurationUnit, builder.unit) @@ -211,7 +211,7 @@ func TestFaaSMetricBuilderFactories(t *testing.T) { }) t.Run("NewFasSInitDurationMetricBuilder", func(t *testing.T) { - builder := NewFasSInitDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) + builder := NewFaaSInitDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) assert.Equal(t, semconv2.FaaSInitDurationName, builder.name) assert.Equal(t, semconv2.FaaSInitDurationDescription, builder.description) assert.Equal(t, semconv2.FaaSInitDurationUnit, builder.unit) @@ -395,7 +395,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "two data points accumulate correctly", builderFn: func() *HistogramMetricBuilder { - return NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) + return NewFaaSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []float64{0.1, 0.2}, expectedCount: 2, @@ -404,7 +404,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "multiple data points across different buckets", builderFn: func() *HistogramMetricBuilder { - return NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) + return NewFaaSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []float64{0.001, 0.05, 0.5, 1.0, 5.0}, expectedCount: 5, @@ -413,7 +413,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "same bucket receives multiple values", builderFn: func() *HistogramMetricBuilder { - return NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) + return NewFaaSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []float64{0.3, 0.35, 0.4, 0.45}, expectedCount: 4, @@ -425,7 +425,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "zero values accumulate correctly", builderFn: func() *HistogramMetricBuilder { - return NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) + return NewFaaSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []float64{0.0, 0.0, 0.0, 0.0, 0.0}, expectedCount: 5, @@ -437,7 +437,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { { name: "large values in overflow bucket", builderFn: func() *HistogramMetricBuilder { - return NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) + return NewFaaSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) }, values: []float64{15.0, 20.0, 100.0}, expectedCount: 3, @@ -492,7 +492,7 @@ func TestHistogramMetricBuilder_CumulativeDataPoints(t *testing.T) { } t.Run("start timestamp remains constant across data points", func(t *testing.T) { - builder := NewFasSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) + builder := NewFaaSInvokeDurationMetricBuilder(startTime, pmetric.AggregationTemporalityCumulative) metrics := pmetric.NewMetrics() rm := metrics.ResourceMetrics().AppendEmpty() From bed59f89a1cba9923bd13799246a0336d833e140 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Fri, 19 Dec 2025 13:32:58 -0500 Subject: [PATCH 4/4] merge in platform log changes --- .../receiver/telemetryapireceiver/go.mod | 4 +- .../receiver/telemetryapireceiver/go.sum | 4 + .../receiver/telemetryapireceiver/receiver.go | 181 ++++++++++++++++-- 3 files changed, 170 insertions(+), 19 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/go.mod b/collector/receiver/telemetryapireceiver/go.mod index 7cd1f37cea..822a0dbfa6 100644 --- a/collector/receiver/telemetryapireceiver/go.mod +++ b/collector/receiver/telemetryapireceiver/go.mod @@ -6,6 +6,7 @@ replace github.com/open-telemetry/opentelemetry-lambda/collector => ../../ require ( github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.138.0 github.com/open-telemetry/opentelemetry-lambda/collector v0.98.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.44.0 @@ -21,6 +22,7 @@ require ( ) require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/gobwas/glob v0.2.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect ) @@ -43,7 +45,7 @@ require ( github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/collector/confmap/xconfmap v0.138.0 + go.opentelemetry.io/collector/confmap/xconfmap v0.138.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.138.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.138.0 // indirect go.opentelemetry.io/collector/featuregate v1.44.0 // indirect diff --git a/collector/receiver/telemetryapireceiver/go.sum b/collector/receiver/telemetryapireceiver/go.sum index ded8961456..9927aa2b7c 100644 --- a/collector/receiver/telemetryapireceiver/go.sum +++ b/collector/receiver/telemetryapireceiver/go.sum @@ -1,3 +1,5 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -48,6 +50,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.138.0 h1:34HE7sAjlXlzL1HAbDxOBKFdU3tTQcmgFVvjnts67DA= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.138.0/go.mod h1:XzBJKpG3Gi3GMyWF+7NgVl219PaGTl4+RaNo8f8KAZs= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 25670f81fd..2438a4c8d3 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -33,9 +33,11 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" semconv "go.opentelemetry.io/collector/semconv/v1.25.0" + semconv2 "go.opentelemetry.io/otel/semconv/v1.24.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" @@ -44,6 +46,10 @@ import ( const ( initialQueueSize = 5 scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" + telemetrySuccessStatus = "success" + telemetryFailureStatus = "failure" + telemetryErrorStatus = "error" + telemetryTimeoutStatus = "timeout" platformReportLogFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB" platformStartLogFmt = "START RequestId: %s Version: %s" platformRuntimeDoneLogFmt = "END RequestId: %s Version: %s" @@ -63,6 +69,7 @@ type telemetryAPIReceiver struct { logger *zap.Logger queue *queue.Queue // queue is a synchronous queue and is used to put the received log events to be dispatched later nextTraces consumer.Traces + nextMetrics consumer.Metrics nextLogs consumer.Logs lastPlatformStartTime string lastPlatformEndTime string @@ -71,6 +78,8 @@ type telemetryAPIReceiver struct { types []telemetryapi.EventType resource pcommon.Resource faasFunctionVersion string + faasName string + faaSMetricBuilders *FaaSMetricBuilders currentFaasInvocationID string logReport bool } @@ -145,10 +154,31 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ case string(telemetryapi.PlatformInitStart): r.logger.Info(fmt.Sprintf("Init start: %s", r.lastPlatformStartTime), zap.Any("event", el)) r.lastPlatformStartTime = el.Time + + if record, ok := el.Record.(map[string]any); ok { + functionName, _ := record["functionName"].(string) + if functionName != "" { + r.faasName = functionName + } + } // Function initialization completed. case string(telemetryapi.PlatformInitRuntimeDone): r.logger.Info(fmt.Sprintf("Init end: %s", r.lastPlatformEndTime), zap.Any("event", el)) r.lastPlatformEndTime = el.Time + + if len(r.lastPlatformStartTime) > 0 && len(r.lastPlatformEndTime) > 0 { + if record, ok := el.Record.(map[string]any); ok { + if td, err := r.createPlatformInitSpan(record, r.lastPlatformStartTime, r.lastPlatformEndTime); err == nil { + err := r.nextTraces.ConsumeTraces(context.Background(), td) + if err == nil { + r.lastPlatformEndTime = "" + r.lastPlatformStartTime = "" + } else { + r.logger.Error("error receiving traces", zap.Error(err)) + } + } + } + } } // TODO: add support for additional events, see https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html // A report of function initialization. @@ -170,15 +200,13 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ // Lambda dropped log entries. // case "platform.logsDropped": } - if len(r.lastPlatformStartTime) > 0 && len(r.lastPlatformEndTime) > 0 { - if td, err := r.createPlatformInitSpan(r.lastPlatformStartTime, r.lastPlatformEndTime); err == nil { - if r.nextTraces != nil { - err := r.nextTraces.ConsumeTraces(context.Background(), td) - if err == nil { - r.lastPlatformEndTime = "" - r.lastPlatformStartTime = "" - } else { - r.logger.Error("error receiving traces", zap.Error(err)) + // Metrics + if r.nextMetrics != nil { + if metrics, err := r.createMetrics(slice); err == nil { + if metrics.MetricCount() > 0 { + err := r.nextMetrics.ConsumeMetrics(context.Background(), metrics) + if err != nil { + r.logger.Error("error receiving metrics", zap.Error(err)) } } } @@ -209,6 +237,91 @@ func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{}) return "" } +func (r *telemetryAPIReceiver) createMetrics(slice []event) (pmetric.Metrics, error) { + metric := pmetric.NewMetrics() + resourceMetric := metric.ResourceMetrics().AppendEmpty() + r.resource.CopyTo(resourceMetric.Resource()) + scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty() + scopeMetric.Scope().SetName(scopeName) + scopeMetric.SetSchemaUrl(semconv2.SchemaURL) + + for _, el := range slice { + r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el)) + record, ok := el.Record.(map[string]any) + if !ok { + continue + } + ts, err := time.Parse(time.RFC3339, el.Time) + if err != nil { + continue + } + + switch el.Type { + case string(telemetryapi.PlatformInitStart): + r.faaSMetricBuilders.coldstartsMetric.Add(1) + r.faaSMetricBuilders.coldstartsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts)) + case string(telemetryapi.PlatformInitReport): + metrics, ok := record["metrics"].(map[string]any) + if !ok { + continue + } + + status, _ := metrics["status"].(string) + if status == telemetryFailureStatus || status == telemetryErrorStatus { + r.faaSMetricBuilders.errorsMetric.Add(1) + r.faaSMetricBuilders.errorsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts)) + } else if status == telemetryTimeoutStatus { + r.faaSMetricBuilders.timeoutsMetric.Add(1) + r.faaSMetricBuilders.timeoutsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts)) + } + + durationMs, ok := metrics["durationMs"].(float64) + if !ok { + continue + } + + r.faaSMetricBuilders.initDurationMetric.Record(durationMs / 1000.0) + r.faaSMetricBuilders.initDurationMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts)) + case string(telemetryapi.PlatformReport): + metrics, ok := record["metrics"].(map[string]any) + if !ok { + continue + } + + maxMemoryUsedMb, ok := metrics["maxMemoryUsedMB"].(float64) + if ok { + r.faaSMetricBuilders.memUsageMetric.Record(maxMemoryUsedMb * 1000000.0) + r.faaSMetricBuilders.memUsageMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts)) + } + case string(telemetryapi.PlatformRuntimeDone): + status, _ := record["status"].(string) + + if status == telemetrySuccessStatus { + r.faaSMetricBuilders.invocationsMetric.Add(1) + r.faaSMetricBuilders.invocationsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts)) + } else if status == telemetryFailureStatus || status == telemetryErrorStatus { + r.faaSMetricBuilders.errorsMetric.Add(1) + r.faaSMetricBuilders.errorsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts)) + } else if status == telemetryTimeoutStatus { + r.faaSMetricBuilders.timeoutsMetric.Add(1) + r.faaSMetricBuilders.timeoutsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts)) + } + + metrics, ok := record["metrics"].(map[string]any) + if !ok { + continue + } + + durationMs, ok := metrics["durationMs"].(float64) + if ok { + r.faaSMetricBuilders.invokeDurationMetric.Record(durationMs / 1000.0) + r.faaSMetricBuilders.invokeDurationMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts)) + } + } + } + return metric, nil +} + func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { log := plog.NewLogs() resourceLog := log.ResourceLogs().AppendEmpty() @@ -456,11 +569,15 @@ func (r *telemetryAPIReceiver) registerTracesConsumer(next consumer.Traces) { r.nextTraces = next } +func (r *telemetryAPIReceiver) registerMetricsConsumer(next consumer.Metrics) { + r.nextMetrics = next +} + func (r *telemetryAPIReceiver) registerLogsConsumer(next consumer.Logs) { r.nextLogs = next } -func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace.Traces, error) { +func (r *telemetryAPIReceiver) createPlatformInitSpan(record map[string]any, start, end string) (ptrace.Traces, error) { traceData := ptrace.NewTraces() rs := traceData.ResourceSpans().AppendEmpty() r.resource.CopyTo(rs.Resource()) @@ -470,7 +587,7 @@ func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace span := ss.Spans().AppendEmpty() span.SetTraceID(newTraceID()) span.SetSpanID(newSpanID()) - span.SetName("platform.initRuntimeDone") + span.SetName(fmt.Sprintf("init %s", r.faasName)) span.SetKind(ptrace.SpanKindInternal) span.Attributes().PutBool(semconv.AttributeFaaSColdstart, true) startTime, err := time.Parse(time.RFC3339, start) @@ -483,9 +600,36 @@ func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace return ptrace.Traces{}, err } span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime)) + + status, _ := record["status"].(string) + if status != "" && status != "success" { + span.Status().SetCode(ptrace.StatusCodeError) + errorType, _ := record["errorType"].(string) + if errorType != "" { + span.Attributes().PutStr(semconv.AttributeErrorType, errorType) + } else { + span.Attributes().PutStr(semconv.AttributeErrorType, status) + } + } return traceData, nil } +func getMetricsTemporality(cfg *Config) pmetric.AggregationTemporality { + temporality := strings.ToLower(cfg.MetricsTemporality) + if temporality == "" { + temporality = os.Getenv("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE") + } + + switch temporality { + case "delta": + return pmetric.AggregationTemporalityDelta + case "cumulative": + return pmetric.AggregationTemporalityCumulative + default: + return pmetric.AggregationTemporalityCumulative + } +} + func newTelemetryAPIReceiver( cfg *Config, set receiver.Settings, @@ -527,13 +671,14 @@ func newTelemetryAPIReceiver( } return &telemetryAPIReceiver{ - logger: set.Logger, - queue: queue.New(initialQueueSize), - extensionID: cfg.extensionID, - port: cfg.Port, - types: subscribedTypes, - resource: r, - logReport: cfg.LogReport, + logger: set.Logger, + queue: queue.New(initialQueueSize), + extensionID: cfg.extensionID, + port: cfg.Port, + types: subscribedTypes, + resource: r, + faaSMetricBuilders: NewFaaSMetricBuilders(pcommon.NewTimestampFromTime(time.Now()), getMetricsTemporality(cfg)), + logReport: cfg.LogReport, }, nil }