Skip to content

Commit

Permalink
Updating wikistream example to 0.20.0 API
Browse files Browse the repository at this point in the history
  • Loading branch information
miccioest committed May 24, 2024
1 parent c0a615a commit 0cbd2b4
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions charts/bytewax/examples/wikistream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
from typing import List, Optional, Tuple

import bytewax.operators as op
import bytewax.operators.window as win
import bytewax.operators.windowing as win

# pip install aiohttp-sse-client
from aiohttp_sse_client.client import EventSource
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition, batch_async
from bytewax.operators.window import SystemClockConfig, TumblingWindow, WindowMetadata
from bytewax.operators.windowing import SystemClock, TumblingWindower


async def _sse_agen(url):
Expand Down Expand Up @@ -53,19 +53,19 @@ def get_server_name(data_dict):
server_counts = win.count_window(
"count",
inp,
SystemClockConfig(),
TumblingWindow(
SystemClock(),
TumblingWindower(
length=timedelta(seconds=2), align_to=datetime(2023, 1, 1, tzinfo=timezone.utc)
),
get_server_name,
)
# ("server.name", count_per_window)
# ("server.name", (window_id, count_per_window))


def keep_max(
max_count: Optional[int], new_window_count: Tuple[WindowMetadata, int]
max_count: Optional[int], id_count: Tuple[int, int]
) -> Tuple[Optional[int], int]:
_metadata, new_count = new_window_count
_win_id, new_count = id_count
if max_count is None:
new_max = new_count
else:
Expand All @@ -74,7 +74,7 @@ def keep_max(
return (new_max, new_max)


max_count_per_window = op.stateful_map("keep_max", server_counts, keep_max)
max_count_per_window = op.stateful_map("keep_max", server_counts.down, keep_max)
# ("server.name", max_per_window)


Expand All @@ -84,4 +84,4 @@ def format_nice(name_max):


out = op.map("format", max_count_per_window, format_nice)
op.output("out", out, StdOutSink())
op.output("out", out, StdOutSink())

0 comments on commit 0cbd2b4

Please # to comment.