forked from Suwayomi/Suwayomi-Server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHAScheduler.kt
218 lines (172 loc) · 8.15 KB
/
HAScheduler.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package suwayomi.tachidesk.util
import com.cronutils.model.CronType.CRON4J
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.model.time.ExecutionTime
import com.cronutils.parser.CronParser
import it.sauronsoftware.cron4j.Scheduler
import it.sauronsoftware.cron4j.Task
import it.sauronsoftware.cron4j.TaskExecutionContext
import mu.KotlinLogging
import java.time.ZonedDateTime
import java.util.PriorityQueue
import java.util.Timer
import java.util.TimerTask
import java.util.UUID
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
/** Shared cron-utils parser built from the CRON4J cron definition; [HACronTask] uses it to parse its cron expression. */
val cronParser = CronParser(CronDefinitionBuilder.instanceDefinitionFor(CRON4J))
/**
 * Common base of all tasks tracked by the scheduler.
 *
 * Tasks are ordered by the time remaining until their next execution (see [compareTo]). That value is re-read from
 * both tasks on every comparison, so the ordering of two tasks can change as time passes.
 *
 * @param id identifier of the task
 * @param execute the work to run when the task fires
 * @param name optional human-readable name
 */
abstract class BaseHATask(val id: String, val execute: () -> Unit, val name: String?) : Comparable<BaseHATask> {
    /** Time of the most recent execution point, as a [Long] whose exact meaning is defined by the subclass. */
    abstract fun getLastExecutionTime(): Long

    /** Time of the upcoming execution point, as a [Long] whose exact meaning is defined by the subclass. */
    abstract fun getNextExecutionTime(): Long

    /** Time remaining until the upcoming execution point; this is the sort key used by [compareTo]. */
    abstract fun getTimeToNextExecution(): Long

    /** Orders tasks ascending by [getTimeToNextExecution]: the task that fires sooner compares as smaller. */
    override fun compareTo(other: BaseHATask): Int {
        val ownRemaining = getTimeToNextExecution()
        val otherRemaining = other.getTimeToNextExecution()
        return ownRemaining.compareTo(otherRemaining)
    }
}
/**
 * A task whose execution points are described by a cron expression.
 *
 * The expression is parsed once at construction time with the file-level [cronParser]; all time queries are answered
 * relative to "now" ([ZonedDateTime.now]) at the moment of the call.
 *
 * Each query calls `get()` on the value returned by cron-utils — presumably an `Optional` that is empty when the
 * expression has no matching execution, in which case the call would throw; confirm against the cron-utils version
 * in use.
 *
 * @param id identifier of the task
 * @param cronExpr the cron expression describing the schedule
 * @param execute the work to run when the task fires
 * @param name optional human-readable name
 */
class HACronTask(id: String, val cronExpr: String, execute: () -> Unit, name: String?) : BaseHATask(id, execute, name) {
    private val executionTime = ExecutionTime.forCron(cronParser.parse(cronExpr))

    /** Converts [moment] to epoch milliseconds with whole-second precision (sub-second parts are dropped). */
    private fun toEpochMillis(moment: ZonedDateTime): Long = moment.toEpochSecond().seconds.inWholeMilliseconds

    /** Epoch milliseconds of the most recent execution point before now. */
    override fun getLastExecutionTime(): Long {
        val now = ZonedDateTime.now()
        return toEpochMillis(executionTime.lastExecution(now).get())
    }

    /** Epoch milliseconds of the first execution point after now. */
    override fun getNextExecutionTime(): Long {
        val now = ZonedDateTime.now()
        return toEpochMillis(executionTime.nextExecution(now).get())
    }

    /** Milliseconds from now until the next execution point. */
    override fun getTimeToNextExecution(): Long {
        val now = ZonedDateTime.now()
        return executionTime.timeToNextExecution(now).get().toMillis()
    }
}
/**
 * A task that is executed repeatedly at a fixed [interval].
 *
 * The times reported by this class are derived purely from the construction time and [interval]: the first execution
 * is assumed to happen one [interval] after construction and every further one exactly [interval] later. Before that
 * first execution, the construction time itself is reported as the "last execution".
 *
 * @param id identifier of the task
 * @param interval time between two executions in milliseconds; must be positive
 * @param execute the work to run when the task fires
 * @param timerTask the [TimerTask] associated with this task (only stored here, never invoked by this class)
 * @param name optional human-readable name
 * @throws IllegalArgumentException if [interval] is not positive
 */
