From 4f5bc58a7326e9b4971d99abbcc86a5dcfd3f721 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Mon, 31 Jul 2023 17:52:34 -0700 Subject: [PATCH 1/2] Move getTaskQueueManager call into redirect methods --- service/matching/matching_engine.go | 36 ++++-------------------- service/matching/task_queue_manager.go | 39 +++++++++++++++++--------- 2 files changed, 32 insertions(+), 43 deletions(-) diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index d637fd7b7a6..8b8443d0027 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -335,12 +335,7 @@ func (e *matchingEngineImpl) AddWorkflowTask( // We don't need the userDataChanged channel here because: // - if we sync match or sticky worker unavailable, we're done // - if we spool to db, we'll re-resolve when it comes out of the db - taskQueue, _, err := baseTqm.RedirectToVersionedQueueForAdd(ctx, addRequest.VersionDirective) - if err != nil { - return false, err - } - - tqm, err := e.getTaskQueueManager(ctx, taskQueue, stickyInfo, true) + tqm, _, err := baseTqm.RedirectToVersionedQueueForAdd(ctx, addRequest.VersionDirective) if err != nil { return false, err } @@ -393,12 +388,7 @@ func (e *matchingEngineImpl) AddActivityTask( // We don't need the userDataChanged channel here because: // - if we sync match, we're done // - if we spool to db, we'll re-resolve when it comes out of the db - taskQueue, _, err := baseTqm.RedirectToVersionedQueueForAdd(ctx, addRequest.VersionDirective) - if err != nil { - return false, err - } - - tqm, err := e.getTaskQueueManager(ctx, taskQueue, stickyInfo, true) + tqm, _, err := baseTqm.RedirectToVersionedQueueForAdd(ctx, addRequest.VersionDirective) if err != nil { return false, err } @@ -447,12 +437,7 @@ func (e *matchingEngineImpl) DispatchSpooledTask( if err != nil { return err } - taskQueue, userDataChanged, err := baseTqm.RedirectToVersionedQueueForAdd(ctx, directive) - if err != nil { - return err - } - sticky := stickyInfo.kind == enumspb.TASK_QUEUE_KIND_STICKY - tqm, err := e.getTaskQueueManager(ctx, taskQueue, stickyInfo, !sticky) + tqm, userDataChanged, err := baseTqm.RedirectToVersionedQueueForAdd(ctx, directive) if err != nil { return err } @@ -692,18 +677,13 @@ func (e *matchingEngineImpl) QueryWorkflow( // We don't need the userDataChanged channel here because we either do this sync (local or remote) // or fail with a relatively short timeout. - taskQueue, _, err := baseTqm.RedirectToVersionedQueueForAdd(ctx, queryRequest.VersionDirective) + tqm, _, err := baseTqm.RedirectToVersionedQueueForAdd(ctx, queryRequest.VersionDirective) if err != nil { return nil, err - } else if taskQueue.VersionSet() == dlqVersionSet { + } else if tqm.QueueID().VersionSet() == dlqVersionSet { return nil, serviceerror.NewFailedPrecondition("Operations on versioned workflows are disabled") } - tqm, err := e.getTaskQueueManager(ctx, taskQueue, stickyInfo, !sticky) - if err != nil { - return nil, err - } - taskID := uuid.New() resp, err := tqm.DispatchQueryTask(ctx, taskID, queryRequest) @@ -1204,7 +1184,7 @@ func (e *matchingEngineImpl) getTask( return nil, err } - taskQueue, err := baseTqm.RedirectToVersionedQueueForPoll(pollMetadata.workerVersionCapabilities) + tqm, err := baseTqm.RedirectToVersionedQueueForPoll(ctx, pollMetadata.workerVersionCapabilities) if err != nil { if errors.Is(err, errUserDataDisabled) { // Rewrite to nicer error message @@ -1212,10 +1192,6 @@ func (e *matchingEngineImpl) getTask( } return nil, err } - tqm, err := e.getTaskQueueManager(ctx, taskQueue, stickyInfo, true) - if err != nil { - return nil, err - } // We need to set a shorter timeout than the original ctx; otherwise, by the time ctx deadline is // reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack, diff --git a/service/matching/task_queue_manager.go b/service/matching/task_queue_manager.go index ef201ce8b4f..543ce649026 100644 --- a/service/matching/task_queue_manager.go +++ b/service/matching/task_queue_manager.go @@ -148,8 +148,8 @@ type ( QueueID() *taskQueueID TaskQueueKind() enumspb.TaskQueueKind LongPollExpirationInterval() time.Duration - RedirectToVersionedQueueForAdd(context.Context, *taskqueuespb.TaskVersionDirective) (*taskQueueID, chan struct{}, error) - RedirectToVersionedQueueForPoll(*commonpb.WorkerVersionCapabilities) (*taskQueueID, error) + RedirectToVersionedQueueForAdd(context.Context, *taskqueuespb.TaskVersionDirective) (taskQueueManager, chan struct{}, error) + RedirectToVersionedQueueForPoll(context.Context, *commonpb.WorkerVersionCapabilities) (taskQueueManager, error) } // Single task queue in memory state @@ -765,11 +765,11 @@ func (c *taskQueueManagerImpl) LongPollExpirationInterval() time.Duration { return c.config.LongPollExpirationInterval() } -func (c *taskQueueManagerImpl) RedirectToVersionedQueueForPoll(caps *commonpb.WorkerVersionCapabilities) (*taskQueueID, error) { +func (c *taskQueueManagerImpl) RedirectToVersionedQueueForPoll(ctx context.Context, caps *commonpb.WorkerVersionCapabilities) (taskQueueManager, error) { if !caps.GetUseVersioning() { // Either this task queue is versioned, or there are still some workflows running on // the "unversioned" set. - return c.taskQueueID, nil + return c, nil } // We don't need the userDataChanged channel here because polls have a timeout and the // client will retry, so if we're blocked on the wrong matcher it'll just take one poll @@ -789,7 +789,7 @@ func (c *taskQueueManagerImpl) RedirectToVersionedQueueForPoll(caps *commonpb.Wo if unknownBuild { c.recordUnknownBuildPoll(caps.BuildId) } - return c.taskQueueID, nil + return c, nil } versionSet, unknownBuild, err := lookupVersionSetForPoll(data, caps) @@ -799,10 +799,12 @@ func (c *taskQueueManagerImpl) RedirectToVersionedQueueForPoll(caps *commonpb.Wo if unknownBuild { c.recordUnknownBuildPoll(caps.BuildId) } - return newTaskQueueIDWithVersionSet(c.taskQueueID, versionSet), nil + + newId := newTaskQueueIDWithVersionSet(c.taskQueueID, versionSet) + return c.engine.getTaskQueueManager(ctx, newId, c.stickyInfo, true) } -func (c *taskQueueManagerImpl) RedirectToVersionedQueueForAdd(ctx context.Context, directive *taskqueuespb.TaskVersionDirective) (*taskQueueID, chan struct{}, error) { +func (c *taskQueueManagerImpl) RedirectToVersionedQueueForAdd(ctx context.Context, directive *taskqueuespb.TaskVersionDirective) (taskQueueManager, chan struct{}, error) { var buildId string switch dir := directive.GetValue().(type) { case *taskqueuespb.TaskVersionDirective_UseDefault: @@ -811,7 +813,7 @@ func (c *taskQueueManagerImpl) RedirectToVersionedQueueForAdd(ctx context.Contex buildId = dir.BuildId default: // Unversioned task, leave on unversioned queue. - return c.taskQueueID, nil, nil + return c, nil, nil } // Have to look up versioning data. @@ -820,14 +822,19 @@ func (c *taskQueueManagerImpl) RedirectToVersionedQueueForAdd(ctx context.Contex if errors.Is(err, errUserDataDisabled) { // When user data disabled, send "default" tasks to unversioned queue. if buildId == "" { - return c.taskQueueID, userDataChanged, nil + return c, userDataChanged, nil } // Send versioned sticky back to regular queue so they can go in the dlq. if c.kind == enumspb.TASK_QUEUE_KIND_STICKY { return nil, nil, serviceerrors.NewStickyWorkerUnavailable() } // Send versioned tasks to dlq. - return newTaskQueueIDWithVersionSet(c.taskQueueID, dlqVersionSet), userDataChanged, nil + newId := newTaskQueueIDWithVersionSet(c.taskQueueID, dlqVersionSet) + tqm, err := c.engine.getTaskQueueManager(ctx, newId, c.stickyInfo, true) + if err != nil { + return nil, nil, err + } + return tqm, userDataChanged, nil } return nil, nil, err } @@ -844,13 +851,13 @@ func (c *taskQueueManagerImpl) RedirectToVersionedQueueForAdd(ctx context.Contex // Don't bother persisting the unknown build id in this case: sticky tasks have a // short timeout, so it doesn't matter if they get lost. } - return c.taskQueueID, userDataChanged, nil + return c, userDataChanged, nil } versionSet, unknownBuild, err := lookupVersionSetForAdd(data, buildId) if err == errEmptyVersioningData { // nolint:goerr113 // default was requested for an unversioned queue - return c.taskQueueID, userDataChanged, nil + return c, userDataChanged, nil } else if err != nil { return nil, nil, err } @@ -868,7 +875,13 @@ func (c *taskQueueManagerImpl) RedirectToVersionedQueueForAdd(ctx context.Contex return nil, nil, err } } - return newTaskQueueIDWithVersionSet(c.taskQueueID, versionSet), userDataChanged, nil + + newId := newTaskQueueIDWithVersionSet(c.taskQueueID, versionSet) + tqm, err := c.engine.getTaskQueueManager(ctx, newId, c.stickyInfo, true) + if err != nil { + return nil, nil, err + } + return tqm, userDataChanged, nil } func (c *taskQueueManagerImpl) recordUnknownBuildPoll(buildId string) { From 47d0de72eea39baa6bc7b5dd96777265ed594099 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Wed, 9 Aug 2023 17:54:50 -0700 Subject: [PATCH 2/2] comments about loading tqms --- service/matching/matching_engine.go | 4 ++++ service/matching/task_queue_manager.go | 2 ++ 2 files changed, 6 insertions(+) diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index 8b8443d0027..3171d3bb3cd 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -433,6 +433,10 @@ func (e *matchingEngineImpl) DispatchSpooledTask( unversionedOrigTaskQueue := newTaskQueueIDWithVersionSet(origTaskQueue, "") // Redirect and re-resolve if we're blocked in matcher and user data changes. for { + // If normal queue: always load the base tqm to get versioning data. + // If sticky queue: sticky is not versioned, so if we got here (by taskReader calling this), + // the queue is already loaded. + // So we can always use true here. baseTqm, err := e.getTaskQueueManager(ctx, unversionedOrigTaskQueue, stickyInfo, true) if err != nil { return err diff --git a/service/matching/task_queue_manager.go b/service/matching/task_queue_manager.go index 543ce649026..d0dea2e3409 100644 --- a/service/matching/task_queue_manager.go +++ b/service/matching/task_queue_manager.go @@ -830,6 +830,8 @@ func (c *taskQueueManagerImpl) RedirectToVersionedQueueForAdd(ctx context.Contex } // Send versioned tasks to dlq. newId := newTaskQueueIDWithVersionSet(c.taskQueueID, dlqVersionSet) + // If we're called by QueryWorkflow, then we technically don't need to load the dlq + // tqm here. But it's not a big deal if we do. tqm, err := c.engine.getTaskQueueManager(ctx, newId, c.stickyInfo, true) if err != nil { return nil, nil, err