Skip to content

Commit 44785df

Browse files
wangyb-AAlex Wang
andauthored
feat: add serdes for map and parallel (#8)
Co-authored-by: Alex Wang <wangyb@amazon.com>
1 parent 785f3ba commit 44785df

File tree

6 files changed

+133
-6
lines changed

6 files changed

+133
-6
lines changed

src/aws_durable_execution_sdk_python/concurrency.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
from aws_durable_execution_sdk_python.config import ChildConfig, CompletionConfig
2727
from aws_durable_execution_sdk_python.lambda_service import OperationSubType
28+
from aws_durable_execution_sdk_python.serdes import SerDes
2829
from aws_durable_execution_sdk_python.state import ExecutionState
2930
from aws_durable_execution_sdk_python.types import DurableContext
3031

@@ -478,6 +479,7 @@ def __init__(
478479
sub_type_top: OperationSubType,
479480
sub_type_iteration: OperationSubType,
480481
name_prefix: str,
482+
serdes: SerDes | None,
481483
):
482484
self.executables = executables
483485
self.max_concurrency = max_concurrency
@@ -508,6 +510,7 @@ def __init__(
508510
tolerated_failure_percentage,
509511
)
510512
self.executables_with_state: list[ExecutableWithState] = []
513+
self.serdes = serdes
511514

512515
@abstractmethod
513516
def execute_item(
@@ -697,7 +700,7 @@ def execute_in_child_context(child_context: DurableContext) -> ResultType:
697700
return run_in_child_context(
698701
execute_in_child_context,
699702
f"{self.name_prefix}{executable.index}",
700-
ChildConfig(sub_type=self.sub_type_iteration),
703+
ChildConfig(serdes=self.serdes, sub_type=self.sub_type_iteration),
701704
)
702705

703706

src/aws_durable_execution_sdk_python/operation/map.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
if TYPE_CHECKING:
1818
from aws_durable_execution_sdk_python.config import ChildConfig
19+
from aws_durable_execution_sdk_python.serdes import SerDes
1920
from aws_durable_execution_sdk_python.state import ExecutionState
2021
from aws_durable_execution_sdk_python.types import DurableContext
2122

@@ -38,6 +39,7 @@ def __init__(
3839
top_level_sub_type: OperationSubType,
3940
iteration_sub_type: OperationSubType,
4041
name_prefix: str,
42+
serdes: SerDes | None,
4143
):
4244
super().__init__(
4345
executables=executables,
@@ -46,6 +48,7 @@ def __init__(
4648
sub_type_top=top_level_sub_type,
4749
sub_type_iteration=iteration_sub_type,
4850
name_prefix=name_prefix,
51+
serdes=serdes,
4952
)
5053
self.items = items
5154

@@ -69,6 +72,7 @@ def from_items(
6972
top_level_sub_type=OperationSubType.MAP,
7073
iteration_sub_type=OperationSubType.MAP_ITERATION,
7174
name_prefix="map-item-",
75+
serdes=config.serdes,
7276
)
7377

7478
def execute_item(self, child_context, executable: Executable[Callable]) -> R:

src/aws_durable_execution_sdk_python/operation/parallel.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
if TYPE_CHECKING:
1414
from aws_durable_execution_sdk_python.concurrency import BatchResult
1515
from aws_durable_execution_sdk_python.config import ChildConfig
16+
from aws_durable_execution_sdk_python.serdes import SerDes
1617
from aws_durable_execution_sdk_python.state import ExecutionState
1718
from aws_durable_execution_sdk_python.types import DurableContext
1819

@@ -31,6 +32,7 @@ def __init__(
3132
top_level_sub_type: OperationSubType,
3233
iteration_sub_type: OperationSubType,
3334
name_prefix: str,
35+
serdes: SerDes | None,
3436
):
3537
super().__init__(
3638
executables=executables,
@@ -39,6 +41,7 @@ def __init__(
3941
sub_type_top=top_level_sub_type,
4042
sub_type_iteration=iteration_sub_type,
4143
name_prefix=name_prefix,
44+
serdes=serdes,
4245
)
4346

4447
@classmethod
@@ -51,14 +54,14 @@ def from_callables(
5154
executables: list[Executable[Callable]] = [
5255
Executable(index=i, func=func) for i, func in enumerate(callables)
5356
]
54-
5557
return cls(
5658
executables=executables,
5759
max_concurrency=config.max_concurrency,
5860
completion_config=config.completion_config,
5961
top_level_sub_type=OperationSubType.PARALLEL,
6062
iteration_sub_type=OperationSubType.PARALLEL_BRANCH,
6163
name_prefix="parallel-branch-",
64+
serdes=config.serdes,
6265
)
6366

6467
def execute_item(self, child_context, executable: Executable[Callable]) -> R:

tests/concurrency_test.py

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,7 @@ def execute_item(self, child_context, executable):
700700
sub_type_top="TOP",
701701
sub_type_iteration="ITER",
702702
name_prefix="test_",
703+
serdes=None,
703704
)
704705

705706
# Test basic properties
@@ -731,6 +732,7 @@ def execute_item(self, child_context, executable):
731732
sub_type_top="TOP",
732733
sub_type_iteration="ITER",
733734
name_prefix="test_",
735+
serdes=None,
734736
)
735737

736738
execution_state = Mock()
@@ -791,6 +793,7 @@ def execute_item(self, child_context, executable):
791793
sub_type_top="TOP",
792794
sub_type_iteration="ITER",
793795
name_prefix="test_",
796+
serdes=None,
794797
)
795798

796799
exe_state = ExecutableWithState(executables[0])
@@ -827,6 +830,7 @@ def execute_item(self, child_context, executable):
827830
sub_type_top="TOP",
828831
sub_type_iteration="ITER",
829832
name_prefix="test_",
833+
serdes=None,
830834
)
831835

832836
exe_state = ExecutableWithState(executables[0])
@@ -861,6 +865,7 @@ def execute_item(self, child_context, executable):
861865
sub_type_top="TOP",
862866
sub_type_iteration="ITER",
863867
name_prefix="test_",
868+
serdes=None,
864869
)
865870

