|
| 1 | + |
| 2 | +package suwayomi.tachidesk.util |
| 3 | + |
| 4 | +import com.cronutils.model.CronType.CRON4J |
| 5 | +import com.cronutils.model.definition.CronDefinitionBuilder |
| 6 | +import com.cronutils.model.time.ExecutionTime |
| 7 | +import com.cronutils.parser.CronParser |
| 8 | +import it.sauronsoftware.cron4j.Scheduler |
| 9 | +import it.sauronsoftware.cron4j.Task |
| 10 | +import it.sauronsoftware.cron4j.TaskExecutionContext |
| 11 | +import mu.KotlinLogging |
| 12 | +import java.time.ZonedDateTime |
| 13 | +import java.util.PriorityQueue |
| 14 | +import java.util.Timer |
| 15 | +import java.util.TimerTask |
| 16 | +import kotlin.time.Duration |
| 17 | +import kotlin.time.Duration.Companion.minutes |
| 18 | +import kotlin.time.Duration.Companion.seconds |
| 19 | + |
| 20 | +val cronParser = CronParser(CronDefinitionBuilder.instanceDefinitionFor(CRON4J)) |
| 21 | + |
| 22 | +class HATask(val id: String, val cronExpr: String, val execute: () -> Unit, val name: String?) : Comparable<HATask> { |
| 23 | + private val executionTime = ExecutionTime.forCron(cronParser.parse(cronExpr)) |
| 24 | + |
| 25 | + fun getLastExecutionTime(): Long { |
| 26 | + return executionTime.lastExecution(ZonedDateTime.now()).get().toEpochSecond().seconds.inWholeMilliseconds |
| 27 | + } |
| 28 | + |
| 29 | + fun getNextExecutionTime(): Long { |
| 30 | + return executionTime.nextExecution(ZonedDateTime.now()).get().toEpochSecond().seconds.inWholeMilliseconds |
| 31 | + } |
| 32 | + |
| 33 | + fun getTimeToNextExecution(): Long { |
| 34 | + return executionTime.timeToNextExecution(ZonedDateTime.now()).get().toMillis() |
| 35 | + } |
| 36 | + |
| 37 | + override fun compareTo(other: HATask): Int { |
| 38 | + return getTimeToNextExecution().compareTo(other.getTimeToNextExecution()) |
| 39 | + } |
| 40 | +} |
| 41 | + |
| 42 | +/** |
| 43 | + * The "HAScheduler" ("HibernateAwareScheduler") is a scheduler that recognizes when the system was hibernating/suspended |
| 44 | + * and triggers tasks that have missed their execution points. |
| 45 | + */ |
| 46 | +object HAScheduler { |
| 47 | + private val logger = KotlinLogging.logger { } |
| 48 | + |
| 49 | + private val scheduledTasks = PriorityQueue<HATask>() |
| 50 | + private val scheduler = Scheduler() |
| 51 | + |
| 52 | + private val HIBERNATION_THRESHOLD = 10.seconds.inWholeMilliseconds |
| 53 | + private const val TASK_THRESHOLD = 0.1 |
| 54 | + |
| 55 | + init { |
| 56 | + scheduleHibernateCheckerTask(1.minutes) |
| 57 | + } |
| 58 | + |
| 59 | + private fun scheduleHibernateCheckerTask(interval: Duration) { |
| 60 | + val timer = Timer() |
| 61 | + timer.scheduleAtFixedRate( |
| 62 | + object : TimerTask() { |
| 63 | + var lastExecutionTime = System.currentTimeMillis() |
| 64 | + |
| 65 | + override fun run() { |
| 66 | + val currentTime = System.currentTimeMillis() |
| 67 | + val elapsedTime = currentTime - lastExecutionTime |
| 68 | + lastExecutionTime = currentTime |
| 69 | + |
| 70 | + val systemWasInHibernation = elapsedTime > interval.inWholeMilliseconds + HIBERNATION_THRESHOLD |
| 71 | + if (systemWasInHibernation) { |
| 72 | + logger.debug { "System hibernation detected, task was delayed by ${elapsedTime - interval.inWholeMilliseconds}ms" } |
| 73 | + scheduledTasks.forEach { |
| 74 | + val missedExecution = currentTime - it.getLastExecutionTime() - elapsedTime < 0 |
| 75 | + val taskInterval = it.getNextExecutionTime() - it.getLastExecutionTime() |
| 76 | + // in case the next task execution doesn't take long the missed execution can be ignored to prevent a double execution |
| 77 | + val taskThresholdMet = taskInterval * TASK_THRESHOLD > it.getTimeToNextExecution() |
| 78 | + |
| 79 | + val triggerTask = missedExecution && taskThresholdMet |
| 80 | + if (triggerTask) { |
| 81 | + logger.debug { "Task \"${it.name ?: it.id}\" missed its execution, executing now..." } |
| 82 | + reschedule(it.id, it.cronExpr) |
| 83 | + it.execute() |
| 84 | + } |
| 85 | + |
| 86 | + // queue is ordered by next execution time, thus, loop can be exited early |
| 87 | + if (!missedExecution) { |
| 88 | + return@forEach |
| 89 | + } |
| 90 | + } |
| 91 | + } |
| 92 | + } |
| 93 | + }, |
| 94 | + interval.inWholeMilliseconds, |
| 95 | + interval.inWholeMilliseconds |
| 96 | + ) |
| 97 | + } |
| 98 | + |
| 99 | + fun schedule(execute: () -> Unit, cronExpr: String, name: String?): String { |
| 100 | + if (!scheduler.isStarted) { |
| 101 | + scheduler.start() |
| 102 | + } |
| 103 | + |
| 104 | + val taskId = scheduler.schedule( |
| 105 | + cronExpr, |
| 106 | + object : Task() { |
| 107 | + override fun execute(context: TaskExecutionContext?) { |
| 108 | + execute() |
| 109 | + } |
| 110 | + } |
| 111 | + ) |
| 112 | + |
| 113 | + scheduledTasks.add(HATask(taskId, cronExpr, execute, name)) |
| 114 | + |
| 115 | + return taskId |
| 116 | + } |
| 117 | + |
| 118 | + fun deschedule(taskId: String) { |
| 119 | + scheduler.deschedule(taskId) |
| 120 | + scheduledTasks.removeIf { it.id == taskId } |
| 121 | + } |
| 122 | + |
| 123 | + fun reschedule(taskId: String, cronExpr: String) { |
| 124 | + val task = scheduledTasks.find { it.id == taskId } ?: return |
| 125 | + |
| 126 | + scheduledTasks.remove(task) |
| 127 | + scheduledTasks.add(HATask(taskId, cronExpr, task.execute, task.name)) |
| 128 | + |
| 129 | + scheduler.reschedule(taskId, cronExpr) |
| 130 | + } |
| 131 | +} |
0 commit comments