Browse Source

new token bucket impl (#6893)

* new atomic token bucket implementation

Adds TokenBucket and KeyedRateLimiter to replace the governor crate
and, more generally, to allow better control over rate-limiting options
(see the usage sketch below).

* shuttle test
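
A minimal usage sketch of the new API (values are illustrative, not defaults):

    use solana_net_utils::token_bucket::{KeyedRateLimiter, TokenBucket};
    use std::net::{IpAddr, Ipv4Addr};

    // Prototype bucket: 10 initial tokens, capacity 100, refill 50 tokens/sec.
    let proto = TokenBucket::new(10, 100, 50.0);
    // Per-IP limiter targeting ~1024 live keys across 8 shards.
    let limiter: KeyedRateLimiter<IpAddr> = KeyedRateLimiter::new(1024, proto, 8);
    let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
    match limiter.consume_tokens(ip, 4) {
        Ok(left) => println!("allowed, {left} tokens left"),
        Err(missing) => println!("rejected, {missing} tokens short"),
    }
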
Alex Pyattaev 2 months ago
parent
commit
cc3f387437

+ 4 - 0
Cargo.lock

@@ -9397,6 +9397,8 @@ dependencies = [
  "anyhow",
  "bincode",
  "bytes",
+ "cfg-if 1.0.3",
+ "dashmap",
  "hxdmp",
  "itertools 0.12.1",
  "log",
@@ -9405,9 +9407,11 @@ dependencies = [
  "rand 0.8.5",
  "serde",
  "serde_derive",
+ "shuttle",
  "socket2 0.6.0",
  "solana-logger",
  "solana-serde",
+ "solana-svm-type-overrides",
  "tokio",
  "url 2.5.7",
 ]

+ 9 - 0
net-utils/Cargo.toml

@@ -19,11 +19,14 @@ name = "solana_net_utils"
 agave-unstable-api = []
 default = []
 dev-context-only-utils = ["dep:pcap-file", "dep:hxdmp"]
+shuttle-test = ["dep:shuttle", "solana-svm-type-overrides/shuttle-test"]
 
 [dependencies]
 anyhow = { workspace = true }
 bincode = { workspace = true }
 bytes = { workspace = true }
+cfg-if = { workspace = true }
+dashmap = { workspace = true, features = ["raw-api"] }
 hxdmp = { version = "0.2.1", optional = true }
 itertools = { workspace = true }
 log = { workspace = true }
@@ -32,8 +35,10 @@ pcap-file = { version = "2.0.0", optional = true }
 rand = { workspace = true }
 serde = { workspace = true }
 serde_derive = { workspace = true }
+shuttle = { workspace = true, optional = true }
 socket2 = { workspace = true }
 solana-serde = { workspace = true }
+solana-svm-type-overrides = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
 url = { workspace = true }
 
@@ -42,3 +47,7 @@ solana-logger = { workspace = true }
 
 [lints]
 workspace = true
+
+[[bench]]
+name = "token_bucket"
+harness = false

+ 177 - 0
net-utils/benches/token_bucket.rs

@@ -0,0 +1,177 @@
+#![allow(clippy::arithmetic_side_effects)]
+use {
+    solana_net_utils::token_bucket::*,
+    std::{
+        net::{IpAddr, Ipv4Addr},
+        sync::atomic::{AtomicUsize, Ordering},
+        time::{Duration, Instant},
+    },
+};
+
+fn bench_token_bucket() {
+    println!("Running bench_token_bucket...");
+    let run_duration = Duration::from_secs(5);
+    let fill_rate = 10000.0;
+    let request_size = 3;
+    let target_rate = fill_rate / request_size as f64;
+    let tb = TokenBucket::new(1, 600, fill_rate);
+
+    let accepted = AtomicUsize::new(0);
+    let rejected = AtomicUsize::new(0);
+
+    let start = Instant::now();
+    let workers = 8;
+
+    std::thread::scope(|scope| {
+        for _ in 0..workers {
+            scope.spawn(|| loop {
+                if start.elapsed() > run_duration {
+                    break;
+                }
+                match tb.consume_tokens(request_size) {
+                    Ok(_) => accepted.fetch_add(1, Ordering::Relaxed),
+                    Err(_) => rejected.fetch_add(1, Ordering::Relaxed),
+                };
+            });
+        }
+        // periodically check for races
+        let jh = scope.spawn(|| loop {
+            std::thread::sleep(Duration::from_millis(100));
+            let elapsed = start.elapsed();
+            if elapsed > run_duration {
+                break;
+            }
+            let acc = accepted.load(Ordering::Relaxed);
+            let rate = acc as f64 / elapsed.as_secs_f64();
+            assert!(
+                tb.current_tokens() < request_size * 2,
+                "bucket should have no spare tokens"
+            );
+            assert!(
+                // allow 1% error
+                (rate - target_rate).abs() < target_rate / 100.0,
+                "Accepted rate should be about {target_rate}, actual {rate}"
+            );
+        });
+        jh.join().expect("Rate checks should pass");
+    });
+
+    let acc = accepted.load(Ordering::Relaxed);
+    let rej = rejected.load(Ordering::Relaxed);
+    println!("Run complete over {:?} seconds", run_duration.as_secs());
+    println!("Accepted {acc}, Rejected: {rej}");
+    println!(
+        "processed {} requests, {} per second",
+        acc + rej,
+        (acc + rej) as f32 / run_duration.as_secs_f32()
+    );
+}
+
+fn bench_token_bucket_eviction() {
+    println!("Running bench_token_bucket_eviction...");
+    let run_duration = Duration::from_secs(5);
+    let target_size = 256;
+    let tb = TokenBucket::new(1, 60, 100.0);
+    let mut limiter = KeyedRateLimiter::new(target_size, tb, 8);
+    // Make shrinking more aggressive than the default: only one worker is
+    // shrinking the data structure at any given moment, so this keeps the
+    // test from flaking too hard.
+    limiter.set_shrink_interval(32);
+
+    let accepted = AtomicUsize::new(0);
+    let rejected = AtomicUsize::new(0);
+
+    let start = Instant::now();
+    let ip_pool = 1024;
+    let workers = 8;
+
+    let max_size = AtomicUsize::new(0);
+    std::thread::scope(|scope| {
+        for _ in 0..workers {
+            scope.spawn(|| {
+                for i in 1.. {
+                    if Instant::now() > start + run_duration {
+                        break;
+                    }
+                    let ip = IpAddr::V4(Ipv4Addr::from_bits(i % ip_pool as u32));
+                    if limiter.consume_tokens(ip, 1).is_ok() {
+                        accepted.fetch_add(1, Ordering::Relaxed);
+                    } else {
+                        rejected.fetch_add(1, Ordering::Relaxed);
+                    }
+                    let len_approx = limiter.len_approx();
+                    max_size.fetch_max(len_approx, Ordering::Relaxed);
+                }
+            });
+        }
+    });
+
+    let acc = accepted.load(Ordering::Relaxed);
+    let rej = rejected.load(Ordering::Relaxed);
+    println!("Run complete over {:?} seconds", run_duration.as_secs());
+    eprintln!("Max observed size was {}", max_size.load(Ordering::Relaxed));
+    assert!(
+        max_size.load(Ordering::Relaxed) <= target_size * 2,
+        "Max target size should never be exceeded"
+    );
+    println!(
+        "processed {} requests, {} per second",
+        acc + rej,
+        (acc + rej) as f32 / run_duration.as_secs_f32()
+    );
+    println!("Rejected: {rej}");
+}
+
+fn bench_keyed_rate_limiter() {
+    println!("Running bench_keyed_rate_limiter...");
+    let run_duration = Duration::from_secs(5);
+    let tb = TokenBucket::new(1, 60, 100.0);
+    let limiter = KeyedRateLimiter::new(2048, tb, 8);
+
+    let accepted = AtomicUsize::new(0);
+    let rejected = AtomicUsize::new(0);
+
+    let start = Instant::now();
+    let ip_pool = 2048;
+    let expected_total_accepts = (run_duration.as_secs() * 100 * ip_pool) as i64;
+    let workers = 32;
+
+    std::thread::scope(|scope| {
+        for _ in 0..workers {
+            scope.spawn(|| {
+                for i in 1.. {
+                    if Instant::now() > start + run_duration {
+                        break;
+                    }
+                    let ip = IpAddr::V4(Ipv4Addr::from_bits(i % ip_pool as u32));
+                    if limiter.consume_tokens(ip, 1).is_ok() {
+                        accepted.fetch_add(1, Ordering::Relaxed);
+                    } else {
+                        rejected.fetch_add(1, Ordering::Relaxed);
+                    }
+                }
+            });
+        }
+    });
+
+    let acc = accepted.load(Ordering::Relaxed);
+    let rej = rejected.load(Ordering::Relaxed);
+    println!("Run complete over {:?} seconds", run_duration.as_secs());
+    println!("Accepted: {acc} (target {expected_total_accepts})");
+    println!("Rejected: {rej}");
+    println!(
+        "processed {} requests, {} per second",
+        acc + rej,
+        (acc + rej) as f32 / run_duration.as_secs_f32()
+    );
+    assert!(((acc as i64) - expected_total_accepts).abs() < expected_total_accepts / 10);
+}
+
+fn main() {
+    bench_token_bucket();
+    println!("==========");
+    bench_token_bucket_eviction();
+    println!("==========");
+    bench_keyed_rate_limiter();
+}

+ 1 - 0
net-utils/src/lib.rs

@@ -14,6 +14,7 @@ mod ip_echo_client;
 mod ip_echo_server;
 pub mod multihomed_sockets;
 pub mod sockets;
+pub mod token_bucket;
 
 #[cfg(feature = "dev-context-only-utils")]
 pub mod tooling_for_tests;

+ 480 - 0
net-utils/src/token_bucket.rs

@@ -0,0 +1,480 @@
+//! This module contains [`TokenBucket`], which provides the ability to
+//! limit the rate of certain events while allowing bursts through.
+//! [`KeyedRateLimiter`] allows rate-limiting multiple keyed items, such
+//! as connections.
+use {
+    cfg_if::cfg_if,
+    dashmap::{mapref::entry::Entry, DashMap},
+    solana_svm_type_overrides::sync::atomic::{AtomicU64, AtomicUsize, Ordering},
+    std::{borrow::Borrow, cmp::Reverse, hash::Hash, time::Instant},
+};
+
+/// Enforces a rate limit on the volume of requests per unit time.
+///
+/// Instances update the amount of tokens upon access, and thus do not need to
+/// be constantly polled to refill. Uses atomics internally, so it should be
+/// relatively cheap to access from many threads.
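+///
+/// A minimal usage sketch (values are illustrative):
+/// ```
+/// use solana_net_utils::token_bucket::TokenBucket;
+///
+/// // 10 initial tokens, a cap of 100 tokens, refilled at 50 tokens/second.
+/// let tb = TokenBucket::new(10, 100, 50.0);
+/// assert!(tb.consume_tokens(4).is_ok()); // Ok(6): 6 tokens remain
+/// assert!(tb.consume_tokens(100).is_err()); // not enough tokens yet
+/// ```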
+pub struct TokenBucket {
+    new_tokens_per_us: f64,
+    max_tokens: u64,
+    /// time of bucket creation
+    base_time: Instant,
+    tokens: AtomicU64,
+    /// time of last update in us since base_time
+    last_update: AtomicU64,
+    /// time unused in last token creation round
+    credit_time_us: AtomicU64,
+}
+
+#[cfg(feature = "shuttle-test")]
+static TIME_US: AtomicU64 = AtomicU64::new(0); //used to override Instant::now()
+
+// If changing this impl, make sure to run benches and ensure they do not panic.
+// Much of the testing is impossible outside of real multithreading in release mode.
+impl TokenBucket {
+    /// Allocate a new TokenBucket
+    pub fn new(initial_tokens: u64, max_tokens: u64, new_tokens_per_second: f64) -> Self {
+        assert!(
+            new_tokens_per_second > 0.0,
+            "Token bucket can not have zero influx rate"
+        );
+        assert!(
+            initial_tokens <= max_tokens,
+            "Can not have more initial tokens than max tokens"
+        );
+        let base_time = Instant::now();
+        TokenBucket {
+            // recompute into us to avoid FP division on every update
+            new_tokens_per_us: new_tokens_per_second / 1e6,
+            max_tokens,
+            tokens: AtomicU64::new(initial_tokens),
+            last_update: AtomicU64::new(0),
+            base_time,
+            credit_time_us: AtomicU64::new(0),
+        }
+    }
+
+    /// Return current amount of tokens in the bucket.
+    /// This may be somewhat inconsistent across threads
+    /// due to Relaxed atomics.
+    #[inline]
+    pub fn current_tokens(&self) -> u64 {
+        let now = self.time_us();
+        self.update_state(now);
+        self.tokens.load(Ordering::Relaxed)
+    }
+
+    /// Attempts to consume tokens from the bucket.
+    ///
+    /// On success, returns Ok(amount of tokens left in the bucket).
+    /// On failure, returns Err(amount of tokens missing to fill request).
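+    ///
+    /// For example (illustrative values): with 10 tokens in the bucket,
+    /// `consume_tokens(4)` returns `Ok(6)`, and an immediate
+    /// `consume_tokens(10)` then returns `Err(4)` (4 tokens short).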
+    #[inline]
+    pub fn consume_tokens(&self, request_size: u64) -> Result<u64, u64> {
+        let now = self.time_us();
+        self.update_state(now);
+        match self.tokens.fetch_update(
+            Ordering::AcqRel,  // winner publishes new amount
+            Ordering::Acquire, // everyone observed correct number
+            |tokens| {
+                if tokens >= request_size {
+                    Some(tokens.saturating_sub(request_size))
+                } else {
+                    None
+                }
+            },
+        ) {
+            Ok(prev) => Ok(prev.saturating_sub(request_size)),
+            Err(prev) => Err(request_size.saturating_sub(prev)),
+        }
+    }
+
+    /// Retrieves monotonic time since bucket creation.
+    fn time_us(&self) -> u64 {
+        cfg_if! {
+            if #[cfg(feature="shuttle-test")] {
+                TIME_US.load(Ordering::Relaxed)
+            } else {
+                let now = Instant::now();
+                let elapsed = now.saturating_duration_since(self.base_time);
+                elapsed.as_micros() as u64
+            }
+        }
+    }
+
+    /// Updates internal state of the bucket by
+    /// depositing new tokens (if appropriate)
+    fn update_state(&self, now: u64) {
+        // fetch last update time
+        let last = self.last_update.load(Ordering::SeqCst);
+
+        // If time has not advanced, nothing to do.
+        if now <= last {
+            return;
+        }
+
+        // Try to claim the interval [last, now].
+        // If we cannot claim it, someone else will claim [last..some other time] when they
+        // touch the bucket.
+        // If we can claim the interval [last, now], no other thread can credit tokens for it anymore.
+        // If [last, now] is too short to mint any tokens, the spare time will be preserved in credit_time_us.
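+        // Worked example (illustrative): at 5000 tokens/s, new_tokens_per_us is
+        // 0.005, so claiming a 300us interval mints floor(1.5) = 1 token and
+        // returns the unused 100us (the 0.5-token remainder) to credit_time_us
+        // for a later minting round.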
+        match self.last_update.compare_exchange(
+            last,
+            now,
+            Ordering::AcqRel,  // winner publishes new timestamp
+            Ordering::Acquire, // loser observes updates
+        ) {
+            Ok(_) => {
+                // This thread won the race and is responsible for minting tokens
+                let elapsed = now.saturating_sub(last);
+
+                // also add leftovers from previous conversion attempts.
+                // we do not care who uses the credited time, so Relaxed is ok here.
+                let elapsed =
+                    elapsed.saturating_add(self.credit_time_us.swap(0, Ordering::Relaxed));
+
+                let new_tokens_f64 = elapsed as f64 * self.new_tokens_per_us;
+
+                // amount of full tokens to be minted
+                let new_tokens = new_tokens_f64.floor() as u64;
+
+                let time_to_return = if new_tokens >= 1 {
+                    // Credit tokens, saturating at max_tokens
+                    let _ = self.tokens.fetch_update(
+                        Ordering::AcqRel,  // writer publishes new amount
+                        Ordering::Acquire, // we fetch the correct amount
+                        |tokens| Some(tokens.saturating_add(new_tokens).min(self.max_tokens)),
+                    );
+                    // Fractional remainder of elapsed time (not enough to mint a whole token)
+                    // that will be credited to other minters
+                    (new_tokens_f64.fract() / self.new_tokens_per_us) as u64
+                } else {
+                    // No whole tokens minted → return whole interval
+                    elapsed
+                };
+                // Save unused elapsed time for other threads
+                self.credit_time_us
+                    .fetch_add(time_to_return, Ordering::Relaxed);
+            }
+            Err(_) => {
+                // Another thread advanced last_update first → nothing we can do now.
+            }
+        }
+    }
+}
+
+impl Clone for TokenBucket {
+    /// Clones the TokenBucket with approximate state
+    /// of the original. While this will never return an object in an
+    /// invalid state, using this in a contended environment is not recommended.
+    fn clone(&self) -> Self {
+        Self {
+            new_tokens_per_us: self.new_tokens_per_us,
+            max_tokens: self.max_tokens,
+            base_time: self.base_time,
+            tokens: AtomicU64::new(self.tokens.load(Ordering::Relaxed)),
+            last_update: AtomicU64::new(self.last_update.load(Ordering::Relaxed)),
+            credit_time_us: AtomicU64::new(self.credit_time_us.load(Ordering::Relaxed)),
+        }
+    }
+}
+
+/// Provides rate limiting for multiple contexts at the same time
+///
+/// This can use e.g. an IP address as a key.
+/// Internally this is a [DashMap] of [TokenBucket] instances
+/// that are created on demand using a prototype [TokenBucket]
+/// to copy initial state from.
+/// Uses lazy-LRU logic under the hood to keep the number of items
+/// under control.
+pub struct KeyedRateLimiter<K>
+where
+    K: Hash + Eq,
+{
+    data: DashMap<K, TokenBucket>,
+    target_capacity: usize,
+    prototype_bucket: TokenBucket,
+    countdown_to_shrink: AtomicUsize,
+    approx_len: AtomicUsize,
+    shrink_interval: usize,
+}
+
+impl<K> KeyedRateLimiter<K>
+where
+    K: Hash + Eq,
+{
+    /// Creates a new KeyedRateLimiter with a specified target capacity and shard amount for the
+    /// underlying DashMap. This uses a lazy-LRU style eviction policy, so actual memory
+    /// consumption will be about 2 * target_capacity.
+    ///
+    /// shard_amount must be greater than 0 and a power of two.
+    /// If a shard_amount which is not a power of two is provided, the function will panic.
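+    ///
+    /// A minimal usage sketch (sizes and rates are illustrative):
+    /// ```
+    /// use solana_net_utils::token_bucket::{KeyedRateLimiter, TokenBucket};
+    /// use std::net::IpAddr;
+    ///
+    /// let proto = TokenBucket::new(10, 100, 50.0);
+    /// // Target ~1024 live keys spread across 8 shards (a power of two).
+    /// let limiter: KeyedRateLimiter<IpAddr> = KeyedRateLimiter::new(1024, proto, 8);
+    /// assert_eq!(limiter.len_approx(), 0);
+    /// ```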
+    #[allow(clippy::arithmetic_side_effects)]
+    pub fn new(target_capacity: usize, prototype_bucket: TokenBucket, shard_amount: usize) -> Self {
+        let shrink_interval = target_capacity / 4;
+        Self {
+            data: DashMap::with_capacity_and_shard_amount(target_capacity * 2, shard_amount),
+            target_capacity,
+            prototype_bucket,
+            countdown_to_shrink: AtomicUsize::new(shrink_interval),
+            approx_len: AtomicUsize::new(0),
+            shrink_interval,
+        }
+    }
+
+    /// Fetches the amount of tokens available for a given key.
+    ///
+    /// Returns None if no bucket exists for the key provided.
+    #[inline]
+    pub fn current_tokens(&self, key: impl Borrow<K>) -> Option<u64> {
+        let bucket = self.data.get(key.borrow())?;
+        Some(bucket.current_tokens())
+    }
+
+    /// Consumes request_size tokens from the bucket at the given key.
+    ///
+    /// On success, returns Ok(amount of tokens left in the bucket).
+    /// On failure, returns Err(amount of tokens missing to fill the request).
+    /// If no bucket exists at key, a new bucket is allocated and the normal
+    /// policy is applied to it. Outdated buckets may be evicted on an LRU basis.
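+    ///
+    /// For example (illustrative values): with a prototype bucket holding 10
+    /// tokens, `consume_tokens(ip, 4)` on a previously unseen key allocates a
+    /// fresh bucket and returns `Ok(6)`.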
+    pub fn consume_tokens(&self, key: K, request_size: u64) -> Result<u64, u64> {
+        let (entry_added, res) = {
+            let bucket = self.data.entry(key);
+            match bucket {
+                Entry::Occupied(entry) => (false, entry.get().consume_tokens(request_size)),
+                Entry::Vacant(entry) => {
+                    // if the key is not in the LRU, we need to allocate a new bucket
+                    let bucket = self.prototype_bucket.clone();
+                    let res = bucket.consume_tokens(request_size);
+                    entry.insert(bucket);
+                    (true, res)
+                }
+            }
+        };
+
+        if entry_added {
+            if let Ok(count) =
+                self.countdown_to_shrink
+                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
+                        if v == 0 {
+                            // hold the countdown at zero; the shrinking thread
+                            // will reset it, preventing other threads from racing for locks
+                            None
+                        } else {
+                            Some(v.saturating_sub(1))
+                        }
+                    })
+            {
+                if count == 1 {
+                    // the last "previous" value we will see before counter reaches zero
+                    self.maybe_shrink();
+                    self.countdown_to_shrink
+                        .store(self.shrink_interval, Ordering::Relaxed);
+                }
+            } else {
+                self.approx_len.fetch_add(1, Ordering::Relaxed);
+            }
+        }
+        res
+    }
+
+    /// Returns the approximate number of entries in the data structure.
+    /// Should be within ~10% of the true amount.
+    #[inline]
+    pub fn len_approx(&self) -> usize {
+        self.approx_len.load(Ordering::Relaxed)
+    }
+
+    // Apply the lazy-LRU eviction policy to each DashMap shard.
+    // Arithmetic side effects are allowed here since overflows are not
+    // actually possible.
+    #[allow(clippy::arithmetic_side_effects)]
+    fn maybe_shrink(&self) {
+        let mut actual_len = 0;
+        let target_shard_size = self.target_capacity / self.data.shards().len();
+        let mut entries = Vec::with_capacity(target_shard_size * 2);
+        for shardlock in self.data.shards() {
+            let mut shard = shardlock.write();
+
+            if shard.len() <= target_shard_size * 3 / 2 {
+                actual_len += shard.len();
+                continue;
+            }
+            entries.clear();
+            entries.extend(
+                shard.drain().map(|(key, value)| {
+                    (key, value.get().last_update.load(Ordering::SeqCst), value)
+                }),
+            );
+
+            entries.select_nth_unstable_by_key(target_shard_size, |(_, last_update, _)| {
+                Reverse(*last_update)
+            });
+
+            shard.extend(
+                entries
+                    .drain(..)
+                    .take(target_shard_size)
+                    .map(|(key, _last_update, value)| (key, value)),
+            );
+            debug_assert!(shard.len() <= target_shard_size);
+            actual_len += shard.len();
+        }
+        self.approx_len.store(actual_len, Ordering::Relaxed);
+    }
+
+    /// Set the auto-shrink interval. Set to 0 to disable shrinking.
+    /// During writes we want to check the length, but not too often, to
+    /// reduce the probability of lock contention, so keeping this large
+    /// is good for performance (at the cost of memory use).
+    pub fn set_shrink_interval(&mut self, interval: usize) {
+        self.shrink_interval = interval;
+    }
+
+    /// Get the auto-shrink interval.
+    pub fn shrink_interval(&self) -> usize {
+        self.shrink_interval
+    }
+}
+
+#[cfg(test)]
+pub mod test {
+    use {
+        super::*,
+        solana_svm_type_overrides::thread,
+        std::{
+            net::{IpAddr, Ipv4Addr},
+            time::Duration,
+        },
+    };
+
+    #[test]
+    fn test_token_bucket() {
+        let tb = TokenBucket::new(100, 100, 1000.0);
+        assert_eq!(tb.current_tokens(), 100);
+        tb.consume_tokens(50).expect("Bucket is initially full");
+        tb.consume_tokens(50)
+            .expect("We should still have >50 tokens left");
+        tb.consume_tokens(50)
+            .expect_err("There should not be enough tokens now");
+        thread::sleep(Duration::from_millis(50));
+        assert!(
+            tb.current_tokens() > 40,
+            "We should be refilling at ~1 token per millisecond"
+        );
+        assert!(
+            tb.current_tokens() < 70,
+            "We should be refilling at ~1 token per millisecond"
+        );
+        tb.consume_tokens(40)
+            .expect("Bucket should have enough for another request now");
+        thread::sleep(Duration::from_millis(120));
+        assert_eq!(tb.current_tokens(), 100, "Bucket should not overfill");
+    }
+    #[test]
+    fn test_keyed_rate_limiter() {
+        let prototype_bucket = TokenBucket::new(100, 100, 1000.0);
+        let rl = KeyedRateLimiter::new(8, prototype_bucket, 2);
+        let ip1 = IpAddr::V4(Ipv4Addr::from_bits(1234));
+        let ip2 = IpAddr::V4(Ipv4Addr::from_bits(4321));
+        assert_eq!(rl.current_tokens(ip1), None, "Initially no buckets exist");
+        rl.consume_tokens(ip1, 50)
+            .expect("Bucket is initially full");
+        rl.consume_tokens(ip1, 50)
+            .expect("We should still have >50 tokens left");
+        rl.consume_tokens(ip1, 50)
+            .expect_err("There should not be enough tokens now");
+        rl.consume_tokens(ip2, 50)
+            .expect("Bucket is initially full");
+        rl.consume_tokens(ip2, 50)
+            .expect("We should still have >50 tokens left");
+        rl.consume_tokens(ip2, 50)
+            .expect_err("There should not be enough tokens now");
+        std::thread::sleep(Duration::from_millis(50));
+        assert!(
+            rl.current_tokens(ip1).unwrap() > 40,
+            "We should be refilling at ~1 token per millisecond"
+        );
+        assert!(
+            rl.current_tokens(ip1).unwrap() < 70,
+            "We should be refilling at ~1 token per millisecond"
+        );
+        rl.consume_tokens(ip1, 40)
+            .expect("Bucket should have enough for another request now");
+        thread::sleep(Duration::from_millis(120));
+        assert_eq!(
+            rl.current_tokens(ip1),
+            Some(100),
+            "Bucket should not overfill"
+        );
+        assert_eq!(
+            rl.current_tokens(ip2),
+            Some(100),
+            "Bucket should not overfill"
+        );
+
+        rl.consume_tokens(ip2, 100).expect("Bucket should be full");
+        // go several times over the capacity of the rate limiter to make sure
+        // the old record is erased no matter which shard it lands in
+        for ip in 0..64 {
+            let ip = IpAddr::V4(Ipv4Addr::from_bits(ip));
+            rl.consume_tokens(ip, 50).unwrap();
+        }
+        assert_eq!(
+            rl.current_tokens(ip1),
+            None,
+            "Very old record should have been erased"
+        );
+        rl.consume_tokens(ip2, 100)
+            .expect("New bucket should have been made for ip2");
+    }
+
+    #[cfg(feature = "shuttle-test")]
+    #[test]
+    fn shuttle_test_token_bucket_race() {
+        use shuttle::sync::atomic::AtomicBool;
+        shuttle::check_random(
+            || {
+                TIME_US.store(0, Ordering::SeqCst);
+                let test_duration_us = 2500;
+                let run: &AtomicBool = Box::leak(Box::new(AtomicBool::new(true)));
+                let tb: &TokenBucket = Box::leak(Box::new(TokenBucket::new(10, 20, 5000.0)));
+
+                // time advancement thread
+                let time_advancer = thread::spawn(move || {
+                    let mut current_time = 0;
+                    while current_time < test_duration_us && run.load(Ordering::SeqCst) {
+                        let increment = 100; // microseconds
+                        current_time += increment;
+                        TIME_US.store(current_time, Ordering::SeqCst);
+                        shuttle::thread::yield_now();
+                    }
+                    run.store(false, Ordering::SeqCst);
+                });
+
+                let threads: Vec<_> = (0..2)
+                    .map(|_| {
+                        thread::spawn(move || {
+                            let mut total = 0;
+                            while run.load(Ordering::SeqCst) {
+                                if tb.consume_tokens(5).is_ok() {
+                                    total += 1;
+                                }
+                                shuttle::thread::yield_now();
+                            }
+                            total
+                        })
+                    })
+                    .collect();
+
+                time_advancer.join().unwrap();
+                let received = threads.into_iter().map(|t| t.join().unwrap()).sum();
+
+                // Initial tokens: 10, refill rate: 5000 tokens/sec (0.5 tokens per 100us step)
+                // Over the 2500us run: 10 + 2500 * 0.005 = 22.5 tokens' worth of time,
+                // enough for exactly 4 consumptions of 5 tokens (a 5th would need 25)
+                assert_eq!(4, received);
+            },
+            100,
+        );
+    }
+}

+ 41 - 38
programs/sbf/Cargo.lock

@@ -43,7 +43,7 @@ version = "0.8.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "cipher",
  "cpufeatures",
 ]
@@ -356,7 +356,7 @@ version = "0.8.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "getrandom 0.2.10",
  "once_cell",
  "version_check",
@@ -864,7 +864,7 @@ checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12"
 dependencies = [
  "addr2line",
  "cc",
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "libc",
  "miniz_oxide",
  "object 0.31.1",
@@ -982,7 +982,7 @@ dependencies = [
  "arrayref",
  "arrayvec",
  "cc",
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "constant_time_eq",
  "digest 0.10.7",
 ]
@@ -1224,9 +1224,9 @@ checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
 
 [[package]]
 name = "cfg-if"
-version = "1.0.0"
+version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
+checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9"
 
 [[package]]
 name = "cfg_aliases"
@@ -1480,7 +1480,7 @@ version = "0.8.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "crossbeam-epoch",
  "crossbeam-utils",
 ]
@@ -1491,7 +1491,7 @@ version = "0.9.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "crossbeam-utils",
  "lazy_static",
  "memoffset 0.6.4",
@@ -1504,7 +1504,7 @@ version = "0.8.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
 ]
 
 [[package]]
@@ -1574,7 +1574,7 @@ version = "4.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "cpufeatures",
  "curve25519-dalek-derive",
  "digest 0.10.7",
@@ -1638,7 +1638,7 @@ version = "5.5.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "hashbrown 0.14.3",
  "lock_api",
  "once_cell",
@@ -1802,7 +1802,7 @@ version = "2.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "dirs-sys-next",
 ]
 
@@ -2130,7 +2130,7 @@ version = "3.0.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ef033ed5e9bad94e55838ca0ca906db0e043f517adda0c8b79c7a8c66c93c1b5"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "rustix 0.38.39",
  "windows-sys 0.48.0",
 ]
@@ -2415,7 +2415,7 @@ version = "0.2.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "js-sys",
  "libc",
  "wasi 0.11.0+wasi-snapshot-preview1",
@@ -2428,7 +2428,7 @@ version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "js-sys",
  "libc",
  "wasi 0.13.3+wasi-0.2.2",
@@ -2486,7 +2486,7 @@ version = "0.6.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "dashmap",
  "futures 0.3.31",
  "futures-timer",
@@ -2632,7 +2632,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "03b876ecf37e86b359573c16c8366bc3eba52b689884a0fc42ba3f67203d2a8b"
 dependencies = [
  "cc",
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "libc",
  "pkg-config",
  "windows-sys 0.48.0",
@@ -3162,7 +3162,7 @@ version = "0.1.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
 ]
 
 [[package]]
@@ -3172,7 +3172,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b"
 dependencies = [
  "bitflags 2.9.4",
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "libc",
 ]
 
@@ -3229,7 +3229,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97"
 dependencies = [
  "cesu8",
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "combine 4.6.7",
  "jni-sys",
  "log",
@@ -3400,7 +3400,7 @@ version = "0.13.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f6e3919bbaa2945715f0bb6d3934a173d1e9a59ac23767fbaaef277265a7411b"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "ecdsa",
  "elliptic-curve",
  "once_cell",
@@ -3460,7 +3460,7 @@ version = "0.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "winapi 0.3.9",
 ]
 
@@ -3787,7 +3787,7 @@ version = "0.11.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "downcast",
  "fragile",
  "lazy_static",
@@ -3802,7 +3802,7 @@ version = "0.11.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "proc-macro2",
  "quote",
  "syn 1.0.109",
@@ -3870,7 +3870,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
 dependencies = [
  "bitflags 2.9.4",
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "cfg_aliases",
  "libc",
  "memoffset 0.9.0",
@@ -4094,7 +4094,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8"
 dependencies = [
  "bitflags 2.9.4",
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "foreign-types",
  "libc",
  "once_cell",
@@ -4216,7 +4216,7 @@ version = "0.8.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "instant",
  "libc",
  "redox_syscall 0.2.10",
@@ -4230,7 +4230,7 @@ version = "0.9.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "libc",
  "redox_syscall 0.3.5",
  "smallvec",
@@ -4400,7 +4400,7 @@ version = "0.6.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "cpufeatures",
  "opaque-debug",
  "universal-hash",
@@ -5045,7 +5045,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
 dependencies = [
  "cc",
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "getrandom 0.2.10",
  "libc",
  "untrusted",
@@ -5487,7 +5487,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6"
 dependencies = [
  "block-buffer 0.9.0",
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "cpufeatures",
  "digest 0.9.0",
  "opaque-debug",
@@ -5499,7 +5499,7 @@ version = "0.10.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "cpufeatures",
  "digest 0.10.7",
 ]
@@ -5511,7 +5511,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
 dependencies = [
  "block-buffer 0.9.0",
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "cpufeatures",
  "digest 0.9.0",
  "opaque-debug",
@@ -5523,7 +5523,7 @@ version = "0.10.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "cpufeatures",
  "digest 0.10.7",
 ]
@@ -7363,6 +7363,8 @@ dependencies = [
  "anyhow",
  "bincode",
  "bytes",
+ "cfg-if 1.0.3",
+ "dashmap",
  "itertools 0.12.1",
  "log",
  "nix",
@@ -7371,6 +7373,7 @@ dependencies = [
  "serde_derive",
  "socket2 0.6.0",
  "solana-serde",
+ "solana-svm-type-overrides",
  "tokio",
  "url 2.5.7",
 ]
@@ -10715,7 +10718,7 @@ version = "1.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "once_cell",
 ]
 
@@ -11451,7 +11454,7 @@ version = "0.2.103"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ab10a69fbd0a177f5f649ad4d8d3305499c42bab9aef2f7ff592d0ec8f833819"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "once_cell",
  "rustversion",
  "wasm-bindgen-macro",
@@ -11478,7 +11481,7 @@ version = "0.4.22"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "73157efb9af26fb564bb59a009afd1c7c334a44db171d280690d0c3faaec3468"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "js-sys",
  "wasm-bindgen",
  "web-sys",
@@ -11949,7 +11952,7 @@ version = "0.50.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
 dependencies = [
- "cfg-if 1.0.0",
+ "cfg-if 1.0.3",
  "windows-sys 0.48.0",
 ]
 

+ 50 - 1
streamer/src/nonblocking/connection_rate_limiter.rs

@@ -3,6 +3,7 @@ use {
     std::{net::IpAddr, num::NonZeroU32},
 };
 
+/// Limits the rate of connections per IP address.
 pub struct ConnectionRateLimiter {
     limiter: DefaultKeyedRateLimiter<IpAddr>,
 }
@@ -75,7 +76,17 @@ impl TotalConnectionRateLimiter {
 
 #[cfg(test)]
 pub mod test {
-    use {super::*, std::net::Ipv4Addr};
+    use {
+        super::*,
+        std::{
+            net::Ipv4Addr,
+            sync::{
+                atomic::{AtomicUsize, Ordering},
+                Arc,
+            },
+            time::{Duration, Instant},
+        },
+    };
 
     #[tokio::test]
     async fn test_total_connection_rate_limiter() {
@@ -104,4 +115,42 @@ pub mod test {
         assert!(limiter.is_allowed(&ip2));
         assert!(!limiter.is_allowed(&ip2));
     }
+
+    #[test]
+    fn test_bench_rate_limiter() {
+        let run_duration = Duration::from_secs(3);
+        let limiter = Arc::new(ConnectionRateLimiter::new(60 * 100));
+
+        let accepted = AtomicUsize::new(0);
+        let rejected = AtomicUsize::new(0);
+
+        let start = Instant::now();
+        let ip_pool = 2048;
+        let expected_total_accepts = (run_duration.as_secs() * 100 * ip_pool) as i64;
+        let workers = 8;
+
+        std::thread::scope(|scope| {
+            for _ in 0..workers {
+                scope.spawn(|| {
+                    for i in 1.. {
+                        if Instant::now() > start + run_duration {
+                            break;
+                        }
+                        let ip = IpAddr::V4(Ipv4Addr::from_bits(i % ip_pool as u32));
+                        if limiter.is_allowed(&ip) {
+                            accepted.fetch_add(1, Ordering::Relaxed);
+                        } else {
+                            rejected.fetch_add(1, Ordering::Relaxed);
+                        }
+                    }
+                });
+            }
+        });
+
+        let acc = accepted.load(Ordering::Relaxed);
+        let rej = rejected.load(Ordering::Relaxed);
+        println!("Run complete over {:?} seconds", run_duration.as_secs());
+        println!("Accepted: {acc} (target {expected_total_accepts})");
+        println!("Rejected: {rej}");
+    }
 }