866871
exe_state = ExecutableWithState(executables[0])
@@ -905,6 +910,7 @@ def failure_callable():
905910
sub_type_top="TOP",
906911
sub_type_iteration="ITER",
907912
name_prefix="test_",
913+
serdes=None,
908914
)
909915

910916
execution_state = Mock()
@@ -942,6 +948,7 @@ def execute_item(self, child_context, executable):
942948
sub_type_top="TOP",
943949
sub_type_iteration="ITER",
944950
name_prefix="test_",
951+
serdes=None,
945952
)
946953

947954
def mock_run_in_child_context(func, name, config):
@@ -990,6 +997,7 @@ def failure_callable():
990997
sub_type_top="TOP",
991998
sub_type_iteration="ITER",
992999
name_prefix="test_",
1000+
serdes=None,
9931001
)
9941002

9951003
execution_state = Mock()
@@ -1024,6 +1032,7 @@ def execute_item(self, child_context, executable):
10241032
sub_type_top="TOP",
10251033
sub_type_iteration="ITER",
10261034
name_prefix="test_",
1035+
serdes=None,
10271036
)
10281037

10291038
execution_state = Mock()
@@ -1068,6 +1077,7 @@ def execute_item(self, child_context, executable):
10681077
sub_type_top="TOP",
10691078
sub_type_iteration="ITER",
10701079
name_prefix="test_",
1080+
serdes=None,
10711081
)
10721082

10731083
execution_state = Mock()
@@ -1111,6 +1121,7 @@ def execute_item(self, child_context, executable):
11111121
sub_type_top="TOP",
11121122
sub_type_iteration="ITER",
11131123
name_prefix="test_",
1124+
serdes=None,
11141125
)
11151126

11161127
execution_state = Mock()
@@ -1182,6 +1193,7 @@ def execute_item(self, child_context, executable):
11821193
sub_type_top="TOP",
11831194
sub_type_iteration="ITER",
11841195
name_prefix="test_",
1196+
serdes=None,
11851197
)
11861198

11871199
execution_state = Mock()
@@ -1244,6 +1256,7 @@ def execute_item(self, child_context, executable):
12441256
sub_type_top="TOP",
12451257
sub_type_iteration="ITER",
12461258
name_prefix="test_",
1259+
serdes=None,
12471260
)
12481261

12491262
# Create executable with state in SUSPENDED_WITH_TIMEOUT
@@ -1281,6 +1294,7 @@ def execute_item(self, child_context, executable):
12811294
sub_type_top="TOP",
12821295
sub_type_iteration="ITER",
12831296
name_prefix="test_",
1297+
serdes=None,
12841298
)
12851299

12861300
# Create executable with state in SUSPENDED (indefinite)
@@ -1321,6 +1335,7 @@ def failure_callable():
13211335
sub_type_top="TOP",
13221336
sub_type_iteration="ITER",
13231337
name_prefix="test_",
1338+
serdes=None,
13241339
)
13251340

13261341
execution_state = Mock()
@@ -1379,6 +1394,7 @@ def execute_item(self, child_context, executable):
13791394
sub_type_top="TOP",
13801395
sub_type_iteration="ITER",
13811396
name_prefix="test_",
1397+
serdes=None,
13821398
)
13831399

