diff --git a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go index d664593ea66..8fc6f822066 100644 --- a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go @@ -319,6 +319,7 @@ func (s *EventSorter) SlotsAndHasher() (slotCount int, hasher func(tablepb.Span, // Next implements sorter.EventIterator. func (s *EventIter) Next() (event *model.PolymorphicEvent, pos engine.Position, err error) { valid := s.iter != nil && s.iter.Valid() +<<<<<<< HEAD:cdc/processor/sourcemanager/engine/pebble/event_sorter.go var value []byte for valid { nextStart := time.Now() @@ -327,10 +328,28 @@ func (s *EventIter) Next() (event *model.PolymorphicEvent, pos engine.Position, event = &model.PolymorphicEvent{} if _, err = s.serde.Unmarshal(event, value); err != nil { +======= + var nextEvent *model.PolymorphicEvent + + // We need to decide whether the current event is the last event in this transactions + // If the current event is the last one, we need to set txnFinished + // Thus, we need to fetch the next event and compare the commitTs and startTs with it + for valid { + nextEvent = &model.PolymorphicEvent{} + if _, err = s.serde.Unmarshal(nextEvent, s.iter.Value()); err != nil { +>>>>>>> 50659c4b5d (sorter: correct wrong metric about next duration (#11577)):cdc/processor/sourcemanager/sorter/pebble/event_sorter.go return } + + nextStart := time.Now() valid = s.iter.Next() +<<<<<<< HEAD:cdc/processor/sourcemanager/engine/pebble/event_sorter.go if s.headItem != nil { +======= + s.nextDuration.Observe(time.Since(nextStart).Seconds()) + + if s.currentEvent != nil { +>>>>>>> 50659c4b5d (sorter: correct wrong metric about next duration (#11577)):cdc/processor/sourcemanager/sorter/pebble/event_sorter.go break } s.headItem, event = event, nil