diff --git a/pool/node.go b/pool/node.go index db53d95..92a1367 100644 --- a/pool/node.go +++ b/pool/node.go @@ -42,6 +42,7 @@ type ( workerTTL time.Duration // Worker considered dead if keep-alive not updated after this duration workerShutdownTTL time.Duration // Worker considered dead if not shutdown after this duration pendingJobTTL time.Duration // Job lease expires if not acked after this duration + ackGracePeriod time.Duration // Wait for return status up to this duration logger pulse.Logger h hasher stop chan struct{} // closed when node is stopped @@ -191,6 +192,7 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp workerTTL: o.workerTTL, workerShutdownTTL: o.workerShutdownTTL, pendingJobTTL: o.pendingJobTTL, + ackGracePeriod: o.ackGracePeriod, h: jumpHash{crc64.New(crc64.MakeTable(crc64.ECMA))}, stop: make(chan struct{}), rdb: rdb, @@ -313,16 +315,26 @@ func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) e node.pendingJobs[eventID] = cherr node.lock.Unlock() - // Wait for return status. + // Wait for return status up to ack grace period. + timer := time.NewTimer(2 * node.ackGracePeriod) + defer timer.Stop() + select { case err = <-cherr: + case <-timer.C: + err = fmt.Errorf("DispatchJob: job %q timed out, TTL: %v", key, 2*node.ackGracePeriod) case <-ctx.Done(): err = ctx.Err() } + node.lock.Lock() + delete(node.pendingJobs, eventID) close(cherr) + node.lock.Unlock() if err == nil { node.logger.Info("dispatched", "key", key) + } else { + node.logger.Error(fmt.Errorf("DispatchJob: failed to dispatch job: %w", err), "key", key) } return err } @@ -636,9 +648,9 @@ func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) { return } node.logger.Debug("dispatch return", "event", ev.EventName, "id", ev.ID, "ack-id", ack.EventID) - delete(node.pendingJobs, ack.EventID) if cherr == nil { // Event was requeued. + delete(node.pendingJobs, ack.EventID) return } var err error diff --git a/pool/scheduler.go b/pool/scheduler.go index 0bfc5c0..97446c2 100644 --- a/pool/scheduler.go +++ b/pool/scheduler.go @@ -134,11 +134,11 @@ func (sched *scheduler) startJobs(ctx context.Context, jobs []*JobParam) error { for _, job := range jobs { err := sched.node.DispatchJob(ctx, job.Key, job.Payload) if err != nil { - sched.logger.Error(err, "failed to dispatch job", "job", job.Key) + sched.logger.Error(fmt.Errorf("failed to dispatch job: %w", err), "job", job.Key) continue } if _, err := sched.jobMap.Set(ctx, job.Key, time.Now().String()); err != nil { - sched.logger.Error(err, "failed to store job", "job", job.Key) + sched.logger.Error(fmt.Errorf("failed to store job: %w", err), "job", job.Key) continue } } @@ -164,11 +164,11 @@ func (sched *scheduler) stopJobs(ctx context.Context, plan *JobPlan) error { for _, key := range toStop { err := sched.node.StopJob(ctx, key) if err != nil { - sched.logger.Error(err, "failed to stop job", "job", key) + sched.logger.Error(fmt.Errorf("failed to stop job: %w", err), "job", key) continue } if _, err := sched.jobMap.Delete(ctx, key); err != nil { - sched.logger.Error(err, "failed to delete job", "job", key) + sched.logger.Error(fmt.Errorf("failed to delete job: %w", err), "job", key) } } return nil diff --git a/streaming/sink.go b/streaming/sink.go index 9d47a34..17c2a25 100644 --- a/streaming/sink.go +++ b/streaming/sink.go @@ -410,12 +410,13 @@ func (s *Sink) periodicKeepAlive() { for { select { case <-ticker.C: + s.lock.Lock() now := time.Now().UnixNano() if _, err := s.consumersKeepAliveMap.Set(ctx, s.consumer, strconv.FormatInt(now, 10)); err != nil { s.logger.Error(fmt.Errorf("failed to update sink keep-alive: %v", err)) + s.lock.Unlock() continue } - s.lock.Lock() s.lastKeepAlive = now s.lock.Unlock()