Skip to content

Commit

Permalink
Don't retry requeuing jobs with missing payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
raphael committed Oct 18, 2024
1 parent c9bf7ad commit 79b55e1
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
21 changes: 14 additions & 7 deletions pool/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,11 +749,17 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {
if !ok {
continue // worker is already being deleted
}
mustRequeue := len(keys)
requeued := make(map[string]chan error)
for _, key := range keys {
payload, ok := node.jobPayloadsMap.Get(key)
if !ok {
node.logger.Error(fmt.Errorf("processInactiveWorkers: payload for job not found"), "job", key, "worker", id)
// No need to keep the job around if the payload is not found.
if _, _, err := node.jobsMap.RemoveValues(ctx, id, key); err != nil {
node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to remove job %q from jobs map: %w", key, err), "job", key, "worker", id)
}
mustRequeue--
continue
}
job := &Job{
Expand All @@ -770,16 +776,15 @@ func (node *Node) processInactiveWorkers(ctx context.Context) {
requeued[job.Key] = cherr
}

if len(requeued) != len(keys) {
node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), len(keys)), "worker", id)
continue
if len(requeued) != mustRequeue {
node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), mustRequeue), "worker", id)
}
go node.processRequeuedJobs(ctx, id, requeued)
go node.processRequeuedJobs(ctx, id, requeued, len(requeued) == mustRequeue)
}
}

// processRequeuedJobs processes the requeued jobs concurrently.
func (node *Node) processRequeuedJobs(ctx context.Context, id string, requeued map[string]chan error) {
func (node *Node) processRequeuedJobs(ctx context.Context, id string, requeued map[string]chan error, deleteWorker bool) {
var wg sync.WaitGroup
var succeeded int64
for key, cherr := range requeued {
Expand All @@ -806,8 +811,10 @@ func (node *Node) processRequeuedJobs(ctx context.Context, id string, requeued m
}

node.logger.Info("requeued worker jobs", "worker", id, "requeued", len(requeued))
if err := node.deleteWorker(ctx, id); err != nil {
node.logger.Error(fmt.Errorf("processRequeuedJobs: failed to delete worker %q: %w", id, err), "worker", id)
if deleteWorker {
if err := node.deleteWorker(ctx, id); err != nil {
node.logger.Error(fmt.Errorf("processRequeuedJobs: failed to delete worker %q: %w", id, err), "worker", id)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion pool/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (w *Worker) rebalance(ctx context.Context, activeWorkers []string) {
delete(rebalanced, key)
cherrs[key] = cherr
}
go w.Node.processRequeuedJobs(ctx, w.ID, cherrs)
go w.Node.processRequeuedJobs(ctx, w.ID, cherrs, false)
}

// requeueJobs requeues the jobs handled by the worker.
Expand Down

0 comments on commit 79b55e1

Please # to comment.