Skip to content

Commit 1af5a84

Browse files
povimichaelsproul
authored andcommittedDec 11, 2024
Fix subscribing to attestation subnets for aggregating (sigp#6681)
* Prevent scheduled subnet subscriptions from being overwritten by other subscriptions from same subnet with additional scoping by slot
1 parent b2590bc commit 1af5a84

File tree

2 files changed

+55
-7
lines changed

2 files changed

+55
-7
lines changed
 

‎beacon_node/network/src/subnet_service/mod.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub struct SubnetService<T: BeaconChainTypes> {
8686
subscriptions: HashSetDelay<Subnet>,
8787

8888
/// Subscriptions that need to be executed in the future.
89-
scheduled_subscriptions: HashSetDelay<Subnet>,
89+
scheduled_subscriptions: HashSetDelay<ExactSubnet>,
9090

9191
/// A list of permanent subnets that this node is subscribed to.
9292
// TODO: Shift this to a dynamic bitfield
@@ -485,7 +485,7 @@ impl<T: BeaconChainTypes> SubnetService<T> {
485485
} else {
486486
// This is a future slot, schedule subscribing.
487487
self.scheduled_subscriptions
488-
.insert_at(subnet, time_to_subscription_start);
488+
.insert_at(ExactSubnet { subnet, slot }, time_to_subscription_start);
489489
}
490490

491491
Ok(())
@@ -626,7 +626,8 @@ impl<T: BeaconChainTypes> Stream for SubnetService<T> {
626626
// Process scheduled subscriptions that might be ready, since those can extend a soon to
627627
// expire subscription.
628628
match self.scheduled_subscriptions.poll_next_unpin(cx) {
629-
Poll::Ready(Some(Ok(subnet))) => {
629+
Poll::Ready(Some(Ok(exact_subnet))) => {
630+
let ExactSubnet { subnet, .. } = exact_subnet;
630631
let current_slot = self.beacon_chain.slot_clock.now().unwrap_or_default();
631632
if let Err(e) = self.subscribe_to_subnet_immediately(subnet, current_slot + 1) {
632633
debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet, "err" => e);

‎beacon_node/network/src/subnet_service/tests/mod.rs

+51-4
Original file line numberDiff line numberDiff line change
@@ -500,12 +500,15 @@ mod test {
500500
// subscription config
501501
let committee_count = 1;
502502

503-
// Makes 2 validator subscriptions to the same subnet but at different slots.
504-
// There should be just 1 unsubscription event for the later slot subscription (subscription_slot2).
503+
// Makes 3 validator subscriptions to the same subnet but at different slots.
504+
// There should be just 1 unsubscription event for each of the later slots subscriptions
505+
// (subscription_slot2 and subscription_slot3).
505506
let subscription_slot1 = 0;
506507
let subscription_slot2 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
508+
let subscription_slot3 = subscription_slot2 * 2;
507509
let com1 = MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD + 4;
508510
let com2 = 0;
511+
let com3 = CHAIN.chain.spec.attestation_subnet_count - com1;
509512

510513
// create the attestation service and subscriptions
511514
let mut subnet_service = get_subnet_service();
@@ -532,6 +535,13 @@ mod test {
532535
true,
533536
);
534537

538+
let sub3 = get_subscription(
539+
com3,
540+
current_slot + Slot::new(subscription_slot3),
541+
committee_count,
542+
true,
543+
);
544+
535545
let subnet_id1 = SubnetId::compute_subnet::<MainnetEthSpec>(
536546
current_slot + Slot::new(subscription_slot1),
537547
com1,
@@ -548,12 +558,23 @@ mod test {
548558
)
549559
.unwrap();
550560

561+
let subnet_id3 = SubnetId::compute_subnet::<MainnetEthSpec>(
562+
current_slot + Slot::new(subscription_slot3),
563+
com3,
564+
committee_count,
565+
&subnet_service.beacon_chain.spec,
566+
)
567+
.unwrap();
568+
551569
// Assert that subscriptions are different but their subnet is the same
552570
assert_ne!(sub1, sub2);
571+
assert_ne!(sub1, sub3);
572+
assert_ne!(sub2, sub3);
553573
assert_eq!(subnet_id1, subnet_id2);
574+
assert_eq!(subnet_id1, subnet_id3);
554575

555576
// submit the subscriptions
556-
subnet_service.validator_subscriptions(vec![sub1, sub2].into_iter());
577+
subnet_service.validator_subscriptions(vec![sub1, sub2, sub3].into_iter());
557578

558579
// Unsubscription event should happen at the end of the slot.
559580
// We wait for 2 slots, to avoid timeout issues
@@ -590,10 +611,36 @@ mod test {
590611
// If the permanent and short lived subnets are different, we should get an unsubscription event.
591612
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
592613
assert_eq!(
593-
[expected_subscription, expected_unsubscription],
614+
[
615+
expected_subscription.clone(),
616+
expected_unsubscription.clone(),
617+
],
594618
second_subscribe_event[..]
595619
);
596620
}
621+
622+
let subscription_slot = current_slot + subscription_slot3 - 1;
623+
624+
let wait_slots = subnet_service
625+
.beacon_chain
626+
.slot_clock
627+
.duration_to_slot(subscription_slot)
628+
.unwrap()
629+
.as_millis() as u64
630+
/ SLOT_DURATION_MILLIS;
631+
632+
let no_events = dbg!(get_events(&mut subnet_service, None, wait_slots as u32).await);
633+
634+
assert_eq!(no_events, []);
635+
636+
let third_subscribe_event = get_events(&mut subnet_service, None, 2).await;
637+
638+
if !subnet_service.is_subscribed(&Subnet::Attestation(subnet_id1)) {
639+
assert_eq!(
640+
[expected_subscription, expected_unsubscription],
641+
third_subscribe_event[..]
642+
);
643+
}
597644
}
598645

599646
#[tokio::test]

0 commit comments

Comments
 (0)