Fix a few issues
* Make sure that adding a dispatch return event to a node stream creates the stream if needed.
  Previously, jobs added right at startup could fail to be dispatched because the node stream did not exist yet.
* Fix a potential panic in the cleanup goroutine when the pending jobs map contains nil values.
* Make sure to close all maps when closing the node.
* Properly log event information when discarding stale ack events.
* More consistent logging
raphael committed Nov 28, 2024
1 parent 9881791 commit 785f03b
Showing 2 changed files with 61 additions and 78 deletions.
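
The first bullet is easiest to see at the Redis level. Dispatch return events were previously added to the target node stream with an option that only appends if the stream already exists, so returns sent before that stream was created were silently dropped. The sketch below illustrates the underlying XADD behavior using the go-redis client directly; the stream name and field values are made up, and the real code goes through pulse's streaming package rather than raw XADD.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // assumes a local Redis

	// With NoMkStream set (roughly what an "only if stream exists" option maps to),
	// XADD refuses to create the stream: if the stream does not exist yet,
	// the entry is simply not added and the event is lost.
	_, err := rdb.XAdd(ctx, &redis.XAddArgs{
		Stream:     "pulse:node:events", // illustrative name
		NoMkStream: true,
		Values:     map[string]interface{}{"event": "dispatch-return"},
	}).Result()
	fmt.Println("with NOMKSTREAM:", err)

	// Without it, XADD creates the stream on demand, so a dispatch return sent
	// right at startup, before the target node stream exists, is still delivered.
	id, err := rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: "pulse:node:events",
		Values: map[string]interface{}{"event": "dispatch-return"},
	}).Result()
	fmt.Println("without NOMKSTREAM:", id, err)
}
```
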
105 changes: 43 additions & 62 deletions pool/node.go
@@ -397,21 +397,6 @@ func (node *Node) Shutdown(ctx context.Context) error {
node.logger.Error(fmt.Errorf("Shutdown: failed to destroy pool stream: %w", err))
}

// Cleanup the jobs payloads map as ongoing requeues could prevent it
// from being cleaned up by the workers.
if err := node.jobPayloadsMap.Reset(ctx); err != nil {
node.logger.Error(fmt.Errorf("Shutdown: failed to reset job payloads map: %w", err))
}

// Now clean up the shutdown replicated map.
wsm, err := rmap.Join(ctx, shutdownMapName(node.PoolName), node.rdb, rmap.WithLogger(node.logger))
if err != nil {
node.logger.Error(fmt.Errorf("Shutdown: failed to join shutdown map for cleanup: %w", err))
}
if err := wsm.Reset(ctx); err != nil {
node.logger.Error(fmt.Errorf("Shutdown: failed to reset shutdown map: %w", err))
}

node.logger.Info("shutdown complete")
return nil
}
@@ -421,7 +406,7 @@ func (node *Node) Shutdown(ctx context.Context) error {
// workers of the node. One of Shutdown or Close should be called before the
// node is garbage collected unless it is client-only.
func (node *Node) Close(ctx context.Context) error {
return node.close(ctx, true)
return node.close(ctx, false)
}

// IsShutdown returns true if the pool is shutdown.
@@ -440,7 +425,7 @@ func (node *Node) IsClosed() bool {

// close is the internal implementation of Close. It handles the actual closing
// process and optionally requeues jobs.
func (node *Node) close(ctx context.Context, requeue bool) error {
func (node *Node) close(ctx context.Context, shutdown bool) error {
node.lock.Lock()
if node.closing {
node.lock.Unlock()
@@ -451,8 +436,8 @@ func (node *Node) close(ctx context.Context, requeue bool) error {

node.logger.Info("closing")

// If we're not requeuing then stop all the jobs.
if !requeue {
// If we're shutting down then stop all the jobs.
if shutdown {
node.logger.Info("stopping all jobs")
var wg sync.WaitGroup
var total atomic.Int32
@@ -487,37 +472,48 @@ func (node *Node) close(ctx context.Context, requeue bool) error {
return true
})
wg.Wait()
node.localWorkers.Range(func(key, value any) bool {
worker := value.(*Worker)
worker.cleanup(ctx)
return true
})
node.logger.Debug("workers stopped")

// Requeue jobs.
if requeue {
if !shutdown {
var wg sync.WaitGroup
node.localWorkers.Range(func(key, value any) bool {
wg.Add(1)
worker := value.(*Worker)
pulse.Go(ctx, func() {
defer wg.Done()
if err := worker.requeueJobs(ctx); err != nil {
node.logger.Error(fmt.Errorf("Close: failed to requeue jobs for worker %q: %w", worker.ID, err))
return
if err := value.(*Worker).requeueJobs(ctx); err != nil {
node.logger.Error(fmt.Errorf("Close: failed to requeue jobs for worker %q: %w", key, err))
}
})
return true
})
wg.Wait()
node.logger.Debug("jobs requeued")
}

// Cleanup
node.localWorkers.Range(func(key, value any) bool {
worker := value.(*Worker)
worker.cleanup(ctx)
return true
})
if !node.clientOnly {
if shutdown {
if err := node.jobPayloadsMap.Reset(ctx); err != nil {
node.logger.Error(fmt.Errorf("Shutdown: failed to reset job payloads map: %w", err))
}
if err := node.shutdownMap.Reset(ctx); err != nil {
node.logger.Error(fmt.Errorf("Shutdown: failed to reset shutdown map: %w", err))
}
}
node.poolSink.Close()
node.tickerMap.Close()
node.keepAliveMap.Close()
node.jobPayloadsMap.Close()
node.workerMap.Close()
node.jobsMap.Close()
node.tickerMap.Close()
}
node.shutdownMap.Close()
node.nodeReader.Close()
if err := node.nodeStream.Destroy(ctx); err != nil {
node.logger.Error(fmt.Errorf("Close: failed to destroy node event stream: %w", err))
@@ -629,7 +625,7 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
return
}
ack.EventID = pending.ID
if _, err := stream.Add(ctx, evDispatchReturn, marshalAck(ack), soptions.WithOnlyIfStreamExists()); err != nil {
if _, err := stream.Add(ctx, evDispatchReturn, marshalAck(ack)); err != nil {
node.logger.Error(fmt.Errorf("ackWorkerEvent: failed to dispatch return to stream %q: %w", nodeStreamName(node.PoolName, nodeID), err))
}
}
@@ -646,11 +642,11 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) {
ev := value.(*streaming.Event)
if time.Since(ev.CreatedAt()) > pendingEventTTL {
staleKeys = append(staleKeys, key.(string))
node.logger.Error(fmt.Errorf("ackWorkerEvent: stale event, removing from pending events"), "event", ev.EventName, "id", ev.ID, "since", time.Since(ev.CreatedAt()), "TTL", pendingEventTTL)
}
return true
})
for _, key := range staleKeys {
node.logger.Error(fmt.Errorf("ackWorkerEvent: stale event, removing from pending events"), "event", ev.EventName, "id", ev.ID, "since", time.Since(ev.CreatedAt()), "TTL", pendingEventTTL)
node.pendingEvents.Delete(key)
}
}
@@ -664,35 +660,26 @@ func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) {
return
}
node.logger.Debug("dispatch return", "event", ev.EventName, "id", ev.ID, "ack-id", ack.EventID)
cherr := val.(chan error)
if cherr == nil {
// Event was requeued.
if val == nil {
// Event was requeued, just clean up
node.pendingJobs.Delete(ack.EventID)
return
}
var err error
if ack.Error != "" {
err = errors.New(ack.Error)
}
cherr <- err
val.(chan error) <- err
}

// manageWorkers monitors the workers replicated map and triggers job rebalancing
// when workers are added or removed from the pool.
func (node *Node) manageWorkers(ctx context.Context) {
defer node.wg.Done()
defer node.logger.Debug("manageWorkers: exiting")
defer node.workerMap.Close()

ch := node.workerMap.Subscribe()
for {
select {
case <-ch:
node.logger.Debug("manageWorkers: worker map updated")
node.handleWorkerMapUpdate(ctx)
case <-node.stop:
return
}
for range node.workerMap.Subscribe() {
node.logger.Debug("manageWorkers: worker map updated")
node.handleWorkerMapUpdate(ctx)
}
}

@@ -708,7 +695,9 @@ func (node *Node) handleWorkerMapUpdate(ctx context.Context) {
// If it's not in the worker map, then it's not active and its jobs
// have already been requeued.
node.logger.Info("handleWorkerMapUpdate: removing inactive local worker", "worker", worker.ID)
node.deleteWorker(ctx, worker.ID)
if err := node.deleteWorker(ctx, worker.ID); err != nil {
node.logger.Error(fmt.Errorf("handleWorkerMapUpdate: failed to delete inactive worker %q: %w", worker.ID, err), "worker", worker.ID)
}
worker.stopAndWait(ctx)
node.localWorkers.Delete(key)
return true
@@ -755,17 +744,9 @@ func (node *Node) requeueJob(ctx context.Context, workerID string, job *Job) (ch
func (node *Node) manageShutdown(ctx context.Context) {
defer node.wg.Done()
defer node.logger.Debug("manageShutdown: exiting")
defer node.shutdownMap.Close()

ch := node.shutdownMap.Subscribe()
for {
select {
case <-ch:
node.logger.Debug("manageShutdown: shutdown map updated, initiating shutdown")
node.handleShutdownMapUpdate(ctx)
case <-node.stop:
return
}
for range node.shutdownMap.Subscribe() {
node.logger.Debug("manageShutdown: shutdown map updated")
node.handleShutdownMapUpdate(ctx)
}
}

@@ -781,13 +762,13 @@ func (node *Node) handleShutdownMapUpdate(ctx context.Context) {
// There is only one value in the map
requestingNode = node
}
node.logger.Info("shutdown", "requested-by", requestingNode)
node.close(ctx, false)
node.logger.Debug("manageShutdown: shutting down", "requested-by", requestingNode)
node.close(ctx, true)

node.lock.Lock()
node.shutdown = true
node.lock.Unlock()
node.logger.Info("shutdown")
node.logger.Info("shutdown", "requested-by", requestingNode)
}

// manageInactiveWorkers periodically checks for inactive workers and requeues their jobs.
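The returnDispatchStatus change in this file also closes a panic window: requeued events are tracked in the pending jobs map with a nil value, and a bare type assertion such as val.(chan error) panics when the stored interface value is nil. Below is a minimal standalone sketch of the guard, using a plain sync.Map with made-up keys rather than the pool's actual types.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var pending sync.Map

	// A requeued event is tracked with a nil value instead of a result channel.
	pending.Store("evt-requeued", nil)
	pending.Store("evt-active", make(chan error, 1))

	pending.Range(func(key, val any) bool {
		// Check for nil before type-asserting: val.(chan error) panics with an
		// "interface conversion" error when the stored value is a nil interface.
		if val == nil {
			pending.Delete(key)
			fmt.Println("dropped requeued event:", key)
			return true
		}
		val.(chan error) <- nil // safe: val is known to be non-nil here
		fmt.Println("notified waiter for:", key)
		return true
	})
}
```
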
34 changes: 18 additions & 16 deletions pool/worker.go
@@ -278,6 +278,7 @@ func (w *Worker) stopJob(ctx context.Context, key string, forRequeue bool) error
if err := w.handler.Stop(key); err != nil {
return fmt.Errorf("failed to stop job %q: %w", key, err)
}
w.logger.Debug("stopped job", "job", key)
w.jobs.Delete(key)
if _, _, err := w.jobsMap.RemoveValues(ctx, w.ID, key); err != nil {
w.logger.Error(fmt.Errorf("stop job: failed to remove job %q from jobs map: %w", key, err))
@@ -375,6 +376,7 @@ func (w *Worker) rebalance(ctx context.Context, activeWorkers []string) {
w.logger.Error(fmt.Errorf("rebalance: failed to stop job: %w", err), "job", key)
continue
}
w.logger.Debug("stopped job", "job", key)
w.jobs.Delete(key)
cherr, err := w.Node.requeueJob(ctx, w.ID, job)
if err != nil {
@@ -414,8 +416,8 @@ func (w *Worker) requeueJobs(ctx context.Context) error {
if err != nil {
return fmt.Errorf("requeueJobs: failed to mark worker as inactive: %w", err)
}
if prev == "-" || prev == "" {
w.logger.Debug("requeueJobs: worker already marked as inactive, skipping requeue")
if prev == "-" {
w.logger.Debug("requeueJobs: jobs already requeued, skipping requeue")
return nil
}

@@ -446,35 +448,35 @@ func (w *Worker) attemptRequeue(ctx context.Context, jobsToRequeue map[string]*J
err error
}
resultChan := make(chan result, len(jobsToRequeue))
defer close(resultChan)

wg.Add(len(jobsToRequeue))
for key, job := range jobsToRequeue {
wg.Add(1)
go func(k string, j *Job) {
pulse.Go(ctx, func() {
defer wg.Done()
err := w.requeueJob(ctx, j)
resultChan <- result{key: k, err: err}
}(key, job)
w.logger.Debug("requeueJobs: requeuing", "job", key)
err := w.requeueJob(ctx, job)
w.logger.Debug("requeueJobs: requeued", "job", key, "error", err)
resultChan <- result{key: key, err: err}
})
}

go func() {
wg.Wait()
close(resultChan)
}()
wg.Wait()

remainingJobs := make(map[string]*Job)
for {
select {
case res, ok := <-resultChan:
if !ok {
return remainingJobs
}
case res := <-resultChan:
if res.err != nil {
w.logger.Error(fmt.Errorf("requeueJobs: failed to requeue job %q: %w", res.key, res.err))
remainingJobs[res.key] = jobsToRequeue[res.key]
continue
}
delete(remainingJobs, res.key)
w.logger.Info("requeued", "job", res.key)
if len(remainingJobs) == 0 {
w.logger.Debug("requeueJobs: all jobs requeued")
return remainingJobs
}
case <-time.After(w.workerTTL):
w.logger.Error(fmt.Errorf("requeueJobs: timeout reached, some jobs may not have been processed"))
return remainingJobs
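The reworked attemptRequeue follows a common fan-out/collect shape: each requeue runs in its own goroutine, results land on a buffered channel, a helper goroutine closes that channel once the WaitGroup drains, and the receive loop treats the close as completion while a timer bounds the wait. Here is a standalone sketch of the pattern, with illustrative names (requeueAll, ttl) that are not part of the package's API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// requeueAll runs every job's requeue function concurrently and returns the
// jobs that failed (or were still outstanding when ttl expired).
func requeueAll(jobs map[string]func() error, ttl time.Duration) map[string]error {
	type result struct {
		key string
		err error
	}
	results := make(chan result, len(jobs))

	var wg sync.WaitGroup
	wg.Add(len(jobs))
	for key, requeue := range jobs {
		key, requeue := key, requeue // capture loop variables (pre-Go 1.22 semantics)
		go func() {
			defer wg.Done()
			results <- result{key: key, err: requeue()}
		}()
	}

	// Close the results channel only once every goroutine has reported, so the
	// drain loop below can use the channel close as its completion signal.
	go func() {
		wg.Wait()
		close(results)
	}()

	failed := make(map[string]error)
	deadline := time.After(ttl)
	for {
		select {
		case res, ok := <-results:
			if !ok {
				return failed // closed channel: all jobs accounted for
			}
			if res.err != nil {
				failed[res.key] = res.err
			}
		case <-deadline:
			// Give up; whatever has not reported yet is simply not retried here.
			return failed
		}
	}
}

func main() {
	jobs := map[string]func() error{
		"job-a": func() error { return nil },
		"job-b": func() error { return errors.New("redis unavailable") },
	}
	fmt.Println(requeueAll(jobs, time.Second))
}
```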
