packet.rs 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751
  1. //! The `packet` module defines data structures and methods to pull data from the network.
  2. use crate::cuda_runtime::PinnedVec;
  3. use crate::recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
  4. use crate::recycler::{Recycler, Reset};
  5. use crate::result::{Error, Result};
  6. use bincode;
  7. use byteorder::{ByteOrder, LittleEndian};
  8. use serde::Serialize;
  9. use solana_ledger::erasure::ErasureConfig;
  10. use solana_metrics::inc_new_counter_debug;
  11. pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE};
  12. use solana_sdk::pubkey::Pubkey;
  13. use solana_sdk::signature::Signable;
  14. use solana_sdk::signature::Signature;
  15. use std::borrow::Cow;
  16. use std::cmp;
  17. use std::fmt;
  18. use std::io;
  19. use std::io::Cursor;
  20. use std::mem;
  21. use std::mem::size_of;
  22. use std::net::{SocketAddr, UdpSocket};
  23. use std::ops::{Deref, DerefMut};
  24. use std::sync::{Arc, RwLock};
  25. use std::time::Instant;
// Blob handles shared across threads; every reader/writer takes the RwLock.
pub type SharedBlob = Arc<RwLock<Blob>>;
pub type SharedBlobs = Vec<SharedBlob>;
// Upper bound on packets handled in one receive batch.
pub const NUM_PACKETS: usize = 1024 * 8;
pub const BLOB_SIZE: usize = (2 * 1024 - 128); // wikipedia says there should be 20b for ipv4 headers
// Payload capacity left after reserving header space at both ends.
pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - (BLOB_HEADER_SIZE * 2);
pub const BLOB_DATA_ALIGN: usize = 16; // safe for erasure input pointers, gf.c needs 16byte-aligned buffers
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;
// Batching targets used by Packets::recv_from to decide when a batch is full.
pub const PACKETS_PER_BATCH: usize = 256;
pub const PACKETS_BATCH_SIZE: usize = (PACKETS_PER_BATCH * PACKET_DATA_SIZE);
/// A batch of packets backed by a (possibly CUDA-pinned) buffer.
#[derive(Debug, Clone)]
pub struct Packets {
    pub packets: PinnedVec<Packet>,
    // When present, the backing buffer is returned to this recycler on drop
    // instead of being freed (see the Drop impl).
    recycler: Option<PacketsRecycler>,
}
  40. impl Drop for Packets {
  41. fn drop(&mut self) {
  42. if let Some(ref recycler) = self.recycler {
  43. let old = mem::replace(&mut self.packets, PinnedVec::default());
  44. recycler.recycle(old)
  45. }
  46. }
  47. }
impl Reset for Packets {
    /// Empty the batch (capacity is retained) so the buffer can be recycled.
    fn reset(&mut self) {
        self.packets.resize(0, Packet::default());
    }
}
impl Reset for PinnedVec<Packet> {
    /// Empty the vector (capacity is retained) so it can be recycled.
    fn reset(&mut self) {
        self.resize(0, Packet::default());
    }
}
  58. //auto derive doesn't support large arrays
  59. impl Default for Packets {
  60. fn default() -> Packets {
  61. let packets = PinnedVec::with_capacity(NUM_RCVMMSGS);
  62. Packets {
  63. packets,
  64. recycler: None,
  65. }
  66. }
  67. }
  68. pub type PacketsRecycler = Recycler<PinnedVec<Packet>>;
  69. impl Packets {
  70. pub fn new(packets: Vec<Packet>) -> Self {
  71. let packets = PinnedVec::from_vec(packets);
  72. Self {
  73. packets,
  74. recycler: None,
  75. }
  76. }
  77. pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self {
  78. let mut packets = recycler.allocate(name);
  79. packets.reserve_and_pin(size);
  80. Packets {
  81. packets,
  82. recycler: Some(recycler),
  83. }
  84. }
  85. pub fn set_addr(&mut self, addr: &SocketAddr) {
  86. for m in self.packets.iter_mut() {
  87. m.meta.set_addr(&addr);
  88. }
  89. }
  90. }
