Skip to content

Commit

Permalink
Fix lateness warnings in Windows (#700)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniil-quix authored Jan 7, 2025
1 parent b94d203 commit 4585edb
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
7 changes: 4 additions & 3 deletions quixstreams/dataframe/windows/sliding.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ def process_window(
# are still eligible for processing.
state_ts = state.get_latest_timestamp() or 0
latest_timestamp = max(timestamp_ms, state_ts)
max_expired_window_start = latest_timestamp - duration - grace - 1
max_expired_window_end = latest_timestamp - grace - 1
max_expired_window_start = max_expired_window_end - duration
max_deleted_window_start = max_expired_window_start - duration

left_start = max(0, timestamp_ms - duration)
Expand Down Expand Up @@ -119,7 +120,7 @@ def process_window(
self._log_expired_window(
window=[start, end],
timestamp_ms=timestamp_ms,
late_by_ms=max_expired_window_start + 1 - timestamp_ms,
late_by_ms=max_expired_window_end + 1 - timestamp_ms,
)

elif end == left_end:
Expand Down Expand Up @@ -153,7 +154,7 @@ def process_window(
self._log_expired_window(
window=[start, end],
timestamp_ms=timestamp_ms,
late_by_ms=max_expired_window_start + 1 - timestamp_ms,
late_by_ms=max_expired_window_end + 1 - timestamp_ms,
)
break

Expand Down
5 changes: 3 additions & 2 deletions quixstreams/dataframe/windows/time_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,15 @@ def process_window(

state_ts = state.get_latest_timestamp() or 0
latest_timestamp = max(timestamp_ms, state_ts)
max_expired_window_start = latest_timestamp - duration_ms - grace_ms
max_expired_window_end = latest_timestamp - grace_ms
max_expired_window_start = max_expired_window_end - duration_ms
updated_windows: list[WindowResult] = []
for start, end in ranges:
if start <= max_expired_window_start:
self._log_expired_window(
window=[start, end],
timestamp_ms=timestamp_ms,
late_by_ms=max_expired_window_start + 1 - timestamp_ms,
late_by_ms=max_expired_window_end - timestamp_ms,
)
continue

Expand Down

0 comments on commit 4585edb

Please # to comment.