From 4b5c8327f5d5c1ca80009f86f176374a0a3cf583 Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 9 Sep 2024 19:13:09 +0800 Subject: [PATCH] This is an automated cherry-pick of #11577 Signed-off-by: ti-chi-bot --- .../engine/pebble/event_sorter.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go index d664593ea66..8fc6f822066 100644 --- a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go @@ -319,6 +319,7 @@ func (s *EventSorter) SlotsAndHasher() (slotCount int, hasher func(tablepb.Span, // Next implements sorter.EventIterator. func (s *EventIter) Next() (event *model.PolymorphicEvent, pos engine.Position, err error) { valid := s.iter != nil && s.iter.Valid() +<<<<<<< HEAD:cdc/processor/sourcemanager/engine/pebble/event_sorter.go var value []byte for valid { nextStart := time.Now() @@ -327,10 +328,28 @@ func (s *EventIter) Next() (event *model.PolymorphicEvent, pos engine.Position, event = &model.PolymorphicEvent{} if _, err = s.serde.Unmarshal(event, value); err != nil { +======= + var nextEvent *model.PolymorphicEvent + + // We need to decide whether the current event is the last event in this transactions + // If the current event is the last one, we need to set txnFinished + // Thus, we need to fetch the next event and compare the commitTs and startTs with it + for valid { + nextEvent = &model.PolymorphicEvent{} + if _, err = s.serde.Unmarshal(nextEvent, s.iter.Value()); err != nil { +>>>>>>> 50659c4b5d (sorter: correct wrong metric about next duration (#11577)):cdc/processor/sourcemanager/sorter/pebble/event_sorter.go return } + + nextStart := time.Now() valid = s.iter.Next() +<<<<<<< HEAD:cdc/processor/sourcemanager/engine/pebble/event_sorter.go if s.headItem != nil { +======= + s.nextDuration.Observe(time.Since(nextStart).Seconds()) + + if s.currentEvent != nil { +>>>>>>> 50659c4b5d (sorter: correct wrong metric about next duration (#11577)):cdc/processor/sourcemanager/sorter/pebble/event_sorter.go break } s.headItem, event = event, nil