-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathBaseWorkflow.cs
145 lines (132 loc) · 6.52 KB
/
BaseWorkflow.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
using AIDocumentPipeline.Shared.Observability;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using OpenTelemetry.Trace;
namespace AIDocumentPipeline.Shared;
/// <summary>
/// Defines a base class for all workflow classes.
/// </summary>
/// <param name="name">The name of the workflow used for observability.</param>
[ActivitySource]
public abstract class BaseWorkflow(string name)
{
    // Error message shared by every failure path of ExtractInput.
    private const string InvalidRequestBodyMessage =
        $"The request body is not a valid JSON representation of a {nameof(IWorkflowRequest)} object.";

    // The retry policy allows up to 5 attempts in total (the initial attempt plus at most 4 retries).
    // The first retry is made after 5 seconds; each subsequent retry interval is multiplied by 1.5.
    private readonly TaskOptions _retryPolicy = TaskOptions.FromRetryPolicy(
        new RetryPolicy(
            maxNumberOfAttempts: 5,
            firstRetryInterval: TimeSpan.FromSeconds(5),
            backoffCoefficient: 1.5));

    /// <summary>
    /// Defines the tracer for the workflow.
    /// </summary>
    protected readonly Tracer Tracer = TracerProvider.Default.GetTracer(name);

    /// <summary>
    /// Starts a new workflow instance from the request of a durable function.
    /// </summary>
    /// <param name="durableFunctionClient">The durable function client used to start the workflow.</param>
    /// <param name="input">The input for the workflow.</param>
    /// <param name="spanContext">The parent span context for the workflow. When left as <c>default</c>, no observability context is injected into <paramref name="input"/>.</param>
    /// <param name="cancellationToken">An optional cancellation token for the operation.</param>
    /// <returns>A task representing the asynchronous operation that returns the ID of the started workflow instance.</returns>
    protected Task<string> StartWorkflowAsync(
        DurableTaskClient durableFunctionClient,
        IWorkflowRequest input,
        SpanContext spanContext = default,
        CancellationToken cancellationToken = default)
    {
        InjectSpanContext(input, spanContext);

        // The workflow is scheduled under the name supplied to the constructor.
        return durableFunctionClient.ScheduleNewOrchestrationInstanceAsync(name, input, cancellation: cancellationToken);
    }

    /// <summary>
    /// Calls a sub-workflow from the current workflow, using the shared retry policy.
    /// </summary>
    /// <typeparam name="T">The type of the result from the sub-workflow.</typeparam>
    /// <param name="workflowContext">The current workflow context.</param>
    /// <param name="subWorkflowName">The name of the sub-workflow to call.</param>
    /// <param name="input">The input for the sub-workflow.</param>
    /// <param name="spanContext">The parent span context for the sub-workflow. When left as <c>default</c>, no observability context is injected into <paramref name="input"/>.</param>
    /// <returns>A task representing the asynchronous operation that returns the result from the sub-workflow.</returns>
    protected Task<T> CallWorkflowAsync<T>(
        TaskOrchestrationContext workflowContext,
        string subWorkflowName,
        IWorkflowRequest input,
        SpanContext spanContext = default)
    {
        InjectSpanContext(input, spanContext);

        return workflowContext.CallSubOrchestratorAsync<T>(subWorkflowName, input, _retryPolicy);
    }

    /// <summary>
    /// Calls an activity from the current workflow, using the shared retry policy.
    /// </summary>
    /// <param name="workflowContext">The current workflow context.</param>
    /// <param name="activityName">The name of the activity to call. Use the nameof operator to get the name of the activity.</param>
    /// <param name="input">The input for the activity.</param>
    /// <param name="spanContext">The parent span context for the activity. When left as <c>default</c>, no observability context is injected into <paramref name="input"/>.</param>
    /// <returns>A task representing the asynchronous operation of the activity call.</returns>
    protected Task CallActivityAsync(
        TaskOrchestrationContext workflowContext,
        string activityName,
        IWorkflowRequest input,
        SpanContext spanContext = default)
    {
        InjectSpanContext(input, spanContext);

        return workflowContext.CallActivityAsync(activityName, input, _retryPolicy);
    }

    /// <summary>
    /// Calls an activity from the current workflow, using the shared retry policy.
    /// </summary>
    /// <typeparam name="T">The type of the result from the activity.</typeparam>
    /// <param name="workflowContext">The current workflow context.</param>
    /// <param name="activityName">The name of the activity to call. Use the nameof operator to get the name of the activity.</param>
    /// <param name="input">The input for the activity.</param>
    /// <param name="spanContext">The parent span context for the activity. When left as <c>default</c>, no observability context is injected into <paramref name="input"/>.</param>
    /// <returns>A task representing the asynchronous operation that returns the result from the activity.</returns>
    protected Task<T> CallActivityAsync<T>(
        TaskOrchestrationContext workflowContext,
        string activityName,
        IWorkflowRequest input,
        SpanContext spanContext = default)
    {
        InjectSpanContext(input, spanContext);

        return workflowContext.CallActivityAsync<T>(activityName, input, _retryPolicy);
    }

    /// <summary>
    /// Extracts the input for the workflow from the request body.
    /// </summary>
    /// <typeparam name="TInput">The type of <see cref="IWorkflowRequest"/> to extract.</typeparam>
    /// <param name="requestBody">The request body to extract the input from.</param>
    /// <returns>The extracted input for the workflow.</returns>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="requestBody"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException">Thrown when the request body is not a valid JSON representation of a <see cref="IWorkflowRequest"/> object.</exception>
    protected static TInput ExtractInput<TInput>(string requestBody)
        where TInput : class, IWorkflowRequest
    {
        ArgumentNullException.ThrowIfNull(requestBody);

        TInput? input;
        try
        {
            input = JsonSerializer.Deserialize<TInput>(requestBody);
        }
        catch (JsonException ex)
        {
            // Malformed JSON surfaces as JsonException; translate it into the documented
            // ArgumentException and keep the parser error as the inner exception.
            throw new ArgumentException(InvalidRequestBodyMessage, nameof(requestBody), ex);
        }

        // A JSON literal "null" deserializes successfully to a null reference.
        return input ?? throw new ArgumentException(InvalidRequestBodyMessage, nameof(requestBody));
    }

    /// <summary>
    /// Starts a span for the workflow and makes it the active span.
    /// </summary>
    /// <param name="spanName">The name of the workflow span.</param>
    /// <param name="context">The observability context for the workflow. When provided, the span context extracted from it is used as the parent of the new span.</param>
    /// <returns>A new active span for the workflow.</returns>
    protected TelemetrySpan StartActiveSpan(string spanName, IObservabilityContext? context = default)
    {
        return context is not null
            ? Tracer.StartActiveSpan(spanName, SpanKind.Internal, context.ExtractObservabilityContext())
            : Tracer.StartActiveSpan(spanName);
    }

    /// <summary>
    /// Injects the parent span context into the request, unless the span context is <c>default</c>.
    /// </summary>
    /// <param name="input">The request to inject the observability context into.</param>
    /// <param name="spanContext">The parent span context to inject.</param>
    private static void InjectSpanContext(IWorkflowRequest input, SpanContext spanContext)
    {
        if (spanContext != default)
        {
            input.InjectObservabilityContext(spanContext);
        }
    }
}