diff --git a/pool/node.go b/pool/node.go index 200f1f5..f229b26 100644 --- a/pool/node.go +++ b/pool/node.go @@ -397,21 +397,6 @@ func (node *Node) Shutdown(ctx context.Context) error { node.logger.Error(fmt.Errorf("Shutdown: failed to destroy pool stream: %w", err)) } - // Cleanup the jobs payloads map as ongoing requeues could prevent it - // from being cleaned up by the workers. - if err := node.jobPayloadsMap.Reset(ctx); err != nil { - node.logger.Error(fmt.Errorf("Shutdown: failed to reset job payloads map: %w", err)) - } - - // Now clean up the shutdown replicated map. - wsm, err := rmap.Join(ctx, shutdownMapName(node.PoolName), node.rdb, rmap.WithLogger(node.logger)) - if err != nil { - node.logger.Error(fmt.Errorf("Shutdown: failed to join shutdown map for cleanup: %w", err)) - } - if err := wsm.Reset(ctx); err != nil { - node.logger.Error(fmt.Errorf("Shutdown: failed to reset shutdown map: %w", err)) - } - node.logger.Info("shutdown complete") return nil } @@ -421,7 +406,7 @@ func (node *Node) Shutdown(ctx context.Context) error { // workers of the node. One of Shutdown or Close should be called before the // node is garbage collected unless it is client-only. func (node *Node) Close(ctx context.Context) error { - return node.close(ctx, true) + return node.close(ctx, false) } // IsShutdown returns true if the pool is shutdown. @@ -440,7 +425,7 @@ func (node *Node) IsClosed() bool { // close is the internal implementation of Close. It handles the actual closing // process and optionally requeues jobs. -func (node *Node) close(ctx context.Context, requeue bool) error { +func (node *Node) close(ctx context.Context, shutdown bool) error { node.lock.Lock() if node.closing { node.lock.Unlock() @@ -451,8 +436,8 @@ func (node *Node) close(ctx context.Context, requeue bool) error { node.logger.Info("closing") - // If we're not requeuing then stop all the jobs. - if !requeue { + // If we're shutting down then stop all the jobs. + if shutdown { node.logger.Info("stopping all jobs") var wg sync.WaitGroup var total atomic.Int32 @@ -487,37 +472,48 @@ func (node *Node) close(ctx context.Context, requeue bool) error { return true }) wg.Wait() + node.localWorkers.Range(func(key, value any) bool { + worker := value.(*Worker) + worker.cleanup(ctx) + return true + }) node.logger.Debug("workers stopped") // Requeue jobs. - if requeue { + if !shutdown { var wg sync.WaitGroup node.localWorkers.Range(func(key, value any) bool { wg.Add(1) - worker := value.(*Worker) pulse.Go(ctx, func() { defer wg.Done() - if err := worker.requeueJobs(ctx); err != nil { - node.logger.Error(fmt.Errorf("Close: failed to requeue jobs for worker %q: %w", worker.ID, err)) - return + if err := value.(*Worker).requeueJobs(ctx); err != nil { + node.logger.Error(fmt.Errorf("Close: failed to requeue jobs for worker %q: %w", key, err)) } }) return true }) wg.Wait() + node.logger.Debug("jobs requeued") } - // Cleanup - node.localWorkers.Range(func(key, value any) bool { - worker := value.(*Worker) - worker.cleanup(ctx) - return true - }) if !node.clientOnly { + if shutdown { + if err := node.jobPayloadsMap.Reset(ctx); err != nil { + node.logger.Error(fmt.Errorf("Shutdown: failed to reset job payloads map: %w", err)) + } + if err := node.shutdownMap.Reset(ctx); err != nil { + node.logger.Error(fmt.Errorf("Shutdown: failed to reset shutdown map: %w", err)) + } + } node.poolSink.Close() node.tickerMap.Close() node.keepAliveMap.Close() + node.jobPayloadsMap.Close() + node.workerMap.Close() + node.jobsMap.Close() + node.tickerMap.Close() } + node.shutdownMap.Close() node.nodeReader.Close() if err := node.nodeStream.Destroy(ctx); err != nil { node.logger.Error(fmt.Errorf("Close: failed to destroy node event stream: %w", err)) @@ -629,7 +625,7 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) { return } ack.EventID = pending.ID - if _, err := stream.Add(ctx, evDispatchReturn, marshalAck(ack), soptions.WithOnlyIfStreamExists()); err != nil { + if _, err := stream.Add(ctx, evDispatchReturn, marshalAck(ack)); err != nil { node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to dispatch return to stream %q: %w", nodeStreamName(node.PoolName, nodeID), err)) } } @@ -646,11 +642,11 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) { ev := value.(*streaming.Event) if time.Since(ev.CreatedAt()) > pendingEventTTL { staleKeys = append(staleKeys, key.(string)) + node.logger.Error(fmt.Errorf("ackWorkerEvent: stale event, removing from pending events"), "event", ev.EventName, "id", ev.ID, "since", time.Since(ev.CreatedAt()), "TTL", pendingEventTTL) } return true }) for _, key := range staleKeys { - node.logger.Error(fmt.Errorf("ackWorkerEvent: stale event, removing from pending events"), "event", ev.EventName, "id", ev.ID, "since", time.Since(ev.CreatedAt()), "TTL", pendingEventTTL) node.pendingEvents.Delete(key) } } @@ -664,9 +660,8 @@ func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) { return } node.logger.Debug("dispatch return", "event", ev.EventName, "id", ev.ID, "ack-id", ack.EventID) - cherr := val.(chan error) - if cherr == nil { - // Event was requeued. + if val == nil { + // Event was requeued, just clean up node.pendingJobs.Delete(ack.EventID) return } @@ -674,7 +669,7 @@ func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) { if ack.Error != "" { err = errors.New(ack.Error) } - cherr <- err + val.(chan error) <- err } // manageWorkers monitors the workers replicated map and triggers job rebalancing @@ -682,17 +677,9 @@ func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) { func (node *Node) manageWorkers(ctx context.Context) { defer node.wg.Done() defer node.logger.Debug("manageWorkers: exiting") - defer node.workerMap.Close() - - ch := node.workerMap.Subscribe() - for { - select { - case <-ch: - node.logger.Debug("manageWorkers: worker map updated") - node.handleWorkerMapUpdate(ctx) - case <-node.stop: - return - } + for range node.workerMap.Subscribe() { + node.logger.Debug("manageWorkers: worker map updated") + node.handleWorkerMapUpdate(ctx) } } @@ -708,7 +695,9 @@ func (node *Node) handleWorkerMapUpdate(ctx context.Context) { // If it's not in the worker map, then it's not active and its jobs // have already been requeued. node.logger.Info("handleWorkerMapUpdate: removing inactive local worker", "worker", worker.ID) - node.deleteWorker(ctx, worker.ID) + if err := node.deleteWorker(ctx, worker.ID); err != nil { + node.logger.Error(fmt.Errorf("handleWorkerMapUpdate: failed to delete inactive worker %q: %w", worker.ID, err), "worker", worker.ID) + } worker.stopAndWait(ctx) node.localWorkers.Delete(key) return true @@ -755,17 +744,9 @@ func (node *Node) requeueJob(ctx context.Context, workerID string, job *Job) (ch func (node *Node) manageShutdown(ctx context.Context) { defer node.wg.Done() defer node.logger.Debug("manageShutdown: exiting") - defer node.shutdownMap.Close() - - ch := node.shutdownMap.Subscribe() - for { - select { - case <-ch: - node.logger.Debug("manageShutdown: shutdown map updated, initiating shutdown") - node.handleShutdownMapUpdate(ctx) - case <-node.stop: - return - } + for range node.shutdownMap.Subscribe() { + node.logger.Debug("manageShutdown: shutdown map updated") + node.handleShutdownMapUpdate(ctx) } } @@ -781,13 +762,13 @@ func (node *Node) handleShutdownMapUpdate(ctx context.Context) { // There is only one value in the map requestingNode = node } - node.logger.Info("shutdown", "requested-by", requestingNode) - node.close(ctx, false) + node.logger.Debug("manageShutdown: shutting down", "requested-by", requestingNode) + node.close(ctx, true) node.lock.Lock() node.shutdown = true node.lock.Unlock() - node.logger.Info("shutdown") + node.logger.Info("shutdown", "requested-by", requestingNode) } // manageInactiveWorkers periodically checks for inactive workers and requeues their jobs. diff --git a/pool/worker.go b/pool/worker.go index 5a4ef0e..2c1c4e2 100644 --- a/pool/worker.go +++ b/pool/worker.go @@ -278,6 +278,7 @@ func (w *Worker) stopJob(ctx context.Context, key string, forRequeue bool) error if err := w.handler.Stop(key); err != nil { return fmt.Errorf("failed to stop job %q: %w", key, err) } + w.logger.Debug("stopped job", "job", key) w.jobs.Delete(key) if _, _, err := w.jobsMap.RemoveValues(ctx, w.ID, key); err != nil { w.logger.Error(fmt.Errorf("stop job: failed to remove job %q from jobs map: %w", key, err)) @@ -375,6 +376,7 @@ func (w *Worker) rebalance(ctx context.Context, activeWorkers []string) { w.logger.Error(fmt.Errorf("rebalance: failed to stop job: %w", err), "job", key) continue } + w.logger.Debug("stopped job", "job", key) w.jobs.Delete(key) cherr, err := w.Node.requeueJob(ctx, w.ID, job) if err != nil { @@ -414,8 +416,8 @@ func (w *Worker) requeueJobs(ctx context.Context) error { if err != nil { return fmt.Errorf("requeueJobs: failed to mark worker as inactive: %w", err) } - if prev == "-" || prev == "" { - w.logger.Debug("requeueJobs: worker already marked as inactive, skipping requeue") + if prev == "-" { + w.logger.Debug("requeueJobs: jobs already requeued, skipping requeue") return nil } @@ -446,28 +448,24 @@ func (w *Worker) attemptRequeue(ctx context.Context, jobsToRequeue map[string]*J err error } resultChan := make(chan result, len(jobsToRequeue)) + defer close(resultChan) + wg.Add(len(jobsToRequeue)) for key, job := range jobsToRequeue { - wg.Add(1) - go func(k string, j *Job) { + pulse.Go(ctx, func() { defer wg.Done() - err := w.requeueJob(ctx, j) - resultChan <- result{key: k, err: err} - }(key, job) + w.logger.Debug("requeueJobs: requeuing", "job", key) + err := w.requeueJob(ctx, job) + w.logger.Debug("requeueJobs: requeued", "job", key, "error", err) + resultChan <- result{key: key, err: err} + }) } - - go func() { - wg.Wait() - close(resultChan) - }() + wg.Wait() remainingJobs := make(map[string]*Job) for { select { - case res, ok := <-resultChan: - if !ok { - return remainingJobs - } + case res := <-resultChan: if res.err != nil { w.logger.Error(fmt.Errorf("requeueJobs: failed to requeue job %q: %w", res.key, res.err)) remainingJobs[res.key] = jobsToRequeue[res.key] @@ -475,6 +473,10 @@ func (w *Worker) attemptRequeue(ctx context.Context, jobsToRequeue map[string]*J } delete(remainingJobs, res.key) w.logger.Info("requeued", "job", res.key) + if len(remainingJobs) == 0 { + w.logger.Debug("requeueJobs: all jobs requeued") + return remainingJobs + } case <-time.After(w.workerTTL): w.logger.Error(fmt.Errorf("requeueJobs: timeout reached, some jobs may not have been processed")) return remainingJobs