Skip to content

Commit

Permalink
Move describe workflow to api package (#3469)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Oct 11, 2022
1 parent 0e70cf7 commit 7278168
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 147 deletions.
194 changes: 194 additions & 0 deletions service/history/api/describeworkflow/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package describeworkflow

import (
"context"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/shard"
)

func Invoke(
ctx context.Context,
req *historyservice.DescribeWorkflowExecutionRequest,
shard shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (_ *historyservice.DescribeWorkflowExecutionResponse, retError error) {
namespaceID := namespace.ID(req.GetNamespaceId())
err := api.ValidateNamespaceUUID(namespaceID)
if err != nil {
return nil, err
}

weCtx, err := workflowConsistencyChecker.GetWorkflowContext(
ctx,
nil,
api.BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(
req.NamespaceId,
req.Request.Execution.WorkflowId,
req.Request.Execution.RunId,
),
)
if err != nil {
return nil, err
}
defer func() { weCtx.GetReleaseFn()(retError) }()

mutableState := weCtx.GetMutableState()
executionInfo := mutableState.GetExecutionInfo()
executionState := mutableState.GetExecutionState()
result := &historyservice.DescribeWorkflowExecutionResponse{
ExecutionConfig: &workflowpb.WorkflowExecutionConfig{
TaskQueue: &taskqueuepb.TaskQueue{
Name: executionInfo.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
},
WorkflowExecutionTimeout: executionInfo.WorkflowExecutionTimeout,
WorkflowRunTimeout: executionInfo.WorkflowRunTimeout,
DefaultWorkflowTaskTimeout: executionInfo.DefaultWorkflowTaskTimeout,
},
WorkflowExecutionInfo: &workflowpb.WorkflowExecutionInfo{
Execution: &commonpb.WorkflowExecution{
WorkflowId: executionInfo.WorkflowId,
RunId: executionState.RunId,
},
Type: &commonpb.WorkflowType{Name: executionInfo.WorkflowTypeName},
StartTime: executionInfo.StartTime,
Status: executionState.Status,
HistoryLength: mutableState.GetNextEventID() - common.FirstEventID,
ExecutionTime: executionInfo.ExecutionTime,
Memo: &commonpb.Memo{Fields: executionInfo.Memo},
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: executionInfo.SearchAttributes},
AutoResetPoints: executionInfo.AutoResetPoints,
TaskQueue: executionInfo.TaskQueue,
StateTransitionCount: executionInfo.StateTransitionCount,
},
}

if executionInfo.ParentRunId != "" {
result.WorkflowExecutionInfo.ParentExecution = &commonpb.WorkflowExecution{
WorkflowId: executionInfo.ParentWorkflowId,
RunId: executionInfo.ParentRunId,
}
result.WorkflowExecutionInfo.ParentNamespaceId = executionInfo.ParentNamespaceId
}
if executionState.State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
// for closed workflow
result.WorkflowExecutionInfo.Status = executionState.Status
closeTime, err := mutableState.GetWorkflowCloseTime(ctx)
if err != nil {
return nil, err
}
result.WorkflowExecutionInfo.CloseTime = closeTime
}

if len(mutableState.GetPendingActivityInfos()) > 0 {
for _, ai := range mutableState.GetPendingActivityInfos() {
p := &workflowpb.PendingActivityInfo{
ActivityId: ai.ActivityId,
}
if ai.CancelRequested {
p.State = enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED
} else if ai.StartedEventId != common.EmptyEventID {
p.State = enumspb.PENDING_ACTIVITY_STATE_STARTED
} else {
p.State = enumspb.PENDING_ACTIVITY_STATE_SCHEDULED
}
if !timestamp.TimeValue(ai.LastHeartbeatUpdateTime).IsZero() {
p.LastHeartbeatTime = ai.LastHeartbeatUpdateTime
p.HeartbeatDetails = ai.LastHeartbeatDetails
}
// TODO: move to mutable state instead of loading it from event
scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, ai.ScheduledEventId)
if err != nil {
return nil, err
}
p.ActivityType = scheduledEvent.GetActivityTaskScheduledEventAttributes().ActivityType
if p.State == enumspb.PENDING_ACTIVITY_STATE_SCHEDULED {
p.ScheduledTime = ai.ScheduledTime
} else {
p.LastStartedTime = ai.StartedTime
}
p.LastWorkerIdentity = ai.StartedIdentity
if ai.HasRetryPolicy {
p.Attempt = ai.Attempt
p.ExpirationTime = ai.RetryExpirationTime
if ai.RetryMaximumAttempts != 0 {
p.MaximumAttempts = ai.RetryMaximumAttempts
}
if ai.RetryLastFailure != nil {
p.LastFailure = ai.RetryLastFailure
}
if p.LastWorkerIdentity == "" && ai.RetryLastWorkerIdentity != "" {
p.LastWorkerIdentity = ai.RetryLastWorkerIdentity
}
} else {
p.Attempt = 1
}
result.PendingActivities = append(result.PendingActivities, p)
}
}

