Эх сурвалжийг харах

Rename StreamStats to StreamerStats (#2028)

Name StreamStats is misleading because this structure has metrics not only for streams but also foe connections. Hence, it makes sense to rename it to StreamerStats.
kirill lykov 1 жил өмнө
parent
commit
b24f492225

+ 16 - 16
streamer/src/nonblocking/quic.rs

@@ -7,7 +7,7 @@ use {
                 STREAM_THROTTLING_INTERVAL_MS,
             },
         },
-        quic::{configure_server, QuicServerError, StreamStats},
+        quic::{configure_server, QuicServerError, StreamerStats},
         streamer::StakedNodes,
         tls_certificates::get_pubkey_from_tls_certificate,
     },
@@ -99,7 +99,7 @@ const TOTAL_CONNECTIONS_PER_SECOND: u64 = 2500;
 /// The threshold of the size of the connection rate limiter map. When
 /// the map size is above this, we will trigger a cleanup of older
 /// entries used by past requests.
-const CONNECITON_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000;
+const CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000;
 
 // A sequence of bytes that is part of a packet
 // along with where in the packet it is
@@ -140,7 +140,7 @@ impl ConnectionPeerType {
 
 pub struct SpawnNonBlockingServerResult {
     pub endpoints: Vec<Endpoint>,
-    pub stats: Arc<StreamStats>,
+    pub stats: Arc<StreamerStats>,
     pub thread: JoinHandle<()>,
     pub max_concurrent_connections: usize,
 }
@@ -212,7 +212,7 @@ pub fn spawn_server_multi(
             .map_err(QuicServerError::EndpointFailed)
         })
         .collect::<Result<Vec<_>, _>>()?;
