Skip to content

Commit

Permalink
fix(payments): do not lock everything at every scheduling (#1739)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored Dec 2, 2024
1 parent 0a2efa1 commit 2a41dd3
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions components/payments/cmd/connectors/internal/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type DefaultTaskScheduler struct {
metricsRegistry metrics.MetricsRegistry
containerFactory ContainerCreateFunc
tasks map[string]*taskHolder
mu sync.Mutex
mu sync.RWMutex
resolver Resolver
stopped bool
workerPool *pond.WorkerPool
Expand Down Expand Up @@ -87,9 +87,6 @@ func (s *DefaultTaskScheduler) Schedule(ctx context.Context, descriptor models.T
// the scheduler option is equal to OPTIONS_RUN_NOW_SYNC. Otherwise, it will
// return an error chan that will be closed immediately after task creation.
func (s *DefaultTaskScheduler) schedule(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) <-chan error {
s.mu.Lock()
defer s.mu.Unlock()

returnErrorFunc := func(err error) <-chan error {
errChan := make(chan error, 1)
if err != nil {
Expand All @@ -104,7 +101,10 @@ func (s *DefaultTaskScheduler) schedule(ctx context.Context, descriptor models.T
return returnErrorFunc(err)
}

if _, ok := s.tasks[taskID]; ok {
s.mu.RLock()
_, ok := s.tasks[taskID]
s.mu.RUnlock()
if ok {
switch options.RestartOption {
case models.OPTIONS_STOP_AND_RESTART, models.OPTIONS_RESTART_ALWAYS:
// We still want to restart the task
Expand Down Expand Up @@ -212,21 +212,20 @@ func (s *DefaultTaskScheduler) registerTaskError(ctx context.Context, holder *ta
}

func (s *DefaultTaskScheduler) deleteTask(ctx context.Context, holder *taskHolder) {
s.mu.Lock()
defer s.mu.Unlock()

taskID, err := holder.descriptor.EncodeToString()
if err != nil {
holder.logger.Errorf("Error encoding task descriptor: %s", err)

return
}

s.mu.Lock()
delete(s.tasks, taskID)

if s.stopped {
s.mu.Unlock()
return
}
s.mu.Unlock()

oldestPendingTask, err := s.store.ReadOldestPendingTask(ctx, s.connectorID)
if err != nil {
Expand Down Expand Up @@ -263,8 +262,10 @@ func (s *DefaultTaskScheduler) deleteTask(ctx context.Context, holder *taskHolde

type StopChan chan chan struct{}

// Lock should be held when calling this function
func (s *DefaultTaskScheduler) stopTask(ctx context.Context, descriptor models.TaskDescriptor) error {
s.mu.Lock()
defer s.mu.Unlock()

taskID, err := descriptor.EncodeToString()
if err != nil {
return err
Expand Down Expand Up @@ -416,7 +417,9 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models.
return errChan
}

s.mu.Lock()
s.tasks[taskID] = holder
s.mu.Unlock()

sendError := false
switch options.ScheduleOption {
Expand Down

0 comments on commit 2a41dd3

Please # to comment.