소스 검색

moves turbine to a separate crate out of solana/core (#32226)

behzad nouri 2 년 전
부모
커밋
f6e039b0b3

+ 34 - 0
Cargo.lock

@@ -5617,6 +5617,7 @@ dependencies = [
  "solana-streamer",
  "solana-tpu-client",
  "solana-transaction-status",
+ "solana-turbine",
  "solana-version",
  "solana-vote-program",
  "static_assertions",
@@ -6095,6 +6096,7 @@ dependencies = [
  "solana-streamer",
  "solana-thin-client",
  "solana-tpu-client",
+ "solana-turbine",
  "solana-vote-program",
  "tempfile",
  "trees",
@@ -7062,6 +7064,38 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "solana-turbine"
+version = "1.17.0"
+dependencies = [
+ "bincode",
+ "crossbeam-channel",
+ "itertools",
+ "log",
+ "lru",
+ "matches",
+ "rand 0.7.3",
+ "rand_chacha 0.2.2",
+ "rayon",
+ "solana-client",
+ "solana-entry",
+ "solana-gossip",
+ "solana-ledger",
+ "solana-logger",
+ "solana-measure",
+ "solana-metrics",
+ "solana-perf",
+ "solana-poh",
+ "solana-quic-client",
+ "solana-rayon-threadlimit",
+ "solana-rpc",
+ "solana-rpc-client-api",
+ "solana-runtime",
+ "solana-sdk",
+ "solana-streamer",
+ "thiserror",
+]
+
 [[package]]
 name = "solana-udp-client"
 version = "1.17.0"

+ 2 - 0
Cargo.toml

@@ -97,6 +97,7 @@ members = [
     "tpu-client",
     "transaction-dos",
     "transaction-status",
+    "turbine",
     "udp-client",
     "upload-perf",
     "validator",
@@ -355,6 +356,7 @@ solana-test-validator = { path = "test-validator", version = "=1.17.0" }
 solana-thin-client = { path = "thin-client", version = "=1.17.0" }
 solana-tpu-client = { path = "tpu-client", version = "=1.17.0", default-features = false }
 solana-transaction-status = { path = "transaction-status", version = "=1.17.0" }
+solana-turbine = { path = "turbine", version = "=1.17.0" }
 solana-udp-client = { path = "udp-client", version = "=1.17.0" }
 solana-version = { path = "version", version = "=1.17.0" }
 solana-vote-program = { path = "programs/vote", version = "=1.17.0" }

+ 4 - 0
ci/bench/part1.sh

@@ -17,3 +17,7 @@ _ cargo +"$rust_nightly" bench --manifest-path gossip/Cargo.toml ${V:+--verbose}
 # Run poh benches
 _ cargo +"$rust_nightly" bench --manifest-path poh/Cargo.toml ${V:+--verbose} \
   -- -Z unstable-options --format=json | tee -a "$BENCH_FILE"
+
+# Run turbine benches
+_ cargo +"$rust_nightly" bench --manifest-path turbine/Cargo.toml ${V:+--verbose} \
+  -- -Z unstable-options --format=json | tee -a "$BENCH_FILE"

+ 1 - 6
core/Cargo.toml

@@ -60,6 +60,7 @@ solana-send-transaction-service = { workspace = true }
 solana-streamer = { workspace = true }
 solana-tpu-client = { workspace = true }
 solana-transaction-status = { workspace = true }
+solana-turbine = { workspace = true }
 solana-version = { workspace = true }
 solana-vote-program = { workspace = true }
 strum = { workspace = true, features = ["derive"] }
@@ -92,17 +93,11 @@ rustc_version = { workspace = true }
 [[bench]]
 name = "banking_stage"
 
-[[bench]]
-name = "cluster_info"
-
 [[bench]]
 name = "gen_keys"
 
 [[bench]]
 name = "sigverify_stage"
 
-[[bench]]
-name = "retransmit_stage"
-
 [package.metadata.docs.rs]
 targets = ["x86_64-unknown-linux-gnu"]

+ 0 - 4
core/src/lib.rs

@@ -13,10 +13,8 @@ pub mod admin_rpc_post_init;
 pub mod ancestor_hashes_service;
 pub mod banking_stage;
 pub mod banking_trace;
-pub mod broadcast_stage;
 pub mod cache_block_meta_service;
 pub mod cluster_info_vote_listener;
-pub mod cluster_nodes;
 pub mod cluster_slot_state_verifier;
 pub mod cluster_slots;
 pub mod cluster_slots_service;
@@ -48,14 +46,12 @@ pub mod repair_weighted_traversal;
 pub mod replay_stage;
 pub mod request_response;
 mod result;
-pub mod retransmit_stage;
 pub mod rewards_recorder_service;
 pub mod sample_performance_service;
 pub mod serve_repair;
 pub mod serve_repair_service;
 mod shred_fetch_stage;
 pub mod sigverify;
-pub mod sigverify_shreds;
 pub mod sigverify_stage;
 pub mod snapshot_packager_service;
 pub mod staked_nodes_updater_service;

+ 5 - 7
core/src/replay_stage.rs

@@ -4,7 +4,6 @@ use {
     crate::{
         ancestor_hashes_service::AncestorHashesReplayUpdateSender,
         banking_trace::BankingTracer,
-        broadcast_stage::RetransmitSlotsSender,
         cache_block_meta_service::CacheBlockMetaSender,
         cluster_info_vote_listener::{
             GossipDuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver, VoteTracker,
@@ -487,7 +486,7 @@ impl ReplayStage {
         maybe_process_blockstore: Option<ProcessBlockStore>,
         vote_tracker: Arc<VoteTracker>,
         cluster_slots: Arc<ClusterSlots>,
-        retransmit_slots_sender: RetransmitSlotsSender,
+        retransmit_slots_sender: Sender<Slot>,
         ancestor_duplicate_slots_receiver: AncestorDuplicateSlotsReceiver,
         replay_vote_sender: ReplayVoteSender,
         gossip_duplicate_confirmed_slots_receiver: GossipDuplicateConfirmedSlotsReceiver,
@@ -1152,7 +1151,7 @@ impl ReplayStage {
 
     fn maybe_retransmit_unpropagated_slots(
         metric_name: &'static str,
-        retransmit_slots_sender: &RetransmitSlotsSender,
+        retransmit_slots_sender: &Sender<Slot>,
         progress: &mut ProgressMap,
         latest_leader_slot: Slot,
     ) {
@@ -1192,7 +1191,7 @@ impl ReplayStage {
 
     fn retransmit_latest_unpropagated_leader_slot(
         poh_recorder: &Arc<RwLock<PohRecorder>>,
-        retransmit_slots_sender: &RetransmitSlotsSender,
+        retransmit_slots_sender: &Sender<Slot>,
         progress: &mut ProgressMap,
     ) {
         let start_slot = poh_recorder.read().unwrap().start_slot();
@@ -1835,7 +1834,7 @@ impl ReplayStage {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         rpc_subscriptions: &Arc<RpcSubscriptions>,
         progress_map: &mut ProgressMap,
-        retransmit_slots_sender: &RetransmitSlotsSender,
+        retransmit_slots_sender: &Sender<Slot>,
         skipped_slots_info: &mut SkippedSlotsInfo,
         banking_tracer: &Arc<BankingTracer>,
         has_new_vote_been_rooted: bool,
@@ -3835,7 +3834,6 @@ pub(crate) mod tests {
     use {
         super::*,
         crate::{
-            broadcast_stage::RetransmitSlotsReceiver,
             consensus::Tower,
             progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS},
             replay_stage::ReplayStage,
@@ -7373,7 +7371,7 @@ pub(crate) mod tests {
         );
     }
 
-    fn receive_slots(retransmit_slots_receiver: &RetransmitSlotsReceiver) -> Vec<Slot> {
+    fn receive_slots(retransmit_slots_receiver: &Receiver<Slot>) -> Vec<Slot> {
         let mut slots = Vec::default();
         while let Ok(slot) = retransmit_slots_receiver.recv_timeout(Duration::from_millis(10)) {
             slots.push(slot);

+ 0 - 2
core/src/result.rs

@@ -27,8 +27,6 @@ pub enum Error {
     RecvTimeout(#[from] crossbeam_channel::RecvTimeoutError),
     #[error("Send")]
     Send,
-    #[error(transparent)]
-    TransportError(#[from] solana_sdk::transport::TransportError),
     #[error("TrySend")]
     TrySend,
     #[error(transparent)]

+ 2 - 1
core/src/shred_fetch_stage.rs

@@ -1,7 +1,7 @@
 //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel.
 
 use {
-    crate::{cluster_nodes::check_feature_activation, serve_repair::ServeRepair},
+    crate::serve_repair::ServeRepair,
     crossbeam_channel::{unbounded, Sender},
     solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
     solana_ledger::shred::{should_discard_shred, ShredFetchStats},
@@ -12,6 +12,7 @@ use {
         feature_set,
     },
     solana_streamer::streamer::{self, PacketBatchReceiver, StakedNodes, StreamerReceiveStats},
+    solana_turbine::cluster_nodes::check_feature_activation,
     std::{
         net::UdpSocket,
         sync::{

+ 3 - 3
core/src/tpu.rs

@@ -6,7 +6,6 @@ use {
     crate::{
         banking_stage::BankingStage,
         banking_trace::{BankingTracer, TracerThread},
-        broadcast_stage::{BroadcastStage, BroadcastStageType, RetransmitSlotsReceiver},
         cluster_info_vote_listener::{
             ClusterInfoVoteListener, GossipDuplicateConfirmedSlotsSender,
             GossipVerifiedVoteHashSender, VerifiedVoteSender, VoteTracker,
@@ -36,12 +35,13 @@ use {
         prioritization_fee_cache::PrioritizationFeeCache,
         vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
     },
-    solana_sdk::{pubkey::Pubkey, signature::Keypair},
+    solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
     solana_streamer::{
         nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
         quic::{spawn_server, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
         streamer::StakedNodes,
     },
+    solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType},
     std::{
         collections::HashMap,
         net::UdpSocket,
@@ -83,7 +83,7 @@ impl Tpu {
         cluster_info: &Arc<ClusterInfo>,
         poh_recorder: &Arc<RwLock<PohRecorder>>,
         entry_receiver: Receiver<WorkingBankEntry>,
-        retransmit_slots_receiver: RetransmitSlotsReceiver,
+        retransmit_slots_receiver: Receiver<Slot>,
         sockets: TpuSockets,
         subscriptions: &Arc<RpcSubscriptions>,
         transaction_status_sender: Option<TransactionStatusSender>,

+ 4 - 6
core/src/tvu.rs

@@ -4,7 +4,6 @@
 use {
     crate::{
         banking_trace::BankingTracer,
-        broadcast_stage::RetransmitSlotsSender,
         cache_block_meta_service::CacheBlockMetaSender,
         cluster_info_vote_listener::{
             GossipDuplicateConfirmedSlotsReceiver, GossipVerifiedVoteHashReceiver,
@@ -18,17 +17,15 @@ use {
         ledger_cleanup_service::LedgerCleanupService,
         repair_service::RepairInfo,
         replay_stage::{ReplayStage, ReplayStageConfig},
-        retransmit_stage::RetransmitStage,
         rewards_recorder_service::RewardsRecorderSender,
         shred_fetch_stage::ShredFetchStage,
-        sigverify_shreds,
         tower_storage::TowerStorage,
         validator::ProcessBlockStore,
         voting_service::VotingService,
         warm_quic_cache_service::WarmQuicCacheService,
         window_service::WindowService,
     },
-    crossbeam_channel::{unbounded, Receiver},
+    crossbeam_channel::{unbounded, Receiver, Sender},
     solana_client::connection_cache::ConnectionCache,
     solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock,
     solana_gossip::{
@@ -52,6 +49,7 @@ use {
     },
     solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
     solana_streamer::streamer::StakedNodes,
+    solana_turbine::retransmit_stage::RetransmitStage,
     std::{
         collections::HashSet,
         net::UdpSocket,
@@ -124,7 +122,7 @@ impl Tvu {
         cache_block_meta_sender: Option<CacheBlockMetaSender>,
         entry_notification_sender: Option<EntryNotifierSender>,
         vote_tracker: Arc<VoteTracker>,
-        retransmit_slots_sender: RetransmitSlotsSender,
+        retransmit_slots_sender: Sender<Slot>,
         gossip_verified_vote_hash_receiver: GossipVerifiedVoteHashReceiver,
         verified_vote_receiver: VerifiedVoteReceiver,
         replay_vote_sender: ReplayVoteSender,
@@ -171,7 +169,7 @@ impl Tvu {
 
         let (verified_sender, verified_receiver) = unbounded();
         let (retransmit_sender, retransmit_receiver) = unbounded();
-        let shred_sigverify = sigverify_shreds::spawn_shred_sigverify(
+        let shred_sigverify = solana_turbine::sigverify_shreds::spawn_shred_sigverify(
             cluster_info.clone(),
             bank_forks.clone(),
             leader_schedule_cache.clone(),

+ 1 - 1
core/src/validator.rs

@@ -6,7 +6,6 @@ use {
         accounts_hash_verifier::{AccountsHashFaultInjector, AccountsHashVerifier},
         admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
         banking_trace::{self, BankingTracer},
-        broadcast_stage::BroadcastStageType,
         cache_block_meta_service::{CacheBlockMetaSender, CacheBlockMetaService},
         cluster_info_vote_listener::VoteTracker,
         completed_data_sets_service::CompletedDataSetsService,
@@ -110,6 +109,7 @@ use {
     },
     solana_send_transaction_service::send_transaction_service,
     solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
+    solana_turbine::broadcast_stage::BroadcastStageType,
     solana_vote_program::vote_state,
     std::{
         collections::{HashMap, HashSet},

+ 1 - 0
local-cluster/Cargo.toml

@@ -30,6 +30,7 @@ solana-stake-program = { workspace = true }
 solana-streamer = { workspace = true }
 solana-thin-client = { workspace = true }
 solana-tpu-client = { workspace = true }
+solana-turbine = { workspace = true }
 solana-vote-program = { workspace = true }
 tempfile = { workspace = true }
 trees = { workspace = true }

+ 1 - 1
local-cluster/tests/common/mod.rs

@@ -2,7 +2,6 @@
 use {
     log::*,
     solana_core::{
-        broadcast_stage::BroadcastStageType,
         consensus::{Tower, SWITCH_FORK_THRESHOLD},
         tower_storage::FileTowerStorage,
         validator::ValidatorConfig,
@@ -33,6 +32,7 @@ use {
         signature::{Keypair, Signer},
     },
     solana_streamer::socket::SocketAddrSpace,
+    solana_turbine::broadcast_stage::BroadcastStageType,
     std::{
         collections::HashSet,
         fs, iter,

+ 3 - 3
local-cluster/tests/local_cluster.rs

@@ -8,9 +8,6 @@ use {
     serial_test::serial,
     solana_client::thin_client::ThinClient,
     solana_core::{
-        broadcast_stage::{
-            broadcast_duplicates_run::BroadcastDuplicatesConfig, BroadcastStageType,
-        },
         consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH},
         optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
         replay_stage::DUPLICATE_THRESHOLD,
@@ -63,6 +60,9 @@ use {
         vote::state::VoteStateUpdate,
     },
     solana_streamer::socket::SocketAddrSpace,
+    solana_turbine::broadcast_stage::{
+        broadcast_duplicates_run::BroadcastDuplicatesConfig, BroadcastStageType,
+    },
     solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction},
     std::{
         collections::{BTreeSet, HashMap, HashSet},

+ 31 - 0
programs/sbf/Cargo.lock

@@ -4754,6 +4754,7 @@ dependencies = [
  "solana-streamer",
  "solana-tpu-client",
  "solana-transaction-status",
+ "solana-turbine",
  "solana-version",
  "solana-vote-program",
  "strum",
@@ -6138,6 +6139,36 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "solana-turbine"
+version = "1.17.0"
+dependencies = [
+ "bincode",
+ "crossbeam-channel",
+ "itertools",
+ "log",
+ "lru",
+ "rand 0.7.3",
+ "rand_chacha 0.2.2",
+ "rayon",
+ "solana-client",
+ "solana-entry",
+ "solana-gossip",
+ "solana-ledger",
+ "solana-measure",
+ "solana-metrics",
+ "solana-perf",
+ "solana-poh",
+ "solana-quic-client",
+ "solana-rayon-threadlimit",
+ "solana-rpc",
+ "solana-rpc-client-api",
+ "solana-runtime",
+ "solana-sdk",
+ "solana-streamer",
+ "thiserror",
+]
+
 [[package]]
 name = "solana-udp-client"
 version = "1.17.0"

+ 49 - 0
turbine/Cargo.toml

@@ -0,0 +1,49 @@
+[package]
+name = "solana-turbine"
+description = "Blockchain, Rebuilt for Scale"
+documentation = "https://docs.rs/solana-turbine"
+version = { workspace = true }
+authors = { workspace = true }
+repository = { workspace = true }
+homepage = { workspace = true }
+license = { workspace = true }
+edition = { workspace = true }
+
+[dependencies]
+bincode = { workspace = true }
+crossbeam-channel = { workspace = true }
+itertools = { workspace = true }
+log = { workspace = true }
+lru = { workspace = true }
+rand = { workspace = true }
+rand_chacha = { workspace = true }
+rayon = { workspace = true }
+solana-client = { workspace = true }
+solana-entry = { workspace = true }
+solana-gossip = { workspace = true }
+solana-ledger = { workspace = true }
+solana-measure = { workspace = true }
+solana-metrics = { workspace = true }
+solana-perf = { workspace = true }
+solana-poh = { workspace = true }
+solana-quic-client = { workspace = true }
+solana-rayon-threadlimit = { workspace = true }
+solana-rpc = { workspace = true }
+solana-rpc-client-api = { workspace = true }
+solana-runtime = { workspace = true }
+solana-sdk = { workspace = true }
+solana-streamer = { workspace = true }
+thiserror = { workspace = true }
+
+[dev-dependencies]
+matches = { workspace = true }
+solana-logger = { workspace = true }
+
+[[bench]]
+name = "cluster_info"
+
+[[bench]]
+name = "cluster_nodes"
+
+[[bench]]
+name = "retransmit_stage"

+ 7 - 8
core/benches/cluster_info.rs → turbine/benches/cluster_info.rs

@@ -4,13 +4,6 @@ extern crate test;
 
 use {
     rand::{thread_rng, Rng},
-    solana_core::{
-        broadcast_stage::{
-            broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage,
-        },
-        cluster_nodes::ClusterNodesCache,
-        validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
-    },
     solana_gossip::{
         cluster_info::{ClusterInfo, Node},
         contact_info::ContactInfo,
@@ -27,6 +20,12 @@ use {
         timing::{timestamp, AtomicInterval},
     },
     solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
+    solana_turbine::{
+        broadcast_stage::{
+            broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage,
+        },
+        cluster_nodes::ClusterNodesCache,
+    },
     std::{
         collections::HashMap,
         net::{IpAddr, Ipv4Addr, UdpSocket},
@@ -45,7 +44,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
         &leader_keypair,
         IpAddr::V4(Ipv4Addr::LOCALHOST),
         &Arc::<RwLock<StakedNodes>>::default(),
-        TURBINE_QUIC_CONNECTION_POOL_SIZE,
+        4, // connection_pool_size
     )
     .unwrap();
     let leader_info = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());

+ 4 - 4
core/benches/cluster_nodes.rs → turbine/benches/cluster_nodes.rs

@@ -4,13 +4,13 @@ extern crate test;
 
 use {
     rand::{seq::SliceRandom, Rng},
-    solana_core::{
-        cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes},
-        retransmit_stage::RetransmitStage,
-    },
     solana_gossip::legacy_contact_info::LegacyContactInfo as ContactInfo,
     solana_ledger::shred::{Shred, ShredFlags},
     solana_sdk::{clock::Slot, pubkey::Pubkey},
+    solana_turbine::{
+        cluster_nodes::{make_test_cluster, new_cluster_nodes, ClusterNodes},
+        retransmit_stage::RetransmitStage,
+    },
     test::Bencher,
 };
 

+ 3 - 3
core/benches/retransmit_stage.rs → turbine/benches/retransmit_stage.rs

@@ -1,12 +1,11 @@
 #![feature(test)]
 
-extern crate solana_core;
+extern crate solana_turbine;
 extern crate test;
 
 use {
     crossbeam_channel::unbounded,
     log::*,
-    solana_core::{retransmit_stage::retransmitter, validator::TURBINE_QUIC_CONNECTION_POOL_SIZE},
     solana_entry::entry::Entry,
     solana_gossip::{
         cluster_info::{ClusterInfo, Node},
@@ -27,6 +26,7 @@ use {
         timing::timestamp,
     },
     solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
+    solana_turbine::retransmit_stage::retransmitter,
     std::{
         iter::repeat_with,
         net::{IpAddr, Ipv4Addr, UdpSocket},
@@ -103,7 +103,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
             &keypair,
             IpAddr::V4(Ipv4Addr::LOCALHOST),
             &Arc::<RwLock<StakedNodes>>::default(),
-            TURBINE_QUIC_CONNECTION_POOL_SIZE,
+            4, // connection_pool_size
         )
         .unwrap(),
     );

+ 35 - 12
core/src/broadcast_stage.rs → turbine/src/broadcast_stage.rs

@@ -8,10 +8,7 @@ use {
         fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
         standard_broadcast_run::StandardBroadcastRun,
     },
-    crate::{
-        cluster_nodes::{self, ClusterNodes, ClusterNodesCache},
-        result::{Error, Result},
-    },
+    crate::cluster_nodes::{self, ClusterNodes, ClusterNodesCache},
     crossbeam_channel::{unbounded, Receiver, RecvError, RecvTimeoutError, Sender},
     itertools::{Either, Itertools},
     solana_client::tpu_connection::TpuConnection,
@@ -46,6 +43,7 @@ use {
         thread::{self, Builder, JoinHandle},
         time::{Duration, Instant},
     },
+    thiserror::Error,
 };
 
 pub mod broadcast_duplicates_run;
@@ -59,11 +57,31 @@ const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
 const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
 
 pub(crate) const NUM_INSERT_THREADS: usize = 2;
-pub(crate) type RetransmitSlotsSender = Sender<Slot>;
-pub(crate) type RetransmitSlotsReceiver = Receiver<Slot>;
 pub(crate) type RecordReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
 pub(crate) type TransmitReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
 
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error(transparent)]
+    Blockstore(#[from] solana_ledger::blockstore::BlockstoreError),
+    #[error(transparent)]
+    ClusterInfo(#[from] solana_gossip::cluster_info::ClusterInfoError),
+    #[error(transparent)]
+    Io(#[from] std::io::Error),
+    #[error(transparent)]
+    Recv(#[from] crossbeam_channel::RecvError),
+    #[error(transparent)]
+    RecvTimeout(#[from] crossbeam_channel::RecvTimeoutError),
+    #[error("Send")]
+    Send,
+    #[error(transparent)]
+    Serialize(#[from] std::boxed::Box<bincode::ErrorKind>),
+    #[error(transparent)]
+    TransportError(#[from] solana_sdk::transport::TransportError),
+}
+
+type Result<T> = std::result::Result<T, Error>;
+
 #[derive(Debug, PartialEq, Eq, Clone)]
 pub enum BroadcastStageReturnType {
     ChannelDisconnected,
@@ -84,7 +102,7 @@ impl BroadcastStageType {
         sock: Vec<UdpSocket>,
         cluster_info: Arc<ClusterInfo>,
         receiver: Receiver<WorkingBankEntry>,
-        retransmit_slots_receiver: RetransmitSlotsReceiver,
+        retransmit_slots_receiver: Receiver<Slot>,
         exit_sender: Arc<AtomicBool>,
         blockstore: Arc<Blockstore>,
         bank_forks: Arc<RwLock<BankForks>>,
@@ -243,7 +261,7 @@ impl BroadcastStage {
         socks: Vec<UdpSocket>,
         cluster_info: Arc<ClusterInfo>,
         receiver: Receiver<WorkingBankEntry>,
-        retransmit_slots_receiver: RetransmitSlotsReceiver,
+        retransmit_slots_receiver: Receiver<Slot>,
         exit: Arc<AtomicBool>,
         blockstore: Arc<Blockstore>,
         bank_forks: Arc<RwLock<BankForks>>,
@@ -333,7 +351,7 @@ impl BroadcastStage {
 
     fn check_retransmit_signals(
         blockstore: &Blockstore,
-        retransmit_slots_receiver: &RetransmitSlotsReceiver,
+        retransmit_slots_receiver: &Receiver<Slot>,
         socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
     ) -> Result<()> {
         const RECV_TIMEOUT: Duration = Duration::from_millis(100);
@@ -454,11 +472,16 @@ pub fn broadcast_shreds(
     result
 }
 
+impl<T> From<crossbeam_channel::SendError<T>> for Error {
+    fn from(_: crossbeam_channel::SendError<T>) -> Error {
+        Error::Send
+    }
+}
+
 #[cfg(test)]
 pub mod test {
     use {
         super::*,
-        crate::validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
         crossbeam_channel::unbounded,
         solana_entry::entry::create_ticks,
         solana_gossip::cluster_info::{ClusterInfo, Node},
@@ -603,7 +626,7 @@ pub mod test {
         leader_keypair: Arc<Keypair>,
         ledger_path: &Path,
         entry_receiver: Receiver<WorkingBankEntry>,
-        retransmit_slots_receiver: RetransmitSlotsReceiver,
+        retransmit_slots_receiver: Receiver<Slot>,
     ) -> MockBroadcastStage {
         // Make the database ledger
         let blockstore = Arc::new(Blockstore::open(ledger_path).unwrap());
@@ -613,7 +636,7 @@ pub mod test {
                 &leader_keypair,
                 IpAddr::V4(Ipv4Addr::LOCALHOST),
                 &Arc::<RwLock<StakedNodes>>::default(),
-                TURBINE_QUIC_CONNECTION_POOL_SIZE,
+                4, // connection_pool_size
             )
             .unwrap(),
         );

+ 0 - 0
core/src/broadcast_stage/broadcast_duplicates_run.rs → turbine/src/broadcast_stage/broadcast_duplicates_run.rs


+ 0 - 0
core/src/broadcast_stage/broadcast_fake_shreds_run.rs → turbine/src/broadcast_stage/broadcast_fake_shreds_run.rs


+ 0 - 0
core/src/broadcast_stage/broadcast_metrics.rs → turbine/src/broadcast_stage/broadcast_metrics.rs


+ 1 - 1
core/src/broadcast_stage/broadcast_utils.rs → turbine/src/broadcast_stage/broadcast_utils.rs

@@ -1,5 +1,5 @@
 use {
-    crate::result::Result,
+    super::Result,
     bincode::serialized_size,
     crossbeam_channel::Receiver,
     solana_entry::entry::Entry,

+ 0 - 0
core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs → turbine/src/broadcast_stage/fail_entry_verification_broadcast_run.rs


+ 1 - 2
core/src/broadcast_stage/standard_broadcast_run.rs → turbine/src/broadcast_stage/standard_broadcast_run.rs

@@ -509,7 +509,6 @@ fn should_use_merkle_variant(slot: Slot, cluster_type: ClusterType, shred_versio
 mod test {
     use {
         super::*,
-        crate::validator::TURBINE_QUIC_CONNECTION_POOL_SIZE,
         solana_entry::entry::create_ticks,
         solana_gossip::cluster_info::{ClusterInfo, Node},
         solana_ledger::{
@@ -580,7 +579,7 @@ mod test {
                 keypair,
                 IpAddr::V4(Ipv4Addr::LOCALHOST),
                 &Arc::<RwLock<StakedNodes>>::default(),
-                TURBINE_QUIC_CONNECTION_POOL_SIZE,
+                4, // connection_pool_size
             )
             .unwrap(),
         )

+ 1 - 5
core/src/cluster_nodes.rs → turbine/src/cluster_nodes.rs

@@ -495,11 +495,7 @@ fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool
 
 // Returns true if the feature is effective for the shred slot.
 #[must_use]
-pub(crate) fn check_feature_activation(
-    feature: &Pubkey,
-    shred_slot: Slot,
-    root_bank: &Bank,
-) -> bool {
+pub fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
     match root_bank.feature_set.activated_slot(feature) {
         None => false,
         Some(feature_slot) => {

+ 16 - 0
turbine/src/lib.rs

@@ -0,0 +1,16 @@
+#![allow(clippy::integer_arithmetic)]
+
+pub mod broadcast_stage;
+pub mod cluster_nodes;
+pub mod retransmit_stage;
+pub mod sigverify_shreds;
+
+#[macro_use]
+extern crate log;
+
+#[macro_use]
+extern crate solana_metrics;
+
+#[cfg(test)]
+#[macro_use]
+extern crate matches;

+ 2 - 2
core/src/retransmit_stage.rs → turbine/src/retransmit_stage.rs

@@ -427,7 +427,7 @@ pub struct RetransmitStage {
 }
 
 impl RetransmitStage {
-    pub(crate) fn new(
+    pub fn new(
         bank_forks: Arc<RwLock<BankForks>>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
         cluster_info: Arc<ClusterInfo>,
@@ -453,7 +453,7 @@ impl RetransmitStage {
         }
     }
 
-    pub(crate) fn join(self) -> thread::Result<()> {
+    pub fn join(self) -> thread::Result<()> {
         self.retransmit_thread_handle.join()
     }
 }

+ 1 - 1
core/src/sigverify_shreds.rs → turbine/src/sigverify_shreds.rs

@@ -28,7 +28,7 @@ enum Error {
     SendError,
 }
 
-pub(crate) fn spawn_shred_sigverify(
+pub fn spawn_shred_sigverify(
     cluster_info: Arc<ClusterInfo>,
     bank_forks: Arc<RwLock<BankForks>>,
     leader_schedule_cache: Arc<LeaderScheduleCache>,