Timeout DispatchJob if no dispatch return is received
This is to avoid blocking in case of error.
raphael committed Oct 22, 2024
1 parent fb0e73f commit 609a9d7
Showing 3 changed files with 20 additions and 7 deletions.
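
The core of the change, visible in the pool/node.go hunks below, is that DispatchJob no longer blocks indefinitely on the acknowledgement channel: it races the channel against a timer set to twice the ack grace period and against context cancellation. A minimal, self-contained sketch of that select-with-timer pattern (the waitForAck name and the simulated lost acknowledgement are illustrative, not part of the pulse API):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForAck mirrors the pattern added to DispatchJob: wait for a dispatch
// return on cherr, but give up after 2*gracePeriod or on context cancellation.
func waitForAck(ctx context.Context, cherr <-chan error, gracePeriod time.Duration) error {
	timer := time.NewTimer(2 * gracePeriod)
	defer timer.Stop()

	select {
	case err := <-cherr:
		return err // dispatch return received; nil means success
	case <-timer.C:
		return errors.New("timed out waiting for dispatch return")
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	cherr := make(chan error, 1) // never written to: simulates a lost dispatch return
	err := waitForAck(context.Background(), cherr, 50*time.Millisecond)
	fmt.Println(err) // prints the timeout error after ~100ms instead of hanging forever
}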
pool/node.go: 16 changes (14 additions, 2 deletions)
@@ -42,6 +42,7 @@ type (
 		workerTTL time.Duration // Worker considered dead if keep-alive not updated after this duration
 		workerShutdownTTL time.Duration // Worker considered dead if not shutdown after this duration
 		pendingJobTTL time.Duration // Job lease expires if not acked after this duration
+		ackGracePeriod time.Duration // Wait for return status up to this duration
 		logger pulse.Logger
 		h hasher
 		stop chan struct{} // closed when node is stopped
@@ -191,6 +192,7 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp
 		workerTTL: o.workerTTL,
 		workerShutdownTTL: o.workerShutdownTTL,
 		pendingJobTTL: o.pendingJobTTL,
+		ackGracePeriod: o.ackGracePeriod,
 		h: jumpHash{crc64.New(crc64.MakeTable(crc64.ECMA))},
 		stop: make(chan struct{}),
 		rdb: rdb,
@@ -313,16 +315,26 @@ func (node *Node) DispatchJob(ctx context.Context, key string, payload []byte) e
 	node.pendingJobs[eventID] = cherr
 	node.lock.Unlock()

-	// Wait for return status.
+	// Wait for return status up to ack grace period.
+	timer := time.NewTimer(2 * node.ackGracePeriod)
+	defer timer.Stop()
+
 	select {
 	case err = <-cherr:
+	case <-timer.C:
+		err = fmt.Errorf("DispatchJob: job %q timed out, TTL: %v", key, 2*node.ackGracePeriod)
 	case <-ctx.Done():
 		err = ctx.Err()
 	}

 	node.lock.Lock()
 	delete(node.pendingJobs, eventID)
 	close(cherr)
 	node.lock.Unlock()
+	if err == nil {
+		node.logger.Info("dispatched", "key", key)
+	} else {
+		node.logger.Error(fmt.Errorf("DispatchJob: failed to dispatch job: %w", err), "key", key)
+	}
 	return err
 }
@@ -636,9 +648,9 @@ func (node *Node) returnDispatchStatus(_ context.Context, ev *streaming.Event) {
 		return
 	}
 	node.logger.Debug("dispatch return", "event", ev.EventName, "id", ev.ID, "ack-id", ack.EventID)
+	delete(node.pendingJobs, ack.EventID)
 	if cherr == nil {
 		// Event was requeued.
-		delete(node.pendingJobs, ack.EventID)
 		return
 	}
 	var err error
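From a caller's perspective, the effect of the node.go changes above is that a dispatch whose return status never arrives now fails after roughly twice the ack grace period instead of hanging. A hypothetical caller sketch, assuming the goa.design/pulse/pool import path and a go-redis client; neither is established by this commit, and the job key and payload are made up:

package main

import (
	"context"
	"log"

	"github.com/redis/go-redis/v9"
	"goa.design/pulse/pool"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	node, err := pool.AddNode(ctx, "example-pool", rdb)
	if err != nil {
		log.Fatal(err)
	}

	// With this commit, a missing dispatch return surfaces here as an error
	// after about twice the ack grace period rather than blocking forever.
	if err := node.DispatchJob(ctx, "report:daily", []byte(`{"day":"2024-10-22"}`)); err != nil {
		log.Printf("dispatch failed: %v", err)
	}
}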
pool/scheduler.go: 8 changes (4 additions, 4 deletions)
@@ -134,11 +134,11 @@ func (sched *scheduler) startJobs(ctx context.Context, jobs []*JobParam) error {
 	for _, job := range jobs {
 		err := sched.node.DispatchJob(ctx, job.Key, job.Payload)
 		if err != nil {
-			sched.logger.Error(err, "failed to dispatch job", "job", job.Key)
+			sched.logger.Error(fmt.Errorf("failed to dispatch job: %w", err), "job", job.Key)
 			continue
 		}
 		if _, err := sched.jobMap.Set(ctx, job.Key, time.Now().String()); err != nil {
-			sched.logger.Error(err, "failed to store job", "job", job.Key)
+			sched.logger.Error(fmt.Errorf("failed to store job: %w", err), "job", job.Key)
 			continue
 		}
 	}
@@ -164,11 +164,11 @@ func (sched *scheduler) stopJobs(ctx context.Context, plan *JobPlan) error {
 	for _, key := range toStop {
 		err := sched.node.StopJob(ctx, key)
 		if err != nil {
-			sched.logger.Error(err, "failed to stop job", "job", key)
+			sched.logger.Error(fmt.Errorf("failed to stop job: %w", err), "job", key)
 			continue
 		}
 		if _, err := sched.jobMap.Delete(ctx, key); err != nil {
-			sched.logger.Error(err, "failed to delete job", "job", key)
+			sched.logger.Error(fmt.Errorf("failed to delete job: %w", err), "job", key)
 		}
 	}
 	return nil
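The scheduler changes above fold the log message into the error itself using %w rather than passing the message as a separate argument. As a general Go note, %w keeps the wrapped error inspectable while %v flattens it to text; a small standalone illustration (the errNotFound sentinel is made up for the example):

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("job not found") // illustrative sentinel, not a pulse error

func main() {
	wrapped := fmt.Errorf("failed to stop job: %w", errNotFound)
	fmt.Println(wrapped)                         // failed to stop job: job not found
	fmt.Println(errors.Is(wrapped, errNotFound)) // true: %w preserves the error chain

	flattened := fmt.Errorf("failed to stop job: %v", errNotFound)
	fmt.Println(errors.Is(flattened, errNotFound)) // false: %v flattens to a plain string
}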
streaming/sink.go: 3 changes (2 additions, 1 deletion)
@@ -410,12 +410,13 @@ func (s *Sink) periodicKeepAlive() {
 	for {
 		select {
 		case <-ticker.C:
+			s.lock.Lock()
 			now := time.Now().UnixNano()
 			if _, err := s.consumersKeepAliveMap.Set(ctx, s.consumer, strconv.FormatInt(now, 10)); err != nil {
 				s.logger.Error(fmt.Errorf("failed to update sink keep-alive: %v", err))
+				s.lock.Unlock()
 				continue
 			}
-			s.lock.Lock()
 			s.lastKeepAlive = now
 			s.lock.Unlock()

