|
@@ -1,10 +1,10 @@
|
|
|
use {
|
|
use {
|
|
|
crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
|
|
crate::{broadcast_stage::BroadcastStage, retransmit_stage::RetransmitStage},
|
|
|
- agave_feature_set as feature_set,
|
|
|
|
|
|
|
+ agave_feature_set::{self as feature_set},
|
|
|
itertools::Either,
|
|
itertools::Either,
|
|
|
lazy_lru::LruCache,
|
|
lazy_lru::LruCache,
|
|
|
- rand::{seq::SliceRandom, Rng, SeedableRng},
|
|
|
|
|
- rand_chacha::ChaChaRng,
|
|
|
|
|
|
|
+ rand::{seq::SliceRandom, Rng, RngCore, SeedableRng},
|
|
|
|
|
+ rand_chacha::{ChaCha8Rng, ChaChaRng},
|
|
|
solana_clock::{Epoch, Slot},
|
|
solana_clock::{Epoch, Slot},
|
|
|
solana_cluster_type::ClusterType,
|
|
solana_cluster_type::ClusterType,
|
|
|
solana_gossip::{
|
|
solana_gossip::{
|
|
@@ -85,6 +85,7 @@ pub struct ClusterNodes<T> {
|
|
|
// Reverse index from nodes pubkey to their index in self.nodes.
|
|
// Reverse index from nodes pubkey to their index in self.nodes.
|
|
|
index: HashMap<Pubkey, /*index:*/ usize>,
|
|
index: HashMap<Pubkey, /*index:*/ usize>,
|
|
|
weighted_shuffle: WeightedShuffle</*stake:*/ u64>,
|
|
weighted_shuffle: WeightedShuffle</*stake:*/ u64>,
|
|
|
|
|
+ use_cha_cha_8: bool,
|
|
|
_phantom: PhantomData<T>,
|
|
_phantom: PhantomData<T>,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -200,17 +201,67 @@ impl<T> ClusterNodes<T> {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+/// Encapsulates the possible RNG implementations for turbine.
|
|
|
|
|
+/// This was implemented for the transition from ChaCha20 to ChaCha8.
|
|
|
|
|
+enum TurbineRng {
|
|
|
|
|
+ Legacy(ChaChaRng),
|
|
|
|
|
+ ChaCha8(ChaCha8Rng),
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl TurbineRng {
|
|
|
|
|
+ /// Create a new seeded TurbineRng of the correct implementation
|
|
|
|
|
+ fn new_seeded(leader: &Pubkey, shred: &ShredId, use_cha_cha_8: bool) -> Self {
|
|
|
|
|
+ let seed = shred.seed(leader);
|
|
|
|
|
+ if use_cha_cha_8 {
|
|
|
|
|
+ TurbineRng::ChaCha8(ChaCha8Rng::from_seed(seed))
|
|
|
|
|
+ } else {
|
|
|
|
|
+ TurbineRng::Legacy(ChaChaRng::from_seed(seed))
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+impl RngCore for TurbineRng {
|
|
|
|
|
+ fn next_u32(&mut self) -> u32 {
|
|
|
|
|
+ match self {
|
|
|
|
|
+ TurbineRng::Legacy(cha_cha20_rng) => cha_cha20_rng.next_u32(),
|
|
|
|
|
+ TurbineRng::ChaCha8(cha_cha8_rng) => cha_cha8_rng.next_u32(),
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn next_u64(&mut self) -> u64 {
|
|
|
|
|
+ match self {
|
|
|
|
|
+ TurbineRng::Legacy(cha_cha20_rng) => cha_cha20_rng.next_u64(),
|
|
|
|
|
+ TurbineRng::ChaCha8(cha_cha8_rng) => cha_cha8_rng.next_u64(),
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn fill_bytes(&mut self, dest: &mut [u8]) {
|
|
|
|
|
+ match self {
|
|
|
|
|
+ TurbineRng::Legacy(cha_cha20_rng) => cha_cha20_rng.fill_bytes(dest),
|
|
|
|
|
+ TurbineRng::ChaCha8(cha_cha8_rng) => cha_cha8_rng.fill_bytes(dest),
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
|
|
|
|
|
+ match self {
|
|
|
|
|
+ TurbineRng::Legacy(cha_cha20_rng) => cha_cha20_rng.try_fill_bytes(dest),
|
|
|
|
|
+ TurbineRng::ChaCha8(cha_cha8_rng) => cha_cha8_rng.try_fill_bytes(dest),
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
impl ClusterNodes<BroadcastStage> {
|
|
impl ClusterNodes<BroadcastStage> {
|
|
|
pub fn new(
|
|
pub fn new(
|
|
|
cluster_info: &ClusterInfo,
|
|
cluster_info: &ClusterInfo,
|
|
|
cluster_type: ClusterType,
|
|
cluster_type: ClusterType,
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
|
stakes: &HashMap<Pubkey, u64>,
|
|
|
|
|
+ use_cha_cha_8: bool,
|
|
|
) -> Self {
|
|
) -> Self {
|
|
|
- new_cluster_nodes(cluster_info, cluster_type, stakes)
|
|
|
|
|
|
|
+ new_cluster_nodes(cluster_info, cluster_type, stakes, use_cha_cha_8)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
|
|
pub(crate) fn get_broadcast_peer(&self, shred: &ShredId) -> Option<&ContactInfo> {
|
|
|
- let mut rng = get_seeded_rng(/*leader:*/ &self.pubkey, shred);
|
|
|
|
|
|
|
+ let mut rng = TurbineRng::new_seeded(&self.pubkey, shred, self.use_cha_cha_8);
|
|
|
let index = self.weighted_shuffle.first(&mut rng)?;
|
|
let index = self.weighted_shuffle.first(&mut rng)?;
|
|
|
self.nodes[index].contact_info()
|
|
self.nodes[index].contact_info()
|
|
|
}
|
|
}
|
|
@@ -236,7 +287,7 @@ impl ClusterNodes<RetransmitStage> {
|
|
|
if let Some(index) = self.index.get(slot_leader) {
|
|
if let Some(index) = self.index.get(slot_leader) {
|
|
|
weighted_shuffle.remove_index(*index);
|
|
weighted_shuffle.remove_index(*index);
|
|
|
}
|
|
}
|
|
|
- let mut rng = get_seeded_rng(slot_leader, shred);
|
|
|
|
|
|
|
+ let mut rng = TurbineRng::new_seeded(slot_leader, shred, self.use_cha_cha_8);
|
|
|
let (index, peers) = get_retransmit_peers(
|
|
let (index, peers) = get_retransmit_peers(
|
|
|
fanout,
|
|
fanout,
|
|
|
|k| self.nodes[k].pubkey() == &self.pubkey,
|
|
|k| self.nodes[k].pubkey() == &self.pubkey,
|
|
@@ -284,7 +335,8 @@ impl ClusterNodes<RetransmitStage> {
|
|
|
if let Some(index) = self.index.get(leader).copied() {
|
|
if let Some(index) = self.index.get(leader).copied() {
|
|
|
weighted_shuffle.remove_index(index);
|
|
weighted_shuffle.remove_index(index);
|
|
|
}
|
|
}
|
|
|
- let mut rng = get_seeded_rng(leader, shred);
|
|
|
|
|
|
|
+
|
|
|
|
|
+ let mut rng = TurbineRng::new_seeded(leader, shred, self.use_cha_cha_8);
|
|
|
// Only need shuffled nodes until this node itself.
|
|
// Only need shuffled nodes until this node itself.
|
|
|
let nodes: Vec<_> = weighted_shuffle
|
|
let nodes: Vec<_> = weighted_shuffle
|
|
|
.shuffle(&mut rng)
|
|
.shuffle(&mut rng)
|
|
@@ -300,6 +352,7 @@ pub fn new_cluster_nodes<T: 'static>(
|
|
|
cluster_info: &ClusterInfo,
|
|
cluster_info: &ClusterInfo,
|
|
|
cluster_type: ClusterType,
|
|
cluster_type: ClusterType,
|
|
|
stakes: &HashMap<Pubkey, u64>,
|
|
stakes: &HashMap<Pubkey, u64>,
|
|
|
|
|
+ use_cha_cha_8: bool,
|
|
|
) -> ClusterNodes<T> {
|
|
) -> ClusterNodes<T> {
|
|
|
let self_pubkey = cluster_info.id();
|
|
let self_pubkey = cluster_info.id();
|
|
|
let nodes = get_nodes(cluster_info, cluster_type, stakes);
|
|
let nodes = get_nodes(cluster_info, cluster_type, stakes);
|
|
@@ -320,6 +373,7 @@ pub fn new_cluster_nodes<T: 'static>(
|
|
|
index,
|
|
index,
|
|
|
weighted_shuffle,
|
|
weighted_shuffle,
|
|
|
_phantom: PhantomData,
|
|
_phantom: PhantomData,
|
|
|
|
|
+ use_cha_cha_8,
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -440,11 +494,6 @@ fn dedup_tvu_addrs(nodes: &mut Vec<Node>) {
|
|
|
})
|
|
})
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-fn get_seeded_rng(leader: &Pubkey, shred: &ShredId) -> ChaChaRng {
|
|
|
|
|
- let seed = shred.seed(leader);
|
|
|
|
|
- ChaChaRng::from_seed(seed)
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
// root : [0]
|
|
// root : [0]
|
|
|
// 1st layer: [1, 2, ..., fanout]
|
|
// 1st layer: [1, 2, ..., fanout]
|
|
|
// 2nd layer: [[fanout + 1, ..., fanout * 2],
|
|
// 2nd layer: [[fanout + 1, ..., fanout * 2],
|
|
@@ -542,6 +591,11 @@ impl<T: 'static> ClusterNodesCache<T> {
|
|
|
let cache = self.cache.read().unwrap();
|
|
let cache = self.cache.read().unwrap();
|
|
|
get_epoch_entry(&cache, epoch, self.ttl)
|
|
get_epoch_entry(&cache, epoch, self.ttl)
|
|
|
};
|
|
};
|
|
|
|
|
+ let use_cha_cha_8 = check_feature_activation(
|
|
|
|
|
+ &feature_set::switch_to_chacha8_turbine::ID,
|
|
|
|
|
+ shred_slot,
|
|
|
|
|
+ root_bank,
|
|
|
|
|
+ );
|
|
|
// Fall back to exclusive lock if there is a cache miss or the cached
|
|
// Fall back to exclusive lock if there is a cache miss or the cached
|
|
|
// entry has already expired.
|
|
// entry has already expired.
|
|
|
let entry: Arc<OnceLock<_>> = entry.unwrap_or_else(|| {
|
|
let entry: Arc<OnceLock<_>> = entry.unwrap_or_else(|| {
|
|
@@ -567,8 +621,12 @@ impl<T: 'static> ClusterNodesCache<T> {
|
|
|
inc_new_counter_error!("cluster_nodes-unknown_epoch_staked_nodes", 1);
|
|
inc_new_counter_error!("cluster_nodes-unknown_epoch_staked_nodes", 1);
|
|
|
Arc::<HashMap<Pubkey, /*stake:*/ u64>>::default()
|
|
Arc::<HashMap<Pubkey, /*stake:*/ u64>>::default()
|
|
|
});
|
|
});
|
|
|
- let nodes =
|
|
|
|
|
- new_cluster_nodes::<T>(cluster_info, root_bank.cluster_type(), &epoch_staked_nodes);
|
|
|
|
|
|
|
+ let nodes = new_cluster_nodes::<T>(
|
|
|
|
|
+ cluster_info,
|
|
|
|
|
+ root_bank.cluster_type(),
|
|
|
|
|
+ &epoch_staked_nodes,
|
|
|
|
|
+ use_cha_cha_8,
|
|
|
|
|
+ );
|
|
|
(Instant::now(), Arc::new(nodes))
|
|
(Instant::now(), Arc::new(nodes))
|
|
|
});
|
|
});
|
|
|
nodes.clone()
|
|
nodes.clone()
|
|
@@ -727,10 +785,105 @@ mod tests {
|
|
|
use {
|
|
use {
|
|
|
super::*,
|
|
super::*,
|
|
|
itertools::Itertools,
|
|
itertools::Itertools,
|
|
|
- std::{fmt::Debug, hash::Hash},
|
|
|
|
|
|
|
+ solana_hash::Hash as SolanaHash,
|
|
|
|
|
+ solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
|
|
|
|
|
+ std::{collections::VecDeque, fmt::Debug, hash::Hash},
|
|
|
test_case::test_case,
|
|
test_case::test_case,
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
|
|
+ #[test_case(true /* chacha8 */)]
|
|
|
|
|
+ #[test_case(false /* chacha20 */)]
|
|
|
|
|
+ /// Test that we provide a complete coverage
|
|
|
|
|
+ /// of all the nodes with weighted shuffles
|
|
|
|
|
+ fn test_complete_cluster_coverage(use_cha_cha_8: bool) {
|
|
|
|
|
+ let fanout = 10;
|
|
|
|
|
+ let mut rng = rand::thread_rng();
|
|
|
|
|
+
|
|
|
|
|
+ let (_nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 20, Some((0, 1)));
|
|
|
|
|
+ let slot_leader = cluster_info.id();
|
|
|
|
|
+
|
|
|
|
|
+ // create a test cluster
|
|
|
|
|
+ let cluster_nodes = new_cluster_nodes::<BroadcastStage>(
|
|
|
|
|
+ &cluster_info,
|
|
|
|
|
+ ClusterType::Development,
|
|
|
|
|
+ &stakes,
|
|
|
|
|
+ use_cha_cha_8,
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ let shred = Shredder::new(2, 1, 0, 0)
|
|
|
|
|
+ .unwrap()
|
|
|
|
|
+ .entries_to_merkle_shreds_for_tests(
|
|
|
|
|
+ &Keypair::new(),
|
|
|
|
|
+ &[],
|
|
|
|
|
+ true,
|
|
|
|
|
+ SolanaHash::default(),
|
|
|
|
|
+ 0,
|
|
|
|
|
+ 0,
|
|
|
|
|
+ &ReedSolomonCache::default(),
|
|
|
|
|
+ &mut ProcessShredsStats::default(),
|
|
|
|
|
+ )
|
|
|
|
|
+ .0
|
|
|
|
|
+ .pop()
|
|
|
|
|
+ .unwrap();
|
|
|
|
|
+
|
|
|
|
|
+ let mut weighted_shuffle = cluster_nodes.weighted_shuffle.clone();
|
|
|
|
|
+ let mut chacha_rng = TurbineRng::new_seeded(&slot_leader, &shred.id(), use_cha_cha_8);
|
|
|
|
|
+
|
|
|
|
|
+ let shuffled_nodes: Vec<&Node> = weighted_shuffle
|
|
|
|
|
+ .shuffle(&mut chacha_rng)
|
|
|
|
|
+ .map(|i| &cluster_nodes.nodes[i])
|
|
|
|
|
+ .collect();
|
|
|
|
|
+
|
|
|
|
|
+ // Slot leader obviously has the shred
|
|
|
|
|
+ let mut covered: HashSet<Pubkey> = HashSet::from([slot_leader]);
|
|
|
|
|
+ // The root node has the shred sent to it initially
|
|
|
|
|
+ let mut queue = VecDeque::from([*shuffled_nodes[0].pubkey()]);
|
|
|
|
|
+
|
|
|
|
|
+ // traverse the turbine tree using the queue of nodes to visit (BFS)
|
|
|
|
|
+ while let Some(addr) = queue.pop_front() {
|
|
|
|
|
+ if !covered.insert(addr) {
|
|
|
|
|
+ panic!("Should not send to already covered nodes, instead sending to {addr}");
|
|
|
|
|
+ }
|
|
|
|
|
+ let (_, peers) = get_retransmit_peers(
|
|
|
|
|
+ fanout,
|
|
|
|
|
+ |n: &Node| n.pubkey() == &addr,
|
|
|
|
|
+ shuffled_nodes.clone(),
|
|
|
|
|
+ );
|
|
|
|
|
+
|
|
|
|
|
+ // visit all child nodes
|
|
|
|
|
+ for peer in peers {
|
|
|
|
|
+ trace!("{} is child of {addr}", peer.pubkey());
|
|
|
|
|
+ queue.push_back(*peer.pubkey());
|
|
|
|
|
+ if stakes[peer.pubkey()] == 0 {
|
|
|
|
|
+ continue; // no check of retransmit parents for unstaked nodes
|
|
|
|
|
+ }
|
|
|
|
|
+ // luckily for us, ClusterNodes<RetransmitStage> does not do anything with own identity
|
|
|
|
|
+ let mut peer_cluster_nodes = new_cluster_nodes::<RetransmitStage>(
|
|
|
|
|
+ &cluster_info,
|
|
|
|
|
+ ClusterType::Development,
|
|
|
|
|
+ &stakes,
|
|
|
|
|
+ use_cha_cha_8,
|
|
|
|
|
+ );
|
|
|
|
|
+ peer_cluster_nodes.pubkey = *peer.pubkey();
|
|
|
|
|
+ // check that the parent computed by the child matches actual parent.
|
|
|
|
|
+ let parent = peer_cluster_nodes
|
|
|
|
|
+ .get_retransmit_parent(&slot_leader, &shred.id(), fanout)
|
|
|
|
|
+ .unwrap();
|
|
|
|
|
+
|
|
|
|
|
+ assert_eq!(
|
|
|
|
|
+ Some(addr),
|
|
|
|
|
+ parent,
|
|
|
|
|
+ "Found incorrect parent for node {}",
|
|
|
|
|
+ peer_cluster_nodes.pubkey
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Convert cluster_nodes into hashset of pubkeys
|
|
|
|
|
+ let all_nodes: HashSet<_> = cluster_nodes.nodes.iter().map(|n| *n.pubkey()).collect();
|
|
|
|
|
+ assert_eq!(all_nodes, covered, "All nodes must be covered");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
#[test]
|
|
#[test]
|
|
|
fn test_cluster_nodes_retransmit() {
|
|
fn test_cluster_nodes_retransmit() {
|
|
|
let mut rng = rand::thread_rng();
|
|
let mut rng = rand::thread_rng();
|
|
@@ -740,8 +893,12 @@ mod tests {
|
|
|
cluster_info.tvu_peers(GossipContactInfo::clone).len(),
|
|
cluster_info.tvu_peers(GossipContactInfo::clone).len(),
|
|
|
nodes.len() - 1
|
|
nodes.len() - 1
|
|
|
);
|
|
);
|
|
|
- let cluster_nodes =
|
|
|
|
|
- new_cluster_nodes::<RetransmitStage>(&cluster_info, ClusterType::Development, &stakes);
|
|
|
|
|
|
|
+ let cluster_nodes = new_cluster_nodes::<RetransmitStage>(
|
|
|
|
|
+ &cluster_info,
|
|
|
|
|
+ ClusterType::Development,
|
|
|
|
|
+ &stakes,
|
|
|
|
|
+ false,
|
|
|
|
|
+ );
|
|
|
// All nodes with contact-info should be in the index.
|
|
// All nodes with contact-info should be in the index.
|
|
|
// Staked nodes with no contact-info should be included.
|
|
// Staked nodes with no contact-info should be included.
|
|
|
assert!(cluster_nodes.nodes.len() > nodes.len());
|
|
assert!(cluster_nodes.nodes.len() > nodes.len());
|
|
@@ -770,8 +927,9 @@ mod tests {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- #[test]
|
|
|
|
|
- fn test_cluster_nodes_broadcast() {
|
|
|
|
|
|
|
+ #[test_case(true)/*ChaCha8 */]
|
|
|
|
|
+ #[test_case(false)/*ChaCha20 */]
|
|
|
|
|
+ fn test_cluster_nodes_broadcast(use_cha_cha_8: bool) {
|
|
|
let mut rng = rand::thread_rng();
|
|
let mut rng = rand::thread_rng();
|
|
|
let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
|
|
let (nodes, stakes, cluster_info) = make_test_cluster(&mut rng, 1_000, None);
|
|
|
// ClusterInfo::tvu_peers excludes the node itself.
|
|
// ClusterInfo::tvu_peers excludes the node itself.
|
|
@@ -779,8 +937,12 @@ mod tests {
|
|
|
cluster_info.tvu_peers(GossipContactInfo::clone).len(),
|
|
cluster_info.tvu_peers(GossipContactInfo::clone).len(),
|
|
|
nodes.len() - 1
|
|
nodes.len() - 1
|
|
|
);
|
|
);
|
|
|
- let cluster_nodes =
|
|
|
|
|
- ClusterNodes::<BroadcastStage>::new(&cluster_info, ClusterType::Development, &stakes);
|
|
|
|
|
|
|
+ let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
|
|
|
|
|
+ &cluster_info,
|
|
|
|
|
+ ClusterType::Development,
|
|
|
|
|
+ &stakes,
|
|
|
|
|
+ use_cha_cha_8,
|
|
|
|
|
+ );
|
|
|
// All nodes with contact-info should be in the index.
|
|
// All nodes with contact-info should be in the index.
|
|
|
// Excluding this node itself.
|
|
// Excluding this node itself.
|
|
|
// Staked nodes with no contact-info should be included.
|
|
// Staked nodes with no contact-info should be included.
|