Skip to content

Commit

Permalink
refactor: compute routes on demand (#1735)
Browse files Browse the repository at this point in the history
* refactor: compute routes on demand

* fix: disable routes when they were updated

* refactor: improve code
  • Loading branch information
wyfo authored Jan 23, 2025
1 parent 0054617 commit d37fc80
Show file tree
Hide file tree
Showing 19 changed files with 202 additions and 731 deletions.
8 changes: 2 additions & 6 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,8 @@ impl Primitives for Face {
&mut |p, m| declares.push((p.clone(), m)),
);

// recompute routes
// TODO: disable routes and recompute them in parallel to avoid holding
// tables write lock for a long time.
let mut root_res = wtables.root_res.clone();
update_data_routes_from(&mut wtables, &mut root_res);
update_query_routes_from(&mut wtables, &mut root_res);
disable_all_data_routes(&mut wtables);
disable_all_query_routes(&mut wtables);

drop(wtables);
drop(ctrl_lock);
Expand Down
118 changes: 24 additions & 94 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ use zenoh_sync::get_mut_unchecked;

use super::{
face::FaceState,
resource::{DataRoutes, Direction, Resource},
resource::{Direction, Resource},
tables::{NodeId, Route, RoutingExpr, Tables, TablesLock},
};
#[zenoh_macros::unstable]
use crate::key_expr::KeyExpr;
use crate::net::routing::hat::{HatTrait, SendDeclare};
use crate::net::routing::{
hat::{HatTrait, SendDeclare},
router::get_or_set_route,
};

#[derive(Copy, Clone)]
pub(crate) struct SubscriberInfo;
Expand Down Expand Up @@ -93,18 +96,6 @@ pub(crate) fn declare_subscription(

disable_matches_data_routes(&mut wtables, &mut res);
drop(wtables);

let rtables = zread!(tables.tables);
let matches_data_routes = compute_matches_data_routes(&rtables, &res);
drop(rtables);

let wtables = zwrite!(tables.tables);
for (mut res, data_routes) in matches_data_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_data_routes(data_routes);
}
drop(wtables);
}
None => tracing::error!(
"{} Declare subscriber {} for unknown scope {}!",
Expand Down Expand Up @@ -157,18 +148,6 @@ pub(crate) fn undeclare_subscription(
{
tracing::debug!("{} Undeclare subscriber {} ({})", face, id, res.expr());
disable_matches_data_routes(&mut wtables, &mut res);
drop(wtables);

let rtables = zread!(tables.tables);
let matches_data_routes = compute_matches_data_routes(&rtables, &res);
drop(rtables);

let wtables = zwrite!(tables.tables);
for (mut res, data_routes) in matches_data_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_data_routes(data_routes);
}
Resource::clean(&mut res);
drop(wtables);
} else {
Expand All @@ -177,67 +156,17 @@ pub(crate) fn undeclare_subscription(
}
}

pub(crate) fn compute_data_routes(tables: &Tables, expr: &mut RoutingExpr) -> DataRoutes {
let mut routes = DataRoutes::default();
tables
.hat_code
.compute_data_routes(tables, &mut routes, expr);
routes
}

pub(crate) fn update_data_routes(tables: &Tables, res: &mut Arc<Resource>) {
if res.context.is_some() && !res.expr().contains('*') && res.has_subs() {
let mut res_mut = res.clone();
let res_mut = get_mut_unchecked(&mut res_mut);
tables.hat_code.compute_data_routes(
tables,
&mut res_mut.context_mut().data_routes,
&mut RoutingExpr::new(res, ""),
);
res_mut.context_mut().valid_data_routes = true;
}
}

pub(crate) fn update_data_routes_from(tables: &mut Tables, res: &mut Arc<Resource>) {
update_data_routes(tables, res);
let res = get_mut_unchecked(res);
for child in res.children.values_mut() {
update_data_routes_from(tables, child);
}
}

pub(crate) fn compute_matches_data_routes<'a>(
tables: &'a Tables,
res: &'a Arc<Resource>,
) -> Vec<(Arc<Resource>, DataRoutes)> {
let mut routes = vec![];
if res.context.is_some() {
if !res.expr().contains('*') && res.has_subs() {
let mut expr = RoutingExpr::new(res, "");
routes.push((res.clone(), compute_data_routes(tables, &mut expr)));
}
for match_ in &res.context().matches {
let match_ = match_.upgrade().unwrap();
if !Arc::ptr_eq(&match_, res) && !match_.expr().contains('*') && match_.has_subs() {
let mut expr = RoutingExpr::new(&match_, "");
let match_routes = compute_data_routes(tables, &mut expr);
routes.push((match_, match_routes));
}
pub(crate) fn disable_all_data_routes(tables: &mut Tables) {
pub(crate) fn disable_all_data_routes_rec(res: &mut Arc<Resource>) {
let res = get_mut_unchecked(res);
if let Some(ctx) = &mut res.context {
ctx.disable_data_routes();
}
}
routes
}

pub(crate) fn update_matches_data_routes<'a>(tables: &'a mut Tables, res: &'a mut Arc<Resource>) {
if res.context.is_some() {
update_data_routes(tables, res);
for match_ in &res.context().matches {
let mut match_ = match_.upgrade().unwrap();
if !Arc::ptr_eq(&match_, res) {
update_data_routes(tables, &mut match_);
}
for child in res.children.values_mut() {
disable_all_data_routes_rec(child);
}
}
disable_all_data_routes_rec(&mut tables.root_res)
}

pub(crate) fn disable_matches_data_routes(_tables: &mut Tables, res: &mut Arc<Resource>) {
Expand Down Expand Up @@ -298,16 +227,17 @@ fn get_data_route(
expr: &mut RoutingExpr,
routing_context: NodeId,
) -> Arc<Route> {
let local_context = tables
.hat_code
.map_routing_context(tables, face, routing_context);
res.as_ref()
.and_then(|res| res.data_route(face.whatami, local_context))
.unwrap_or_else(|| {
tables
.hat_code
.compute_data_route(tables, expr, local_context, face.whatami)
})
let hat = &tables.hat_code;
let local_context = hat.map_routing_context(tables, face, routing_context);
let mut compute_route = || hat.compute_data_route(tables, expr, local_context, face.whatami);
if let Some(data_routes) = res
.as_ref()
.and_then(|res| res.context.as_ref())
.map(|ctx| &ctx.data_routes)
{
return get_or_set_route(data_routes, face.whatami, local_context, compute_route);
}
compute_route()
}

#[zenoh_macros::unstable]
Expand Down
126 changes: 29 additions & 97 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ use zenoh_util::Timed;

use super::{
face::FaceState,
resource::{QueryRoute, QueryRoutes, QueryTargetQablSet, Resource},
resource::{QueryRoute, QueryTargetQablSet, Resource},
tables::{NodeId, RoutingExpr, Tables, TablesLock},
};
#[cfg(feature = "unstable")]
use crate::key_expr::KeyExpr;
use crate::net::routing::hat::{HatTrait, SendDeclare};
use crate::net::routing::{
hat::{HatTrait, SendDeclare},
router::get_or_set_route,
};

pub(crate) struct Query {
src_face: Arc<FaceState>,
Expand Down Expand Up @@ -120,18 +123,6 @@ pub(crate) fn declare_queryable(

disable_matches_query_routes(&mut wtables, &mut res);
drop(wtables);

let rtables = zread!(tables.tables);
let matches_query_routes = compute_matches_query_routes(&rtables, &res);
drop(rtables);

let wtables = zwrite!(tables.tables);
for (mut res, query_routes) in matches_query_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_query_routes(query_routes);
}
drop(wtables);
}
None => tracing::error!(
"{} Declare queryable {} for unknown scope {}!",
Expand Down Expand Up @@ -184,18 +175,6 @@ pub(crate) fn undeclare_queryable(
{
tracing::debug!("{} Undeclare queryable {} ({})", face, id, res.expr());
disable_matches_query_routes(&mut wtables, &mut res);
drop(wtables);

let rtables = zread!(tables.tables);
let matches_query_routes = compute_matches_query_routes(&rtables, &res);
drop(rtables);

let wtables = zwrite!(tables.tables);
for (mut res, query_routes) in matches_query_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_query_routes(query_routes);
}
Resource::clean(&mut res);
drop(wtables);
} else {
Expand All @@ -204,67 +183,6 @@ pub(crate) fn undeclare_queryable(
}
}

pub(crate) fn compute_query_routes(tables: &Tables, res: &Arc<Resource>) -> QueryRoutes {
let mut routes = QueryRoutes::default();
tables
.hat_code
.compute_query_routes(tables, &mut routes, &mut RoutingExpr::new(res, ""));
routes
}

pub(crate) fn update_query_routes(tables: &Tables, res: &Arc<Resource>) {
if res.context.is_some() && !res.expr().contains('*') && res.has_qabls() {
let mut res_mut = res.clone();
let res_mut = get_mut_unchecked(&mut res_mut);
tables.hat_code.compute_query_routes(
tables,
&mut res_mut.context_mut().query_routes,
&mut RoutingExpr::new(res, ""),
);
res_mut.context_mut().valid_query_routes = true;
}
}

pub(crate) fn update_query_routes_from(tables: &mut Tables, res: &mut Arc<Resource>) {
update_query_routes(tables, res);
let res = get_mut_unchecked(res);
for child in res.children.values_mut() {
update_query_routes_from(tables, child);
}
}

pub(crate) fn compute_matches_query_routes(
tables: &Tables,
res: &Arc<Resource>,
) -> Vec<(Arc<Resource>, QueryRoutes)> {
let mut routes = vec![];
if res.context.is_some() {
if !res.expr().contains('*') && res.has_qabls() {
routes.push((res.clone(), compute_query_routes(tables, res)));
}
for match_ in &res.context().matches {
let match_ = match_.upgrade().unwrap();
if !Arc::ptr_eq(&match_, res) && !match_.expr().contains('*') && match_.has_qabls() {
let match_routes = compute_query_routes(tables, &match_);
routes.push((match_, match_routes));
}
}
}
routes
}

pub(crate) fn update_matches_query_routes(tables: &Tables, res: &Arc<Resource>) {
if res.context.is_some() {
update_query_routes(tables, res);
for match_ in &res.context().matches {
let match_ = match_.upgrade().unwrap();
if !Arc::ptr_eq(&match_, res) {
update_query_routes(tables, &match_);
}
}
}
}

#[inline]
fn insert_pending_query(outface: &mut Arc<FaceState>, query: Arc<Query>) -> RequestId {
let outface_mut = get_mut_unchecked(outface);
Expand Down Expand Up @@ -417,6 +335,19 @@ impl Timed for QueryCleanup {
}
}

pub(crate) fn disable_all_query_routes(tables: &mut Tables) {
pub(crate) fn disable_all_query_routes_rec(res: &mut Arc<Resource>) {
let res = get_mut_unchecked(res);
if let Some(ctx) = &mut res.context {
ctx.disable_query_routes();
}
for child in res.children.values_mut() {
disable_all_query_routes_rec(child);
}
}
disable_all_query_routes_rec(&mut tables.root_res)
}

pub(crate) fn disable_matches_query_routes(_tables: &mut Tables, res: &mut Arc<Resource>) {
if res.context.is_some() {
get_mut_unchecked(res).context_mut().disable_query_routes();
Expand All @@ -439,16 +370,17 @@ fn get_query_route(
expr: &mut RoutingExpr,
routing_context: NodeId,
) -> Arc<QueryTargetQablSet> {
let local_context = tables
.hat_code
.map_routing_context(tables, face, routing_context);
res.as_ref()
.and_then(|res| res.query_route(face.whatami, local_context))
.unwrap_or_else(|| {
tables
.hat_code
.compute_query_route(tables, expr, local_context, face.whatami)
})
let hat = &tables.hat_code;
let local_context = hat.map_routing_context(tables, face, routing_context);
let mut compute_route = || hat.compute_query_route(tables, expr, local_context, face.whatami);
if let Some(query_routes) = res
.as_ref()
.and_then(|res| res.context.as_ref())
.map(|ctx| &ctx.query_routes)
{
return get_or_set_route(query_routes, face.whatami, local_context, compute_route);
}
compute_route()
}

#[cfg(feature = "stats")]
Expand Down
Loading

0 comments on commit d37fc80

Please # to comment.