Skip to content
This repository was archived by the owner on Dec 9, 2024. It is now read-only.

Commit

Permalink
improve logging. Add missing logs. Better error reporting. Closes #265 (
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw authored Aug 14, 2017
1 parent f3fea77 commit 4bc8bb7
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 113 deletions.
2 changes: 1 addition & 1 deletion router/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,6 @@ func fromRequest(r *http.Request) (*Event, error) {
}

type event struct {
topics []subscriptions.TopicID
topic subscriptions.TopicID
payload []byte
}
184 changes: 72 additions & 112 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func (router *Router) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

if event.Event == eventHTTP {
router.handleHTTPEvent(event, w, r)
if event.Event == eventHTTP || event.Event == eventInvoke {
router.handleSyncEvent(event, w, r)
} else if r.Method == http.MethodPost && r.URL.Path == "/" {
router.handleEvent(event, w, r)
router.handleAsyncEvent(event, w, r)
}
}

Expand Down Expand Up @@ -159,7 +159,7 @@ var (
errUnableToLookUpRegisteredFunction = errors.New("unable to look up registered function")
)

func (router *Router) handleHTTPEvent(event *Event, w http.ResponseWriter, r *http.Request) {
func (router *Router) handleSyncEvent(event *Event, w http.ResponseWriter, r *http.Request) {
payload, err := json.Marshal(event)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -168,81 +168,68 @@ func (router *Router) handleHTTPEvent(event *Event, w http.ResponseWriter, r *ht

router.log.Debug("Event received.", zap.String("event", string(payload)))

endpointID := subscriptions.NewEndpointID(strings.ToUpper(r.Method), r.URL.EscapedPath())
res, err := router.callEndpoint(endpointID, payload)
var resp []byte
var functionID functions.FunctionID
if event.Event == eventInvoke {
functionID = functions.FunctionID(r.Header.Get(headerFunctionID))
} else if event.Event == eventHTTP {
endpointID := subscriptions.NewEndpointID(strings.ToUpper(r.Method), r.URL.EscapedPath())
backingFunction := router.targetCache.BackingFunction(endpointID)
if backingFunction == nil {
router.log.Debug("Function not found for HTTP event.", zap.String("event", string(payload)))
http.Error(w, "Resource not found", http.StatusNotFound)
return
}
functionID = *backingFunction
}

router.log.Debug("Function triggered.",
zap.String("functionId", string(functionID)), zap.String("event", string(payload)))

resp, err = router.callFunction(functionID, payload)
if err != nil {
router.log.Warn("Function invocation failed.",
zap.String("functionId", string(functionID)), zap.String("event", string(payload)), zap.Error(err))

if err == errUnableToLookUpBackingFunction {
http.Error(w, err.Error(), http.StatusNotFound)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}

if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok {
internal := NewEvent(internalFunctionProviderError, mimeJSON, struct {
FunctionID string `json:"functionId"`
}{string(functionID)})
payload, err = json.Marshal(internal)
if err == nil {
router.enqueueWork(subscriptions.TopicID(internal.Event), payload)
}
}

return
}

_, err = w.Write(res)
router.log.Debug("Function finished.",
zap.String("functionId", string(functionID)), zap.String("event", string(payload)),
zap.String("response", string(resp)))

_, err = w.Write(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

func (router *Router) handleEvent(instance *Event, w http.ResponseWriter, r *http.Request) {
func (router *Router) handleAsyncEvent(instance *Event, w http.ResponseWriter, r *http.Request) {
payload, err := json.Marshal(instance)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

router.log.Debug("Event received.", zap.String("event", string(payload)))

if instance.Event == eventInvoke {
functionID := functions.FunctionID(r.Header.Get(headerFunctionID))
resp, err := router.callFunction(functionID, payload)
if err != nil {
if err == errUnableToLookUpRegisteredFunction {
http.Error(w, err.Error(), http.StatusNotFound)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}

_, err = w.Write(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
} else {
router.processEvent(event{
topics: []subscriptions.TopicID{subscriptions.TopicID(instance.Event)},
payload: payload,
})

w.WriteHeader(http.StatusAccepted)
}
}

// callEndpoint determines which function to call when an endpoint is hit.
func (router *Router) callEndpoint(endpointID subscriptions.EndpointID, payload []byte) ([]byte, error) {
// Figure out what function we're targeting.
backingFunction := router.targetCache.BackingFunction(endpointID)
if backingFunction == nil {
return []byte{}, errUnableToLookUpBackingFunction
}

router.log.Debug("Function triggered.", zap.String("functionId", string(*backingFunction)), zap.String("event", string(payload)))

resp, err := router.callFunction(*backingFunction, payload)
if err != nil {
router.log.Warn("Function invocation failed.", zap.String("functionId", string(*backingFunction)),
zap.String("event", string(payload)), zap.Error(err))
} else {
router.log.Debug("Function finished.", zap.String("functionId", string(*backingFunction)),
zap.String("event", string(payload)), zap.String("response", string(resp)))
}

return resp, err
router.enqueueWork(subscriptions.TopicID(instance.Event), payload)
w.WriteHeader(http.StatusAccepted)
}

// callFunction looks up a function and calls it.
Expand All @@ -267,32 +254,7 @@ func (router *Router) callFunction(backingFunctionID functions.FunctionID, paylo
return []byte{}, errUnableToLookUpRegisteredFunction
}

router.log.Debug("Function triggered.", zap.String("functionId", string(f.ID)), zap.String("event", string(payload)))

resp, err := f.Call(payload)
if err != nil {
router.log.Warn("Function invocation failed.", zap.String("functionId", string(f.ID)),
zap.String("event", string(payload)), zap.Error(err))

if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok {
internal := NewEvent(internalFunctionProviderError, mimeJSON, struct {
FunctionID string `json:"functionId"`
}{string(backingFunctionID)})
payload, err = json.Marshal(internal)
if err == nil {
router.log.Debug("Event received.", zap.String("event", string(payload)))
router.processEvent(event{
topics: []subscriptions.TopicID{subscriptions.TopicID(internal.Event)},
payload: payload,
})
}
}
} else {
router.log.Debug("Function finished.", zap.String("functionId", string(f.ID)),
zap.String("event", string(payload)), zap.String("response", string(resp)))
}

return resp, err
return f.Call(payload)
}

// loop is the main loop for a pub/sub worker goroutine
Expand Down Expand Up @@ -335,42 +297,40 @@ func (router *Router) loop() {

// processEvent sends event to a set of topics, and for each of the functions that get called as part of those topics.
func (router *Router) processEvent(e event) {
for _, topicID := range e.topics {
subscribers := router.targetCache.SubscribersOfTopic(topicID)
for _, subscriber := range subscribers {
router.log.Debug("Function triggered.", zap.String("functionId", string(subscriber)), zap.String("event", string(e.payload)))

res, err := router.callFunction(subscriber, e.payload)
if err != nil {
router.log.Warn(
"Function invocation failed.",
zap.String("functionId", string(subscriber)),
zap.String("event", string(e.payload)),
zap.Error(err),
)
} else {
router.log.Debug(
"Function finished.",
zap.String("functionId", string(subscriber)),
zap.String("event", string(e.payload)),
zap.String("response", string(res)),
)
subscribers := router.targetCache.SubscribersOfTopic(e.topic)
for _, subscriber := range subscribers {
router.log.Debug("Function triggered.",
zap.String("functionId", string(subscriber)), zap.String("event", string(e.payload)))

resp, err := router.callFunction(subscriber, e.payload)

if err != nil {
router.log.Warn("Function invocation failed.",
zap.String("functionId", string(subscriber)), zap.String("event", string(e.payload)), zap.Error(err))

if _, ok := err.(*functions.ErrFunctionCallFailedProviderError); ok {
internal := NewEvent(internalFunctionProviderError, mimeJSON, struct {
FunctionID string `json:"functionId"`
}{string(subscriber)})
payload, err := json.Marshal(internal)
if err == nil {
router.enqueueWork(subscriptions.TopicID(internal.Event), payload)
}
}
} else {
router.log.Debug("Function finished.",
zap.String("functionId", string(subscriber)), zap.String("event", string(e.payload)),
zap.String("response", string(resp)))
}
}
}

func (router *Router) enqueueWork(topicMap map[subscriptions.TopicID]struct{}, payload []byte) {
if len(topicMap) == 0 {
return
}
topics := []subscriptions.TopicID{}
for topic := range topicMap {
topics = append(topics, topic)
}
func (router *Router) enqueueWork(topic subscriptions.TopicID, payload []byte) {
router.log.Debug("Event received.", zap.String("event", string(payload)))

select {
case router.work <- event{
topics: topics,
topic: topic,
payload: payload,
}:
default:
Expand Down

0 comments on commit 4bc8bb7

Please # to comment.