Quellcode durchsuchen

Support send client pubkey (#8836)

* Support sending client Pubkey in QUIC streamer along with PacketBatch

* addressed some feedback from Alex on unittest

* Update abi digest

* resolved merge conflicts properly
Lijun Wang vor 3 Tagen
Ursprung
Commit
54d95e1a8d

Datei-Diff unterdrückt, da er zu groß ist
+ 160 - 117
Cargo.lock


+ 2 - 2
Cargo.toml

@@ -405,7 +405,7 @@ solana-account-decoder = { path = "account-decoder", version = "=4.0.0-alpha.0",
 solana-account-decoder-client-types = { path = "account-decoder-client-types", version = "=4.0.0-alpha.0", features = ["agave-unstable-api"] }
 solana-account-info = "3.0.0"
 solana-accounts-db = { path = "accounts-db", version = "=4.0.0-alpha.0", features = ["agave-unstable-api"] }
-solana-address = "1.0.0"
+solana-address = "1.1.0"
 solana-address-lookup-table-interface = "3.0.0"
 solana-atomic-u64 = "3.0.0"
 solana-banks-client = { path = "banks-client", version = "=4.0.0-alpha.0", features = ["agave-unstable-api"] }
@@ -493,7 +493,7 @@ solana-nonce = "3.0.0"
 solana-nonce-account = "3.0.0"
 solana-notifier = { path = "notifier", version = "=4.0.0-alpha.0", features = ["agave-unstable-api"] }
 solana-offchain-message = "3.0.0"
-solana-packet = "3.0.0"
+solana-packet = "4.0.0"
 solana-perf = { path = "perf", version = "=4.0.0-alpha.0", features = ["agave-unstable-api"] }
 solana-poh = { path = "poh", version = "=4.0.0-alpha.0", features = ["agave-unstable-api"] }
 solana-poh-config = "3.0.0"

+ 5 - 6
core/src/banking_stage/tpu_to_pack.rs

@@ -199,12 +199,11 @@ mod tests {
     fn test_copy_packet_and_populate_message() {
         let packet_bytes = vec![1, 2, 3, 4, 5];
         let src_ip = Ipv4Addr::new(192, 168, 1, 1);
-        let packet_meta = solana_packet::Meta {
-            size: packet_bytes.len(),
-            addr: IpAddr::V4(src_ip),
-            port: 1,
-            flags: PacketFlags::all(),
-        };
+        let mut packet_meta = solana_packet::Meta::default();
+        packet_meta.size = packet_bytes.len();
+        packet_meta.addr = IpAddr::V4(src_ip);
+        packet_meta.port = 1;
+        packet_meta.flags = PacketFlags::all();
 
         // Buffer to simulate allocated memory
         let mut buffer = [0u8; 256];

+ 1 - 1
core/src/banking_trace.rs

@@ -64,7 +64,7 @@ pub struct BankingTracer {
 #[cfg_attr(
     feature = "frozen-abi",
     derive(AbiExample),
-    frozen_abi(digest = "dJWSTAdP7tkT5KWT8xfSWXYg3MVaGp846y9j871Xov2")
+    frozen_abi(digest = "DY2zjwewCSNansb5xwtoxkCcNuXbVmWZe3U9nNH2kzNz")
 )]
 #[derive(Serialize, Deserialize, Debug)]
 pub struct TimedTracedEvent(pub std::time::SystemTime, pub TracedEvent);

+ 3 - 4
core/src/forwarding_stage.rs

@@ -885,10 +885,9 @@ mod tests {
     }
 
     fn meta_with_flags(packet_flags: PacketFlags) -> packet::Meta {
-        packet::Meta {
-            flags: packet_flags,
-            ..packet::Meta::default()
-        }
+        let mut meta = packet::Meta::default();
+        meta.flags = packet_flags;
+        meta
     }
 
     fn simple_transfer_with_flags(packet_flags: PacketFlags) -> Packet {

+ 5 - 6
core/src/shred_fetch_stage.rs

@@ -436,12 +436,11 @@ pub(crate) fn receive_quic_datagrams(
         let packet_batch: BytesPacketBatch = entries
             .filter(|(_, _, bytes)| bytes.len() <= PACKET_DATA_SIZE)
             .map(|(_pubkey, addr, bytes)| {
-                let meta = Meta {
-                    size: bytes.len(),
-                    addr: addr.ip(),
-                    port: addr.port(),
-                    flags,
-                };
+                let mut meta = Meta::default();
+                meta.size = bytes.len();
+                meta.addr = addr.ip();
+                meta.port = addr.port();
+                meta.flags = flags;
                 BytesPacket::new(bytes, meta)
             })
             .collect();

Datei-Diff unterdrückt, da er zu groß ist
+ 159 - 116
dev-bins/Cargo.lock


+ 4 - 8
perf/src/packet.rs

@@ -52,10 +52,8 @@ impl BytesPacket {
     #[cfg(feature = "dev-context-only-utils")]
     pub fn from_bytes(dest: Option<&SocketAddr>, buffer: impl Into<Bytes>) -> Self {
         let buffer = buffer.into();
-        let mut meta = Meta {
-            size: buffer.len(),
-            ..Default::default()
-        };
+        let mut meta = Meta::default();
+        meta.size = buffer.len();
         if let Some(dest) = dest {
             meta.set_socket_addr(dest);
         }
@@ -74,10 +72,8 @@ impl BytesPacket {
         let buffer = writer.into_inner();
         let buffer = buffer.freeze();
 
-        let mut meta = Meta {
-            size: buffer.len(),
-            ..Default::default()
-        };
+        let mut meta = Meta::default();
+        meta.size = buffer.len();
         if let Some(dest) = dest {
             meta.set_socket_addr(dest);
         }

Datei-Diff unterdrückt, da er zu groß ist
+ 158 - 115
programs/sbf/Cargo.lock


+ 4 - 1
streamer/src/nonblocking/quic.rs

@@ -630,8 +630,11 @@ async fn handle_connection<Q, C>(
         let mut meta = Meta::default();
         meta.set_socket_addr(&remote_addr);
         meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked(_)));
-        let mut accum = PacketAccumulator::new(meta);
+        if let Some(pubkey) = context.remote_pubkey() {
+            meta.set_remote_pubkey(pubkey);
+        }
 
+        let mut accum = PacketAccumulator::new(meta);
         // Virtually all small transactions will fit in 1 chunk. Larger transactions will fit in 1
         // or 2 chunks if the first chunk starts towards the end of a datagram. A small number of
         // transaction will have other protocol frames inserted in the middle. Empirically it's been

+ 121 - 12
streamer/src/quic.rs

@@ -719,12 +719,16 @@ pub fn spawn_simple_qos_server(
 mod test {
     use {
         super::*,
-        crate::nonblocking::{quic::test::*, testing_utilities::check_multiple_streams},
-        crossbeam_channel::unbounded,
+        crate::nonblocking::{
+            quic::test::*,
+            testing_utilities::{check_multiple_streams, make_client_endpoint},
+        },
+        crossbeam_channel::{unbounded, Receiver},
         solana_net_utils::sockets::bind_to_localhost_unique,
         solana_pubkey::Pubkey,
         solana_signer::Signer,
-        std::{collections::HashMap, net::SocketAddr},
+        std::{collections::HashMap, net::SocketAddr, time::Instant},
+        tokio::time::sleep,
     };
 
     fn rt_for_test() -> Runtime {
@@ -878,9 +882,8 @@ mod test {
 
     #[test]
     fn test_quic_server_multiple_packets_with_simple_qos() {
-        // Send multiple writes from a staked node with SimpleStreamsPerSecond QoS mode
-        // with a super low staked client stake to ensure it can send all packets
-        // within the rate limit.
+        // Send multiple writes from a staked node with simple QoS mode
+        // and verify pubkey is sent along with packets.
         agave_logger::setup();
         let client_keypair = Keypair::new();
         let rich_node_keypair = Keypair::new();
@@ -894,12 +897,10 @@ mod test {
             HashMap::<Pubkey, u64>::default(), // overrides
         );
 
-        let server_params = QuicStreamerConfig {
-            ..QuicStreamerConfig::default_for_tests()
-        };
+        let server_params = QuicStreamerConfig::default_for_tests();
         let qos_config = SimpleQosConfig {
-            max_connections_per_peer: 1,
-            max_streams_per_second: 20, // low limit to ensure staked node can send all packets
+            max_connections_per_peer: 2,
+            max_streams_per_second: 20,
             ..Default::default()
         };
         let server_params = SimpleQosQuicStreamerConfig {
@@ -912,10 +913,11 @@ mod test {
         let runtime = rt_for_test();
         let num_expected_packets = 20;
 
-        runtime.block_on(check_multiple_packets(
+        runtime.block_on(check_multiple_packets_with_client_id(
             receiver,
             server_address,
             Some(&client_keypair),
+            Some(&rich_node_keypair),
             num_expected_packets,
         ));
         cancel.cancel();
@@ -958,4 +960,111 @@ mod test {
         cancel.cancel();
         t.join().unwrap();
     }
+
+    async fn check_multiple_packets_with_client_id(
+        receiver: Receiver<PacketBatch>,
+        server_address: SocketAddr,
+        client_keypair1: Option<&Keypair>,
+        client_keypair2: Option<&Keypair>,
+        num_expected_packets: usize,
+    ) {
+        let conn1 = Arc::new(make_client_endpoint(&server_address, client_keypair1).await);
+        let conn2 = Arc::new(make_client_endpoint(&server_address, client_keypair2).await);
+
+        debug!(
+            "Connections established: {} and {}",
+            conn1.remote_address(),
+            conn2.remote_address(),
+        );
+
+        let expected_client_pubkey_1 = client_keypair1.map(|kp| kp.pubkey());
+        let expected_client_pubkey_2 = client_keypair2.map(|kp| kp.pubkey());
+
+        let mut num_packets_sent = 0;
+        for i in 0..num_expected_packets {
+            debug!("Sending stream pair {i}");
+            let c1 = conn1.clone();
+            let c2 = conn2.clone();
+
+            let mut s1 = c1.open_uni().await.unwrap();
+            let mut s2 = c2.open_uni().await.unwrap();
+
+            s1.write_all(&[0u8]).await.unwrap();
+            s1.finish().unwrap();
+            debug!("Stream {i}.1 sent and finished");
+
+            if i < num_expected_packets - 1 {
+                s2.write_all(&[1u8]).await.unwrap();
+                s2.finish().unwrap();
+                debug!("Stream {i}.2 sent and finished");
+                num_packets_sent += 2;
+            } else {
+                num_packets_sent += 1;
+            }
+        }
+
+        debug!("All streams sent, expecting {num_packets_sent} packets with client ID");
+
+        let now = Instant::now();
+        let mut total_packets = 0;
+        let mut iterations = 0;
+
+        while now.elapsed().as_secs() < 2 {
+            iterations += 1;
+            match receiver.try_recv() {
+                Ok(packet_batch) => {
+                    debug!("Received packet batch (iteration {iterations})");
+
+                    // Verify we get the client pubkey
+                    match &packet_batch {
+                        PacketBatch::Bytes(_) => {
+                            panic!("Expected PacketBatch::Simple but got PacketBatch::Bytes");
+                        }
+                        PacketBatch::Pinned(_) => {
+                            panic!("Expected PacketBatch::Simple but got PacketBatch::Pinned");
+                        }
+                        PacketBatch::Single(packet) => {
+                            if *packet.data(0).unwrap() == 0u8 {
+                                debug!("Packet from stream with client 1");
+                                assert_eq!(packet.meta().remote_pubkey(), expected_client_pubkey_1);
+                            } else if *packet.data(0).unwrap() == 1u8 {
+                                debug!("Packet from stream with client 2");
+                                assert_eq!(packet.meta().remote_pubkey(), expected_client_pubkey_2);
+                            } else {
+                                panic!("Unexpected data in packet: {:?}", packet.data(0));
+                            }
+                            total_packets += 1;
+                        }
+                    }
+                }
+                Err(e) => {
+                    if iterations % 10 == 0 {
+                        debug!("No packets yet (iteration {iterations}): {e:?}");
+                    }
+                    sleep(Duration::from_millis(100)).await;
+                }
+            }
+
+            if total_packets >= num_packets_sent {
+                debug!("Received all expected packets with client ID!");
+                break;
+            }
+
+            if iterations % 50 == 0 {
+                debug!(
+                    "Still waiting... received {total_packets}/{num_packets_sent} packets after \
+                     {iterations} iterations",
+                );
+            }
+        }
+
+        debug!(
+            "Final: received {total_packets}/{num_packets_sent} packets in {iterations} iterations",
+        );
+
+        assert!(
+            total_packets >= num_packets_sent,
+            "Expected at least {num_packets_sent} packets with client ID, got {total_packets}",
+        );
+    }
 }

Einige Dateien werden nicht angezeigt, da zu viele Dateien in diesem Diff geändert wurden.