This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Fix Turbine making Port based decisions #7774

Closed
wants to merge 4 commits
Changes from 1 commit
5 changes: 2 additions & 3 deletions core/src/retransmit_stage.rs
@@ -81,9 +81,7 @@ fn retransmit(
             continue;
         }
         if packet.meta.repair {
-            total_packets -= 1;
             repair_total += 1;
-            continue;
         }

         let mut compute_turbine_peers = Measure::start("turbine_start");
@@ -111,7 +109,8 @@ fn retransmit(
         let leader =
             leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
         let mut retransmit_time = Measure::start("retransmit_to");
-        if !packet.meta.forward {
+        // If I am on the critical path for this packet, send it to everyone
+        if my_index % DATA_PLANE_FANOUT == 0 {
             ClusterInfo::retransmit_to(&neighbors, packet, leader, sock, true)?;
             ClusterInfo::retransmit_to(&children, packet, leader, sock, false)?;
         } else {
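The core of the change: whether a node fans out to its whole neighborhood is now derived from its shuffled index rather than from packet.meta.forward, which is set by the port a packet arrives on. Below is a minimal sketch of the rule, assuming the usual DATA_PLANE_FANOUT default of 200; the helper name is hypothetical, not from the PR.

// Sketch only: the decision depends on the node's position in the
// shred-seeded shuffle, never on which port the packet arrived on.
const DATA_PLANE_FANOUT: usize = 200; // assumed default fanout

fn should_fan_out_to_neighbors(my_index: usize) -> bool {
    // The first slot of each fanout-sized neighborhood sits on the
    // critical path: it received the shred directly from the layer above,
    // so it spreads the shred sideways to neighbors as well as downward.
    my_index % DATA_PLANE_FANOUT == 0
}

fn main() {
    for my_index in [0usize, 1, 199, 200, 401] {
        println!(
            "index {:>3}: send to neighbors = {}",
            my_index,
            should_fan_out_to_neighbors(my_index)
        );
    }
}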
17 changes: 7 additions & 10 deletions core/tests/cluster_info.rs
@@ -12,7 +12,7 @@ use std::sync::Arc;
 use std::sync::Mutex;
 use std::time::Instant;

-type Nodes = HashMap<Pubkey, (bool, HashSet<i32>, Receiver<(i32, bool)>)>;
+type Nodes = HashMap<Pubkey, (bool, HashSet<i32>, Receiver<i32>)>;

 fn num_threads() -> usize {
     sys_info::cpu_num().unwrap_or(10) as usize
@@ -29,11 +29,10 @@ fn find_insert_shred(id: &Pubkey, shred: i32, batches: &mut [Nodes]) {

 fn retransmit(
     mut shuffled_nodes: Vec<ContactInfo>,
-    senders: &HashMap<Pubkey, Sender<(i32, bool)>>,
+    senders: &HashMap<Pubkey, Sender<i32>>,
     cluster: &ClusterInfo,
     fanout: usize,
     shred: i32,
-    retransmit: bool,
 ) -> i32 {
     let mut seed = [0; 32];
     let mut my_index = 0;
@@ -48,17 +47,18 @@
         }
     });
     seed[0..4].copy_from_slice(&shred.to_le_bytes());
+    let retransmit = my_index % fanout == 0;
     let shuffled_indices = (0..shuffled_nodes.len()).collect();
     let (neighbors, children) = compute_retransmit_peers(fanout, my_index, shuffled_indices);
     children.into_iter().for_each(|i| {
         let s = senders.get(&shuffled_nodes[i].id).unwrap();
-        let _ = s.send((shred, retransmit));
+        let _ = s.send(shred);
     });

     if retransmit {
         neighbors.into_iter().for_each(|i| {
             let s = senders.get(&shuffled_nodes[i].id).unwrap();
-            let _ = s.send((shred, false));
+            let _ = s.send(shred);
         });
     }

@@ -79,8 +79,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) {

     // setup accounts for all nodes (leader has 0 bal)
     let (s, r) = channel();
-    let senders: Arc<Mutex<HashMap<Pubkey, Sender<(i32, bool)>>>> =
-        Arc::new(Mutex::new(HashMap::new()));
+    let senders: Arc<Mutex<HashMap<Pubkey, Sender<i32>>>> = Arc::new(Mutex::new(HashMap::new()));
     senders.lock().unwrap().insert(leader_info.id, s);
     let mut batches: Vec<Nodes> = Vec::with_capacity(num_threads);
     (0..num_threads).for_each(|_| batches.push(HashMap::new()));
@@ -159,7 +158,6 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
                         &cluster,
                         fanout,
                         *i,
-                        true,
                     );
                 });
                 *layer1_done = true;
@@ -169,15 +167,14 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
             if recv.len() < shreds_len {
                 loop {
                     match r.try_recv() {
-                        Ok((data, retx)) => {
+                        Ok(data) => {
                             if recv.insert(data) {
                                 let _ = retransmit(
                                     shuffled_peers[data as usize].clone(),
                                     &senders,
                                     &cluster,
                                     fanout,
                                     data,
-                                    retx,
                                 );
                             }
                             if recv.len() == shreds_len {
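The test change mirrors the same idea: the retransmit flag is no longer shipped over the channel alongside each shred; every receiver recomputes it from its own shuffled index, so Sender<(i32, bool)> collapses to Sender<i32>. Below is a self-contained sketch of that simplification, with illustrative values rather than the test's real cluster setup.

// Sketch only: the channel carries the bare shred id; the fan-out
// decision is derived locally by each node.
use std::sync::mpsc::channel;

fn main() {
    let fanout = 3;
    // Previously the channel carried (i32, bool), the bool telling the
    // receiver whether to retransmit to neighbors. Now: just the shred.
    let (sender, receiver) = channel::<i32>();
    sender.send(42).unwrap();
    drop(sender);

    let my_index = 6; // this node's position after the shred-seeded shuffle
    while let Ok(shred) = receiver.recv() {
        // Recomputed at the receiving node, not trusted from the sender.
        let retransmit = my_index % fanout == 0;
        println!("shred {}: send to neighbors = {}", shred, retransmit);
    }
}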