Skip to content

Commit 6f1ef20

Browse files
committed
Fix incorrect logs on new schedule engine triggered taskss
Also added the ability to recover schedules in the schedule engine via an Admin API endpoint in the new schedule engine
1 parent f2db1b8 commit 6f1ef20

File tree

12 files changed

+715
-11
lines changed

12 files changed

+715
-11
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { ActionFunctionArgs, json, LoaderFunctionArgs } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { prisma } from "~/db.server";
4+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
5+
import { scheduleEngine } from "~/v3/scheduleEngine.server";
6+
7+
const ParamsSchema = z.object({
8+
environmentId: z.string(),
9+
});
10+
11+
export async function action({ request, params }: ActionFunctionArgs) {
12+
// Next authenticate the request
13+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
14+
15+
if (!authenticationResult) {
16+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
17+
}
18+
19+
const user = await prisma.user.findUnique({
20+
where: {
21+
id: authenticationResult.userId,
22+
},
23+
});
24+
25+
if (!user) {
26+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
27+
}
28+
29+
if (!user.admin) {
30+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
31+
}
32+
33+
const parsedParams = ParamsSchema.parse(params);
34+
35+
const environment = await prisma.runtimeEnvironment.findFirst({
36+
where: {
37+
id: parsedParams.environmentId,
38+
},
39+
include: {
40+
organization: true,
41+
project: true,
42+
},
43+
});
44+
45+
if (!environment) {
46+
return json({ error: "Environment not found" }, { status: 404 });
47+
}
48+
49+
const results = await scheduleEngine.recoverSchedulesInEnvironment(
50+
environment.projectId,
51+
environment.id
52+
);
53+
54+
return json({
55+
success: true,
56+
results,
57+
});
58+
}

