Skip to content

Commit f176a86

Browse files
committed
fix(Worker): Fixed the IterationStateProcessor which was not properly merging the output collection, if any, back into the state data
fix(Worker): Fixed the ForEachStateProcessor which was outputting the iteration parameter, which is now deleted from the state data fix(Worker): Fixed the OpenApiFunctionProcessor, which was not removing espace characters inherited from JSON when iterating through expression matches Fixes #180 Fixes #181 Fixes #179
1 parent 5ac30b4 commit f176a86

File tree

3 files changed

+37
-9
lines changed

3 files changed

+37
-9
lines changed

src/apps/Synapse.Worker/Services/Processors/ForEachStateProcessor.cs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,17 @@ public class ForEachStateProcessor
3131

3232
/// <inheritdoc/>
3333
public ForEachStateProcessor(ILoggerFactory loggerFactory, IWorkflowRuntimeContext context, IWorkflowActivityProcessorFactory activityProcessorFactory,
34-
IOptions<ApplicationOptions> options, V1WorkflowActivity activity, ForEachStateDefinition state)
34+
IJsonSerializer jsonSerializer, IOptions<ApplicationOptions> options, V1WorkflowActivity activity, ForEachStateDefinition state)
3535
: base(loggerFactory, context, activityProcessorFactory, options, activity, state)
3636
{
37-
37+
this.JsonSerializer = jsonSerializer;
3838
}
3939

40+
/// <summary>
41+
/// Gets the service used to serialize/deserialize to/from JSON
42+
/// </summary>
43+
protected IJsonSerializer JsonSerializer { get; }
44+
4045
/// <inheritdoc/>
4146
protected override IWorkflowActivityProcessor CreateProcessorFor(V1WorkflowActivity activity)
4247
{
@@ -65,7 +70,10 @@ protected override async Task InitializeAsync(CancellationToken cancellationToke
6570
}
6671
var iterationParamValue = inputCollection.First();
6772
var input = (DynamicObject)this.Activity.Input!;
68-
input.Set(this.State.IterationParameter!, iterationParamValue);
73+
var iterationParam = this.State.IterationParameter;
74+
if (string.IsNullOrWhiteSpace(iterationParam))
75+
iterationParam = "item";
76+
input.Set(iterationParam, iterationParamValue);
6977
var metadata = new Dictionary<string, string>()
7078
{
7179
{ V1WorkflowActivityMetadata.State, this.State.Name! },
@@ -141,13 +149,31 @@ protected virtual async Task OnIterationResultAsync(IterationProcessor processor
141149
|| inputCollection.Count() - 1 <= iterationIndex)
142150
{
143151
var outputCollection = await this.GetOutputCollectionAsync(cancellationToken);
144-
foreach (var itarationActivity in (await this.Context.Workflow.GetActivitiesAsync(this.Activity, cancellationToken))
145-
.Where(a => a.Type == V1WorkflowActivityType.Iteration && a.Status == V1WorkflowActivityStatus.Completed)
152+
foreach (var iterationActivity in (await this.Context.Workflow.GetActivitiesAsync(this.Activity, cancellationToken))
153+
.Where(a => a.Type == V1WorkflowActivityType.Iteration && a.Status == V1WorkflowActivityStatus.Completed && a.Output != null)
146154
.ToList())
147155
{
148-
outputCollection.Add(itarationActivity.Output!.ToObject()!);
156+
outputCollection.Add(iterationActivity.Output!.ToObject()!);
157+
}
158+
var output = this.Activity.Input.ToObject();
159+
var expression = this.State.IterationParameter;
160+
if (string.IsNullOrWhiteSpace(expression))
161+
expression = "item";
162+
if (expression.StartsWith("${"))
163+
expression = expression[2..^1];
164+
expression = $". |= del(.{expression})";
165+
if (output != null)
166+
output = await this.Context.EvaluateAsync(expression, output, cancellationToken);
167+
if (!string.IsNullOrWhiteSpace(this.State.OutputCollection))
168+
{
169+
expression = this.State.OutputCollection.Trim();
170+
var outputCollectionJson = await this.JsonSerializer.SerializeAsync(outputCollection, cancellationToken);
171+
if (expression.StartsWith("${"))
172+
expression = expression[2..^1];
173+
expression = $"{expression} = {outputCollectionJson}";
174+
output = await this.Context.EvaluateAsync(expression, output, cancellationToken);
149175
}
150-
await this.OnNextAsync(new V1WorkflowActivityCompletedIntegrationEvent(this.Activity.Id, outputCollection), cancellationToken);
176+
await this.OnNextAsync(new V1WorkflowActivityCompletedIntegrationEvent(this.Activity.Id, output), cancellationToken);
151177
return;
152178
}
153179
iterationIndex += 1;

src/apps/Synapse.Worker/Services/Processors/IterationProcessor.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ protected override async Task ProcessAsync(CancellationToken cancellationToken)
133133
/// <returns>The result of the action outputs aggregation</returns>
134134
protected virtual async Task<object> AggregateActionOutputsAsync(CancellationToken cancellationToken)
135135
{
136-
var output = this.Activity.Input.ToObject();
136+
object? output = null;
137137
foreach (var activity in (await this.Context.Workflow.GetActivitiesAsync(this.Activity, cancellationToken))
138138
.Where(a => a.Type == V1WorkflowActivityType.Action && a.Status == V1WorkflowActivityStatus.Completed)
139139
.OrderBy(a => a.ExecutedAt))
@@ -156,6 +156,8 @@ protected virtual async Task<object> AggregateActionOutputsAsync(CancellationTok
156156
expression = expression[2..^1];
157157
expression = $"{expression} = {json}";
158158
}
159+
if (output == null)
160+
output = new();
159161
output = await this.Context.EvaluateAsync(expression, output, cancellationToken);
160162
}
161163
}

src/apps/Synapse.Worker/Services/Processors/OpenApiFunctionProcessor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ protected virtual async Task BuildParametersAsync(CancellationToken cancellation
263263
var jsonArgs = JsonConvert.SerializeObject(this.FunctionReference.Arguments);
264264
foreach (Match match in Regex.Matches(jsonArgs, @"""\$\{.+?\}"""))
265265
{
266-
var expression = match.Value[3..^2].Trim();
266+
var expression = match.Value[3..^2].Trim().Replace(@"\""", @"""");
267267
var evaluationResult = await this.Context.EvaluateAsync(expression, this.Activity.Input!.ToObject()!, cancellationToken);
268268
if(evaluationResult == null)
269269
{

0 commit comments

Comments
 (0)