class HATask(id: String, val interval: Long, execute: () -> Unit, val timerTask: TimerTask, name: String?) : BaseHATask(id, execute, name) {
    init {
        // A zero interval would make the modulo below throw ArithmeticException, a negative one yields nonsense.
        require(interval > 0) { "interval must be positive, but was $interval" }
    }

    private val firstExecutionTime = System.currentTimeMillis() + interval

    /** Milliseconds that have passed since the start of the current interval, always within `[0, interval)`. */
    private fun getElapsedTimeOfCurrentInterval(): Long {
        val timeSinceFirstExecution = System.currentTimeMillis() - firstExecutionTime

        // Until the first execution happened, "timeSinceFirstExecution" is negative. The "%" operator would then
        // return a negative remainder, which pushes the "next execution" up to a full interval too far into the
        // future and the "last execution" into the future as well. A floored modulo keeps the result in range.
        return Math.floorMod(timeSinceFirstExecution, interval)
    }

    /** Epoch milliseconds at which the current interval started. */
    override fun getLastExecutionTime(): Long {
        return System.currentTimeMillis() - getElapsedTimeOfCurrentInterval()
    }

    /** Epoch milliseconds at which the current interval ends. */
    override fun getNextExecutionTime(): Long {
        return System.currentTimeMillis() + getTimeToNextExecution()
    }

    /** Milliseconds remaining in the current interval, always within `(0, interval]`. */
    override fun getTimeToNextExecution(): Long {
        return interval - getElapsedTimeOfCurrentInterval()
    }
}
/**
 * The "HAScheduler" ("HibernateAwareScheduler") is a scheduler that recognizes when the system was hibernating/suspended
 * and triggers tasks that have missed their execution points.
 *
 * Interval tasks ([schedule]) run on a shared [Timer], cron tasks ([scheduleCron]) run on a cron4j [Scheduler]. A
 * checker task on the same [Timer] runs once per minute and treats an unexpectedly long gap between two of its own
 * runs as a hibernation.
 *
 * NOTE(review): the task registry is a plain [PriorityQueue] that is touched by callers as well as by the timer
 * thread, and no synchronization is visible in this file — confirm whether callers serialize access.
 */
object HAScheduler {
    private val logger = KotlinLogging.logger { }

    // Registry of all scheduled tasks (interval and cron). It is only searched and iterated, never polled.
    private val scheduledTasks = PriorityQueue<BaseHATask>()
    private val scheduler = Scheduler()
    private val timer = Timer()

    // Tolerance in milliseconds before a late timer execution is considered to be caused by a hibernation.
    private val HIBERNATION_THRESHOLD = 10.seconds.inWholeMilliseconds

    init {
        scheduleHibernateCheckerTask(1.minutes)
    }

    /**
     * Runs [action] and logs any [Exception] it throws instead of propagating it.
     *
     * Everything passed in here runs on the thread of the shared [timer]. An exception escaping a [TimerTask]
     * terminates that thread, after which neither the hibernate checker nor any interval task would run again.
     *
     * @param taskName name used in the log message, may be null
     * @param action the code to run
     */
    private fun runSafely(taskName: String?, action: () -> Unit) {
        try {
            action()
        } catch (e: Exception) {
            logger.error(e) { "Task \"${taskName ?: "unnamed"}\" failed with an exception" }
        }
    }