-    let stats = Arc::<StreamStats>::default();
+    let stats = Arc::<StreamerStats>::default();
     let handle = tokio::spawn(run_server(
         name,
         endpoints.clone(),
@@ -248,7 +248,7 @@ async fn run_server(
     max_unstaked_connections: usize,
     max_streams_per_ms: u64,
     max_connections_per_ipaddr_per_min: u64,
-    stats: Arc<StreamStats>,
+    stats: Arc<StreamerStats>,
     wait_for_chunk_timeout: Duration,
     coalesce: Duration,
 ) {
@@ -331,7 +331,7 @@ async fn run_server(
                 continue;
             }
 
-            if rate_limiter.len() > CONNECITON_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD {
+            if rate_limiter.len() > CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD {
                 rate_limiter.retain_recent();
             }
             stats
@@ -374,7 +374,7 @@ async fn run_server(
 fn prune_unstaked_connection_table(
     unstaked_connection_table: &mut ConnectionTable,
     max_unstaked_connections: usize,
-    stats: Arc<StreamStats>,
+    stats: Arc<StreamerStats>,
 ) {
     if unstaked_connection_table.total_size >= max_unstaked_connections {
         const PRUNE_TABLE_TO_PERCENTAGE: u8 = 90;
@@ -457,7 +457,7 @@ struct NewConnectionHandlerParams {
     peer_type: ConnectionPeerType,
     total_stake: u64,
     max_connections_per_peer: usize,
-    stats: Arc<StreamStats>,
+    stats: Arc<StreamerStats>,
     max_stake: u64,
     min_stake: u64,
 }
@@ -466,7 +466,7 @@ impl NewConnectionHandlerParams {
     fn new_unstaked(
         packet_sender: AsyncSender<PacketAccumulator>,
         max_connections_per_peer: usize,
-        stats: Arc<StreamStats>,
+        stats: Arc<StreamerStats>,
     ) -> NewConnectionHandlerParams {
         NewConnectionHandlerParams {
             packet_sender,
@@ -640,7 +640,7 @@ async fn setup_connection(
     max_staked_connections: usize,
     max_unstaked_connections: usize,
     max_streams_per_ms: u64,
-    stats: Arc<StreamStats>,
+    stats: Arc<StreamerStats>,
     wait_for_chunk_timeout: Duration,
     stream_load_ema: Arc<StakedStreamLoadEMA>,
 ) {
@@ -769,7 +769,7 @@ async fn setup_connection(
     }
 }
 
-fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamStats, from: SocketAddr) {
+fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamerStats, from: SocketAddr) {
     debug!("error: {:?} from: {:?}", e, from);
     stats.connection_setup_error.fetch_add(1, Ordering::Relaxed);
     match e {
@@ -811,7 +811,7 @@ async fn packet_batch_sender(
     packet_sender: Sender<PacketBatch>,
     packet_receiver: AsyncReceiver<PacketAccumulator>,
     exit: Arc<AtomicBool>,
-    stats: Arc<StreamStats>,
+    stats: Arc<StreamerStats>,
     coalesce: Duration,
 ) {
     trace!("enter packet_batch_sender");
@@ -902,7 +902,7 @@ async fn packet_batch_sender(
 
 fn track_streamer_fetch_packet_performance(
     packet_perf_measure: &[([u8; 64], Instant)],
-    stats: &StreamStats,
+    stats: &StreamerStats,
 ) {
     if packet_perf_measure.is_empty() {
         return;
@@ -1075,7 +1075,7 @@ async fn handle_chunk(
     packet_accum: &mut Option<PacketAccumulator>,
     remote_addr: &SocketAddr,
     packet_sender: &AsyncSender<PacketAccumulator>,
-    stats: Arc<StreamStats>,
+    stats: Arc<StreamerStats>,
     peer_type: ConnectionPeerType,
 ) -> bool {
     match chunk {
@@ -1493,7 +1493,7 @@ pub mod test {
         Arc<AtomicBool>,
         crossbeam_channel::Receiver<PacketBatch>,
         SocketAddr,
-        Arc<StreamStats>,
+        Arc<StreamerStats>,
     ) {
         let sockets = {
             #[cfg(not(target_os = "windows"))]
@@ -1742,7 +1742,7 @@ pub mod test {
         let (pkt_batch_sender, pkt_batch_receiver) = unbounded();
         let (ptk_sender, pkt_receiver) = async_unbounded();
         let exit = Arc::new(AtomicBool::new(false));
-        let stats = Arc::new(StreamStats::default());
+        let stats = Arc::new(StreamerStats::default());
 
         let handle = tokio::spawn(packet_batch_sender(
             pkt_batch_sender,

+ 10 - 10
streamer/src/nonblocking/stream_throttle.rs

@@ -1,5 +1,5 @@
 use {
-    crate::{nonblocking::quic::ConnectionPeerType, quic::StreamStats},
+    crate::{nonblocking::quic::ConnectionPeerType, quic::StreamerStats},
     percentage::Percentage,
     std::{
         cmp,
@@ -23,7 +23,7 @@ pub(crate) struct StakedStreamLoadEMA {
     current_load_ema: AtomicU64,
     load_in_recent_interval: AtomicU64,
     last_update: RwLock<Instant>,
-    stats: Arc<StreamStats>,
+    stats: Arc<StreamerStats>,
     // Maximum number of streams for a staked connection in EMA window
     // Note: EMA window can be different than stream throttling window. EMA is being calculated
     //       specifically for staked connections. Unstaked connections have fixed limit on
@@ -35,7 +35,7 @@ pub(crate) struct StakedStreamLoadEMA {
 
 impl StakedStreamLoadEMA {
     pub(crate) fn new(
-        stats: Arc<StreamStats>,
+        stats: Arc<StreamerStats>,
         max_unstaked_connections: usize,
         max_streams_per_ms: u64,
     ) -> Self {
@@ -239,7 +239,7 @@ pub mod test {
             nonblocking::{
                 quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
             },
-            quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS},
+            quic::{StreamerStats, MAX_UNSTAKED_CONNECTIONS},
         },
         std::{
             sync::{atomic::Ordering, Arc},
@@ -250,7 +250,7 @@ pub mod test {
     #[test]
     fn test_max_streams_for_unstaked_connection() {
         let load_ema = Arc::new(StakedStreamLoadEMA::new(
-            Arc::new(StreamStats::default()),
+            Arc::new(StreamerStats::default()),
             MAX_UNSTAKED_CONNECTIONS,
             DEFAULT_MAX_STREAMS_PER_MS,
         ));
@@ -267,7 +267,7 @@ pub mod test {
     #[test]
     fn test_max_streams_for_staked_connection() {
         let load_ema = Arc::new(StakedStreamLoadEMA::new(
-            Arc::new(StreamStats::default()),
+            Arc::new(StreamerStats::default()),
             MAX_UNSTAKED_CONNECTIONS,
             DEFAULT_MAX_STREAMS_PER_MS,
         ));
@@ -359,7 +359,7 @@ pub mod test {
     #[test]
     fn test_max_streams_for_staked_connection_with_no_unstaked_connections() {
         let load_ema = Arc::new(StakedStreamLoadEMA::new(
-            Arc::new(StreamStats::default()),
+            Arc::new(StreamerStats::default()),
             0,
             DEFAULT_MAX_STREAMS_PER_MS,
         ));
@@ -447,7 +447,7 @@ pub mod test {
     #[test]
     fn test_update_ema() {
         let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
-            Arc::new(StreamStats::default()),
+            Arc::new(StreamerStats::default()),
             MAX_UNSTAKED_CONNECTIONS,
             DEFAULT_MAX_STREAMS_PER_MS,
         ));
@@ -476,7 +476,7 @@ pub mod test {
     #[test]
     fn test_update_ema_missing_interval() {
         let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
-            Arc::new(StreamStats::default()),
+            Arc::new(StreamerStats::default()),
             MAX_UNSTAKED_CONNECTIONS,
             DEFAULT_MAX_STREAMS_PER_MS,
         ));
@@ -496,7 +496,7 @@ pub mod test {
     #[test]
     fn test_update_ema_if_needed() {
         let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
-            Arc::new(StreamStats::default()),
+            Arc::new(StreamerStats::default()),
             MAX_UNSTAKED_CONNECTIONS,
             DEFAULT_MAX_STREAMS_PER_MS,
         ));

+ 2 - 2
streamer/src/quic.rs

@@ -136,7 +136,7 @@ impl NotifyKeyUpdate for EndpointKeyUpdater {
 }
 
 #[derive(Default)]
-pub struct StreamStats {
+pub struct StreamerStats {
     pub(crate) total_connections: AtomicUsize,
     pub(crate) total_new_connections: AtomicUsize,
     pub(crate) total_streams: AtomicUsize,
@@ -199,7 +199,7 @@ pub struct StreamStats {
     pub(crate) total_incoming_connection_attempts: AtomicUsize,
 }
 
-impl StreamStats {
+impl StreamerStats {
     pub fn report(&self, name: &'static str) {
         let process_sampled_packets_us_hist = {
             let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();