push_active_set.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. use {
  2. crate::weighted_shuffle::WeightedShuffle,
  3. indexmap::IndexMap,
  4. rand::Rng,
  5. solana_bloom::bloom::{Bloom, ConcurrentBloom},
  6. solana_native_token::LAMPORTS_PER_SOL,
  7. solana_pubkey::Pubkey,
  8. std::collections::HashMap,
  9. };
  10. const NUM_PUSH_ACTIVE_SET_ENTRIES: usize = 25;
  11. // Each entry corresponds to a stake bucket for
  12. // min stake of { this node, crds value owner }
  13. // The entry represents set of gossip nodes to actively
  14. // push to for crds values belonging to the bucket.
  15. #[derive(Default)]
  16. pub(crate) struct PushActiveSet([PushActiveSetEntry; NUM_PUSH_ACTIVE_SET_ENTRIES]);
  17. // Keys are gossip nodes to push messages to.
  18. // Values are which origins the node has pruned.
  19. #[derive(Default)]
  20. struct PushActiveSetEntry(IndexMap</*node:*/ Pubkey, /*origins:*/ ConcurrentBloom<Pubkey>>);
  21. impl PushActiveSet {
  22. #[cfg(debug_assertions)]
  23. const MIN_NUM_BLOOM_ITEMS: usize = 512;
  24. #[cfg(not(debug_assertions))]
  25. const MIN_NUM_BLOOM_ITEMS: usize = crate::cluster_info::CRDS_UNIQUE_PUBKEY_CAPACITY;
  26. pub(crate) fn get_nodes<'a>(
  27. &'a self,
  28. pubkey: &'a Pubkey, // This node.
  29. origin: &'a Pubkey, // CRDS value owner.
  30. stakes: &HashMap<Pubkey, u64>,
  31. ) -> impl Iterator<Item = &'a Pubkey> + 'a + use<'a> {
  32. let stake = stakes.get(pubkey).min(stakes.get(origin));
  33. self.get_entry(stake).get_nodes(pubkey, origin)
  34. }
  35. // Prunes origins for the given gossip node.
  36. // We will stop pushing messages from the specified origins to the node.
  37. pub(crate) fn prune(
  38. &self,
  39. pubkey: &Pubkey, // This node.
  40. node: &Pubkey, // Gossip node.
  41. origins: &[Pubkey], // CRDS value owners.
  42. stakes: &HashMap<Pubkey, u64>,
  43. ) {
  44. let stake = stakes.get(pubkey);
  45. for origin in origins {
  46. if origin == pubkey {
  47. continue;
  48. }
  49. let stake = stake.min(stakes.get(origin));
  50. self.get_entry(stake).prune(node, origin)
  51. }
  52. }
  53. pub(crate) fn rotate<R: Rng>(
  54. &mut self,
  55. rng: &mut R,
  56. size: usize, // Number of nodes to retain in each active-set entry.
  57. cluster_size: usize,
  58. // Gossip nodes to be sampled for each push active set.
  59. nodes: &[Pubkey],
  60. stakes: &HashMap<Pubkey, u64>,
  61. ) {
  62. let num_bloom_filter_items = cluster_size.max(Self::MIN_NUM_BLOOM_ITEMS);
  63. // Active set of nodes to push to are sampled from these gossip nodes,
  64. // using sampling probabilities obtained from the stake bucket of each
  65. // node.
  66. let buckets: Vec<_> = nodes
  67. .iter()
  68. .map(|node| get_stake_bucket(stakes.get(node)))
  69. .collect();
  70. // (k, entry) represents push active set where the stake bucket of
  71. // min stake of {this node, crds value owner}
  72. // is equal to `k`. The `entry` maintains set of gossip nodes to
  73. // actively push to for crds values belonging to this bucket.
  74. for (k, entry) in self.0.iter_mut().enumerate() {
  75. let weights: Vec<u64> = buckets
  76. .iter()
  77. .map(|&bucket| {
  78. // bucket <- get_stake_bucket(min stake of {
  79. // this node, crds value owner and gossip peer
  80. // })
  81. // weight <- (bucket + 1)^2
  82. // min stake of {...} is a proxy for how much we care about
  83. // the link, and tries to mirror similar logic on the
  84. // receiving end when pruning incoming links:
  85. // https://github.com/solana-labs/solana/blob/81394cf92/gossip/src/received_cache.rs#L100-L105
  86. let bucket = bucket.min(k) as u64;
  87. bucket.saturating_add(1).saturating_pow(2)
  88. })
  89. .collect();
  90. entry.rotate(rng, size, num_bloom_filter_items, nodes, &weights);
  91. }
  92. }
  93. fn get_entry(&self, stake: Option<&u64>) -> &PushActiveSetEntry {
  94. &self.0[get_stake_bucket(stake)]
  95. }
  96. }
  97. impl PushActiveSetEntry {
  98. const BLOOM_FALSE_RATE: f64 = 0.1;
  99. const BLOOM_MAX_BITS: usize = 1024 * 8 * 4;
  100. fn get_nodes<'a>(
  101. &'a self,
  102. pubkey: &'a Pubkey, // This node.
  103. origin: &'a Pubkey, // CRDS value owner.
  104. ) -> impl Iterator<Item = &'a Pubkey> + 'a {
  105. let pubkey_eq_origin = pubkey == origin;
  106. self.0
  107. .iter()
  108. .filter(move |(node, bloom_filter)| {
  109. // Bloom filter can return false positive for origin == pubkey
  110. // but a node should always be able to push its own values.
  111. !bloom_filter.contains(origin) || (pubkey_eq_origin && &pubkey != node)
  112. })
  113. .map(|(node, _bloom_filter)| node)
  114. }
  115. fn prune(
  116. &self,
  117. node: &Pubkey, // Gossip node.
  118. origin: &Pubkey, // CRDS value owner
  119. ) {
  120. if let Some(bloom_filter) = self.0.get(node) {
  121. bloom_filter.add(origin);
  122. }
  123. }
  124. fn rotate<R: Rng>(
  125. &mut self,
  126. rng: &mut R,
  127. size: usize, // Number of nodes to retain.
  128. num_bloom_filter_items: usize,
  129. nodes: &[Pubkey],
  130. weights: &[u64],
  131. ) {
  132. debug_assert_eq!(nodes.len(), weights.len());
  133. debug_assert!(weights.iter().all(|&weight| weight != 0u64));
  134. let mut weighted_shuffle = WeightedShuffle::new("rotate-active-set", weights);
  135. for node in weighted_shuffle.shuffle(rng).map(|k| &nodes[k]) {
  136. // We intend to discard the oldest/first entry in the index-map.
  137. if self.0.len() > size {
  138. break;
  139. }
  140. if self.0.contains_key(node) {
  141. continue;
  142. }
  143. let bloom = ConcurrentBloom::from(Bloom::random(
  144. num_bloom_filter_items,
  145. Self::BLOOM_FALSE_RATE,
  146. Self::BLOOM_MAX_BITS,
  147. ));
  148. bloom.add(node);
  149. self.0.insert(*node, bloom);
  150. }
  151. // Drop the oldest entry while preserving the ordering of others.
  152. while self.0.len() > size {
  153. self.0.shift_remove_index(0);
  154. }
  155. }
  156. }
  157. // Maps stake to bucket index.
  158. fn get_stake_bucket(stake: Option<&u64>) -> usize {
  159. let stake = stake.copied().unwrap_or_default() / LAMPORTS_PER_SOL;
  160. let bucket = u64::BITS - stake.leading_zeros();
  161. (bucket as usize).min(NUM_PUSH_ACTIVE_SET_ENTRIES - 1)
  162. }
