Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Drop task on serialization error #3803

Merged
merged 5 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,11 +1433,12 @@ var (
TaskNotActiveCounter = NewCounterDef("task_errors_not_active_counter")
TaskLimitExceededCounter = NewCounterDef("task_errors_limit_exceeded_counter")
TaskNamespaceHandoverCounter = NewCounterDef("task_errors_namespace_handover")
TaskThrottledCounter = NewCounterDef("task_errors_throttled")
TaskCorruptionCounter = NewCounterDef("task_errors_corruption")
TaskScheduleToStartLatency = NewTimerDef("task_schedule_to_start_latency")
TransferTaskMissingEventCounter = NewCounterDef("transfer_task_missing_event_counter")
TaskBatchCompleteCounter = NewCounterDef("task_batch_complete_counter")
TaskReschedulerPendingTasks = NewDimensionlessHistogramDef("task_rescheduler_pending_tasks")
TaskThrottledCounter = NewCounterDef("task_throttled_counter")
PendingTasksCounter = NewDimensionlessHistogramDef("pending_tasks")
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
Expand Down
9 changes: 4 additions & 5 deletions common/persistence/data_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,21 @@
package persistence

import (
"fmt"

commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
)

// NewDataBlob returns a new DataBlob
// TODO: return an UnknowEncodingType error with the actual type string when encodingTypeStr is invalid
func NewDataBlob(data []byte, encodingTypeStr string) *commonpb.DataBlob {
if len(data) == 0 {
return nil
}

encodingType, ok := enumspb.EncodingType_value[encodingTypeStr]
if !ok || (enumspb.EncodingType(encodingType) != enumspb.ENCODING_TYPE_PROTO3 &&
enumspb.EncodingType(encodingType) != enumspb.ENCODING_TYPE_JSON) {
panic(fmt.Sprintf("Invalid encoding: %v", encodingTypeStr))
if !ok {
// encodingTypeStr not valid, an error will be returned on deserialization
encodingType = int32(enumspb.ENCODING_TYPE_UNSPECIFIED)
}

return &commonpb.DataBlob{
Expand Down
12 changes: 5 additions & 7 deletions common/persistence/serialization/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
package serialization

import (
"fmt"

"github.com/gogo/protobuf/proto"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -137,7 +135,7 @@ func encode(
case enumspb.ENCODING_TYPE_PROTO3:
return proto3Encode(object)
default:
return commonpb.DataBlob{}, fmt.Errorf("unknown encoding type: %v", encoding)
return commonpb.DataBlob{}, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
}
}

Expand All @@ -156,27 +154,27 @@ func decode(
case enumspb.ENCODING_TYPE_PROTO3:
return proto3Decode(blob, encoding, result)
default:
return fmt.Errorf("unknown encoding type: %v", encoding)
return NewUnknownEncodingTypeError(encoding, enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
}
}

func proto3Encode(m proto.Message) (commonpb.DataBlob, error) {
blob := commonpb.DataBlob{EncodingType: enumspb.ENCODING_TYPE_PROTO3}
data, err := proto.Marshal(m)
if err != nil {
return blob, fmt.Errorf("error serializing struct to blob using %v encoding: %w", enumspb.ENCODING_TYPE_PROTO3, err)
return blob, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err)
}
blob.Data = data
return blob, nil
}

func proto3Decode(blob []byte, encoding string, result proto.Message) error {
if e, ok := enumspb.EncodingType_value[encoding]; !ok || enumspb.EncodingType(e) != enumspb.ENCODING_TYPE_PROTO3 {
return fmt.Errorf("encoding %s doesn't match expected encoding %v", encoding, enumspb.ENCODING_TYPE_PROTO3)
return NewUnknownEncodingTypeError(encoding, enumspb.ENCODING_TYPE_PROTO3)
}

if err := proto.Unmarshal(blob, result); err != nil {
return fmt.Errorf("error deserializing blob using %v encoding: %w", enumspb.ENCODING_TYPE_PROTO3, err)
return NewDeserializationError(enumspb.ENCODING_TYPE_PROTO3, err)
}
return nil
}
93 changes: 68 additions & 25 deletions common/persistence/serialization/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
package serialization

import (
"errors"
"fmt"
"reflect"
"strings"

"github.com/gogo/protobuf/proto"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -107,17 +109,20 @@ type (

// SerializationError is an error type for serialization
SerializationError struct {
msg string
encodingType enumspb.EncodingType
wrappedErr error
}

// DeserializationError is an error type for deserialization
DeserializationError struct {
msg string
encodingType enumspb.EncodingType
wrappedErr error
}

// UnknownEncodingTypeError is an error type for unknown or unsupported encoding type
UnknownEncodingTypeError struct {
encodingType enumspb.EncodingType
encodingTypeStr string
expectedEncodingStr []string
}

serializerImpl struct {
Expand Down Expand Up @@ -149,7 +154,7 @@ func (t *serializerImpl) DeserializeEvents(data *commonpb.DataBlob) ([]*historyp
// Client API currently specifies encodingType on requests which span multiple of these objects
err = events.Unmarshal(data.Data)
default:
return nil, NewDeserializationError("DeserializeEvents invalid encoding")
return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -179,7 +184,7 @@ func (t *serializerImpl) DeserializeEvent(data *commonpb.DataBlob) (*historypb.H
// Client API currently specifies encodingType on requests which span multiple of these objects
err = event.Unmarshal(data.Data)
default:
return nil, NewDeserializationError("DeserializeEvent invalid encoding")
return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
}

if err != nil {
Expand Down Expand Up @@ -212,7 +217,7 @@ func (t *serializerImpl) DeserializeClusterMetadata(data *commonpb.DataBlob) (*p
// Client API currently specifies encodingType on requests which span multiple of these objects
err = cm.Unmarshal(data.Data)
default:
return nil, NewDeserializationError("DeserializeClusterMetadata invalid encoding")
return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
}

if err != nil {
Expand All @@ -235,11 +240,11 @@ func (t *serializerImpl) serialize(p proto.Marshaler, encodingType enumspb.Encod
// Client API currently specifies encodingType on requests which span multiple of these objects
data, err = p.Marshal()
default:
return nil, NewUnknownEncodingTypeError(encodingType)
return nil, NewUnknownEncodingTypeError(encodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
}

if err != nil {
return nil, NewSerializationError(err.Error())
return nil, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err)
}

// Shouldn't happen, but keeping
Expand All @@ -254,30 +259,68 @@ func (t *serializerImpl) serialize(p proto.Marshaler, encodingType enumspb.Encod
}

// NewUnknownEncodingTypeError returns a new instance of encoding type error
func NewUnknownEncodingTypeError(encodingType enumspb.EncodingType) error {
return &UnknownEncodingTypeError{encodingType: encodingType}
func NewUnknownEncodingTypeError(
encodingTypeStr string,
expectedEncoding ...enumspb.EncodingType,
) error {
if len(expectedEncoding) == 0 {
for encodingType := range enumspb.EncodingType_name {
expectedEncoding = append(expectedEncoding, enumspb.EncodingType(encodingType))
}
}
expectedEncodingStr := make([]string, 0, len(expectedEncoding))
for _, encodingType := range expectedEncoding {
expectedEncodingStr = append(expectedEncodingStr, encodingType.String())
}
return &UnknownEncodingTypeError{
encodingTypeStr: encodingTypeStr,
expectedEncodingStr: expectedEncodingStr,
}
}

func (e *UnknownEncodingTypeError) Error() string {
return fmt.Sprintf("unknown or unsupported encoding type %v", e.encodingType)
return fmt.Sprintf("unknown or unsupported encoding type %v, supported types: %v",
e.encodingTypeStr,
strings.Join(e.expectedEncodingStr, ","),
)
}

// NewSerializationError returns a SerializationError
func NewSerializationError(msg string) error {
return &SerializationError{msg: msg}
func NewSerializationError(
encodingType enumspb.EncodingType,
serializationErr error,
) error {
return &SerializationError{
encodingType: encodingType,
wrappedErr: serializationErr,
}
}

func (e *SerializationError) Error() string {
return fmt.Sprintf("serialization error: %v", e.msg)
return fmt.Sprintf("error serializing using %v encoding: %v", e.encodingType, e.wrappedErr)
}

func (e *SerializationError) Unwrap() error {
return e.wrappedErr
}

// NewDeserializationError returns a DeserializationError
func NewDeserializationError(msg string) error {
return &DeserializationError{msg: msg}
func NewDeserializationError(
encodingType enumspb.EncodingType,
deserializationErr error,
) error {
return &DeserializationError{
encodingType: encodingType,
wrappedErr: deserializationErr,
}
}

func (e *DeserializationError) Error() string {
return fmt.Sprintf("deserialization error: %v", e.msg)
return fmt.Sprintf("error deserializing using %v encoding: %v", e.encodingType, e.wrappedErr)
}

func (e *DeserializationError) Unwrap() error {
return e.wrappedErr
}

func (t *serializerImpl) ShardInfoToBlob(info *persistencespb.ShardInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
Expand Down Expand Up @@ -469,23 +512,23 @@ func (t *serializerImpl) ReplicationTaskFromBlob(data *commonpb.DataBlob) (*repl
func ProtoDecodeBlob(data *commonpb.DataBlob, result proto.Message) error {
if data == nil {
// TODO: should we return nil or error?
return NewDeserializationError("cannot decode nil")
return NewDeserializationError(enumspb.ENCODING_TYPE_UNSPECIFIED, errors.New("cannot decode nil"))
}

if data.EncodingType != enumspb.ENCODING_TYPE_PROTO3 {
return NewDeserializationError(fmt.Sprintf("encoding %v doesn't match expected encoding %v", data.EncodingType, enumspb.ENCODING_TYPE_PROTO3))
return NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
}

if err := proto.Unmarshal(data.Data, result); err != nil {
return NewDeserializationError(fmt.Sprintf("error deserializing blob using %v encoding: %s", enumspb.ENCODING_TYPE_PROTO3, err))
return NewDeserializationError(enumspb.ENCODING_TYPE_PROTO3, err)
}
return nil
}

func decodeBlob(data *commonpb.DataBlob, result proto.Message) error {
if data == nil {
// TODO: should we return nil or error?
return NewDeserializationError("cannot decode nil")
return NewDeserializationError(enumspb.ENCODING_TYPE_UNSPECIFIED, errors.New("cannot decode nil"))
}

if data.Data == nil {
Expand All @@ -498,7 +541,7 @@ func decodeBlob(data *commonpb.DataBlob, result proto.Message) error {
case enumspb.ENCODING_TYPE_PROTO3:
return ProtoDecodeBlob(data, result)
default:
return NewUnknownEncodingTypeError(data.EncodingType)
return NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
}
}

Expand All @@ -523,13 +566,13 @@ func encodeBlob(o proto.Message, encoding enumspb.EncodingType) (*commonpb.DataB
case enumspb.ENCODING_TYPE_PROTO3:
return ProtoEncodeBlob(o, enumspb.ENCODING_TYPE_PROTO3)
default:
return nil, NewUnknownEncodingTypeError(encoding)
return nil, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
}
}

func ProtoEncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb.DataBlob, error) {
if encoding != enumspb.ENCODING_TYPE_PROTO3 {
return nil, NewUnknownEncodingTypeError(encoding)
return nil, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_PROTO3)
}

if m == nil || (reflect.ValueOf(m).Kind() == reflect.Ptr && reflect.ValueOf(m).IsNil()) {
Expand All @@ -543,7 +586,7 @@ func ProtoEncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb.
blob := &commonpb.DataBlob{EncodingType: enumspb.ENCODING_TYPE_PROTO3}
data, err := proto.Marshal(m)
if err != nil {
return nil, NewSerializationError(err.Error())
return nil, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err)
}
blob.Data = data
return blob, nil
Expand Down
27 changes: 22 additions & 5 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ package queues

import (
"context"
"errors"
"fmt"
"runtime/debug"
"sync"
"time"

Expand All @@ -42,6 +45,7 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/serialization"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/consts"
Expand Down Expand Up @@ -178,19 +182,22 @@ func (e *executableImpl) Execute() (retErr error) {
headers.NewBackgroundCallerInfo(ns.String()),
)

var panicErr error
defer func() {
if panicErr != nil {
retErr = panicErr
if panicObj := recover(); panicObj != nil {
err, ok := panicObj.(error)
if !ok {
err = serviceerror.NewInternal(fmt.Sprintf("panic: %v", panicObj))
}

e.logger.Error("Panic is captured", tag.SysStackTrace(string(debug.Stack())), tag.Error(err))
retErr = err

// we need to guess the metrics tags here as we don't know which execution logic
// is actually used which is upto the executor implementation
e.taggedMetricsHandler = e.metricsHandler.WithTags(e.estimateTaskMetricTag()...)
}
}()

defer log.CapturePanic(e.logger, &panicErr)

startTime := e.timeSource.Now()

metricsTags, isActive, err := e.executor.Execute(ctx, e)
Expand Down Expand Up @@ -291,6 +298,16 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
return err
}

var deserializationError *serialization.DeserializationError
var encodingTypeError *serialization.UnknownEncodingTypeError
if errors.As(err, &deserializationError) || errors.As(err, &encodingTypeError) {
// likely due to data corruption, emit logs, metrics & drop the task by return nil so that
// task will be marked as completed.
e.taggedMetricsHandler.Counter(metrics.TaskCorruptionCounter.GetMetricName()).Record(1)
e.logger.Error("Drop task due to serialization error", tag.Error(err))
return nil
}

e.taggedMetricsHandler.Counter(metrics.TaskFailures.GetMetricName()).Record(1)

e.logger.Error("Fail to process task", tag.Error(err), tag.LifeCycleProcessingFailed)
Expand Down
Loading