Skip to content

Commit

Permalink
Add new ModuleRuntime::list_with_details (#503)
Browse files Browse the repository at this point in the history
Some of the existing code used `ModuleRuntime::list` to list all modules,
then called `Module::runtime_state` for each of them. This can fail if a
module is deleted after it's returned in the `list` result and before
its `runtime_state` is called.

The new `list_with_details` API filters out those modules whose
`runtime_state` fails with `NotFound`.
  • Loading branch information
arsing authored Nov 1, 2018
1 parent 49badb3 commit 645545a
Show file tree
Hide file tree
Showing 10 changed files with 384 additions and 117 deletions.
65 changes: 39 additions & 26 deletions edgelet/edgelet-core/src/authorization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use error::Error;
use futures::future::Either;
use futures::{future, Future};
use futures::{future, Future, Stream};
use module::{Module, ModuleRuntime};
use pid::Pid;

Expand Down Expand Up @@ -54,30 +54,23 @@ where
) -> impl Future<Item = bool, Error = Error> {
name.map_or_else(
|| Either::A(future::ok(false)),
|name| {
Either::B(
self.runtime
.list()
.map_err(|e| e.into())
.and_then(move |list| {
list.iter()
.filter_map(|m| if m.name() == name { Some(m) } else { None })
.nth(0)
.map(|m| {
Either::A(m.runtime_state().map_err(|e| e.into()).and_then(
move |rs| {
let authorized = rs.pid() == &pid;
if !authorized {
info!("Request not authorized - expected caller pid: {}, actual caller pid: {}", rs.pid(), pid);
}
Ok(authorized)
},
))
})
.unwrap_or_else(|| Either::B(future::ok(false)))
}),
)
},
|name| Either::B(
self.runtime
.list_with_details()
.map_err(|e| e.into())
.filter_map(move |(m, rs)| if m.name() == name { Some(rs) } else { None })
.into_future()
.then(move |result| match result {
Ok((Some(rs), _)) => {
let authorized = rs.pid() == &pid;
if !authorized {
info!("Request not authorized - expected caller pid: {}, actual caller pid: {}", rs.pid(), pid);
}
Ok(authorized)
},
Ok((None, _)) => Ok(false),
Err((err, _)) => Err(err),
})),
)
}

Expand All @@ -97,9 +90,9 @@ mod tests {
use super::*;
use error::{Error, ErrorKind};
use failure::Context;
use futures::future;
use futures::future::FutureResult;
use futures::stream::Empty;
use futures::{future, stream};
use module::{
LogOptions, Module, ModuleRegistry, ModuleRuntimeState, ModuleSpec,
SystemInfo as CoreSystemInfo,
Expand Down Expand Up @@ -282,6 +275,12 @@ mod tests {
};
}

macro_rules! notimpl_error_stream {
() => {
stream::once(Err(Error::new(Context::new(ErrorKind::ModuleRuntime))))
};
}

impl Module for TestModule {
type Config = TestConfig;
type Error = Error;
Expand Down Expand Up @@ -363,6 +362,8 @@ mod tests {
type CreateFuture = FutureResult<(), Self::Error>;
type InitFuture = FutureResult<(), Self::Error>;
type ListFuture = FutureResult<Vec<Self::Module>, Self::Error>;
type ListWithDetailsStream =
Box<Stream<Item = (Self::Module, ModuleRuntimeState), Error = Self::Error> + Send>;
type LogsFuture = FutureResult<Self::Logs, Self::Error>;
type RemoveFuture = FutureResult<(), Self::Error>;
type RestartFuture = FutureResult<(), Self::Error>;
Expand Down Expand Up @@ -406,6 +407,18 @@ mod tests {
}
}

fn list_with_details(&self) -> Self::ListWithDetailsStream {
match self.behavior {
TestModuleListBehavior::Default => Box::new(stream::futures_unordered(
self.modules
.clone()
.into_iter()
.map(|m| m.runtime_state().map(|rs| (m, rs))),
)),
TestModuleListBehavior::FailList => Box::new(notimpl_error_stream!()),
}
}

fn logs(&self, _id: &str, _options: &LogOptions) -> Self::LogsFuture {
notimpl_error!()
}
Expand Down
5 changes: 5 additions & 0 deletions edgelet/edgelet-core/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ pub trait ModuleRuntime {
type CreateFuture: Future<Item = (), Error = Self::Error> + Send;
type InitFuture: Future<Item = (), Error = Self::Error> + Send;
type ListFuture: Future<Item = Vec<Self::Module>, Error = Self::Error> + Send;
type ListWithDetailsStream: Stream<
Item = (Self::Module, ModuleRuntimeState),
Error = Self::Error,
> + Send;
type LogsFuture: Future<Item = Self::Logs, Error = Self::Error> + Send;
type RemoveFuture: Future<Item = (), Error = Self::Error> + Send;
type RestartFuture: Future<Item = (), Error = Self::Error> + Send;
Expand All @@ -371,6 +375,7 @@ pub trait ModuleRuntime {
fn remove(&self, id: &str) -> Self::RemoveFuture;
fn system_info(&self) -> Self::SystemInfoFuture;
fn list(&self) -> Self::ListFuture;
fn list_with_details(&self) -> Self::ListWithDetailsStream;
fn logs(&self, id: &str, options: &LogOptions) -> Self::LogsFuture;
fn registry(&self) -> &Self::ModuleRegistry;
fn remove_all(&self) -> Self::RemoveAllFuture;
Expand Down
Loading

0 comments on commit 645545a

Please # to comment.