/// Raw blob storage; the alignment attribute keeps the buffer safe to hand
/// to the erasure coder, which requires 16-byte-aligned inputs.
#[repr(align(16))] // 16 === BLOB_DATA_ALIGN
pub struct BlobData {
    pub data: [u8; BLOB_SIZE],
}
  95. impl Clone for BlobData {
  96. fn clone(&self) -> Self {
  97. BlobData { data: self.data }
  98. }
  99. }
  100. impl Default for BlobData {
  101. fn default() -> Self {
  102. BlobData {
  103. data: [0u8; BLOB_SIZE],
  104. }
  105. }
  106. }
  107. impl PartialEq for BlobData {
  108. fn eq(&self, other: &BlobData) -> bool {
  109. let self_data: &[u8] = self.data.as_ref();
  110. let other_data: &[u8] = other.data.as_ref();
  111. self_data == other_data
  112. }
  113. }
// this code hides _data, maps it to _data.data
// Deref lets callers write `blob.data` instead of `blob._data.data`.
impl Deref for Blob {
    type Target = BlobData;
    fn deref(&self) -> &Self::Target {
        &self._data
    }
}
impl DerefMut for Blob {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self._data
    }
}
/// A ledger blob: header fields and payload packed into one aligned buffer,
/// plus network metadata (size, source address).
#[derive(Clone, Default, PartialEq)]
pub struct Blob {
    _data: BlobData, // hidden member, passed through by Deref
    pub meta: Meta,
}
  131. impl fmt::Debug for Blob {
  132. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  133. write!(
  134. f,
  135. "Blob {{ size: {:?}, addr: {:?} }}",
  136. self.meta.size,
  137. self.meta.addr()
  138. )
  139. }
  140. }
/// Errors specific to blob validation.
#[derive(Debug)]
pub enum BlobError {
    /// the Blob's meta and data are not self-consistent
    BadState,
    /// Blob verification failed
    VerificationFailed,
}
impl Packets {
    /// Receive a batch of packets from `socket`, growing `self.packets` as
    /// needed and truncating it to the count actually read.
    ///
    /// Returns the number of packets received. An error before anything is
    /// read is propagated; once at least one packet has arrived, errors
    /// simply end the batch.
    pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<usize> {
        let mut i = 0;
        //DOCUMENTED SIDE-EFFECT
        //Performance out of the IO without poll
        // * block on the socket until it's readable
        // * set the socket to non blocking
        // * read until it fails
        // * set it back to blocking before returning
        // NOTE(review): the code below actually leaves the socket in
        // non-blocking mode on return; blocking is only re-enabled at the
        // start of the next call — confirm this matches callers' expectations.
        socket.set_nonblocking(false)?;
        trace!("receiving on {}", socket.local_addr().unwrap());
        let start = Instant::now();
        let mut total_size = 0;
        loop {
            // Make room for one more recv_mmsg burst at the tail.
            self.packets.resize(i + NUM_RCVMMSGS, Packet::default());
            match recv_mmsg(socket, &mut self.packets[i..]) {
                // Already have data: treat errors as "no more packets" and
                // stop once the 1ms batching window has elapsed.
                Err(_) if i > 0 => {
                    if start.elapsed().as_millis() > 1 {
                        break;
                    }
                }
                Err(e) => {
                    trace!("recv_from err {:?}", e);
                    return Err(Error::IO(e));
                }
                Ok((size, npkts)) => {
                    // First burst arrived while blocking; switch to
                    // non-blocking so subsequent reads fail fast.
                    if i == 0 {
                        socket.set_nonblocking(true)?;
                    }
                    trace!("got {} packets", npkts);
                    i += npkts;
                    total_size += size;
                    // Try to batch into big enough buffers
                    // will cause less re-shuffling later on.
                    if start.elapsed().as_millis() > 1 || total_size >= PACKETS_BATCH_SIZE {
                        break;
                    }
                }
            }
        }
        // Drop the unused tail the resize() above pre-allocated.
        self.packets.truncate(i);
        inc_new_counter_debug!("packets-recv_count", i);
        Ok(i)
    }
    /// Send each packet to the address recorded in its own meta.
    pub fn send_to(&self, socket: &UdpSocket) -> Result<()> {
        for p in &self.packets {
            let a = p.meta.addr();
            socket.send_to(&p.data[..p.meta.size], &a)?;
        }
        Ok(())
    }
}
  200. pub fn to_packets_chunked<T: Serialize>(xs: &[T], chunks: usize) -> Vec<Packets> {
  201. let mut out = vec![];
  202. for x in xs.chunks(chunks) {
  203. let mut p = Packets::default();
  204. p.packets.resize(x.len(), Packet::default());
  205. for (i, o) in x.iter().zip(p.packets.iter_mut()) {
  206. let mut wr = io::Cursor::new(&mut o.data[..]);
  207. bincode::serialize_into(&mut wr, &i).expect("serialize request");
  208. let len = wr.position() as usize;
  209. o.meta.size = len;
  210. }
  211. out.push(p);
  212. }
  213. out
  214. }
