serve_repair_service.rs 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. use {
  2. crate::repair::{quic_endpoint::RemoteRequest, serve_repair::ServeRepair},
  3. bytes::Bytes,
  4. crossbeam_channel::{unbounded, Receiver, Sender},
  5. solana_net_utils::SocketAddrSpace,
  6. solana_perf::{packet::PacketBatch, recycler::Recycler},
  7. solana_streamer::streamer::{self, StreamerReceiveStats},
  8. std::{
  9. net::{SocketAddr, UdpSocket},
  10. sync::{atomic::AtomicBool, Arc},
  11. thread::{self, Builder, JoinHandle},
  12. time::Duration,
  13. },
  14. tokio::sync::mpsc::Sender as AsyncSender,
  15. };
  /// Service handle that owns the threads started by `ServeRepairService::new`.
  ///
  /// The handles are kept so that `join` can later wait for every thread to
  /// finish.
  pub struct ServeRepairService {
      /// Join handles of the spawned threads, stored in the order they were started.
      thread_hdls: Vec<JoinHandle<()>>,
  }
  19. impl ServeRepairService {
  20. pub(crate) fn new(
  21. serve_repair: ServeRepair,
  22. remote_request_sender: Sender<RemoteRequest>,
  23. remote_request_receiver: Receiver<RemoteRequest>,
  24. repair_response_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
  25. serve_repair_socket: UdpSocket,
  26. socket_addr_space: SocketAddrSpace,
  27. stats_reporter_sender: Sender<Box<dyn FnOnce() + Send>>,
  28. exit: Arc<AtomicBool>,
  29. ) -> Self {
  30. let (request_sender, request_receiver) = unbounded();
  31. let serve_repair_socket = Arc::new(serve_repair_socket);
  32. let t_receiver = streamer::receiver(
  33. "solRcvrServeRep".to_string(),
  34. serve_repair_socket.clone(),
  35. exit.clone(),
  36. request_sender,
  37. Recycler::default(),
  38. Arc::new(StreamerReceiveStats::new("serve_repair_receiver")),
  39. Some(Duration::from_millis(1)), // coalesce
  40. false, // use_pinned_memory
  41. None, // in_vote_only_mode
  42. false, // is_staked_service
  43. );
  44. let t_packet_adapter = Builder::new()
  45. .name(String::from("solServRAdapt"))
  46. .spawn(|| adapt_repair_requests_packets(request_receiver, remote_request_sender))
  47. .unwrap();
  48. let (response_sender, response_receiver) = unbounded();
  49. let t_responder = streamer::responder(
  50. "Repair",
  51. serve_repair_socket,
  52. response_receiver,
  53. socket_addr_space,
  54. Some(stats_reporter_sender),
  55. );
  56. let t_listen = serve_repair.listen(
  57. remote_request_receiver,
  58. response_sender,
  59. repair_response_quic_sender,
  60. exit,
  61. );
  62. let thread_hdls = vec![t_receiver, t_packet_adapter, t_responder, t_listen];
  63. Self { thread_hdls }
  64. }
  65. pub(crate) fn join(self) -> thread::Result<()> {
  66. self.thread_hdls.into_iter().try_for_each(JoinHandle::join)
  67. }
  68. }
  69. // Adapts incoming UDP repair requests into RemoteRequest struct.
  70. pub(crate) fn adapt_repair_requests_packets(
  71. packets_receiver: Receiver<PacketBatch>,
  72. remote_request_sender: Sender<RemoteRequest>,
  73. ) {
  74. for packets in packets_receiver {
  75. for packet in &packets {
  76. let Some(bytes) = packet.data(..).map(Vec::from) else {
  77. continue;
  78. };
  79. let request = RemoteRequest {
  80. remote_pubkey: None,
  81. remote_address: packet.meta().socket_addr(),
  82. bytes: Bytes::from(bytes),
  83. };
  84. if remote_request_sender.send(request).is_err() {
  85. return; // The receiver end of the channel is disconnected.
  86. }
  87. }
  88. }
  89. }