fix(allocator): job capacity exceeded on runners (#723)
Fixes a bug which allowed a runner's job capacity to be exceeded.

The allocator is responsible for allocating jobs to runners. It should
respect the runner's capacity, i.e. the max number of jobs it is
permitted to run at once. It keeps a tally of the number of jobs
allocated to each runner. However, the allocator would also subscribe to
runner events and whenever an event arrived, it would overwrite its
local tally with the tally embedded in the event. That's problematic
because an event is something that happened in the past, and it can be
stale, containing a lower tally than that maintained locally. The
allocator would then wrongly allocate another job to the runner.

This fix instead maintains a local tally for each runner in the
allocator and ignores the tallies in events.
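
A rough sketch of that approach, using simplified stand-in types rather than the repository's actual RunnerMeta, Job, and pubsub event types: the allocator owns the tally, seeds it the first time it learns of a runner, and from then on only its own increments and decrements change it; counts carried by later events are ignored.

package main

import "fmt"

type runnerEvent struct {
	RunnerID    string
	MaxJobs     int
	CurrentJobs int // tally embedded in the event; may be stale
}

type allocator struct {
	maxJobs     map[string]int
	currentJobs map[string]int // authoritative local tally
}

// onRunnerEvent seeds the tally the first time a runner is seen, but
// thereafter ignores the event's (possibly stale) CurrentJobs count.
func (a *allocator) onRunnerEvent(ev runnerEvent) {
	a.maxJobs[ev.RunnerID] = ev.MaxJobs
	if _, seen := a.currentJobs[ev.RunnerID]; !seen {
		a.currentJobs[ev.RunnerID] = ev.CurrentJobs
	}
}

// tryAllocate allocates a job only if the local tally shows spare capacity.
func (a *allocator) tryAllocate(runnerID string) bool {
	if a.currentJobs[runnerID] >= a.maxJobs[runnerID] {
		return false // at capacity
	}
	a.currentJobs[runnerID]++
	return true
}

func main() {
	a := &allocator{maxJobs: map[string]int{}, currentJobs: map[string]int{}}
	a.onRunnerEvent(runnerEvent{RunnerID: "runner-1", MaxJobs: 1})
	fmt.Println(a.tryAllocate("runner-1")) // true: the first job fits

	// A stale event still reporting zero jobs must not reset the tally.
	a.onRunnerEvent(runnerEvent{RunnerID: "runner-1", MaxJobs: 1, CurrentJobs: 0})
	fmt.Println(a.tryAllocate("runner-1")) // false: capacity respected
}

Running the sketch prints true then false: the stale second event does not reset the local tally, so the runner's capacity of one job is respected.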
leg100 authored Jan 21, 2025
1 parent fc476c5 commit 53cae20
Showing 3 changed files with 100 additions and 24 deletions.
79 changes: 66 additions & 13 deletions internal/runner/allocator.go
@@ -17,10 +17,12 @@ type allocator struct {
// service for seeding and streaming pools, runners, and jobs, and for
// allocating jobs to runners.
client allocatorClient
// runners to allocate jobs to, keyed by runner ID
// runners keyed by runner ID
runners map[resource.ID]*RunnerMeta
// jobs awaiting allocation to an runner, keyed by job ID
// jobs keyed by job ID
jobs map[resource.ID]*Job
// total current jobs allocated to each runner keyed by runner ID
currentJobs map[resource.ID]int
}

type allocatorClient interface {
@@ -63,8 +65,17 @@ func (a *allocator) Start(ctx context.Context) error {
return pubsub.ErrSubscriptionTerminated
}
switch event.Type {
case pubsub.CreatedEvent:
a.addRunner(event.Payload)
case pubsub.UpdatedEvent:
switch event.Payload.Status {
case RunnerExited, RunnerErrored:
// Delete runners in terminal state.
a.deleteRunner(event.Payload)
}
a.addRunner(event.Payload)
case pubsub.DeletedEvent:
delete(a.runners, event.Payload.ID)
a.deleteRunner(event.Payload)
default:
a.runners[event.Payload.ID] = event.Payload
}
@@ -87,11 +98,17 @@ func (a *allocator) Start(ctx context.Context) error {

func (a *allocator) seed(runners []*RunnerMeta, jobs []*Job) {
a.runners = make(map[resource.ID]*RunnerMeta, len(runners))
a.currentJobs = make(map[resource.ID]int, len(runners))
for _, runner := range runners {
a.runners[runner.ID] = runner
a.addRunner(runner)
}
a.jobs = make(map[resource.ID]*Job, len(jobs))
for _, job := range jobs {
// skip jobs in terminal state
switch job.Status {
case JobErrored, JobCanceled, JobFinished:
continue
}
a.jobs[job.ID] = job
}
}
@@ -121,23 +138,22 @@ func (a *allocator) allocate(ctx context.Context) error {
// job has completed: remove and adjust number of current jobs
// runner has
delete(a.jobs, job.ID)
a.runners[*job.RunnerID].CurrentJobs--
a.decrementCurrentJobs(*job.RunnerID)
continue
default:
// job running; ignore
continue
}
// allocate job to available runner
var available []*RunnerMeta
var (
available []*RunnerMeta
insufficientCapacity bool
)
for _, runner := range a.runners {
if runner.Status != RunnerIdle && runner.Status != RunnerBusy {
// skip runners that are not ready for jobs
continue
}
// skip runners with insufficient capacity
if runner.CurrentJobs == runner.MaxJobs {
continue
}
if runner.AgentPool == nil {
// if runner has a nil agent pool ID then it is a server
// runner and it only handles jobs with a nil pool ID.
@@ -151,10 +167,20 @@ func (a *allocator) allocate(ctx context.Context) error {
continue
}
}
// skip runners with insufficient capacity
if a.currentJobs[runner.ID] == runner.MaxJobs {
insufficientCapacity = true
continue
}
available = append(available, runner)
}
if len(available) == 0 {
a.Error(nil, "no available runners found for job", "job", job)
// If there is at least one appropriate runner but it has
// insufficient capacity then it is a normal and temporary issue and
// not worthy of reporting as an error.
if !insufficientCapacity {
a.Error(nil, "no available runners found for job", "job", job)
}
continue
}
// select runner that has most recently sent a ping
@@ -177,15 +203,42 @@
if err != nil {
return err
}
a.runners[from].CurrentJobs--
a.decrementCurrentJobs(from)
} else {
updatedJob, err = a.client.allocateJob(ctx, job.ID, runner.ID)
if err != nil {
return err
}
}
a.jobs[job.ID] = updatedJob
a.runners[runner.ID].CurrentJobs++
a.incrementCurrentJobs(runner.ID)
}
return nil
}

func (a *allocator) addRunner(runner *RunnerMeta) {
// skip runners in terminal state (exited, errored)
switch runner.Status {
case RunnerExited, RunnerErrored:
return
}
a.runners[runner.ID] = runner
a.currentJobs[runner.ID] = runner.CurrentJobs
currentJobsMetric.WithLabelValues(runner.ID.String()).Set(float64(runner.CurrentJobs))
}

func (a *allocator) deleteRunner(runner *RunnerMeta) {
delete(a.runners, runner.ID)
delete(a.currentJobs, runner.ID)
currentJobsMetric.DeleteLabelValues(runner.ID.String())
}

func (a *allocator) incrementCurrentJobs(runnerID resource.ID) {
a.currentJobs[runnerID]++
currentJobsMetric.WithLabelValues(runnerID.String()).Inc()
}

func (a *allocator) decrementCurrentJobs(runnerID resource.ID) {
a.currentJobs[runnerID]--
currentJobsMetric.WithLabelValues(runnerID.String()).Dec()
}
29 changes: 18 additions & 11 deletions internal/runner/allocator_test.go
@@ -58,6 +58,8 @@ func TestAllocator_allocate(t *testing.T) {
wantJob *Job
// want these runners after allocation
wantRunners map[resource.ID]*RunnerMeta
// want this tally of current jobs after allocation
wantCurrentJobs map[resource.ID]int
}{
{
name: "allocate job to server runner",
@@ -76,8 +78,9 @@ func TestAllocator_allocate(t *testing.T) {
RunnerID: &runner1ID,
},
wantRunners: map[resource.ID]*RunnerMeta{
runner1ID: {ID: runner1ID, Status: RunnerIdle, MaxJobs: 1, CurrentJobs: 1},
runner1ID: {ID: runner1ID, Status: RunnerIdle, MaxJobs: 1},
},
wantCurrentJobs: map[resource.ID]int{runner1ID: 1},
},
{
name: "allocate job to agent that has pinged more recently than another",
@@ -95,9 +98,10 @@ func TestAllocator_allocate(t *testing.T) {
RunnerID: &runner1ID,
},
wantRunners: map[resource.ID]*RunnerMeta{
runner1ID: {ID: runner1ID, Status: RunnerIdle, MaxJobs: 1, CurrentJobs: 1, LastPingAt: now},
runner2ID: {ID: runner2ID, Status: RunnerIdle, MaxJobs: 1, CurrentJobs: 0, LastPingAt: now.Add(-time.Second)},
runner1ID: {ID: runner1ID, Status: RunnerIdle, MaxJobs: 1, LastPingAt: now},
runner2ID: {ID: runner2ID, Status: RunnerIdle, MaxJobs: 1, LastPingAt: now.Add(-time.Second)},
},
wantCurrentJobs: map[resource.ID]int{runner1ID: 1},
},
{
name: "allocate job to pool agent",
@@ -124,8 +128,9 @@ func TestAllocator_allocate(t *testing.T) {
RunnerID: &runner1ID,
},
wantRunners: map[resource.ID]*RunnerMeta{
runner1ID: {ID: runner1ID, Status: RunnerIdle, MaxJobs: 1, CurrentJobs: 1, AgentPool: &RunnerMetaAgentPool{ID: pool1ID}},
runner1ID: {ID: runner1ID, Status: RunnerIdle, MaxJobs: 1, AgentPool: &RunnerMetaAgentPool{ID: pool1ID}},
},
wantCurrentJobs: map[resource.ID]int{runner1ID: 1},
},
{
name: "do not allocate job to agent with insufficient capacity",
@@ -148,8 +153,8 @@ func TestAllocator_allocate(t *testing.T) {
{
name: "re-allocate job from unresponsive agent",
runners: []*RunnerMeta{
{ID: runner1ID, Status: RunnerUnknown, CurrentJobs: 1},
{ID: runner2ID, Status: RunnerIdle, MaxJobs: 1, CurrentJobs: 0},
{ID: runner1ID, Status: RunnerUnknown},
{ID: runner2ID, Status: RunnerIdle, MaxJobs: 1},
},
job: &Job{
ID: job1ID,
@@ -162,20 +167,22 @@ func TestAllocator_allocate(t *testing.T) {
RunnerID: &runner2ID,
},
wantRunners: map[resource.ID]*RunnerMeta{
runner1ID: {ID: runner1ID, Status: RunnerUnknown, CurrentJobs: 0},
runner2ID: {ID: runner2ID, Status: RunnerIdle, MaxJobs: 1, CurrentJobs: 1},
runner1ID: {ID: runner1ID, Status: RunnerUnknown},
runner2ID: {ID: runner2ID, Status: RunnerIdle, MaxJobs: 1},
},
wantCurrentJobs: map[resource.ID]int{runner2ID: 1},
},
{
name: "de-allocate finished job",
name: "deallocate finished job",
runners: []*RunnerMeta{{ID: runner1ID, CurrentJobs: 1}},
job: &Job{
ID: job1ID,
Status: JobFinished,
RunnerID: &runner1ID,
},
wantJob: nil,
wantRunners: map[resource.ID]*RunnerMeta{runner1ID: {ID: runner1ID, CurrentJobs: 0}},
wantJob: nil,
wantRunners: map[resource.ID]*RunnerMeta{runner1ID: {ID: runner1ID, CurrentJobs: 1}},
wantCurrentJobs: map[resource.ID]int{runner1ID: 0},
},
{
name: "ignore running job",
16 changes: 16 additions & 0 deletions internal/runner/metrics.go
@@ -0,0 +1,16 @@
package runner

import "github.com/prometheus/client_golang/prometheus"

func init() {
prometheus.MustRegister(currentJobsMetric)
}

const runnerIDLabel = "runner_id"

var currentJobsMetric = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "otf",
Subsystem: "allocator",
Name: "current_jobs",
Help: "Current jobs by runner ID",
}, []string{runnerIDLabel})
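
A quick sanity check of the gauge in a test might look like the sketch below; the testutil package is part of client_golang, while the runner ID label value is made up for illustration.

package runner

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Sketch only: confirms the gauge follows increments and decrements for a
// hypothetical runner ID label value.
func TestCurrentJobsMetric(t *testing.T) {
	g := currentJobsMetric.WithLabelValues("runner-1")
	g.Inc()
	if got := testutil.ToFloat64(g); got != 1 {
		t.Errorf("want 1 current job, got %v", got)
	}
	g.Dec()
	if got := testutil.ToFloat64(g); got != 0 {
		t.Errorf("want 0 current jobs, got %v", got)
	}
}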
