diff --git a/pool/node.go b/pool/node.go index 60cf77f..8fa1e6b 100644 --- a/pool/node.go +++ b/pool/node.go @@ -749,11 +749,17 @@ func (node *Node) processInactiveWorkers(ctx context.Context) { if !ok { continue // worker is already being deleted } + mustRequeue := len(keys) requeued := make(map[string]chan error) for _, key := range keys { payload, ok := node.jobPayloadsMap.Get(key) if !ok { node.logger.Error(fmt.Errorf("processInactiveWorkers: payload for job not found"), "job", key, "worker", id) + // No need to keep the job around if the payload is not found. + if _, _, err := node.jobsMap.RemoveValues(ctx, id, key); err != nil { + node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to remove job %q from jobs map: %w", key, err), "job", key, "worker", id) + } + mustRequeue-- continue } job := &Job{ @@ -770,16 +776,15 @@ func (node *Node) processInactiveWorkers(ctx context.Context) { requeued[job.Key] = cherr } - if len(requeued) != len(keys) { - node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), len(keys)), "worker", id) - continue + if len(requeued) != mustRequeue { + node.logger.Error(fmt.Errorf("processInactiveWorkers: failed to requeue all inactive jobs: %d/%d, will retry later", len(requeued), mustRequeue), "worker", id) } - go node.processRequeuedJobs(ctx, id, requeued) + go node.processRequeuedJobs(ctx, id, requeued, len(requeued) == mustRequeue) } } // processRequeuedJobs processes the requeued jobs concurrently. -func (node *Node) processRequeuedJobs(ctx context.Context, id string, requeued map[string]chan error) { +func (node *Node) processRequeuedJobs(ctx context.Context, id string, requeued map[string]chan error, deleteWorker bool) { var wg sync.WaitGroup var succeeded int64 for key, cherr := range requeued { @@ -806,8 +811,10 @@ func (node *Node) processRequeuedJobs(ctx context.Context, id string, requeued m } node.logger.Info("requeued worker jobs", "worker", id, "requeued", len(requeued)) - if err := node.deleteWorker(ctx, id); err != nil { - node.logger.Error(fmt.Errorf("processRequeuedJobs: failed to delete worker %q: %w", id, err), "worker", id) + if deleteWorker { + if err := node.deleteWorker(ctx, id); err != nil { + node.logger.Error(fmt.Errorf("processRequeuedJobs: failed to delete worker %q: %w", id, err), "worker", id) + } } } diff --git a/pool/worker.go b/pool/worker.go index 9215d86..1c8f969 100644 --- a/pool/worker.go +++ b/pool/worker.go @@ -380,7 +380,7 @@ func (w *Worker) rebalance(ctx context.Context, activeWorkers []string) { delete(rebalanced, key) cherrs[key] = cherr } - go w.Node.processRequeuedJobs(ctx, w.ID, cherrs) + go w.Node.processRequeuedJobs(ctx, w.ID, cherrs, false) } // requeueJobs requeues the jobs handled by the worker.