-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathHostBuilderExtensions.cs
263 lines (249 loc) · 13.1 KB
/
HostBuilderExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Akka.Hosting;
using Akka.Util;
using Arcane.Framework.Contracts;
using Arcane.Framework.Services;
using Arcane.Framework.Services.Base;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Serilog;
using Snd.Sdk.ActorProviders;
using Snd.Sdk.Kubernetes.Providers;
using Snd.Sdk.Logs.Providers;
using Snd.Sdk.Logs.Providers.Configurations;
namespace Arcane.Framework.Providers.Hosting;
/// <summary>
/// Extension methods for building the console streaming host.
/// </summary>
public static class HostBuilderExtensions
{
private const string ENV_PREFIX = "STREAMCONTEXT__";
/// <summary>
/// Add the default logging configuration to the streaming host builder.
/// </summary>
/// <param name="builder">IHostBuilder instance</param>
/// <param name="configureLogger">Optional logger configuration callback.</param>
/// <param name="contextBuilder">StreamingHostBuilderContext builder function</param>
/// <returns>Configured IHostBuilder instance</returns>
[ExcludeFromCodeCoverage(Justification = "Trivial")]
public static IHostBuilder AddDatadogLogging(this IHostBuilder builder,
Action<HostBuilderContext, IServiceProvider, LoggerConfiguration> configureLogger = null,
Func<StreamingHostBuilderContext> contextBuilder = null)
{
var context = contextBuilder?.Invoke() ?? StreamingHostBuilderContext.FromEnvironment(ENV_PREFIX);
return builder.AddSerilogLogger(context.ApplicationName,
(hostBuilderContext, applicationName, loggerConfiguration) =>
{
configureLogger?.Invoke(hostBuilderContext, applicationName, loggerConfiguration);
loggerConfiguration
.Enrich.WithProperty("streamId", context.StreamId)
.Enrich.WithProperty("streamKind", context.StreamKind)
.AddDatadog();
}
);
}
/// <summary>
/// Adds the required services for the streaming host.
/// </summary>
/// <param name="builder">IHostBuilder instance</param>
/// <param name="getStreamGraphBuilder">The function that adds the stream graph builder to the services collection.</param>
/// <param name="getStreamRunnerService">
/// The function that adds the stream runner service to the services collection.
/// This parameter is optional. If omitted, the default implementation will be used.
/// </param>
/// <param name="getStreamLifetimeService">
/// The function that adds the stream lifetime service to the services collection.
/// This parameter is optional. If omitted, the default implementation will be used.
/// </param>
/// <param name="configureActorSystem">
/// The function that gets the custom configuration for the actor system.
/// This parameter is optional. If omitted, the actor system will be configured with default settings getd by the SnD.Sdk library.
/// </param>
/// <param name="getStreamHostContextBuilder">StreamingHostBuilderContext builder function</param>
/// <returns>Configured IHostBuilder instance</returns>
[ExcludeFromCodeCoverage(Justification = "Trivial")]
public static IHostBuilder ConfigureRequiredServices(this IHostBuilder builder,
Func<IServiceCollection, IServiceCollection> getStreamGraphBuilder,
Func<IServiceProvider, IStreamRunnerService> getStreamRunnerService = null,
Func<IServiceProvider, IStreamLifetimeService> getStreamLifetimeService = null,
Func<HostBuilderContext, StreamingHostBuilderContext, Action<AkkaConfigurationBuilder>> configureActorSystem = null,
Func<StreamingHostBuilderContext> getStreamHostContextBuilder = null
)
{
var context = getStreamHostContextBuilder?.Invoke() ?? StreamingHostBuilderContext.FromEnvironment(ENV_PREFIX);
return builder.ConfigureServices((HostBuilderContext, services) =>
{
getStreamGraphBuilder(services);
services.AddKubernetes()
.AddServiceWithOptionalFactory<IStreamRunnerService, StreamRunnerService>(getStreamRunnerService)
.AddServiceWithOptionalFactory<IStreamLifetimeService, StreamLifetimeService>(getStreamLifetimeService)
.AddLocalActorSystem(configureActorSystem?.Invoke(HostBuilderContext, context))
.AddSingleton<IArcaneExceptionHandler, ArcaneExceptionHandler>();
});
}
/// <summary>
/// Allows to configure additional services for the streaming host (metrics, logging, storage writers etc.).
/// </summary>
/// <param name="builder">IHostBuilder instance</param>
/// <param name="configureAdditionalServices">The function that adds services to the services collection.</param>
/// <param name="getStreamHostContextBuilder">StreamingHostBuilderContext builder function</param>
/// <returns>Configured IHostBuilder instance</returns>
[ExcludeFromCodeCoverage(Justification = "Trivial")]
public static IHostBuilder ConfigureAdditionalServices(this IHostBuilder builder,
Action<IServiceCollection, StreamingHostBuilderContext> configureAdditionalServices,
Func<StreamingHostBuilderContext> getStreamHostContextBuilder = null)
{
var context = getStreamHostContextBuilder?.Invoke() ?? StreamingHostBuilderContext.FromEnvironment(ENV_PREFIX);
return builder.ConfigureServices((_, services) =>
{
configureAdditionalServices.Invoke(services, context);
});
}
/// <summary>
/// This method adds the untyped stream graph builder to the services collection.
/// Untyped stream graph builder is used when the stream context is not known at compile time.
/// In this case the implementation of the stream graph builder should contain the logic for determining the stream context type.
/// </summary>
/// <param name="services">Services collection.</param>
/// <param name="getStreamContext">The factory function that provides the stream context instance.</param>
/// <param name="getStreamHostContextBuilder">StreamingHostBuilderContext builder function</param>
/// <param name="getStreamStatusService">StreamingHostBuilderContext builder function</param>
/// <typeparam name="TStreamGraphBuilder">The stream graph builder type.</typeparam>
/// <returns>Services collection</returns>
[ExcludeFromCodeCoverage(Justification = "Trivial")]
public static IServiceCollection AddStreamGraphBuilder<TStreamGraphBuilder>(this IServiceCollection services,
Func<StreamingHostBuilderContext, IStreamContext> getStreamContext,
Func<StreamingHostBuilderContext> getStreamHostContextBuilder = null,
Func<IServiceProvider, IStreamStatusService> getStreamStatusService = null)
where TStreamGraphBuilder : class, IStreamGraphBuilder<IStreamContext>
{
var context = getStreamHostContextBuilder?.Invoke() ?? StreamingHostBuilderContext.FromEnvironment(ENV_PREFIX);
services.AddSingleton<IStreamGraphBuilder<IStreamContext>, TStreamGraphBuilder>();
services.AddServiceWithOptionalFactory<IStreamStatusService, StreamStatusService>(getStreamStatusService);
services.AddSingleton(_ => getStreamContext(context));
return services;
}
/// <summary>
/// This method adds the typed stream graph builder to the services collection.
/// The typed stream graph builder is used when the stream context type is known at compile time.
/// I's preferable to use this method when possible.
/// </summary>
/// <param name="services">Services collection.</param>
/// <param name="getStreamHostContextBuilder">Provides a TStreamContext instance</param>
/// <param name="getStreamStatusService">Provides a TStreamContext instance</param>
/// <typeparam name="TStreamContext">The stream context type</typeparam>
/// <typeparam name="TStreamGraphBuilder">The stream graph builder type.</typeparam>
/// <returns></returns>
[ExcludeFromCodeCoverage(Justification = "Trivial")]
public static IServiceCollection AddStreamGraphBuilder<TStreamGraphBuilder, TStreamContext>(this IServiceCollection services,
Func<TStreamContext> getStreamHostContextBuilder = null,
Func<IServiceProvider, IStreamStatusService> getStreamStatusService = null)
where TStreamContext : class, IStreamContext, IStreamContextWriter, new()
where TStreamGraphBuilder : class, IStreamGraphBuilder<TStreamContext>
{
services.AddSingleton<IStreamGraphBuilder<TStreamContext>, TStreamGraphBuilder>();
services.AddServiceWithOptionalFactory<IStreamStatusService, StreamStatusService>(getStreamStatusService);
services.AddStreamContext<TStreamContext>(getStreamHostContextBuilder);
return services;
}
/// <summary>
/// Runs the stream created by the typed stream graph builder.
/// </summary>
/// <param name="host">Streaming application host</param>
/// <param name="logger">Static bootstrap logger</param>
/// <param name="handleUnknownException">
/// Exception handler for unhandled exceptions.
/// If omitted, the default handler will be used.
/// If provided, this handler will be invoked after the default handler.
/// The handler should return the application exit code.
/// </param>
/// <typeparam name="TContext">
/// The stream context type. This should match the type that uses injected implementation if the
/// IStreamGraphBuilder interface.
/// </typeparam>
/// <returns>Application exit code</returns>
public static async Task<int> RunStream<TContext>(this IHost host, ILogger logger, Func<Exception, ILogger, Task<Option<int>>> handleUnknownException = null)
where TContext : IStreamContext
{
var runner = host.Services.GetRequiredService<IStreamRunnerService>();
var exceptionHandler = host.Services.GetService<IArcaneExceptionHandler>();
var context = host.Services.GetRequiredService<TContext>();
using var lifetimeService = host.Services.GetRequiredService<IStreamLifetimeService>();
var graphBuilder = host.Services.GetRequiredService<IStreamGraphBuilder<TContext>>();
try
{
var completeTask = runner.RunStream(() => graphBuilder.BuildGraph(context));
await completeTask;
if (context.IsBackfilling && lifetimeService.IsStopRequested)
{
logger.Information("The stream was stopped during backfilling, retrying...");
return ExitCodes.RESTART;
}
}
catch (Exception e)
{
if (exceptionHandler is null)
{
return await TryHandleUnknownException(e, logger, handleUnknownException);
}
var handled = await exceptionHandler.HandleException(e);
return handled switch
{
{ HasValue: true, Value: var exitCode } => exitCode,
{ HasValue: false } => await TryHandleUnknownException(e, logger, handleUnknownException),
};
}
logger.Information("Streaming job is completed successfully, exiting");
return ExitCodes.SUCCESS;
}
/// <summary>
/// Runs the stream created by the untyped stream graph builder.
/// </summary>
/// <param name="host">Streaming application host</param>
/// <param name="logger">Static bootstrap logger</param>
/// <param name="handleUnknownException">
/// Exception handler for unhandled exceptions.
/// If omitted, the default handler will be used.
/// If provided, this handler will be invoked after the default handler.
/// The handler should return the application exit code.
/// </param>
/// <returns>Application exit code</returns>
public static async Task<int> RunStream(this IHost host, ILogger logger,
Func<Exception, ILogger, Task<Option<int>>> handleUnknownException = null)
{
return await RunStream<IStreamContext>(host, logger, handleUnknownException);
}
private static async Task<int> TryHandleUnknownException(Exception e, ILogger logger, Func<Exception, ILogger, Task<Option<int>>> handleUnknownException = null)
{
if (handleUnknownException is null)
{
return FatalExit(e, logger);
}
return await handleUnknownException(e, logger) switch
{
{ HasValue: true, Value: var exitCode } => exitCode,
_ => FatalExit(e, logger),
};
}
private static int FatalExit(Exception e, ILogger logger)
{
logger.Error(e, "Unhandled exception occurred");
return ExitCodes.FATAL;
}
[ExcludeFromCodeCoverage(Justification = "Trivial")]
private static IServiceCollection AddServiceWithOptionalFactory<TService, TImplementation>(this IServiceCollection services, Func<IServiceProvider, TService> factory = null)
where TService : class where TImplementation : class, TService
{
if (factory != null)
{
services.AddSingleton(factory);
}
else
{
services.AddSingleton<TService, TImplementation>();
}
return services;
}
}