Skip to content

Commit

Permalink
refactor: ♻️ 计划作业模块
Browse files Browse the repository at this point in the history
  • Loading branch information
tk authored and nsnail committed Jan 7, 2025
1 parent e82a172 commit f50dfc8
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ public DefaultEventPublisher()
_ = new TaskFactory<Task>().StartNew( //
async state => {
var subscribers = (List<MethodInfo>)state;
await foreach (var msg in _eventChannel.Reader.ReadAllAsync()) {
_ = Parallel.ForEach( //
subscribers.Where(x => x.GetParameters().FirstOrDefault()?.ParameterType == msg.GetType())
, (x, _) => x.Invoke(App.GetService(x.DeclaringType), [msg]));
}
await Parallel.ForEachAsync(_eventChannel.Reader.ReadAllAsync(), (msg, __) => {
_ = Parallel.ForEach( //
subscribers.Where(x => x.GetParameters().FirstOrDefault()?.ParameterType == msg.GetType())
, (x, _) => x.Invoke(App.GetService(x.DeclaringType), [msg]));
return ValueTask.CompletedTask;
})
.ConfigureAwait(false);
}, App.EffectiveTypes.Where(x => typeof(IEventSubscriber).IsAssignableFrom(x) && x.IsClass && !x.IsAbstract).SelectMany(x => x.GetMethods(BindingFlags.Instance | BindingFlags.Public).Where(y => y.IsDefined(typeof(EventSubscribeAttribute)))).ToList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<ItemGroup>
<PackageReference Include="NetAdmin.FreeSql.DbContext" Version="1.1.1" Label="refs"/>
<PackageReference Include="NetAdmin.FreeSql.Provider.Sqlite" Version="1.1.1" Label="refs"/>
<PackageReference Include="Gurion" Version="1.2.9" Label="refs"/>
<PackageReference Include="Gurion" Version="1.2.10" Label="refs"/>
<PackageReference Include="Microsoft.Extensions.Caching.StackExchangeRedis" Version="9.0.0"/>
<PackageReference Include="Minio" Version="6.0.4"/>
<PackageReference Include="NSExt" Version="2.3.3"/>
Expand Down
12 changes: 12 additions & 0 deletions src/backend/NetAdmin/NetAdmin.Infrastructure/Schedule/IJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace NetAdmin.Infrastructure.Schedule;

/// <summary>
/// 作业处理程序
/// </summary>
public interface IJob
{
/// <summary>
/// 具体处理逻辑
/// </summary>
Task ExecuteAsync(CancellationToken cancelToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace NetAdmin.Infrastructure.Schedule;

/// <summary>
/// 作业配置
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public sealed class JobConfigAttribute : Attribute
{
/// <summary>
/// 上一次执行时间
/// </summary>
public DateTime? LastExecutionTime { get; set; }

/// <summary>
/// 启动时运行
/// </summary>
public bool RunOnStart { get; init; }

/// <summary>
/// 触发器表达式
/// </summary>
public string TriggerCron { get; init; }
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using Gurion.Schedule;
using Cronos;
using NetAdmin.Domain.Contexts;
using NetAdmin.Domain.Events;
using NetAdmin.Host.Filters;
using NetAdmin.SysComponent.Host.Jobs;
using NetAdmin.Host.Middlewares;
using NetAdmin.Infrastructure.Schedule;
using NetAdmin.SysComponent.Host.Utils;
using FreeSqlBuilder = NetAdmin.Infrastructure.Utils.FreeSqlBuilder;

Expand Down Expand Up @@ -62,17 +63,58 @@ public static IServiceCollection AddFreeSql( //
/// <summary>
/// 添加定时任务
/// </summary>
public static IServiceCollection AddSchedules(this IServiceCollection me, bool force = false, Action<ScheduleOptionsBuilder> optionsAction = null)
public static IServiceCollection AddSchedules(this IServiceCollection me, bool force = false)
{
return App.WebHostEnvironment.IsProduction() || force
? me.AddSchedule( //
builder => {
_ = builder //
.AddJob<ScheduledJob>(true, Triggers.PeriodSeconds(1).SetRunOnStart(true))
.AddJob<FreeScheduledJob>(true, Triggers.PeriodMinutes(1).SetRunOnStart(true));

optionsAction?.Invoke(builder);
})
: me;
if (!App.WebHostEnvironment.IsProduction() && !force) {
return me;
}

var jobTypes = App.EffectiveTypes
.Where(x => typeof(IJob).IsAssignableFrom(x) && x.IsClass && !x.IsAbstract && x.IsDefined(typeof(JobConfigAttribute)))
.ToDictionary(x => x, x => x.GetCustomAttribute<JobConfigAttribute>());
var runOnStartJobTypes = jobTypes.Where(x => //
x.Value.RunOnStart);
RunJob(runOnStartJobTypes);
_ = Task.Run(LoopTaskAsync);
return me;

#pragma warning disable S2190
async Task LoopTaskAsync()
#pragma warning restore S2190
{
while (true) {
await Task.Delay(1000).ConfigureAwait(false);
if (SafetyShopHostMiddleware.IsShutdown) {
Console.WriteLine(Ln.此节点已下线);
}
else {
RunJob(jobTypes.Where(Filter));
}
}

bool Filter(KeyValuePair<Type, JobConfigAttribute> x)
{
return !x.Value.TriggerCron.NullOrEmpty() &&
CronExpression.Parse(x.Value.TriggerCron, CronFormat.IncludeSeconds)
.GetNextOccurrence(x.Value.LastExecutionTime ?? DateTime.UtcNow.AddDays(-1), TimeZoneInfo.Local)
?.ToLocalTime() <= DateTime.Now;
}

// ReSharper disable once FunctionNeverReturns
}
}

private static void RunJob(IEnumerable<KeyValuePair<Type, JobConfigAttribute>> jobTypes)
{
foreach (var job in jobTypes) {
try {
_ = typeof(IJob).GetMethod(nameof(IJob.ExecuteAsync))!.Invoke( //
Activator.CreateInstance(job.Key), [CancellationToken.None]);
job.Value.LastExecutionTime = DateTime.UtcNow;
}
catch (Exception ex) {
LogHelper.Get<IServiceCollection>().Error(ex);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using Gurion.Schedule;
using NetAdmin.Host.BackgroundRunning;
using NetAdmin.Host.Middlewares;
using NetAdmin.Infrastructure.Schedule;

namespace NetAdmin.SysComponent.Host.Jobs;

/// <summary>
/// 释放计划作业
/// </summary>
[JobConfig(TriggerCron = "0 * * * * *")]
public sealed class FreeScheduledJob : WorkBase<FreeScheduledJob>, IJob
{
private readonly IJobService _jobService;
Expand All @@ -22,17 +22,11 @@ public FreeScheduledJob()
/// <summary>
/// 具体处理逻辑
/// </summary>
/// <param name="context">作业执行前上下文</param>
/// <param name="stoppingToken">取消任务 Token</param>
/// <param name="cancelToken">取消任务 Token</param>
/// <exception cref="NetAdminGetLockerException">加锁失败异常</exception>
public async Task ExecuteAsync(JobExecutingContext context, CancellationToken stoppingToken)
public async Task ExecuteAsync(CancellationToken cancelToken)
{
if (SafetyShopHostMiddleware.IsShutdown) {
Console.WriteLine(Ln.此节点已下线);
return;
}

await WorkflowAsync(true, stoppingToken).ConfigureAwait(false);
await WorkflowAsync(true, cancelToken).ConfigureAwait(false);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
using FreeSql.Internal;
using Gurion.RemoteRequest;
using Gurion.RemoteRequest.Extensions;
using Gurion.Schedule;
using NetAdmin.Application.Extensions;
using NetAdmin.Domain.Dto.Sys.Job;
using NetAdmin.Domain.Dto.Sys.JobRecord;
using NetAdmin.Host.BackgroundRunning;
using NetAdmin.Host.Middlewares;
using NetAdmin.Infrastructure.Schedule;

namespace NetAdmin.SysComponent.Host.Jobs;

/// <summary>
/// 计划作业
/// </summary>
[JobConfig(TriggerCron = "* * * * * *")]
public sealed class ScheduledJob : WorkBase<ScheduledJob>, IJob
{
private static string _accessToken;
Expand All @@ -30,19 +30,12 @@ public ScheduledJob()
/// <summary>
/// 具体处理逻辑
/// </summary>
/// <param name="context">作业执行前上下文</param>
/// <param name="stoppingToken">取消任务 Token</param>
/// <param name="cancelToken">取消任务 Token</param>
/// <exception cref="NetAdminGetLockerException">加锁失败异常</exception>
public async Task ExecuteAsync(JobExecutingContext context, CancellationToken stoppingToken)
public Task ExecuteAsync(CancellationToken cancelToken)
{
if (SafetyShopHostMiddleware.IsShutdown) {
Console.WriteLine(Ln.此节点已下线);
return;
}

// ReSharper disable once MethodSupportsCancellation
await Parallel.ForAsync(0, Numbers.SCHEDULED_JOB_PARALLEL_NUM, async (_, _) => await WorkflowAsync(stoppingToken).ConfigureAwait(false))
.ConfigureAwait(false);
return Parallel.ForAsync(0, Numbers.SCHEDULED_JOB_PARALLEL_NUM, cancelToken
, async (_, _) => await WorkflowAsync(cancelToken).ConfigureAwait(false));
}

/// <summary>
Expand Down

0 comments on commit f50dfc8

Please # to comment.