Skip to content

Commit

Permalink
refactor: accept error in failed callback
Browse files Browse the repository at this point in the history
  • Loading branch information
kalbhor committed Mar 14, 2024
1 parent 7394168 commit b4aca01
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ type TaskOpts struct {
Queue string
SuccessCB func(JobCtx)
ProcessingCB func(JobCtx)
RetryingCB func(JobCtx)
FailedCB func(JobCtx)
RetryingCB func(JobCtx, error)
FailedCB func(JobCtx, error)
}

// RegisterTask maps a new task against the tasks map on the server.
Expand Down Expand Up @@ -295,7 +295,7 @@ func (s *Server) process(ctx context.Context, w chan []byte) {

if err := s.execJob(ctx, msg, task); err != nil {
s.spanError(span, err)
s.log.Error("could not execute job. err", "error", err)
s.log.Error("could not execute job", "error", err)
}
}
}
Expand Down Expand Up @@ -358,12 +358,12 @@ func (s *Server) execJob(ctx context.Context, msg JobMessage, task Task) error {
// Try queueing the job again.
if msg.MaxRetry != msg.Retried {
if task.opts.RetryingCB != nil {
task.opts.RetryingCB(taskCtx)
task.opts.RetryingCB(taskCtx, err)
}
return s.retryJob(ctx, msg)
} else {
if task.opts.FailedCB != nil {
task.opts.FailedCB(taskCtx)
task.opts.FailedCB(taskCtx, err)
}
// If we hit max retries, set the task status as failed.
return s.statusFailed(ctx, msg)
Expand All @@ -382,13 +382,13 @@ func (s *Server) execJob(ctx context.Context, msg JobMessage, task Task) error {
meta := DefaultMeta(nj.Opts)
meta.PrevJobResult, err = s.GetResult(ctx, msg.ID)
if err != nil {
return err
return fmt.Errorf("could not get result for id (%s) : %w", msg.ID, err)
}

// Set the ID of the next job in the chain
msg.OnSuccessID, err = s.enqueueWithMeta(ctx, nj, meta)
if err != nil {
return err
return fmt.Errorf("could not enqueue job id (%s) : %w", msg.ID, err)
}
}

Expand Down

0 comments on commit b4aca01

Please # to comment.