
(Alpenglow) Upstream voting_service and staked_validators_cache. (#8197)

* (Alpenglow) Upstream voting_service and staked_validators_cache.

* Cache epoch_schedule and initialize alpenglow_sockets with capacity.
Wen committed 1 month ago
parent · commit 3b835652f5
6 files changed with 1058 additions and 0 deletions
  1. Cargo.lock (+14 -0)
  2. programs/sbf/Cargo.lock (+8 -0)
  3. votor/Cargo.toml (+14 -0)
  4. votor/src/lib.rs (+6 -0)
  5. votor/src/staked_validators_cache.rs (+605 -0)
  6. votor/src/voting_service.rs (+411 -0)
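
A minimal wiring sketch (illustrative, not part of this commit) of how the two new components fit together, based on the constructor signatures in the diffs below; `cluster_info`, `vote_history_storage`, `connection_cache`, and `bank_forks` are assumed to be provided by the surrounding validator setup:

use {crossbeam_channel::unbounded, std::sync::Arc};

// Channel over which consensus operations are handed to the service.
let (bls_sender, bls_receiver) = unbounded::<BLSOp>();

// VotingService spawns the "solVoteService" thread, which owns a
// StakedValidatorsCache (TTL 5s, capacity 5 epochs, excluding self) and
// drains bls_receiver, broadcasting each vote/certificate to the cached
// Alpenglow sockets.
let voting_service = VotingService::new(
    bls_receiver,
    cluster_info,         // Arc<ClusterInfo>
    vote_history_storage, // Arc<dyn VoteHistoryStorage>
    connection_cache,     // Arc<ConnectionCache>
    bank_forks,           // Arc<RwLock<BankForks>>
    None,                 // VotingServiceOverride is only used in tests
);

// ... send BLSOp::PushVote / BLSOp::PushCertificate through bls_sender ...
drop(bls_sender); // closing the channel lets the service thread exit
voting_service.join().unwrap();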

+ 14 - 0
Cargo.lock

@@ -558,20 +558,26 @@ dependencies = [
  "dashmap",
  "itertools 0.12.1",
  "log",
+ "lru",
  "parking_lot 0.12.3",
  "qualifier_attr",
+ "rand 0.8.5",
  "rayon",
  "serde",
  "serde_bytes",
  "serde_derive",
+ "solana-account",
  "solana-accounts-db",
  "solana-bloom",
  "solana-bls-signatures",
+ "solana-client",
  "solana-clock",
+ "solana-connection-cache",
  "solana-entry",
  "solana-epoch-schedule",
  "solana-frozen-abi",
  "solana-frozen-abi-macro",
+ "solana-genesis-config",
  "solana-gossip",
  "solana-hash",
  "solana-keypair",
@@ -579,17 +585,25 @@ dependencies = [
  "solana-logger",
  "solana-measure",
  "solana-metrics",
+ "solana-net-utils",
+ "solana-perf",
  "solana-pubkey",
  "solana-rpc",
  "solana-runtime",
+ "solana-sdk-ids",
  "solana-signature",
  "solana-signer",
  "solana-signer-store",
+ "solana-streamer",
  "solana-time-utils",
  "solana-transaction",
+ "solana-transaction-error",
+ "solana-vote",
+ "solana-vote-program",
  "solana-votor-messages",
  "test-case",
  "thiserror 2.0.16",
+ "tokio-util 0.7.16",
 ]
 
 [[package]]

+ 8 - 0
programs/sbf/Cargo.lock

@@ -296,18 +296,23 @@ dependencies = [
  "dashmap",
  "itertools 0.12.1",
  "log",
+ "lru",
  "parking_lot 0.12.2",
  "qualifier_attr",
  "rayon",
  "serde",
  "serde_bytes",
  "serde_derive",
+ "solana-account",
  "solana-accounts-db",
  "solana-bloom",
  "solana-bls-signatures",
+ "solana-client",
  "solana-clock",
+ "solana-connection-cache",
  "solana-entry",
  "solana-epoch-schedule",
+ "solana-genesis-config",
  "solana-gossip",
  "solana-hash",
  "solana-keypair",
@@ -323,6 +328,9 @@ dependencies = [
  "solana-signer-store",
  "solana-time-utils",
  "solana-transaction",
+ "solana-transaction-error",
+ "solana-vote",
+ "solana-vote-program",
  "solana-votor-messages",
  "thiserror 2.0.16",
 ]

+ 14 - 0
votor/Cargo.toml

@@ -31,16 +31,20 @@ crossbeam-channel = { workspace = true }
 dashmap = { workspace = true, features = ["rayon", "raw-api"] }
 itertools = { workspace = true }
 log = { workspace = true }
+lru = { workspace = true }
 parking_lot = { workspace = true }
 qualifier_attr = { workspace = true }
 rayon = { workspace = true }
 serde = { workspace = true }
 serde_bytes = { workspace = true }
 serde_derive = { workspace = true }
+solana-account = { workspace = true }
 solana-accounts-db = { workspace = true }
 solana-bloom = { workspace = true }
 solana-bls-signatures = { workspace = true }
+solana-client = { workspace = true }
 solana-clock = { workspace = true }
+solana-connection-cache = { workspace = true }
 solana-entry = { workspace = true }
 solana-epoch-schedule = { workspace = true }
 solana-frozen-abi = { workspace = true, optional = true, features = [
@@ -49,6 +53,7 @@ solana-frozen-abi = { workspace = true, optional = true, features = [
 solana-frozen-abi-macro = { workspace = true, optional = true, features = [
     "frozen-abi",
 ] }
+solana-genesis-config = { workspace = true }
 solana-gossip = { workspace = true }
 solana-hash = { workspace = true }
 solana-keypair = { workspace = true }
@@ -64,12 +69,21 @@ solana-signer = { workspace = true }
 solana-signer-store = { workspace = true }
 solana-time-utils = { workspace = true }
 solana-transaction = { workspace = true }
+solana-transaction-error = { workspace = true }
+solana-vote = { workspace = true }
+solana-vote-program = { workspace = true }
 solana-votor-messages = { workspace = true }
 thiserror = { workspace = true }
 
 [dev-dependencies]
+rand = { workspace = true }
+solana-net-utils = { workspace = true }
+solana-perf = { workspace = true, features = ["dev-context-only-utils"] }
 solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
+solana-sdk-ids = { workspace = true }
+solana-streamer = { workspace = true, features = ["dev-context-only-utils"] }
 test-case = { workspace = true }
+tokio-util = { workspace = true }
 
 [lints]
 workspace = true

+ 6 - 0
votor/src/lib.rs

@@ -22,11 +22,17 @@ extern crate log;
 #[cfg(feature = "agave-unstable-api")]
 extern crate serde_derive;
 
+#[cfg(feature = "agave-unstable-api")]
+mod staked_validators_cache;
+
 #[cfg(feature = "agave-unstable-api")]
 pub mod vote_history;
 #[cfg(feature = "agave-unstable-api")]
 pub mod vote_history_storage;
 
+#[cfg(feature = "agave-unstable-api")]
+mod voting_service;
+
 #[cfg_attr(feature = "frozen-abi", macro_use)]
 #[cfg(feature = "frozen-abi")]
 extern crate solana_frozen_abi_macro;

+ 605 - 0
votor/src/staked_validators_cache.rs

@@ -0,0 +1,605 @@
+#![allow(dead_code)]
+
+use {
+    crate::voting_service::AlpenglowPortOverride,
+    lru::LruCache,
+    solana_clock::{Epoch, Slot},
+    solana_epoch_schedule::EpochSchedule,
+    solana_gossip::cluster_info::ClusterInfo,
+    solana_pubkey::Pubkey,
+    solana_runtime::bank_forks::BankForks,
+    std::{
+        collections::HashMap,
+        net::SocketAddr,
+        sync::{Arc, RwLock},
+        time::{Duration, Instant},
+    },
+};
+
+struct StakedValidatorsCacheEntry {
+    /// Alpenglow sockets associated with the staked validators
+    alpenglow_sockets: Vec<SocketAddr>,
+
+    /// The time at which this entry was created
+    creation_time: Instant,
+}
+
+/// Maintain `SocketAddr`s associated with all staked validators for a particular protocol (e.g.,
+/// UDP, QUIC) over a number of epochs.
+///
+/// We employ an LRU cache with capped size, mapping Epoch to cache entries that store the socket
+/// information. We also track cache entry times, forcing recalculations of cache entries that are
+/// accessed after a specified TTL.
+pub struct StakedValidatorsCache {
+    /// key: the epoch for which we have cached our staked validators list
+    /// value: the cache entry
+    cache: LruCache<Epoch, StakedValidatorsCacheEntry>,
+
+    /// Time to live for cache entries
+    ttl: Duration,
+
+    /// Bank forks
+    bank_forks: Arc<RwLock<BankForks>>,
+
+    /// Cached epoch schedule, since it never changes
+    epoch_schedule: EpochSchedule,
+
+    /// Whether to include the running validator's socket address in cache entries
+    include_self: bool,
+
+    /// Optional override for Alpenglow port, used for testing purposes
+    alpenglow_port_override: Option<AlpenglowPortOverride>,
+
+    /// Timestamp of the last Alpenglow port override we read
+    alpenglow_port_override_last_modified: Instant,
+}
+
+impl StakedValidatorsCache {
+    pub fn new(
+        bank_forks: Arc<RwLock<BankForks>>,
+        ttl: Duration,
+        max_cache_size: usize,
+        include_self: bool,
+        alpenglow_port_override: Option<AlpenglowPortOverride>,
+    ) -> Self {
+        let epoch_schedule = bank_forks
+            .read()
+            .unwrap()
+            .working_bank()
+            .epoch_schedule()
+            .clone();
+        Self {
+            cache: LruCache::new(max_cache_size),
+            ttl,
+            bank_forks,
+            epoch_schedule,
+            include_self,
+            alpenglow_port_override,
+            alpenglow_port_override_last_modified: Instant::now(),
+        }
+    }
+
+    #[inline]
+    fn cur_epoch(&self, slot: Slot) -> Epoch {
+        self.epoch_schedule.get_epoch(slot)
+    }
+
+    fn refresh_cache_entry(
+        &mut self,
+        epoch: Epoch,
+        cluster_info: &ClusterInfo,
+        update_time: Instant,
+    ) {
+        let banks = {
+            let bank_forks = self.bank_forks.read().unwrap();
+            [bank_forks.root_bank(), bank_forks.working_bank()]
+        };
+
+        let epoch_staked_nodes = banks
+            .iter()
+            .find_map(|bank| bank.epoch_staked_nodes(epoch))
+            .unwrap_or_else(|| {
+                error!(
+                    "StakedValidatorsCache::get: unknown Bank::epoch_staked_nodes for epoch: \
+                     {epoch}"
+                );
+                Arc::<HashMap<Pubkey, u64>>::default()
+            });
+
+        struct Node {
+            pubkey: Pubkey,
+            stake: u64,
+            alpenglow_socket: SocketAddr,
+        }
+
+        let mut nodes: Vec<_> = epoch_staked_nodes
+            .iter()
+            .filter(|(pubkey, stake)| {
+                let positive_stake = **stake > 0;
+                let not_self = pubkey != &&cluster_info.id();
+
+                positive_stake && (self.include_self || not_self)
+            })
+            .filter_map(|(pubkey, stake)| {
+                cluster_info.lookup_contact_info(pubkey, |node| {
+                    node.alpenglow().map(|alpenglow_socket| Node {
+                        pubkey: *pubkey,
+                        stake: *stake,
+                        alpenglow_socket,
+                    })
+                })?
+            })
+            .collect();
+
+        // Drop adjacent duplicate sockets, then order the remainder by ascending stake.
+        nodes.dedup_by_key(|node| node.alpenglow_socket);
+        nodes.sort_unstable_by(|a, b| a.stake.cmp(&b.stake));
+
+        let mut alpenglow_sockets = Vec::with_capacity(nodes.len());
+        let override_map = self
+            .alpenglow_port_override
+            .as_ref()
+            .map(|x| x.get_override_map());
+        for node in nodes {
+            let alpenglow_socket = node.alpenglow_socket;
+            let socket = if let Some(override_map) = &override_map {
+                // If we have an override, use it.
+                override_map
+                    .get(&node.pubkey)
+                    .cloned()
+                    .unwrap_or(alpenglow_socket)
+            } else {
+                alpenglow_socket
+            };
+            alpenglow_sockets.push(socket);
+        }
+        self.cache.push(
+            epoch,
+            StakedValidatorsCacheEntry {
+                alpenglow_sockets,
+                creation_time: update_time,
+            },
+        );
+    }
+
+    pub fn get_staked_validators_by_slot(
+        &mut self,
+        slot: Slot,
+        cluster_info: &ClusterInfo,
+        access_time: Instant,
+    ) -> (&[SocketAddr], bool) {
+        let epoch = self.cur_epoch(slot);
+        // Check if self.alpenglow_port_override has a different last_modified.
+        // Immediately refresh the cache if it does.
+        if let Some(alpenglow_port_override) = &self.alpenglow_port_override {
+            if alpenglow_port_override.has_new_override(self.alpenglow_port_override_last_modified)
+            {
+                self.alpenglow_port_override_last_modified =
+                    alpenglow_port_override.last_modified();
+                trace!(
+                    "refreshing cache entry for epoch {epoch} due to alpenglow port override \
+                     last_modified change"
+                );
+                self.refresh_cache_entry(epoch, cluster_info, access_time);
+            }
+        }
+
+        self.get_staked_validators_by_epoch(epoch, cluster_info, access_time)
+    }
+
+    fn get_staked_validators_by_epoch(
+        &mut self,
+        epoch: Epoch,
+        cluster_info: &ClusterInfo,
+        access_time: Instant,
+    ) -> (&[SocketAddr], bool) {
+        // For a given epoch, if we either:
+        //
+        // (1) have a cache entry that has expired
+        // (2) have no existing cache entry
+        //
+        // then update the cache.
+        let refresh_cache = self
+            .cache
+            .get(&epoch)
+            .map(|v| Some(access_time) > v.creation_time.checked_add(self.ttl))
+            .unwrap_or(true);
+
+        if refresh_cache {
+            self.refresh_cache_entry(epoch, cluster_info, access_time);
+        }
+
+        (
+            // Unwrapping is fine here, since refresh_cache_entry guarantees that we push a cache
+            // entry to self.cache[epoch].
+            self.cache
+                .get(&epoch)
+                .map(|v| &*v.alpenglow_sockets)
+                .unwrap(),
+            refresh_cache,
+        )
+    }
+
+    #[cfg(test)]
+    pub fn len(&self) -> usize {
+        self.cache.len()
+    }
+
+    #[cfg(test)]
+    pub fn is_empty(&self) -> bool {
+        self.cache.is_empty()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::StakedValidatorsCache,
+        crate::voting_service::AlpenglowPortOverride,
+        rand::Rng,
+        solana_gossip::{
+            cluster_info::ClusterInfo, contact_info::ContactInfo, crds::GossipRoute,
+            crds_data::CrdsData, crds_value::CrdsValue, node::Node,
+        },
+        solana_keypair::Keypair,
+        solana_pubkey::Pubkey,
+        solana_runtime::{
+            bank::Bank,
+            bank_forks::BankForks,
+            genesis_utils::{
+                create_genesis_config_with_alpenglow_vote_accounts, ValidatorVoteKeypairs,
+            },
+        },
+        solana_signer::Signer,
+        solana_streamer::socket::SocketAddrSpace,
+        solana_time_utils::timestamp,
+        std::{
+            collections::HashMap,
+            net::{Ipv4Addr, SocketAddr},
+            sync::{Arc, RwLock},
+            time::{Duration, Instant},
+        },
+        test_case::test_case,
+    };
+
+    fn update_cluster_info(
+        cluster_info: &mut ClusterInfo,
+        node_keypair_map: HashMap<Pubkey, Keypair>,
+    ) {
+        // Update cluster info
+        {
+            let node_contact_info = node_keypair_map
+                .keys()
+                .enumerate()
+                .map(|(node_ix, pubkey)| {
+                    let mut contact_info = ContactInfo::new(*pubkey, 0_u64, 0_u16);
+
+                    assert!(contact_info
+                        .set_alpenglow((
+                            Ipv4Addr::LOCALHOST,
+                            8080_u16.saturating_add(node_ix as u16)
+                        ))
+                        .is_ok());
+
+                    contact_info
+                });
+
+            for contact_info in node_contact_info {
+                let node_pubkey = *contact_info.pubkey();
+
+                let entry = CrdsValue::new(
+                    CrdsData::ContactInfo(contact_info),
+                    &node_keypair_map[&node_pubkey],
+                );
+
+                assert_eq!(node_pubkey, entry.label().pubkey());
+
+                {
+                    let mut gossip_crds = cluster_info.gossip.crds.write().unwrap();
+
+                    gossip_crds
+                        .insert(entry, timestamp(), GossipRoute::LocalMessage)
+                        .unwrap();
+                }
+            }
+        }
+    }
+
+    /// Create a number of nodes; each node will have exactly one vote account. Each vote account
+    /// will have random stake in [1, 997), with the exception of the first few vote accounts
+    /// having exactly 0 stake.
+    fn create_bank_forks_and_cluster_info(
+        num_nodes: usize,
+        num_zero_stake_nodes: usize,
+        base_slot: u64,
+    ) -> (Arc<RwLock<BankForks>>, ClusterInfo, Vec<Pubkey>) {
+        let mut rng = rand::thread_rng();
+        let validator_keypairs = (0..num_nodes)
+            .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new()))
+            .collect::<Vec<ValidatorVoteKeypairs>>();
+
+        let my_keypair = validator_keypairs
+            .last()
+            .unwrap()
+            .node_keypair
+            .insecure_clone();
+
+        let node_keypair_map: HashMap<Pubkey, Keypair> = validator_keypairs
+            .iter()
+            .map(|v| (v.node_keypair.pubkey(), v.node_keypair.insecure_clone()))
+            .collect();
+        let stakes: Vec<u64> = (0..num_nodes)
+            .map(|node_ix| {
+                if node_ix < num_zero_stake_nodes {
+                    0
+                } else {
+                    rng.gen_range(1..997)
+                }
+            })
+            .collect();
+
+        let genesis = create_genesis_config_with_alpenglow_vote_accounts(
+            1_000_000_000,
+            &validator_keypairs,
+            stakes,
+        );
+
+        let mut bank0 = Bank::new_for_tests(&genesis.genesis_config);
+        let base_slot_epoch = bank0.epoch_schedule().get_epoch(base_slot);
+        bank0.set_epoch_stakes_for_test(base_slot_epoch, bank0.epoch_stakes(0).unwrap().clone());
+        let bank_forks = BankForks::new_rw_arc(bank0);
+
+        let mut cluster_info = ClusterInfo::new(
+            Node::new_localhost_with_pubkey(&my_keypair.pubkey()).info,
+            Arc::new(my_keypair),
+            SocketAddrSpace::Unspecified,
+        );
+        update_cluster_info(&mut cluster_info, node_keypair_map);
+        (
+            bank_forks,
+            cluster_info,
+            validator_keypairs
+                .iter()
+                .map(|v| v.node_keypair.pubkey())
+                .collect::<Vec<Pubkey>>(),
+        )
+    }
+
+    #[test_case(1_usize, 0_usize)]
+    #[test_case(10_usize, 2_usize)]
+    #[test_case(50_usize, 7_usize)]
+    fn test_detect_only_staked_nodes_and_refresh_after_ttl(
+        num_nodes: usize,
+        num_zero_stake_nodes: usize,
+    ) {
+        let slot_num = 325_000_000_u64;
+        let (bank_forks, cluster_info, _) =
+            create_bank_forks_and_cluster_info(num_nodes, num_zero_stake_nodes, slot_num);
+
+        // Create our staked validators cache
+        let mut svc = StakedValidatorsCache::new(bank_forks, Duration::from_secs(5), 5, true, None);
+
+        let now = Instant::now();
+
+        let (sockets, refreshed) = svc.get_staked_validators_by_slot(slot_num, &cluster_info, now);
+
+        assert!(refreshed);
+        assert_eq!(
+            num_nodes.saturating_sub(num_zero_stake_nodes),
+            sockets.len()
+        );
+        assert_eq!(1, svc.len());
+
+        // Re-fetch from the cache right before the 5-second deadline
+        let (sockets, refreshed) = svc.get_staked_validators_by_slot(
+            slot_num,
+            &cluster_info,
+            now.checked_add(Duration::from_secs_f64(4.999)).unwrap(),
+        );
+
+        assert!(!refreshed);
+        assert_eq!(
+            num_nodes.saturating_sub(num_zero_stake_nodes),
+            sockets.len()
+        );
+        assert_eq!(1, svc.len());
+
+        // Re-fetch from the cache right at the 5-second deadline - we still shouldn't refresh.
+        let (sockets, refreshed) = svc.get_staked_validators_by_slot(
+            slot_num,
+            &cluster_info,
+            now.checked_add(Duration::from_secs(5)).unwrap(),
+        );
+        assert!(!refreshed);
+        assert_eq!(
+            num_nodes.saturating_sub(num_zero_stake_nodes),
+            sockets.len()
+        );
+        assert_eq!(1, svc.len());
+
+        // Re-fetch from the cache right after the 5-second deadline - now we should refresh.
+        let (sockets, refreshed) = svc.get_staked_validators_by_slot(
+            slot_num,
+            &cluster_info,
+            now.checked_add(Duration::from_secs_f64(5.001)).unwrap(),
+        );
+
+        assert!(refreshed);
+        assert_eq!(
+            num_nodes.saturating_sub(num_zero_stake_nodes),
+            sockets.len()
+        );
+        assert_eq!(1, svc.len());
+
+        // Re-fetch from the cache well after the 5-second deadline - we should refresh.
+        let (sockets, refreshed) = svc.get_staked_validators_by_slot(
+            slot_num,
+            &cluster_info,
+            now.checked_add(Duration::from_secs(100)).unwrap(),
+        );
+
+        assert!(refreshed);
+        assert_eq!(
+            num_nodes.saturating_sub(num_zero_stake_nodes),
+            sockets.len()
+        );
+        assert_eq!(1, svc.len());
+    }
+
+    #[test]
+    fn test_cache_eviction() {
+        let base_slot = 325_000_000_000;
+        let (bank_forks, cluster_info, _) = create_bank_forks_and_cluster_info(50, 7, base_slot);
+
+        // Create our staked validators cache
+        let mut svc = StakedValidatorsCache::new(bank_forks, Duration::from_secs(5), 5, true, None);
+
+        assert_eq!(0, svc.len());
+        assert!(svc.is_empty());
+
+        let now = Instant::now();
+
+        // Populate the first five entries; accessing the cache once again shouldn't trigger any
+        // refreshes.
+        for entry_ix in 1_u64..=5_u64 {
+            let (_, refreshed) = svc.get_staked_validators_by_slot(
+                entry_ix.saturating_mul(base_slot),
+                &cluster_info,
+                now,
+            );
+            assert!(refreshed);
+            assert_eq!(entry_ix as usize, svc.len());
+
+            let (_, refreshed) = svc.get_staked_validators_by_slot(
+                entry_ix.saturating_mul(base_slot),
+                &cluster_info,
+                now,
+            );
+            assert!(!refreshed);
+            assert_eq!(entry_ix as usize, svc.len());
+        }
+
+        // Entry 6 - this shouldn't increase the cache length.
+        let (_, refreshed) = svc.get_staked_validators_by_slot(6 * base_slot, &cluster_info, now);
+        assert!(refreshed);
+        assert_eq!(5, svc.len());
+
+        // Epoch 1 should have been evicted
+        assert!(!svc.cache.contains(&svc.cur_epoch(base_slot)));
+
+        // Epochs 2 - 6 should have entries
+        for entry_ix in 2_u64..=6_u64 {
+            assert!(svc
+                .cache
+                .contains(&svc.cur_epoch(entry_ix.saturating_mul(base_slot))));
+        }
+
+        // Accessing the cache after TTL should recalculate everything; the size remains 5, since
+        // we only ever lazily evict cache entries.
+        for entry_ix in 1_u64..=5_u64 {
+            let (_, refreshed) = svc.get_staked_validators_by_slot(
+                entry_ix.saturating_mul(base_slot),
+                &cluster_info,
+                now.checked_add(Duration::from_secs(10)).unwrap(),
+            );
+            assert!(refreshed);
+            assert_eq!(5, svc.len());
+        }
+    }
+
+    #[test]
+    fn test_only_update_once_per_epoch() {
+        let slot_num = 325_000_000_u64;
+        let num_nodes = 10_usize;
+        let num_zero_stake_nodes = 2_usize;
+
+        let (bank_forks, cluster_info, _) =
+            create_bank_forks_and_cluster_info(num_nodes, num_zero_stake_nodes, slot_num);
+
+        // Create our staked validators cache
+        let mut svc = StakedValidatorsCache::new(bank_forks, Duration::from_secs(5), 5, true, None);
+
+        let now = Instant::now();
+
+        let (_, refreshed) = svc.get_staked_validators_by_slot(slot_num, &cluster_info, now);
+        assert!(refreshed);
+
+        let (_, refreshed) = svc.get_staked_validators_by_slot(slot_num, &cluster_info, now);
+        assert!(!refreshed);
+
+        let (_, refreshed) = svc.get_staked_validators_by_slot(2 * slot_num, &cluster_info, now);
+        assert!(refreshed);
+    }
+
+    #[test_case(1_usize)]
+    #[test_case(10_usize)]
+    fn test_exclude_self_from_cache(num_nodes: usize) {
+        let slot_num = 325_000_000_u64;
+
+        let (bank_forks, cluster_info, _) =
+            create_bank_forks_and_cluster_info(num_nodes, 0, slot_num);
+
+        let keypair = cluster_info.keypair().insecure_clone();
+
+        let my_socket_addr = cluster_info
+            .lookup_contact_info(&keypair.pubkey(), |node| node.alpenglow().unwrap())
+            .unwrap();
+
+        // Create our staked validators cache - set include_self to true
+        let mut svc =
+            StakedValidatorsCache::new(bank_forks.clone(), Duration::from_secs(5), 5, true, None);
+
+        let (sockets, _) =
+            svc.get_staked_validators_by_slot(slot_num, &cluster_info, Instant::now());
+        assert_eq!(sockets.len(), num_nodes);
+        assert!(sockets.contains(&my_socket_addr));
+
+        // Create our staked validators cache - set include_self to false
+        let mut svc =
+            StakedValidatorsCache::new(bank_forks.clone(), Duration::from_secs(5), 5, false, None);
+
+        let (sockets, _) =
+            svc.get_staked_validators_by_slot(slot_num, &cluster_info, Instant::now());
+        // We should have num_nodes - 1 sockets, since we exclude our own socket address.
+        assert_eq!(sockets.len(), num_nodes.checked_sub(1).unwrap());
+        assert!(!sockets.contains(&my_socket_addr));
+    }
+
+    #[test]
+    fn test_alpenglow_port_override() {
+        solana_logger::setup();
+        let (bank_forks, cluster_info, node_pubkeys) = create_bank_forks_and_cluster_info(3, 0, 1);
+        let pubkey_b = node_pubkeys[1];
+
+        let alpenglow_port_override = AlpenglowPortOverride::default();
+        let blackhole_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
+
+        // Create our staked validators cache - set include_self to false
+        let mut svc = StakedValidatorsCache::new(
+            bank_forks.clone(),
+            Duration::from_secs(5),
+            5,
+            false,
+            Some(alpenglow_port_override.clone()),
+        );
+        // Nothing in the override, so we should get the original socket addresses.
+        let (sockets, _) = svc.get_staked_validators_by_slot(0, &cluster_info, Instant::now());
+        assert_eq!(sockets.len(), 2);
+        assert!(!sockets.contains(&blackhole_addr));
+
+        // Add an override for pubkey_B, and check that we get the overridden socket address.
+        alpenglow_port_override.update_override(HashMap::from([(pubkey_b, blackhole_addr)]));
+        let (sockets, _) = svc.get_staked_validators_by_slot(0, &cluster_info, Instant::now());
+        assert_eq!(sockets.len(), 2);
+        // Sort sockets to ensure the blackhole address is at index 0.
+        let mut sockets: Vec<_> = sockets.to_vec();
+        sockets.sort();
+        assert_eq!(sockets[0], blackhole_addr);
+        assert_ne!(sockets[1], blackhole_addr);
+
+        // Now clear the override, and check that we get the original socket addresses.
+        alpenglow_port_override.clear();
+        let (sockets, _) = svc.get_staked_validators_by_slot(0, &cluster_info, Instant::now());
+        assert_eq!(sockets.len(), 2);
+        assert!(!sockets.contains(&blackhole_addr));
+    }
+}
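
A usage sketch for the cache above (illustrative; `bank_forks`, `cluster_info`, and `slot` are assumed to come from the caller). The TTL and capacity shown are the values that voting_service.rs, below, passes in:

use std::time::{Duration, Instant};

let mut cache = StakedValidatorsCache::new(
    bank_forks,             // Arc<RwLock<BankForks>>
    Duration::from_secs(5), // TTL: entries older than this are recomputed on access
    5,                      // LRU capacity, in epochs
    false,                  // do not include this validator's own socket
    None,                   // no Alpenglow port override
);

// One lookup per broadcast: maps the slot to its epoch and returns that
// epoch's sockets, plus a flag saying whether the entry was (re)computed on
// this access (missing, past TTL, or a new port override).
let (sockets, refreshed) =
    cache.get_staked_validators_by_slot(slot, &cluster_info, Instant::now());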

+ 411 - 0
votor/src/voting_service.rs

@@ -0,0 +1,411 @@
+#![allow(dead_code)]
+
+use {
+    crate::{
+        staked_validators_cache::StakedValidatorsCache,
+        vote_history_storage::{SavedVoteHistoryVersions, VoteHistoryStorage},
+    },
+    bincode::serialize,
+    crossbeam_channel::Receiver,
+    solana_client::connection_cache::ConnectionCache,
+    solana_clock::Slot,
+    solana_connection_cache::client_connection::ClientConnection,
+    solana_gossip::cluster_info::ClusterInfo,
+    solana_measure::measure::Measure,
+    solana_pubkey::Pubkey,
+    solana_runtime::bank_forks::BankForks,
+    solana_transaction_error::TransportError,
+    solana_votor_messages::consensus_message::{CertificateMessage, ConsensusMessage},
+    std::{
+        collections::HashMap,
+        net::SocketAddr,
+        sync::{Arc, RwLock},
+        thread::{self, Builder, JoinHandle},
+        time::{Duration, Instant},
+    },
+    thiserror::Error,
+};
+
+const STAKED_VALIDATORS_CACHE_TTL_S: u64 = 5;
+const STAKED_VALIDATORS_CACHE_NUM_EPOCH_CAP: usize = 5;
+
+#[derive(Debug, Error)]
+enum SendVoteError {
+    #[error(transparent)]
+    BincodeError(#[from] bincode::Error),
+    #[error(transparent)]
+    TransportError(#[from] TransportError),
+}
+
+#[derive(Debug)]
+pub enum BLSOp {
+    PushVote {
+        message: Arc<ConsensusMessage>,
+        slot: Slot,
+        saved_vote_history: SavedVoteHistoryVersions,
+    },
+    PushCertificate {
+        certificate: Arc<CertificateMessage>,
+    },
+}
+
+fn send_message(
+    buf: Vec<u8>,
+    socket: &SocketAddr,
+    connection_cache: &Arc<ConnectionCache>,
+) -> Result<(), TransportError> {
+    let client = connection_cache.get_connection(socket);
+
+    client.send_data_async(Arc::new(buf))
+}
+
+pub struct VotingService {
+    thread_hdl: JoinHandle<()>,
+}
+
+/// Override for Alpenglow ports, to allow testing with different ports.
+/// `last_modified` is used to determine whether the override has changed, so
+/// that StakedValidatorsCache can refresh its cache.
+/// Inside the map, the key is the validator's identity pubkey and the value
+/// is the overridden socket address.
+/// For example, if you want validator A to send messages meant for validator
+/// B's Alpenglow port to a new address instead (so B never receives them),
+/// insert an entry into A's map like this:
+/// `override_map.insert(validator_b_pubkey, new_address);`
+#[derive(Clone, Default)]
+pub struct AlpenglowPortOverride {
+    inner: Arc<RwLock<AlpenglowPortOverrideInner>>,
+}
+
+#[derive(Clone)]
+struct AlpenglowPortOverrideInner {
+    override_map: HashMap<Pubkey, SocketAddr>,
+    last_modified: Instant,
+}
+
+impl Default for AlpenglowPortOverrideInner {
+    fn default() -> Self {
+        Self {
+            override_map: HashMap::new(),
+            last_modified: Instant::now(),
+        }
+    }
+}
+
+impl AlpenglowPortOverride {
+    pub fn update_override(&self, new_override: HashMap<Pubkey, SocketAddr>) {
+        let mut inner = self.inner.write().unwrap();
+        inner.override_map = new_override;
+        inner.last_modified = Instant::now();
+    }
+
+    pub fn has_new_override(&self, previous: Instant) -> bool {
+        self.inner.read().unwrap().last_modified != previous
+    }
+
+    pub fn last_modified(&self) -> Instant {
+        self.inner.read().unwrap().last_modified
+    }
+
+    pub fn clear(&self) {
+        let mut inner = self.inner.write().unwrap();
+        inner.override_map.clear();
+        inner.last_modified = Instant::now();
+    }
+
+    pub fn get_override_map(&self) -> HashMap<Pubkey, SocketAddr> {
+        self.inner.read().unwrap().override_map.clone()
+    }
+}
+
+#[derive(Clone)]
+pub struct VotingServiceOverride {
+    pub additional_listeners: Vec<SocketAddr>,
+    pub alpenglow_port_override: AlpenglowPortOverride,
+}
+
+impl VotingService {
+    pub fn new(
+        bls_receiver: Receiver<BLSOp>,
+        cluster_info: Arc<ClusterInfo>,
+        vote_history_storage: Arc<dyn VoteHistoryStorage>,
+        connection_cache: Arc<ConnectionCache>,
+        bank_forks: Arc<RwLock<BankForks>>,
+        test_override: Option<VotingServiceOverride>,
+    ) -> Self {
+        let (additional_listeners, alpenglow_port_override) = match test_override {
+            None => (Vec::new(), None),
+            Some(VotingServiceOverride {
+                additional_listeners,
+                alpenglow_port_override,
+            }) => (additional_listeners, Some(alpenglow_port_override)),
+        };
+
+        let thread_hdl = Builder::new()
+            .name("solVoteService".to_string())
+            .spawn(move || {
+                let mut staked_validators_cache = StakedValidatorsCache::new(
+                    bank_forks.clone(),
+                    Duration::from_secs(STAKED_VALIDATORS_CACHE_TTL_S),
+                    STAKED_VALIDATORS_CACHE_NUM_EPOCH_CAP,
+                    false,
+                    alpenglow_port_override,
+                );
+
+                loop {
+                    let Ok(bls_op) = bls_receiver.recv() else {
+                        break;
+                    };
+                    Self::handle_bls_op(
+                        &cluster_info,
+                        vote_history_storage.as_ref(),
+                        bls_op,
+                        connection_cache.clone(),
+                        &additional_listeners,
+                        &mut staked_validators_cache,
+                    );
+                }
+            })
+            .unwrap();
+        Self { thread_hdl }
+    }
+
+    fn broadcast_consensus_message(
+        slot: Slot,
+        cluster_info: &ClusterInfo,
+        message: &ConsensusMessage,
+        connection_cache: Arc<ConnectionCache>,
+        additional_listeners: &[SocketAddr],
+        staked_validators_cache: &mut StakedValidatorsCache,
+    ) {
+        let buf = match serialize(message) {
+            Ok(buf) => buf,
+            Err(err) => {
+                error!("Failed to serialize alpenglow message: {err:?}");
+                return;
+            }
+        };
+
+        let (staked_validator_alpenglow_sockets, _) = staked_validators_cache
+            .get_staked_validators_by_slot(slot, cluster_info, Instant::now());
+        let sockets = additional_listeners
+            .iter()
+            .chain(staked_validator_alpenglow_sockets.iter());
+
+        // We use send_message in a loop right now because we worry that sending packets too fast
+        // will cause a packet spike and overwhelm the network. If we later find out that this is
+        // not an issue, we can optimize this by using multi_target_send or similar methods.
+        for socket in sockets {
+            if let Err(e) = send_message(buf.clone(), socket, &connection_cache) {
+                warn!("Failed to send alpenglow message to {socket}: {e:?}");
+            }
+        }
+    }
+
+    fn handle_bls_op(
+        cluster_info: &ClusterInfo,
+        vote_history_storage: &dyn VoteHistoryStorage,
+        bls_op: BLSOp,
+        connection_cache: Arc<ConnectionCache>,
+        additional_listeners: &[SocketAddr],
+        staked_validators_cache: &mut StakedValidatorsCache,
+    ) {
+        match bls_op {
+            BLSOp::PushVote {
+                message,
+                slot,
+                saved_vote_history,
+            } => {
+                let mut measure = Measure::start("alpenglow vote history save");
+                if let Err(err) = vote_history_storage.store(&saved_vote_history) {
+                    error!("Unable to save vote history to storage: {err:?}");
+                    std::process::exit(1);
+                }
+                measure.stop();
+                trace!("{measure}");
+
+                Self::broadcast_consensus_message(
+                    slot,
+                    cluster_info,
+                    &message,
+                    connection_cache,
+                    additional_listeners,
+                    staked_validators_cache,
+                );
+            }
+            BLSOp::PushCertificate { certificate } => {
+                let vote_slot = certificate.certificate.slot();
+                let message = ConsensusMessage::Certificate((*certificate).clone());
+                Self::broadcast_consensus_message(
+                    vote_slot,
+                    cluster_info,
+                    &message,
+                    connection_cache,
+                    additional_listeners,
+                    staked_validators_cache,
+                );
+            }
+        }
+    }
+
+    pub fn join(self) -> thread::Result<()> {
+        self.thread_hdl.join()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::*,
+        crate::vote_history_storage::{
+            NullVoteHistoryStorage, SavedVoteHistory, SavedVoteHistoryVersions,
+        },
+        solana_bls_signatures::Signature as BLSSignature,
+        solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
+        solana_keypair::Keypair,
+        solana_runtime::{
+            bank::Bank,
+            bank_forks::BankForks,
+            genesis_utils::{
+                create_genesis_config_with_alpenglow_vote_accounts, ValidatorVoteKeypairs,
+            },
+        },
+        solana_signer::Signer,
+        solana_streamer::{
+            quic::{spawn_server_with_cancel, QuicServerParams, SpawnServerResult},
+            socket::SocketAddrSpace,
+            streamer::StakedNodes,
+        },
+        solana_votor_messages::{
+            consensus_message::{
+                Certificate, CertificateMessage, CertificateType, ConsensusMessage, VoteMessage,
+            },
+            vote::Vote,
+        },
+        std::{net::SocketAddr, sync::Arc},
+        test_case::test_case,
+        tokio_util::sync::CancellationToken,
+    };
+
+    fn create_voting_service(
+        bls_receiver: Receiver<BLSOp>,
+        listener: SocketAddr,
+    ) -> (VotingService, Vec<ValidatorVoteKeypairs>) {
+        // Create a vec of 10 ValidatorVoteKeypairs
+        let validator_keypairs = (0..10)
+            .map(|_| ValidatorVoteKeypairs::new_rand())
+            .collect::<Vec<_>>();
+        let genesis = create_genesis_config_with_alpenglow_vote_accounts(
+            1_000_000_000,
+            &validator_keypairs,
+            vec![100; validator_keypairs.len()],
+        );
+        let bank0 = Bank::new_for_tests(&genesis.genesis_config);
+        let bank_forks = BankForks::new_rw_arc(bank0);
+        let keypair = Keypair::new();
+        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
+        let cluster_info = ClusterInfo::new(
+            contact_info,
+            Arc::new(keypair),
+            SocketAddrSpace::Unspecified,
+        );
+
+        (
+            VotingService::new(
+                bls_receiver,
+                Arc::new(cluster_info),
+                Arc::new(NullVoteHistoryStorage::default()),
+                Arc::new(ConnectionCache::new_quic(
+                    "TestAlpenglowConnectionCache",
+                    10,
+                )),
+                bank_forks.clone(),
+                Some(VotingServiceOverride {
+                    additional_listeners: vec![listener],
+                    alpenglow_port_override: AlpenglowPortOverride::default(),
+                }),
+            ),
+            validator_keypairs,
+        )
+    }
+
+    #[test_case(BLSOp::PushVote {
+        message: Arc::new(ConsensusMessage::Vote(VoteMessage {
+            vote: Vote::new_skip_vote(5),
+            signature: BLSSignature::default(),
+            rank: 1,
+        })),
+        slot: 5,
+        saved_vote_history: SavedVoteHistoryVersions::Current(SavedVoteHistory::default()),
+    }, ConsensusMessage::Vote(VoteMessage {
+        vote: Vote::new_skip_vote(5),
+        signature: BLSSignature::default(),
+        rank: 1,
+    }))]
+    #[test_case(BLSOp::PushCertificate {
+        certificate: Arc::new(CertificateMessage {
+            certificate: Certificate::new(CertificateType::Skip, 5, None),
+            signature: BLSSignature::default(),
+            bitmap: Vec::new(),
+        }),
+    }, ConsensusMessage::Certificate(CertificateMessage {
+        certificate: Certificate::new(CertificateType::Skip, 5, None),
+        signature: BLSSignature::default(),
+        bitmap: Vec::new(),
+    }))]
+    fn test_send_message(bls_op: BLSOp, expected_message: ConsensusMessage) {
+        solana_logger::setup();
+        let (bls_sender, bls_receiver) = crossbeam_channel::unbounded();
+        // Bind to a random local UDP port; its SocketAddr is passed to the VotingService as an
+        // additional listener.
+        let socket = solana_net_utils::bind_to_localhost().unwrap();
+        let listener_addr = socket.local_addr().unwrap();
+
+        // Create VotingService with the listener address
+        let (_, validator_keypairs) = create_voting_service(bls_receiver, listener_addr);
+
+        // Send a BLS message via the VotingService
+        assert!(bls_sender.send(bls_op).is_ok());
+
+        // Start a QUIC streamer to handle the QUIC packets
+        let (sender, receiver) = crossbeam_channel::unbounded();
+        let stakes = validator_keypairs
+            .iter()
+            .map(|x| (x.node_keypair.pubkey(), 100))
+            .collect();
+        let staked_nodes: Arc<RwLock<StakedNodes>> = Arc::new(RwLock::new(StakedNodes::new(
+            Arc::new(stakes),
+            HashMap::<Pubkey, u64>::default(), // overrides
+        )));
+        let cancel_token = CancellationToken::new();
+        let SpawnServerResult {
+            thread: quic_server_thread,
+            ..
+        } = spawn_server_with_cancel(
+            "AlpenglowLocalClusterTest",
+            "quic_streamer_test",
+            [socket],
+            &Keypair::new(),
+            sender,
+            staked_nodes,
+            QuicServerParams::default_for_tests(),
+            cancel_token.clone(),
+        )
+        .unwrap();
+        let packets = receiver.recv().unwrap();
+        let packet = packets.first().expect("No packets received");
+        let received_message = packet
+            .deserialize_slice::<ConsensusMessage, _>(..)
+            .unwrap_or_else(|err| {
+                panic!(
+                    "Failed to deserialize BLSMessage: {:?} {:?}",
+                    size_of::<ConsensusMessage>(),
+                    err
+                )
+            });
+        assert_eq!(received_message, expected_message);
+        cancel_token.cancel();
+        quic_server_thread.join().unwrap();
+    }
+}
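
Finally, a test-only sketch of AlpenglowPortOverride, mirroring test_alpenglow_port_override in staked_validators_cache.rs above; `validator_b_identity` and `sink_addr` are placeholder names:

use std::{collections::HashMap, net::SocketAddr};

let port_override = AlpenglowPortOverride::default();

// Redirect traffic meant for validator B's Alpenglow port to a sink address.
// update_override bumps last_modified, so StakedValidatorsCache refreshes on
// its next get_staked_validators_by_slot call.
let sink_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
port_override.update_override(HashMap::from([(validator_b_identity, sink_addr)]));

// Clearing also bumps last_modified; the next refresh restores the original sockets.
port_override.clear();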