Эх сурвалжийг харах

Sleep packet_batch_sender task when we don't have a packet batch (#1986)

* Sleep the packet_batch_sender when we don't have a packet batch we need to send until we actually get a packet. Also set sleep times based on the remaining timeout on the coalesce time.
ryleung-solana 1 жил өмнө
parent
commit
afd92ab932

+ 17 - 1
streamer/src/nonblocking/quic.rs

@@ -807,6 +807,8 @@ fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamerStats, fro
     }
     }
 }
 }
 
 
+// Holder(s) of the AsyncSender<PacketAccumulator> on the other end should not
+// wait for this function to exit to exit
 async fn packet_batch_sender(
 async fn packet_batch_sender(
     packet_sender: Sender<PacketBatch>,
     packet_sender: Sender<PacketBatch>,
     packet_receiver: AsyncReceiver<PacketAccumulator>,
     packet_receiver: AsyncReceiver<PacketAccumulator>,
@@ -862,7 +864,19 @@ async fn packet_batch_sender(
                 break;
                 break;
             }
             }
 
 
-            let timeout_res = timeout(Duration::from_micros(250), packet_receiver.recv()).await;
+            let timeout_res = if !packet_batch.is_empty() {
+                // If we get here, elapsed < coalesce (see above if condition)
+                timeout(coalesce - elapsed, packet_receiver.recv()).await
+            } else {
+                // Small bit of non-idealness here: the holder(s) of the other end
+                // of packet_receiver must drop it (without waiting for us to exit)
+                // or we have a chance of sleeping here forever
+                // and never polling exit. Not a huge deal in practice as the
+                // only time this happens is when we tear down the server
+                // and at that time the other end does indeed not wait for us
+                // to exit here
+                Ok(packet_receiver.recv().await)
+            };
 
 
             if let Ok(Ok(packet_accumulator)) = timeout_res {
             if let Ok(Ok(packet_accumulator)) = timeout_res {
                 // Start the timeout from when the packet batch first becomes non-empty
                 // Start the timeout from when the packet batch first becomes non-empty
@@ -1782,6 +1796,8 @@ pub mod test {
         }
         }
         assert_eq!(i, num_packets);
         assert_eq!(i, num_packets);
         exit.store(true, Ordering::Relaxed);
         exit.store(true, Ordering::Relaxed);
+        // Explicit drop to wake up packet_batch_sender
+        drop(ptk_sender);
         handle.await.unwrap();
         handle.await.unwrap();
     }
     }