/// Serialize `xs` into `Packets` batches using the maximum batch size.
pub fn to_packets<T: Serialize>(xs: &[T]) -> Vec<Packets> {
    to_packets_chunked(xs, NUM_PACKETS)
}
  218. pub fn to_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<Blob> {
  219. let mut b = Blob::default();
  220. let v = bincode::serialize(&resp)?;
  221. let len = v.len();
  222. if len > BLOB_SIZE {
  223. return Err(Error::ToBlobError);
  224. }
  225. b.data[..len].copy_from_slice(&v);
  226. b.meta.size = len;
  227. b.meta.set_addr(&rsp_addr);
  228. Ok(b)
  229. }
  230. pub fn to_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<Vec<Blob>> {
  231. let mut blobs = Vec::new();
  232. for (resp, rsp_addr) in rsps {
  233. blobs.push(to_blob(resp, rsp_addr)?);
  234. }
  235. Ok(blobs)
  236. }
  237. pub fn to_shared_blob<T: Serialize>(resp: T, rsp_addr: SocketAddr) -> Result<SharedBlob> {
  238. let blob = Arc::new(RwLock::new(to_blob(resp, rsp_addr)?));
  239. Ok(blob)
  240. }
  241. pub fn to_shared_blobs<T: Serialize>(rsps: Vec<(T, SocketAddr)>) -> Result<SharedBlobs> {
  242. let mut blobs = Vec::new();
  243. for (resp, rsp_addr) in rsps {
  244. blobs.push(to_shared_blob(resp, rsp_addr)?);
  245. }
  246. Ok(blobs)
  247. }
// Build the byte range of a header field of type `$type` that begins where
// the previous field's range (`$prev`) ends.
macro_rules! range {
    ($prev:expr, $type:ident) => {
        $prev..$prev + size_of::<$type>()
    };
}
// Blob header layout: each field's byte range starts where the previous one
// ends, in this order: signature, forwarded flag, parent, version, slot,
// index, sender id, flags, erasure config, data size.
const SIGNATURE_RANGE: std::ops::Range<usize> = range!(0, Signature);
const FORWARDED_RANGE: std::ops::Range<usize> = range!(SIGNATURE_RANGE.end, bool);
const PARENT_RANGE: std::ops::Range<usize> = range!(FORWARDED_RANGE.end, u64);
const VERSION_RANGE: std::ops::Range<usize> = range!(PARENT_RANGE.end, u64);
const SLOT_RANGE: std::ops::Range<usize> = range!(VERSION_RANGE.end, u64);
const INDEX_RANGE: std::ops::Range<usize> = range!(SLOT_RANGE.end, u64);
const ID_RANGE: std::ops::Range<usize> = range!(INDEX_RANGE.end, Pubkey);
const FLAGS_RANGE: std::ops::Range<usize> = range!(ID_RANGE.end, u32);
const ERASURE_CONFIG_RANGE: std::ops::Range<usize> = range!(FLAGS_RANGE.end, ErasureConfig);
const SIZE_RANGE: std::ops::Range<usize> = range!(ERASURE_CONFIG_RANGE.end, u64);
  263. macro_rules! align {
  264. ($x:expr, $align:expr) => {
  265. $x + ($align - 1) & !($align - 1)
  266. };
  267. }
