From e7dd2ff5fe0952c6fc39463db4c8e667c7ea0668 Mon Sep 17 00:00:00 2001
From: David Reiss
Date: Tue, 8 Aug 2023 16:56:39 -0700
Subject: [PATCH] Clean task queue name from matching before writing to WorkflowTaskScheduledEvent (#4565)

**What changed?**
In some cases we write a new WorkflowTaskScheduledEvent right before a WorkflowTaskStartedEvent in RecordWorkflowTaskStarted. In that case, the task queue name comes from RecordWorkflowTaskStarted, but that name comes from matching and may be a specific task queue partition, not the base name. We should write only the base name.

**Why?**
Fixes #4557

**How did you test it?**
Unit test
---
 .../workflow/mutable_state_impl_test.go     | 14 +++++++++++++-
 .../workflow/workflow_task_state_machine.go | 18 +++++++++++++++++-
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go
index da7eef56f0c..3bf7c57c406 100644
--- a/service/history/workflow/mutable_state_impl_test.go
+++ b/service/history/workflow/mutable_state_impl_test.go
@@ -60,6 +60,7 @@ import (
 	"go.temporal.io/server/common/persistence/versionhistory"
 	"go.temporal.io/server/common/primitives/timestamp"
 	"go.temporal.io/server/common/searchattribute"
+	"go.temporal.io/server/common/tqname"
 	"go.temporal.io/server/common/worker_versioning"
 	"go.temporal.io/server/service/history/configs"
 	"go.temporal.io/server/service/history/events"
@@ -461,14 +462,25 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged
 	err = s.mutableState.UpdateCurrentVersion(version+1, true)
 	s.NoError(err)
 
+	name, err := tqname.FromBaseName("tq")
+	s.NoError(err)
+
 	_, _, err = s.mutableState.AddWorkflowTaskStartedEvent(
 		s.mutableState.GetNextEventID(),
 		uuid.New(),
-		&taskqueuepb.TaskQueue{},
+		&taskqueuepb.TaskQueue{Name: name.WithPartition(5).FullName()},
 		"random identity",
 	)
 	s.NoError(err)
 	s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())
+
+	mutation, err := s.mutableState.hBuilder.Finish(true)
+	s.NoError(err)
+	s.Equal(1, len(mutation.DBEventsBatches))
+	s.Equal(2, len(mutation.DBEventsBatches[0]))
+	attrs := mutation.DBEventsBatches[0][0].GetWorkflowTaskScheduledEventAttributes()
+	s.NotNil(attrs)
+	s.Equal("tq", attrs.TaskQueue.Name)
 }
 
 func (s *mutableStateSuite) TestSanitizedMutableState() {
diff --git a/service/history/workflow/workflow_task_state_machine.go b/service/history/workflow/workflow_task_state_machine.go
index e17f4dac65f..fb84b343266 100644
--- a/service/history/workflow/workflow_task_state_machine.go
+++ b/service/history/workflow/workflow_task_state_machine.go
@@ -47,6 +47,7 @@ import (
 	"go.temporal.io/server/common/log/tag"
 	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/primitives/timestamp"
+	"go.temporal.io/server/common/tqname"
 )
 
 type (
@@ -458,7 +459,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
 		workflowTask.Type = enumsspb.WORKFLOW_TASK_TYPE_NORMAL
 		workflowTaskScheduledEventCreated = true
 		scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
-			taskQueue,
+			// taskQueue may come directly from RecordWorkflowTaskStarted from matching, which will
+			// contain a specific partition name. We only want to record the base name here.
+			cleanTaskQueue(taskQueue),
 			workflowTask.WorkflowTaskTimeout,
 			workflowTask.Attempt,
 			startTime,
@@ -1035,3 +1038,16 @@ func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() erro
 
 	return nil
 }
+
+func cleanTaskQueue(tq *taskqueuepb.TaskQueue) *taskqueuepb.TaskQueue {
+	if tq == nil {
+		return tq
+	}
+	name, err := tqname.Parse(tq.Name)
+	if err != nil {
+		return tq
+	}
+	cleanTq := *tq
+	cleanTq.Name = name.BaseNameString()
+	return &cleanTq
+}