Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"strings"

"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
"go.opentelemetry.io/collector/confmap"
)

Expand All @@ -43,6 +44,14 @@ func New() confmap.Converter {
}

func (c converter) Convert(_ context.Context, conf *confmap.Conf) error {
initType := lambdalifecycle.InitTypeFromEnv(lambdalifecycle.InitTypeEnvVar)

// Do not append decouple processors for Lambda Managed Instances
// see: https://docs.aws.amazon.com/lambda/latest/dg/lambda-managed-instances-execution-environment.html
if initType == lambdalifecycle.LambdaManagedInstances {
return nil
}

serviceVal := conf.Get(serviceKey)
service, ok := serviceVal.(map[string]interface{})
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"testing"

"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
"go.opentelemetry.io/collector/confmap"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -151,3 +152,38 @@ func TestConvert(t *testing.T) {
})
}
}

func TestConvert_LambdaManagedInstances(t *testing.T) {
t.Setenv(lambdalifecycle.InitTypeEnvVar, lambdalifecycle.LambdaManagedInstances.String())

// Config that would normally have decouple appended
input := confmap.NewFromStringMap(map[string]interface{}{
"service": map[string]interface{}{
"pipelines": map[string]interface{}{
"traces": map[string]interface{}{
"processors": []interface{}{"batch"},
},
},
},
})

// Expected to remain unchanged
expected := confmap.NewFromStringMap(map[string]interface{}{
"service": map[string]interface{}{
"pipelines": map[string]interface{}{
"traces": map[string]interface{}{
"processors": []interface{}{"batch"},
},
},
},
})

c := New()
err := c.Convert(context.Background(), input)
if err != nil {
t.Errorf("unexpected error converting: %v", err)
}
if diff := cmp.Diff(expected.ToStringMap(), input.ToStringMap()); diff != "" {
t.Errorf("Convert() mismatch: (-want +got):\n%s", diff)
}
}
6 changes: 4 additions & 2 deletions collector/internal/extensionapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,17 @@ type Client struct {
httpClient *http.Client
extensionID string
logger *zap.Logger
events []EventType
}

