Skip to content

Commit 2968e1f

Browse files
committed
Support replays in Idempotency utility
Durable Functions introduces the concept of function replays. Previously, the idempotency utility would throw an "IdempotencyItemAlreadyExistsError", as the replay has the same payload of the initial execution. It appears as a duplicate, so is rejected. Now, a replay is allowed
1 parent 2c4f40f commit 2968e1f

File tree

4 files changed

+47
-13
lines changed

4 files changed

+47
-13
lines changed

aws_lambda_powertools/utilities/idempotency/base.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,18 @@ def __init__(
123123

124124
self.persistence_store = persistence_store
125125

126-
def handle(self) -> Any:
126+
def handle(self, durable_mode: str | None = None) -> Any:
127127
"""
128128
Main entry point for handling idempotent execution of a function.
129129
130+
Parameters
131+
----------
132+
durable_mode : str | None, optional
133+
Mode for handling in-progress executions. Options:
134+
- "REPLAY_MODE": Allow replay of functions that are already in progress
135+
- "EXECUTION_MODE": Standard durable execution mode
136+
- None: Standard idempotency behavior (raises IdempotencyAlreadyInProgressError)
137+
130138
Returns
131139
-------
132140
Any
@@ -138,12 +146,12 @@ def handle(self) -> Any:
138146
# In most cases we can retry successfully on this exception.
139147
for i in range(MAX_RETRIES + 1): # pragma: no cover
140148
try:
141-
return self._process_idempotency()
149+
return self._process_idempotency(durable_mode)
142150
except IdempotencyInconsistentStateError:
143151
if i == MAX_RETRIES:
144152
raise # Bubble up when exceeded max tries
145153

146-
def _process_idempotency(self):
154+
def _process_idempotency(self, durable_mode: str | None):
147155
try:
148156
# We call save_inprogress first as an optimization for the most common case where no idempotent record
149157
# already exists. If it succeeds, there's no need to call get_record.
@@ -159,7 +167,8 @@ def _process_idempotency(self):
159167
# We give preference to ReturnValuesOnConditionCheckFailure because it is a faster and more cost-effective
160168
# way of retrieving the existing record after a failed conditional write operation.
161169
record = exc.old_data_record or self._get_idempotency_record()
162-
170+
if durable_mode == "REPLAY_MODE":
171+
return self._get_function_response()
163172
# If a record is found, handle it for status
164173
if record:
165174
return self._handle_for_status(record)

aws_lambda_powertools/utilities/idempotency/idempotency.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
2929
BasePersistenceLayer,
3030
)
31-
from aws_lambda_powertools.utilities.typing import LambdaContext
31+
from aws_lambda_powertools.utilities.typing import DurableContext, LambdaContext
3232

3333
from aws_lambda_powertools.warnings import PowertoolsUserWarning
3434

@@ -37,9 +37,9 @@
3737

3838
@lambda_handler_decorator
3939
def idempotent(
40-
handler: Callable[[Any, LambdaContext], Any],
40+
handler: Callable[[Any, LambdaContext | DurableContext], Any],
4141
event: dict[str, Any],
42-
context: LambdaContext,
42+
context: LambdaContext | DurableContext,
4343
persistence_store: BasePersistenceLayer,
4444
config: IdempotencyConfig | None = None,
4545
key_prefix: str | None = None,
@@ -55,7 +55,7 @@ def idempotent(
5555
event: dict
5656
Lambda's Event
5757
context: dict
58-
Lambda's Context
58+
Lambda's Context or Durable Context
5959
persistence_store: BasePersistenceLayer
6060
Instance of BasePersistenceLayer to store data
6161
config: IdempotencyConfig
@@ -91,7 +91,15 @@ def handler(event, context):
9191
return handler(event, context, **kwargs)
9292

9393
config = config or IdempotencyConfig()
94-
config.register_lambda_context(context)
94+
95+
if hasattr(context, "state"):
96+
# Extract lambda_context from DurableContext for idempotency tracking
97+
config.register_lambda_context(context.lambda_context)
98+
durable_mode = "REPLAY_MODE" if len(context.state.operations) > 1 else "EXECUTION_MODE"
99+
else:
100+
# Standard LambdaContext
101+
config.register_lambda_context(context)
102+
durable_mode = None
95103

96104
args = event, context
97105
idempotency_handler = IdempotencyHandler(
@@ -104,7 +112,7 @@ def handler(event, context):
104112
function_kwargs=kwargs,
105113
)
106114

107-
return idempotency_handler.handle()
115+
return idempotency_handler.handle(durable_mode=durable_mode)
108116

109117

110118
def idempotent_function(
@@ -193,6 +201,10 @@ def decorate(*args, **kwargs):
193201
f" Ensure this exists in your function's signature as well as the caller used it as a keyword argument",
194202
)
195203

204+
durable_mode = None
205+
if len(args) >= 2 and hasattr(args[1], "state"):
206+
durable_mode = "REPLAY_MODE" if len(args[1].state.operations) > 1 else "EXECUTION_MODE"
207+
196208
payload = kwargs.get(data_keyword_argument)
197209

198210
idempotency_handler = IdempotencyHandler(
@@ -206,6 +218,6 @@ def decorate(*args, **kwargs):
206218
function_kwargs=kwargs,
207219
)
208220

209-
return idempotency_handler.handle()
221+
return idempotency_handler.handle(durable_mode=durable_mode)
210222

211223
return cast(AnyCallableT, decorate)

aws_lambda_powertools/utilities/typing/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44
[`Typing`](../utilities/typing.md)
55
"""
66

7-
from .lambda_context import LambdaContext
7+
from .lambda_context import DurableContext, LambdaContext
88

9-
__all__ = ["LambdaContext"]
9+
__all__ = ["DurableContext", "LambdaContext"]

aws_lambda_powertools/utilities/typing/lambda_context.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,16 @@ def tenant_id(self) -> str | None:
9393
def get_remaining_time_in_millis() -> int:
9494
"""Returns the number of milliseconds left before the execution times out."""
9595
return 0
96+
97+
98+
class DurableContext:
99+
_lambda_context: LambdaContext
100+
_state: object
101+
102+
@property
103+
def lambda_context(self) -> LambdaContext:
104+
return self._lambda_context
105+
106+
@property
107+
def state(self) -> object:
108+
return self._state

0 commit comments

Comments
 (0)