|
|
@@ -9,9 +9,12 @@ use crate::repair_service::{RepairService, RepairStrategy};
|
|
|
use crate::result::{Error, Result};
|
|
|
use crate::service::Service;
|
|
|
use crate::streamer::{BlobReceiver, BlobSender};
|
|
|
+use rayon::prelude::*;
|
|
|
+use rayon::ThreadPool;
|
|
|
use solana_metrics::{inc_new_counter_debug, inc_new_counter_error};
|
|
|
use solana_runtime::bank::Bank;
|
|
|
use solana_sdk::pubkey::Pubkey;
|
|
|
+use solana_sdk::signature::Signable;
|
|
|
use solana_sdk::timing::duration_as_ms;
|
|
|
use std::net::UdpSocket;
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
@@ -20,6 +23,8 @@ use std::sync::{Arc, RwLock};
|
|
|
use std::thread::{self, Builder, JoinHandle};
|
|
|
use std::time::{Duration, Instant};
|
|
|
|
|
|
+pub const NUM_THREADS: u32 = 10;
|
|
|
+
|
|
|
fn retransmit_blobs(blobs: &[SharedBlob], retransmit: &BlobSender, id: &Pubkey) -> Result<()> {
|
|
|
let mut retransmit_queue: Vec<SharedBlob> = Vec::new();
|
|
|
for blob in blobs {
|
|
|
@@ -86,7 +91,10 @@ pub fn should_retransmit_and_persist(
|
|
|
Some(bank) => leader_schedule_cache.slot_leader_at(blob.slot(), Some(&bank)),
|
|
|
};
|
|
|
|
|
|
- if blob.id() == *my_pubkey {
|
|
|
+ if !blob.verify() {
|
|
|
+ inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1);
|
|
|
+ false
|
|
|
+ } else if blob.id() == *my_pubkey {
|
|
|
inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1);
|
|
|
false
|
|
|
} else if slot_leader_pubkey == None {
|
|
|
@@ -108,9 +116,11 @@ fn recv_window<F>(
|
|
|
r: &BlobReceiver,
|
|
|
retransmit: &BlobSender,
|
|
|
blob_filter: F,
|
|
|
+ thread_pool: &ThreadPool,
|
|
|
) -> Result<()>
|
|
|
where
|
|
|
F: Fn(&Blob) -> bool,
|
|
|
+ F: Sync,
|
|
|
{
|
|
|
let timer = Duration::from_millis(200);
|
|
|
let mut blobs = r.recv_timeout(timer)?;
|
|
|
@@ -121,7 +131,12 @@ where
|
|
|
let now = Instant::now();
|
|
|
inc_new_counter_debug!("streamer-recv_window-recv", blobs.len(), 0, 1000);
|
|
|
|
|
|
- blobs.retain(|blob| blob_filter(&blob.read().unwrap()));
|
|
|
+ let blobs: Vec<_> = thread_pool.install(|| {
|
|
|
+ blobs
|
|
|
+ .into_par_iter()
|
|
|
+ .filter(|b| blob_filter(&b.read().unwrap()))
|
|
|
+ .collect()
|
|
|
+ });
|
|
|
|
|
|
retransmit_blobs(&blobs, retransmit, my_pubkey)?;
|
|
|
|
|
|
@@ -200,20 +215,31 @@ impl WindowService {
|
|
|
let _exit = Finalizer::new(exit.clone());
|
|
|
let id = cluster_info.read().unwrap().id();
|
|
|
trace!("{}: RECV_WINDOW started", id);
|
|
|
+ let thread_pool = rayon::ThreadPoolBuilder::new()
|
|
|
+ .num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
|
|
|
+ .build()
|
|
|
+ .unwrap();
|
|
|
loop {
|
|
|
if exit.load(Ordering::Relaxed) {
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- if let Err(e) = recv_window(&blocktree, &id, &r, &retransmit, |blob| {
|
|
|
- blob_filter(
|
|
|
- &id,
|
|
|
- blob,
|
|
|
- bank_forks
|
|
|
- .as_ref()
|
|
|
- .map(|bank_forks| bank_forks.read().unwrap().working_bank()),
|
|
|
- )
|
|
|
- }) {
|
|
|
+ if let Err(e) = recv_window(
|
|
|
+ &blocktree,
|
|
|
+ &id,
|
|
|
+ &r,
|
|
|
+ &retransmit,
|
|
|
+ |blob| {
|
|
|
+ blob_filter(
|
|
|
+ &id,
|
|
|
+ blob,
|
|
|
+ bank_forks
|
|
|
+ .as_ref()
|
|
|
+ .map(|bank_forks| bank_forks.read().unwrap().working_bank()),
|
|
|
+ )
|
|
|
+ },
|
|
|
+ &thread_pool,
|
|
|
+ ) {
|
|
|
match e {
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
|
|
@@ -249,13 +275,14 @@ mod test {
|
|
|
use crate::bank_forks::BankForks;
|
|
|
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
|
|
|
use crate::cluster_info::{ClusterInfo, Node};
|
|
|
- use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, EntrySlice};
|
|
|
+ use crate::entry::{make_consecutive_blobs, make_tiny_test_entries, Entry, EntrySlice};
|
|
|
use crate::genesis_utils::create_genesis_block_with_leader;
|
|
|
- use crate::packet::{index_blobs, Blob};
|
|
|
+ use crate::packet::index_blobs;
|
|
|
use crate::service::Service;
|
|
|
use crate::streamer::{blob_receiver, responder};
|
|
|
use solana_runtime::epoch_schedule::MINIMUM_SLOT_LENGTH;
|
|
|
use solana_sdk::hash::Hash;
|
|
|
+ use solana_sdk::signature::{Keypair, KeypairUtil};
|
|
|
use std::fs::remove_dir_all;
|
|
|
use std::net::UdpSocket;
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
|
|
@@ -289,14 +316,17 @@ mod test {
|
|
|
#[test]
|
|
|
fn test_should_retransmit_and_persist() {
|
|
|
let me_id = Pubkey::new_rand();
|
|
|
- let leader_pubkey = Pubkey::new_rand();
|
|
|
+ let leader_keypair = Keypair::new();
|
|
|
+ let leader_pubkey = leader_keypair.pubkey();
|
|
|
let bank = Arc::new(Bank::new(
|
|
|
&create_genesis_block_with_leader(100, &leader_pubkey, 10).genesis_block,
|
|
|
));
|
|
|
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
|
|
|
|
|
|
- let mut blob = Blob::default();
|
|
|
+ let entry = Entry::default();
|
|
|
+ let mut blob = entry.to_blob();
|
|
|
blob.set_id(&leader_pubkey);
|
|
|
+ blob.sign(&leader_keypair);
|
|
|
|
|
|
// without a Bank and blobs not from me, blob gets thrown out
|
|
|
assert_eq!(
|