#[cfg(test)]
mod tests {
    use {
        super::*, itertools::iproduct, rand::SeedableRng, rand_chacha::ChaChaRng,
        std::iter::repeat_with,
    };

    // Checks the stake -> bucket mapping: bucket = bit length of whole-SOL stake,
    // clamped to the last entry.
    #[test]
    fn test_get_stake_bucket() {
        assert_eq!(get_stake_bucket(None), 0);
        // buckets[k] is the expected bucket for a stake of k SOL.
        let buckets = [0, 1, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5];
        for (k, bucket) in buckets.into_iter().enumerate() {
            let stake = (k as u64) * LAMPORTS_PER_SOL;
            assert_eq!(get_stake_bucket(Some(&stake)), bucket);
        }
        // Boundaries around 2^22 and 2^23 SOL; 2^23 SOL reaches the last bucket (24).
        for (stake, bucket) in [
            (4_194_303, 22),
            (4_194_304, 23),
            (8_388_607, 23),
            (8_388_608, 24),
        ] {
            let stake = stake * LAMPORTS_PER_SOL;
            assert_eq!(get_stake_bucket(Some(&stake)), bucket);
        }
        // Very large stakes are clamped to the last entry.
        assert_eq!(
            get_stake_bucket(Some(&u64::MAX)),
            NUM_PUSH_ACTIVE_SET_ENTRIES - 1
        );
    }

