Skip to content

Commit 4cb7e31

Browse files
committed
Stream produced by watch_object will include an item when the object isn't in any initial list
fixes kube-rs#1576 Signed-off-by: Mark Ingram <mark@lincs.dev>
1 parent 63644d2 commit 4cb7e31

File tree

1 file changed

+31
-13
lines changed

1 file changed

+31
-13
lines changed

kube-runtime/src/watcher.rs

+31-13
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use kube_client::{
1414
Api, Error as ClientErr,
1515
};
1616
use serde::de::DeserializeOwned;
17-
use std::{clone::Clone, collections::VecDeque, fmt::Debug, time::Duration};
17+
use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
1818
use thiserror::Error;
1919
use tracing::{debug, error, warn};
2020

@@ -844,18 +844,36 @@ pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'sta
844844
// filtering by object name in given scope, so there's at most one matching object
845845
// footgun: Api::all may generate events from namespaced objects with the same name in different namespaces
846846
let fields = format!("metadata.name={name}");
847-
watcher(api, Config::default().fields(&fields)).filter_map(|event| async {
848-
match event {
849-
// Pass up `Some` for Found / Updated
850-
Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
851-
// Pass up `None` for Deleted
852-
Ok(Event::Delete(_)) => Some(Ok(None)),
853-
// Ignore marker events
854-
Ok(Event::Init | Event::InitDone) => None,
855-
// Bubble up errors
856-
Err(err) => Some(Err(err)),
857-
}
858-
})
847+
watcher(api, Config::default().fields(&fields))
848+
// track whether the object was seen in each initial listing
849+
.scan(false, |obj_seen, event| {
850+
if matches!(event, Ok(Event::Init)) {
851+
*obj_seen = false;
852+
} else if matches!(event, Ok(Event::InitApply(_))) {
853+
*obj_seen = true;
854+
}
855+
future::ready(Some((*obj_seen, event)))
856+
})
857+
.filter_map(|(obj_seen, event)| async move {
858+
match event {
859+
// Pass up `Some` for Found / Updated
860+
Ok(Event::Apply(obj)) | Ok(Event::InitApply(obj)) => Some(Ok(Some(obj))),
861+
// Pass up `None` for Deleted
862+
Ok(Event::Delete(_)) => Some(Ok(None)),
863+
// Ignore marker event
864+
Ok(Event::Init) => None,
865+
// Pass up `None` if the object wasn't seen in any initial list
866+
Ok(Event::InitDone) => {
867+
if obj_seen {
868+
None
869+
} else {
870+
Some(Ok(None))
871+
}
872+
}
873+
// Bubble up errors
874+
Err(err) => Some(Err(err)),
875+
}
876+
})
859877
}
860878

861879
/// Default watcher backoff inspired by Kubernetes' client-go.

0 commit comments

Comments
 (0)