Skip to content

Commit

Permalink
implemented dispatch with retry (#376)
Browse files Browse the repository at this point in the history
  • Loading branch information
astelmashenko authored Oct 28, 2022
1 parent 6a38645 commit b035fac
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
25 changes: 21 additions & 4 deletions pkg/dispatcher/jetstream_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync/atomic"
"time"

"knative.dev/eventing/pkg/kncloudevents"

"go.opencensus.io/trace"
"knative.dev/eventing-natss/pkg/tracing"

Expand Down Expand Up @@ -317,9 +319,24 @@ func (s *jetSubscriptionsSupervisor) subscribe(ctx context.Context, channel even
}

var deadLetter *url.URL
if subscription.Delivery != nil && subscription.Delivery.DeadLetterSink != nil && !subscription.Delivery.DeadLetterSink.URI.IsEmpty() {
deadLetter = subscription.Delivery.DeadLetterSink.URI.URL()
s.logger.Debug("dispatch message", zap.String("deadLetter", deadLetter.String()))
retryConfig := kncloudevents.NoRetries()

if subscription.Delivery != nil {
// Extract The DeadLetterSink From The Subscriber.Delivery
if subscription.Delivery != nil && subscription.Delivery.DeadLetterSink != nil && !subscription.Delivery.DeadLetterSink.URI.IsEmpty() {
deadLetter = subscription.Delivery.DeadLetterSink.URI.URL()
s.logger.Debug("dispatch message", zap.String("deadLetter", deadLetter.String()))
}

// Extract The RetryConfig From The Subscriber.Delivery
var err error
retryConfig, err = kncloudevents.RetryConfigFromDeliverySpec(*subscription.Delivery)
if err != nil {
s.logger.Error("Failed To Parse RetryConfig From DeliverySpec - No Retries Will Occur", zap.Error(err))
} else {
s.logger.Info("Successfully Parsed RetryConfig From DeliverySpec", zap.Int("RetryMax", retryConfig.RetryMax))
retryConfig.CheckRetry = kncloudevents.SelectiveRetry // Specify Custom CheckRetry Function
}
}

event := tracing.ConvertNatsMsgToEvent(s.logger, stanMsg)
Expand All @@ -335,7 +352,7 @@ func (s *jetSubscriptionsSupervisor) subscribe(ctx context.Context, channel even
}
defer span.End()

executionInfo, err := s.dispatcher.DispatchMessage(ctx, message, additionalHeaders, destination, reply, deadLetter)
executionInfo, err := s.dispatcher.DispatchMessageWithRetries(ctx, message, additionalHeaders, destination, reply, deadLetter, &retryConfig)
if err != nil {
s.logger.Error("Failed to dispatch message: ", zap.Error(err))
return
Expand Down
23 changes: 19 additions & 4 deletions pkg/dispatcher/natss_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,24 @@ func (s *subscriptionsSupervisor) subscribe(ctx context.Context, channel eventin
}

var deadLetter *url.URL
if subscription.Delivery != nil && subscription.Delivery.DeadLetterSink != nil && !subscription.Delivery.DeadLetterSink.URI.IsEmpty() {
deadLetter = subscription.Delivery.DeadLetterSink.URI.URL()
s.logger.Debug("dispatch message", zap.String("deadLetter", deadLetter.String()))
retryConfig := kncloudevents.NoRetries()

if subscription.Delivery != nil {
// Extract The DeadLetterSink From The Subscriber.Delivery
if subscription.Delivery != nil && subscription.Delivery.DeadLetterSink != nil && !subscription.Delivery.DeadLetterSink.URI.IsEmpty() {
deadLetter = subscription.Delivery.DeadLetterSink.URI.URL()
s.logger.Debug("dispatch message", zap.String("deadLetter", deadLetter.String()))
}

// Extract The RetryConfig From The Subscriber.Delivery
var err error
retryConfig, err = kncloudevents.RetryConfigFromDeliverySpec(*subscription.Delivery)
if err != nil {
s.logger.Error("Failed To Parse RetryConfig From DeliverySpec - No Retries Will Occur", zap.Error(err))
} else {
s.logger.Info("Successfully Parsed RetryConfig From DeliverySpec", zap.Int("RetryMax", retryConfig.RetryMax))
retryConfig.CheckRetry = kncloudevents.SelectiveRetry // Specify Custom CheckRetry Function
}
}

event := tracing.ConvertNatssMsgToEvent(s.logger, stanMsg)
Expand All @@ -339,7 +354,7 @@ func (s *subscriptionsSupervisor) subscribe(ctx context.Context, channel eventin
}
defer span.End()

executionInfo, err := s.dispatcher.DispatchMessage(ctx, message, additionalHeaders, destination, reply, deadLetter)
executionInfo, err := s.dispatcher.DispatchMessageWithRetries(ctx, message, additionalHeaders, destination, reply, deadLetter, &retryConfig)
if err != nil {
s.logger.Error("Failed to dispatch message: ", zap.Error(err))
return
Expand Down

0 comments on commit b035fac

Please # to comment.