From 4bc8bb7e6edfa571f3129e3de9e5c55c086e4ff9 Mon Sep 17 00:00:00 2001 From: Maciej Winnicki Date: Mon, 14 Aug 2017 16:11:23 +0200 Subject: [PATCH] improve logging. Add missing logs. Better error reporting. Closes #265 (#285) --- router/event.go | 2 +- router/router.go | 184 +++++++++++++++++++---------------------------- 2 files changed, 73 insertions(+), 113 deletions(-) diff --git a/router/event.go b/router/event.go index a826a41..88e2024 100644 --- a/router/event.go +++ b/router/event.go @@ -83,6 +83,6 @@ func fromRequest(r *http.Request) (*Event, error) { } type event struct { - topics []subscriptions.TopicID + topic subscriptions.TopicID payload []byte } diff --git a/router/router.go b/router/router.go index 7168c8e..6c8588e 100644 --- a/router/router.go +++ b/router/router.go @@ -55,10 +55,10 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if event.Event == eventHTTP { - router.handleHTTPEvent(event, w, r) + if event.Event == eventHTTP || event.Event == eventInvoke { + router.handleSyncEvent(event, w, r) } else if r.Method == http.MethodPost && r.URL.Path == "/" { - router.handleEvent(event, w, r) + router.handleAsyncEvent(event, w, r) } } @@ -159,7 +159,7 @@ var ( errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function") ) -func (router *Router) handleHTTPEvent(event *Event, w http.ResponseWriter, r *http.Request) { +func (router *Router) handleSyncEvent(event *Event, w http.ResponseWriter, r *http.Request) { payload, err := json.Marshal(event) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -168,81 +168,68 @@ func (router *Router) handleHTTPEvent(event *Event, w http.ResponseWriter, r *ht router.log.Debug("Event received.", zap.String("event", string(payload))) - endpointID := subscriptions.NewEndpointID(strings.ToUpper(r.Method), r.URL.EscapedPath()) - res, err := router.callEndpoint(endpointID, payload) + var resp []byte + var functionID functions.FunctionID + if event.Event == eventInvoke { + functionID = functions.FunctionID(r.Header.Get(headerFunctionID)) + } else if event.Event == eventHTTP { + endpointID := subscriptions.NewEndpointID(strings.ToUpper(r.Method), r.URL.EscapedPath()) + backingFunction := router.targetCache.BackingFunction(endpointID) + if backingFunction == nil { + router.log.Debug("Function not found for HTTP event.", zap.String("event", string(payload))) + http.Error(w, "Resource not found", http.StatusNotFound) + return + } + functionID = *backingFunction + } + + router.log.Debug("Function triggered.", + zap.String("functionId", string(functionID)), zap.String("event", string(payload))) + + resp, err = router.callFunction(functionID, payload) if err != nil { + router.log.Warn("Function invocation failed.", + zap.String("functionId", string(functionID)), zap.String("event", string(payload)), zap.Error(err)) + if err == errUnableToLookUpBackingFunction { http.Error(w, err.Error(), http.StatusNotFound) } else { http.Error(w, err.Error(), http.StatusInternalServerError) } + if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok { + internal := NewEvent(internalFunctionProviderError, mimeJSON, struct { + FunctionID string `json:"functionId"` + }{string(functionID)}) + payload, err = json.Marshal(internal) + if err == nil { + router.enqueueWork(subscriptions.TopicID(internal.Event), payload) + } + } + return } - _, err = w.Write(res) + router.log.Debug("Function finished.", + zap.String("functionId", string(functionID)), zap.String("event", string(payload)), + zap.String("response", string(resp))) + + _, err = w.Write(resp) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } -func (router *Router) handleEvent(instance *Event, w http.ResponseWriter, r *http.Request) { +func (router *Router) handleAsyncEvent(instance *Event, w http.ResponseWriter, r *http.Request) { payload, err := json.Marshal(instance) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - router.log.Debug("Event received.", zap.String("event", string(payload))) - - if instance.Event == eventInvoke { - functionID := functions.FunctionID(r.Header.Get(headerFunctionID)) - resp, err := router.callFunction(functionID, payload) - if err != nil { - if err == errUnableToLookUpRegisteredFunction { - http.Error(w, err.Error(), http.StatusNotFound) - } else { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return - } - - _, err = w.Write(resp) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } else { - router.processEvent(event{ - topics: []subscriptions.TopicID{subscriptions.TopicID(instance.Event)}, - payload: payload, - }) - - w.WriteHeader(http.StatusAccepted) - } -} - -// callEndpoint determines which function to call when an endpoint is hit. -func (router *Router) callEndpoint(endpointID subscriptions.EndpointID, payload []byte) ([]byte, error) { - // Figure out what function we're targeting. - backingFunction := router.targetCache.BackingFunction(endpointID) - if backingFunction == nil { - return []byte{}, errUnableToLookUpBackingFunction - } - - router.log.Debug("Function triggered.", zap.String("functionId", string(*backingFunction)), zap.String("event", string(payload))) - - resp, err := router.callFunction(*backingFunction, payload) - if err != nil { - router.log.Warn("Function invocation failed.", zap.String("functionId", string(*backingFunction)), - zap.String("event", string(payload)), zap.Error(err)) - } else { - router.log.Debug("Function finished.", zap.String("functionId", string(*backingFunction)), - zap.String("event", string(payload)), zap.String("response", string(resp))) - } - - return resp, err + router.enqueueWork(subscriptions.TopicID(instance.Event), payload) + w.WriteHeader(http.StatusAccepted) } // callFunction looks up a function and calls it. @@ -267,32 +254,7 @@ func (router *Router) callFunction(backingFunctionID functions.FunctionID, paylo return []byte{}, errUnableToLookUpRegisteredFunction } - router.log.Debug("Function triggered.", zap.String("functionId", string(f.ID)), zap.String("event", string(payload))) - - resp, err := f.Call(payload) - if err != nil { - router.log.Warn("Function invocation failed.", zap.String("functionId", string(f.ID)), - zap.String("event", string(payload)), zap.Error(err)) - - if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok { - internal := NewEvent(internalFunctionProviderError, mimeJSON, struct { - FunctionID string `json:"functionId"` - }{string(backingFunctionID)}) - payload, err = json.Marshal(internal) - if err == nil { - router.log.Debug("Event received.", zap.String("event", string(payload))) - router.processEvent(event{ - topics: []subscriptions.TopicID{subscriptions.TopicID(internal.Event)}, - payload: payload, - }) - } - } - } else { - router.log.Debug("Function finished.", zap.String("functionId", string(f.ID)), - zap.String("event", string(payload)), zap.String("response", string(resp))) - } - - return resp, err + return f.Call(payload) } // loop is the main loop for a pub/sub worker goroutine @@ -335,42 +297,40 @@ func (router *Router) loop() { // processEvent sends event to a set of topics, and for each of the functions that get called as part of those topics. func (router *Router) processEvent(e event) { - for _, topicID := range e.topics { - subscribers := router.targetCache.SubscribersOfTopic(topicID) - for _, subscriber := range subscribers { - router.log.Debug("Function triggered.", zap.String("functionId", string(subscriber)), zap.String("event", string(e.payload))) - - res, err := router.callFunction(subscriber, e.payload) - if err != nil { - router.log.Warn( - "Function invocation failed.", - zap.String("functionId", string(subscriber)), - zap.String("event", string(e.payload)), - zap.Error(err), - ) - } else { - router.log.Debug( - "Function finished.", - zap.String("functionId", string(subscriber)), - zap.String("event", string(e.payload)), - zap.String("response", string(res)), - ) + subscribers := router.targetCache.SubscribersOfTopic(e.topic) + for _, subscriber := range subscribers { + router.log.Debug("Function triggered.", + zap.String("functionId", string(subscriber)), zap.String("event", string(e.payload))) + + resp, err := router.callFunction(subscriber, e.payload) + + if err != nil { + router.log.Warn("Function invocation failed.", + zap.String("functionId", string(subscriber)), zap.String("event", string(e.payload)), zap.Error(err)) + + if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok { + internal := NewEvent(internalFunctionProviderError, mimeJSON, struct { + FunctionID string `json:"functionId"` + }{string(subscriber)}) + payload, err := json.Marshal(internal) + if err == nil { + router.enqueueWork(subscriptions.TopicID(internal.Event), payload) + } } + } else { + router.log.Debug("Function finished.", + zap.String("functionId", string(subscriber)), zap.String("event", string(e.payload)), + zap.String("response", string(resp))) } } } -func (router *Router) enqueueWork(topicMap map[subscriptions.TopicID]struct{}, payload []byte) { - if len(topicMap) == 0 { - return - } - topics := []subscriptions.TopicID{} - for topic := range topicMap { - topics = append(topics, topic) - } +func (router *Router) enqueueWork(topic subscriptions.TopicID, payload []byte) { + router.log.Debug("Event received.", zap.String("event", string(payload))) + select { case router.work <- event{ - topics: topics, + topic: topic, payload: payload, }: default: