This repository has been archived by the owner on Jun 28, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
sched_algs.py
139 lines (107 loc) · 4.24 KB
/
sched_algs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python3
"""
Definitions for scheduling algorithms like First Come First Served (FCFS) scheduling,
Shortest Job First (preemptive) / Shortest Remaining Time First scheduling, Priority
Scheduling (preemptive) and Round Robin Scheduling (preemptive).
"""
from typing import Callable, List, Tuple
from sortedcontainers import SortedKeyList
def fcfs_schedule(tasks: List[dict]):
    """
    Schedule tasks according to FCFS algorithm and set waiting and turnaround times.

    Tasks are served in list order, so the list is expected to be ordered by
    non-decreasing "arrival_time". Each task dict must provide "arrival_time" and
    "burst_time"; "waiting_time" and "turnaround_time" are written in place.
    An empty list is a no-op.
    """
    if not tasks:
        return

    timer = tasks[0]["arrival_time"]
    for task in tasks:
        # If the processor went idle before this task arrived, jump forward to
        # its arrival so the waiting time never becomes negative.
        timer = max(timer, task["arrival_time"])
        task["waiting_time"] = timer - task["arrival_time"]
        task["turnaround_time"] = task["waiting_time"] + task["burst_time"]
        timer += task["burst_time"]
def _schedule_key(
    tasks: List[dict], quantum: int, key: Callable[[dict], Tuple[int, int, int]]
):
    """
    Schedule tasks according to algorithm determined by the key and set waiting and
    turnaround times.

    The ready tasks are kept in a SortedKeyList ordered by ``key``; in every round
    the task with the smallest key runs for at most ``quantum`` time units and is
    re-inserted (with a freshly computed key) if it still has work left.

    ``tasks`` is expected to be ordered by non-decreasing "arrival_time". Each task
    dict must provide "arrival_time", "burst_time" and an initialised "exec_time";
    "waiting_time", "turnaround_time" and "exec_time" are updated in place.
    "waiting_time" and "turnaround_time" are (re)set when a task is admitted.
    An empty list is a no-op.

    Raises:
        ValueError: if ``quantum`` is not positive (no task could make progress).
    """
    if quantum <= 0:
        raise ValueError("quantum must be positive")
    if not tasks:
        return

    ready = SortedKeyList(key=key)
    next_idx = 0  # index of the first task that has not been admitted yet
    total = len(tasks)
    timer = tasks[0]["arrival_time"]
    while len(ready) > 0 or next_idx < total:
        if len(ready) == 0:
            # Nothing is runnable: the processor idles until the next arrival
            # instead of ending the schedule with tasks still pending.
            timer = max(timer, tasks[next_idx]["arrival_time"])

        # Admit tasks that have arrived up to the current time
        while next_idx < total and tasks[next_idx]["arrival_time"] <= timer:
            arrived = tasks[next_idx]
            arrived["waiting_time"] = timer - arrived["arrival_time"]
            arrived["turnaround_time"] = arrived["waiting_time"]
            ready.add(arrived)
            next_idx += 1

        # Take the task with the smallest key out of the ready set; it is removed
        # before "exec_time" changes so a key depending on it stays consistent.
        current = ready.pop(0)
        run_time = min(quantum, current["burst_time"] - current["exec_time"])

        # Every task still in the ready set waits while the current one runs
        for waiting in ready:
            waiting["waiting_time"] += run_time
            waiting["turnaround_time"] += run_time

        current["exec_time"] += run_time
        current["turnaround_time"] += run_time
        timer += run_time

        # Insert only if execution time is left
        if current["exec_time"] < current["burst_time"]:
            ready.add(current)
def rr_schedule(tasks: List[dict], quantum: int):
    """
    Schedule tasks according to preemptive Round Robin algorithm and set waiting and
    turnaround times.

    The task at the head of the queue runs for at most ``quantum`` time units and
    is moved to the tail if it still has work left. Tasks that arrived in the
    meantime are queued at the start of the next round, i.e. behind the task that
    was just preempted.

    ``tasks`` is expected to be ordered by non-decreasing "arrival_time". Each task
    dict must provide "arrival_time", "burst_time" and an initialised "exec_time";
    "waiting_time", "turnaround_time" and "exec_time" are updated in place.
    "waiting_time" and "turnaround_time" are (re)set when a task is admitted.
    An empty list is a no-op.

    Raises:
        ValueError: if ``quantum`` is not positive (no task could make progress).
    """
    if quantum <= 0:
        raise ValueError("quantum must be positive")
    if not tasks:
        return

    ready = []
    next_idx = 0  # index of the first task that has not been admitted yet
    total = len(tasks)
    timer = tasks[0]["arrival_time"]
    while ready or next_idx < total:
        if not ready:
            # Nothing is runnable: the processor idles until the next arrival
            # instead of ending the schedule with tasks still pending.
            timer = max(timer, tasks[next_idx]["arrival_time"])

        # Admit tasks that have arrived up to the current time
        while next_idx < total and tasks[next_idx]["arrival_time"] <= timer:
            arrived = tasks[next_idx]
            arrived["waiting_time"] = timer - arrived["arrival_time"]
            arrived["turnaround_time"] = arrived["waiting_time"]
            ready.append(arrived)
            next_idx += 1

        # Remove the head of the queue and let it run for one time slice
        current = ready.pop(0)
        run_time = min(quantum, current["burst_time"] - current["exec_time"])

        # Every task still queued waits while the current one runs
        for waiting in ready:
            waiting["waiting_time"] += run_time
            waiting["turnaround_time"] += run_time

        current["exec_time"] += run_time
        current["turnaround_time"] += run_time
        timer += run_time

        # Re-queue only if execution time is left
        if current["exec_time"] < current["burst_time"]:
            ready.append(current)
def sjf_schedule(tasks: List[dict], quantum: int):
    """
    Schedule tasks according to preemptive SJF algorithm and set waiting and turnaround
    times.

    Ready tasks are ordered by remaining execution time ("burst_time" minus
    "exec_time"), then by "arrival_time", then by their position in ``tasks``.
    The actual scheduling is delegated to ``_schedule_key``.
    """
    # Map each task object to its first position in the list once, by identity.
    # This avoids an O(n) equality search per key computation and keeps the
    # tie-break correct even when two task dicts happen to compare equal.
    position: dict = {}
    for index, task in enumerate(tasks):
        position.setdefault(id(task), index)

    def remaining_time_key(task: dict) -> Tuple[int, int, int]:
        return (
            task["burst_time"] - task["exec_time"],
            task["arrival_time"],
            position[id(task)],
        )

    _schedule_key(tasks, quantum, remaining_time_key)
def priority_schedule(tasks: List[dict], quantum: int):
    """
    Schedule tasks according to preemptive priority scheduling algorithm and set waiting
    and turnaround times.

    Ready tasks are ordered by their "nice" value (smallest first), then by
    "arrival_time", then by their position in ``tasks``. The actual scheduling is
    delegated to ``_schedule_key``.
    """
    # Map each task object to its first position in the list once, by identity.
    # This avoids an O(n) equality search per key computation and keeps the
    # tie-break correct even when two task dicts happen to compare equal.
    position: dict = {}
    for index, task in enumerate(tasks):
        position.setdefault(id(task), index)

    def nice_key(task: dict) -> Tuple[int, int, int]:
        return (
            task["nice"],
            task["arrival_time"],
            position[id(task)],
        )

    _schedule_key(tasks, quantum, nice_key)