    /**
     * Registers the periodic task that detects hibernation.
     *
     * The task remembers when it last ran. If the gap to the current run exceeds [interval] by more than
     * [HIBERNATION_THRESHOLD], every scheduled task is checked for a missed execution.
     *
     * @param interval how often the check runs; also used as the initial delay
     */
    private fun scheduleHibernateCheckerTask(interval: Duration) {
        val intervalMillis = interval.inWholeMilliseconds

        timer.scheduleAtFixedRate(
            object : TimerTask() {
                var lastExecutionTime = System.currentTimeMillis()

                override fun run() {
                    val currentTime = System.currentTimeMillis()
                    val previousCheckTime = lastExecutionTime
                    val elapsedTime = currentTime - previousCheckTime
                    lastExecutionTime = currentTime

                    val systemWasInHibernation = elapsedTime > intervalMillis + HIBERNATION_THRESHOLD
                    if (!systemWasInHibernation) {
                        return
                    }

                    logger.debug { "System hibernation detected, task was delayed by ${elapsedTime - intervalMillis}ms" }

                    // Iterate over a snapshot: recovering a task removes and re-adds entries of "scheduledTasks" and
                    // modifying a PriorityQueue while iterating it can throw a ConcurrentModificationException.
                    //
                    // Every task gets checked, since the iteration order of a PriorityQueue is not sorted and thus,
                    // there is no point at which the loop could safely be exited early.
                    scheduledTasks.toList().forEach { task ->
                        runSafely(task.name ?: task.id) { recoverMissedExecution(task, previousCheckTime) }
                    }
                }
            },
            intervalMillis,
            intervalMillis
        )
    }

    /**
     * Handles [task] in case its last execution point lies after [previousCheckTime], i.e. inside the gap in which
     * the hibernate checker did not run.
     *
     * Cron tasks are rescheduled and executed right away. Interval tasks are only rescheduled here.
     * NOTE(review): the log message says "executing now" for both kinds although interval tasks are not executed by
     * this method — confirm that this is intended.
     *
     * @param task the task to check
     * @param previousCheckTime epoch milliseconds of the previous run of the hibernate checker
     */
    private fun recoverMissedExecution(task: BaseHATask, previousCheckTime: Long) {
        val wasLastExecutionMissed = task.getLastExecutionTime() > previousCheckTime
        if (!wasLastExecutionMissed) {
            return
        }

        logger.debug { "Task \"${task.name ?: task.id}\" missed its execution, executing now..." }

        when (task) {
            is HATask -> reschedule(task.id, task.interval)
            is HACronTask -> {
                rescheduleCron(task.id, task.cronExpr)
                task.execute()
            }
        }
    }

    /**
     * Wraps [execute] in a [TimerTask] that ignores "catch-up" executions.
     *
     * @param interval interval in milliseconds the task will be scheduled with
     * @param execute the work to run
     * @param name optional task name, only used for logging failures
     */
    private fun createTimerTask(interval: Long, execute: () -> Unit, name: String? = null): TimerTask {
        return object : TimerTask() {
            var lastExecutionTime: Long = 0

            override fun run() {
                // If a task scheduled via "Timer::scheduleAtFixedRate" is delayed for some reason, the Timer will
                // trigger tasks in quick succession to "catch up" to the set interval.
                //
                // We want to prevent this, since we don't care about how many executions were missed and only want
                // one execution to be triggered for these missed executions.
                //
                // Missed executions are handled by "HAScheduler::scheduleHibernateCheckerTask" and thus, we
                // debounce this behaviour of "Timer::scheduleAtFixedRate".
                val now = System.currentTimeMillis()
                val isCatchUpExecution = now - lastExecutionTime < interval - HIBERNATION_THRESHOLD
                if (isCatchUpExecution) {
                    return
                }

                lastExecutionTime = now
                runSafely(name, execute)
            }
        }
    }

    /**
     * Schedules [execute] to run every [interval] milliseconds, starting after [delay] milliseconds.
     *
     * NOTE(review): the created [HATask] derives its reported execution times from [interval] only, so they do not
     * reflect [delay] when it differs from [interval].
     *
     * @param execute the work to run
     * @param interval time between two executions in milliseconds; must be positive
     * @param delay time until the first execution in milliseconds; must not be negative
     * @param name optional human-readable name used in log messages
     * @return the generated id of the task, to be used with [deschedule] and [reschedule]
     * @throws IllegalArgumentException if [interval] is not positive or [delay] is negative
     */
    fun schedule(execute: () -> Unit, interval: Long, delay: Long, name: String?): String {
        // Validate before touching any state, otherwise a task rejected by the Timer would stay in the registry.
        require(interval > 0) { "interval must be positive, but was $interval" }
        require(delay >= 0) { "delay must not be negative, but was $delay" }

        val taskId = UUID.randomUUID().toString()
        val timerTask = createTimerTask(interval, execute, name)

        scheduledTasks.add(HATask(taskId, interval, execute, timerTask, name))
        timer.scheduleAtFixedRate(timerTask, delay, interval)

        return taskId
    }

