bootstrap.rs 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670
  1. use {
  2. agave_snapshots::{
  3. paths as snapshot_paths, snapshot_archive_info::SnapshotArchiveInfoGetter as _,
  4. SnapshotArchiveKind,
  5. },
  6. itertools::Itertools,
  7. log::*,
  8. rand::{seq::SliceRandom, thread_rng, Rng},
  9. rayon::prelude::*,
  10. solana_account::ReadableAccount,
  11. solana_clock::Slot,
  12. solana_commitment_config::CommitmentConfig,
  13. solana_core::validator::{ValidatorConfig, ValidatorStartProgress},
  14. solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
  15. solana_genesis_utils::download_then_check_genesis_hash,
  16. solana_gossip::{
  17. cluster_info::ClusterInfo,
  18. contact_info::{ContactInfo, Protocol},
  19. crds_data,
  20. gossip_service::GossipService,
  21. node::Node,
  22. },
  23. solana_hash::Hash,
  24. solana_keypair::Keypair,
  25. solana_metrics::datapoint_info,
  26. solana_net_utils::SocketAddrSpace,
  27. solana_pubkey::Pubkey,
  28. solana_rpc_client::rpc_client::RpcClient,
  29. solana_signer::Signer,
  30. solana_vote_program::vote_state::VoteStateV4,
  31. std::{
  32. collections::{hash_map::RandomState, HashMap, HashSet},
  33. net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
  34. path::Path,
  35. process::exit,
  36. sync::{
  37. atomic::{AtomicBool, Ordering},
  38. Arc, RwLock,
  39. },
  40. time::{Duration, Instant},
  41. },
  42. thiserror::Error,
  43. };
  44. /// When downloading snapshots, wait at most this long for snapshot hashes from
  45. /// _all_ known validators. Afterwards, wait for snapshot hashes from _any_
  46. /// known validator.
  47. const WAIT_FOR_ALL_KNOWN_VALIDATORS: Duration = Duration::from_secs(60);
  48. /// If we don't have any alternative peers after this long, better off trying
  49. /// blacklisted peers again.
  50. const BLACKLIST_CLEAR_THRESHOLD: Duration = Duration::from_secs(60);
  51. /// If we can't find a good snapshot download candidate after this time, just
  52. /// give up.
  53. const NEWER_SNAPSHOT_THRESHOLD: Duration = Duration::from_secs(180);
  54. /// If we haven't found any RPC peers after this time, just give up.
  55. const GET_RPC_PEERS_TIMEOUT: Duration = Duration::from_secs(300);
  56. pub const MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION: usize = 32;
  57. pub const PING_TIMEOUT: Duration = Duration::from_secs(2);
  58. #[derive(Debug, PartialEq, Clone)]
  59. pub struct RpcBootstrapConfig {
  60. pub no_genesis_fetch: bool,
  61. pub no_snapshot_fetch: bool,
  62. pub only_known_rpc: bool,
  63. pub max_genesis_archive_unpacked_size: u64,
  64. pub check_vote_account: Option<String>,
  65. pub incremental_snapshot_fetch: bool,
  66. }
  67. fn verify_reachable_ports(
  68. node: &Node,
  69. cluster_entrypoint: &ContactInfo,
  70. validator_config: &ValidatorConfig,
  71. socket_addr_space: &SocketAddrSpace,
  72. ) -> bool {
  73. let verify_address = |addr: &Option<SocketAddr>| -> bool {
  74. addr.as_ref()
  75. .map(|addr| socket_addr_space.check(addr))
  76. .unwrap_or_default()
  77. };
  78. let mut udp_sockets = vec![&node.sockets.repair];
  79. udp_sockets.extend(node.sockets.gossip.iter());
  80. if verify_address(&node.info.serve_repair(Protocol::UDP)) {
  81. udp_sockets.push(&node.sockets.serve_repair);
  82. }
  83. if verify_address(&node.info.tpu(Protocol::UDP)) {
  84. udp_sockets.extend(node.sockets.tpu.iter());
  85. udp_sockets.extend(&node.sockets.tpu_quic);
  86. }
  87. if verify_address(&node.info.tpu_forwards(Protocol::UDP)) {
  88. udp_sockets.extend(node.sockets.tpu_forwards.iter());
  89. udp_sockets.extend(&node.sockets.tpu_forwards_quic);
  90. }
  91. if verify_address(&node.info.tpu_vote(Protocol::UDP)) {
  92. udp_sockets.extend(node.sockets.tpu_vote.iter());
  93. }
  94. if verify_address(&node.info.tvu(Protocol::UDP)) {
  95. udp_sockets.extend(node.sockets.tvu.iter());
  96. udp_sockets.extend(node.sockets.broadcast.iter());
  97. udp_sockets.extend(node.sockets.retransmit_sockets.iter());
  98. }
  99. if !solana_net_utils::verify_all_reachable_udp(
  100. &cluster_entrypoint.gossip().unwrap(),
  101. &udp_sockets,
  102. ) {
  103. return false;
  104. }
  105. let mut tcp_listeners = vec![];
  106. if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
  107. for (purpose, bind_addr, public_addr) in &[
  108. ("RPC", rpc_addr, node.info.rpc()),
  109. ("RPC pubsub", rpc_pubsub_addr, node.info.rpc_pubsub()),
  110. ] {
  111. if verify_address(public_addr) {
  112. tcp_listeners.push(TcpListener::bind(bind_addr).unwrap_or_else(|err| {
  113. error!("Unable to bind to tcp {bind_addr:?} for {purpose}: {err}");
  114. exit(1);
  115. }));
  116. }
  117. }
  118. }
  119. if let Some(ip_echo) = &node.sockets.ip_echo {
  120. let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
  121. tcp_listeners.push(ip_echo);
  122. }
  123. solana_net_utils::verify_all_reachable_tcp(&cluster_entrypoint.gossip().unwrap(), tcp_listeners)
  124. }
  125. fn is_known_validator(id: &Pubkey, known_validators: &Option<HashSet<Pubkey>>) -> bool {
  126. if let Some(known_validators) = known_validators {
  127. known_validators.contains(id)
  128. } else {
  129. false
  130. }
  131. }
  132. fn start_gossip_node(
  133. identity_keypair: Arc<Keypair>,
  134. cluster_entrypoints: &[ContactInfo],
  135. ledger_path: &Path,
  136. gossip_addr: &SocketAddr,
  137. gossip_sockets: Arc<[UdpSocket]>,
  138. expected_shred_version: u16,
  139. gossip_validators: Option<HashSet<Pubkey>>,
  140. should_check_duplicate_instance: bool,
  141. socket_addr_space: SocketAddrSpace,
  142. ) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
  143. let contact_info = ClusterInfo::gossip_contact_info(
  144. identity_keypair.pubkey(),
  145. *gossip_addr,
  146. expected_shred_version,
  147. );
  148. let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair, socket_addr_space);
  149. cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
  150. cluster_info.restore_contact_info(ledger_path, 0);
  151. let cluster_info = Arc::new(cluster_info);
  152. let gossip_exit_flag = Arc::new(AtomicBool::new(false));
  153. let gossip_service = GossipService::new(
  154. &cluster_info,
  155. None,
  156. gossip_sockets,
  157. gossip_validators,
  158. should_check_duplicate_instance,
  159. None,
  160. gossip_exit_flag.clone(),
  161. );
  162. (cluster_info, gossip_exit_flag, gossip_service)
  163. }
  164. fn get_rpc_peers(
  165. cluster_info: &ClusterInfo,
  166. validator_config: &ValidatorConfig,
  167. blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
  168. blacklist_timeout: &Instant,
  169. retry_reason: &mut Option<String>,
  170. bootstrap_config: &RpcBootstrapConfig,
  171. ) -> Vec<ContactInfo> {
  172. let shred_version = validator_config
  173. .expected_shred_version
  174. .unwrap_or_else(|| cluster_info.my_shred_version());
  175. info!(
  176. "Searching for an RPC service with shred version {shred_version}{}...",
  177. retry_reason
  178. .as_ref()
  179. .map(|s| format!(" (Retrying: {s})"))
  180. .unwrap_or_default()
  181. );
  182. let mut rpc_peers = cluster_info.rpc_peers();
  183. if bootstrap_config.only_known_rpc {
  184. rpc_peers.retain(|rpc_peer| {
  185. is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
  186. });
  187. }
  188. let rpc_peers_total = rpc_peers.len();
  189. // Filter out blacklisted nodes
  190. let rpc_peers: Vec<_> = rpc_peers
  191. .into_iter()
  192. .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(rpc_peer.pubkey()))
  193. .collect();
  194. let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len();
  195. let rpc_known_peers = rpc_peers
  196. .iter()
  197. .filter(|rpc_peer| {
  198. is_known_validator(rpc_peer.pubkey(), &validator_config.known_validators)
  199. })
  200. .count();
  201. info!(
  202. "Total {rpc_peers_total} RPC nodes found. {rpc_known_peers} known, \
  203. {rpc_peers_blacklisted} blacklisted"
  204. );
  205. if rpc_peers_blacklisted == rpc_peers_total {
  206. *retry_reason = if !blacklisted_rpc_nodes.is_empty()
  207. && blacklist_timeout.elapsed() > BLACKLIST_CLEAR_THRESHOLD
  208. {
  209. // All nodes are blacklisted and no additional nodes recently discovered.
  210. // Remove all nodes from the blacklist and try them again.
  211. blacklisted_rpc_nodes.clear();
  212. Some("Blacklist timeout expired".to_owned())
  213. } else {
  214. Some("Wait for known rpc peers".to_owned())
  215. };
  216. return vec![];
  217. }
  218. rpc_peers
  219. }
  220. fn check_vote_account(
  221. rpc_client: &RpcClient,
  222. identity_pubkey: &Pubkey,
  223. vote_account_address: &Pubkey,
  224. authorized_voter_pubkeys: &[Pubkey],
  225. ) -> Result<(), String> {
  226. let vote_account = rpc_client
  227. .get_account_with_commitment(vote_account_address, CommitmentConfig::confirmed())
  228. .map_err(|err| format!("failed to fetch vote account: {err}"))?
  229. .value
  230. .ok_or_else(|| format!("vote account does not exist: {vote_account_address}"))?;
  231. if vote_account.owner != solana_vote_program::id() {
  232. return Err(format!(
  233. "not a vote account (owned by {}): {}",
  234. vote_account.owner, vote_account_address
  235. ));
  236. }
  237. let identity_account = rpc_client
  238. .get_account_with_commitment(identity_pubkey, CommitmentConfig::confirmed())
  239. .map_err(|err| format!("failed to fetch identity account: {err}"))?
  240. .value
  241. .ok_or_else(|| format!("identity account does not exist: {identity_pubkey}"))?;
  242. let vote_state = VoteStateV4::deserialize(vote_account.data(), vote_account_address).ok();
  243. if let Some(vote_state) = vote_state {
  244. if vote_state.authorized_voters.is_empty() {
  245. return Err("Vote account not yet initialized".to_string());
  246. }
  247. if vote_state.node_pubkey != *identity_pubkey {
  248. return Err(format!(
  249. "vote account's identity ({}) does not match the validator's identity {}).",
  250. vote_state.node_pubkey, identity_pubkey
  251. ));
  252. }
  253. for (_, vote_account_authorized_voter_pubkey) in vote_state.authorized_voters.iter() {
  254. if !authorized_voter_pubkeys.contains(vote_account_authorized_voter_pubkey) {
  255. return Err(format!(
  256. "authorized voter {vote_account_authorized_voter_pubkey} not available"
  257. ));
  258. }
  259. }
  260. } else {
  261. return Err(format!(
  262. "invalid vote account data for {vote_account_address}"
  263. ));
  264. }
  265. // Maybe we can calculate minimum voting fee; rather than 1 lamport
  266. if identity_account.lamports <= 1 {
  267. return Err(format!(
  268. "underfunded identity account ({}): only {} lamports available",
  269. identity_pubkey, identity_account.lamports
  270. ));
  271. }
  272. Ok(())
  273. }
  274. #[derive(Error, Debug)]
  275. pub enum GetRpcNodeError {
  276. #[error("Unable to find any RPC peers")]
  277. NoRpcPeersFound,
  278. #[error("Giving up, did not get newer snapshots from the cluster")]
  279. NoNewerSnapshots,
  280. }
  281. /// Struct to wrap the return value from get_rpc_nodes(). The `rpc_contact_info` is the peer to
  282. /// download from, and `snapshot_hash` is the (optional) full and (optional) incremental
  283. /// snapshots to download.
  284. #[derive(Debug)]
  285. struct GetRpcNodeResult {
  286. rpc_contact_info: ContactInfo,
  287. snapshot_hash: Option<SnapshotHash>,
  288. }
  289. /// Struct to wrap the peers & snapshot hashes together.
  290. #[derive(Debug, PartialEq, Eq, Clone)]
  291. struct PeerSnapshotHash {
  292. rpc_contact_info: ContactInfo,
  293. snapshot_hash: SnapshotHash,
  294. }
  295. /// A snapshot hash. In this context (bootstrap *with* incremental snapshots), a snapshot hash
  296. /// is _both_ a full snapshot hash and an (optional) incremental snapshot hash.
  297. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
  298. pub struct SnapshotHash {
  299. full: (Slot, Hash),
  300. incr: Option<(Slot, Hash)>,
  301. }
  302. pub fn fail_rpc_node(
  303. err: String,
  304. known_validators: &Option<HashSet<Pubkey, RandomState>>,
  305. rpc_id: &Pubkey,
  306. blacklisted_rpc_nodes: &mut HashSet<Pubkey, RandomState>,
  307. ) {
  308. warn!("{err}");
  309. if let Some(known_validators) = known_validators {
  310. if known_validators.contains(rpc_id) {
  311. return;
  312. }
  313. }
  314. info!("Excluding {rpc_id} as a future RPC candidate");
  315. blacklisted_rpc_nodes.insert(*rpc_id);
  316. }
  317. fn shutdown_gossip_service(gossip: (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)) {
  318. let (cluster_info, gossip_exit_flag, gossip_service) = gossip;
  319. cluster_info.save_contact_info();
  320. gossip_exit_flag.store(true, Ordering::Relaxed);
  321. gossip_service.join().unwrap();
  322. }
  323. #[allow(clippy::too_many_arguments)]
  324. pub fn attempt_download_genesis_and_snapshot(
  325. rpc_contact_info: &ContactInfo,
  326. ledger_path: &Path,
  327. validator_config: &mut ValidatorConfig,
  328. bootstrap_config: &RpcBootstrapConfig,
  329. use_progress_bar: bool,
  330. gossip: &mut Option<(Arc<ClusterInfo>, Arc<AtomicBool>, GossipService)>,
  331. rpc_client: &RpcClient,
  332. maximum_local_snapshot_age: Slot,
  333. start_progress: &Arc<RwLock<ValidatorStartProgress>>,
  334. minimal_snapshot_download_speed: f32,
  335. maximum_snapshot_download_abort: u64,
  336. download_abort_count: &mut u64,
  337. snapshot_hash: Option<SnapshotHash>,
  338. identity_keypair: &Arc<Keypair>,
  339. vote_account: &Pubkey,
  340. authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
  341. ) -> Result<(), String> {
  342. download_then_check_genesis_hash(
  343. &rpc_contact_info
  344. .rpc()
  345. .ok_or_else(|| String::from("Invalid RPC address"))?,
  346. ledger_path,
  347. &mut validator_config.expected_genesis_hash,
  348. bootstrap_config.max_genesis_archive_unpacked_size,
  349. bootstrap_config.no_genesis_fetch,
  350. use_progress_bar,
  351. rpc_client,
  352. )?;
  353. if let Some(gossip) = gossip.take() {
  354. shutdown_gossip_service(gossip);
  355. }
  356. let rpc_client_slot = rpc_client
  357. .get_slot_with_commitment(CommitmentConfig::finalized())
  358. .map_err(|err| format!("Failed to get RPC node slot: {err}"))?;
  359. info!("RPC node root slot: {rpc_client_slot}");
  360. download_snapshots(
  361. validator_config,
  362. bootstrap_config,
  363. use_progress_bar,
  364. maximum_local_snapshot_age,
  365. start_progress,
  366. minimal_snapshot_download_speed,
  367. maximum_snapshot_download_abort,
  368. download_abort_count,
  369. snapshot_hash,
  370. rpc_contact_info,
  371. )?;
  372. if let Some(url) = bootstrap_config.check_vote_account.as_ref() {
  373. let rpc_client = RpcClient::new(url);
  374. check_vote_account(
  375. &rpc_client,
  376. &identity_keypair.pubkey(),
  377. vote_account,
  378. &authorized_voter_keypairs
  379. .read()
  380. .unwrap()
  381. .iter()
  382. .map(|k| k.pubkey())
  383. .collect::<Vec<_>>(),
  384. )
  385. .unwrap_or_else(|err| {
  386. // Consider failures here to be more likely due to user error (eg,
  387. // incorrect `agave-validator` command-line arguments) rather than the
  388. // RPC node failing.
  389. //
  390. // Power users can always use the `--no-check-vote-account` option to
  391. // bypass this check entirely
  392. error!("{err}");
  393. exit(1);
  394. });
  395. }
  396. Ok(())
  397. }
  398. /// simple ping helper function which returns the time to connect
  399. fn ping(addr: &SocketAddr) -> Option<Duration> {
  400. let start = Instant::now();
  401. match TcpStream::connect_timeout(addr, PING_TIMEOUT) {
  402. Ok(_) => Some(start.elapsed()),
  403. Err(_) => None,
  404. }
  405. }
  406. // Populates `vetted_rpc_nodes` with a list of RPC nodes that are ready to be
  407. // used for downloading latest snapshots and/or the genesis block. Guaranteed to
  408. // find at least one viable node or terminate the process.
  409. fn get_vetted_rpc_nodes(
  410. vetted_rpc_nodes: &mut Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>,
  411. cluster_info: &Arc<ClusterInfo>,
  412. validator_config: &ValidatorConfig,
  413. blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
  414. bootstrap_config: &RpcBootstrapConfig,
  415. ) {
  416. while vetted_rpc_nodes.is_empty() {
  417. let rpc_node_details = match get_rpc_nodes(
  418. cluster_info,
  419. validator_config,
  420. blacklisted_rpc_nodes,
  421. bootstrap_config,
  422. ) {
  423. Ok(rpc_node_details) => rpc_node_details,
  424. Err(err) => {
  425. error!(
  426. "Failed to get RPC nodes: {err}. Consider checking system clock, removing \
  427. `--no-port-check`, or adjusting `--known-validator ...` arguments as \
  428. applicable"
  429. );
  430. exit(1);
  431. }
  432. };
  433. let newly_blacklisted_rpc_nodes = RwLock::new(HashSet::new());
  434. vetted_rpc_nodes.extend(
  435. rpc_node_details
  436. .into_par_iter()
  437. .filter_map(|rpc_node_details| {
  438. let GetRpcNodeResult {
  439. rpc_contact_info,
  440. snapshot_hash,
  441. } = rpc_node_details;
  442. info!(
  443. "Using RPC service from node {}: {:?}",
  444. rpc_contact_info.pubkey(),
  445. rpc_contact_info.rpc()
  446. );
  447. let rpc_addr = rpc_contact_info.rpc()?;
  448. let ping_time = ping(&rpc_addr);
  449. let rpc_client =
  450. RpcClient::new_socket_with_timeout(rpc_addr, Duration::from_secs(5));
  451. Some((rpc_contact_info, snapshot_hash, rpc_client, ping_time))
  452. })
  453. .filter(
  454. |(rpc_contact_info, _snapshot_hash, rpc_client, ping_time)| match rpc_client
  455. .get_version()
  456. {
  457. Ok(rpc_version) => {
  458. if let Some(ping_time) = ping_time {
  459. info!(
  460. "RPC node version: {} Ping: {}ms",
  461. rpc_version.solana_core,
  462. ping_time.as_millis()
  463. );
  464. true
  465. } else {
  466. fail_rpc_node(
  467. "Failed to ping RPC".to_string(),
  468. &validator_config.known_validators,
  469. rpc_contact_info.pubkey(),
  470. &mut newly_blacklisted_rpc_nodes.write().unwrap(),
  471. );
  472. false
  473. }
  474. }
  475. Err(err) => {
  476. fail_rpc_node(
  477. format!("Failed to get RPC node version: {err}"),
  478. &validator_config.known_validators,
  479. rpc_contact_info.pubkey(),
  480. &mut newly_blacklisted_rpc_nodes.write().unwrap(),
  481. );
  482. false
  483. }
  484. },
  485. )
  486. .collect::<Vec<(
  487. ContactInfo,
  488. Option<SnapshotHash>,
  489. RpcClient,
  490. Option<Duration>,
  491. )>>()
  492. .into_iter()
  493. .sorted_by_key(|(_, _, _, ping_time)| ping_time.unwrap())
  494. .map(|(rpc_contact_info, snapshot_hash, rpc_client, _)| {
  495. (rpc_contact_info, snapshot_hash, rpc_client)
  496. })
  497. .collect::<Vec<(ContactInfo, Option<SnapshotHash>, RpcClient)>>(),
  498. );
  499. blacklisted_rpc_nodes.extend(newly_blacklisted_rpc_nodes.into_inner().unwrap());
  500. }
  501. }
  502. #[allow(clippy::too_many_arguments)]
  503. pub fn rpc_bootstrap(
  504. node: &Node,
  505. identity_keypair: &Arc<Keypair>,
  506. ledger_path: &Path,
  507. vote_account: &Pubkey,
  508. authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
  509. cluster_entrypoints: &[ContactInfo],
  510. validator_config: &mut ValidatorConfig,
  511. bootstrap_config: RpcBootstrapConfig,
  512. do_port_check: bool,
  513. use_progress_bar: bool,
  514. maximum_local_snapshot_age: Slot,
  515. should_check_duplicate_instance: bool,
  516. start_progress: &Arc<RwLock<ValidatorStartProgress>>,
  517. minimal_snapshot_download_speed: f32,
  518. maximum_snapshot_download_abort: u64,
  519. socket_addr_space: SocketAddrSpace,
  520. ) {
  521. if do_port_check {
  522. let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect();
  523. order.shuffle(&mut thread_rng());
  524. if order.into_iter().all(|i| {
  525. !verify_reachable_ports(
  526. node,
  527. &cluster_entrypoints[i],
  528. validator_config,
  529. &socket_addr_space,
  530. )
  531. }) {
  532. exit(1);
  533. }
  534. }
  535. if bootstrap_config.no_genesis_fetch && bootstrap_config.no_snapshot_fetch {
  536. return;
  537. }
  538. let total_snapshot_download_time = Instant::now();
  539. let mut get_rpc_nodes_time = Duration::new(0, 0);
  540. let mut snapshot_download_time = Duration::new(0, 0);
  541. let mut blacklisted_rpc_nodes = HashSet::new();
  542. let mut gossip = None;
  543. let mut vetted_rpc_nodes = vec![];
  544. let mut download_abort_count = 0;
  545. loop {
  546. if gossip.is_none() {
  547. *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;
  548. gossip = Some(start_gossip_node(
  549. identity_keypair.clone(),
  550. cluster_entrypoints,
  551. ledger_path,
  552. &node
  553. .info
  554. .gossip()
  555. .expect("Operator must spin up node with valid gossip address"),
  556. node.sockets.gossip.clone(),
  557. validator_config
  558. .expected_shred_version
  559. .expect("expected_shred_version should not be None"),
  560. validator_config.gossip_validators.clone(),
  561. should_check_duplicate_instance,
  562. socket_addr_space,
  563. ));
  564. }
  565. let get_rpc_nodes_start = Instant::now();
  566. get_vetted_rpc_nodes(
  567. &mut vetted_rpc_nodes,
  568. &gossip.as_ref().unwrap().0,
  569. validator_config,
  570. &mut blacklisted_rpc_nodes,
  571. &bootstrap_config,
  572. );
  573. let (rpc_contact_info, snapshot_hash, rpc_client) = vetted_rpc_nodes.pop().unwrap();
  574. get_rpc_nodes_time += get_rpc_nodes_start.elapsed();
  575. let snapshot_download_start = Instant::now();
  576. let download_result = attempt_download_genesis_and_snapshot(
  577. &rpc_contact_info,
  578. ledger_path,
  579. validator_config,
  580. &bootstrap_config,
  581. use_progress_bar,
  582. &mut gossip,
  583. &rpc_client,
  584. maximum_local_snapshot_age,
  585. start_progress,
  586. minimal_snapshot_download_speed,
  587. maximum_snapshot_download_abort,
  588. &mut download_abort_count,
  589. snapshot_hash,
  590. identity_keypair,
  591. vote_account,
  592. authorized_voter_keypairs.clone(),
  593. );
  594. snapshot_download_time += snapshot_download_start.elapsed();
  595. match download_result {
  596. Ok(()) => break,
  597. Err(err) => {
  598. fail_rpc_node(
  599. err,
  600. &validator_config.known_validators,
  601. rpc_contact_info.pubkey(),
  602. &mut blacklisted_rpc_nodes,
  603. );
  604. }
  605. }
  606. }
  607. if let Some(gossip) = gossip.take() {
  608. shutdown_gossip_service(gossip);
  609. }
  610. datapoint_info!(
  611. "bootstrap-snapshot-download",
  612. (
  613. "total_time_secs",
  614. total_snapshot_download_time.elapsed().as_secs(),
  615. i64
  616. ),
  617. ("get_rpc_nodes_time_secs", get_rpc_nodes_time.as_secs(), i64),
  618. (
  619. "snapshot_download_time_secs",
  620. snapshot_download_time.as_secs(),
  621. i64
  622. ),
  623. ("download_abort_count", download_abort_count, i64),
  624. ("blacklisted_nodes_count", blacklisted_rpc_nodes.len(), i64),
  625. );
  626. }
  627. /// Get RPC peer node candidates to download from.
  628. ///
  629. /// This function finds the highest compatible snapshots from the cluster and returns RPC peers.
  630. fn get_rpc_nodes(
  631. cluster_info: &ClusterInfo,
  632. validator_config: &ValidatorConfig,
  633. blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
  634. bootstrap_config: &RpcBootstrapConfig,
  635. ) -> Result<Vec<GetRpcNodeResult>, GetRpcNodeError> {
  636. let mut blacklist_timeout = Instant::now();
  637. let mut get_rpc_peers_timout = Instant::now();
  638. let mut newer_cluster_snapshot_timeout = None;
  639. let mut retry_reason = None;
  640. loop {
  641. // Give gossip some time to populate and not spin on grabbing the crds lock
  642. std::thread::sleep(Duration::from_secs(1));
  643. info!("\n{}", cluster_info.rpc_info_trace());
  644. let rpc_peers = get_rpc_peers(
  645. cluster_info,
  646. validator_config,
  647. blacklisted_rpc_nodes,
  648. &blacklist_timeout,
  649. &mut retry_reason,
  650. bootstrap_config,
  651. );
  652. if rpc_peers.is_empty() {
  653. if get_rpc_peers_timout.elapsed() > GET_RPC_PEERS_TIMEOUT {
  654. return Err(GetRpcNodeError::NoRpcPeersFound);
  655. }
  656. continue;
  657. }
  658. // Reset timeouts if we found any viable RPC peers.
  659. blacklist_timeout = Instant::now();
  660. get_rpc_peers_timout = Instant::now();
  661. if bootstrap_config.no_snapshot_fetch {
  662. let random_peer = &rpc_peers[thread_rng().gen_range(0..rpc_peers.len())];
  663. return Ok(vec![GetRpcNodeResult {
  664. rpc_contact_info: random_peer.clone(),
  665. snapshot_hash: None,
  666. }]);
  667. }
  668. let known_validators_to_wait_for = if newer_cluster_snapshot_timeout
  669. .as_ref()
  670. .map(|timer: &Instant| timer.elapsed() < WAIT_FOR_ALL_KNOWN_VALIDATORS)
  671. .unwrap_or(true)
  672. {
  673. KnownValidatorsToWaitFor::All
  674. } else {
  675. KnownValidatorsToWaitFor::Any
  676. };
  677. let peer_snapshot_hashes = get_peer_snapshot_hashes(
  678. cluster_info,
  679. &rpc_peers,
  680. validator_config.known_validators.as_ref(),
  681. known_validators_to_wait_for,
  682. bootstrap_config.incremental_snapshot_fetch,
  683. );
  684. if peer_snapshot_hashes.is_empty() {
  685. match newer_cluster_snapshot_timeout {
  686. None => newer_cluster_snapshot_timeout = Some(Instant::now()),
  687. Some(newer_cluster_snapshot_timeout) => {
  688. if newer_cluster_snapshot_timeout.elapsed() > NEWER_SNAPSHOT_THRESHOLD {
  689. return Err(GetRpcNodeError::NoNewerSnapshots);
  690. }
  691. }
  692. }
  693. retry_reason = Some("No snapshots available".to_owned());
  694. continue;
  695. } else {
  696. let rpc_peers = peer_snapshot_hashes
  697. .iter()
  698. .map(|peer_snapshot_hash| peer_snapshot_hash.rpc_contact_info.pubkey())
  699. .collect::<Vec<_>>();
  700. let final_snapshot_hash = peer_snapshot_hashes[0].snapshot_hash;
  701. info!(
  702. "Highest available snapshot slot is {}, available from {} node{}: {:?}",
  703. final_snapshot_hash
  704. .incr
  705. .map(|(slot, _hash)| slot)
  706. .unwrap_or(final_snapshot_hash.full.0),
  707. rpc_peers.len(),
  708. if rpc_peers.len() > 1 { "s" } else { "" },
  709. rpc_peers,
  710. );
  711. let rpc_node_results = peer_snapshot_hashes
  712. .iter()
  713. .map(|peer_snapshot_hash| GetRpcNodeResult {
  714. rpc_contact_info: peer_snapshot_hash.rpc_contact_info.clone(),
  715. snapshot_hash: Some(peer_snapshot_hash.snapshot_hash),
  716. })
  717. .take(MAX_RPC_CONNECTIONS_EVALUATED_PER_ITERATION)
  718. .collect();
  719. return Ok(rpc_node_results);
  720. }
  721. }
  722. }
  723. /// Get the Slot and Hash of the local snapshot with the highest slot. Can be either a full
  724. /// snapshot or an incremental snapshot.
  725. fn get_highest_local_snapshot_hash(
  726. full_snapshot_archives_dir: impl AsRef<Path>,
  727. incremental_snapshot_archives_dir: impl AsRef<Path>,
  728. incremental_snapshot_fetch: bool,
  729. ) -> Option<(Slot, Hash)> {
  730. snapshot_paths::get_highest_full_snapshot_archive_info(full_snapshot_archives_dir)
  731. .and_then(|full_snapshot_info| {
  732. if incremental_snapshot_fetch {
  733. snapshot_paths::get_highest_incremental_snapshot_archive_info(
  734. incremental_snapshot_archives_dir,
  735. full_snapshot_info.slot(),
  736. )
  737. .map(|incremental_snapshot_info| {
  738. (
  739. incremental_snapshot_info.slot(),
  740. *incremental_snapshot_info.hash(),
  741. )
  742. })
  743. } else {
  744. None
  745. }
  746. .or_else(|| Some((full_snapshot_info.slot(), *full_snapshot_info.hash())))
  747. })
  748. .map(|(slot, snapshot_hash)| (slot, snapshot_hash.0))
  749. }
  750. /// Get peer snapshot hashes
  751. ///
  752. /// The result is a vector of peers with snapshot hashes that:
  753. /// 1. match a snapshot hash from the known validators
  754. /// 2. have the highest incremental snapshot slot
  755. /// 3. have the highest full snapshot slot of (2)
  756. fn get_peer_snapshot_hashes(
  757. cluster_info: &ClusterInfo,
  758. rpc_peers: &[ContactInfo],
  759. known_validators: Option<&HashSet<Pubkey>>,
  760. known_validators_to_wait_for: KnownValidatorsToWaitFor,
  761. incremental_snapshot_fetch: bool,
  762. ) -> Vec<PeerSnapshotHash> {
  763. let mut peer_snapshot_hashes = get_eligible_peer_snapshot_hashes(cluster_info, rpc_peers);
  764. if let Some(known_validators) = known_validators {
  765. let known_snapshot_hashes = get_snapshot_hashes_from_known_validators(
  766. cluster_info,
  767. known_validators,
  768. known_validators_to_wait_for,
  769. );
  770. retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
  771. &known_snapshot_hashes,
  772. &mut peer_snapshot_hashes,
  773. );
  774. }
  775. if incremental_snapshot_fetch {
  776. // Only filter by highest incremental snapshot slot if we're actually going to download an
  777. // incremental snapshot. Otherwise this could remove higher full snapshot slots from
  778. // being selected. For example, if there are two peer snapshot hashes:
  779. // (A) full snapshot slot: 100, incremental snapshot slot: 160
  780. // (B) full snapshot slot: 150, incremental snapshot slot: None
  781. // Then (A) has the highest overall snapshot slot. But if we're not downloading and
  782. // incremental snapshot, (B) should be selected since it's full snapshot of 150 is highest.
  783. retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
  784. &mut peer_snapshot_hashes,
  785. );
  786. }
  787. retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut peer_snapshot_hashes);
  788. peer_snapshot_hashes
  789. }
  790. /// Map full snapshot hashes to a set of incremental snapshot hashes. Each full snapshot hash
  791. /// is treated as the base for its set of incremental snapshot hashes.
  792. type KnownSnapshotHashes = HashMap<(Slot, Hash), HashSet<(Slot, Hash)>>;
  793. /// Get the snapshot hashes from known validators.
  794. ///
  795. /// The snapshot hashes are put into a map from full snapshot hash to a set of incremental
  796. /// snapshot hashes. This map will be used as the "known snapshot hashes"; when peers are
  797. /// queried for their individual snapshot hashes, their results will be checked against this
  798. /// map to verify correctness.
  799. ///
  800. /// NOTE: Only a single snashot hash is allowed per slot. If somehow two known validators have
  801. /// a snapshot hash with the same slot and _different_ hashes, the second will be skipped.
  802. /// This applies to both full and incremental snapshot hashes.
  803. fn get_snapshot_hashes_from_known_validators(
  804. cluster_info: &ClusterInfo,
  805. known_validators: &HashSet<Pubkey>,
  806. known_validators_to_wait_for: KnownValidatorsToWaitFor,
  807. ) -> KnownSnapshotHashes {
  808. // Get the snapshot hashes for a node from CRDS
  809. let get_snapshot_hashes_for_node = |node| get_snapshot_hashes_for_node(cluster_info, node);
  810. if !do_known_validators_have_all_snapshot_hashes(
  811. known_validators,
  812. known_validators_to_wait_for,
  813. get_snapshot_hashes_for_node,
  814. ) {
  815. debug!(
  816. "Snapshot hashes have not been discovered from known validators. This likely means \
  817. the gossip tables are not fully populated. We will sleep and retry..."
  818. );
  819. return KnownSnapshotHashes::default();
  820. }
  821. build_known_snapshot_hashes(known_validators, get_snapshot_hashes_for_node)
  822. }
  823. /// Check if we can discover snapshot hashes for the known validators.
  824. ///
  825. /// This is a heuristic to ensure the gossip tables are populated enough so that the bootstrap
  826. /// process will download snapshots.
  827. ///
  828. /// This function will return false if we do not yet have snapshot hashes from known validators;
  829. /// and true otherwise. Either require snapshot hashes from *all* or *any* of the known validators
  830. /// based on the `KnownValidatorsToWaitFor` parameter.
  831. fn do_known_validators_have_all_snapshot_hashes<'a>(
  832. known_validators: impl IntoIterator<Item = &'a Pubkey>,
  833. known_validators_to_wait_for: KnownValidatorsToWaitFor,
  834. get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
  835. ) -> bool {
  836. let node_has_snapshot_hashes = |node| get_snapshot_hashes_for_node(node).is_some();
  837. match known_validators_to_wait_for {
  838. KnownValidatorsToWaitFor::All => known_validators.into_iter().all(node_has_snapshot_hashes),
  839. KnownValidatorsToWaitFor::Any => known_validators.into_iter().any(node_has_snapshot_hashes),
  840. }
  841. }
  842. /// When waiting for snapshot hashes from the known validators, should we wait for *all* or *any*
  843. /// of them?
  844. #[derive(Debug, Copy, Clone, Eq, PartialEq)]
  845. enum KnownValidatorsToWaitFor {
  846. All,
  847. Any,
  848. }
  849. /// Build the known snapshot hashes from a set of nodes.
  850. ///
  851. /// The `get_snapshot_hashes_for_node` parameter is a function that map a pubkey to its snapshot
  852. /// hashes. This parameter exist to provide a way to test the inner algorithm without needing
  853. /// runtime information such as the ClusterInfo or ValidatorConfig.
  854. fn build_known_snapshot_hashes<'a>(
  855. nodes: impl IntoIterator<Item = &'a Pubkey>,
  856. get_snapshot_hashes_for_node: impl Fn(&'a Pubkey) -> Option<SnapshotHash>,
  857. ) -> KnownSnapshotHashes {
  858. let mut known_snapshot_hashes = KnownSnapshotHashes::new();
  859. /// Check to see if there exists another snapshot hash in the haystack with the *same* slot
  860. /// but *different* hash as the needle.
  861. fn is_any_same_slot_and_different_hash<'a>(
  862. needle: &(Slot, Hash),
  863. haystack: impl IntoIterator<Item = &'a (Slot, Hash)>,
  864. ) -> bool {
  865. haystack
  866. .into_iter()
  867. .any(|hay| needle.0 == hay.0 && needle.1 != hay.1)
  868. }
  869. 'to_next_node: for node in nodes {
  870. let Some(SnapshotHash {
  871. full: full_snapshot_hash,
  872. incr: incremental_snapshot_hash,
  873. }) = get_snapshot_hashes_for_node(node)
  874. else {
  875. continue 'to_next_node;
  876. };
  877. // Do not add this snapshot hash if there's already a full snapshot hash with the
  878. // same slot but with a _different_ hash.
  879. // NOTE: Nodes should not produce snapshots at the same slot with _different_
  880. // hashes. So if it happens, keep the first and ignore the rest.
  881. if is_any_same_slot_and_different_hash(&full_snapshot_hash, known_snapshot_hashes.keys()) {
  882. warn!(
  883. "Ignoring all snapshot hashes from node {node} since we've seen a different full \
  884. snapshot hash with this slot. full snapshot hash: {full_snapshot_hash:?}"
  885. );
  886. debug!(
  887. "known full snapshot hashes: {:#?}",
  888. known_snapshot_hashes.keys(),
  889. );
  890. continue 'to_next_node;
  891. }
  892. // Insert a new full snapshot hash into the known snapshot hashes IFF an entry
  893. // doesn't already exist. This is to ensure we don't overwrite existing
  894. // incremental snapshot hashes that may be present for this full snapshot hash.
  895. let known_incremental_snapshot_hashes =
  896. known_snapshot_hashes.entry(full_snapshot_hash).or_default();
  897. if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
  898. // Do not add this snapshot hash if there's already an incremental snapshot
  899. // hash with the same slot, but with a _different_ hash.
  900. // NOTE: Nodes should not produce snapshots at the same slot with _different_
  901. // hashes. So if it happens, keep the first and ignore the rest.
  902. if is_any_same_slot_and_different_hash(
  903. &incremental_snapshot_hash,
  904. known_incremental_snapshot_hashes.iter(),
  905. ) {
  906. warn!(
  907. "Ignoring incremental snapshot hash from node {node} since we've seen a \
  908. different incremental snapshot hash with this slot. full snapshot hash: \
  909. {full_snapshot_hash:?}, incremental snapshot hash: \
  910. {incremental_snapshot_hash:?}"
  911. );
  912. debug!(
  913. "known incremental snapshot hashes based on this slot: {:#?}",
  914. known_incremental_snapshot_hashes.iter(),
  915. );
  916. continue 'to_next_node;
  917. }
  918. known_incremental_snapshot_hashes.insert(incremental_snapshot_hash);
  919. };
  920. }
  921. trace!("known snapshot hashes: {known_snapshot_hashes:?}");
  922. known_snapshot_hashes
  923. }
  924. /// Get snapshot hashes from all eligible peers.
  925. ///
  926. /// This fn will get only one snapshot hash per peer (the one with the highest slot).
  927. /// This may be just a full snapshot hash, or a combo full snapshot hash and
  928. /// incremental snapshot hash.
  929. fn get_eligible_peer_snapshot_hashes(
  930. cluster_info: &ClusterInfo,
  931. rpc_peers: &[ContactInfo],
  932. ) -> Vec<PeerSnapshotHash> {
  933. let peer_snapshot_hashes = rpc_peers
  934. .iter()
  935. .flat_map(|rpc_peer| {
  936. get_snapshot_hashes_for_node(cluster_info, rpc_peer.pubkey()).map(|snapshot_hash| {
  937. PeerSnapshotHash {
  938. rpc_contact_info: rpc_peer.clone(),
  939. snapshot_hash,
  940. }
  941. })
  942. })
  943. .collect();
  944. trace!("peer snapshot hashes: {peer_snapshot_hashes:?}");
  945. peer_snapshot_hashes
  946. }
  947. /// Retain the peer snapshot hashes that match a snapshot hash from the known snapshot hashes
  948. fn retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
  949. known_snapshot_hashes: &KnownSnapshotHashes,
  950. peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
  951. ) {
  952. peer_snapshot_hashes.retain(|peer_snapshot_hash| {
  953. known_snapshot_hashes
  954. .get(&peer_snapshot_hash.snapshot_hash.full)
  955. .map(|known_incremental_hashes| {
  956. if peer_snapshot_hash.snapshot_hash.incr.is_none() {
  957. // If the peer's full snapshot hashes match, but doesn't have any
  958. // incremental snapshots, that's fine; keep 'em!
  959. true
  960. } else {
  961. known_incremental_hashes
  962. .contains(peer_snapshot_hash.snapshot_hash.incr.as_ref().unwrap())
  963. }
  964. })
  965. .unwrap_or(false)
  966. });
  967. trace!(
  968. "retain peer snapshot hashes that match known snapshot hashes: {peer_snapshot_hashes:?}"
  969. );
  970. }
  971. /// Retain the peer snapshot hashes with the highest full snapshot slot
  972. fn retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(
  973. peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
  974. ) {
  975. let highest_full_snapshot_hash = peer_snapshot_hashes
  976. .iter()
  977. .map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.full)
  978. .max_by_key(|(slot, _hash)| *slot);
  979. let Some(highest_full_snapshot_hash) = highest_full_snapshot_hash else {
  980. // `max_by_key` will only be `None` IFF the input `peer_snapshot_hashes` is empty.
  981. // In that case there's nothing to do (additionally, without a valid 'max' value, there
  982. // will be nothing to compare against within the `retain()` predicate).
  983. return;
  984. };
  985. peer_snapshot_hashes.retain(|peer_snapshot_hash| {
  986. peer_snapshot_hash.snapshot_hash.full == highest_full_snapshot_hash
  987. });
  988. trace!("retain peer snapshot hashes with highest full snapshot slot: {peer_snapshot_hashes:?}");
  989. }
  990. /// Retain the peer snapshot hashes with the highest incremental snapshot slot
  991. fn retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(
  992. peer_snapshot_hashes: &mut Vec<PeerSnapshotHash>,
  993. ) {
  994. let highest_incremental_snapshot_hash = peer_snapshot_hashes
  995. .iter()
  996. .flat_map(|peer_snapshot_hash| peer_snapshot_hash.snapshot_hash.incr)
  997. .max_by_key(|(slot, _hash)| *slot);
  998. peer_snapshot_hashes.retain(|peer_snapshot_hash| {
  999. peer_snapshot_hash.snapshot_hash.incr == highest_incremental_snapshot_hash
  1000. });
  1001. trace!(
  1002. "retain peer snapshot hashes with highest incremental snapshot slot: \
  1003. {peer_snapshot_hashes:?}"
  1004. );
  1005. }
  1006. /// Check to see if we can use our local snapshots, otherwise download newer ones.
  1007. #[allow(clippy::too_many_arguments)]
  1008. fn download_snapshots(
  1009. validator_config: &ValidatorConfig,
  1010. bootstrap_config: &RpcBootstrapConfig,
  1011. use_progress_bar: bool,
  1012. maximum_local_snapshot_age: Slot,
  1013. start_progress: &Arc<RwLock<ValidatorStartProgress>>,
  1014. minimal_snapshot_download_speed: f32,
  1015. maximum_snapshot_download_abort: u64,
  1016. download_abort_count: &mut u64,
  1017. snapshot_hash: Option<SnapshotHash>,
  1018. rpc_contact_info: &ContactInfo,
  1019. ) -> Result<(), String> {
  1020. if snapshot_hash.is_none() {
  1021. return Ok(());
  1022. }
  1023. let SnapshotHash {
  1024. full: full_snapshot_hash,
  1025. incr: incremental_snapshot_hash,
  1026. } = snapshot_hash.unwrap();
  1027. let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
  1028. let incremental_snapshot_archives_dir = &validator_config
  1029. .snapshot_config
  1030. .incremental_snapshot_archives_dir;
  1031. // If the local snapshots are new enough, then use 'em; no need to download new snapshots
  1032. if should_use_local_snapshot(
  1033. full_snapshot_archives_dir,
  1034. incremental_snapshot_archives_dir,
  1035. maximum_local_snapshot_age,
  1036. full_snapshot_hash,
  1037. incremental_snapshot_hash,
  1038. bootstrap_config.incremental_snapshot_fetch,
  1039. ) {
  1040. return Ok(());
  1041. }
  1042. // Check and see if we've already got the full snapshot; if not, download it
  1043. if snapshot_paths::get_full_snapshot_archives(full_snapshot_archives_dir)
  1044. .into_iter()
  1045. .any(|snapshot_archive| {
  1046. snapshot_archive.slot() == full_snapshot_hash.0
  1047. && snapshot_archive.hash().0 == full_snapshot_hash.1
  1048. })
  1049. {
  1050. info!(
  1051. "Full snapshot archive already exists locally. Skipping download. slot: {}, hash: {}",
  1052. full_snapshot_hash.0, full_snapshot_hash.1
  1053. );
  1054. } else {
  1055. download_snapshot(
  1056. validator_config,
  1057. bootstrap_config,
  1058. use_progress_bar,
  1059. start_progress,
  1060. minimal_snapshot_download_speed,
  1061. maximum_snapshot_download_abort,
  1062. download_abort_count,
  1063. rpc_contact_info,
  1064. full_snapshot_hash,
  1065. SnapshotArchiveKind::Full,
  1066. )?;
  1067. }
  1068. if bootstrap_config.incremental_snapshot_fetch {
  1069. // Check and see if we've already got the incremental snapshot; if not, download it
  1070. if let Some(incremental_snapshot_hash) = incremental_snapshot_hash {
  1071. if snapshot_paths::get_incremental_snapshot_archives(incremental_snapshot_archives_dir)
  1072. .into_iter()
  1073. .any(|snapshot_archive| {
  1074. snapshot_archive.slot() == incremental_snapshot_hash.0
  1075. && snapshot_archive.hash().0 == incremental_snapshot_hash.1
  1076. && snapshot_archive.base_slot() == full_snapshot_hash.0
  1077. })
  1078. {
  1079. info!(
  1080. "Incremental snapshot archive already exists locally. Skipping download. \
  1081. slot: {}, hash: {}",
  1082. incremental_snapshot_hash.0, incremental_snapshot_hash.1
  1083. );
  1084. } else {
  1085. download_snapshot(
  1086. validator_config,
  1087. bootstrap_config,
  1088. use_progress_bar,
  1089. start_progress,
  1090. minimal_snapshot_download_speed,
  1091. maximum_snapshot_download_abort,
  1092. download_abort_count,
  1093. rpc_contact_info,
  1094. incremental_snapshot_hash,
  1095. SnapshotArchiveKind::Incremental(full_snapshot_hash.0),
  1096. )?;
  1097. }
  1098. }
  1099. }
  1100. Ok(())
  1101. }
  1102. /// Download a snapshot
  1103. #[allow(clippy::too_many_arguments)]
  1104. fn download_snapshot(
  1105. validator_config: &ValidatorConfig,
  1106. bootstrap_config: &RpcBootstrapConfig,
  1107. use_progress_bar: bool,
  1108. start_progress: &Arc<RwLock<ValidatorStartProgress>>,
  1109. minimal_snapshot_download_speed: f32,
  1110. maximum_snapshot_download_abort: u64,
  1111. download_abort_count: &mut u64,
  1112. rpc_contact_info: &ContactInfo,
  1113. desired_snapshot_hash: (Slot, Hash),
  1114. snapshot_kind: SnapshotArchiveKind,
  1115. ) -> Result<(), String> {
  1116. let maximum_full_snapshot_archives_to_retain = validator_config
  1117. .snapshot_config
  1118. .maximum_full_snapshot_archives_to_retain;
  1119. let maximum_incremental_snapshot_archives_to_retain = validator_config
  1120. .snapshot_config
  1121. .maximum_incremental_snapshot_archives_to_retain;
  1122. let full_snapshot_archives_dir = &validator_config.snapshot_config.full_snapshot_archives_dir;
  1123. let incremental_snapshot_archives_dir = &validator_config
  1124. .snapshot_config
  1125. .incremental_snapshot_archives_dir;
  1126. *start_progress.write().unwrap() = ValidatorStartProgress::DownloadingSnapshot {
  1127. slot: desired_snapshot_hash.0,
  1128. rpc_addr: rpc_contact_info
  1129. .rpc()
  1130. .ok_or_else(|| String::from("Invalid RPC address"))?,
  1131. };
  1132. let desired_snapshot_hash = (
  1133. desired_snapshot_hash.0,
  1134. agave_snapshots::snapshot_hash::SnapshotHash(desired_snapshot_hash.1),
  1135. );
  1136. download_snapshot_archive(
  1137. &rpc_contact_info
  1138. .rpc()
  1139. .ok_or_else(|| String::from("Invalid RPC address"))?,
  1140. full_snapshot_archives_dir,
  1141. incremental_snapshot_archives_dir,
  1142. desired_snapshot_hash,
  1143. snapshot_kind,
  1144. maximum_full_snapshot_archives_to_retain,
  1145. maximum_incremental_snapshot_archives_to_retain,
  1146. use_progress_bar,
  1147. &mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
  1148. debug!("Download progress: {download_progress:?}");
  1149. if download_progress.last_throughput < minimal_snapshot_download_speed
  1150. && download_progress.notification_count <= 1
  1151. && download_progress.percentage_done <= 2_f32
  1152. && download_progress.estimated_remaining_time > 60_f32
  1153. && *download_abort_count < maximum_snapshot_download_abort
  1154. {
  1155. if let Some(ref known_validators) = validator_config.known_validators {
  1156. if known_validators.contains(rpc_contact_info.pubkey())
  1157. && known_validators.len() == 1
  1158. && bootstrap_config.only_known_rpc
  1159. {
  1160. warn!(
  1161. "The snapshot download is too slow, throughput: {} < min speed {} \
  1162. bytes/sec, but will NOT abort and try a different node as it is the \
  1163. only known validator and the --only-known-rpc flag is set. Abort \
  1164. count: {}, Progress detail: {:?}",
  1165. download_progress.last_throughput,
  1166. minimal_snapshot_download_speed,
  1167. download_abort_count,
  1168. download_progress,
  1169. );
  1170. return true; // Do not abort download from the one-and-only known validator
  1171. }
  1172. }
  1173. warn!(
  1174. "The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, \
  1175. will abort and try a different node. Abort count: {}, Progress detail: {:?}",
  1176. download_progress.last_throughput,
  1177. minimal_snapshot_download_speed,
  1178. download_abort_count,
  1179. download_progress,
  1180. );
  1181. *download_abort_count += 1;
  1182. false
  1183. } else {
  1184. true
  1185. }
  1186. })),
  1187. )
  1188. }
  1189. /// Check to see if bootstrap should load from its local snapshots or not. If not, then snapshots
  1190. /// will be downloaded.
  1191. fn should_use_local_snapshot(
  1192. full_snapshot_archives_dir: &Path,
  1193. incremental_snapshot_archives_dir: &Path,
  1194. maximum_local_snapshot_age: Slot,
  1195. full_snapshot_hash: (Slot, Hash),
  1196. incremental_snapshot_hash: Option<(Slot, Hash)>,
  1197. incremental_snapshot_fetch: bool,
  1198. ) -> bool {
  1199. let cluster_snapshot_slot = incremental_snapshot_hash
  1200. .map(|(slot, _)| slot)
  1201. .unwrap_or(full_snapshot_hash.0);
  1202. match get_highest_local_snapshot_hash(
  1203. full_snapshot_archives_dir,
  1204. incremental_snapshot_archives_dir,
  1205. incremental_snapshot_fetch,
  1206. ) {
  1207. None => {
  1208. info!(
  1209. "Downloading a snapshot for slot {cluster_snapshot_slot} since there is not a \
  1210. local snapshot."
  1211. );
  1212. false
  1213. }
  1214. Some((local_snapshot_slot, _)) => {
  1215. if local_snapshot_slot
  1216. >= cluster_snapshot_slot.saturating_sub(maximum_local_snapshot_age)
  1217. {
  1218. info!(
  1219. "Reusing local snapshot at slot {local_snapshot_slot} instead of downloading \
  1220. a snapshot for slot {cluster_snapshot_slot}."
  1221. );
  1222. true
  1223. } else {
  1224. info!(
  1225. "Local snapshot from slot {local_snapshot_slot} is too old. Downloading a \
  1226. newer snapshot for slot {cluster_snapshot_slot}."
  1227. );
  1228. false
  1229. }
  1230. }
  1231. }
  1232. }
  1233. /// Get the node's highest snapshot hashes from CRDS
  1234. fn get_snapshot_hashes_for_node(cluster_info: &ClusterInfo, node: &Pubkey) -> Option<SnapshotHash> {
  1235. cluster_info.get_snapshot_hashes_for_node(node).map(
  1236. |crds_data::SnapshotHashes {
  1237. full, incremental, ..
  1238. }| {
  1239. let highest_incremental_snapshot_hash = incremental.into_iter().max();
  1240. SnapshotHash {
  1241. full,
  1242. incr: highest_incremental_snapshot_hash,
  1243. }
  1244. },
  1245. )
  1246. }
  1247. #[cfg(test)]
  1248. mod tests {
  1249. use super::*;
  1250. impl PeerSnapshotHash {
  1251. fn new(
  1252. rpc_contact_info: ContactInfo,
  1253. full_snapshot_hash: (Slot, Hash),
  1254. incremental_snapshot_hash: Option<(Slot, Hash)>,
  1255. ) -> Self {
  1256. Self {
  1257. rpc_contact_info,
  1258. snapshot_hash: SnapshotHash {
  1259. full: full_snapshot_hash,
  1260. incr: incremental_snapshot_hash,
  1261. },
  1262. }
  1263. }
  1264. }
  1265. fn default_contact_info_for_tests() -> ContactInfo {
  1266. ContactInfo::new_localhost(&Pubkey::default(), /*now:*/ 1_681_834_947_321)
  1267. }
  1268. #[test]
  1269. fn test_build_known_snapshot_hashes() {
  1270. agave_logger::setup();
  1271. let full_snapshot_hash1 = (400_000, Hash::new_unique());
  1272. let full_snapshot_hash2 = (400_000, Hash::new_unique());
  1273. let incremental_snapshot_hash1 = (400_800, Hash::new_unique());
  1274. let incremental_snapshot_hash2 = (400_800, Hash::new_unique());
  1275. // simulate a set of known validators with various snapshot hashes
  1276. let oracle = {
  1277. let mut oracle = HashMap::new();
  1278. for (full, incr) in [
  1279. // only a full snapshot
  1280. (full_snapshot_hash1, None),
  1281. // full and incremental snapshots
  1282. (full_snapshot_hash1, Some(incremental_snapshot_hash1)),
  1283. // full and incremental snapshots, with different incremental hash
  1284. (full_snapshot_hash1, Some(incremental_snapshot_hash2)),
  1285. // ...and now with different full hashes
  1286. (full_snapshot_hash2, None),
  1287. (full_snapshot_hash2, Some(incremental_snapshot_hash1)),
  1288. (full_snapshot_hash2, Some(incremental_snapshot_hash2)),
  1289. ] {
  1290. // also simulate multiple known validators having the same snapshot hashes
  1291. oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
  1292. oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
  1293. oracle.insert(Pubkey::new_unique(), Some(SnapshotHash { full, incr }));
  1294. }
  1295. // no snapshots at all
  1296. oracle.insert(Pubkey::new_unique(), None);
  1297. oracle.insert(Pubkey::new_unique(), None);
  1298. oracle.insert(Pubkey::new_unique(), None);
  1299. oracle
  1300. };
  1301. let node_to_snapshot_hashes = |node| *oracle.get(node).unwrap();
  1302. let known_snapshot_hashes =
  1303. build_known_snapshot_hashes(oracle.keys(), node_to_snapshot_hashes);
  1304. // ensure there's only one full snapshot hash, since they all used the same slot and there
  1305. // can be only one snapshot hash per slot
  1306. let known_full_snapshot_hashes = known_snapshot_hashes.keys();
  1307. assert_eq!(known_full_snapshot_hashes.len(), 1);
  1308. let known_full_snapshot_hash = known_full_snapshot_hashes.into_iter().next().unwrap();
  1309. // and for the same reasons, ensure there is only one incremental snapshot hash
  1310. let known_incremental_snapshot_hashes =
  1311. known_snapshot_hashes.get(known_full_snapshot_hash).unwrap();
  1312. assert_eq!(known_incremental_snapshot_hashes.len(), 1);
  1313. let known_incremental_snapshot_hash =
  1314. known_incremental_snapshot_hashes.iter().next().unwrap();
  1315. // The resulting `known_snapshot_hashes` can be different from run-to-run due to how
  1316. // `oracle.keys()` returns nodes during iteration. Because of that, we cannot just assert
  1317. // the full and incremental snapshot hashes are `full_snapshot_hash1` and
  1318. // `incremental_snapshot_hash1`. Instead, we assert that the full and incremental
  1319. // snapshot hashes are exactly one or the other, since it depends on which nodes are seen
  1320. // "first" when building the known snapshot hashes.
  1321. assert!(
  1322. known_full_snapshot_hash == &full_snapshot_hash1
  1323. || known_full_snapshot_hash == &full_snapshot_hash2
  1324. );
  1325. assert!(
  1326. known_incremental_snapshot_hash == &incremental_snapshot_hash1
  1327. || known_incremental_snapshot_hash == &incremental_snapshot_hash2
  1328. );
  1329. }
  1330. #[test]
  1331. fn test_retain_peer_snapshot_hashes_that_match_known_snapshot_hashes() {
  1332. let known_snapshot_hashes: KnownSnapshotHashes = [
  1333. (
  1334. (200_000, Hash::new_unique()),
  1335. [
  1336. (200_200, Hash::new_unique()),
  1337. (200_400, Hash::new_unique()),
  1338. (200_600, Hash::new_unique()),
  1339. (200_800, Hash::new_unique()),
  1340. ]
  1341. .iter()
  1342. .cloned()
  1343. .collect(),
  1344. ),
  1345. (
  1346. (300_000, Hash::new_unique()),
  1347. [
  1348. (300_200, Hash::new_unique()),
  1349. (300_400, Hash::new_unique()),
  1350. (300_600, Hash::new_unique()),
  1351. ]
  1352. .iter()
  1353. .cloned()
  1354. .collect(),
  1355. ),
  1356. ]
  1357. .iter()
  1358. .cloned()
  1359. .collect();
  1360. let known_snapshot_hash = known_snapshot_hashes.iter().next().unwrap();
  1361. let known_full_snapshot_hash = known_snapshot_hash.0;
  1362. let known_incremental_snapshot_hash = known_snapshot_hash.1.iter().next().unwrap();
  1363. let contact_info = default_contact_info_for_tests();
  1364. let peer_snapshot_hashes = vec![
  1365. // bad full snapshot hash, no incremental snapshot hash
  1366. PeerSnapshotHash::new(contact_info.clone(), (111_000, Hash::default()), None),
  1367. // bad everything
  1368. PeerSnapshotHash::new(
  1369. contact_info.clone(),
  1370. (111_000, Hash::default()),
  1371. Some((111_111, Hash::default())),
  1372. ),
  1373. // good full snapshot hash, no incremental snapshot hash
  1374. PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
  1375. // bad full snapshot hash, good (not possible) incremental snapshot hash
  1376. PeerSnapshotHash::new(
  1377. contact_info.clone(),
  1378. (111_000, Hash::default()),
  1379. Some(*known_incremental_snapshot_hash),
  1380. ),
  1381. // good full snapshot hash, bad incremental snapshot hash
  1382. PeerSnapshotHash::new(
  1383. contact_info.clone(),
  1384. *known_full_snapshot_hash,
  1385. Some((111_111, Hash::default())),
  1386. ),
  1387. // good everything
  1388. PeerSnapshotHash::new(
  1389. contact_info.clone(),
  1390. *known_full_snapshot_hash,
  1391. Some(*known_incremental_snapshot_hash),
  1392. ),
  1393. ];
  1394. let expected = vec![
  1395. PeerSnapshotHash::new(contact_info.clone(), *known_full_snapshot_hash, None),
  1396. PeerSnapshotHash::new(
  1397. contact_info,
  1398. *known_full_snapshot_hash,
  1399. Some(*known_incremental_snapshot_hash),
  1400. ),
  1401. ];
  1402. let mut actual = peer_snapshot_hashes;
  1403. retain_peer_snapshot_hashes_that_match_known_snapshot_hashes(
  1404. &known_snapshot_hashes,
  1405. &mut actual,
  1406. );
  1407. assert_eq!(expected, actual);
  1408. }
  1409. #[test]
  1410. fn test_retain_peer_snapshot_hashes_with_highest_full_snapshot_slot() {
  1411. let contact_info = default_contact_info_for_tests();
  1412. let peer_snapshot_hashes = vec![
  1413. // old
  1414. PeerSnapshotHash::new(contact_info.clone(), (100_000, Hash::default()), None),
  1415. PeerSnapshotHash::new(
  1416. contact_info.clone(),
  1417. (100_000, Hash::default()),
  1418. Some((100_100, Hash::default())),
  1419. ),
  1420. PeerSnapshotHash::new(
  1421. contact_info.clone(),
  1422. (100_000, Hash::default()),
  1423. Some((100_200, Hash::default())),
  1424. ),
  1425. PeerSnapshotHash::new(
  1426. contact_info.clone(),
  1427. (100_000, Hash::default()),
  1428. Some((100_300, Hash::default())),
  1429. ),
  1430. // new
  1431. PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
  1432. PeerSnapshotHash::new(
  1433. contact_info.clone(),
  1434. (200_000, Hash::default()),
  1435. Some((200_100, Hash::default())),
  1436. ),
  1437. PeerSnapshotHash::new(
  1438. contact_info.clone(),
  1439. (200_000, Hash::default()),
  1440. Some((200_200, Hash::default())),
  1441. ),
  1442. PeerSnapshotHash::new(
  1443. contact_info.clone(),
  1444. (200_000, Hash::default()),
  1445. Some((200_300, Hash::default())),
  1446. ),
  1447. ];
  1448. let expected = vec![
  1449. PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
  1450. PeerSnapshotHash::new(
  1451. contact_info.clone(),
  1452. (200_000, Hash::default()),
  1453. Some((200_100, Hash::default())),
  1454. ),
  1455. PeerSnapshotHash::new(
  1456. contact_info.clone(),
  1457. (200_000, Hash::default()),
  1458. Some((200_200, Hash::default())),
  1459. ),
  1460. PeerSnapshotHash::new(
  1461. contact_info,
  1462. (200_000, Hash::default()),
  1463. Some((200_300, Hash::default())),
  1464. ),
  1465. ];
  1466. let mut actual = peer_snapshot_hashes;
  1467. retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
  1468. assert_eq!(expected, actual);
  1469. }
  1470. #[test]
  1471. fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_some() {
  1472. let contact_info = default_contact_info_for_tests();
  1473. let peer_snapshot_hashes = vec![
  1474. PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::default()), None),
  1475. PeerSnapshotHash::new(
  1476. contact_info.clone(),
  1477. (200_000, Hash::default()),
  1478. Some((200_100, Hash::default())),
  1479. ),
  1480. PeerSnapshotHash::new(
  1481. contact_info.clone(),
  1482. (200_000, Hash::default()),
  1483. Some((200_200, Hash::default())),
  1484. ),
  1485. PeerSnapshotHash::new(
  1486. contact_info.clone(),
  1487. (200_000, Hash::default()),
  1488. Some((200_300, Hash::default())),
  1489. ),
  1490. PeerSnapshotHash::new(
  1491. contact_info.clone(),
  1492. (200_000, Hash::default()),
  1493. Some((200_010, Hash::default())),
  1494. ),
  1495. PeerSnapshotHash::new(
  1496. contact_info.clone(),
  1497. (200_000, Hash::default()),
  1498. Some((200_020, Hash::default())),
  1499. ),
  1500. PeerSnapshotHash::new(
  1501. contact_info.clone(),
  1502. (200_000, Hash::default()),
  1503. Some((200_030, Hash::default())),
  1504. ),
  1505. ];
  1506. let expected = vec![PeerSnapshotHash::new(
  1507. contact_info,
  1508. (200_000, Hash::default()),
  1509. Some((200_300, Hash::default())),
  1510. )];
  1511. let mut actual = peer_snapshot_hashes;
  1512. retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
  1513. assert_eq!(expected, actual);
  1514. }
  1515. /// Ensure that retaining the highest incremental snapshot hashes works as expected even if
  1516. /// there are *zero* peers with incremental snapshots.
  1517. #[test]
  1518. fn test_retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot_none() {
  1519. let contact_info = default_contact_info_for_tests();
  1520. let peer_snapshot_hashes = vec![
  1521. PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
  1522. PeerSnapshotHash::new(contact_info.clone(), (200_000, Hash::new_unique()), None),
  1523. PeerSnapshotHash::new(contact_info, (200_000, Hash::new_unique()), None),
  1524. ];
  1525. let expected = peer_snapshot_hashes.clone();
  1526. let mut actual = peer_snapshot_hashes;
  1527. retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
  1528. assert_eq!(expected, actual);
  1529. }
  1530. /// Ensure that retaining the highest snapshot hashes works (i.e. doesn't crash) even if the
  1531. /// peer snapshot hashes input is empty.
  1532. #[test]
  1533. fn test_retain_peer_snapshot_hashes_with_highest_slot_empty() {
  1534. {
  1535. let mut actual = vec![];
  1536. let expected = actual.clone();
  1537. retain_peer_snapshot_hashes_with_highest_full_snapshot_slot(&mut actual);
  1538. assert_eq!(expected, actual);
  1539. }
  1540. {
  1541. let mut actual = vec![];
  1542. let expected = actual.clone();
  1543. retain_peer_snapshot_hashes_with_highest_incremental_snapshot_slot(&mut actual);
  1544. assert_eq!(expected, actual);
  1545. }
  1546. }
  1547. }