Skip to content
This repository has been archived by the owner on Jun 4, 2024. It is now read-only.

Commit

Permalink
flip pages early
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrato committed May 14, 2024
1 parent b043444 commit b413446
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions event-handler/teleport_events_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error {

// Page is empty: do nothing, return
if len(t.batch) == 0 {
t.pos = 0
return nil
}

Expand Down Expand Up @@ -236,6 +237,10 @@ func (t *TeleportEventsWatcher) getEvents(ctx context.Context) (string, error) {
}

func (t *TeleportEventsWatcher) canSkipToNextWindow(i int, rangeSplitByDay []time.Time, cursor string) bool {
if cursor != "" {
return false

}
if len(t.batch) == 0 && i < len(rangeSplitByDay)-1 {
log.Infof("No events found for the range %v to %v", rangeSplitByDay[i-1], rangeSplitByDay[i])
return true
Expand All @@ -251,7 +256,7 @@ func (t *TeleportEventsWatcher) canSkipToNextWindow(i int, rangeSplitByDay []tim
}
}

if cursor == "" && i < len(rangeSplitByDay)-1 && pos >= len(t.batch) {
if i < len(rangeSplitByDay)-1 && pos >= len(t.batch) {
log.WithField("pos", pos).WithField("len", len(t.batch)).Infof("No new events found for the range %v to %v", rangeSplitByDay[i-1], rangeSplitByDay[i])
return true
}
Expand Down Expand Up @@ -315,7 +320,7 @@ func (t *TeleportEventsWatcher) Events(ctx context.Context) (chan *TeleportEvent
}

// If there is still nothing, sleep
if len(t.batch) == 0 {
if len(t.batch) == 0 && t.nextCursor == "" {
if t.config.ExitOnLastEvent {
log.Info("All events are processed, exiting...")
break
Expand Down

0 comments on commit b413446

Please # to comment.