// Header size rounded up so data() begins on a BLOB_DATA_ALIGN boundary,
// as required by the erasure coder's aligned-buffer constraint.
pub const BLOB_HEADER_SIZE: usize = align!(SIZE_RANGE.end, BLOB_DATA_ALIGN); // make sure data() is safe for erasure
// Signing starts at the parent field: the signature itself and the
// forwarded flag are excluded from the signed region.
pub const SIGNABLE_START: usize = PARENT_RANGE.start;
pub const BLOB_FLAG_IS_LAST_IN_SLOT: u32 = 0x2;
pub const BLOB_FLAG_IS_CODING: u32 = 0x1;
impl Blob {
    /// Build a blob whose buffer begins with a copy of `data`.
    ///
    /// Panics if `data` is longer than the blob buffer. (The `min` below is
    /// redundant after the assert but kept as written.)
    pub fn new(data: &[u8]) -> Self {
        let mut blob = Self::default();
        assert!(data.len() <= blob.data.len());
        let data_len = cmp::min(data.len(), blob.data.len());
        let bytes = &data[..data_len];
        blob.data[..data_len].copy_from_slice(bytes);
        blob.meta.size = data_len;
        blob
    }
    /// Serialize `data` into the payload region (past the header), record
    /// the resulting size, and install a default erasure config.
    ///
    /// Panics if serialization fails (e.g. the payload does not fit).
    pub fn from_serializable<T: Serialize + ?Sized>(data: &T) -> Self {
        let mut blob = Self::default();
        let pos = {
            let mut out = Cursor::new(blob.data_mut());
            bincode::serialize_into(&mut out, data).expect("failed to serialize output");
            out.position() as usize
        };
        blob.set_size(pos);
        blob.set_erasure_config(&ErasureConfig::default());
        blob
    }
    // --- Header field accessors ---------------------------------------
    // Each getter/setter reads or writes a fixed little-endian byte range
    // of the in-memory buffer (see the *_RANGE constants above).
    /// Parent slot of this blob.
    pub fn parent(&self) -> u64 {
        LittleEndian::read_u64(&self.data[PARENT_RANGE])
    }
    pub fn set_parent(&mut self, ix: u64) {
        LittleEndian::write_u64(&mut self.data[PARENT_RANGE], ix);
    }
    /// Version field of the header.
    pub fn version(&self) -> u64 {
        LittleEndian::read_u64(&self.data[VERSION_RANGE])
    }
    pub fn set_version(&mut self, version: u64) {
        LittleEndian::write_u64(&mut self.data[VERSION_RANGE], version);
    }
    /// Slot this blob belongs to.
    pub fn slot(&self) -> u64 {
        LittleEndian::read_u64(&self.data[SLOT_RANGE])
    }
    pub fn set_slot(&mut self, ix: u64) {
        LittleEndian::write_u64(&mut self.data[SLOT_RANGE], ix);
    }
    /// Index of this blob within its slot.
    pub fn index(&self) -> u64 {
        LittleEndian::read_u64(&self.data[INDEX_RANGE])
    }
    pub fn set_index(&mut self, ix: u64) {
        LittleEndian::write_u64(&mut self.data[INDEX_RANGE], ix);
    }
    /// Store the erasure config in the header.
    ///
    /// Panics if the bincode encoding does not exactly fill the reserved
    /// range (copy_from_slice requires equal lengths).
    pub fn set_erasure_config(&mut self, config: &ErasureConfig) {
        self.data[ERASURE_CONFIG_RANGE].copy_from_slice(&bincode::serialize(config).unwrap())
    }
    /// Decode the erasure config; falls back to the default on a corrupt
    /// header rather than erroring.
    pub fn erasure_config(&self) -> ErasureConfig {
        bincode::deserialize(&self.data[ERASURE_CONFIG_RANGE]).unwrap_or_default()
    }
    /// Derive a 32-byte seed from the tail of the signature bytes.
    pub fn seed(&self) -> [u8; 32] {
        let mut seed = [0; 32];
        let seed_len = seed.len();
        let signature_bytes = self.get_signature_bytes();
        seed[0..seed_len].copy_from_slice(&signature_bytes[(signature_bytes.len() - seed_len)..]);
        seed
    }
    /// sender id, we use this for identifying if its a blob from the leader that we should
    /// retransmit. eventually blobs should have a signature that we can use for spam filtering
    pub fn id(&self) -> Pubkey {
        Pubkey::new(&self.data[ID_RANGE])
    }
    pub fn set_id(&mut self, id: &Pubkey) {
        self.data[ID_RANGE].copy_from_slice(id.as_ref())
    }
    /// Used to determine whether or not this blob should be forwarded in retransmit
    /// A bool is used here instead of a flag because this item is not intended to be signed when
    /// blob signatures are introduced
    // Note the inverted sense: a zero byte means "still forwardable".
    pub fn should_forward(&self) -> bool {
        self.data[FORWARDED_RANGE][0] & 0x1 == 0
    }
    /// Mark this blob's forwarded status
    pub fn set_forwarded(&mut self, forward: bool) {
        self.data[FORWARDED_RANGE][0] = u8::from(forward)
    }
    pub fn flags(&self) -> u32 {
        LittleEndian::read_u32(&self.data[FLAGS_RANGE])
    }
    pub fn set_flags(&mut self, ix: u32) {
        LittleEndian::write_u32(&mut self.data[FLAGS_RANGE], ix);
    }
    /// True when this blob carries erasure coding rather than data.
    pub fn is_coding(&self) -> bool {
        (self.flags() & BLOB_FLAG_IS_CODING) != 0
    }
    pub fn set_coding(&mut self) {
        let flags = self.flags();
        self.set_flags(flags | BLOB_FLAG_IS_CODING);
    }
    pub fn set_is_last_in_slot(&mut self) {
        let flags = self.flags();
        self.set_flags(flags | BLOB_FLAG_IS_LAST_IN_SLOT);
    }
    pub fn is_last_in_slot(&self) -> bool {
        (self.flags() & BLOB_FLAG_IS_LAST_IN_SLOT) != 0
    }
    /// Total recorded size (header + payload), clamped to BLOB_SIZE so a
    /// corrupt header can't claim more than the buffer holds.
    pub fn data_size(&self) -> u64 {
        cmp::min(
            LittleEndian::read_u64(&self.data[SIZE_RANGE]),
            BLOB_SIZE as u64,
        )
    }
    pub fn set_data_size(&mut self, size: u64) {
        LittleEndian::write_u64(&mut self.data[SIZE_RANGE], size);
    }
    /// Payload region (buffer minus the header prefix).
    pub fn data(&self) -> &[u8] {
        &self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE]
    }
    pub fn data_mut(&mut self) -> &mut [u8] {
        &mut self.data[BLOB_HEADER_SIZE..BLOB_HEADER_SIZE + BLOB_DATA_SIZE]
    }
    /// Payload length, or 0 when the header size and meta size disagree
    /// (i.e. the blob is not self-consistent).
    pub fn size(&self) -> usize {
        let size = self.data_size() as usize;
        if size > BLOB_HEADER_SIZE && size == self.meta.size {
            size - BLOB_HEADER_SIZE
        } else {
            0
        }
    }
    /// Record `size` payload bytes: both meta and header get payload+header.
    pub fn set_size(&mut self, size: usize) {
        let new_size = size + BLOB_HEADER_SIZE;
        self.meta.size = new_size;
        self.set_data_size(new_size as u64);
    }
    pub fn get_signature_bytes(&self) -> &[u8] {
        &self.data[SIGNATURE_RANGE]
    }
    /// Receive one datagram from `socket` into `r`, recording its size and
    /// source address in the blob's meta.
    pub fn recv_blob(socket: &UdpSocket, r: &SharedBlob) -> io::Result<()> {
        let mut p = r.write().unwrap();
        trace!("receiving on {}", socket.local_addr().unwrap());
        let (nrecv, from) = socket.recv_from(&mut p.data)?;
        p.meta.size = nrecv;
        p.meta.set_addr(&from);
        trace!("got {} bytes from {}", nrecv, from);
        Ok(())
    }
    /// Receive up to NUM_BLOBS blobs from `socket`.
    ///
    /// An error before anything is read is propagated; after the first blob,
    /// errors just end the batch.
    pub fn recv_from(socket: &UdpSocket) -> Result<SharedBlobs> {
        let mut v = Vec::new();
        //DOCUMENTED SIDE-EFFECT
        //Performance out of the IO without poll
        // * block on the socket until it's readable
        // * set the socket to non blocking
        // * read until it fails
        // * set it back to blocking before returning
        // NOTE(review): as in Packets::recv_from, the socket is left
        // non-blocking on return; blocking is restored only on re-entry.
        socket.set_nonblocking(false)?;
        for i in 0..NUM_BLOBS {
            let r = SharedBlob::default();
            match Blob::recv_blob(socket, &r) {
                Err(_) if i > 0 => {
                    trace!("got {:?} messages on {}", i, socket.local_addr().unwrap());
                    break;
                }
                Err(e) => {
                    // WouldBlock/TimedOut on the very first read is normal
                    // polling noise; anything else is worth logging.
                    if e.kind() != io::ErrorKind::WouldBlock && e.kind() != io::ErrorKind::TimedOut
                    {
                        info!("recv_from err {:?}", e);
                    }
                    return Err(Error::IO(e));
                }
                Ok(()) => {
                    // First blob arrived while blocking; switch to
                    // non-blocking so further reads fail fast.
                    if i == 0 {
                        socket.set_nonblocking(true)?;
                    }
                }
            }
            v.push(r);
        }
        Ok(v)
    }
    /// Send each blob to the address in its meta, stopping at the first
    /// send failure.
    pub fn send_to(socket: &UdpSocket, v: SharedBlobs) -> Result<()> {
        for r in v {
            {
                let p = r.read().unwrap();
                let a = p.meta.addr();
                if let Err(e) = socket.send_to(&p.data[..p.meta.size], &a) {
                    warn!(
                        "error sending {} byte packet to {:?}: {:?}",
                        p.meta.size, a, e
                    );
                    return Err(e.into());
                }
            }
        }
        Ok(())
    }
}
impl Signable for Blob {
    fn pubkey(&self) -> Pubkey {
        // The sender id stored in the header doubles as the signing key.
        self.id()
    }
    fn signable_data(&self) -> Cow<[u8]> {
        // Sign from the parent field through the recorded data size; max()
        // guards against a data_size smaller than SIGNABLE_START, which
        // would otherwise produce an inverted (panicking) range.
        let end = cmp::max(SIGNABLE_START, self.data_size() as usize);
        Cow::Borrowed(&self.data[SIGNABLE_START..end])
    }
    fn get_signature(&self) -> Signature {
        Signature::new(self.get_signature_bytes())
    }
    fn set_signature(&mut self, signature: Signature) {
        self.data[SIGNATURE_RANGE].copy_from_slice(signature.as_ref())
    }
}
  473. pub fn index_blobs(blobs: &[SharedBlob], id: &Pubkey, mut blob_index: u64, slot: u64, parent: u64) {
  474. // enumerate all the blobs, those are the indices
  475. for blob in blobs.iter() {
  476. let mut blob = blob.write().unwrap();
  477. blob.set_index(blob_index);
  478. blob.set_slot(slot);
  479. blob.set_parent(parent);
  480. blob.set_id(id);
  481. blob_index += 1;
  482. }
  483. }
