Skip to content

Commit

Permalink
support basic batch job preempt
Browse files Browse the repository at this point in the history
  • Loading branch information
carmark committed Mar 12, 2020
1 parent c39776e commit facc1a8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
13 changes: 12 additions & 1 deletion pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"sort"
"strings"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

Expand Down Expand Up @@ -387,3 +387,14 @@ func (ji *JobInfo) Pipelined() bool {

return occupied >= ji.MinAvailable
}

func (ji *JobInfo) Running() bool {
occupid := 0
for status, tasks := range ji.TaskStatusIndex {
if status == Running {
occupid = occupid + len(tasks)
}
}

return int32(occupid) >= ji.MinAvailable
}
10 changes: 4 additions & 6 deletions pkg/scheduler/plugins/gang/gang.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,19 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {

preemptableFn := func(preemptor *api.TaskInfo, preemptees []*api.TaskInfo) []*api.TaskInfo {
var victims []*api.TaskInfo
jobOccupidMap := map[api.JobID]int32{}
var pJob *api.JobInfo = ssn.Jobs[preemptor.Job]

for _, preemptee := range preemptees {
job := ssn.Jobs[preemptee.Job]
if _, found := jobOccupidMap[job.UID]; !found {
jobOccupidMap[job.UID] = job.ReadyTaskNum()
if job.Running() == false {
continue
}
occupid := jobOccupidMap[job.UID]
preemptable := job.MinAvailable <= occupid-1 || job.MinAvailable == 1
preemptable := pJob.Priority > job.Priority

if !preemptable {
klog.V(4).Infof("Can not preempt task <%v/%v> because of gang-scheduling",
preemptee.Namespace, preemptee.Name)
} else {
jobOccupidMap[job.UID] = occupid - 1
victims = append(victims, preemptee)
}
}
Expand Down

0 comments on commit facc1a8

Please # to comment.