if len(mutableState.GetPendingChildExecutionInfos()) > 0 {
for _, ch := range mutableState.GetPendingChildExecutionInfos() {
p := &workflowpb.PendingChildExecutionInfo{
WorkflowId: ch.StartedWorkflowId,
RunId: ch.StartedRunId,
WorkflowTypeName: ch.WorkflowTypeName,
InitiatedId: ch.InitiatedEventId,
ParentClosePolicy: ch.ParentClosePolicy,
}
result.PendingChildren = append(result.PendingChildren, p)
}
}

if pendingWorkflowTask, ok := mutableState.GetPendingWorkflowTask(); ok {
result.PendingWorkflowTask = &workflowpb.PendingWorkflowTaskInfo{
State: enumspb.PENDING_WORKFLOW_TASK_STATE_SCHEDULED,
ScheduledTime: pendingWorkflowTask.ScheduledTime,
OriginalScheduledTime: pendingWorkflowTask.OriginalScheduledTime,
Attempt: pendingWorkflowTask.Attempt,
}
if pendingWorkflowTask.StartedEventID != common.EmptyEventID {
result.PendingWorkflowTask.State = enumspb.PENDING_WORKFLOW_TASK_STATE_STARTED
result.PendingWorkflowTask.StartedTime = pendingWorkflowTask.StartedTime
}
}

