diff --git a/docs/utilities/batch-processing.md b/docs/utilities/batch-processing.md index 9b464723..19b3aa0c 100644 --- a/docs/utilities/batch-processing.md +++ b/docs/utilities/batch-processing.md @@ -138,40 +138,7 @@ Processing batches from SQS using typed Lambda handler decorator with automatic === "Function.cs" ```csharp hl_lines="1 8 19 29 32" - public class Product - { - public int Id { get; set; } - public string? Name { get; set; } - public decimal Price { get; set; } - } - - public class TypedSqsRecordHandler : ITypedRecordHandler // (1)! - { - public async Task HandleAsync(Product product, CancellationToken cancellationToken) - { - /* - * Your business logic with automatic deserialization. - * If an exception is thrown, the item will be marked as a partial batch item failure. - */ - - Logger.LogInformation($"Processing product {product.Id} - {product.Name} (${product.Price})"); - - if (product.Id == 4) // (2)! - { - throw new ArgumentException("Error on id 4"); - } - - return await Task.FromResult(RecordHandlerResult.None); // (3)! - } - - } - - [BatchProcessor(TypedRecordHandler = typeof(TypedSqsRecordHandler))] - public BatchItemFailuresResponse HandlerUsingTypedAttribute(SQSEvent _) - { - return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)! - } - + --8<-- "examples/BatchProcessing/snippets/GettingStartedWithSqs.cs:sqs_typed_handler_decorator" ``` 1. **Step 1**. Creates a class that implements ITypedRecordHandler interface - Product is automatically deserialized from SQS message body. @@ -247,33 +214,7 @@ Processing batches from SQS using Lambda handler decorator works in three stages === "Function.cs" ```csharp hl_lines="1 12 22 17 25" - public class CustomSqsRecordHandler : ISqsRecordHandler // (1)! - { - public async Task HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken) - { - /* - * Your business logic. - * If an exception is thrown, the item will be marked as a partial batch item failure. - */ - - var product = JsonSerializer.Deserialize(record.Body); - - if (product.Id == 4) // (2)! - { - throw new ArgumentException("Error on id 4"); - } - - return await Task.FromResult(RecordHandlerResult.None); // (3)! - } - - } - - [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))] - public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _) - { - return SqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)! - } - + --8<-- "examples/BatchProcessing/snippets/GettingStartedWithSqs.cs:sqs_handler_decorator_traditional" ``` 1. **Step 1**. Creates a class that implements ISqsRecordHandler interface and the HandleAsync method. @@ -379,35 +320,7 @@ Processing batches from Kinesis using typed Lambda handler decorator with automa === "Function.cs" ```csharp hl_lines="1 9 15 20 24 27" - public class Order - { - public string? OrderId { get; set; } - public DateTime OrderDate { get; set; } - public List Items { get; set; } = new(); - public decimal TotalAmount { get; set; } - } - - internal class TypedKinesisRecordHandler : ITypedRecordHandler // (1)! - { - public async Task HandleAsync(Order order, CancellationToken cancellationToken) - { - Logger.LogInformation($"Processing order {order.OrderId} with {order.Items.Count} items"); - - if (order.TotalAmount <= 0) // (2)! - { - throw new ArgumentException("Invalid order total"); - } - - return await Task.FromResult(RecordHandlerResult.None); // (3)! - } - } - - [BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))] - public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _) - { - return TypedKinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)! - } - + --8<-- "examples/BatchProcessing/snippets/GettingStartedBasic.cs:kinesis_typed_handler_decorator" ``` 1. **Step 1**. Creates a class that implements ITypedRecordHandler interface - Order is automatically deserialized from Kinesis record data. @@ -427,28 +340,7 @@ Processing batches from Kinesis using Lambda handler decorator works in three st === "Function.cs" ```csharp hl_lines="1 7 12 17 20" - internal class CustomKinesisEventRecordHandler : IKinesisEventRecordHandler // (1)! - { - public async Task HandleAsync(KinesisEvent.KinesisEventRecord record, CancellationToken cancellationToken) - { - var product = JsonSerializer.Deserialize(record.Kinesis.Data); - - if (product.Id == 4) // (2)! - { - throw new ArgumentException("Error on id 4"); - } - - return await Task.FromResult(RecordHandlerResult.None); // (3)! - } - } - - - [BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))] - public BatchItemFailuresResponse HandlerUsingAttribute(KinesisEvent _) - { - return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)! - } - + --8<-- "examples/BatchProcessing/snippets/GettingStartedBasic.cs:kinesis_handler_decorator_traditional" ``` 1. **Step 1**. Creates a class that implements the IKinesisEventRecordHandler interface and the HandleAsync method. @@ -545,35 +437,7 @@ Processing batches from DynamoDB Streams using typed Lambda handler decorator wi === "Function.cs" ```csharp hl_lines="1 9 15 20 24 27" - public class Customer - { - public string? CustomerId { get; set; } - public string? Name { get; set; } - public string? Email { get; set; } - public DateTime CreatedAt { get; set; } - } - - internal class TypedDynamoDbRecordHandler : ITypedRecordHandler // (1)! - { - public async Task HandleAsync(Customer customer, CancellationToken cancellationToken) - { - Logger.LogInformation($"Processing customer {customer.CustomerId} - {customer.Name}"); - - if (string.IsNullOrEmpty(customer.Email)) // (2)! - { - throw new ArgumentException("Customer email is required"); - } - - return await Task.FromResult(RecordHandlerResult.None); // (3)! - } - } - - [BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))] - public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _) - { - return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)! - } - + --8<-- "examples/BatchProcessing/snippets/GettingStartedBasic.cs:dynamodb_typed_handler_decorator" ``` 1. **Step 1**. Creates a class that implements ITypedRecordHandler interface - Customer is automatically deserialized from DynamoDB stream record. @@ -593,28 +457,7 @@ Processing batches from DynamoDB Streams using Lambda handler decorator works in === "Function.cs" ```csharp hl_lines="1 7 12 17 20" - internal class CustomDynamoDbStreamRecordHandler : IDynamoDbStreamRecordHandler // (1)! - { - public async Task HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken) - { - var product = JsonSerializer.Deserialize(record.Dynamodb.NewImage["Product"].S); - - if (product.Id == 4) // (2)! - { - throw new ArgumentException("Error on id 4"); - } - - return await Task.FromResult(RecordHandlerResult.None); // (3)! - } - } - - - [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))] - public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _) - { - return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)! - } - + --8<-- "examples/BatchProcessing/snippets/GettingStartedBasic.cs:dynamodb_handler_decorator_traditional" ``` 1. **Step 1**. Creates a class that implements the IDynamoDbStreamRecordHandler and the HandleAsync method. @@ -700,27 +543,7 @@ This allows us to **(1)** continue processing the batch, **(2)** collect each ba === "Function.cs" ```csharp hl_lines="14" - public class CustomSqsRecordHandler : ISqsRecordHandler // (1)! - { - public async Task HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken) - { - /* - * Your business logic. - * If an exception is thrown, the item will be marked as a partial batch item failure. - */ - - var product = JsonSerializer.Deserialize(record.Body); - - if (product.Id == 4) // (2)! - { - throw new ArgumentException("Error on id 4"); - } - - return await Task.FromResult(RecordHandlerResult.None); // (3)! - } - - } - + --8<-- "examples/BatchProcessing/snippets/AdvancedErrorHandling.cs:sqs_record_handler_error_handling" ``` === "Sample event" @@ -828,13 +651,7 @@ Another approach is to decorate the handler and use one of the policies in the * === "Function.cs" ```csharp hl_lines="2" - [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler), - ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)] - public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _) - { - return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; - } - + --8<-- "examples/BatchProcessing/snippets/AdvancedErrorHandling.cs:error_handling_policy_attribute" ``` ### Partial failure mechanics @@ -967,29 +784,13 @@ For Native AOT scenarios, you can configure JsonSerializerContext: === "JsonSerializerContext Configuration" ```csharp - [JsonSerializable(typeof(Product))] - [JsonSerializable(typeof(Order))] - [JsonSerializable(typeof(Customer))] - [JsonSerializable(typeof(List))] - [JsonSourceGenerationOptions( - PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase, - WriteIndented = false, - DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)] - public partial class MyJsonSerializerContext : JsonSerializerContext - { - } + --8<-- "examples/BatchProcessing/snippets/CustomSerialization.cs:json_serializer_context_configuration" ``` === "Using with Attribute" ```csharp hl_lines="2 3" - [BatchProcessor( - TypedRecordHandler = typeof(TypedSqsRecordHandler), - JsonSerializerContext = typeof(MyJsonSerializerContext))] - public BatchItemFailuresResponse ProcessWithAot(SQSEvent sqsEvent) - { - return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; - } + --8<-- "examples/BatchProcessing/snippets/CustomSerialization.cs:json_serializer_context_using_with_attribute" ``` @@ -1001,32 +802,13 @@ For typed handlers that need access to Lambda context, use `ITypedRecordHandlerW === "Handler with Context" ```csharp hl_lines="1 3" - public class ProductHandlerWithContext : ITypedRecordHandlerWithContext - { - public async Task HandleAsync(Product product, ILambdaContext context, CancellationToken cancellationToken) - { - Logger.LogInformation($"Processing product {product.Id} in request {context.AwsRequestId}"); - Logger.LogInformation($"Remaining time: {context.RemainingTime.TotalSeconds}s"); - - // Use context for timeout handling - if (context.RemainingTime.TotalSeconds < 5) - { - Logger.LogWarning("Low remaining time, processing quickly"); - } - - return RecordHandlerResult.None; - } - } + --8<-- "examples/BatchProcessing/snippets/GettingStartedWithSqs.cs:typed_handler_with_context" ``` === "Function Usage" ```csharp hl_lines="1 2" - [BatchProcessor(TypedRecordHandlerWithContext = typeof(ProductHandlerWithContext))] - public BatchItemFailuresResponse ProcessWithContext(SQSEvent sqsEvent, ILambdaContext context) - { - return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; - } + --8<-- "examples/BatchProcessing/snippets/GettingStartedWithSqs.cs:function_usage_with_context" ``` ### Migration from Traditional to Typed Handlers @@ -1036,52 +818,13 @@ You can gradually migrate from traditional to typed handlers: === "Before (Traditional)" ```csharp hl_lines="1 6" - public class TraditionalSqsHandler : ISqsRecordHandler - { - public async Task HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken) - { - // Manual deserialization - var product = JsonSerializer.Deserialize(record.Body); - - Logger.LogInformation($"Processing product {product.Id}"); - - if (product.Price < 0) - throw new ArgumentException("Invalid price"); - - return RecordHandlerResult.None; - } - } - - [BatchProcessor(RecordHandler = typeof(TraditionalSqsHandler))] - public BatchItemFailuresResponse ProcessSqs(SQSEvent sqsEvent) - { - return SqsBatchProcessor.Result.BatchItemFailuresResponse; - } + --8<-- "examples/BatchProcessing/snippets/GettingStartedWithSqs.cs:migration_before_traditional" ``` === "After (Typed)" ```csharp hl_lines="1 5" - public class TypedSqsHandler : ITypedRecordHandler - { - public async Task HandleAsync(Product product, CancellationToken cancellationToken) - { - // Automatic deserialization - product is already deserialized! - Logger.LogInformation($"Processing product {product.Id}"); - - // Same business logic - if (product.Price < 0) - throw new ArgumentException("Invalid price"); - - return RecordHandlerResult.None; - } - } - - [BatchProcessor(TypedRecordHandler = typeof(TypedSqsHandler))] - public BatchItemFailuresResponse ProcessSqs(SQSEvent sqsEvent) - { - return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; - } + --8<-- "examples/BatchProcessing/snippets/GettingStartedWithSqs.cs:migration_after_typed" ``` ### Error Handling with Typed Processors @@ -1091,13 +834,7 @@ Typed processors support the same error handling policies as traditional process === "Custom Error Handling" ```csharp hl_lines="2" - [BatchProcessor( - TypedRecordHandler = typeof(TypedSqsHandler), - ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)] - public BatchItemFailuresResponse ProcessWithErrorPolicy(SQSEvent sqsEvent) - { - return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; - } + --8<-- "examples/BatchProcessing/snippets/AdvancedErrorHandling.cs:typed_custom_error_handling" ``` ### Advanced @@ -1111,20 +848,7 @@ Calling the **`ProcessAsync`** method on the Instance of the static BatchProcess === "Function.cs" ```csharp hl_lines="3" - public async Task HandlerUsingUtility(DynamoDBEvent dynamoDbEvent) - { - var result = await DynamoDbStreamBatchProcessor.Instance.ProcessAsync(dynamoDbEvent, RecordHandler.From(record => - { - var product = JsonSerializer.Deserialize(record.Dynamodb.NewImage["Product"].S); - - if (product.GetProperty("Id").GetInt16() == 4) - { - throw new ArgumentException("Error on 4"); - } - })); - return result.BatchItemFailuresResponse; - } - + --8<-- "examples/BatchProcessing/snippets/GettingStartedBasic.cs:using_utility_outside_decorator" ``` To make the handler testable you can use Dependency Injection to resolve the BatchProcessor (`SqsBatchProcessor`, `DynamoDbStreamBatchProcessor`, `KinesisEventBatchProcessor`) instance and then call the **`ProcessAsync`** method. @@ -1132,52 +856,19 @@ To make the handler testable you can use Dependency Injection to resolve the Bat === "GetRequiredService inside the method" ```csharp hl_lines="3 4 5" - public async Task HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent) - { - var batchProcessor = Services.Provider.GetRequiredService(); - var recordHandler = Services.Provider.GetRequiredService(); - var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler); - return result.BatchItemFailuresResponse; - } - + --8<-- "examples/BatchProcessing/snippets/GettingStartedBasic.cs:using_utility_from_ioc_getrequiredservice" ``` === "Injecting method parameters" ```csharp hl_lines="2 4" - public async Task HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent, - IDynamoDbStreamBatchProcessor batchProcessor, IDynamoDbStreamRecordHandler recordHandler) - { - var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler); - return result.BatchItemFailuresResponse; - } - + --8<-- "examples/BatchProcessing/snippets/GettingStartedBasic.cs:using_utility_from_ioc_injected_parameters" ``` === "Example implementation of IServiceProvider" ```csharp hl_lines="16 17" - internal class Services - { - private static readonly Lazy LazyInstance = new(Build); - - private static ServiceCollection _services; - public static IServiceProvider Provider => LazyInstance.Value; - - public static IServiceProvider Init() - { - return LazyInstance.Value; - } - - private static IServiceProvider Build() - { - _services = new ServiceCollection(); - _services.AddScoped(); - _services.AddScoped(); - return _services.BuildServiceProvider(); - } - } - + --8<-- "examples/BatchProcessing/snippets/GettingStartedBasic.cs:example_implementation_of_iserviceprovider" ``` #### Processing messages in parallel @@ -1206,11 +897,7 @@ You can also set `POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM` Environment Variab === "Function.cs" ```csharp hl_lines="1" - [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler), BatchParallelProcessingEnabled = true )] - public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _) - { - return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; - } + --8<-- "examples/BatchProcessing/snippets/GettingStartedBasic.cs:processing_messages_in_parallel" ``` #### Working with full batch failures @@ -1224,31 +911,13 @@ For these scenarios, you can set `POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE = === "Setting ThrowOnFullBatchFailure on Decorator" ```csharp hl_lines="3" - [BatchProcessor( - RecordHandler = typeof(CustomSqsRecordHandler), - ThrowOnFullBatchFailure = false)] - public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _) - { - return SqsBatchProcessor.Result.BatchItemFailuresResponse; - } - + --8<-- "examples/BatchProcessing/snippets/PartialFailureHandling.cs:throw_on_full_batch_failure_decorator" ``` === "Setting ThrowOnFullBatchFailure outside Decorator" ```csharp hl_lines="8" - public async Task HandlerUsingUtility(SQSEvent sqsEvent) - { - var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler.From(x => - { - // Inline handling of SQS message... - }), new ProcessingOptions - { - ThrowOnFullBatchFailure = false - }); - return result.BatchItemFailuresResponse; - } - + --8<-- "examples/BatchProcessing/snippets/PartialFailureHandling.cs:throw_on_full_batch_failure_outside_decorator" ``` #### Extending BatchProcessor @@ -1266,108 +935,7 @@ For these scenarios, you can create a class that inherits from `BatchProcessor` === "Function.cs" ```csharp hl_lines="1 21 54 97" - - public class CustomDynamoDbStreamBatchProcessor : DynamoDbStreamBatchProcessor - { - public override async Task> ProcessAsync(DynamoDBEvent @event, - IRecordHandler recordHandler, ProcessingOptions processingOptions) - { - ProcessingResult = new ProcessingResult(); - - // Prepare batch records (order is preserved) - var batchRecords = GetRecordsFromEvent(@event).Select(x => new KeyValuePair(GetRecordId(x), x)) - .ToArray(); - - // We assume all records fail by default to avoid loss of data - var failureBatchRecords = batchRecords.Select(x => new KeyValuePair>(x.Key, - new RecordFailure - { - Exception = new UnprocessedRecordException($"Record: '{x.Key}' has not been processed."), - Record = x.Value - })); - - // Override to fail on first failure - var errorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure; - - var successRecords = new Dictionary>(); - var failureRecords = new Dictionary>(failureBatchRecords); - - try - { - foreach (var pair in batchRecords) - { - var (recordId, record) = pair; - - try - { - var result = await HandleRecordAsync(record, recordHandler, CancellationToken.None); - failureRecords.Remove(recordId, out _); - successRecords.TryAdd(recordId, new RecordSuccess - { - Record = record, - RecordId = recordId, - HandlerResult = result - }); - } - catch (Exception ex) - { - // Capture exception - failureRecords[recordId] = new RecordFailure - { - Exception = new RecordProcessingException( - $"Failed processing record: '{recordId}'. See inner exception for details.", ex), - Record = record, - RecordId = recordId - }; - - Metrics.AddMetric("BatchRecordFailures", 1, MetricUnit.Count); - - try - { - // Invoke hook - await HandleRecordFailureAsync(record, ex); - } - catch - { - // NOOP - } - - // Check if we should stop record processing on first error - // ReSharper disable once ConditionIsAlwaysTrueOrFalse - if (errorHandlingPolicy == BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure) - { - // This causes the loop's (inner) cancellation token to be cancelled for all operations already scheduled internally - throw new CircuitBreakerException( - "Error handling policy is configured to stop processing on first batch item failure. See inner exception for details.", - ex); - } - } - } - } - catch (Exception ex) when (ex is CircuitBreakerException or OperationCanceledException) - { - // NOOP - } - - ProcessingResult.BatchRecords.AddRange(batchRecords.Select(x => x.Value)); - ProcessingResult.BatchItemFailuresResponse.BatchItemFailures.AddRange(failureRecords.Select(x => - new BatchItemFailuresResponse.BatchItemFailure - { - ItemIdentifier = x.Key - })); - ProcessingResult.FailureRecords.AddRange(failureRecords.Values); - - ProcessingResult.SuccessRecords.AddRange(successRecords.Values); - - return ProcessingResult; - } - - // ReSharper disable once RedundantOverriddenMember - protected override async Task HandleRecordFailureAsync(DynamoDBEvent.DynamodbStreamRecord record, Exception exception) - { - await base.HandleRecordFailureAsync(record, exception); - } - } + --8<-- "examples/BatchProcessing/snippets/PartialFailureHandling.cs:extending_batch_processor" ``` ## Testing your code @@ -1379,66 +947,13 @@ Testing typed batch processors is straightforward since you work directly with y === "Typed Handler Test" ```csharp - [Fact] - public async Task TypedHandler_ValidProduct_ProcessesSuccessfully() - { - // Arrange - var product = new Product { Id = 1, Name = "Test Product", Price = 10.99m }; - var handler = new TypedSqsRecordHandler(); - var cancellationToken = CancellationToken.None; - - // Act - var result = await handler.HandleAsync(product, cancellationToken); - - // Assert - Assert.Equal(RecordHandlerResult.None, result); - } - - [Fact] - public async Task TypedHandler_InvalidProduct_ThrowsException() - { - // Arrange - var product = new Product { Id = 4, Name = "Invalid", Price = -10 }; - var handler = new TypedSqsRecordHandler(); - - // Act & Assert - await Assert.ThrowsAsync(() => - handler.HandleAsync(product, CancellationToken.None)); - } + --8<-- "examples/BatchProcessing/snippets/PartialFailureHandling.cs:typed_handler_test" ``` === "Integration Test" ```csharp - [Fact] - public async Task ProcessSqsEvent_WithTypedHandler_ProcessesAllRecords() - { - // Arrange - var sqsEvent = new SQSEvent - { - Records = new List - { - new() { - MessageId = "1", - Body = JsonSerializer.Serialize(new Product { Id = 1, Name = "Product 1", Price = 10 }), - EventSourceArn = "arn:aws:sqs:us-east-1:123456789012:my-queue" - }, - new() { - MessageId = "2", - Body = JsonSerializer.Serialize(new Product { Id = 2, Name = "Product 2", Price = 20 }), - EventSourceArn = "arn:aws:sqs:us-east-1:123456789012:my-queue" - } - } - }; - - var function = new TypedFunction(); - - // Act - var result = function.HandlerUsingTypedAttribute(sqsEvent); - - // Assert - Assert.Empty(result.BatchItemFailures); - } + --8<-- "examples/BatchProcessing/snippets/PartialFailureHandling.cs:integration_test" ``` ### Testing Traditional Handlers @@ -1448,91 +963,25 @@ As there is no external calls, you can unit test your code with `BatchProcessor` === "Test.cs" ```csharp - [Fact] - public Task Sqs_Handler_Using_Attribute() - { - var request = new SQSEvent - { - Records = TestHelper.SqsMessages - }; - - var function = new HandlerFunction(); - - var response = function.HandlerUsingAttribute(request); - - Assert.Equal(2, response.BatchItemFailures.Count); - Assert.Equal("2", response.BatchItemFailures[0].ItemIdentifier); - Assert.Equal("4", response.BatchItemFailures[1].ItemIdentifier); - - return Task.CompletedTask; - } + --8<-- "examples/BatchProcessing/snippets/PartialFailureHandling.cs:traditional_handler_test" ``` === "Function.cs" ```csharp - [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))] - public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _) - { - return SqsBatchProcessor.Result.BatchItemFailuresResponse; - } + --8<-- "examples/BatchProcessing/snippets/PartialFailureHandling.cs:function_handler_using_attribute" ``` === "CustomSqsRecordHandler.cs" ```csharp - public class CustomSqsRecordHandler : ISqsRecordHandler - { - public async Task HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken) - { - var product = JsonSerializer.Deserialize(record.Body); - - if (product.GetProperty("Id").GetInt16() == 4) - { - throw new ArgumentException("Error on 4"); - } - - return await Task.FromResult(RecordHandlerResult.None); - } - } + --8<-- "examples/BatchProcessing/snippets/PartialFailureHandling.cs:custom_sqs_record_handler" ``` === "SQS Event.cs" ```csharp - internal static List SqsMessages => new() - { - new SQSEvent.SQSMessage - { - MessageId = "1", - Body = "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}", - EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" - }, - new SQSEvent.SQSMessage - { - MessageId = "2", - Body = "fail", - EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" - }, - new SQSEvent.SQSMessage - { - MessageId = "3", - Body = "{\"Id\":3,\"Name\":\"product-4\",\"Price\":14}", - EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" - }, - new SQSEvent.SQSMessage - { - MessageId = "4", - Body = "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}", - EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" - }, - new SQSEvent.SQSMessage - { - MessageId = "5", - Body = "{\"Id\":5,\"Name\":\"product-4\",\"Price\":14}", - EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" - }, - }; + --8<-- "examples/BatchProcessing/snippets/PartialFailureHandling.cs:sqs_event_test_helper" ``` ## Complete Examples and Documentation @@ -1541,4 +990,4 @@ The [BatchProcessing example](https://github.com/aws-powertools/powertools-lambd - **TypedFunction.cs** - Complete examples using all typed batch processing patterns - **TypedHandlers/** - Example implementations for SQS, Kinesis, and DynamoDB -- **Sample Events** - Test events for all event types with typed data \ No newline at end of file +- **Sample Events** - Test events for all event types with typed data diff --git a/docs/utilities/idempotency.md b/docs/utilities/idempotency.md index 2efe4658..4b96c1dc 100644 --- a/docs/utilities/idempotency.md +++ b/docs/utilities/idempotency.md @@ -143,19 +143,7 @@ You can quickly start by configuring `Idempotency` and using it with the `Idempo Initialization and configuration of the `Idempotency` must be performed outside the handler, preferably in the constructor. ```csharp hl_lines="5 8" - public class Function - { - public Function() - { - Idempotency.Configure(builder => builder.UseDynamoDb("idempotency_table")); - } - - [Idempotent] - public Task FunctionHandler(string input, ILambdaContext context) - { - return Task.FromResult(input.ToUpper()); - } - } + --8<-- "examples/Idempotency/snippets/GettingStartedBasic.cs:idempotent_attribute" ``` #### Idempotent attribute on another method @@ -170,24 +158,7 @@ When using `Idempotent` attribute on another method, you must tell which paramet !!! info "The parameter must be serializable in JSON. We use `System.Text.Json` internally to (de)serialize objects" ```csharp hl_lines="5 14-15" - public class Function - { - public Function() - { - Idempotency.Configure(builder => builder.UseDynamoDb("idempotency_table")); - } - - public Task FunctionHandler(string input, ILambdaContext context) - { - MyInternalMethod("hello", "world") - return Task.FromResult(input.ToUpper()); - } - - [Idempotent] - private string MyInternalMethod(string argOne, [IdempotencyKey] string argTwo) { - return "something"; - } - } + --8<-- "examples/Idempotency/snippets/GettingStartedBasic.cs:idempotent_attribute_on_another_method" ``` ### Choosing a payload subset for idempotency @@ -214,11 +185,7 @@ If we were to treat the entire request as our idempotency key, a simple HTTP hea === "Payment function" ```csharp hl_lines="4" - Idempotency.Configure(builder => - builder - .WithOptions(optionsBuilder => - optionsBuilder.WithEventKeyJmesPath("powertools_json(Body).[\"user_id\", \"product_id\"]")) - .UseDynamoDb("idempotency_table")); + --8<-- "examples/Idempotency/snippets/CustomIdempotencyKey.cs:event_key_jmespath_payment" ``` === "Sample event" @@ -263,20 +230,7 @@ By default, the idempotency key is prefixed with `[ClassName].[DecoratedMethodNa You can customize this prefix by setting the `KeyPrefix` property in the Idempotency decorator: ```csharp hl_lines="9" -public class Function -{ - public Function() - { - var tableName = Environment.GetEnvironmentVariable("IDEMPOTENCY_TABLE_NAME"); - Idempotency.Configure(builder => builder.UseDynamoDb(tableName)); - } - - [Idempotent(KeyPrefix = "MyCustomKeyPrefix")] - public APIGatewayProxyResponse FunctionHandler(APIGatewayProxyRequest apigwProxyEvent, ILambdaContext context) - { - return TestHelper.TestMethod(apigwProxyEvent); - } -} +--8<-- "examples/Idempotency/snippets/CustomIdempotencyKey.cs:custom_key_prefix" ``` ### Lambda timeouts @@ -301,25 +255,7 @@ Here is an example on how you register the Lambda context in your handler: === "Registering the Lambda context" ```csharp hl_lines="10" title="Registering the Lambda context" - public class Function - { - public Function() - { - Idempotency.Configure(builder => builder.UseDynamoDb("idempotency_table")); - } - - public Task FunctionHandler(string input, ILambdaContext context) - { - Idempotency.RegisterLambdaContext(context); - MyInternalMethod("hello", "world") - return Task.FromResult(input.ToUpper()); - } - - [Idempotent] - private string MyInternalMethod(string argOne, [IdempotencyKey] string argTwo) { - return "something"; - } - } + --8<-- "examples/Idempotency/snippets/AdvancedConfiguration.cs:register_lambda_context" ``` ### Handling exceptions @@ -357,31 +293,7 @@ If an Exception is raised _outside_ the scope of the decorated method and after === "Handling exceptions" ```csharp hl_lines="10-12 16-18 21" title="Exception not affecting idempotency record sample" - public class Function - { - public Function() - { - Idempotency.Configure(builder => builder.UseDynamoDb("idempotency_table")); - } - - public Task FunctionHandler(string input, ILambdaContext context) - { - Idempotency.RegisterLambdaContext(context); - // If an exception is thrown here, no idempotent record will ever get created as the - // idempotent method does not get called - - MyInternalMethod("hello", "world") - - // This exception will not cause the idempotent record to be deleted, since it - // happens after the decorated method has been successfully called - throw new Exception(); - } - - [Idempotent] - private string MyInternalMethod(string argOne, [IdempotencyKey] string argTwo) { - return "something"; - } - } + --8<-- "examples/Idempotency/snippets/AdvancedConfiguration.cs:exception_not_affecting_record" ``` ### Idempotency request flow @@ -588,15 +500,7 @@ This persistence store is built-in, and you can either use an existing DynamoDB Use the builder to customize the table structure: ```csharp title="Customizing DynamoDBPersistenceStore to suit your table structure" -new DynamoDBPersistenceStoreBuilder() - .WithTableName("TABLE_NAME") - .WithKeyAttr("idempotency_key") - .WithExpiryAttr("expires_at") - .WithStatusAttr("current_status") - .WithDataAttr("result_data") - .WithValidationAttr("validation_key") - .WithInProgressExpiryAttr("in_progress_expires_at") - .Build() +--8<-- "examples/Idempotency/snippets/DynamoDbConfiguration.cs:dynamodb_persistence_store_builder" ``` When using DynamoDB as a persistence layer, you can alter the attribute names by passing these parameters when initializing the persistence layer: @@ -619,14 +523,7 @@ When using DynamoDB as a persistence layer, you can alter the attribute names by Idempotency behavior can be further configured with **`IdempotencyOptions`** using a builder: ```csharp -new IdempotencyOptionsBuilder() - .WithEventKeyJmesPath("id") - .WithPayloadValidationJmesPath("paymentId") - .WithThrowOnNoIdempotencyKey(true) - .WithExpiration(TimeSpan.FromMinutes(1)) - .WithUseLocalCache(true) - .WithHashFunction("MD5") - .Build(); +--8<-- "examples/Idempotency/snippets/AdvancedConfiguration.cs:idempotency_options_builder" ``` These are the available options for further configuration: @@ -660,9 +557,7 @@ This is a locking mechanism for correctness. Since we don't know the result from You can enable it as seen before with: ```csharp title="Enable local cache" - new IdempotencyOptionsBuilder() - .WithUseLocalCache(true) - .Build() +--8<-- "examples/Idempotency/snippets/AdvancedConfiguration.cs:enable_local_cache" ``` When enabled, we cache a maximum of 255 records in each Lambda execution environment @@ -679,9 +574,7 @@ In most cases, it is not desirable to store the idempotency records forever. Rat You can change this window with the **`ExpirationInSeconds`** parameter: ```csharp title="Customizing expiration time" -new IdempotencyOptionsBuilder() - .WithExpiration(TimeSpan.FromMinutes(5)) - .Build() +--8<-- "examples/Idempotency/snippets/ExpirationSettings.cs:customizing_expiration_time" ``` Records older than 5 minutes will be marked as expired, and the Lambda handler will be executed normally even if it is invoked with a matching payload. @@ -701,13 +594,7 @@ With **`PayloadValidationJMESPath`**, you can provide an additional JMESPath exp === "Function.cs" ```csharp hl_lines="6" - Idempotency.Configure(builder => - builder - .WithOptions(optionsBuilder => - optionsBuilder - .WithEventKeyJmesPath("[userDetail, productId]") - .WithPayloadValidationJmesPath("amount")) - .UseDynamoDb("TABLE_NAME")); + --8<-- "examples/Idempotency/snippets/PayloadValidation.cs:payload_validation_jmespath" ``` === "Example Event 1" @@ -756,23 +643,7 @@ This means that we will throw **`IdempotencyKeyException`** if the evaluation of === "Function.cs" ```csharp hl_lines="9" - public App() - { - Idempotency.Configure(builder => - builder - .WithOptions(optionsBuilder => - optionsBuilder - // Requires "user"."uid" and "orderId" to be present - .WithEventKeyJmesPath("[user.uid, orderId]") - .WithThrowOnNoIdempotencyKey(true)) - .UseDynamoDb("TABLE_NAME")); - } - - [Idempotent] - public Task FunctionHandler(Order input, ILambdaContext context) - { - // ... - } + --8<-- "examples/Idempotency/snippets/CustomIdempotencyKey.cs:throw_on_no_idempotency_key" ``` === "Success Event" @@ -808,17 +679,7 @@ When creating the `DynamoDBPersistenceStore`, you can set a custom [`AmazonDynam === "Custom AmazonDynamoDBClient" ```csharp hl_lines="3 9" - public Function() - { - AmazonDynamoDBClient customClient = new AmazonDynamoDBClient(RegionEndpoint.APSouth1); - - Idempotency.Configure(builder => - builder.UseDynamoDb(storeBuilder => - storeBuilder. - WithTableName("TABLE_NAME") - .WithDynamoDBClient(customClient) - )); - } + --8<-- "examples/Idempotency/snippets/DynamoDbConfiguration.cs:custom_amazon_dynamodb_client" ``` ### Using a DynamoDB table with a composite primary key @@ -832,12 +693,7 @@ You can optionally set a static value for the partition key using the `StaticPkV === "Reusing a DynamoDB table that uses a composite primary key" ```csharp hl_lines="5" - Idempotency.Configure(builder => - builder.UseDynamoDb(storeBuilder => - storeBuilder. - WithTableName("TABLE_NAME") - .WithSortKeyAttr("sort_key") - )); + --8<-- "examples/Idempotency/snippets/DynamoDbConfiguration.cs:dynamodb_composite_primary_key" ``` Data would then be stored in DynamoDB like this: @@ -858,24 +714,7 @@ You can set up a response hook in the Idempotency configuration to manipulate th The example below shows how to append HTTP headers to an `APIGatewayProxyResponse`: ```csharp -Idempotency.Config() - .WithConfig(IdempotencyOptions.Builder() - .WithEventKeyJmesPath("powertools_json(body).address") - .WithResponseHook((responseData, dataRecord) => { - if (responseData is APIGatewayProxyResponse proxyResponse) - { - proxyResponse.Headers ??= new Dictionary(); - proxyResponse.Headers["x-idempotency-response"] = "true"; - proxyResponse.Headers["x-idempotency-expiration"] = dataRecord.ExpiryTimestamp.ToString(); - return proxyResponse; - } - return responseData; - }) - .Build()) - .WithPersistenceStore(DynamoDBPersistenceStore.Builder() - .WithTableName(Environment.GetEnvironmentVariable("IDEMPOTENCY_TABLE")) - .Build()) - .Configure(); +--8<-- "examples/Idempotency/snippets/AdvancedConfiguration.cs:response_hook" ``` ???+ info "Info: Using custom de-serialization?" @@ -906,54 +745,13 @@ This ensures that when serializing your payload, the utility uses the correct se In the example below, we use the default `LambdaFunctionJsonSerializerContext`: ```csharp -Idempotency.Configure(builder => -builder.WithJsonSerializationContext(LambdaFunctionJsonSerializerContext.Default))); - +--8<-- "examples/Idempotency/snippets/AdvancedConfiguration.cs:with_json_serialization_context" ``` Full example: ```csharp hl_lines="8" -public static class Function -{ - private static async Task Main() - { - var tableName = Environment.GetEnvironmentVariable("IDEMPOTENCY_TABLE_NAME"); - Idempotency.Configure(builder => - builder - .WithJsonSerializationContext(LambdaFunctionJsonSerializerContext.Default) - .WithOptions(optionsBuilder => optionsBuilder - .WithExpiration(TimeSpan.FromHours(1))) - .UseDynamoDb(storeBuilder => storeBuilder - .WithTableName(tableName) - )); - - Func handler = FunctionHandler; - await LambdaBootstrapBuilder.Create(handler, - new SourceGeneratorLambdaJsonSerializer()) - .Build() - .RunAsync(); - } - - [Idempotent] - public static APIGatewayProxyResponse FunctionHandler(APIGatewayProxyRequest apigwProxyEvent, - ILambdaContext context) - { - return new APIGatewayProxyResponse - { - Body = JsonSerializer.Serialize(response, typeof(Response), LambdaFunctionJsonSerializerContext.Default), - StatusCode = 200, - Headers = new Dictionary { { "Content-Type", "application/json" } } - }; - } -} - -[JsonSerializable(typeof(APIGatewayProxyRequest))] -[JsonSerializable(typeof(APIGatewayProxyResponse))] -[JsonSerializable(typeof(Response))] -public partial class LambdaFunctionJsonSerializerContext : JsonSerializerContext -{ -} +--8<-- "examples/Idempotency/snippets/AdvancedConfiguration.cs:with_json_serialization_context_full_example" ``` ## Testing your code diff --git a/examples/BatchProcessing/snippets/AdvancedErrorHandling.cs b/examples/BatchProcessing/snippets/AdvancedErrorHandling.cs new file mode 100644 index 00000000..5c6b12cc --- /dev/null +++ b/examples/BatchProcessing/snippets/AdvancedErrorHandling.cs @@ -0,0 +1,46 @@ +// This file is referenced by docs/utilities/batch-processing.md +// via pymdownx.snippets (mkdocs). + +namespace AWS.Lambda.Powertools.Docs.Snippets.BatchProcessing; + +// --8<-- [start:sqs_record_handler_error_handling] + public class CustomSqsRecordHandler : ISqsRecordHandler // (1)! + { + public async Task HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken) + { + /* + * Your business logic. + * If an exception is thrown, the item will be marked as a partial batch item failure. + */ + + var product = JsonSerializer.Deserialize(record.Body); + + if (product.Id == 4) // (2)! + { + throw new ArgumentException("Error on id 4"); + } + + return await Task.FromResult(RecordHandlerResult.None); // (3)! + } + + } +// --8<-- [end:sqs_record_handler_error_handling] + +// --8<-- [start:error_handling_policy_attribute] +[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler), + ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)] +public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _) +{ + return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; +} +// --8<-- [end:error_handling_policy_attribute] + +// --8<-- [start:typed_custom_error_handling] +[BatchProcessor( + TypedRecordHandler = typeof(TypedSqsHandler), + ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)] +public BatchItemFailuresResponse ProcessWithErrorPolicy(SQSEvent sqsEvent) +{ + return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; +} +// --8<-- [end:typed_custom_error_handling] diff --git a/examples/BatchProcessing/snippets/CustomSerialization.cs b/examples/BatchProcessing/snippets/CustomSerialization.cs new file mode 100644 index 00000000..87d94e3b --- /dev/null +++ b/examples/BatchProcessing/snippets/CustomSerialization.cs @@ -0,0 +1,28 @@ +// This file is referenced by docs/utilities/batch-processing.md +// via pymdownx.snippets (mkdocs). + +namespace AWS.Lambda.Powertools.Docs.Snippets.BatchProcessing; + +// --8<-- [start:json_serializer_context_configuration] +[JsonSerializable(typeof(Product))] +[JsonSerializable(typeof(Order))] +[JsonSerializable(typeof(Customer))] +[JsonSerializable(typeof(List))] +[JsonSourceGenerationOptions( + PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase, + WriteIndented = false, + DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)] +public partial class MyJsonSerializerContext : JsonSerializerContext +{ +} +// --8<-- [end:json_serializer_context_configuration] + +// --8<-- [start:json_serializer_context_using_with_attribute] +[BatchProcessor( + TypedRecordHandler = typeof(TypedSqsRecordHandler), + JsonSerializerContext = typeof(MyJsonSerializerContext))] +public BatchItemFailuresResponse ProcessWithAot(SQSEvent sqsEvent) +{ + return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; +} +// --8<-- [end:json_serializer_context_using_with_attribute] diff --git a/examples/BatchProcessing/snippets/GettingStartedBasic.cs b/examples/BatchProcessing/snippets/GettingStartedBasic.cs new file mode 100644 index 00000000..1672d5e8 --- /dev/null +++ b/examples/BatchProcessing/snippets/GettingStartedBasic.cs @@ -0,0 +1,180 @@ +// This file is referenced by docs/utilities/batch-processing.md +// via pymdownx.snippets (mkdocs). + +namespace AWS.Lambda.Powertools.Docs.Snippets.BatchProcessing; + +// --8<-- [start:kinesis_typed_handler_decorator] +public class Order +{ + public string? OrderId { get; set; } + public DateTime OrderDate { get; set; } + public List Items { get; set; } = new(); + public decimal TotalAmount { get; set; } +} + +internal class TypedKinesisRecordHandler : ITypedRecordHandler // (1)! +{ + public async Task HandleAsync(Order order, CancellationToken cancellationToken) + { + Logger.LogInformation($"Processing order {order.OrderId} with {order.Items.Count} items"); + + if (order.TotalAmount <= 0) // (2)! + { + throw new ArgumentException("Invalid order total"); + } + + return await Task.FromResult(RecordHandlerResult.None); // (3)! + } +} + +[BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))] +public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _) +{ + return TypedKinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)! +} +// --8<-- [end:kinesis_typed_handler_decorator] + +// --8<-- [start:kinesis_handler_decorator_traditional] +internal class CustomKinesisEventRecordHandler : IKinesisEventRecordHandler // (1)! +{ + public async Task HandleAsync(KinesisEvent.KinesisEventRecord record, CancellationToken cancellationToken) + { + var product = JsonSerializer.Deserialize(record.Kinesis.Data); + + if (product.Id == 4) // (2)! + { + throw new ArgumentException("Error on id 4"); + } + + return await Task.FromResult(RecordHandlerResult.None); // (3)! + } +} + + +[BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))] +public BatchItemFailuresResponse HandlerUsingAttribute(KinesisEvent _) +{ + return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)! +} +// --8<-- [end:kinesis_handler_decorator_traditional] + +// --8<-- [start:dynamodb_typed_handler_decorator] +public class Customer +{ + public string? CustomerId { get; set; } + public string? Name { get; set; } + public string? Email { get; set; } + public DateTime CreatedAt { get; set; } +} + +internal class TypedDynamoDbRecordHandler : ITypedRecordHandler // (1)! +{ + public async Task HandleAsync(Customer customer, CancellationToken cancellationToken) + { + Logger.LogInformation($"Processing customer {customer.CustomerId} - {customer.Name}"); + + if (string.IsNullOrEmpty(customer.Email)) // (2)! + { + throw new ArgumentException("Customer email is required"); + } + + return await Task.FromResult(RecordHandlerResult.None); // (3)! + } +} + +[BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))] +public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _) +{ + return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)! +} +// --8<-- [end:dynamodb_typed_handler_decorator] + +// --8<-- [start:dynamodb_handler_decorator_traditional] +internal class CustomDynamoDbStreamRecordHandler : IDynamoDbStreamRecordHandler // (1)! +{ + public async Task HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken) + { + var product = JsonSerializer.Deserialize(record.Dynamodb.NewImage["Product"].S); + + if (product.Id == 4) // (2)! + { + throw new ArgumentException("Error on id 4"); + } + + return await Task.FromResult(RecordHandlerResult.None); // (3)! + } +} + + +[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))] +public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _) +{ + return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)! +} +// --8<-- [end:dynamodb_handler_decorator_traditional] + +// --8<-- [start:using_utility_outside_decorator] +public async Task HandlerUsingUtility(DynamoDBEvent dynamoDbEvent) +{ + var result = await DynamoDbStreamBatchProcessor.Instance.ProcessAsync(dynamoDbEvent, RecordHandler.From(record => + { + var product = JsonSerializer.Deserialize(record.Dynamodb.NewImage["Product"].S); + + if (product.GetProperty("Id").GetInt16() == 4) + { + throw new ArgumentException("Error on 4"); + } + })); + return result.BatchItemFailuresResponse; +} +// --8<-- [end:using_utility_outside_decorator] + +// --8<-- [start:using_utility_from_ioc_getrequiredservice] +public async Task HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent) +{ + var batchProcessor = Services.Provider.GetRequiredService(); + var recordHandler = Services.Provider.GetRequiredService(); + var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler); + return result.BatchItemFailuresResponse; +} +// --8<-- [end:using_utility_from_ioc_getrequiredservice] + +// --8<-- [start:using_utility_from_ioc_injected_parameters] +public async Task HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent, + IDynamoDbStreamBatchProcessor batchProcessor, IDynamoDbStreamRecordHandler recordHandler) +{ + var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler); + return result.BatchItemFailuresResponse; +} +// --8<-- [end:using_utility_from_ioc_injected_parameters] + +// --8<-- [start:example_implementation_of_iserviceprovider] +internal class Services +{ + private static readonly Lazy LazyInstance = new(Build); + + private static ServiceCollection _services; + public static IServiceProvider Provider => LazyInstance.Value; + + public static IServiceProvider Init() + { + return LazyInstance.Value; + } + + private static IServiceProvider Build() + { + _services = new ServiceCollection(); + _services.AddScoped(); + _services.AddScoped(); + return _services.BuildServiceProvider(); + } +} +// --8<-- [end:example_implementation_of_iserviceprovider] + +// --8<-- [start:processing_messages_in_parallel] +[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler), BatchParallelProcessingEnabled = true )] +public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _) +{ + return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; +} +// --8<-- [end:processing_messages_in_parallel] diff --git a/examples/BatchProcessing/snippets/GettingStartedWithSqs.cs b/examples/BatchProcessing/snippets/GettingStartedWithSqs.cs new file mode 100644 index 00000000..ebddfe09 --- /dev/null +++ b/examples/BatchProcessing/snippets/GettingStartedWithSqs.cs @@ -0,0 +1,143 @@ +// This file is referenced by docs/utilities/batch-processing.md +// via pymdownx.snippets (mkdocs). + +namespace AWS.Lambda.Powertools.Docs.Snippets.BatchProcessing; + +// --8<-- [start:sqs_typed_handler_decorator] + public class Product + { + public int Id { get; set; } + public string? Name { get; set; } + public decimal Price { get; set; } + } + + public class TypedSqsRecordHandler : ITypedRecordHandler // (1)! + { + public async Task HandleAsync(Product product, CancellationToken cancellationToken) + { + /* + * Your business logic with automatic deserialization. + * If an exception is thrown, the item will be marked as a partial batch item failure. + */ + + Logger.LogInformation($"Processing product {product.Id} - {product.Name} (${product.Price})"); + + if (product.Id == 4) // (2)! + { + throw new ArgumentException("Error on id 4"); + } + + return await Task.FromResult(RecordHandlerResult.None); // (3)! + } + + } + + [BatchProcessor(TypedRecordHandler = typeof(TypedSqsRecordHandler))] + public BatchItemFailuresResponse HandlerUsingTypedAttribute(SQSEvent _) + { + return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)! + } +// --8<-- [end:sqs_typed_handler_decorator] + +// --8<-- [start:sqs_handler_decorator_traditional] + public class CustomSqsRecordHandler : ISqsRecordHandler // (1)! + { + public async Task HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken) + { + /* + * Your business logic. + * If an exception is thrown, the item will be marked as a partial batch item failure. + */ + + var product = JsonSerializer.Deserialize(record.Body); + + if (product.Id == 4) // (2)! + { + throw new ArgumentException("Error on id 4"); + } + + return await Task.FromResult(RecordHandlerResult.None); // (3)! + } + + } + + [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))] + public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _) + { + return SqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)! + } +// --8<-- [end:sqs_handler_decorator_traditional] + +// --8<-- [start:typed_handler_with_context] +public class ProductHandlerWithContext : ITypedRecordHandlerWithContext +{ + public async Task HandleAsync(Product product, ILambdaContext context, CancellationToken cancellationToken) + { + Logger.LogInformation($"Processing product {product.Id} in request {context.AwsRequestId}"); + Logger.LogInformation($"Remaining time: {context.RemainingTime.TotalSeconds}s"); + + // Use context for timeout handling + if (context.RemainingTime.TotalSeconds < 5) + { + Logger.LogWarning("Low remaining time, processing quickly"); + } + + return RecordHandlerResult.None; + } +} +// --8<-- [end:typed_handler_with_context] + +// --8<-- [start:function_usage_with_context] +[BatchProcessor(TypedRecordHandlerWithContext = typeof(ProductHandlerWithContext))] +public BatchItemFailuresResponse ProcessWithContext(SQSEvent sqsEvent, ILambdaContext context) +{ + return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; +} +// --8<-- [end:function_usage_with_context] + +// --8<-- [start:migration_before_traditional] +public class TraditionalSqsHandler : ISqsRecordHandler +{ + public async Task HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken) + { + // Manual deserialization + var product = JsonSerializer.Deserialize(record.Body); + + Logger.LogInformation($"Processing product {product.Id}"); + + if (product.Price < 0) + throw new ArgumentException("Invalid price"); + + return RecordHandlerResult.None; + } +} + +[BatchProcessor(RecordHandler = typeof(TraditionalSqsHandler))] +public BatchItemFailuresResponse ProcessSqs(SQSEvent sqsEvent) +{ + return SqsBatchProcessor.Result.BatchItemFailuresResponse; +} +// --8<-- [end:migration_before_traditional] + +// --8<-- [start:migration_after_typed] +public class TypedSqsHandler : ITypedRecordHandler +{ + public async Task HandleAsync(Product product, CancellationToken cancellationToken) + { + // Automatic deserialization - product is already deserialized! + Logger.LogInformation($"Processing product {product.Id}"); + + // Same business logic + if (product.Price < 0) + throw new ArgumentException("Invalid price"); + + return RecordHandlerResult.None; + } +} + +[BatchProcessor(TypedRecordHandler = typeof(TypedSqsHandler))] +public BatchItemFailuresResponse ProcessSqs(SQSEvent sqsEvent) +{ + return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; +} +// --8<-- [end:migration_after_typed] diff --git a/examples/BatchProcessing/snippets/PartialFailureHandling.cs b/examples/BatchProcessing/snippets/PartialFailureHandling.cs new file mode 100644 index 00000000..094094d3 --- /dev/null +++ b/examples/BatchProcessing/snippets/PartialFailureHandling.cs @@ -0,0 +1,276 @@ +// This file is referenced by docs/utilities/batch-processing.md +// via pymdownx.snippets (mkdocs). + +namespace AWS.Lambda.Powertools.Docs.Snippets.BatchProcessing; + +// --8<-- [start:throw_on_full_batch_failure_decorator] + [BatchProcessor( + RecordHandler = typeof(CustomSqsRecordHandler), + ThrowOnFullBatchFailure = false)] + public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _) + { + return SqsBatchProcessor.Result.BatchItemFailuresResponse; + } +// --8<-- [end:throw_on_full_batch_failure_decorator] + +// --8<-- [start:throw_on_full_batch_failure_outside_decorator] +public async Task HandlerUsingUtility(SQSEvent sqsEvent) +{ + var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler.From(x => + { + // Inline handling of SQS message... + }), new ProcessingOptions + { + ThrowOnFullBatchFailure = false + }); + return result.BatchItemFailuresResponse; +} +// --8<-- [end:throw_on_full_batch_failure_outside_decorator] + +// --8<-- [start:extending_batch_processor] + +public class CustomDynamoDbStreamBatchProcessor : DynamoDbStreamBatchProcessor +{ + public override async Task> ProcessAsync(DynamoDBEvent @event, + IRecordHandler recordHandler, ProcessingOptions processingOptions) + { + ProcessingResult = new ProcessingResult(); + + // Prepare batch records (order is preserved) + var batchRecords = GetRecordsFromEvent(@event).Select(x => new KeyValuePair(GetRecordId(x), x)) + .ToArray(); + + // We assume all records fail by default to avoid loss of data + var failureBatchRecords = batchRecords.Select(x => new KeyValuePair>(x.Key, + new RecordFailure + { + Exception = new UnprocessedRecordException($"Record: '{x.Key}' has not been processed."), + Record = x.Value + })); + + // Override to fail on first failure + var errorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure; + + var successRecords = new Dictionary>(); + var failureRecords = new Dictionary>(failureBatchRecords); + + try + { + foreach (var pair in batchRecords) + { + var (recordId, record) = pair; + + try + { + var result = await HandleRecordAsync(record, recordHandler, CancellationToken.None); + failureRecords.Remove(recordId, out _); + successRecords.TryAdd(recordId, new RecordSuccess + { + Record = record, + RecordId = recordId, + HandlerResult = result + }); + } + catch (Exception ex) + { + // Capture exception + failureRecords[recordId] = new RecordFailure + { + Exception = new RecordProcessingException( + $"Failed processing record: '{recordId}'. See inner exception for details.", ex), + Record = record, + RecordId = recordId + }; + + Metrics.AddMetric("BatchRecordFailures", 1, MetricUnit.Count); + + try + { + // Invoke hook + await HandleRecordFailureAsync(record, ex); + } + catch + { + // NOOP + } + + // Check if we should stop record processing on first error + // ReSharper disable once ConditionIsAlwaysTrueOrFalse + if (errorHandlingPolicy == BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure) + { + // This causes the loop's (inner) cancellation token to be cancelled for all operations already scheduled internally + throw new CircuitBreakerException( + "Error handling policy is configured to stop processing on first batch item failure. See inner exception for details.", + ex); + } + } + } + } + catch (Exception ex) when (ex is CircuitBreakerException or OperationCanceledException) + { + // NOOP + } + + ProcessingResult.BatchRecords.AddRange(batchRecords.Select(x => x.Value)); + ProcessingResult.BatchItemFailuresResponse.BatchItemFailures.AddRange(failureRecords.Select(x => + new BatchItemFailuresResponse.BatchItemFailure + { + ItemIdentifier = x.Key + })); + ProcessingResult.FailureRecords.AddRange(failureRecords.Values); + + ProcessingResult.SuccessRecords.AddRange(successRecords.Values); + + return ProcessingResult; + } + + // ReSharper disable once RedundantOverriddenMember + protected override async Task HandleRecordFailureAsync(DynamoDBEvent.DynamodbStreamRecord record, Exception exception) + { + await base.HandleRecordFailureAsync(record, exception); + } +} +// --8<-- [end:extending_batch_processor] + +// --8<-- [start:typed_handler_test] +[Fact] +public async Task TypedHandler_ValidProduct_ProcessesSuccessfully() +{ + // Arrange + var product = new Product { Id = 1, Name = "Test Product", Price = 10.99m }; + var handler = new TypedSqsRecordHandler(); + var cancellationToken = CancellationToken.None; + + // Act + var result = await handler.HandleAsync(product, cancellationToken); + + // Assert + Assert.Equal(RecordHandlerResult.None, result); +} + +[Fact] +public async Task TypedHandler_InvalidProduct_ThrowsException() +{ + // Arrange + var product = new Product { Id = 4, Name = "Invalid", Price = -10 }; + var handler = new TypedSqsRecordHandler(); + + // Act & Assert + await Assert.ThrowsAsync(() => + handler.HandleAsync(product, CancellationToken.None)); +} +// --8<-- [end:typed_handler_test] + +// --8<-- [start:integration_test] +[Fact] +public async Task ProcessSqsEvent_WithTypedHandler_ProcessesAllRecords() +{ + // Arrange + var sqsEvent = new SQSEvent + { + Records = new List + { + new() { + MessageId = "1", + Body = JsonSerializer.Serialize(new Product { Id = 1, Name = "Product 1", Price = 10 }), + EventSourceArn = "arn:aws:sqs:us-east-1:123456789012:my-queue" + }, + new() { + MessageId = "2", + Body = JsonSerializer.Serialize(new Product { Id = 2, Name = "Product 2", Price = 20 }), + EventSourceArn = "arn:aws:sqs:us-east-1:123456789012:my-queue" + } + } + }; + + var function = new TypedFunction(); + + // Act + var result = function.HandlerUsingTypedAttribute(sqsEvent); + + // Assert + Assert.Empty(result.BatchItemFailures); +} +// --8<-- [end:integration_test] + +// --8<-- [start:traditional_handler_test] +[Fact] +public Task Sqs_Handler_Using_Attribute() +{ + var request = new SQSEvent + { + Records = TestHelper.SqsMessages + }; + + var function = new HandlerFunction(); + + var response = function.HandlerUsingAttribute(request); + + Assert.Equal(2, response.BatchItemFailures.Count); + Assert.Equal("2", response.BatchItemFailures[0].ItemIdentifier); + Assert.Equal("4", response.BatchItemFailures[1].ItemIdentifier); + + return Task.CompletedTask; +} +// --8<-- [end:traditional_handler_test] + +// --8<-- [start:function_handler_using_attribute] +[BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))] +public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _) +{ + return SqsBatchProcessor.Result.BatchItemFailuresResponse; +} +// --8<-- [end:function_handler_using_attribute] + +// --8<-- [start:custom_sqs_record_handler] +public class CustomSqsRecordHandler : ISqsRecordHandler +{ + public async Task HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken) + { + var product = JsonSerializer.Deserialize(record.Body); + + if (product.GetProperty("Id").GetInt16() == 4) + { + throw new ArgumentException("Error on 4"); + } + + return await Task.FromResult(RecordHandlerResult.None); + } +} +// --8<-- [end:custom_sqs_record_handler] + +// --8<-- [start:sqs_event_test_helper] +internal static List SqsMessages => new() +{ + new SQSEvent.SQSMessage + { + MessageId = "1", + Body = "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}", + EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" + }, + new SQSEvent.SQSMessage + { + MessageId = "2", + Body = "fail", + EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" + }, + new SQSEvent.SQSMessage + { + MessageId = "3", + Body = "{\"Id\":3,\"Name\":\"product-4\",\"Price\":14}", + EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" + }, + new SQSEvent.SQSMessage + { + MessageId = "4", + Body = "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}", + EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" + }, + new SQSEvent.SQSMessage + { + MessageId = "5", + Body = "{\"Id\":5,\"Name\":\"product-4\",\"Price\":14}", + EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue" + }, +}; +// --8<-- [end:sqs_event_test_helper] diff --git a/examples/Idempotency/snippets/AdvancedConfiguration.cs b/examples/Idempotency/snippets/AdvancedConfiguration.cs new file mode 100644 index 00000000..38cf337d --- /dev/null +++ b/examples/Idempotency/snippets/AdvancedConfiguration.cs @@ -0,0 +1,140 @@ +// This file is referenced by docs/utilities/idempotency.md +// via pymdownx.snippets (mkdocs). + +namespace AWS.Lambda.Powertools.Docs.Snippets.Idempotency; + +// --8<-- [start:register_lambda_context] + public class Function + { + public Function() + { + Idempotency.Configure(builder => builder.UseDynamoDb("idempotency_table")); + } + + public Task FunctionHandler(string input, ILambdaContext context) + { + Idempotency.RegisterLambdaContext(context); + MyInternalMethod("hello", "world"); + return Task.FromResult(input.ToUpper()); + } + + [Idempotent] + private string MyInternalMethod(string argOne, [IdempotencyKey] string argTwo) { + return "something"; + } + } +// --8<-- [end:register_lambda_context] + +// --8<-- [start:exception_not_affecting_record] + public class Function + { + public Function() + { + Idempotency.Configure(builder => builder.UseDynamoDb("idempotency_table")); + } + + public Task FunctionHandler(string input, ILambdaContext context) + { + Idempotency.RegisterLambdaContext(context); + // If an exception is thrown here, no idempotent record will ever get created as the + // idempotent method does not get called + + MyInternalMethod("hello", "world"); + + // This exception will not cause the idempotent record to be deleted, since it + // happens after the decorated method has been successfully called + throw new Exception(); + } + + [Idempotent] + private string MyInternalMethod(string argOne, [IdempotencyKey] string argTwo) { + return "something"; + } + } +// --8<-- [end:exception_not_affecting_record] + +// --8<-- [start:idempotency_options_builder] +new IdempotencyOptionsBuilder() + .WithEventKeyJmesPath("id") + .WithPayloadValidationJmesPath("paymentId") + .WithThrowOnNoIdempotencyKey(true) + .WithExpiration(TimeSpan.FromMinutes(1)) + .WithUseLocalCache(true) + .WithHashFunction("MD5") + .Build(); +// --8<-- [end:idempotency_options_builder] + +// --8<-- [start:enable_local_cache] + new IdempotencyOptionsBuilder() + .WithUseLocalCache(true) + .Build(); +// --8<-- [end:enable_local_cache] + +// --8<-- [start:response_hook] +Idempotency.Config() + .WithConfig(IdempotencyOptions.Builder() + .WithEventKeyJmesPath("powertools_json(body).address") + .WithResponseHook((responseData, dataRecord) => { + if (responseData is APIGatewayProxyResponse proxyResponse) + { + proxyResponse.Headers ??= new Dictionary(); + proxyResponse.Headers["x-idempotency-response"] = "true"; + proxyResponse.Headers["x-idempotency-expiration"] = dataRecord.ExpiryTimestamp.ToString(); + return proxyResponse; + } + return responseData; + }) + .Build()) + .WithPersistenceStore(DynamoDBPersistenceStore.Builder() + .WithTableName(Environment.GetEnvironmentVariable("IDEMPOTENCY_TABLE")) + .Build()) + .Configure(); +// --8<-- [end:response_hook] + +// --8<-- [start:with_json_serialization_context] +Idempotency.Configure(builder => + builder.WithJsonSerializationContext(LambdaFunctionJsonSerializerContext.Default)); +// --8<-- [end:with_json_serialization_context] + +// --8<-- [start:with_json_serialization_context_full_example] +public static class Function +{ + private static async Task Main() + { + var tableName = Environment.GetEnvironmentVariable("IDEMPOTENCY_TABLE_NAME"); + Idempotency.Configure(builder => + builder + .WithJsonSerializationContext(LambdaFunctionJsonSerializerContext.Default) + .WithOptions(optionsBuilder => optionsBuilder + .WithExpiration(TimeSpan.FromHours(1))) + .UseDynamoDb(storeBuilder => storeBuilder + .WithTableName(tableName) + )); + + Func handler = FunctionHandler; + await LambdaBootstrapBuilder.Create(handler, + new SourceGeneratorLambdaJsonSerializer()) + .Build() + .RunAsync(); + } + + [Idempotent] + public static APIGatewayProxyResponse FunctionHandler(APIGatewayProxyRequest apigwProxyEvent, + ILambdaContext context) + { + return new APIGatewayProxyResponse + { + Body = JsonSerializer.Serialize(response, typeof(Response), LambdaFunctionJsonSerializerContext.Default), + StatusCode = 200, + Headers = new Dictionary { { "Content-Type", "application/json" } } + }; + } +} + +[JsonSerializable(typeof(APIGatewayProxyRequest))] +[JsonSerializable(typeof(APIGatewayProxyResponse))] +[JsonSerializable(typeof(Response))] +public partial class LambdaFunctionJsonSerializerContext : JsonSerializerContext +{ +} +// --8<-- [end:with_json_serialization_context_full_example] diff --git a/examples/Idempotency/snippets/CustomIdempotencyKey.cs b/examples/Idempotency/snippets/CustomIdempotencyKey.cs new file mode 100644 index 00000000..471e262d --- /dev/null +++ b/examples/Idempotency/snippets/CustomIdempotencyKey.cs @@ -0,0 +1,50 @@ +// This file is referenced by docs/utilities/idempotency.md +// via pymdownx.snippets (mkdocs). + +// --8<-- [start:event_key_jmespath_payment] + Idempotency.Configure(builder => + builder + .WithOptions(optionsBuilder => + optionsBuilder.WithEventKeyJmesPath("powertools_json(Body).[\"user_id\", \"product_id\"]")) + .UseDynamoDb("idempotency_table")); +// --8<-- [end:event_key_jmespath_payment] + +namespace AWS.Lambda.Powertools.Docs.Snippets.Idempotency +{ +// --8<-- [start:custom_key_prefix] +public class Function +{ + public Function() + { + var tableName = Environment.GetEnvironmentVariable("IDEMPOTENCY_TABLE_NAME"); + Idempotency.Configure(builder => builder.UseDynamoDb(tableName)); + } + + [Idempotent(KeyPrefix = "MyCustomKeyPrefix")] + public APIGatewayProxyResponse FunctionHandler(APIGatewayProxyRequest apigwProxyEvent, ILambdaContext context) + { + return TestHelper.TestMethod(apigwProxyEvent); + } +} +// --8<-- [end:custom_key_prefix] + +// --8<-- [start:throw_on_no_idempotency_key] + public App() + { + Idempotency.Configure(builder => + builder + .WithOptions(optionsBuilder => + optionsBuilder + // Requires "user"."uid" and "orderId" to be present + .WithEventKeyJmesPath("[user.uid, orderId]") + .WithThrowOnNoIdempotencyKey(true)) + .UseDynamoDb("TABLE_NAME")); + } + + [Idempotent] + public Task FunctionHandler(Order input, ILambdaContext context) + { + // ... + } +// --8<-- [end:throw_on_no_idempotency_key] +} diff --git a/examples/Idempotency/snippets/DynamoDbConfiguration.cs b/examples/Idempotency/snippets/DynamoDbConfiguration.cs new file mode 100644 index 00000000..bca2f063 --- /dev/null +++ b/examples/Idempotency/snippets/DynamoDbConfiguration.cs @@ -0,0 +1,37 @@ +// This file is referenced by docs/utilities/idempotency.md +// via pymdownx.snippets (mkdocs). + +// --8<-- [start:dynamodb_persistence_store_builder] +new DynamoDBPersistenceStoreBuilder() + .WithTableName("TABLE_NAME") + .WithKeyAttr("idempotency_key") + .WithExpiryAttr("expires_at") + .WithStatusAttr("current_status") + .WithDataAttr("result_data") + .WithValidationAttr("validation_key") + .WithInProgressExpiryAttr("in_progress_expires_at") + .Build() +// --8<-- [end:dynamodb_persistence_store_builder] + +// --8<-- [start:custom_amazon_dynamodb_client] + public Function() + { + AmazonDynamoDBClient customClient = new AmazonDynamoDBClient(RegionEndpoint.APSouth1); + + Idempotency.Configure(builder => + builder.UseDynamoDb(storeBuilder => + storeBuilder. + WithTableName("TABLE_NAME") + .WithDynamoDBClient(customClient) + )); + } +// --8<-- [end:custom_amazon_dynamodb_client] + +// --8<-- [start:dynamodb_composite_primary_key] + Idempotency.Configure(builder => + builder.UseDynamoDb(storeBuilder => + storeBuilder. + WithTableName("TABLE_NAME") + .WithSortKeyAttr("sort_key") + )); +// --8<-- [end:dynamodb_composite_primary_key] diff --git a/examples/Idempotency/snippets/ExpirationSettings.cs b/examples/Idempotency/snippets/ExpirationSettings.cs new file mode 100644 index 00000000..2ee75af7 --- /dev/null +++ b/examples/Idempotency/snippets/ExpirationSettings.cs @@ -0,0 +1,8 @@ +// This file is referenced by docs/utilities/idempotency.md +// via pymdownx.snippets (mkdocs). + +// --8<-- [start:customizing_expiration_time] +new IdempotencyOptionsBuilder() + .WithExpiration(TimeSpan.FromMinutes(5)) + .Build() +// --8<-- [end:customizing_expiration_time] diff --git a/examples/Idempotency/snippets/GettingStartedBasic.cs b/examples/Idempotency/snippets/GettingStartedBasic.cs new file mode 100644 index 00000000..9e439148 --- /dev/null +++ b/examples/Idempotency/snippets/GettingStartedBasic.cs @@ -0,0 +1,41 @@ +// This file is referenced by docs/utilities/idempotency.md +// via pymdownx.snippets (mkdocs). + +namespace AWS.Lambda.Powertools.Docs.Snippets.Idempotency; + +// --8<-- [start:idempotent_attribute] + public class Function + { + public Function() + { + Idempotency.Configure(builder => builder.UseDynamoDb("idempotency_table")); + } + + [Idempotent] + public Task FunctionHandler(string input, ILambdaContext context) + { + return Task.FromResult(input.ToUpper()); + } + } +// --8<-- [end:idempotent_attribute] + +// --8<-- [start:idempotent_attribute_on_another_method] + public class Function + { + public Function() + { + Idempotency.Configure(builder => builder.UseDynamoDb("idempotency_table")); + } + + public Task FunctionHandler(string input, ILambdaContext context) + { + MyInternalMethod("hello", "world"); + return Task.FromResult(input.ToUpper()); + } + + [Idempotent] + private string MyInternalMethod(string argOne, [IdempotencyKey] string argTwo) { + return "something"; + } + } +// --8<-- [end:idempotent_attribute_on_another_method] diff --git a/examples/Idempotency/snippets/PayloadValidation.cs b/examples/Idempotency/snippets/PayloadValidation.cs new file mode 100644 index 00000000..3aaabb9e --- /dev/null +++ b/examples/Idempotency/snippets/PayloadValidation.cs @@ -0,0 +1,12 @@ +// This file is referenced by docs/utilities/idempotency.md +// via pymdownx.snippets (mkdocs). + +// --8<-- [start:payload_validation_jmespath] + Idempotency.Configure(builder => + builder + .WithOptions(optionsBuilder => + optionsBuilder + .WithEventKeyJmesPath("[userDetail, productId]") + .WithPayloadValidationJmesPath("amount")) + .UseDynamoDb("TABLE_NAME")); +// --8<-- [end:payload_validation_jmespath] diff --git a/mkdocs.yml b/mkdocs.yml index 9e2da2cf..c4df475c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -83,6 +83,7 @@ markdown_extensions: - pymdownx.snippets: base_path: "." check_paths: True + dedent_subsections: true - meta - toc: permalink: true