Skip to content

Commit

Permalink
latest changes from mainstream MR (#382)
Browse files Browse the repository at this point in the history
  • Loading branch information
astelmashenko authored Jan 3, 2023
1 parent 2db82ee commit 6be61db
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 21 deletions.
111 changes: 97 additions & 14 deletions pkg/channel/jetstream/dispatcher/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ package dispatcher
import (
"context"
"errors"
"net/http"
"sync"
"time"

"knative.dev/pkg/logging"

cejs "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/nats-io/nats.go"
"go.opencensus.io/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -69,20 +74,81 @@ func (c *Consumer) Close() error {
}

func (c *Consumer) MsgHandler(msg *nats.Msg) {
if err := c.doHandle(msg); err != nil {
c.logger.Errorw("failed to handle message", zap.Error(err))
return
}
logger := c.logger.With(zap.String("msg_id", msg.Header.Get(nats.MsgIdHdr)))
ctx := logging.WithLogger(c.ctx, logger)
tickerCtx, tickerCancel := context.WithCancel(c.ctx)

tickerDone := make(chan struct{})

go func() {
defer close(tickerDone)

// TODO(dan-j): this should be a fraction of the Consumer's AckWait
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-tickerCtx.Done():
logger.Debugw("ticker: context done", "ctx_err", tickerCtx.Err())

return
case <-ticker.C:
logger.Debugw("ticker: sending +WPI to JetStream", "ctx_err", tickerCtx.Err())

if err := msg.InProgress(nats.Context(tickerCtx)); err != nil && !errors.Is(err, context.Canceled) {
logging.FromContext(ctx).Errorw("failed to mark message as in progress", zap.Error(err))
}
}
}
}()

go func() {
var result protocol.Result

// wrap the handler in a local function so that the tickerCtx is cancelled even if a panic occurs.
func() {
defer tickerCancel()
result = c.doHandle(ctx, msg)
}()

// wait for the ticker to stop to prevent attempts to mark the message as in progress after it has been acked
// or nacked
<-tickerDone

switch {
case protocol.IsACK(result):
if err := msg.Ack(nats.Context(ctx)); err != nil {
logger.Errorw("failed to Ack message after successful delivery to subscriber", zap.Error(err))
}
case protocol.IsNACK(result):
if err := msg.Nak(nats.Context(ctx)); err != nil {
logger.Errorw("failed to Nack message after failed delivery to subscriber", zap.Error(err))
}
default:
if err := msg.Term(nats.Context(ctx)); err != nil {
logger.Errorw("failed to Term message after failed delivery to subscriber", zap.Error(err))
}
}
}()
}

