diff --git a/charts/bytewax/examples/wikistream.py b/charts/bytewax/examples/wikistream.py index 3b825ea..00d6ee4 100644 --- a/charts/bytewax/examples/wikistream.py +++ b/charts/bytewax/examples/wikistream.py @@ -3,14 +3,14 @@ from typing import List, Optional, Tuple import bytewax.operators as op -import bytewax.operators.window as win +import bytewax.operators.windowing as win # pip install aiohttp-sse-client from aiohttp_sse_client.client import EventSource from bytewax.connectors.stdio import StdOutSink from bytewax.dataflow import Dataflow from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition, batch_async -from bytewax.operators.window import SystemClockConfig, TumblingWindow, WindowMetadata +from bytewax.operators.windowing import SystemClock, TumblingWindower async def _sse_agen(url): @@ -53,19 +53,19 @@ def get_server_name(data_dict): server_counts = win.count_window( "count", inp, - SystemClockConfig(), - TumblingWindow( + SystemClock(), + TumblingWindower( length=timedelta(seconds=2), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc) ), get_server_name, ) -# ("server.name", count_per_window) +# ("server.name", (window_id, count_per_window)) def keep_max( - max_count: Optional[int], new_window_count: Tuple[WindowMetadata, int] + max_count: Optional[int], id_count: Tuple[int, int] ) -> Tuple[Optional[int], int]: - _metadata, new_count = new_window_count + _win_id, new_count = id_count if max_count is None: new_max = new_count else: @@ -74,7 +74,7 @@ def keep_max( return (new_max, new_max) -max_count_per_window = op.stateful_map("keep_max", server_counts, keep_max) +max_count_per_window = op.stateful_map("keep_max", server_counts.down, keep_max) # ("server.name", max_per_window) @@ -84,4 +84,4 @@ def format_nice(name_max): out = op.map("format", max_count_per_window, format_nice) -op.output("out", out, StdOutSink()) +op.output("out", out, StdOutSink()) \ No newline at end of file