|
@@ -53,8 +53,6 @@ use {
|
|
|
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
|
|
pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
|
|
|
// Retention period of hashes of received outdated values.
|
|
// Retention period of hashes of received outdated values.
|
|
|
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
|
|
const FAILED_INSERTS_RETENTION_MS: u64 = 20_000;
|
|
|
-// Maximum number of pull requests to send out each time around.
|
|
|
|
|
-const MAX_NUM_PULL_REQUESTS: usize = 1024;
|
|
|
|
|
pub const FALSE_RATE: f64 = 0.1f64;
|
|
pub const FALSE_RATE: f64 = 0.1f64;
|
|
|
pub const KEYS: f64 = 8f64;
|
|
pub const KEYS: f64 = 8f64;
|
|
|
|
|
|
|
@@ -143,19 +141,26 @@ impl CrdsFilter {
|
|
|
|
|
|
|
|
/// A vector of crds filters that together hold a complete set of Hashes.
|
|
/// A vector of crds filters that together hold a complete set of Hashes.
|
|
|
struct CrdsFilterSet {
|
|
struct CrdsFilterSet {
|
|
|
- filters: Vec<AtomicBloom<Hash>>,
|
|
|
|
|
|
|
+ filters: Vec<Option<AtomicBloom<Hash>>>,
|
|
|
mask_bits: u32,
|
|
mask_bits: u32,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
impl CrdsFilterSet {
|
|
impl CrdsFilterSet {
|
|
|
- fn new(num_items: usize, max_bytes: usize) -> Self {
|
|
|
|
|
|
|
+ fn new<R: Rng>(rng: &mut R, num_items: usize, max_bytes: usize) -> Self {
|
|
|
|
|
+ const SAMPLE_RATE: usize = 8;
|
|
|
|
|
+ const MAX_NUM_FILTERS: usize = 1024;
|
|
|
let max_bits = (max_bytes * 8) as f64;
|
|
let max_bits = (max_bytes * 8) as f64;
|
|
|
let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS);
|
|
let max_items = CrdsFilter::max_items(max_bits, FALSE_RATE, KEYS);
|
|
|
let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items);
|
|
let mask_bits = CrdsFilter::mask_bits(num_items as f64, max_items);
|
|
|
- let filters =
|
|
|
|
|
- repeat_with(|| Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize).into())
|
|
|
|
|
- .take(1 << mask_bits)
|
|
|
|
|
- .collect();
|
|
|
|
|
|
|
+ let mut filters: Vec<_> = repeat_with(|| None).take(1usize << mask_bits).collect();
|
|
|
|
|
+ let mut indices: Vec<_> = (0..filters.len()).collect();
|
|
|
|
|
+ let size = (filters.len() + SAMPLE_RATE - 1) / SAMPLE_RATE;
|
|
|
|
|
+ for _ in 0..MAX_NUM_FILTERS.min(size) {
|
|
|
|
|
+ let k = rng.gen_range(0..indices.len());
|
|
|
|
|
+ let k = indices.swap_remove(k);
|
|
|
|
|
+ let filter = Bloom::random(max_items as usize, FALSE_RATE, max_bits as usize);
|
|
|
|
|
+ filters[k] = Some(AtomicBloom::<Hash>::from(filter));
|
|
|
|
|
+ }
|
|
|
Self { filters, mask_bits }
|
|
Self { filters, mask_bits }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -167,7 +172,9 @@ impl CrdsFilterSet {
|
|
|
.unwrap_or_default(),
|
|
.unwrap_or_default(),
|
|
|
)
|
|
)
|
|
|
.unwrap();
|
|
.unwrap();
|
|
|
- self.filters[index].add(&hash_value);
|
|
|
|
|
|
|
+ if let Some(filter) = &self.filters[index] {
|
|
|
|
|
+ filter.add(&hash_value);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -177,10 +184,12 @@ impl From<CrdsFilterSet> for Vec<CrdsFilter> {
|
|
|
cfs.filters
|
|
cfs.filters
|
|
|
.into_iter()
|
|
.into_iter()
|
|
|
.enumerate()
|
|
.enumerate()
|
|
|
- .map(|(seed, filter)| CrdsFilter {
|
|
|
|
|
- filter: filter.into(),
|
|
|
|
|
- mask: CrdsFilter::compute_mask(seed as u64, mask_bits),
|
|
|
|
|
- mask_bits,
|
|
|
|
|
|
|
+ .filter_map(|(seed, filter)| {
|
|
|
|
|
+ Some(CrdsFilter {
|
|
|
|
|
+ filter: Bloom::<Hash>::from(filter?),
|
|
|
|
|
+ mask: CrdsFilter::compute_mask(seed as u64, mask_bits),
|
|
|
|
|
+ mask_bits,
|
|
|
|
|
+ })
|
|
|
})
|
|
})
|
|
|
.collect()
|
|
.collect()
|
|
|
}
|
|
}
|
|
@@ -269,14 +278,7 @@ impl CrdsGossipPull {
|
|
|
if nodes.is_empty() {
|
|
if nodes.is_empty() {
|
|
|
return Err(CrdsGossipError::NoPeers);
|
|
return Err(CrdsGossipError::NoPeers);
|
|
|
}
|
|
}
|
|
|
- let mut filters = self.build_crds_filters(thread_pool, crds, bloom_size);
|
|
|
|
|
- if filters.len() > MAX_NUM_PULL_REQUESTS {
|
|
|
|
|
- for i in 0..MAX_NUM_PULL_REQUESTS {
|
|
|
|
|
- let j = rng.gen_range(i..filters.len());
|
|
|
|
|
- filters.swap(i, j);
|
|
|
|
|
- }
|
|
|
|
|
- filters.truncate(MAX_NUM_PULL_REQUESTS);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ let filters = self.build_crds_filters(thread_pool, crds, bloom_size);
|
|
|
// Associate each pull-request filter with a randomly selected peer.
|
|
// Associate each pull-request filter with a randomly selected peer.
|
|
|
let dist = WeightedIndex::new(weights).unwrap();
|
|
let dist = WeightedIndex::new(weights).unwrap();
|
|
|
let nodes = repeat_with(|| nodes[dist.sample(&mut rng)].clone());
|
|
let nodes = repeat_with(|| nodes[dist.sample(&mut rng)].clone());
|
|
@@ -425,7 +427,7 @@ impl CrdsGossipPull {
|
|
|
let crds = crds.read().unwrap();
|
|
let crds = crds.read().unwrap();
|
|
|
let num_items = crds.len() + crds.num_purged() + failed_inserts.len();
|
|
let num_items = crds.len() + crds.num_purged() + failed_inserts.len();
|
|
|
let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items);
|
|
let num_items = MIN_NUM_BLOOM_ITEMS.max(num_items);
|
|
|
- let filters = CrdsFilterSet::new(num_items, bloom_size);
|
|
|
|
|
|
|
+ let filters = CrdsFilterSet::new(&mut rand::thread_rng(), num_items, bloom_size);
|
|
|
thread_pool.install(|| {
|
|
thread_pool.install(|| {
|
|
|
crds.par_values()
|
|
crds.par_values()
|
|
|
.with_min_len(PAR_MIN_LENGTH)
|
|
.with_min_len(PAR_MIN_LENGTH)
|
|
@@ -669,45 +671,61 @@ pub(crate) mod tests {
|
|
|
|
|
|
|
|
#[test]
|
|
#[test]
|
|
|
fn test_crds_filter_set_add() {
|
|
fn test_crds_filter_set_add() {
|
|
|
- let crds_filter_set =
|
|
|
|
|
- CrdsFilterSet::new(/*num_items=*/ 9672788, /*max_bytes=*/ 8196);
|
|
|
|
|
- let hash_values: Vec<_> = repeat_with(Hash::new_unique).take(1024).collect();
|
|
|
|
|
|
|
+ let mut rng = rand::thread_rng();
|
|
|
|
|
+ let crds_filter_set = CrdsFilterSet::new(
|
|
|
|
|
+ &mut rng, /*num_items=*/ 59672788, /*max_bytes=*/ 8196,
|
|
|
|
|
+ );
|
|
|
|
|
+ let hash_values: Vec<_> = repeat_with(|| {
|
|
|
|
|
+ let buf: [u8; 32] = rng.gen();
|
|
|
|
|
+ solana_sdk::hash::hashv(&[&buf])
|
|
|
|
|
+ })
|
|
|
|
|
+ .take(1024)
|
|
|
|
|
+ .collect();
|
|
|
|
|
+ assert_eq!(crds_filter_set.filters.len(), 8192);
|
|
|
for hash_value in &hash_values {
|
|
for hash_value in &hash_values {
|
|
|
crds_filter_set.add(*hash_value);
|
|
crds_filter_set.add(*hash_value);
|
|
|
}
|
|
}
|
|
|
let filters: Vec<CrdsFilter> = crds_filter_set.into();
|
|
let filters: Vec<CrdsFilter> = crds_filter_set.into();
|
|
|
|
|
+ let mut num_hits = 0;
|
|
|
assert_eq!(filters.len(), 1024);
|
|
assert_eq!(filters.len(), 1024);
|
|
|
for hash_value in hash_values {
|
|
for hash_value in hash_values {
|
|
|
- let mut num_hits = 0;
|
|
|
|
|
|
|
+ let mut hit = false;
|
|
|
let mut false_positives = 0;
|
|
let mut false_positives = 0;
|
|
|
for filter in &filters {
|
|
for filter in &filters {
|
|
|
if filter.test_mask(&hash_value) {
|
|
if filter.test_mask(&hash_value) {
|
|
|
num_hits += 1;
|
|
num_hits += 1;
|
|
|
|
|
+ assert!(!hit);
|
|
|
|
|
+ hit = true;
|
|
|
assert!(filter.contains(&hash_value));
|
|
assert!(filter.contains(&hash_value));
|
|
|
assert!(filter.filter.contains(&hash_value));
|
|
assert!(filter.filter.contains(&hash_value));
|
|
|
} else if filter.filter.contains(&hash_value) {
|
|
} else if filter.filter.contains(&hash_value) {
|
|
|
false_positives += 1;
|
|
false_positives += 1;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- assert_eq!(num_hits, 1);
|
|
|
|
|
assert!(false_positives < 5);
|
|
assert!(false_positives < 5);
|
|
|
}
|
|
}
|
|
|
|
|
+ assert!(num_hits > 96, "num_hits: {num_hits}");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
#[test]
|
|
|
fn test_crds_filter_set_new() {
|
|
fn test_crds_filter_set_new() {
|
|
|
// Validates invariances required by CrdsFilterSet::get in the
|
|
// Validates invariances required by CrdsFilterSet::get in the
|
|
|
// vector of filters generated by CrdsFilterSet::new.
|
|
// vector of filters generated by CrdsFilterSet::new.
|
|
|
- let filters: Vec<CrdsFilter> =
|
|
|
|
|
- CrdsFilterSet::new(/*num_items=*/ 55345017, /*max_bytes=*/ 4098).into();
|
|
|
|
|
- assert_eq!(filters.len(), 16384);
|
|
|
|
|
|
|
+ let filters = CrdsFilterSet::new(
|
|
|
|
|
+ &mut rand::thread_rng(),
|
|
|
|
|
+ 55345017, // num_items
|
|
|
|
|
+ 4098, // max_bytes
|
|
|
|
|
+ );
|
|
|
|
|
+ assert_eq!(filters.filters.len(), 16384);
|
|
|
|
|
+ let filters = Vec::<CrdsFilter>::from(filters);
|
|
|
|
|
+ assert_eq!(filters.len(), 1024);
|
|
|
let mask_bits = filters[0].mask_bits;
|
|
let mask_bits = filters[0].mask_bits;
|
|
|
let right_shift = 64 - mask_bits;
|
|
let right_shift = 64 - mask_bits;
|
|
|
let ones = !0u64 >> mask_bits;
|
|
let ones = !0u64 >> mask_bits;
|
|
|
- for (i, filter) in filters.iter().enumerate() {
|
|
|
|
|
|
|
+ for filter in &filters {
|
|
|
// Check that all mask_bits are equal.
|
|
// Check that all mask_bits are equal.
|
|
|
assert_eq!(mask_bits, filter.mask_bits);
|
|
assert_eq!(mask_bits, filter.mask_bits);
|
|
|
- assert_eq!(i as u64, filter.mask >> right_shift);
|
|
|
|
|
|
|
+ assert!((0..16384).contains(&(filter.mask >> right_shift)));
|
|
|
assert_eq!(ones, ones & filter.mask);
|
|
assert_eq!(ones, ones & filter.mask);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -740,7 +758,7 @@ pub(crate) mod tests {
|
|
|
let crds = RwLock::new(crds);
|
|
let crds = RwLock::new(crds);
|
|
|
assert!(num_inserts > 30_000, "num inserts: {num_inserts}");
|
|
assert!(num_inserts > 30_000, "num inserts: {num_inserts}");
|
|
|
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
|
|
let filters = crds_gossip_pull.build_crds_filters(&thread_pool, &crds, MAX_BLOOM_SIZE);
|
|
|
- assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(32));
|
|
|
|
|
|
|
+ assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS.max(4));
|
|
|
let crds = crds.read().unwrap();
|
|
let crds = crds.read().unwrap();
|
|
|
let purged: Vec<_> = thread_pool.install(|| crds.purged().collect());
|
|
let purged: Vec<_> = thread_pool.install(|| crds.purged().collect());
|
|
|
let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect();
|
|
let hash_values: Vec<_> = crds.values().map(|v| v.value_hash).chain(purged).collect();
|
|
@@ -751,21 +769,24 @@ pub(crate) mod tests {
|
|
|
"hash_values.len(): {}",
|
|
"hash_values.len(): {}",
|
|
|
hash_values.len()
|
|
hash_values.len()
|
|
|
);
|
|
);
|
|
|
|
|
+ let mut num_hits = 0;
|
|
|
let mut false_positives = 0;
|
|
let mut false_positives = 0;
|
|
|
for hash_value in hash_values {
|
|
for hash_value in hash_values {
|
|
|
- let mut num_hits = 0;
|
|
|
|
|
|
|
+ let mut hit = false;
|
|
|
for filter in &filters {
|
|
for filter in &filters {
|
|
|
if filter.test_mask(&hash_value) {
|
|
if filter.test_mask(&hash_value) {
|
|
|
num_hits += 1;
|
|
num_hits += 1;
|
|
|
|
|
+ assert!(!hit);
|
|
|
|
|
+ hit = true;
|
|
|
assert!(filter.contains(&hash_value));
|
|
assert!(filter.contains(&hash_value));
|
|
|
assert!(filter.filter.contains(&hash_value));
|
|
assert!(filter.filter.contains(&hash_value));
|
|
|
} else if filter.filter.contains(&hash_value) {
|
|
} else if filter.filter.contains(&hash_value) {
|
|
|
false_positives += 1;
|
|
false_positives += 1;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- assert_eq!(num_hits, 1);
|
|
|
|
|
}
|
|
}
|
|
|
- assert!(false_positives < 150_000, "fp: {false_positives}");
|
|
|
|
|
|
|
+ assert!(num_hits > 4000, "num_hits: {num_hits}");
|
|
|
|
|
+ assert!(false_positives < 20_000, "fp: {false_positives}");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
#[test]
|
|
@@ -1308,7 +1329,8 @@ pub(crate) mod tests {
|
|
|
}
|
|
}
|
|
|
#[test]
|
|
#[test]
|
|
|
fn test_crds_filter_complete_set_add_mask() {
|
|
fn test_crds_filter_complete_set_add_mask() {
|
|
|
- let mut filters: Vec<CrdsFilter> = CrdsFilterSet::new(1000, 10).into();
|
|
|
|
|
|
|
+ let mut filters =
|
|
|
|
|
+ Vec::<CrdsFilter>::from(CrdsFilterSet::new(&mut rand::thread_rng(), 1000, 10));
|
|
|
assert!(filters.iter().all(|f| f.mask_bits > 0));
|
|
assert!(filters.iter().all(|f| f.mask_bits > 0));
|
|
|
let mut h: Hash = Hash::default();
|
|
let mut h: Hash = Hash::default();
|
|
|
// rev to make the hash::default() miss on the first few test_masks
|
|
// rev to make the hash::default() miss on the first few test_masks
|