Skip to content

Commit

Permalink
Clean task queue name from matching before writing to WorkflowTaskSch…
Browse files Browse the repository at this point in the history
…eduledEvent (#4565)

**What changed?**
In some cases we write a new WorkflowTaskScheduledEvent right before a
WorkflowTaskStartedEvent in RecordWorkflowTaskStarted. In that case, the
task queue name comes from RecordWorkflowTaskStarted, but that name
comes from matching and may be a specific task queue partition, not the
base name. We should write only the base name.

**Why?**
Fixes #4557

**How did you test it?**
Unit test
  • Loading branch information
dnr authored Aug 8, 2023
1 parent b1804c2 commit e7dd2ff
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 2 deletions.
14 changes: 13 additions & 1 deletion service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/tqname"
"go.temporal.io/server/common/worker_versioning"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/events"
Expand Down Expand Up @@ -461,14 +462,25 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged
err = s.mutableState.UpdateCurrentVersion(version+1, true)
s.NoError(err)

name, err := tqname.FromBaseName("tq")
s.NoError(err)

_, _, err = s.mutableState.AddWorkflowTaskStartedEvent(
s.mutableState.GetNextEventID(),
uuid.New(),
&taskqueuepb.TaskQueue{},
&taskqueuepb.TaskQueue{Name: name.WithPartition(5).FullName()},
"random identity",
)
s.NoError(err)
s.Equal(0, s.mutableState.hBuilder.NumBufferedEvents())

mutation, err := s.mutableState.hBuilder.Finish(true)
s.NoError(err)
s.Equal(1, len(mutation.DBEventsBatches))
s.Equal(2, len(mutation.DBEventsBatches[0]))
attrs := mutation.DBEventsBatches[0][0].GetWorkflowTaskScheduledEventAttributes()
s.NotNil(attrs)
s.Equal("tq", attrs.TaskQueue.Name)
}

func (s *mutableStateSuite) TestSanitizedMutableState() {
Expand Down
18 changes: 17 additions & 1 deletion service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/tqname"
)

type (
Expand Down Expand Up @@ -458,7 +459,9 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskStartedEvent(
workflowTask.Type = enumsspb.WORKFLOW_TASK_TYPE_NORMAL
workflowTaskScheduledEventCreated = true
scheduledEvent := m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
taskQueue,
// taskQueue may come directly from RecordWorkflowTaskStarted from matching, which will
// contain a specific partition name. We only want to record the base name here.
cleanTaskQueue(taskQueue),
workflowTask.WorkflowTaskTimeout,
workflowTask.Attempt,
startTime,
Expand Down Expand Up @@ -1035,3 +1038,16 @@ func (m *workflowTaskStateMachine) convertSpeculativeWorkflowTaskToNormal() erro

return nil
}

func cleanTaskQueue(tq *taskqueuepb.TaskQueue) *taskqueuepb.TaskQueue {
if tq == nil {
return tq
}
name, err := tqname.Parse(tq.Name)
if err != nil {
return tq
}
cleanTq := *tq
cleanTq.Name = name.BaseNameString()
return &cleanTq
}

0 comments on commit e7dd2ff

Please # to comment.