//! stateful.rs
//!
//! Example of a stateful map operator in an Arcon streaming application.
use arcon::prelude::*;
/// Record type that flows through the pipeline assembled in `main`.
///
/// `main` creates one `Event` per integer of its source range, reads `id` as
/// the timestamp and as the key, and fills `data` with a constant.
#[arcon::proto]
#[derive(Arcon, Arrow, Copy, Clone)]
pub struct Event {
    // Identifier of the event.
    pub id: u64,
    // Payload value carried by the event.
    pub data: f32,
}
/// State handed to the stateful map operator in `main`.
///
/// Generic over the storage backend `B`.
#[derive(ArconState)]
pub struct MyState<B: Backend> {
    // `EagerValue` of `Event`, tagged with the table name "events".
    // NOTE(review): `main` constructs this field with the id "_events" —
    // confirm that the two names are meant to differ.
    #[table = "events"]
    events: EagerValue<Event, B>,
}
// Application entry point: turns the range `0..1_000_000` into a stream of
// `Event`s, keys it by `id`, runs it through a stateful map operator and ends
// the pipeline with `.ignore()`.
//
// The whole body is one expression on purpose: it is the value the
// `#[arcon::app]` attribute receives.
#[arcon::app]
fn main() {
    (0..1_000_000)
        .map(|id| Event { id, data: 1.5 })
        .to_stream(|source_conf| {
            // The event id doubles as the event timestamp.
            source_conf.set_timestamp_extractor(|event: &Event| event.id);
        })
        // Key the stream by event id.
        .key_by(|event: &Event| &event.id)
        .operator(OperatorBuilder {
            // Operator factory: a map that puts each incoming event into the
            // state and then returns the same event.
            operator: Arc::new(|| {
                Map::stateful(|incoming, state: &mut MyState<_>| {
                    // `?` propagates a failed `put` instead of emitting the event.
                    state.events().put(incoming)?;
                    Ok(incoming)
                })
            }),
            // State factory: builds `MyState` on top of the backend it is given.
            state: Arc::new(|state_backend| MyState {
                events: EagerValue::new("_events", state_backend),
            }),
            conf: Default::default(),
        })
        .ignore()
}