if err := msg.Ack(); err != nil {
c.logger.Errorw("failed to Ack message after successful delivery to subscriber", zap.Error(err))
return
// doHandle forwards the received event to the subscriber, the return has three outcomes:
// - Ack (includes `nil`): the event was successfully delivered to the subscriber
// - Nack: the event was not delivered to the subscriber, but it can be retried
// - any other error: the event should be terminated and not retried
func (c *Consumer) doHandle(ctx context.Context, msg *nats.Msg) protocol.Result {
logger := logging.FromContext(ctx)

if logger.Desugar().Core().Enabled(zap.DebugLevel) {
var debugKVs []interface{}
if meta, err := msg.Metadata(); err == nil {
debugKVs = append(debugKVs, zap.Any("msg_metadata", meta))
}

logger.Debugw("received message from JetStream consumer", debugKVs...)
}
}

func (c *Consumer) doHandle(msg *nats.Msg) error {
logger := c.logger.With(zap.String("msg_id", msg.Header.Get(nats.MsgIdHdr)))
logger.Debugw("received message from JetStream consumer")
message := cejs.NewMessage(msg)
if message.ReadEncoding() == binding.EncodingUnknown {
return errors.New("received a message with unknown encoding")
Expand All @@ -104,7 +170,7 @@ func (c *Consumer) doHandle(msg *nats.Msg) error {
te := kncloudevents.TypeExtractorTransformer("")

dispatchExecutionInfo, err := c.dispatcher.DispatchMessageWithRetries(
c.ctx,
ctx,
message,
additionalHeaders,
c.sub.Subscriber,
Expand All @@ -120,6 +186,23 @@ func (c *Consumer) doHandle(msg *nats.Msg) error {
}
_ = fanout.ParseDispatchResultAndReportMetrics(fanout.NewDispatchResult(err, dispatchExecutionInfo), c.reporter, args)

logger.Debugw("message forward to downstream subscriber")
return err
if err != nil {
logger.Errorw("failed to forward message to downstream subscriber",
zap.Error(err),
zap.Any("dispatch_resp_code", dispatchExecutionInfo.ResponseCode))

code := dispatchExecutionInfo.ResponseCode
if code/100 == 5 || code == http.StatusTooManyRequests || code == http.StatusRequestTimeout {
// tell JSM to redeliver the message later
return protocol.NewReceipt(false, "%w", err)
}

// let knative decide what to do with the message, if it wraps an Ack/Nack then that is what will happen,
// otherwise we will Terminate the message
return err
}

logger.Debug("message forwarded to downstream subscriber")

return protocol.ResultACK
}
3 changes: 2 additions & 1 deletion pkg/channel/jetstream/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ func (d *Dispatcher) subscribe(ctx context.Context, config ChannelConfig, sub Su
ctx: ctx,
}

consumer.jsSub, err = d.js.QueueSubscribe(info.Config.DeliverSubject, info.Config.DeliverGroup, consumer.MsgHandler, nats.Bind(info.Stream, info.Name))
consumer.jsSub, err = d.js.QueueSubscribe(info.Config.DeliverSubject, info.Config.DeliverGroup, consumer.MsgHandler,
nats.Bind(info.Stream, info.Name), nats.ManualAck())
if err != nil {
logger.Errorw("failed to create queue subscription for consumer")
return SubscriberStatusTypeError, err
Expand Down
26 changes: 20 additions & 6 deletions pkg/channel/jetstream/dispatcher/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ import (
"fmt"
"time"

"knative.dev/eventing-natss/pkg/client/clientset/versioned"
commonerr "knative.dev/eventing-natss/pkg/common/error"

"github.com/nats-io/nats.go"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing-natss/pkg/client/clientset/versioned"
commonerr "knative.dev/eventing-natss/pkg/common/error"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/channel/fanout"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/apis/duck"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -44,10 +46,9 @@ import (
const (
// Name of the corev1.Events emitted from the reconciliation process.

ReasonJetstreamStreamCreated = "JetstreamStreamCreated"
ReasonJetstreamStreamFailed = "JetstreamStreamFailed"
ReasonJetstreamConsumerCreated = "JetstreamConsumerCreated"
ReasonJetstreamConsumerFailed = "JetstreamConsumerFailed"
ReasonJetstreamStreamCreated = "JetstreamStreamCreated"
ReasonJetstreamStreamFailed = "JetstreamStreamFailed"
ReasonJetstreamConsumerFailed = "JetstreamConsumerFailed"
)

// Reconciler reconciles incoming NatsJetstreamChannel CRDs by ensuring the following states:
Expand Down Expand Up @@ -287,6 +288,19 @@ func (r *Reconciler) newConfigFromChannel(nc *v1alpha1.NatsJetStreamChannel) Cha
for i, source := range nc.Spec.SubscribableSpec.Subscribers {
innerSub, _ := fanout.SubscriberSpecToFanoutConfig(source)

// This functionality cannot be configured via the Subscription CRD. The default implementation is
// kncloudevents.RetryIfGreaterThan300 which is not ideal behaviour since we will retry on bad requests.
// This may be improved in the future with a more specific implementation, but consumers which wish the
// event be redelivered should respond with a 429 Too Many Requests code.
//
// We could leverage JetStream's ability to redeliver messages to a consumer, but this would require
// not using DispatchMessageWithRetries, and translate the subscription's delivery configuration into
// the JetStream ConsumerConfig. We would then use dispatcher.Consumer's MsgHandler function to handle
// whether to ack, nack or term the message.
if innerSub.RetryConfig != nil {
innerSub.RetryConfig.CheckRetry = kncloudevents.SelectiveRetry
}

newSubs[i] = Subscription{
Subscription: *innerSub,
UID: source.UID,
Expand Down

0 comments on commit 6be61db

Please # to comment.