diff --git a/server.go b/server.go index e1b3f58..7b27afd 100644 --- a/server.go +++ b/server.go @@ -57,8 +57,8 @@ type TaskOpts struct { Queue string SuccessCB func(JobCtx) ProcessingCB func(JobCtx) - RetryingCB func(JobCtx) - FailedCB func(JobCtx) + RetryingCB func(JobCtx, error) + FailedCB func(JobCtx, error) } // RegisterTask maps a new task against the tasks map on the server. @@ -295,7 +295,7 @@ func (s *Server) process(ctx context.Context, w chan []byte) { if err := s.execJob(ctx, msg, task); err != nil { s.spanError(span, err) - s.log.Error("could not execute job. err", "error", err) + s.log.Error("could not execute job", "error", err) } } } @@ -358,12 +358,12 @@ func (s *Server) execJob(ctx context.Context, msg JobMessage, task Task) error { // Try queueing the job again. if msg.MaxRetry != msg.Retried { if task.opts.RetryingCB != nil { - task.opts.RetryingCB(taskCtx) + task.opts.RetryingCB(taskCtx, err) } return s.retryJob(ctx, msg) } else { if task.opts.FailedCB != nil { - task.opts.FailedCB(taskCtx) + task.opts.FailedCB(taskCtx, err) } // If we hit max retries, set the task status as failed. return s.statusFailed(ctx, msg) @@ -382,13 +382,13 @@ func (s *Server) execJob(ctx context.Context, msg JobMessage, task Task) error { meta := DefaultMeta(nj.Opts) meta.PrevJobResult, err = s.GetResult(ctx, msg.ID) if err != nil { - return err + return fmt.Errorf("could not get result for id (%s) : %w", msg.ID, err) } // Set the ID of the next job in the chain msg.OnSuccessID, err = s.enqueueWithMeta(ctx, nj, meta) if err != nil { - return err + return fmt.Errorf("could not enqueue job id (%s) : %w", msg.ID, err) } }