13841400
# Create one with timed suspend and one with indefinite suspend
@@ -1419,6 +1435,7 @@ def execute_item(self, child_context, executable):
14191435
sub_type_top="TOP",
14201436
sub_type_iteration="ITER",
14211437
name_prefix="test_",
1438+
serdes=None,
14221439
)
14231440

14241441
# Create two with different timed suspends
@@ -1487,7 +1504,15 @@ def execute_item(self, child_context, executable):
14871504
tolerated_failure_percentage=None,
14881505
)
14891506

1490-
executor = TestExecutor(executables, 3, completion_config, "TOP", "ITER", "test_")
1507+
executor = TestExecutor(
1508+
executables=executables,
1509+
max_concurrency=3,
1510+
completion_config=completion_config,
1511+
sub_type_top="TOP",
1512+
sub_type_iteration="ITER",
1513+
name_prefix="test_",
1514+
serdes=None,
1515+
)
14911516

14921517
# Create three executables with different suspend times
14931518
exe_state1 = ExecutableWithState(executables[0])
@@ -1527,7 +1552,15 @@ def failure_callable():
15271552
min_successful=1, tolerated_failure_count=0, tolerated_failure_percentage=None
15281553
)
15291554

1530-
executor = TestExecutor(executables, 1, completion_config, "TOP", "ITER", "test_")
1555+
executor = TestExecutor(
1556+
executables=executables,
1557+
max_concurrency=1,
1558+
completion_config=completion_config,
1559+
sub_type_top="TOP",
1560+
sub_type_iteration="ITER",
1561+
name_prefix="test_",
1562+
serdes=None,
1563+
)
15311564

15321565
execution_state = Mock()
15331566
execution_state.create_checkpoint = Mock()
@@ -1580,7 +1613,15 @@ def success_callable():
15801613
tolerated_failure_percentage=None,
15811614
)
15821615

1583-
executor = TestExecutor(executables, 1, completion_config, "TOP", "ITER", "test_")
1616+
executor = TestExecutor(
1617+
executables=executables,
1618+
max_concurrency=1,
1619+
completion_config=completion_config,
1620+
sub_type_top="TOP",
1621+
sub_type_iteration="ITER",
1622+
name_prefix="test_",
1623+
serdes=None,
1624+
)
15841625

15851626
execution_state = Mock()
15861627
execution_state.create_checkpoint = Mock()
@@ -1613,7 +1654,15 @@ def suspend_callable():
16131654
tolerated_failure_percentage=None,
16141655
)
16151656

1616-
executor = TestExecutor(executables, 1, completion_config, "TOP", "ITER", "test_")
1657+
executor = TestExecutor(
1658+
executables=executables,
1659+
max_concurrency=1,
1660+
completion_config=completion_config,
1661+
sub_type_top="TOP",
1662+
sub_type_iteration="ITER",
1663+
name_prefix="test_",
1664+
serdes=None,
1665+
)
16171666

16181667
execution_state = Mock()
16191668
execution_state.create_checkpoint = Mock()

tests/operation/map_test.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from aws_durable_execution_sdk_python.config import CompletionConfig, MapConfig
77
from aws_durable_execution_sdk_python.lambda_service import OperationSubType
88
from aws_durable_execution_sdk_python.operation.map import MapExecutor, map_handler
9+
from aws_durable_execution_sdk_python.serdes import serialize
10+
from tests.serdes_test import CustomStrSerDes
911

1012

1113
def test_map_executor_init():
@@ -21,6 +23,7 @@ def test_map_executor_init():
2123
top_level_sub_type=OperationSubType.MAP,
2224
iteration_sub_type=OperationSubType.MAP_ITERATION,
2325
name_prefix="test-",
26+
serdes=None,
2427
)
2528

2629
assert executor.items == items
@@ -275,3 +278,34 @@ class MockExecutionState:
275278
assert isinstance(call_args.kwargs["config"], MapConfig)
276279

277280
assert result == mock_batch_result
281+
282+
283+
def test_map_handler_with_serdes():
284+
"""Test that map_handler calls executor.execute method."""
285+
items = ["test_item"]
286+
287+
def callable_func(ctx, item, idx, items):
288+
return f"result_{item}"
289+
290+
# Mock the executor.execute method
291+
292+
def mock_run_in_child_context(func, name, config):
293+
return serialize(
294+
serdes=config.serdes,
295+
value=func("mock_context"),
296+
operation_id="op_id",
297+
durable_execution_arn="durable_execution_arn",
298+
)
299+
300+
class MockExecutionState:
301+
pass
302+
303+
execution_state = MockExecutionState()
304+
config = MapConfig(serdes=CustomStrSerDes())
305+
306+
result = map_handler(
307+
items, callable_func, config, execution_state, mock_run_in_child_context
308+
)
309+
310+
# Verify execute was called
311+
assert result.all[0].result == "RESULT_TEST_ITEM"

0 commit comments

Comments
 (0)