3434from sagemaker .core .remote_function .job import JOBS_CONTAINER_ENTRYPOINT
3535from sagemaker .core .s3 import s3_path_join
3636from sagemaker .core .helper .session_helper import Session
37- from sagemaker .core .common_utils import resolve_value_from_config , retry_with_backoff , format_tags , Tags
37+ from sagemaker .core .common_utils import (
38+ resolve_value_from_config ,
39+ retry_with_backoff ,
40+ format_tags ,
41+ Tags ,
42+ )
43+
3844# Orchestration imports (now in mlops)
3945from sagemaker .mlops .workflow .callback_step import CallbackOutput , CallbackStep
4046from sagemaker .mlops .workflow ._event_bridge_client_helper import (
4450 EXECUTION_TIME_PIPELINE_PARAMETER_FORMAT ,
4551)
4652from sagemaker .mlops .workflow .lambda_step import LambdaOutput , LambdaStep
53+ from sagemaker .mlops .workflow .mlflow_config import MlflowConfig
4754from sagemaker .core .helper .pipeline_variable import (
4855 RequestType ,
4956 PipelineVariable ,
5057)
58+
5159# Primitive imports (stay in core)
5260from sagemaker .core .workflow .execution_variables import ExecutionVariables
5361from sagemaker .core .workflow .parameters import Parameter
62+
5463# Orchestration imports (now in mlops)
5564from sagemaker .core .workflow .pipeline_definition_config import PipelineDefinitionConfig
5665from sagemaker .mlops .workflow .pipeline_experiment_config import PipelineExperimentConfig
5766from sagemaker .mlops .workflow .parallelism_config import ParallelismConfiguration
67+
5868# Primitive imports (stay in core)
5969from sagemaker .core .workflow .properties import Properties
70+
6071# Orchestration imports (now in mlops)
6172from sagemaker .mlops .workflow .selective_execution_config import SelectiveExecutionConfig
6273from sagemaker .core .workflow .step_outputs import StepOutput
@@ -87,6 +98,7 @@ def __init__(
8798 name : str = "" ,
8899 parameters : Optional [Sequence [Parameter ]] = None ,
89100 pipeline_experiment_config : Optional [PipelineExperimentConfig ] = _DEFAULT_EXPERIMENT_CFG ,
101+ mlflow_config : Optional [MlflowConfig ] = None ,
90102 steps : Optional [Sequence [Union [Step , StepOutput ]]] = None ,
91103 sagemaker_session : Optional [Session ] = None ,
92104 pipeline_definition_config : Optional [PipelineDefinitionConfig ] = _DEFAULT_DEFINITION_CFG ,
@@ -102,6 +114,8 @@ def __init__(
102114 the same name already exists. By default, pipeline name is used as
103115 experiment name and execution id is used as the trial name.
104116 If set to None, no experiment or trial will be created automatically.
117+ mlflow_config (Optional[MlflowConfig]): If set, the pipeline will be configured
118+ with MLflow tracking for experiment tracking and model versioning.
105119 steps (Sequence[Union[Step, StepOutput]]): The list of the
106120 non-conditional steps associated with the pipeline. Any steps that are within the
107121 `if_steps` or `else_steps` of a `ConditionStep` cannot be listed in the steps of a
@@ -118,6 +132,7 @@ def __init__(
118132 self .name = name
119133 self .parameters = parameters if parameters else []
120134 self .pipeline_experiment_config = pipeline_experiment_config
135+ self .mlflow_config = mlflow_config
121136 self .steps = steps if steps else []
122137 self .sagemaker_session = sagemaker_session if sagemaker_session else Session ()
123138 self .pipeline_definition_config = pipeline_definition_config
@@ -337,6 +352,7 @@ def start(
337352 execution_description : str = None ,
338353 parallelism_config : ParallelismConfiguration = None ,
339354 selective_execution_config : SelectiveExecutionConfig = None ,
355+ mlflow_experiment_name : str = None ,
340356 ):
341357 """Starts a Pipeline execution in the Workflow service.
342358
@@ -350,6 +366,10 @@ def start(
350366 over the parallelism configuration of the parent pipeline.
351367 selective_execution_config (Optional[SelectiveExecutionConfig]): The configuration for
352368 selective step execution.
369+ mlflow_experiment_name (str): Optional MLflow experiment name to override
370+ the experiment name specified in the pipeline's mlflow_config.
371+ If provided, this will override the experiment name for this specific
372+ pipeline execution only, without modifying the pipeline definition.
353373
354374 Returns:
355375 A `_PipelineExecution` instance, if successful.
@@ -371,6 +391,7 @@ def start(
371391 PipelineExecutionDisplayName = execution_display_name ,
372392 ParallelismConfiguration = parallelism_config ,
373393 SelectiveExecutionConfig = selective_execution_config ,
394+ MlflowExperimentName = mlflow_experiment_name ,
374395 )
375396 if self .sagemaker_session .local_mode :
376397 update_args (kwargs , PipelineParameters = parameters )
@@ -409,14 +430,25 @@ def definition(self) -> str:
409430 if self .pipeline_experiment_config is not None
410431 else None
411432 ),
433+ "MlflowConfig" : (
434+ self .mlflow_config .to_request () if self .mlflow_config is not None else None
435+ ),
412436 "Steps" : list_to_request (compiled_steps ),
413437 }
414-
415- request_dict ["PipelineExperimentConfig" ] = interpolate (
416- request_dict ["PipelineExperimentConfig" ], {}, {}, pipeline_name = self .name
417- )
418438 callback_output_to_step_map = _map_callback_outputs (self .steps )
419439 lambda_output_to_step_name = _map_lambda_outputs (self .steps )
440+ request_dict ["PipelineExperimentConfig" ] = interpolate (
441+ request_dict ["PipelineExperimentConfig" ],
442+ callback_output_to_step_map = callback_output_to_step_map ,
443+ lambda_output_to_step_map = lambda_output_to_step_name ,
444+ pipeline_name = self .name ,
445+ )
446+ request_dict ["MlflowConfig" ] = interpolate (
447+ request_dict ["MlflowConfig" ],
448+ callback_output_to_step_map = callback_output_to_step_map ,
449+ lambda_output_to_step_map = lambda_output_to_step_name ,
450+ pipeline_name = self .name ,
451+ )
420452 request_dict ["Steps" ] = interpolate (
421453 request_dict ["Steps" ],
422454 callback_output_to_step_map = callback_output_to_step_map ,
@@ -1036,6 +1068,7 @@ def get_function_step_result(
10361068 return deserialize_obj_from_s3 (
10371069 sagemaker_session = sagemaker_session ,
10381070 s3_uri = s3_uri ,
1071+ hmac_key = describe_training_job_response ["Environment" ]["REMOTE_FUNCTION_SECRET_KEY" ],
10391072 )
10401073
10411074 raise RemoteFunctionError (_ERROR_MSG_OF_STEP_INCOMPLETE )
@@ -1081,7 +1114,6 @@ def _initialize_adjacency_list(self) -> Dict[str, List[str]]:
10811114 if isinstance (child_step , Step ):
10821115 dependency_list [child_step .name ].add (step .name )
10831116
1084-
10851117 adjacency_list = {}
10861118 for step in dependency_list :
10871119 for step_dependency in dependency_list [step ]:
@@ -1119,9 +1151,7 @@ def is_cyclic_helper(current_step):
11191151 return True
11201152 return False
11211153
1122- def get_steps_in_sub_dag (
1123- self , current_step : Step , sub_dag_steps : Set [str ] = None
1124- ) -> Set [str ]:
1154+ def get_steps_in_sub_dag (self , current_step : Step , sub_dag_steps : Set [str ] = None ) -> Set [str ]:
11251155 """Get names of all steps (including current step) in the sub dag of current step.
11261156
11271157 Returns a set of step names in the sub dag.
@@ -1161,4 +1191,4 @@ def __next__(self) -> Step:
11611191
11621192 while self .stack :
11631193 return self .step_map .get (self .stack .pop ())
1164- raise StopIteration
1194+ raise StopIteration
0 commit comments