apps/webapp/app/runEngine/concerns/traceEvents.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
3939
},
4040
incomplete: true,
4141
immediate: true,
42+
startTime: request.options?.overrideCreatedAt
43+
? BigInt(request.options.overrideCreatedAt.getTime()) * BigInt(1000000)
44+
: undefined,
4245
},
4346
async (event, traceContext, traceparent) => {
4447
return await callback({

apps/webapp/app/runEngine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export type TriggerTaskServiceOptions = {
2121
runFriendlyId?: string;
2222
skipChecks?: boolean;
2323
oneTimeUseToken?: string;
24+
overrideCreatedAt?: Date;
2425
};
2526

2627
// domain/triggerTask.ts

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ export class EventRepository {
970970
const propagatedContext = extractContextFromCarrier(options.context ?? {});
971971

972972
const start = process.hrtime.bigint();
973-
const startTime = getNowInNanoseconds();
973+
const startTime = options.startTime ?? getNowInNanoseconds();
974974

975975
const traceId = options.spanParentAsLink
976976
? this.generateTraceId()

apps/webapp/app/v3/services/triggerTaskV1.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,9 @@ export class TriggerTaskServiceV1 extends BaseService {
312312
},
313313
incomplete: true,
314314
immediate: true,
315+
startTime: options.overrideCreatedAt
316+
? BigInt(options.overrideCreatedAt.getTime()) * BigInt(1000000)
317+
: undefined,
315318
},
316319
async (event, traceContext, traceparent) => {
317320
const run = await autoIncrementCounter.incrementInTransaction(

apps/webapp/app/v3/taskEventStore.server.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,20 @@ export class TaskEventStore {
8282
let finalWhere: Prisma.TaskEventWhereInput = where;
8383

8484
if (table === "taskEventPartitioned") {
85-
// Add 1 minute to endCreatedAt to make sure we include all events in the range.
85+
// Add buffer to start and end of the range to make sure we include all events in the range.
8686
const end = endCreatedAt
8787
? new Date(endCreatedAt.getTime() + env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000)
8888
: new Date();
89+
const startCreatedAtWithBuffer = new Date(
90+
startCreatedAt.getTime() - env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000
91+
);
8992

9093
finalWhere = {
9194
AND: [
9295
where,
9396
{
9497
createdAt: {
95-
gte: startCreatedAt,
98+
gte: startCreatedAtWithBuffer,
9699
lt: end,
97100
},
98101
},
@@ -138,6 +141,11 @@ export class TaskEventStore {
138141
options?.includeDebugLogs === false || options?.includeDebugLogs === undefined;
139142

140143
if (table === "taskEventPartitioned") {
144+
const createdAtBufferInMillis = env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000;
145+
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - createdAtBufferInMillis);
146+
const $endCreatedAt = endCreatedAt ?? new Date();
147+
const endCreatedAtWithBuffer = new Date($endCreatedAt.getTime() + createdAtBufferInMillis);
148+
141149
return await this.readReplica.$queryRaw<TraceEvent[]>`
142150
SELECT
143151
"spanId",
@@ -158,11 +166,8 @@ export class TaskEventStore {
158166
FROM "TaskEventPartitioned"
159167
WHERE
160168
"traceId" = ${traceId}
161-
AND "createdAt" >= ${startCreatedAt.toISOString()}::timestamp
162-
AND "createdAt" < ${(endCreatedAt
163-
? new Date(endCreatedAt.getTime() + env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000)
164-
: new Date()
165-
).toISOString()}::timestamp
169+
AND "createdAt" >= ${startCreatedAtWithBuffer.toISOString()}::timestamp
170+
AND "createdAt" < ${endCreatedAtWithBuffer.toISOString()}::timestamp
166171
${
167172
filterDebug
168173
? Prisma.sql`AND \"kind\" <> CAST('LOG'::text AS "public"."TaskEventKind")`

internal-packages/schedule-engine/src/engine/index.ts

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
Tracer,
99
} from "@internal/tracing";
1010
import { Logger } from "@trigger.dev/core/logger";
11-
import { PrismaClient } from "@trigger.dev/database";
11+
import { PrismaClient, TaskSchedule, TaskScheduleInstance } from "@trigger.dev/database";
1212
import { Worker, type JobHandlerParams } from "@trigger.dev/redis-worker";
1313
import { calculateDistributedExecutionTime } from "./distributedScheduling.js";
1414
import { calculateNextScheduledTimestamp, nextScheduledTimestamps } from "./scheduleCalculation.js";
@@ -645,6 +645,140 @@ export class ScheduleEngine {
645645
});
646646
}
647647

648+
public recoverSchedulesInEnvironment(projectId: string, environmentId: string) {
649+
return startSpan(this.tracer, "recoverSchedulesInEnvironment", async (span) => {
650+
this.logger.info("Recovering schedules in environment", {
651+
environmentId,
652+
projectId,
653+
});
654+
655+
span.setAttribute("environmentId", environmentId);
656+
657+
const schedules = await this.prisma.taskSchedule.findMany({
658+
where: {
659+
projectId,
660+
instances: {
661+
some: {
662+
environmentId,
663+
},
664+
},
665+
},
666+
select: {
667+
id: true,
668+
generatorExpression: true,
669+
instances: {
670+
select: {
671+
id: true,
672+
environmentId: true,
673+
lastScheduledTimestamp: true,
674+
nextScheduledTimestamp: true,
675+
},
676+
},
677+
},
678+
});
679+
680+
const instancesWithSchedule = schedules
681+
.map((schedule) => ({
682+
schedule,
683+
instance: schedule.instances.find((instance) => instance.environmentId === environmentId),
684+
}))
685+
.filter((instance) => instance.instance) as Array<{
686+
schedule: Omit<(typeof schedules)[number], "instances">;
687+
instance: NonNullable<(typeof schedules)[number]["instances"][number]>;
688+
}>;
689+
690+
if (instancesWithSchedule.length === 0) {
691+
this.logger.info("No instances found for environment", {
692+
environmentId,
693+
projectId,
694+
});
695+
696+
return {
697+
recovered: [],
698+
skipped: [],
699+
};
700+
}
701+
702+
const results = {
703+
recovered: [],
704+
skipped: [],
705+
} as { recovered: string[]; skipped: string[] };
706+
707+
for (const { instance, schedule } of instancesWithSchedule) {
708+
this.logger.info("Recovering schedule", {
709+
schedule,
710+
instance,
711+
});
712+
713+
const [recoverError, result] = await tryCatch(
714+
this.#recoverTaskScheduleInstance({ instance, schedule })
715+
);
716+
717+
if (recoverError) {
718+
this.logger.error("Error recovering schedule", {
719+
error: recoverError instanceof Error ? recoverError.message : String(recoverError),
720+
});
721+
722+
span.setAttribute("recover_error", true);
723+
span.setAttribute(
724+
"recover_error_message",
725+
recoverError instanceof Error ? recoverError.message : String(recoverError)
726+
);
727+
} else {
728+
span.setAttribute("recover_success", true);
729+
730+
if (result === "recovered") {
731+
results.recovered.push(instance.id);
732+
} else {
733+
results.skipped.push(instance.id);
734+
}
735+
}
736+
}
737+
738+
return results;
739+
});
740+
}
741+
742+
async #recoverTaskScheduleInstance({
743+
instance,
744+
schedule,
745+
}: {
746+
instance: {
747+
id: string;
748+
environmentId: string;
749+
lastScheduledTimestamp: Date | null;
750+
nextScheduledTimestamp: Date | null;
751+
};
752+
schedule: { id: string; generatorExpression: string };
753+
}) {
754+
// inspect the schedule worker to see if there is a job for this instance
755+
const job = await this.worker.getJob(`scheduled-task-instance:${instance.id}`);
756+
757+
if (job) {
758+
this.logger.info("Job already exists for instance", {
759+
instanceId: instance.id,
760+
job,
761+
schedule,
762+
});
763+
764+
return "skipped";
765+
}
766+
767+
this.logger.info("No job found for instance, registering next run", {
768+
instanceId: instance.id,
769+
schedule,
770+
});
771+
772+
// If the job does not exist, register the next run
773+
await this.registerNextTaskScheduleInstance({ instanceId: instance.id });
774+
775+
return "recovered";
776+
}
777+
778+
async getJob(id: string) {
779+
return this.worker.getJob(id);
780+
}
781+
648782
async quit() {
649783
this.logger.info("Shutting down schedule engine");
650784

internal-packages/schedule-engine/test/scheduleEngine.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { containerTest } from "@internal/testcontainers";
22
import { trace } from "@internal/tracing";
3-
import { describe, expect, vi } from "vitest";
4-
import { ScheduleEngine } from "../src/index.js";
53
import { setTimeout } from "timers/promises";
4+
import { describe, expect, vi } from "vitest";
65
import { TriggerScheduledTaskParams } from "../src/engine/types.js";
6+
import { ScheduleEngine } from "../src/index.js";
77

88
describe("ScheduleEngine Integration", () => {
99
containerTest(

0 commit comments

Comments
 (0)