return result, nil
}
149 changes: 2 additions & 147 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,8 @@ import (
historypb "go.temporal.io/api/history/v1"
querypb "go.temporal.io/api/query/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"

enumsspb "go.temporal.io/server/api/enums/v1"
historyspb "go.temporal.io/server/api/history/v1"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
Expand All @@ -61,6 +58,7 @@ import (
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/api/describeworkflow"
"go.temporal.io/server/service/history/api/recordactivitytaskheartbeat"
"go.temporal.io/server/service/history/api/recordactivitytaskstarted"
"go.temporal.io/server/service/history/api/recordchildworkflowcompleted"
Expand Down Expand Up @@ -751,150 +749,7 @@ func (e *historyEngineImpl) DescribeWorkflowExecution(
ctx context.Context,
request *historyservice.DescribeWorkflowExecutionRequest,
) (_ *historyservice.DescribeWorkflowExecutionResponse, retError error) {

namespaceID := namespace.ID(request.GetNamespaceId())
err := api.ValidateNamespaceUUID(namespaceID)
if err != nil {
return nil, err
}

weCtx, err := e.workflowConsistencyChecker.GetWorkflowContext(
ctx,
nil,
api.BypassMutableStateConsistencyPredicate,
definition.NewWorkflowKey(
request.NamespaceId,
request.Request.Execution.WorkflowId,
request.Request.Execution.RunId,
),
)
if err != nil {
return nil, err
}
defer func() { weCtx.GetReleaseFn()(retError) }()

mutableState := weCtx.GetMutableState()
executionInfo := mutableState.GetExecutionInfo()
executionState := mutableState.GetExecutionState()
result := &historyservice.DescribeWorkflowExecutionResponse{
ExecutionConfig: &workflowpb.WorkflowExecutionConfig{
TaskQueue: &taskqueuepb.TaskQueue{
Name: executionInfo.TaskQueue,
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
},
WorkflowExecutionTimeout: executionInfo.WorkflowExecutionTimeout,
WorkflowRunTimeout: executionInfo.WorkflowRunTimeout,
DefaultWorkflowTaskTimeout: executionInfo.DefaultWorkflowTaskTimeout,
},
WorkflowExecutionInfo: &workflowpb.WorkflowExecutionInfo{
Execution: &commonpb.WorkflowExecution{
WorkflowId: executionInfo.WorkflowId,
RunId: executionState.RunId,
},
Type: &commonpb.WorkflowType{Name: executionInfo.WorkflowTypeName},
StartTime: executionInfo.StartTime,
Status: executionState.Status,
HistoryLength: mutableState.GetNextEventID() - common.FirstEventID,
ExecutionTime: executionInfo.ExecutionTime,
Memo: &commonpb.Memo{Fields: executionInfo.Memo},
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: executionInfo.SearchAttributes},
AutoResetPoints: executionInfo.AutoResetPoints,
TaskQueue: executionInfo.TaskQueue,
StateTransitionCount: executionInfo.StateTransitionCount,
},
}

if executionInfo.ParentRunId != "" {
result.WorkflowExecutionInfo.ParentExecution = &commonpb.WorkflowExecution{
WorkflowId: executionInfo.ParentWorkflowId,
RunId: executionInfo.ParentRunId,
}
result.WorkflowExecutionInfo.ParentNamespaceId = executionInfo.ParentNamespaceId
}
if executionState.State == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
// for closed workflow
result.WorkflowExecutionInfo.Status = executionState.Status
closeTime, err := mutableState.GetWorkflowCloseTime(ctx)
if err != nil {
return nil, err
}
result.WorkflowExecutionInfo.CloseTime = closeTime
}

if len(mutableState.GetPendingActivityInfos()) > 0 {
for _, ai := range mutableState.GetPendingActivityInfos() {
p := &workflowpb.PendingActivityInfo{
ActivityId: ai.ActivityId,
}
if ai.CancelRequested {
p.State = enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED
} else if ai.StartedEventId != common.EmptyEventID {
p.State = enumspb.PENDING_ACTIVITY_STATE_STARTED
} else {
p.State = enumspb.PENDING_ACTIVITY_STATE_SCHEDULED
}
if !timestamp.TimeValue(ai.LastHeartbeatUpdateTime).IsZero() {
p.LastHeartbeatTime = ai.LastHeartbeatUpdateTime
p.HeartbeatDetails = ai.LastHeartbeatDetails
}
// TODO: move to mutable state instead of loading it from event
scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, ai.ScheduledEventId)
if err != nil {
return nil, err
}
p.ActivityType = scheduledEvent.GetActivityTaskScheduledEventAttributes().ActivityType
if p.State == enumspb.PENDING_ACTIVITY_STATE_SCHEDULED {
p.ScheduledTime = ai.ScheduledTime
} else {
p.LastStartedTime = ai.StartedTime
}
p.LastWorkerIdentity = ai.StartedIdentity
if ai.HasRetryPolicy {
p.Attempt = ai.Attempt
p.ExpirationTime = ai.RetryExpirationTime
if ai.RetryMaximumAttempts != 0 {
p.MaximumAttempts = ai.RetryMaximumAttempts
}
if ai.RetryLastFailure != nil {
p.LastFailure = ai.RetryLastFailure
}
if p.LastWorkerIdentity == "" && ai.RetryLastWorkerIdentity != "" {
p.LastWorkerIdentity = ai.RetryLastWorkerIdentity
}
} else {
p.Attempt = 1
}
result.PendingActivities = append(result.PendingActivities, p)
}
}

if len(mutableState.GetPendingChildExecutionInfos()) > 0 {
for _, ch := range mutableState.GetPendingChildExecutionInfos() {
p := &workflowpb.PendingChildExecutionInfo{
WorkflowId: ch.StartedWorkflowId,
RunId: ch.StartedRunId,
WorkflowTypeName: ch.WorkflowTypeName,
InitiatedId: ch.InitiatedEventId,
ParentClosePolicy: ch.ParentClosePolicy,
}
result.PendingChildren = append(result.PendingChildren, p)
}
}

if pendingWorkflowTask, ok := mutableState.GetPendingWorkflowTask(); ok {
result.PendingWorkflowTask = &workflowpb.PendingWorkflowTaskInfo{
State: enumspb.PENDING_WORKFLOW_TASK_STATE_SCHEDULED,
ScheduledTime: pendingWorkflowTask.ScheduledTime,
OriginalScheduledTime: pendingWorkflowTask.OriginalScheduledTime,
Attempt: pendingWorkflowTask.Attempt,
}
if pendingWorkflowTask.StartedEventID != common.EmptyEventID {
result.PendingWorkflowTask.State = enumspb.PENDING_WORKFLOW_TASK_STATE_STARTED
result.PendingWorkflowTask.StartedTime = pendingWorkflowTask.StartedTime
}
}

return result, nil
return describeworkflow.Invoke(ctx, request, e.shard, e.workflowConsistencyChecker)
}

func (e *historyEngineImpl) RecordActivityTaskStarted(
Expand Down

0 comments on commit 7278168

Please # to comment.