    /**
     * Cancels and removes the interval task with the given id.
     *
     * @param taskId id returned by [schedule]
     * @return the removed task, or null if there is no interval task with this id (ids of cron tasks are ignored)
     */
    fun deschedule(taskId: String): HATask? {
        val task = scheduledTasks.filterIsInstance<HATask>().find { it.id == taskId } ?: return null

        task.timerTask.cancel()
        scheduledTasks.remove(task)

        return task
    }

    /**
     * Replaces the interval of an existing interval task while keeping its id, work and name.
     *
     * The first execution of the rescheduled task happens after the remaining time of the old task adjusted by the
     * difference between the new and the old interval (never less than 0). Does nothing if no interval task with
     * this id exists.
     *
     * @param taskId id returned by [schedule]
     * @param interval new time between two executions in milliseconds; must be positive
     * @throws IllegalArgumentException if [interval] is not positive
     */
    fun reschedule(taskId: String, interval: Long) {
        // Validate before the old task gets removed, otherwise an invalid interval would silently drop the task.
        require(interval > 0) { "interval must be positive, but was $interval" }

        val task = deschedule(taskId) ?: return

        val timerTask = createTimerTask(interval, task.execute, task.name)

        val timeToNextExecution = task.getTimeToNextExecution()
        val intervalDifference = interval - task.interval
        val remainingTimeTillNextExecution = (timeToNextExecution + intervalDifference).coerceAtLeast(0)

        scheduledTasks.add(HATask(taskId, interval, task.execute, timerTask, task.name))
        timer.scheduleAtFixedRate(timerTask, remainingTimeTillNextExecution, interval)
    }

    /**
     * Schedules [execute] according to the cron expression [cronExpr]. Starts the cron scheduler on first use.
     *
     * @param execute the work to run
     * @param cronExpr the cron expression describing the schedule
     * @param name optional human-readable name used in log messages
     * @return the id assigned by the cron scheduler, to be used with [descheduleCron] and [rescheduleCron]
     */
    fun scheduleCron(execute: () -> Unit, cronExpr: String, name: String?): String {
        if (!scheduler.isStarted) {
            scheduler.start()
        }

        val taskId = scheduler.schedule(
            cronExpr,
            object : Task() {
                override fun execute(context: TaskExecutionContext?) {
                    execute()
                }
            }
        )

        scheduledTasks.add(HACronTask(taskId, cronExpr, execute, name))

        return taskId
    }

    /**
     * Removes the cron task with the given id from the cron scheduler and from the task registry.
     *
     * @param taskId id returned by [scheduleCron]
     */
    fun descheduleCron(taskId: String) {
        scheduler.deschedule(taskId)
        scheduledTasks.removeIf { it.id == taskId }
    }

    /**
     * Replaces the cron expression of an existing cron task while keeping its id, work and name.
     *
     * Does nothing if no cron task with this id exists.
     *
     * @param taskId id returned by [scheduleCron]
     * @param cronExpr the new cron expression
     */
    fun rescheduleCron(taskId: String, cronExpr: String) {
        val task = scheduledTasks.filterIsInstance<HACronTask>().find { it.id == taskId } ?: return

        // Create the replacement and update the cron scheduler first, so that a failure in either step (e.g. while
        // parsing the expression) leaves the registry untouched.
        val replacement = HACronTask(taskId, cronExpr, task.execute, task.name)
        scheduler.reschedule(taskId, cronExpr)

        scheduledTasks.remove(task)
        scheduledTasks.add(replacement)
    }
}