    // End-to-end check of PushActiveSet: rotate, get_nodes and prune across
    // stake buckets.
    //
    // NOTE: the expected node indices below are pinned to the fixed RNG seed and
    // to the exact order in which pubkeys and random stakes are generated.
    // Reordering those calls will change the expected values.
    #[test]
    fn test_push_active_set() {
        const CLUSTER_SIZE: usize = 117;
        const MAX_STAKE: u64 = (1 << 20) * LAMPORTS_PER_SOL;
        let mut rng = ChaChaRng::from_seed([189u8; 32]);
        let pubkey = Pubkey::new_unique();
        let nodes: Vec<_> = repeat_with(Pubkey::new_unique).take(20).collect();
        let stakes = repeat_with(|| rng.gen_range(1..MAX_STAKE));
        let mut stakes: HashMap<_, _> = nodes.iter().copied().zip(stakes).collect();
        stakes.insert(pubkey, rng.gen_range(1..MAX_STAKE));
        let mut active_set = PushActiveSet::default();
        assert!(active_set.0.iter().all(|entry| entry.0.is_empty()));
        active_set.rotate(&mut rng, 5, CLUSTER_SIZE, &nodes, &stakes);
        assert!(active_set.0.iter().all(|entry| entry.0.len() == 5));
        // Assert that for all entries, each filter already prunes the key.
        for entry in &active_set.0 {
            for (node, filter) in entry.0.iter() {
                assert!(filter.contains(node));
            }
        }
        let other = &nodes[5];
        let origin = &nodes[17];
        assert!(active_set
            .get_nodes(&pubkey, origin, &stakes)
            .eq([13, 5, 18, 16, 0].into_iter().map(|k| &nodes[k])));
        // `other` is nodes[5]. Its own filter contains itself, so it is excluded
        // from the push targets for its own values.
        assert!(active_set
            .get_nodes(&pubkey, other, &stakes)
            .eq([13, 18, 16, 0].into_iter().map(|k| &nodes[k])));
        // nodes[3] is not among the push targets for `origin`, so its prune is a no-op there.
        active_set.prune(&pubkey, &nodes[5], &[*origin], &stakes);
        active_set.prune(&pubkey, &nodes[3], &[*origin], &stakes);
        active_set.prune(&pubkey, &nodes[16], &[*origin], &stakes);
        assert!(active_set
            .get_nodes(&pubkey, origin, &stakes)
            .eq([13, 18, 0].into_iter().map(|k| &nodes[k])));
        // Prunes for `origin` do not affect pushes of `other`'s values.
        assert!(active_set
            .get_nodes(&pubkey, other, &stakes)
            .eq([13, 18, 16, 0].into_iter().map(|k| &nodes[k])));
        // Growing to 7: nodes that survive the rotation keep their bloom filters,
        // so the earlier prunes of nodes[5] and nodes[16] still apply.
        active_set.rotate(&mut rng, 7, CLUSTER_SIZE, &nodes, &stakes);
        assert!(active_set.0.iter().all(|entry| entry.0.len() == 7));
        assert!(active_set
            .get_nodes(&pubkey, origin, &stakes)
            .eq([18, 0, 7, 15, 11].into_iter().map(|k| &nodes[k])));
        assert!(active_set
            .get_nodes(&pubkey, other, &stakes)
            .eq([18, 16, 0, 7, 15, 11].into_iter().map(|k| &nodes[k])));
        // Prune several origins at once.
        let origins = [*origin, *other];
        active_set.prune(&pubkey, &nodes[18], &origins, &stakes);
        active_set.prune(&pubkey, &nodes[0], &origins, &stakes);
        active_set.prune(&pubkey, &nodes[15], &origins, &stakes);
        assert!(active_set
            .get_nodes(&pubkey, origin, &stakes)
            .eq([7, 11].into_iter().map(|k| &nodes[k])));
        assert!(active_set
            .get_nodes(&pubkey, other, &stakes)
            .eq([16, 7, 11].into_iter().map(|k| &nodes[k])));
    }

