Skip to content
This repository was archived by the owner on Dec 9, 2024. It is now read-only.

Commit

Permalink
add emitting internal gateway event (#274)
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw authored Aug 11, 2017
1 parent 63b91d3 commit 3de7860
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 32 deletions.
22 changes: 20 additions & 2 deletions functions/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,27 @@ func (e ErrAlreadyRegistered) Error() string {

// ErrValidation occurs when function payload doesn't validate.
type ErrValidation struct {
original string
message string
}

func (e ErrValidation) Error() string {
return fmt.Sprintf("Function doesn't validate. Validation error: %q", e.original)
return fmt.Sprintf("Function doesn't validate. Validation error: %q", e.message)
}

// ErrFunctionCallFailed occurs when function call failed because of provider error.
type ErrFunctionCallFailed struct {
original error
}

func (e ErrFunctionCallFailed) Error() string {
return fmt.Sprintf("Function call failed. Error: %q", e.original)
}

// ErrFunctionCallFailedProviderError occurs when function call failed because of provider error.
type ErrFunctionCallFailedProviderError struct {
original error
}

func (e ErrFunctionCallFailedProviderError) Error() string {
return fmt.Sprintf("Function call failed because of provider error. Error: %q", e.original)
}
19 changes: 17 additions & 2 deletions functions/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package functions
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
Expand All @@ -12,6 +13,7 @@ import (
validator "gopkg.in/go-playground/validator.v9"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/lambda"
Expand Down Expand Up @@ -124,6 +126,16 @@ func (f *Function) callAWSLambda(payload []byte) ([]byte, error) {
FunctionName: &f.Provider.ARN,
Payload: payload,
})
if err != nil {
if awserr, ok := err.(awserr.Error); ok {
switch awserr.Code() {
case lambda.ErrCodeServiceException:
return nil, &ErrFunctionCallFailedProviderError{awserr}
default:
return nil, &ErrFunctionCallFailed{awserr}
}
}
}

return invokeOutput.Payload, err
}
Expand All @@ -135,10 +147,13 @@ func (f *Function) callHTTP(payload []byte) ([]byte, error) {

resp, err := client.Post(f.Provider.URL, "application/json", bytes.NewReader(payload))
if err != nil {
return []byte{}, err
return nil, &ErrFunctionCallFailed{err}
}
if resp.StatusCode == http.StatusInternalServerError {
return nil, &ErrFunctionCallFailedProviderError{fmt.Errorf("HTTP status code: %d", http.StatusInternalServerError)}
}
defer resp.Body.Close()

defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}

Expand Down
31 changes: 18 additions & 13 deletions router/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,28 @@ import (
"github.com/serverless/event-gateway/pubsub"
)

// Schema is a default event schema. All data that passes through the Event Gateway is formatted as an Event, based on this schema.
type Schema struct {
// Event is a default event structure. All data that passes through the Event Gateway is formatted as an Event, based on this schema.
type Event struct {
Event string `json:"event"`
ID string `json:"id"`
ReceivedAt uint64 `json:"receivedAt"`
Data interface{} `json:"data"`
DataType string `json:"dataType"`
}

// HTTPSchema is a event schema used for sending events to HTTP subscriptions.
type HTTPSchema struct {
// NewEvent return new instance of Event.
func NewEvent(name, mime string, payload interface{}) *Event {
return &Event{
Event: name,
ID: uuid.NewV4().String(),
ReceivedAt: uint64(time.Now().UnixNano() / int64(time.Millisecond)),
DataType: mime,
Data: payload,
}
}

// HTTPEvent is a event schema used for sending events to HTTP subscriptions.
type HTTPEvent struct {
Headers map[string][]string `json:"headers"`
Query map[string][]string `json:"query"`
Body interface{} `json:"body"`
Expand All @@ -31,7 +42,7 @@ const (
mimeOctetStrem = "application/octet-stream"
)

func fromRequest(r *http.Request) (*Schema, error) {
func fromRequest(r *http.Request) (*Event, error) {
name := r.Header.Get("event")
if name == "" {
name = eventHTTP
Expand All @@ -47,13 +58,7 @@ func fromRequest(r *http.Request) (*Schema, error) {
return nil, err
}

event := &Schema{
Event: name,
ID: uuid.NewV4().String(),
ReceivedAt: uint64(time.Now().UnixNano() / int64(time.Millisecond)),
DataType: mime,
Data: body,
}
event := NewEvent(name, mime, body)

if mime == mimeJSON && len(body) > 0 {
err := json.Unmarshal(body, &event.Data)
Expand All @@ -63,7 +68,7 @@ func fromRequest(r *http.Request) (*Schema, error) {
}

if event.Event == eventHTTP {
event.Data = &HTTPSchema{
event.Data = &HTTPEvent{
Headers: r.Header,
Query: r.URL.Query(),
Body: event.Data,
Expand Down
2 changes: 1 addition & 1 deletion router/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestIntegrationSubscription(t *testing.T) {
func(w http.ResponseWriter, r *http.Request) {
reqBuf, _ := ioutil.ReadAll(r.Body)

var event Schema
var event Event
err := json.Unmarshal(reqBuf, &event)
if err != nil {
panic(err)
Expand Down
36 changes: 22 additions & 14 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,16 @@ const (

// headerFunctionID is a header name for specifing function id for sync invocation.
headerFunctionID = "function-id"

internalFunctionProviderError = "gateway.warn.functionProviderError"
)

var (
errUnableToLookUpBackingFunction = errors.New("unable to look up backing function")
errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function")
)

func (router *Router) handleHTTPEvent(event *Schema, w http.ResponseWriter, r *http.Request) {
func (router *Router) handleHTTPEvent(event *Event, w http.ResponseWriter, r *http.Request) {
payload, err := json.Marshal(event)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -186,7 +188,7 @@ func (router *Router) handleHTTPEvent(event *Schema, w http.ResponseWriter, r *h
}
}

func (router *Router) handleEvent(instance *Schema, w http.ResponseWriter, r *http.Request) {
func (router *Router) handleEvent(instance *Event, w http.ResponseWriter, r *http.Request) {
payload, err := json.Marshal(instance)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down Expand Up @@ -261,13 +263,23 @@ func (router *Router) callFunction(backingFunctionID functions.FunctionID, paylo
return []byte{}, errUnableToLookUpRegisteredFunction
}

result, err := f.Call(payload)
if err != nil {
resErr := errors.New("unable to reach backing function: " + err.Error())
return []byte{}, resErr
resp, err := f.Call(payload)

if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok {
internal := NewEvent(internalFunctionProviderError, mimeJSON, struct {
FunctionID string `json:"functionId"`
}{string(backingFunctionID)})
payload, err = json.Marshal(internal)
if err == nil {
router.log.Debug("Event received.", zap.String("event", string(payload)))
router.processEvent(event{
topics: []pubsub.TopicID{pubsub.TopicID(internal.Event)},
payload: payload,
})
}
}

return result, nil
return resp, err
}

// loop is the main loop for a pub/sub worker goroutine
Expand Down Expand Up @@ -308,9 +320,7 @@ func (router *Router) loop() {
}
}

// processEvent sends event to a set of topics,
// and for each of the functions that get called
// as part of those topics.
// processEvent sends event to a set of topics, and for each of the functions that get called as part of those topics.
func (router *Router) processEvent(e event) {
for _, topicID := range e.topics {
subscribers := router.targetCache.SubscribersOfTopic(topicID)
Expand Down Expand Up @@ -349,14 +359,12 @@ func (router *Router) enqueueWork(topicMap map[pubsub.TopicID]struct{}, payload
payload: payload,
}:
default:
// We could not submit any work, this is NOT good but
// we will sacrifice consistency for availability for now.
// We could not submit any work, this is NOT good but we will sacrifice consistency for availability for now.
router.dropMetric.Inc()
}
}

// isDraining returns true if this Router is being drained of items
// in its work queue before shutting down.
// isDraining returns true if this Router is being drained of items in its work queue before shutting down.
func (router *Router) isDraining() bool {
select {
case <-router.drain:
Expand Down

0 comments on commit 3de7860

Please # to comment.