Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/WorkflowCore/Interface/IWorkflowModifier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,15 @@ IStepBuilder<TData, Activity> Activity(string activityName, Expression<Func<TDat
/// <returns></returns>
IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecutionContext, string>> activityName, Expression<Func<TData, object>> parameters = null,
Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);

/// <summary>
/// Execute a sub-workflow
/// </summary>
/// <param name="subWorkflowId">Id of the sub-workflow to start</param>
/// <param name="parameters">The data to pass to the sub-workflow</param>
/// <param name="cancelCondition">A condition that when true will cancel this sub-workflow</param>
/// <returns></returns>
IStepBuilder<TData, SubWorkflowStepBody> SubWorkflow(string subWorkflowId, Expression<Func<TData, object>> parameters = null,
Expression<Func<TData, bool>> cancelCondition = null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace WorkflowCore.Models.LifeCycleEvents
{
public class SubWorkflowLifeCycleEvent : LifeCycleEvent
{

}
}
49 changes: 47 additions & 2 deletions src/WorkflowCore/Primitives/SubWorkflowStepBody.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,60 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using WorkflowCore.Models.LifeCycleEvents;

namespace WorkflowCore.Primitives
{
public class SubWorkflowStepBody : StepBody
{
private readonly IScopeProvider _scopeProvider;

public SubWorkflowStepBody(IScopeProvider scopeProvider)
{
_scopeProvider = scopeProvider;
}

public override ExecutionResult Run(IStepExecutionContext context)
{
// TODO: What is this supposed to do?
throw new NotImplementedException();
var scope = _scopeProvider.CreateScope(context);
var workflowController = scope.ServiceProvider.GetRequiredService<IWorkflowController>();
var logger = scope.ServiceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(
typeof(SubWorkflowStepBody).Namespace + "." + nameof(SubWorkflowStepBody));

if (!context.ExecutionPointer.EventPublished)
{
var result = workflowController.StartWorkflow(SubWorkflowId, context.Workflow.Data, context.Workflow.Id).Result;

logger.LogDebug("Started sub workflow {Name} with id='{SubId}' from workflow {WorkflowDefinitionId} ({Id})",
SubWorkflowId, result, context.Workflow.WorkflowDefinitionId, context.Workflow.Id);

logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event SubWorkflowLifeCycleEvent with key='{EventKey}'",
SubWorkflowId, result, result);

var effectiveDate = DateTime.MinValue;
return ExecutionResult.WaitForEvent(nameof(SubWorkflowLifeCycleEvent), result, effectiveDate);
}

logger.LogDebug("Sub workflow {Name} ({SubId}) completed", SubWorkflowId,
context.ExecutionPointer.EventKey);

var persistenceProvider = scope.ServiceProvider.GetRequiredService<IPersistenceProvider>();
var workflowInstance = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result;
if (workflowInstance.Status == WorkflowStatus.Terminated)
{
throw new NotImplementedException(workflowInstance.Status.ToString());
}

Result = workflowInstance.Data;
return ExecutionResult.Next();
}

public string SubWorkflowId { get; set; }

public object Parameters { get; set; }

public object Result { get; set; }
}
}
1 change: 1 addition & 0 deletions src/WorkflowCore/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A
services.AddTransient<ISyncWorkflowRunner, SyncWorkflowRunner>();

services.AddTransient<Foreach>();
services.AddTransient<SubWorkflowStepBody>();

return services;
}
Expand Down
29 changes: 29 additions & 0 deletions src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -588,5 +588,34 @@ public IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecut
Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id });
return stepBuilder;
}

public IStepBuilder<TData, SubWorkflowStepBody> SubWorkflow(
string subWorkflowId,
Expression<Func<TData, object>> parameters = null,
Expression<Func<TData, bool>> cancelCondition = null)
{
var newStep = new WorkflowStep<SubWorkflowStepBody>();
newStep.CancelCondition = cancelCondition;

WorkflowBuilder.AddStep(newStep);
var stepBuilder = new StepBuilder<TData, SubWorkflowStepBody>(WorkflowBuilder, newStep);
stepBuilder.Input((step) => step.SubWorkflowId, (data) => subWorkflowId);

if (parameters != null)
stepBuilder.Input((step) => step.Parameters, parameters);

// use the result of the sub workflow as an output
// merge it with parent workflow data
stepBuilder.Output((body, data) =>
{
foreach (var prop in typeof(TData).GetProperties())
{
prop.SetValue(data, prop.GetValue(body.Result));
}
});

Step.Outcomes.Add(new ValueOutcome { NextStep = newStep.Id });
return stepBuilder;
}
}
}
6 changes: 6 additions & 0 deletions src/WorkflowCore/Services/FluentBuilders/WorkflowBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,16 @@ public IStepBuilder<TData, Activity> Activity(string activityName, Expression<Fu
{
return Start().Activity(activityName, parameters, effectiveDate, cancelCondition);
}

public IStepBuilder<TData, Activity> Activity(Expression<Func<TData, IStepExecutionContext, string>> activityName, Expression<Func<TData, object>> parameters = null, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null)
{
return Start().Activity(activityName, parameters, effectiveDate, cancelCondition);
}

public IStepBuilder<TData, SubWorkflowStepBody> SubWorkflow(string subWorkflowId, Expression<Func<TData, object>> parameters = null, Expression<Func<TData, bool>> cancelCondition = null)
{
return Start().SubWorkflow(subWorkflowId, parameters, cancelCondition);
}

private IStepBuilder<TData, InlineStepBody> Start()
{
Expand Down
43 changes: 40 additions & 3 deletions src/WorkflowCore/Services/WorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow, Can
WorkflowId = workflow.Id,
ExecutionPointerId = pointer.Id,
ErrorTime = _datetimeProvider.UtcNow,
Message = ex.Message
Message = ex.ToString()
});

_executionResultProcessor.HandleStepException(workflow, def, pointer, step, ex);
Expand Down Expand Up @@ -156,9 +156,11 @@ private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, Exe
CancellationToken = cancellationToken
};

var stepInfo = $"{step.Name ?? step.BodyType.Name} ({step.Id})";

using (var scope = _scopeProvider.CreateScope(context))
{
_logger.LogDebug("Starting step {StepName} on workflow {WorkflowId}", step.Name, workflow.Id);
_logger.LogDebug("Starting step {StepName} on workflow {WorkflowDefinitionId} ({WorkflowId})", stepInfo, workflow.WorkflowDefinitionId, workflow.Id);

IStepBody body = step.ConstructBody(scope.ServiceProvider);
var stepExecutor = scope.ServiceProvider.GetRequiredService<IStepExecutor>();
Expand Down Expand Up @@ -221,6 +223,13 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo

if (workflow.Status == WorkflowStatus.Complete)
{
await OnComplete(workflow, def);
return;
}

if (workflow.Status == WorkflowStatus.Terminated)
{
await OnTerminated(workflow, def);
return;
}

Expand Down Expand Up @@ -256,6 +265,11 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
return;
}

await OnComplete(workflow, def);
}

private async Task OnComplete(WorkflowInstance workflow, WorkflowDefinition def)
{
workflow.Status = WorkflowStatus.Complete;
workflow.CompleteTime = _datetimeProvider.UtcNow;

Expand All @@ -264,7 +278,7 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
var middlewareRunner = scope.ServiceProvider.GetRequiredService<IWorkflowMiddlewareRunner>();
await middlewareRunner.RunPostMiddleware(workflow, def);
}

_publisher.PublishNotification(new WorkflowCompleted
{
EventTimeUtc = _datetimeProvider.UtcNow,
Expand All @@ -274,5 +288,28 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
Version = workflow.Version
});
}

private async Task OnTerminated(WorkflowInstance workflow, WorkflowDefinition def)
{
workflow.Status = WorkflowStatus.Terminated;
workflow.CompleteTime = _datetimeProvider.UtcNow;

using (var scope = _serviceProvider.CreateScope())
{
var middlewareRunner = scope.ServiceProvider.GetRequiredService<IWorkflowMiddlewareRunner>();
await middlewareRunner.RunPostMiddleware(workflow, def);
}

_logger.LogDebug("Workflow {WorkflowDefinitionId} ({Id}) terminated", workflow.WorkflowDefinitionId, workflow.Id);

_publisher.PublishNotification(new WorkflowTerminated
{
EventTimeUtc = _datetimeProvider.UtcNow,
Reference = workflow.Reference,
WorkflowInstanceId = workflow.Id,
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
Version = workflow.Version
});
}
}
}
9 changes: 9 additions & 0 deletions src/WorkflowCore/Services/WorkflowHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ public Task<bool> TerminateWorkflow(string workflowId)

public void HandleLifeCycleEvent(LifeCycleEvent evt)
{
switch (evt)
{
// publish the event as sub workflow lifecycle event
case WorkflowCompleted _:
case WorkflowTerminated _:
_workflowController.PublishEvent(nameof(SubWorkflowLifeCycleEvent), evt.WorkflowInstanceId, evt.Reference);
break;
}

OnLifeCycleEvent?.Invoke(evt);
}

Expand Down
Loading