Skip to content

Commit

Permalink
setup aggregation number before connecting children
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Jan 15, 2025
1 parent 64c5f96 commit b36ada1
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 63 deletions.
81 changes: 66 additions & 15 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ pub use self::{operation::AnyOperation, storage::TaskDataCategory};
use crate::{
backend::{
operation::{
connect_children, get_aggregation_number, is_root_node, AggregatedDataUpdate,
AggregationUpdateJob, AggregationUpdateQueue, CleanupOldEdgesOperation,
ConnectChildOperation, ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge,
TaskDirtyCause, TaskGuard,
connect_children, get_aggregation_number, is_root_node, prepare_new_children,
AggregatedDataUpdate, AggregationUpdateJob, AggregationUpdateQueue,
CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
Operation, OutdatedEdge, TaskDirtyCause, TaskGuard,
},
persisted_storage_log::PersistedStorageLog,
storage::{get, get_many, get_mut, get_mut_or_insert_with, iter_many, remove, Storage},
Expand Down Expand Up @@ -1253,17 +1253,22 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let mut removed_data = Vec::new();
let mut old_edges = Vec::new();

// Connect children
{
for old_child in iter_many!(task, Child { task } => task) {
if !new_children.remove(&old_child) {
old_edges.push(OutdatedEdge::Child(old_child));
}
}
// Prepare all new children
prepare_new_children(task_id, &mut task, &new_children, &mut queue);

let active_count =
get!(task, Activeness).map_or(0, |activeness| activeness.active_counter);
connect_children(task_id, &mut task, new_children, &mut queue, active_count);
// Filter actual new children
let mut kept_children = Vec::new();
for old_child in iter_many!(task, Child { task } => task) {
if !new_children.remove(&old_child) {
old_edges.push(OutdatedEdge::Child(old_child));
} else {
kept_children.push(old_child);
}
}
if !kept_children.is_empty() {
queue.push(AggregationUpdateJob::DecreaseActiveCounts {
task_ids: kept_children,
});
}

// Remove no longer existing cells and notify in progress cells
Expand Down Expand Up @@ -1323,7 +1328,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
drop(task);

{
let _span = tracing::trace_span!("CleanupOldEdgesOperation").entered();
let _span = tracing::trace_span!("remove old edges and prepare new children").entered();
// Remove outdated edges first, before removing in_progress+dirty flag.
// We need to make sure all outdated edges are removed before the task can potentially
// be scheduled and executed again
Expand All @@ -1334,6 +1339,52 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
// suspend in `CleanupOldEdgesOperation`), but that's ok as the task is still dirty and
// would be executed again.

let mut task = ctx.task(task_id, TaskDataCategory::All);
let Some(in_progress) = get!(task, InProgress) else {
panic!("Task execution completed, but task is not in progress: {task:#?}");
};
let InProgressState::InProgress(box InProgressStateInner { stale, .. }) = in_progress
else {
panic!("Task execution completed, but task is not in progress: {task:#?}");
};

// If the task is stale, reschedule it
if *stale {
let Some(InProgressState::InProgress(box InProgressStateInner {
done_event,
new_children,
..
})) = remove!(task, InProgress)
else {
unreachable!();
};
task.add_new(CachedDataItem::InProgress {
value: InProgressState::Scheduled { done_event },
});

// All `new_children` are currently hold active with an active count and we need to undo
// that.
AggregationUpdateQueue::run(
AggregationUpdateJob::DecreaseActiveCounts {
task_ids: new_children.into_iter().collect(),
},
&mut ctx,
);
return true;
}

let mut queue = AggregationUpdateQueue::new();

let active_count = get!(task, Activeness).map_or(0, |activeness| activeness.active_counter);
connect_children(task_id, &mut task, new_children, &mut queue, active_count);

drop(task);

{
let _span = tracing::trace_span!("connect new children").entered();
queue.execute(&mut ctx);
}

let mut task = ctx.task(task_id, TaskDataCategory::All);
let Some(in_progress) = remove!(task, InProgress) else {
panic!("Task execution completed, but task is not in progress: {task:#?}");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
use std::{cmp::max, num::NonZeroU32};

use rustc_hash::FxHashSet;
use turbo_tasks::TaskId;

use crate::{
backend::{
get,
operation::{
get_uppers, is_aggregating_node, is_root_node, AggregationUpdateJob,
AggregationUpdateQueue, TaskGuard,
},
backend::operation::{
get_aggregation_number, get_uppers, is_aggregating_node, AggregationUpdateJob,
AggregationUpdateQueue, TaskGuard,
},
data::CachedDataItem,
};

const AGGREGATION_NUMBER_BUFFER_SPACE: u32 = 3;

pub fn connect_children(
parent_task_id: TaskId,
parent_task: &mut impl TaskGuard,
Expand All @@ -26,45 +19,8 @@ pub fn connect_children(
if new_children.is_empty() {
return;
}
let children_count = new_children.len();

// Compute future parent aggregation number based on the number of children
let current_parent_aggregation = get!(parent_task, AggregationNumber)
.copied()
.unwrap_or_default();
let (parent_aggregation, future_parent_aggregation) =
if is_root_node(current_parent_aggregation.base) {
(u32::MAX, u32::MAX)
} else {
let target_distance = children_count.ilog2() * 2;
if target_distance > current_parent_aggregation.distance {
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: parent_task_id,
base_aggregation_number: 0,
distance: NonZeroU32::new(target_distance),
})
}
(
current_parent_aggregation.effective,
current_parent_aggregation
.base
.saturating_add(max(target_distance, current_parent_aggregation.distance)),
)
};

// When the parent is a leaf node, we need to increase the aggregation number of the children to
// be counting from the parent's aggregation number.
if !is_aggregating_node(future_parent_aggregation) {
let child_base_aggregation_number =
future_parent_aggregation + AGGREGATION_NUMBER_BUFFER_SPACE;
for &new_child in new_children.iter() {
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: new_child,
base_aggregation_number: child_base_aggregation_number,
distance: None,
});
}
};
let parent_aggregation = get_aggregation_number(parent_task);

for &new_child in new_children.iter() {
parent_task.add_new(CachedDataItem::Child {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod cleanup_old_edges;
mod connect_child;
mod connect_children;
mod invalidate;
mod prepare_new_children;
mod update_cell;
mod update_collectible;
mod update_output;
Expand Down Expand Up @@ -751,6 +752,7 @@ pub use self::{
cleanup_old_edges::OutdatedEdge,
connect_children::connect_children,
invalidate::TaskDirtyCause,
prepare_new_children::prepare_new_children,
update_cell::UpdateCellOperation,
update_collectible::UpdateCollectibleOperation,
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::{cmp::max, num::NonZeroU32};

use rustc_hash::FxHashSet;
use turbo_tasks::TaskId;

use crate::backend::{
get,
operation::{
is_aggregating_node, is_root_node, AggregationUpdateJob, AggregationUpdateQueue, TaskGuard,
},
};

const AGGREGATION_NUMBER_BUFFER_SPACE: u32 = 3;

pub fn prepare_new_children(
parent_task_id: TaskId,
parent_task: &mut impl TaskGuard,
new_children: &FxHashSet<TaskId>,
queue: &mut AggregationUpdateQueue,
) {
if new_children.is_empty() {
return;
}
let children_count = new_children.len();

// Compute future parent aggregation number based on the number of children
let current_parent_aggregation = get!(parent_task, AggregationNumber)
.copied()
.unwrap_or_default();
let future_parent_aggregation = if is_root_node(current_parent_aggregation.base) {
u32::MAX
} else {
let target_distance = children_count.ilog2() * 2;
if target_distance > current_parent_aggregation.distance {
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: parent_task_id,
base_aggregation_number: 0,
distance: NonZeroU32::new(target_distance),
})
}
current_parent_aggregation
.base
.saturating_add(max(target_distance, current_parent_aggregation.distance))
};

// When the parent is a leaf node, we need to increase the aggregation number of the children to
// be counting from the parent's aggregation number.
if !is_aggregating_node(future_parent_aggregation) {
let child_base_aggregation_number =
future_parent_aggregation + AGGREGATION_NUMBER_BUFFER_SPACE;
for &new_child in new_children.iter() {
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
task_id: new_child,
base_aggregation_number: child_base_aggregation_number,
distance: None,
});
}
};
}

0 comments on commit b36ada1

Please # to comment.