فهرست منبع

Quic and Fetch break on seeing channel to sigverify disconnected (#5815)

Andrew Fitzgerald 7 ماه پیش
والد
کامیت
d7da5fb5d0
2فایلهای تغییر یافته به همراه16 افزوده شده و 4 حذف شده
  1. 8 2
      streamer/src/nonblocking/quic.rs
  2. 8 2
      streamer/src/streamer.rs

+ 8 - 2
streamer/src/nonblocking/quic.rs

@@ -12,7 +12,7 @@ use {
     },
     async_channel::{bounded as async_bounded, Receiver as AsyncReceiver, Sender as AsyncSender},
     bytes::Bytes,
-    crossbeam_channel::Sender,
+    crossbeam_channel::{Sender, TrySendError},
     futures::{stream::FuturesUnordered, Future, StreamExt as _},
     indexmap::map::{Entry, IndexMap},
     percentage::Percentage,
@@ -923,11 +923,17 @@ async fn packet_batch_sender(
                 let len = packet_batch.len();
                 track_streamer_fetch_packet_performance(&packet_perf_measure, &stats);
 
-                if let Err(e) = packet_sender.send(packet_batch) {
+                if let Err(e) = packet_sender.try_send(packet_batch) {
                     stats
                         .total_packet_batch_send_err
                         .fetch_add(1, Ordering::Relaxed);
                     trace!("Send error: {}", e);
+
+                    // The downstream channel is disconnected, this error is not recoverable.
+                    if matches!(e, TrySendError::Disconnected(_)) {
+                        exit.store(true, Ordering::Relaxed);
+                        return;
+                    }
                 } else {
                     stats
                         .total_packet_batches_sent

+ 8 - 2
streamer/src/streamer.rs

@@ -195,8 +195,14 @@ fn recv_loop(
                     packet_batch
                         .iter_mut()
                         .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
-                    if let Err(TrySendError::Full(_)) = packet_batch_sender.try_send(packet_batch) {
-                        stats.num_packets_dropped.fetch_add(len, Ordering::Relaxed);
+                    match packet_batch_sender.try_send(packet_batch) {
+                        Ok(_) => {}
+                        Err(TrySendError::Full(_)) => {
+                            stats.num_packets_dropped.fetch_add(len, Ordering::Relaxed);
+                        }
+                        Err(TrySendError::Disconnected(err)) => {
+                            return Err(StreamerError::Send(SendError(err)))
+                        }
                     }
                 }
                 break;