    // Unit test of a single PushActiveSetEntry: insertion order, self-pruning,
    // prune, and eviction on rotate.
    //
    // NOTE: the expected keys are pinned to the fixed RNG seed and the call order.
    #[test]
    fn test_push_active_set_entry() {
        const NUM_BLOOM_FILTER_ITEMS: usize = 100;
        let mut rng = ChaChaRng::from_seed([147u8; 32]);
        let nodes: Vec<_> = repeat_with(Pubkey::new_unique).take(20).collect();
        let weights: Vec<_> = repeat_with(|| rng.gen_range(1..1000)).take(20).collect();
        let mut entry = PushActiveSetEntry::default();
        entry.rotate(
            &mut rng,
            5, // size
            NUM_BLOOM_FILTER_ITEMS,
            &nodes,
            &weights,
        );
        assert_eq!(entry.0.len(), 5);
        let keys = [&nodes[16], &nodes[11], &nodes[17], &nodes[14], &nodes[5]];
        assert!(entry.0.keys().eq(keys));
        // An origin that is a key is never pushed to itself; any other origin
        // is pushed to every key.
        for (pubkey, origin) in iproduct!(&nodes, &nodes) {
            if !keys.contains(&origin) {
                assert!(entry.get_nodes(pubkey, origin).eq(keys));
            } else {
                assert!(entry
                    .get_nodes(pubkey, origin)
                    .eq(keys.into_iter().filter(|&key| key != origin)));
            }
        }
        // Assert that each filter already prunes the key.
        for (node, filter) in entry.0.iter() {
            assert!(filter.contains(node));
        }
        for (pubkey, origin) in iproduct!(&nodes, keys) {
            assert!(entry
                .get_nodes(pubkey, origin)
                .eq(keys.into_iter().filter(|&node| node != origin)));
        }
        // Assert that prune excludes node from get.
        // nodes[19] is not a key, so pruning through it is a no-op. When
        // `pubkey == origin`, the node's own values bypass the prunes.
        let origin = &nodes[3];
        entry.prune(&nodes[11], origin);
        entry.prune(&nodes[14], origin);
        entry.prune(&nodes[19], origin);
        for pubkey in &nodes {
            assert!(entry.get_nodes(pubkey, origin).eq(keys
                .into_iter()
                .filter(|&&node| pubkey == origin || (node != nodes[11] && node != nodes[14]))));
        }
        // Assert that rotate adds new nodes.
        // Same size: one new node (nodes[7]) is appended and the oldest (nodes[16]) evicted.
        entry.rotate(&mut rng, 5, NUM_BLOOM_FILTER_ITEMS, &nodes, &weights);
        let keys = [&nodes[11], &nodes[17], &nodes[14], &nodes[5], &nodes[7]];
        assert!(entry.0.keys().eq(keys));
        // Growing to 6: two new nodes are appended and the oldest (nodes[11]) evicted.
        entry.rotate(&mut rng, 6, NUM_BLOOM_FILTER_ITEMS, &nodes, &weights);
        let keys = [
            &nodes[17], &nodes[14], &nodes[5], &nodes[7], &nodes[1], &nodes[13],
        ];
        assert!(entry.0.keys().eq(keys));
        // Shrinking to 4: no nodes are added; the two oldest are evicted.
        entry.rotate(&mut rng, 4, NUM_BLOOM_FILTER_ITEMS, &nodes, &weights);
        let keys = [&nodes[5], &nodes[7], &nodes[1], &nodes[13]];
        assert!(entry.0.keys().eq(keys));
    }
}