-
-
Notifications
You must be signed in to change notification settings - Fork 332
/
Copy patherrorbounded_configmap_watcher.rs
53 lines (49 loc) · 1.5 KB
/
errorbounded_configmap_watcher.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
use futures::prelude::*;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ObjectMeta},
core::DeserializeGuard,
runtime::{reflector::ObjectRef, watcher, WatchStreamExt},
Client, Resource,
};
use serde::Deserialize;
use tracing::*;
// Variant of ConfigMap that only accepts ConfigMaps with a CA certificate
// to demonstrate parsing failure
#[derive(Resource, Deserialize, Debug, Clone)]
#[resource(inherit = ConfigMap)]
struct CaConfigMap {
metadata: ObjectMeta,
data: CaConfigMapData,
}
#[derive(Deserialize, Debug, Clone)]
struct CaConfigMapData {
#[serde(rename = "ca.crt")]
ca_crt: String,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let api = Api::<DeserializeGuard<CaConfigMap>>::default_namespaced(client);
let use_watchlist = std::env::var("WATCHLIST").map(|s| s == "1").unwrap_or(false);
let wc = if use_watchlist {
// requires WatchList feature gate on 1.27 or later
watcher::Config::default().streaming_lists()
} else {
watcher::Config::default()
};
watcher(api, wc)
.applied_objects()
.default_backoff()
.try_for_each(|cm| async move {
info!("saw {}", ObjectRef::from_obj(&cm));
match cm.0 {
Ok(cm) => info!("contents: {cm:?}"),
Err(err) => warn!("failed to parse: {err}"),
}
Ok(())
})
.await?;
Ok(())
}