-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathwindow.rs
30 lines (29 loc) · 936 Bytes
/
window.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/// Adds up every value in `buffer` and returns the total.
///
/// An empty slice yields `0`. Overflow follows the usual `u64` addition
/// rules of the build this is compiled in.
fn window_sum(buffer: &[u64]) -> u64 {
    buffer
        .iter()
        .copied()
        .fold(0, |total, value| total + value)
}
// Example pipeline: feeds the integers 0..100000 through a sliding-window
// operator that uses `window_sum`, then calls `print()` on the resulting stream.
//
// NOTE(review): the body ends in an unterminated stream expression, so the
// `#[arcon::app]` attribute presumably takes that value and builds/runs the
// application from it. Keep the final `.print()` without a semicolon and
// confirm against the arcon macro documentation.
#[arcon::app]
fn main() {
    (0u64..100000)
        .to_stream(|source_conf| {
            // Event time, with each element acting as its own timestamp.
            source_conf.set_arcon_time(ArconTime::Event);
            source_conf.set_timestamp_extractor(|element: &u64| *element);
        })
        .operator(OperatorBuilder {
            operator: Arc::new(|| {
                // Sliding assigner: length 1000 s, slide 500 s, late arrival 0 s.
                WindowAssigner::new(WindowConf {
                    assigner: Assigner::Sliding {
                        length: Time::seconds(1000),
                        slide: Time::seconds(500),
                        late_arrival: Time::seconds(0),
                    },
                })
            }),
            state: Arc::new(|backend| {
                // The window index gets its own handle to the backend plus the
                // `window_sum` function; the state wrapper takes the original handle.
                let appender = AppenderWindow::new(backend.clone(), &window_sum);
                WindowState::new(appender, backend)
            }),
            conf: OperatorConf::default(),
        })
        .print()
}