// NewClient returns a Lambda Extensions API client.
func NewClient(logger *zap.Logger, awsLambdaRuntimeAPI string) *Client {
func NewClient(logger *zap.Logger, awsLambdaRuntimeAPI string, events []EventType) *Client {
baseURL := fmt.Sprintf("http://%s/2020-01-01/extension", awsLambdaRuntimeAPI)
return &Client{
baseURL: baseURL,
httpClient: &http.Client{},
logger: logger.Named("extensionAPI.Client"),
events: events,
}
}

Expand All @@ -94,7 +96,7 @@ func (e *Client) Register(ctx context.Context, filename string) (*RegisterRespon
url := e.baseURL + action

reqBody, err := json.Marshal(map[string]interface{}{
"events": []EventType{Invoke, Shutdown},
"events": e.events,
})
if err != nil {
return nil, err
Expand Down
17 changes: 17 additions & 0 deletions collector/internal/lifecycle/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lifecycle

const RuntimeApiEnvVar = "AWS_LAMBDA_RUNTIME_API"
56 changes: 36 additions & 20 deletions collector/internal/lifecycle/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ package lifecycle
import (
"context"
"fmt"
"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"

"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"

"go.uber.org/multierr"
"go.uber.org/zap"

Expand All @@ -49,6 +50,7 @@ type manager struct {
listener *telemetryapi.Listener
wg sync.WaitGroup
lifecycleListeners []lambdalifecycle.Listener
initType lambdalifecycle.InitType
}

func NewManager(ctx context.Context, logger *zap.Logger, version string) (context.Context, *manager) {
Expand All @@ -62,28 +64,40 @@ func NewManager(ctx context.Context, logger *zap.Logger, version string) (contex
logger.Info("received signal", zap.String("signal", s.String()))
}()

extensionClient := extensionapi.NewClient(logger, os.Getenv("AWS_LAMBDA_RUNTIME_API"))
var extensionEvents []extensionapi.EventType
initType := lambdalifecycle.InitTypeFromEnv(lambdalifecycle.InitTypeEnvVar)
if initType == lambdalifecycle.LambdaManagedInstances {
extensionEvents = []extensionapi.EventType{extensionapi.Shutdown}
} else {
extensionEvents = []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown}
}

extensionClient := extensionapi.NewClient(logger, os.Getenv(RuntimeApiEnvVar), extensionEvents)
res, err := extensionClient.Register(ctx, extensionName)
if err != nil {
logger.Fatal("Cannot register extension", zap.Error(err))
}

listener := telemetryapi.NewListener(logger)
addr, err := listener.Start()
if err != nil {
logger.Fatal("Cannot start Telemetry API Listener", zap.Error(err))
}
var listener *telemetryapi.Listener
if initType != lambdalifecycle.LambdaManagedInstances {
listener = telemetryapi.NewListener(logger)
addr, err := listener.Start()
if err != nil {
logger.Fatal("Cannot start Telemetry API Listener", zap.Error(err))
}

telemetryClient := telemetryapi.NewClient(logger)
_, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr)
if err != nil {
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
telemetryClient := telemetryapi.NewClient(logger)
_, err = telemetryClient.Subscribe(ctx, []telemetryapi.EventType{telemetryapi.Platform}, res.ExtensionID, addr)
if err != nil {
logger.Fatal("Cannot register Telemetry API client", zap.Error(err))
}
}

lm := &manager{
logger: logger.Named("lifecycle.manager"),
extensionClient: extensionClient,
listener: listener,
initType: initType,
}

factories, _ := lambdacomponents.Components(res.ExtensionID)
Expand Down Expand Up @@ -134,25 +148,27 @@ func (lm *manager) processEvents(ctx context.Context) error {
if res.EventType == extensionapi.Shutdown {
lm.logger.Info("Received SHUTDOWN event")
lm.notifyEnvironmentShutdown()
lm.listener.Shutdown()
if lm.listener != nil {
lm.listener.Shutdown()
}
err = lm.collector.Stop()
if err != nil {
if _, exitErr := lm.extensionClient.ExitError(ctx, fmt.Sprintf("error stopping collector: %v", err)); exitErr != nil {
return multierr.Combine(err, exitErr)
}
}
return err
}
} else if lm.listener != nil && res.EventType == extensionapi.Invoke {
lm.notifyFunctionInvoked()

lm.notifyFunctionInvoked()
err = lm.listener.Wait(ctx, res.RequestID)
if err != nil {
lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID))
}

err = lm.listener.Wait(ctx, res.RequestID)
if err != nil {
lm.logger.Error("problem waiting for platform.runtimeDone event", zap.Error(err), zap.String("requestID", res.RequestID))
// Check other components are ready before allowing the freezing of the environment.
lm.notifyFunctionFinished()
}

// Check other components are ready before allowing the freezing of the environment.
lm.notifyFunctionFinished()
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions collector/internal/lifecycle/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,28 @@ func TestRun(t *testing.T) {
u, err := url.Parse(server.URL)
require.NoError(t, err)

extensionEventTypes := []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown}
// test with an error
lm := manager{
collector: &MockCollector{err: fmt.Errorf("test start error")},
logger: logger,
extensionClient: extensionapi.NewClient(logger, ""),
extensionClient: extensionapi.NewClient(logger, "", extensionEventTypes),
}
require.Error(t, lm.Run(ctx))
// test with no waitgroup
lm = manager{
collector: &MockCollector{},
logger: logger,
listener: telemetryapi.NewListener(logger),
extensionClient: extensionapi.NewClient(logger, u.Host),
extensionClient: extensionapi.NewClient(logger, u.Host, extensionEventTypes),
}
require.NoError(t, lm.Run(ctx))
// test with waitgroup counter incremented
lm = manager{
collector: &MockCollector{},
logger: logger,
listener: telemetryapi.NewListener(logger),
extensionClient: extensionapi.NewClient(logger, u.Host),
extensionClient: extensionapi.NewClient(logger, u.Host, extensionEventTypes),
}
lm.wg.Add(1)
go func() {
Expand Down Expand Up @@ -142,7 +143,7 @@ func TestProcessEvents(t *testing.T) {
collector: &MockCollector{err: tc.collectorError},
logger: logger,
listener: telemetryapi.NewListener(logger),
extensionClient: extensionapi.NewClient(logger, u.Host),
extensionClient: extensionapi.NewClient(logger, u.Host, []extensionapi.EventType{extensionapi.Invoke, extensionapi.Shutdown}),
}
lm.wg.Add(1)
if tc.err != nil {
Expand All @@ -152,7 +153,6 @@ func TestProcessEvents(t *testing.T) {
} else {
require.NoError(t, lm.processEvents(ctx))
}

})
}

Expand Down
17 changes: 17 additions & 0 deletions collector/lambdalifecycle/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lambdalifecycle

const InitTypeEnvVar = "AWS_LAMBDA_INITIALIZATION_TYPE"
8 changes: 8 additions & 0 deletions collector/lambdalifecycle/go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
module github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle

go 1.24.4

require github.com/stretchr/testify v1.11.1

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions collector/lambdalifecycle/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
61 changes: 61 additions & 0 deletions collector/lambdalifecycle/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lambdalifecycle

import "os"

type InitType int

const (
OnDemand InitType = iota
ProvisionedConcurrency
SnapStart
LambdaManagedInstances
Unknown InitType = -1
)

func (t InitType) String() string {
switch t {
case OnDemand:
return "on-demand"
case ProvisionedConcurrency:
return "provisioned-concurrency"
case SnapStart:
return "snap-start"
case LambdaManagedInstances:
return "lambda-managed-instances"
default:
return "unknown"
}
}

func ParseInitType(s string) InitType {
switch s {
case "on-demand":
return OnDemand
case "provisioned-concurrency":
return ProvisionedConcurrency
case "snap-start":
return SnapStart
case "lambda-managed-instances":
return LambdaManagedInstances
default:
return Unknown
}
}

func InitTypeFromEnv(envVar string) InitType {
return ParseInitType(os.Getenv(envVar))
}
Loading
Loading