From 9f70133c17433db65ae7b1f044f3aa41ac944e48 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Fri, 19 Dec 2025 22:54:37 -0500 Subject: [PATCH 1/5] feat: add support for running on AWS Lambda managed instance types --- .../decoupleafterbatchconverter/converter.go | 9 ++ .../converter_test.go | 36 +++++ collector/internal/extensionapi/client.go | 6 +- collector/internal/lifecycle/constants.go | 17 +++ collector/internal/lifecycle/manager.go | 56 +++++--- collector/internal/lifecycle/manager_test.go | 10 +- collector/lambdalifecycle/constants.go | 17 +++ collector/lambdalifecycle/go.mod | 7 + collector/lambdalifecycle/go.sum | 9 ++ collector/lambdalifecycle/types.go | 61 ++++++++ collector/lambdalifecycle/types_test.go | 99 +++++++++++++ .../receiver/telemetryapireceiver/receiver.go | 134 +++++++++++------- python/src/otel/otel_sdk/otel-instrument | 25 ++-- python/src/otel/otel_sdk/otel_wrapper.py | 101 ++++++++----- 14 files changed, 455 insertions(+), 132 deletions(-) create mode 100644 collector/internal/lifecycle/constants.go create mode 100644 collector/lambdalifecycle/constants.go create mode 100644 collector/lambdalifecycle/go.sum create mode 100644 collector/lambdalifecycle/types.go create mode 100644 collector/lambdalifecycle/types_test.go diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go index 4079ff994e..00736593b9 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter.go @@ -24,6 +24,7 @@ import ( "fmt" "strings" + "github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle" "go.opentelemetry.io/collector/confmap" ) @@ -43,6 +44,14 @@ func New() confmap.Converter { } func (c converter) Convert(_ context.Context, conf *confmap.Conf) error { + initType := 
lambdalifecycle.InitTypeFromEnv(lambdalifecycle.InitTypeEnvVar) + + // Do not append decouple processors for Lambda Managed Instances + // see: https://docs.aws.amazon.com/lambda/latest/dg/lambda-managed-instances-execution-environment.html + if initType == lambdalifecycle.LambdaManagedInstances { + return nil + } + serviceVal := conf.Get(serviceKey) service, ok := serviceVal.(map[string]interface{}) if !ok { diff --git a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go index 26ccd2c277..967f3a5945 100644 --- a/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go +++ b/collector/internal/confmap/converter/decoupleafterbatchconverter/converter_test.go @@ -17,6 +17,7 @@ import ( "context" "testing" + "github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle" "go.opentelemetry.io/collector/confmap" "github.com/google/go-cmp/cmp" @@ -151,3 +152,38 @@ func TestConvert(t *testing.T) { }) } } + +func TestConvert_LambdaManagedInstances(t *testing.T) { + t.Setenv(lambdalifecycle.InitTypeEnvVar, lambdalifecycle.LambdaManagedInstances.String()) + + // Config that would normally have decouple appended + input := confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": []interface{}{"batch"}, + }, + }, + }, + }) + + // Expected to remain unchanged + expected := confmap.NewFromStringMap(map[string]interface{}{ + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "processors": []interface{}{"batch"}, + }, + }, + }, + }) + + c := New() + err := c.Convert(context.Background(), input) + if err != nil { + t.Errorf("unexpected error converting: %v", err) + } + if diff := cmp.Diff(expected.ToStringMap(), input.ToStringMap()); diff != "" { + 
t.Errorf("Convert() mismatch: (-want +got):\n%s", diff) + } +} diff --git a/collector/internal/extensionapi/client.go b/collector/internal/extensionapi/client.go index 7210a07efa..8403eb4955 100644 --- a/collector/internal/extensionapi/client.go +++ b/collector/internal/extensionapi/client.go @@ -76,15 +76,17 @@ type Client struct { httpClient *http.Client extensionID string logger *zap.Logger + events []EventType } // NewClient returns a Lambda Extensions API client. -func NewClient(logger *zap.Logger, awsLambdaRuntimeAPI string) *Client { +func NewClient(logger *zap.Logger, awsLambdaRuntimeAPI string, events []EventType) *Client { baseURL := fmt.Sprintf("http://%s/2020-01-01/extension", awsLambdaRuntimeAPI) return &Client{ baseURL: baseURL, httpClient: &http.Client{}, logger: logger.Named("extensionAPI.Client"), + events: events, } } @@ -94,7 +96,7 @@ func (e *Client) Register(ctx context.Context, filename string) (*RegisterRespon url := e.baseURL + action reqBody, err := json.Marshal(map[string]interface{}{ - "events": []EventType{Invoke, Shutdown}, + "events": e.events, }) if err != nil { return nil, err diff --git a/collector/internal/lifecycle/constants.go b/collector/internal/lifecycle/constants.go new file mode 100644 index 0000000000..506c28e7b2 --- /dev/null +++ b/collector/internal/lifecycle/constants.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package lifecycle + +const RuntimeApiEnvVar = "AWS_LAMBDA_RUNTIME_API" diff --git a/collector/internal/lifecycle/manager.go b/collector/internal/lifecycle/manager.go index 052c45f671..ddd90820bf 100644 --- a/collector/internal/lifecycle/manager.go +++ b/collector/internal/lifecycle/manager.go @@ -17,13 +17,14 @@ package lifecycle import ( "context" "fmt" - "github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle" "os" "os/signal" "path/filepath" "sync" "syscall" + "github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle" + "go.uber.org/multierr" "go.uber.org/zap" @@ -49,6 +50,7 @@ type manager struct { listener *telemetryapi.Listener wg sync.WaitGroup lifecycleListeners []lambdalifecycle.Listener + initType lambdalifecycle.InitType } func NewManager(ctx context.Context, logger *zap.Logger, version string) (context.Context, *manager) { @@ -62,28 +64,40 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex logger.Info("received signal", zap.String("signal", s.String())) }() - extensionClient := extensionapi.NewClient(logger, os.Getenv("AWS_LAMBDA_RUNTIME_API")) + var extensionEvents []extensionapi.EventType + initType := lambdalifecycle.InitTypeFromEnv(lambdalifecycle.InitTypeEnvVar) + if initType == lambdalifecycle.LambdaManagedInstances { + extensionEvents = []extensionapi.EventType{extensionapi.Shutdown} + } else { + extensionEvents = []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown} + } + + extensionClient := extensionapi.NewClient(logger, os.Getenv(RuntimeApiEnvVar), extensionEvents) res, err := extensionClient.Register(ctx, extensionName) if err != nil { logger.Fatal("Cannot register extension", zap.Error(err)) } - listener := telemetryapi.NewListener(logger) - addr, err := listener.Start() - if err != nil { - logger.Fatal("Cannot start Telemetry API Listener", zap.Error(err)) - } + var listener *telemetryapi.Listener + if initType != lambdalifecycle.LambdaManagedInstances { + 
listener = telemetryapi.NewListener(logger) + addr, err := listener.Start() + if err != nil { + logger.Fatal("Cannot start Telemetry API Listener", zap.Error(err)) + } - telemetryClient := telemetryapi.NewClient(logger) - _, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr) - if err != nil { - logger.Fatal("Cannot register Telemetry API client", zap.Error(err)) + telemetryClient := telemetryapi.NewClient(logger) + _, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr) + if err != nil { + logger.Fatal("Cannot register Telemetry API client", zap.Error(err)) + } } lm := &manager{ logger: logger.Named("lifecycle.manager"), extensionClient: extensionClient, listener: listener, + initType: initType, } factories, _ := lambdacomponents.Components(res.ExtensionID) @@ -134,7 +148,9 @@ func (lm *manager) processEvents(ctx context.Context) error { if res.EventType == extensionapi.Shutdown { lm.logger.Info("Received SHUTDOWN event") lm.notifyEnvironmentShutdown() - lm.listener.Shutdown() + if lm.listener != nil { + lm.listener.Shutdown() + } err = lm.collector.Stop() if err != nil { if _, exitErr := lm.extensionClient.ExitError(ctx, fmt.Sprintf("error stopping collector: %v", err)); exitErr != nil { @@ -142,17 +158,17 @@ func (lm *manager) processEvents(ctx context.Context) error { } } return err - } + } else if lm.listener != nil && res.EventType == extensionapi.Invoke { + lm.notifyFunctionInvoked() - lm.notifyFunctionInvoked() + err = lm.listener.Wait(ctx, res.RequestID) + if err != nil { + lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID)) + } - err = lm.listener.Wait(ctx, res.RequestID) - if err != nil { - lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID)) + // Check other components are ready before allowing the 
freezing of the environment. + lm.notifyFunctionFinished() } - - // Check other components are ready before allowing the freezing of the environment. - lm.notifyFunctionFinished() } } } diff --git a/collector/internal/lifecycle/manager_test.go b/collector/internal/lifecycle/manager_test.go index e121779552..e010276919 100644 --- a/collector/internal/lifecycle/manager_test.go +++ b/collector/internal/lifecycle/manager_test.go @@ -56,11 +56,12 @@ func TestRun(t *testing.T) { u, err := url.Parse(server.URL) require.NoError(t, err) + extensionEventTypes := []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown} // test with an error lm := manager{ collector: &MockCollector{err: fmt.Errorf("test start error")}, logger: logger, - extensionClient: extensionapi.NewClient(logger, ""), + extensionClient: extensionapi.NewClient(logger, "", extensionEventTypes), } require.Error(t, lm.Run(ctx)) // test with no waitgroup @@ -68,7 +69,7 @@ func TestRun(t *testing.T) { collector: &MockCollector{}, logger: logger, listener: telemetryapi.NewListener(logger), - extensionClient: extensionapi.NewClient(logger, u.Host), + extensionClient: extensionapi.NewClient(logger, u.Host, extensionEventTypes), } require.NoError(t, lm.Run(ctx)) // test with waitgroup counter incremented @@ -76,7 +77,7 @@ func TestRun(t *testing.T) { collector: &MockCollector{}, logger: logger, listener: telemetryapi.NewListener(logger), - extensionClient: extensionapi.NewClient(logger, u.Host), + extensionClient: extensionapi.NewClient(logger, u.Host, extensionEventTypes), } lm.wg.Add(1) go func() { @@ -142,7 +143,7 @@ func TestProcessEvents(t *testing.T) { collector: &MockCollector{err: tc.collectorError}, logger: logger, listener: telemetryapi.NewListener(logger), - extensionClient: extensionapi.NewClient(logger, u.Host), + extensionClient: extensionapi.NewClient(logger, u.Host, []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown}), } lm.wg.Add(1) if tc.err != nil { @@ -152,7 +153,6 @@ 
func TestProcessEvents(t *testing.T) { } else { require.NoError(t, lm.processEvents(ctx)) } - }) } diff --git a/collector/lambdalifecycle/constants.go b/collector/lambdalifecycle/constants.go new file mode 100644 index 0000000000..0854e088ec --- /dev/null +++ b/collector/lambdalifecycle/constants.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lambdalifecycle + +const InitTypeEnvVar = "AWS_LAMBDA_INITIALIZATION_TYPE" diff --git a/collector/lambdalifecycle/go.mod b/collector/lambdalifecycle/go.mod index 79aa02e218..00cab696ac 100644 --- a/collector/lambdalifecycle/go.mod +++ b/collector/lambdalifecycle/go.mod @@ -1,3 +1,10 @@ module github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle go 1.24.4 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.11.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/collector/lambdalifecycle/go.sum b/collector/lambdalifecycle/go.sum new file mode 100644 index 0000000000..cc8b3f4798 --- /dev/null +++ b/collector/lambdalifecycle/go.sum @@ -0,0 +1,9 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/collector/lambdalifecycle/types.go b/collector/lambdalifecycle/types.go new file mode 100644 index 0000000000..c2602429c4 --- /dev/null +++ b/collector/lambdalifecycle/types.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package lambdalifecycle + +import "os" + +type InitType int + +const ( + OnDemand InitType = iota + ProvisionedConcurrency + SnapStart + LambdaManagedInstances + Unknown InitType = -1 +) + +func (t InitType) String() string { + switch t { + case OnDemand: + return "on-demand" + case ProvisionedConcurrency: + return "provisioned-concurrency" + case SnapStart: + return "snap-start" + case LambdaManagedInstances: + return "lambda-managed-instances" + default: + return "unknown" + } +} + +func ParseInitType(s string) InitType { + switch s { + case "on-demand": + return OnDemand + case "provisioned-concurrency": + return ProvisionedConcurrency + case "snap-start": + return SnapStart + case "lambda-managed-instances": + return LambdaManagedInstances + default: + return Unknown + } +} + +func InitTypeFromEnv(envVar string) InitType { + return ParseInitType(os.Getenv(envVar)) +} diff --git a/collector/lambdalifecycle/types_test.go b/collector/lambdalifecycle/types_test.go new file mode 100644 index 0000000000..32254635b6 --- /dev/null +++ b/collector/lambdalifecycle/types_test.go @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package lambdalifecycle + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestInitType_String(t *testing.T) { + tests := []struct { + initType InitType + expected string + }{ + {OnDemand, "on-demand"}, + {ProvisionedConcurrency, "provisioned-concurrency"}, + {SnapStart, "snap-start"}, + {LambdaManagedInstances, "lambda-managed-instances"}, + {Unknown, "unknown"}, + {InitType(99), "unknown"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + if result := tt.initType.String(); result != tt.expected { + t.Errorf("InitType.String() = %q, expected %q", result, tt.expected) + } + }) + } +} + +func TestParseInitType(t *testing.T) { + tests := []struct { + input string + expected InitType + }{ + {"on-demand", OnDemand}, + {"provisioned-concurrency", ProvisionedConcurrency}, + {"snap-start", SnapStart}, + {"lambda-managed-instances", LambdaManagedInstances}, + {"unknown", Unknown}, + {"", Unknown}, + {"invalid", Unknown}, + } + + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + if result := ParseInitType(tt.input); result != tt.expected { + t.Errorf("ParseInitType(%q) = %v, expected %v", tt.input, result, tt.expected) + } + }) + } +} + +func TestInitTypeFromEnv(t *testing.T) { + const testEnvVar = "TEST_INIT_TYPE" + + tests := []struct { + name string + envVal string + expected InitType + setEnv bool + }{ + {"on-demand", "on-demand", OnDemand, true}, + {"provisioned-concurrency", "provisioned-concurrency", ProvisionedConcurrency, true}, + {"snap-start", "snap-start", SnapStart, true}, + {"lambda-managed-instances", "lambda-managed-instances", LambdaManagedInstances, true}, + {"unset env var", "", Unknown, false}, + {"empty env var", "", Unknown, true}, + {"invalid value", "foo", Unknown, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.NoError(t, os.Unsetenv(testEnvVar)) + if tt.setEnv { + require.NoError(t, os.Setenv(testEnvVar, tt.envVal)) + } + + 
if result := InitTypeFromEnv(testEnvVar); result != tt.expected { + t.Errorf("InitTypeFromEnv() = %v, expected %v", result, tt.expected) + } + }) + } +} diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 25670f81fd..781ee1127b 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -29,6 +29,8 @@ import ( "time" "github.com/golang-collections/go-datastructures/queue" + "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" + "github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -37,14 +39,12 @@ import ( "go.opentelemetry.io/collector/receiver" semconv "go.opentelemetry.io/collector/semconv/v1.25.0" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" ) const ( initialQueueSize = 5 scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" - platformReportLogFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB" + platformReportLogFmt = "REPORT RequestId: %s" platformStartLogFmt = "START RequestId: %s Version: %s" platformRuntimeDoneLogFmt = "END RequestId: %s Version: %s" platformInitStartLogFmt = "INIT_START Runtime Version: %s Runtime Version ARN: %s" @@ -72,6 +72,7 @@ type telemetryAPIReceiver struct { resource pcommon.Resource faasFunctionVersion string currentFaasInvocationID string + lambdaInitType lambdalifecycle.InitType logReport bool } @@ -143,12 +144,16 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ switch el.Type { // Function initialization started. 
case string(telemetryapi.PlatformInitStart): - r.logger.Info(fmt.Sprintf("Init start: %s", r.lastPlatformStartTime), zap.Any("event", el)) - r.lastPlatformStartTime = el.Time + if el.Time != "" { + r.lastPlatformStartTime = el.Time + r.logger.Info(fmt.Sprintf("Init start: %s", r.lastPlatformStartTime), zap.Any("event", el)) + } // Function initialization completed. - case string(telemetryapi.PlatformInitRuntimeDone): - r.logger.Info(fmt.Sprintf("Init end: %s", r.lastPlatformEndTime), zap.Any("event", el)) - r.lastPlatformEndTime = el.Time + case string(telemetryapi.PlatformInitRuntimeDone), string(telemetryapi.PlatformInitReport): + if r.lastPlatformStartTime != "" && el.Time != "" { + r.lastPlatformEndTime = el.Time + r.logger.Info(fmt.Sprintf("Init end: %s", r.lastPlatformEndTime), zap.Any("event", el)) + } } // TODO: add support for additional events, see https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html // A report of function initialization. @@ -170,6 +175,7 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ // Lambda dropped log entries. 
// case "platform.logsDropped": } + if len(r.lastPlatformStartTime) > 0 && len(r.lastPlatformEndTime) > 0 { if td, err := r.createPlatformInitSpan(r.lastPlatformStartTime, r.lastPlatformEndTime); err == nil { if r.nextTraces != nil { @@ -201,14 +207,28 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ } func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{}) string { - if requestId, ok := record["requestId"].(string); ok { - return requestId - } else if r.currentFaasInvocationID != "" { + if record != nil { + if requestId, ok := record["requestId"].(string); ok { + return requestId + } + } + + return "" +} + +func (r *telemetryAPIReceiver) getCurrentRequestId() string { + if r.lambdaInitType != lambdalifecycle.LambdaManagedInstances { return r.currentFaasInvocationID } return "" } +func (r *telemetryAPIReceiver) updateCurrentRequestId(requestId string) { + if r.lambdaInitType != lambdalifecycle.LambdaManagedInstances { + r.currentFaasInvocationID = requestId + } +} + func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { log := plog.NewLogs() resourceLog := log.ResourceLogs().AppendEmpty() @@ -231,14 +251,15 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { } if record, ok := el.Record.(map[string]interface{}); ok { requestId := r.getRecordRequestId(record) - if requestId != "" { - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) - // If this is the first event in the invocation with a request id (i.e. the "platform.start" event), - // set the current invocation id to this request id. - if el.Type == string(telemetryapi.PlatformStart) { - r.currentFaasInvocationID = requestId - } + // If this is the first event in the invocation with a request id (i.e. the "platform.start" event), + // set the current invocation id to this request id. 
+ if requestId != "" && el.Type == string(telemetryapi.PlatformStart) { + r.updateCurrentRequestId(requestId) + } + + if requestId == "" { + requestId = r.getCurrentRequestId() } // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function @@ -261,6 +282,10 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { if functionVersion != "" { r.faasFunctionVersion = functionVersion } + } else if el.Type == string(telemetryapi.PlatformStart) { + if version, _ := record["version"].(string); version != "" { + r.faasFunctionVersion = version + } } message := createPlatformMessage(requestId, r.faasFunctionVersion, el.Type, record) @@ -271,8 +296,13 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { logRecord.Body().SetStr(line) } } else { - if r.currentFaasInvocationID != "" { - logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, r.currentFaasInvocationID) + requestId := r.getRecordRequestId(nil) + if requestId == "" { + requestId = r.getCurrentRequestId() + } + + if requestId != "" { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) } // in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function if line, ok := el.Record.(string); ok { @@ -280,7 +310,7 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { } } if el.Type == string(telemetryapi.PlatformRuntimeDone) { - r.currentFaasInvocationID = "" + r.updateCurrentRequestId("") } } return log, nil @@ -373,42 +403,33 @@ func createPlatformMessage(requestId string, functionVersion string, eventType s } func createPlatformReportMessage(requestId string, record map[string]interface{}) string { - // gathering metrics - metrics, ok := record["metrics"].(map[string]interface{}) - if !ok { + if requestId == "" { return "" } - var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64 - 
if durationMs, ok = metrics[string(telemetryapi.MetricDurationMs)].(float64); !ok { - return "" + + message := fmt.Sprintf(platformReportLogFmt, requestId) + metrics, ok := record["metrics"].(map[string]interface{}) + if !ok { + return message } - if billedDurationMs, ok = metrics[string(telemetryapi.MetricBilledDurationMs)].(float64); !ok { - return "" + + if durationMs, ok := metrics["durationMs"].(float64); ok { + message += fmt.Sprintf(" Duration: %.2f ms", durationMs) } - if memorySizeMB, ok = metrics[string(telemetryapi.MetricMemorySizeMB)].(float64); !ok { - return "" + + if billedDurationMs, ok := metrics["billedDurationMs"].(float64); ok { + message += fmt.Sprintf(" Billed Duration: %.0f ms", billedDurationMs) } - if maxMemoryUsedMB, ok = metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); !ok { - return "" + + if memorySizeMB, ok := metrics["memorySizeMB"].(float64); ok { + message += fmt.Sprintf(" Memory Size: %.0f MB", memorySizeMB) } - // optionally gather information about cold start time - var initDurationMs float64 - if initDurationMsVal, exists := metrics[string(telemetryapi.MetricInitDurationMs)]; exists { - if val, ok := initDurationMsVal.(float64); ok { - initDurationMs = val - } + if maxMemoryUsedMB, ok := metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); ok { + message += fmt.Sprintf(" Max Memory Used: %.0f MB", maxMemoryUsedMB) } - message := fmt.Sprintf( - platformReportLogFmt, - requestId, - durationMs, - billedDurationMs, - memorySizeMB, - maxMemoryUsedMB, - ) - if initDurationMs > 0 { + if initDurationMs, ok := metrics["initDurationMs"].(float64); ok { message += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs) } @@ -526,14 +547,17 @@ func newTelemetryAPIReceiver( } } + lambdaInitType := lambdalifecycle.InitTypeFromEnv(lambdalifecycle.InitTypeEnvVar) + return &telemetryAPIReceiver{ - logger: set.Logger, - queue: queue.New(initialQueueSize), - extensionID: cfg.extensionID, - port: cfg.Port, - types: subscribedTypes, - resource: r, - logReport: 
cfg.LogReport, + logger: set.Logger, + queue: queue.New(initialQueueSize), + extensionID: cfg.extensionID, + port: cfg.Port, + types: subscribedTypes, + resource: r, + lambdaInitType: lambdaInitType, + logReport: cfg.LogReport, }, nil } diff --git a/python/src/otel/otel_sdk/otel-instrument b/python/src/otel/otel_sdk/otel-instrument index 9d89c96b62..85c7206404 100755 --- a/python/src/otel/otel_sdk/otel-instrument +++ b/python/src/otel/otel_sdk/otel-instrument @@ -146,21 +146,18 @@ else export OTEL_RESOURCE_ATTRIBUTES="$LAMBDA_RESOURCE_ATTRIBUTES,$OTEL_RESOURCE_ATTRIBUTES"; fi - -# - Uses the default `OTEL_PROPAGATORS` which is set to `tracecontext,baggage` - -# - Use a wrapper because AWS Lambda's `python3 /var/runtime/bootstrap.py` will -# use `imp.load_module` to load the function from the `_HANDLER` environment -# variable. This RELOADS the module and REMOVES any instrumentation patching -# done earlier. So we delay instrumentation until `bootstrap.py` imports -# `otel_wrapper.py` at which we know the patching will be picked up. +# Redirect Lambda to load the `otel_wrapper.py` wrapper script instead of the +# user's handler. The wrapper initializes OpenTelemetry instrumentation at module +# load time and then delegates to the original handler. # -# See more: -# https://docs.python.org/3/library/imp.html#imp.load_module - +# _HANDLER is a reserved environment variable used by AWS Lambda containing the +# application handler path (e.g., "mymodule.handler"). We save it to ORIG_HANDLER +# to allow the `otel_wrapper.py` wrapper script to retrieve it later so it +# can delegate to the original handler. 
+# see: https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html#configuration-envvars-runtime export ORIG_HANDLER=$_HANDLER; export _HANDLER="otel_wrapper.lambda_handler"; -# - Call the upstream auto instrumentation script - -exec python3 $LAMBDA_LAYER_PKGS_DIR/bin/opentelemetry-instrument "$@" +# Hand off to the Lambda bootstrap process with _HANDLER set to +# `otel_wrapper.lambda_handler` +exec "$@" diff --git a/python/src/otel/otel_sdk/otel_wrapper.py b/python/src/otel/otel_sdk/otel_wrapper.py index 295410406c..764e8b32cd 100644 --- a/python/src/otel/otel_sdk/otel_wrapper.py +++ b/python/src/otel/otel_sdk/otel_wrapper.py @@ -13,54 +13,83 @@ # limitations under the License. """ -`otel_wrapper.py` - -This file serves as a wrapper over the user's Lambda function. - -Usage ------ -Patch the reserved `_HANDLER` Lambda environment variable to point to this -file's `otel_wrapper.lambda_handler` property. Do this having saved the original -`_HANDLER` in the `ORIG_HANDLER` environment variable. Doing this makes it so -that **on import of this file, the handler is instrumented**. - -Instrumenting any earlier will cause the instrumentation to be lost because the -AWS Service uses `imp.load_module` to import the handler which RELOADS the -module. This is why AwsLambdaInstrumentor cannot be instrumented with the -`opentelemetry-instrument` script. - -See more: -https://docs.python.org/3/library/imp.html#imp.load_module - +OpenTelemetry Lambda Handler Wrapper + +This module wraps the user's Lambda function to enable automatic OpenTelemetry +instrumentation. It acts as a wrapper script that instruments the Lambda function +before loading the module containing the user's handler. + +The instrumentation process works as follows: +------------ +1. The `otel-instrument` shell script sets _HANDLER to point to this file's + `lambda_handler`, saving the original handler path to ORIG_HANDLER. + +2. 
When AWS Lambda imports this module, `auto_instrumentation.initialize()` runs + immediately, instrumenting the application before any user code executes. + +3. The module containing the user's handler is loaded by this script and the + `lambda_handler` variable is bound to the user's original handler function, + allowing Lambda invocations to be transparently forwarded to the original handler. + +Details on why the `opentelemetry-instrument` CLI wrapper is insufficient: +------------------------------------------------ +The `opentelemetry-instrument` CLI wrapper only instruments the initial Python process. +AWS Lambda may spawn fresh Python processes for new invocations (e.g. as is the case with +lambda managed instances), which would bypass CLI based instrumentation. By +calling `auto_instrumentation.initialize()` at module import time, we ensure every +Lambda execution context is instrumented. + +Environment Variables +--------------------- +ORIG_HANDLER : str + The original Lambda handler path (e.g., "mymodule.handler"). Set by + `otel-instrument` before this module is loaded. """ + import os from importlib import import_module -from opentelemetry.instrumentation.aws_lambda import AwsLambdaInstrumentor +from opentelemetry.instrumentation import auto_instrumentation + +# Initialize OpenTelemetry instrumentation immediately on module import. +# This must happen before the user's handler module is loaded (below) to ensure +# all library patches are applied before any user code runs. +auto_instrumentation.initialize() -def modify_module_name(module_name): - """Returns a valid modified module to get imported""" - return ".".join(module_name.split("/")) +def _get_orig_handler(): + """ + Resolve and return the user's original Lambda handler function. + Reads the handler path from the ORIG_HANDLER environment variable, + dynamically imports the handler's module and returns the handler + function. 
+ """ -class HandlerError(Exception): - pass + handler_path = os.environ.get("ORIG_HANDLER") + if handler_path is None: + raise RuntimeError( + "ORIG_HANDLER is not defined." + ) -AwsLambdaInstrumentor().instrument() + # Split "module/path.handler_name" into module path and function name. + # The handler path uses the last "." as the separator between module and function. + try: + module_path, handler_name = handler_path.rsplit(".", 1) + except ValueError as e: + raise RuntimeError( + f"Invalid ORIG_HANDLER format '{handler_path}': expected " + f"'module.handler_name' or 'path/to/module.handler_name'. Error: {e}" + ) -path = os.environ.get("ORIG_HANDLER") + # Convert path separators to Python module notation + module_name = ".".join(module_path.split("/")) -if path is None: - raise HandlerError("ORIG_HANDLER is not defined.") + handler_module = import_module(module_name) + return getattr(handler_module, handler_name) -try: - (mod_name, handler_name) = path.rsplit(".", 1) -except ValueError as e: - raise HandlerError("Bad path '{}' for ORIG_HANDLER: {}".format(path, str(e))) -modified_mod_name = modify_module_name(mod_name) -handler_module = import_module(modified_mod_name) -lambda_handler = getattr(handler_module, handler_name) +# Resolve to the user's handler at module load time. 
+lambda_handler = _get_orig_handler() From 30a6af27b2bb2e9f9326e827c498c9b7ae6d0e51 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Fri, 19 Dec 2025 23:32:46 -0500 Subject: [PATCH 2/5] fix go tidy error --- collector/lambdalifecycle/go.mod | 3 ++- collector/lambdalifecycle/go.sum | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/collector/lambdalifecycle/go.mod b/collector/lambdalifecycle/go.mod index 00cab696ac..27f0704919 100644 --- a/collector/lambdalifecycle/go.mod +++ b/collector/lambdalifecycle/go.mod @@ -2,9 +2,10 @@ module github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle go 1.24.4 +require github.com/stretchr/testify v1.11.1 + require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/testify v1.11.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/collector/lambdalifecycle/go.sum b/collector/lambdalifecycle/go.sum index cc8b3f4798..c4c1710c47 100644 --- a/collector/lambdalifecycle/go.sum +++ b/collector/lambdalifecycle/go.sum @@ -4,6 +4,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 465886ece15db5a820ed8025b0fbd64ada2460dd Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Fri, 19 Dec 2025 23:40:36 -0500 Subject: [PATCH 3/5] update lambdalifecycle path for 
telemetryapireceiver --- collector/receiver/telemetryapireceiver/go.mod | 3 +++ 1 file changed, 3 insertions(+) diff --git a/collector/receiver/telemetryapireceiver/go.mod b/collector/receiver/telemetryapireceiver/go.mod index 7cd1f37cea..27ec2a37cd 100644 --- a/collector/receiver/telemetryapireceiver/go.mod +++ b/collector/receiver/telemetryapireceiver/go.mod @@ -4,9 +4,12 @@ go 1.24.4 replace github.com/open-telemetry/opentelemetry-lambda/collector => ../../ +replace github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle => ../../lambdalifecycle + require ( github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 github.com/open-telemetry/opentelemetry-lambda/collector v0.98.0 + github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.44.0 go.opentelemetry.io/collector/component/componenttest v0.138.0 From f57cdfbaf3a4ff74543feedf09935ebf68210355 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Fri, 19 Dec 2025 23:43:31 -0500 Subject: [PATCH 4/5] fix failing test --- collector/lambdalifecycle/types_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/lambdalifecycle/types_test.go b/collector/lambdalifecycle/types_test.go index 32254635b6..d935a6428e 100644 --- a/collector/lambdalifecycle/types_test.go +++ b/collector/lambdalifecycle/types_test.go @@ -29,7 +29,7 @@ func TestInitType_String(t *testing.T) { {OnDemand, "on-demand"}, {ProvisionedConcurrency, "provisioned-concurrency"}, {SnapStart, "snap-start"}, - {LambdaManagedInstances, "lambda-managed-instance"}, + {LambdaManagedInstances, "lambda-managed-instances"}, {Unknown, "unknown"}, {InitType(99), "unknown"}, } From 88e25023756dc63bba27a2f3886689a2427c68e6 Mon Sep 17 00:00:00 2001 From: Lukas Hering Date: Sat, 20 Dec 2025 00:07:42 -0500 Subject: [PATCH 5/5] fix missing request id --- 
collector/receiver/telemetryapireceiver/receiver.go | 10 +++++++--- .../receiver/telemetryapireceiver/receiver_test.go | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index 781ee1127b..ec83755cd0 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -254,7 +254,7 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { // If this is the first event in the invocation with a request id (i.e. the "platform.start" event), // set the current invocation id to this request id. - if requestId != "" && el.Type == string(telemetryapi.PlatformReport) { + if requestId != "" && el.Type == string(telemetryapi.PlatformStart) { r.updateCurrentRequestId(requestId) } @@ -262,6 +262,10 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { requestId = r.getCurrentRequestId() } + if requestId != "" { + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + } + // in JSON format https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function if timestamp, ok := record["timestamp"].(string); ok { if t, err := time.Parse(time.RFC3339, timestamp); err == nil { @@ -425,8 +429,8 @@ func createPlatformReportMessage(requestId string, record map[string]interface{} message += fmt.Sprintf(" Memory Size: %.0f MB", memorySizeMB) } - if memoryUsedMB, ok := metrics["memoryUsedMB"].(float64); ok { - message += fmt.Sprintf(" Memory Used: %.0f MB", memoryUsedMB) + if maxMemoryUsedMB, ok := metrics["maxMemoryUsedMB"].(float64); ok { + message += fmt.Sprintf(" Max Memory Used: %.0f MB", maxMemoryUsedMB) } if initDurationMs, ok := metrics["initDurationMs"].(float64); ok { diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go 
index b3beafd2bd..0ed684764d 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -969,7 +969,7 @@ func TestCreatePlatformMessage(t *testing.T) { functionVersion: "$LATEST", eventType: "platform.report", record: map[string]interface{}{}, - expected: "", + expected: "REPORT RequestId: test-request-id", }, { desc: "platform.initStart with runtimeVersion and runtimeVersionArn",