#[cfg(test)]
mod tests {
    use super::*;
    use solana_sdk::hash::Hash;
    use solana_sdk::signature::{Keypair, KeypairUtil};
    use solana_sdk::system_transaction;
    use std::io;
    use std::io::Write;
    use std::net::{SocketAddr, UdpSocket};
    #[test]
    fn test_packets_set_addr() {
        // test that the address is actually being updated
        let send_addr = socketaddr!([127, 0, 0, 1], 123);
        let packets = vec![Packet::default()];
        let mut msgs = Packets::new(packets);
        msgs.set_addr(&send_addr);
        assert_eq!(SocketAddr::from(msgs.packets[0].meta.addr()), send_addr);
    }
    // Round-trip: send 10 full-size packets over loopback and verify size
    // and source address survive.
    #[test]
    pub fn packet_send_recv() {
        solana_logger::setup();
        let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let addr = recv_socket.local_addr().unwrap();
        let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let saddr = send_socket.local_addr().unwrap();
        let mut p = Packets::default();
        p.packets.resize(10, Packet::default());
        for m in p.packets.iter_mut() {
            m.meta.set_addr(&addr);
            m.meta.size = PACKET_DATA_SIZE;
        }
        p.send_to(&send_socket).unwrap();
        let recvd = p.recv_from(&recv_socket).unwrap();
        assert_eq!(recvd, p.packets.len());
        for m in &p.packets {
            assert_eq!(m.meta.size, PACKET_DATA_SIZE);
            assert_eq!(m.meta.addr(), saddr);
        }
    }
    // Verify the NUM_PACKETS chunking boundary: exactly-full, and one-over.
    #[test]
    fn test_to_packets() {
        let keypair = Keypair::new();
        let hash = Hash::new(&[1; 32]);
        let tx = system_transaction::transfer_now(&keypair, &keypair.pubkey(), 1, hash);
        let rv = to_packets(&vec![tx.clone(); 1]);
        assert_eq!(rv.len(), 1);
        assert_eq!(rv[0].packets.len(), 1);
        let rv = to_packets(&vec![tx.clone(); NUM_PACKETS]);
        assert_eq!(rv.len(), 1);
        assert_eq!(rv[0].packets.len(), NUM_PACKETS);
        let rv = to_packets(&vec![tx.clone(); NUM_PACKETS + 1]);
        assert_eq!(rv.len(), 2);
        assert_eq!(rv[0].packets.len(), NUM_PACKETS);
        assert_eq!(rv[1].packets.len(), 1);
    }
    // Blob round-trip over loopback.
    #[test]
    pub fn blob_send_recv() {
        trace!("start");
        let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let addr = reader.local_addr().unwrap();
        let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
        let p = SharedBlob::default();
        p.write().unwrap().meta.set_addr(&addr);
        p.write().unwrap().meta.size = 1024;
        let v = vec![p];
        Blob::send_to(&sender, v).unwrap();
        trace!("send_to");
        let rv = Blob::recv_from(&reader).unwrap();
        trace!("recv_from");
        assert_eq!(rv.len(), 1);
        assert_eq!(rv[0].read().unwrap().meta.size, 1024);
    }
    // NOTE(review): this cfg-gated test looks stale — it references an
    // undefined `r`, uses `VecDeque` without importing it, and calls
    // `as_mut()` on a SharedBlob (Arc<RwLock<_>>); it would not compile if
    // the "ipv6" feature were enabled. Left as-is; needs a rewrite against
    // the current Blob API before the feature can be turned on.
    #[cfg(all(feature = "ipv6", test))]
    #[test]
    pub fn blob_ipv6_send_recv() {
        let reader = UdpSocket::bind("[::1]:0").expect("bind");
        let addr = reader.local_addr().unwrap();
        let sender = UdpSocket::bind("[::1]:0").expect("bind");
        let p = SharedBlob::default();
        p.as_mut().unwrap().meta.set_addr(&addr);
        p.as_mut().unwrap().meta.size = 1024;
        let mut v = VecDeque::default();
        v.push_back(p);
        Blob::send_to(&r, &sender, &mut v).unwrap();
        let mut rv = Blob::recv_from(&reader).unwrap();
        let rp = rv.pop_front().unwrap();
        assert_eq!(rp.as_mut().meta.size, 1024);
    }
    // Smoke-test that Debug formatting doesn't panic.
    #[test]
    pub fn debug_trait() {
        write!(io::sink(), "{:?}", Packet::default()).unwrap();
        write!(io::sink(), "{:?}", Packets::default()).unwrap();
        write!(io::sink(), "{:?}", Blob::default()).unwrap();
    }
    // Header fields and payload must not clobber each other.
    #[test]
    pub fn blob_test() {
        let mut b = Blob::default();
        b.set_index(<u64>::max_value());
        assert_eq!(b.index(), <u64>::max_value());
        b.data_mut()[0] = 1;
        assert_eq!(b.data()[0], 1);
        assert_eq!(b.index(), <u64>::max_value());
        assert_eq!(b.meta, Meta::default());
    }
    // should_forward has inverted sense relative to set_forwarded.
    #[test]
    fn test_blob_forward() {
        let mut b = Blob::default();
        assert!(b.should_forward());
        b.set_forwarded(true);
        assert!(!b.should_forward());
    }
    #[test]
    fn test_blob_erasure_config() {
        let mut b = Blob::default();
        let config = ErasureConfig::new(32, 16);
        b.set_erasure_config(&config);
        assert_eq!(config, b.erasure_config());
    }
    // BlobData's repr(align) must agree with the BLOB_DATA_ALIGN constant.
    #[test]
    fn test_blob_data_align() {
        assert_eq!(std::mem::align_of::<BlobData>(), BLOB_DATA_ALIGN);
    }
    #[test]
    fn test_packet_partial_eq() {
        let mut p1 = Packet::default();
        let mut p2 = Packet::default();
        p1.meta.size = 1;
        p1.data[0] = 0;
        p2.meta.size = 1;
        p2.data[0] = 0;
        assert!(p1 == p2);
        p2.data[0] = 4;
        assert!(p1 != p2);
    }
    #[test]
    fn test_blob_partial_eq() {
        let p1 = Blob::default();
        let mut p2 = Blob::default();
        assert!(p1 == p2);
        p2.data[1] = 4;
        assert!(p1 != p2);
    }
    // Signing must remain valid after the signed region grows.
    #[test]
    fn test_sign_blob() {
        let mut b = Blob::default();
        let k = Keypair::new();
        let p = k.pubkey();
        b.set_id(&p);
        b.sign(&k);
        assert!(b.verify());
        // Set a bigger chunk of data to sign
        b.set_size(80);
        b.sign(&k);
        assert!(b.verify());
    }
    #[test]
    fn test_packets_reset() {
        let mut packets = Packets::default();
        packets.packets.resize(10, Packet::default());
        assert_eq!(packets.packets.len(), 10);
        packets.reset();
        assert_eq!(packets.packets.len(), 0);
    }
    #[test]
    fn test_version() {
        let mut b = Blob::default();
        assert_eq!(b.version(), 0);
        b.set_version(1);
        assert_eq!(b.version(), 1);
    }
}