diff --git a/components/payments/cmd/connectors/internal/task/scheduler.go b/components/payments/cmd/connectors/internal/task/scheduler.go index b7584cb80f..a4397fd1e6 100644 --- a/components/payments/cmd/connectors/internal/task/scheduler.go +++ b/components/payments/cmd/connectors/internal/task/scheduler.go @@ -45,7 +45,7 @@ type DefaultTaskScheduler struct { metricsRegistry metrics.MetricsRegistry containerFactory ContainerCreateFunc tasks map[string]*taskHolder - mu sync.Mutex + mu sync.RWMutex resolver Resolver stopped bool workerPool *pond.WorkerPool @@ -87,9 +87,6 @@ func (s *DefaultTaskScheduler) Schedule(ctx context.Context, descriptor models.T // the scheduler option is equal to OPTIONS_RUN_NOW_SYNC. Otherwise, it will // return an error chan that will be closed immediately after task creation. func (s *DefaultTaskScheduler) schedule(ctx context.Context, descriptor models.TaskDescriptor, options models.TaskSchedulerOptions) <-chan error { - s.mu.Lock() - defer s.mu.Unlock() - returnErrorFunc := func(err error) <-chan error { errChan := make(chan error, 1) if err != nil { @@ -104,7 +101,10 @@ func (s *DefaultTaskScheduler) schedule(ctx context.Context, descriptor models.T return returnErrorFunc(err) } - if _, ok := s.tasks[taskID]; ok { + s.mu.RLock() + _, ok := s.tasks[taskID] + s.mu.RUnlock() + if ok { switch options.RestartOption { case models.OPTIONS_STOP_AND_RESTART, models.OPTIONS_RESTART_ALWAYS: // We still want to restart the task @@ -212,9 +212,6 @@ func (s *DefaultTaskScheduler) registerTaskError(ctx context.Context, holder *ta } func (s *DefaultTaskScheduler) deleteTask(ctx context.Context, holder *taskHolder) { - s.mu.Lock() - defer s.mu.Unlock() - taskID, err := holder.descriptor.EncodeToString() if err != nil { holder.logger.Errorf("Error encoding task descriptor: %s", err) @@ -222,11 +219,13 @@ func (s *DefaultTaskScheduler) deleteTask(ctx context.Context, holder *taskHolde return } + s.mu.Lock() delete(s.tasks, taskID) - if s.stopped { + s.mu.Unlock() return } + s.mu.Unlock() oldestPendingTask, err := s.store.ReadOldestPendingTask(ctx, s.connectorID) if err != nil { @@ -263,8 +262,10 @@ func (s *DefaultTaskScheduler) deleteTask(ctx context.Context, holder *taskHolde type StopChan chan chan struct{} -// Lock should be held when calling this function func (s *DefaultTaskScheduler) stopTask(ctx context.Context, descriptor models.TaskDescriptor) error { + s.mu.Lock() + defer s.mu.Unlock() + taskID, err := descriptor.EncodeToString() if err != nil { return err @@ -416,7 +417,9 @@ func (s *DefaultTaskScheduler) startTask(ctx context.Context, descriptor models. return errChan } + s.mu.Lock() s.tasks[taskID] = holder + s.mu.Unlock() sendError := false switch options.ScheduleOption {