diff --git a/functions/errors.go b/functions/errors.go index 709bfd9..23d7bcf 100644 --- a/functions/errors.go +++ b/functions/errors.go @@ -24,9 +24,27 @@ func (e ErrAlreadyRegistered) Error() string { // ErrValidation occurs when function payload doesn't validate. type ErrValidation struct { - original string + message string } func (e ErrValidation) Error() string { - return fmt.Sprintf("Function doesn't validate. Validation error: %q", e.original) + return fmt.Sprintf("Function doesn't validate. Validation error: %q", e.message) +} + +// ErrFunctionCallFailed occurs when function call failed because of provider error. +type ErrFunctionCallFailed struct { + original error +} + +func (e ErrFunctionCallFailed) Error() string { + return fmt.Sprintf("Function call failed. Error: %q", e.original) +} + +// ErrFunctionCallFailedProviderError occurs when function call failed because of provider error. +type ErrFunctionCallFailedProviderError struct { + original error +} + +func (e ErrFunctionCallFailedProviderError) Error() string { + return fmt.Sprintf("Function call failed because of provider error. Error: %q", e.original) } diff --git a/functions/types.go b/functions/types.go index edc7e26..e4ee0e6 100644 --- a/functions/types.go +++ b/functions/types.go @@ -3,6 +3,7 @@ package functions import ( "bytes" "errors" + "fmt" "io/ioutil" "math/rand" "net/http" @@ -12,6 +13,7 @@ import ( validator "gopkg.in/go-playground/validator.v9" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/lambda" @@ -124,6 +126,16 @@ func (f *Function) callAWSLambda(payload []byte) ([]byte, error) { FunctionName: &f.Provider.ARN, Payload: payload, }) + if err != nil { + if awserr, ok := err.(awserr.Error); ok { + switch awserr.Code() { + case lambda.ErrCodeServiceException: + return nil, &ErrFunctionCallFailedProviderError{awserr} + default: + return nil, &ErrFunctionCallFailed{awserr} + } + } + } return invokeOutput.Payload, err } @@ -135,10 +147,13 @@ func (f *Function) callHTTP(payload []byte) ([]byte, error) { resp, err := client.Post(f.Provider.URL, "application/json", bytes.NewReader(payload)) if err != nil { - return []byte{}, err + return nil, &ErrFunctionCallFailed{err} + } + if resp.StatusCode == http.StatusInternalServerError { + return nil, &ErrFunctionCallFailedProviderError{fmt.Errorf("HTTP status code: %d", http.StatusInternalServerError)} } - defer resp.Body.Close() + defer resp.Body.Close() return ioutil.ReadAll(resp.Body) } diff --git a/router/event.go b/router/event.go index 65f6363..9d392c4 100644 --- a/router/event.go +++ b/router/event.go @@ -10,8 +10,8 @@ import ( "github.com/serverless/event-gateway/pubsub" ) -// Schema is a default event schema. All data that passes through the Event Gateway is formatted as an Event, based on this schema. -type Schema struct { +// Event is a default event structure. All data that passes through the Event Gateway is formatted as an Event, based on this schema. +type Event struct { Event string `json:"event"` ID string `json:"id"` ReceivedAt uint64 `json:"receivedAt"` @@ -19,8 +19,19 @@ type Schema struct { DataType string `json:"dataType"` } -// HTTPSchema is a event schema used for sending events to HTTP subscriptions. -type HTTPSchema struct { +// NewEvent return new instance of Event. +func NewEvent(name, mime string, payload interface{}) *Event { + return &Event{ + Event: name, + ID: uuid.NewV4().String(), + ReceivedAt: uint64(time.Now().UnixNano() / int64(time.Millisecond)), + DataType: mime, + Data: payload, + } +} + +// HTTPEvent is a event schema used for sending events to HTTP subscriptions. +type HTTPEvent struct { Headers map[string][]string `json:"headers"` Query map[string][]string `json:"query"` Body interface{} `json:"body"` @@ -31,7 +42,7 @@ const ( mimeOctetStrem = "application/octet-stream" ) -func fromRequest(r *http.Request) (*Schema, error) { +func fromRequest(r *http.Request) (*Event, error) { name := r.Header.Get("event") if name == "" { name = eventHTTP @@ -47,13 +58,7 @@ func fromRequest(r *http.Request) (*Schema, error) { return nil, err } - event := &Schema{ - Event: name, - ID: uuid.NewV4().String(), - ReceivedAt: uint64(time.Now().UnixNano() / int64(time.Millisecond)), - DataType: mime, - Data: body, - } + event := NewEvent(name, mime, body) if mime == mimeJSON && len(body) > 0 { err := json.Unmarshal(body, &event.Data) @@ -63,7 +68,7 @@ func fromRequest(r *http.Request) (*Schema, error) { } if event.Event == eventHTTP { - event.Data = &HTTPSchema{ + event.Data = &HTTPEvent{ Headers: r.Header, Query: r.URL.Query(), Body: event.Data, diff --git a/router/integration_test.go b/router/integration_test.go index 2926b64..865c6cd 100644 --- a/router/integration_test.go +++ b/router/integration_test.go @@ -55,7 +55,7 @@ func TestIntegrationSubscription(t *testing.T) { func(w http.ResponseWriter, r *http.Request) { reqBuf, _ := ioutil.ReadAll(r.Body) - var event Schema + var event Event err := json.Unmarshal(reqBuf, &event) if err != nil { panic(err) diff --git a/router/router.go b/router/router.go index b27fa99..a84a094 100644 --- a/router/router.go +++ b/router/router.go @@ -150,6 +150,8 @@ const ( // headerFunctionID is a header name for specifing function id for sync invocation. headerFunctionID = "function-id" + + internalFunctionProviderError = "gateway.warn.functionProviderError" ) var ( @@ -157,7 +159,7 @@ var ( errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function") ) -func (router *Router) handleHTTPEvent(event *Schema, w http.ResponseWriter, r *http.Request) { +func (router *Router) handleHTTPEvent(event *Event, w http.ResponseWriter, r *http.Request) { payload, err := json.Marshal(event) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -186,7 +188,7 @@ func (router *Router) handleHTTPEvent(event *Schema, w http.ResponseWriter, r *h } } -func (router *Router) handleEvent(instance *Schema, w http.ResponseWriter, r *http.Request) { +func (router *Router) handleEvent(instance *Event, w http.ResponseWriter, r *http.Request) { payload, err := json.Marshal(instance) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -261,13 +263,23 @@ func (router *Router) callFunction(backingFunctionID functions.FunctionID, paylo return []byte{}, errUnableToLookUpRegisteredFunction } - result, err := f.Call(payload) - if err != nil { - resErr := errors.New("unable to reach backing function: " + err.Error()) - return []byte{}, resErr + resp, err := f.Call(payload) + + if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok { + internal := NewEvent(internalFunctionProviderError, mimeJSON, struct { + FunctionID string `json:"functionId"` + }{string(backingFunctionID)}) + payload, err = json.Marshal(internal) + if err == nil { + router.log.Debug("Event received.", zap.String("event", string(payload))) + router.processEvent(event{ + topics: []pubsub.TopicID{pubsub.TopicID(internal.Event)}, + payload: payload, + }) + } } - return result, nil + return resp, err } // loop is the main loop for a pub/sub worker goroutine @@ -308,9 +320,7 @@ func (router *Router) loop() { } } -// processEvent sends event to a set of topics, -// and for each of the functions that get called -// as part of those topics. +// processEvent sends event to a set of topics, and for each of the functions that get called as part of those topics. func (router *Router) processEvent(e event) { for _, topicID := range e.topics { subscribers := router.targetCache.SubscribersOfTopic(topicID) @@ -349,14 +359,12 @@ func (router *Router) enqueueWork(topicMap map[pubsub.TopicID]struct{}, payload payload: payload, }: default: - // We could not submit any work, this is NOT good but - // we will sacrifice consistency for availability for now. + // We could not submit any work, this is NOT good but we will sacrifice consistency for availability for now. router.dropMetric.Inc() } } -// isDraining returns true if this Router is being drained of items -// in its work queue before shutting down. +// isDraining returns true if this Router is being drained of items in its work queue before shutting down. func (router *Router) isDraining() bool { select { case <-router.drain: