Ver Fonte

Revert "Add limit and shrink policy for recycler (#15320)"

This reverts commit c2e8814dcee0f8741bcf680dc9bea88b59862dcb.
behzad nouri há 4 anos atrás
pai
commit
e405747409

+ 0 - 1
Cargo.lock

@@ -4841,7 +4841,6 @@ dependencies = [
  "serde",
  "serde",
  "solana-budget-program",
  "solana-budget-program",
  "solana-logger 1.7.0",
  "solana-logger 1.7.0",
- "solana-measure",
  "solana-metrics",
  "solana-metrics",
  "solana-rayon-threadlimit",
  "solana-rayon-threadlimit",
  "solana-sdk",
  "solana-sdk",

+ 1 - 1
bench-streamer/src/main.rs

@@ -75,7 +75,7 @@ fn main() -> Result<()> {
 
 
     let mut read_channels = Vec::new();
     let mut read_channels = Vec::new();
     let mut read_threads = Vec::new();
     let mut read_threads = Vec::new();
-    let recycler = PacketsRecycler::new_without_limit("bench-streamer-recycler-shrink-stats");
+    let recycler = PacketsRecycler::default();
     for _ in 0..num_sockets {
     for _ in 0..num_sockets {
         let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap();
         let read = solana_net_utils::bind_to(ip_addr, port, false).unwrap();
         read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
         read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();

+ 6 - 6
core/src/cluster_info.rs

@@ -1923,7 +1923,7 @@ impl ClusterInfo {
                 let mut last_contact_info_trace = timestamp();
                 let mut last_contact_info_trace = timestamp();
                 let mut last_contact_info_save = timestamp();
                 let mut last_contact_info_save = timestamp();
                 let mut entrypoints_processed = false;
                 let mut entrypoints_processed = false;
-                let recycler = PacketsRecycler::new_without_limit("gossip-recycler-shrink-stats");
+                let recycler = PacketsRecycler::default();
                 let crds_data = vec![
                 let crds_data = vec![
                     CrdsData::Version(Version::new(self.id())),
                     CrdsData::Version(Version::new(self.id())),
                     CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
                     CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
@@ -2187,7 +2187,7 @@ impl ClusterInfo {
             .process_pull_requests(callers.cloned(), timestamp());
             .process_pull_requests(callers.cloned(), timestamp());
         let output_size_limit =
         let output_size_limit =
             self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
             self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
-        let mut packets = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
+        let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
         let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
         let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
             let mut rng = rand::thread_rng();
             let mut rng = rand::thread_rng();
             let check_pull_request =
             let check_pull_request =
@@ -2472,7 +2472,8 @@ impl ClusterInfo {
         if packets.is_empty() {
         if packets.is_empty() {
             None
             None
         } else {
         } else {
-            let packets = Packets::new_with_recycler_data(recycler, packets).unwrap();
+            let packets =
+                Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets);
             Some(packets)
             Some(packets)
         }
         }
     }
     }
@@ -3164,8 +3165,7 @@ impl ClusterInfo {
         exit: &Arc<AtomicBool>,
         exit: &Arc<AtomicBool>,
     ) -> JoinHandle<()> {
     ) -> JoinHandle<()> {
         let exit = exit.clone();
         let exit = exit.clone();
-        let recycler =
-            PacketsRecycler::new_without_limit("cluster-info-listen-recycler-shrink-stats");
+        let recycler = PacketsRecycler::default();
         Builder::new()
         Builder::new()
             .name("solana-listen".to_string())
             .name("solana-listen".to_string())
             .spawn(move || {
             .spawn(move || {
@@ -3611,7 +3611,7 @@ mod tests {
             .iter()
             .iter()
             .map(|ping| Pong::new(ping, &this_node).unwrap())
             .map(|ping| Pong::new(ping, &this_node).unwrap())
             .collect();
             .collect();
-        let recycler = PacketsRecycler::new_without_limit("");
+        let recycler = PacketsRecycler::default();
         let packets = cluster_info
         let packets = cluster_info
             .handle_ping_messages(
             .handle_ping_messages(
                 remote_nodes
                 remote_nodes

+ 1 - 6
core/src/fetch_stage.rs

@@ -35,7 +35,6 @@ impl FetchStage {
                 exit,
                 exit,
                 &sender,
                 &sender,
                 &poh_recorder,
                 &poh_recorder,
-                None,
                 coalesce_ms,
                 coalesce_ms,
             ),
             ),
             receiver,
             receiver,
@@ -47,7 +46,6 @@ impl FetchStage {
         exit: &Arc<AtomicBool>,
         exit: &Arc<AtomicBool>,
         sender: &PacketSender,
         sender: &PacketSender,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
-        allocated_packet_limit: Option<u32>,
         coalesce_ms: u64,
         coalesce_ms: u64,
     ) -> Self {
     ) -> Self {
         let tx_sockets = sockets.into_iter().map(Arc::new).collect();
         let tx_sockets = sockets.into_iter().map(Arc::new).collect();
@@ -58,7 +56,6 @@ impl FetchStage {
             exit,
             exit,
             &sender,
             &sender,
             &poh_recorder,
             &poh_recorder,
-            allocated_packet_limit,
             coalesce_ms,
             coalesce_ms,
         )
         )
     }
     }
@@ -104,11 +101,9 @@ impl FetchStage {
         exit: &Arc<AtomicBool>,
         exit: &Arc<AtomicBool>,
         sender: &PacketSender,
         sender: &PacketSender,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
-        limit: Option<u32>,
         coalesce_ms: u64,
         coalesce_ms: u64,
     ) -> Self {
     ) -> Self {
-        let recycler: PacketsRecycler =
-            Recycler::warmed(1000, 1024, limit, "fetch_stage_recycler_shrink");
+        let recycler: PacketsRecycler = Recycler::warmed(1000, 1024);
 
 
         let tpu_threads = sockets.into_iter().map(|socket| {
         let tpu_threads = sockets.into_iter().map(|socket| {
             streamer::receiver(
             streamer::receiver(

+ 1 - 1
core/src/gossip_service.rs

@@ -47,7 +47,7 @@ impl GossipService {
             gossip_socket.clone(),
             gossip_socket.clone(),
             &exit,
             &exit,
             request_sender,
             request_sender,
-            Recycler::new_without_limit("gossip-receiver-recycler-shrink-stats"),
+            Recycler::default(),
             "gossip_receiver",
             "gossip_receiver",
             1,
             1,
         );
         );

+ 16 - 8
core/src/serve_repair.rs

@@ -278,7 +278,7 @@ impl ServeRepair {
         exit: &Arc<AtomicBool>,
         exit: &Arc<AtomicBool>,
     ) -> JoinHandle<()> {
     ) -> JoinHandle<()> {
         let exit = exit.clone();
         let exit = exit.clone();
-        let recycler = PacketsRecycler::new_without_limit("serve-repair-recycler-shrink-stats");
+        let recycler = PacketsRecycler::default();
         Builder::new()
         Builder::new()
             .name("solana-repair-listen".to_string())
             .name("solana-repair-listen".to_string())
             .spawn(move || {
             .spawn(move || {
@@ -490,7 +490,11 @@ impl ServeRepair {
 
 
             if let Some(packet) = packet {
             if let Some(packet) = packet {
                 inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
                 inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
-                return Some(Packets::new_with_recycler_data(recycler, vec![packet])).unwrap();
+                return Some(Packets::new_with_recycler_data(
+                    recycler,
+                    "run_window_request",
+                    vec![packet],
+                ));
             }
             }
         }
         }
 
 
@@ -526,7 +530,11 @@ impl ServeRepair {
                 from_addr,
                 from_addr,
                 nonce,
                 nonce,
             )?;
             )?;
-            return Packets::new_with_recycler_data(recycler, vec![packet]);
+            return Some(Packets::new_with_recycler_data(
+                recycler,
+                "run_highest_window_request",
+                vec![packet],
+            ));
         }
         }
         None
         None
     }
     }
@@ -539,7 +547,7 @@ impl ServeRepair {
         max_responses: usize,
         max_responses: usize,
         nonce: Nonce,
         nonce: Nonce,
     ) -> Option<Packets> {
     ) -> Option<Packets> {
-        let mut res = Packets::new_with_recycler(recycler.clone(), 64).unwrap();
+        let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
         if let Some(blockstore) = blockstore {
         if let Some(blockstore) = blockstore {
             // Try to find the next "n" parent slots of the input slot
             // Try to find the next "n" parent slots of the input slot
             while let Ok(Some(meta)) = blockstore.meta(slot) {
             while let Ok(Some(meta)) = blockstore.meta(slot) {
@@ -593,7 +601,7 @@ mod tests {
 
 
     /// test run_window_request responds with the right shred, and do not overrun
     /// test run_window_request responds with the right shred, and do not overrun
     fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
     fn run_highest_window_request(slot: Slot, num_slots: u64, nonce: Nonce) {
-        let recycler = PacketsRecycler::new_without_limit("");
+        let recycler = PacketsRecycler::default();
         solana_logger::setup();
         solana_logger::setup();
         let ledger_path = get_tmp_ledger_path!();
         let ledger_path = get_tmp_ledger_path!();
         {
         {
@@ -661,7 +669,7 @@ mod tests {
 
 
     /// test window requests respond with the right shred, and do not overrun
     /// test window requests respond with the right shred, and do not overrun
     fn run_window_request(slot: Slot, nonce: Nonce) {
     fn run_window_request(slot: Slot, nonce: Nonce) {
-        let recycler = PacketsRecycler::new_without_limit("");
+        let recycler = PacketsRecycler::default();
         solana_logger::setup();
         solana_logger::setup();
         let ledger_path = get_tmp_ledger_path!();
         let ledger_path = get_tmp_ledger_path!();
         {
         {
@@ -829,7 +837,7 @@ mod tests {
 
 
     fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
     fn run_orphan(slot: Slot, num_slots: u64, nonce: Nonce) {
         solana_logger::setup();
         solana_logger::setup();
-        let recycler = PacketsRecycler::new_without_limit("");
+        let recycler = PacketsRecycler::default();
         let ledger_path = get_tmp_ledger_path!();
         let ledger_path = get_tmp_ledger_path!();
         {
         {
             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
@@ -900,7 +908,7 @@ mod tests {
     #[test]
     #[test]
     fn run_orphan_corrupted_shred_size() {
     fn run_orphan_corrupted_shred_size() {
         solana_logger::setup();
         solana_logger::setup();
-        let recycler = PacketsRecycler::new_without_limit("");
+        let recycler = PacketsRecycler::default();
         let ledger_path = get_tmp_ledger_path!();
         let ledger_path = get_tmp_ledger_path!();
         {
         {
             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
             let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());

+ 1 - 1
core/src/serve_repair_service.rs

@@ -30,7 +30,7 @@ impl ServeRepairService {
             serve_repair_socket.clone(),
             serve_repair_socket.clone(),
             &exit,
             &exit,
             request_sender,
             request_sender,
-            Recycler::new_without_limit("serve-repair-receiver-recycler-shrink-stats"),
+            Recycler::default(),
             "serve_repair_receiver",
             "serve_repair_receiver",
             1,
             1,
         );
         );

+ 1 - 3
core/src/shred_fetch_stage.rs

@@ -168,10 +168,8 @@ impl ShredFetchStage {
         sender: &PacketSender,
         sender: &PacketSender,
         bank_forks: Option<Arc<RwLock<BankForks>>>,
         bank_forks: Option<Arc<RwLock<BankForks>>>,
         exit: &Arc<AtomicBool>,
         exit: &Arc<AtomicBool>,
-        limit: Option<u32>,
     ) -> Self {
     ) -> Self {
-        let recycler: PacketsRecycler =
-            Recycler::warmed(100, 1024, limit, "shred_fetch_stage_recycler_shrink");
+        let recycler: PacketsRecycler = Recycler::warmed(100, 1024);
 
 
         let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
         let (mut tvu_threads, tvu_filter) = Self::packet_modifier(
             sockets,
             sockets,

+ 2 - 2
core/src/sigverify.rs

@@ -23,8 +23,8 @@ impl Default for TransactionSigVerifier {
     fn default() -> Self {
     fn default() -> Self {
         init();
         init();
         Self {
         Self {
-            recycler: Recycler::warmed(50, 4096, None, ""),
-            recycler_out: Recycler::warmed(50, 4096, None, ""),
+            recycler: Recycler::warmed(50, 4096),
+            recycler_out: Recycler::warmed(50, 4096),
         }
         }
     }
     }
 }
 }

+ 1 - 4
core/src/sigverify_shreds.rs

@@ -25,10 +25,7 @@ impl ShredSigVerifier {
         Self {
         Self {
             bank_forks,
             bank_forks,
             leader_schedule_cache,
             leader_schedule_cache,
-            recycler_cache: RecyclerCache::warmed(
-                "shred-sig-verifier-offsets-recycler-shrink-stats",
-                "shred-sig-verifier-buffer-recycler-shrink-stats",
-            ),
+            recycler_cache: RecyclerCache::warmed(),
         }
         }
     }
     }
     fn read_slots(batches: &[Packets]) -> HashSet<u64> {
     fn read_slots(batches: &[Packets]) -> HashSet<u64> {

+ 0 - 3
core/src/tpu.rs

@@ -75,9 +75,6 @@ impl Tpu {
             &exit,
             &exit,
             &packet_sender,
             &packet_sender,
             &poh_recorder,
             &poh_recorder,
-            // At 1024 packets per `Packet`, each packet about MTU size ~1k, this is roughly
-            // 20GB
-            Some(20_000),
             tpu_coalesce_ms,
             tpu_coalesce_ms,
         );
         );
         let (verified_sender, verified_receiver) = unbounded();
         let (verified_sender, verified_receiver) = unbounded();

+ 0 - 1
core/src/tvu.rs

@@ -151,7 +151,6 @@ impl Tvu {
             &fetch_sender,
             &fetch_sender,
             Some(bank_forks.clone()),
             Some(bank_forks.clone()),
             &exit,
             &exit,
-            None,
         );
         );
 
 
         let (verified_sender, verified_receiver) = unbounded();
         let (verified_sender, verified_receiver) = unbounded();

+ 1 - 1
ledger/benches/sigverify_shreds.rs

@@ -16,7 +16,7 @@ const NUM_PACKETS: usize = 256;
 const NUM_BATCHES: usize = 1;
 const NUM_BATCHES: usize = 1;
 #[bench]
 #[bench]
 fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
 fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
-    let recycler_cache = RecyclerCache::new("", "");
+    let recycler_cache = RecyclerCache::default();
 
 
     let mut packets = Packets::default();
     let mut packets = Packets::default();
     packets.packets.set_pinnable();
     packets.packets.set_pinnable();

+ 5 - 12
ledger/src/entry.rs

@@ -258,21 +258,12 @@ pub struct EntryVerificationState {
     device_verification_data: DeviceVerificationData,
     device_verification_data: DeviceVerificationData,
 }
 }
 
 
-#[derive(Clone)]
+#[derive(Default, Clone)]
 pub struct VerifyRecyclers {
 pub struct VerifyRecyclers {
     hash_recycler: Recycler<PinnedVec<Hash>>,
     hash_recycler: Recycler<PinnedVec<Hash>>,
     tick_count_recycler: Recycler<PinnedVec<u64>>,
     tick_count_recycler: Recycler<PinnedVec<u64>>,
 }
 }
 
 
-impl Default for VerifyRecyclers {
-    fn default() -> Self {
-        Self {
-            hash_recycler: Recycler::new_without_limit("hash_recycler_shrink_stats"),
-            tick_count_recycler: Recycler::new_without_limit("tick_count_recycler_shrink_stats"),
-        }
-    }
-}
-
 #[derive(PartialEq, Clone, Copy, Debug)]
 #[derive(PartialEq, Clone, Copy, Debug)]
 pub enum EntryVerificationStatus {
 pub enum EntryVerificationStatus {
     Failure,
     Failure,
@@ -590,12 +581,14 @@ impl EntrySlice for [Entry] {
             .take(self.len())
             .take(self.len())
             .collect();
             .collect();
 
 
-        let mut hashes_pinned = recyclers.hash_recycler.allocate().unwrap();
+        let mut hashes_pinned = recyclers.hash_recycler.allocate("poh_verify_hash");
         hashes_pinned.set_pinnable();
         hashes_pinned.set_pinnable();
         hashes_pinned.resize(hashes.len(), Hash::default());
         hashes_pinned.resize(hashes.len(), Hash::default());
         hashes_pinned.copy_from_slice(&hashes);
         hashes_pinned.copy_from_slice(&hashes);
 
 
-        let mut num_hashes_vec = recyclers.tick_count_recycler.allocate().unwrap();
+        let mut num_hashes_vec = recyclers
+            .tick_count_recycler
+            .allocate("poh_verify_num_hashes");
         num_hashes_vec.reserve_and_pin(cmp::max(1, self.len()));
         num_hashes_vec.reserve_and_pin(cmp::max(1, self.len()));
         for entry in self {
         for entry in self {
             num_hashes_vec.push(entry.num_hashes.saturating_sub(1));
             num_hashes_vec.push(entry.num_hashes.saturating_sub(1));

+ 12 - 12
ledger/src/sigverify_shreds.rs

@@ -137,7 +137,7 @@ fn slot_key_data_for_gpu<
                 .push(*slot);
                 .push(*slot);
         }
         }
     }
     }
-    let mut keyvec = recycler_cache.buffer().allocate().unwrap();
+    let mut keyvec = recycler_cache.buffer().allocate("shred_gpu_pubkeys");
     keyvec.set_pinnable();
     keyvec.set_pinnable();
     let mut slot_to_key_ix = HashMap::new();
     let mut slot_to_key_ix = HashMap::new();
 
 
@@ -152,7 +152,7 @@ fn slot_key_data_for_gpu<
             slot_to_key_ix.insert(s, i);
             slot_to_key_ix.insert(s, i);
         }
         }
     }
     }
-    let mut offsets = recycler_cache.offsets().allocate().unwrap();
+    let mut offsets = recycler_cache.offsets().allocate("shred_offsets");
     offsets.set_pinnable();
     offsets.set_pinnable();
     slots.iter().for_each(|packet_slots| {
     slots.iter().for_each(|packet_slots| {
         packet_slots.iter().for_each(|slot| {
         packet_slots.iter().for_each(|slot| {
@@ -185,11 +185,11 @@ fn shred_gpu_offsets(
     batches: &[Packets],
     batches: &[Packets],
     recycler_cache: &RecyclerCache,
     recycler_cache: &RecyclerCache,
 ) -> (TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>) {
 ) -> (TxOffset, TxOffset, TxOffset, Vec<Vec<u32>>) {
-    let mut signature_offsets = recycler_cache.offsets().allocate().unwrap();
+    let mut signature_offsets = recycler_cache.offsets().allocate("shred_signatures");
     signature_offsets.set_pinnable();
     signature_offsets.set_pinnable();
-    let mut msg_start_offsets = recycler_cache.offsets().allocate().unwrap();
+    let mut msg_start_offsets = recycler_cache.offsets().allocate("shred_msg_starts");
     msg_start_offsets.set_pinnable();
     msg_start_offsets.set_pinnable();
-    let mut msg_sizes = recycler_cache.offsets().allocate().unwrap();
+    let mut msg_sizes = recycler_cache.offsets().allocate("shred_msg_sizes");
     msg_sizes.set_pinnable();
     msg_sizes.set_pinnable();
     let mut v_sig_lens = vec![];
     let mut v_sig_lens = vec![];
     for batch in batches.iter() {
     for batch in batches.iter() {
@@ -242,7 +242,7 @@ pub fn verify_shreds_gpu(
     trace!("pubkeys_len: {}", pubkeys_len);
     trace!("pubkeys_len: {}", pubkeys_len);
     let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) =
     let (signature_offsets, msg_start_offsets, msg_sizes, v_sig_lens) =
         shred_gpu_offsets(pubkeys_len, batches, recycler_cache);
         shred_gpu_offsets(pubkeys_len, batches, recycler_cache);
-    let mut out = recycler_cache.buffer().allocate().unwrap();
+    let mut out = recycler_cache.buffer().allocate("out_buffer");
     out.set_pinnable();
     out.set_pinnable();
     elems.push(
     elems.push(
         perf_libs::Elems {
         perf_libs::Elems {
@@ -332,7 +332,7 @@ pub fn sign_shreds_cpu(keypair: &Keypair, batches: &mut [Packets]) {
 }
 }
 
 
 pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec<u8> {
 pub fn sign_shreds_gpu_pinned_keypair(keypair: &Keypair, cache: &RecyclerCache) -> PinnedVec<u8> {
-    let mut vec = cache.buffer().allocate().unwrap();
+    let mut vec = cache.buffer().allocate("pinned_keypair");
     let pubkey = keypair.pubkey().to_bytes();
     let pubkey = keypair.pubkey().to_bytes();
     let secret = keypair.secret().to_bytes();
     let secret = keypair.secret().to_bytes();
     let mut hasher = Sha512::default();
     let mut hasher = Sha512::default();
@@ -370,17 +370,17 @@ pub fn sign_shreds_gpu(
     let mut num_packets = num_keypair_packets;
     let mut num_packets = num_keypair_packets;
 
 
     //should be zero
     //should be zero
-    let mut pubkey_offsets = recycler_cache.offsets().allocate().unwrap();
+    let mut pubkey_offsets = recycler_cache.offsets().allocate("pubkey offsets");
     pubkey_offsets.resize(count, 0);
     pubkey_offsets.resize(count, 0);
 
 
-    let mut secret_offsets = recycler_cache.offsets().allocate().unwrap();
+    let mut secret_offsets = recycler_cache.offsets().allocate("secret_offsets");
     secret_offsets.resize(count, pubkey_size as u32);
     secret_offsets.resize(count, pubkey_size as u32);
 
 
     trace!("offset: {}", offset);
     trace!("offset: {}", offset);
     let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) =
     let (signature_offsets, msg_start_offsets, msg_sizes, _v_sig_lens) =
         shred_gpu_offsets(offset, batches, recycler_cache);
         shred_gpu_offsets(offset, batches, recycler_cache);
     let total_sigs = signature_offsets.len();
     let total_sigs = signature_offsets.len();
-    let mut signatures_out = recycler_cache.buffer().allocate().unwrap();
+    let mut signatures_out = recycler_cache.buffer().allocate("ed25519 signatures");
     signatures_out.set_pinnable();
     signatures_out.set_pinnable();
     signatures_out.resize(total_sigs * sig_size, 0);
     signatures_out.resize(total_sigs * sig_size, 0);
     elems.push(
     elems.push(
@@ -560,7 +560,7 @@ pub mod tests {
 
 
     fn run_test_sigverify_shreds_gpu(slot: Slot) {
     fn run_test_sigverify_shreds_gpu(slot: Slot) {
         solana_logger::setup();
         solana_logger::setup();
-        let recycler_cache = RecyclerCache::new("", "");
+        let recycler_cache = RecyclerCache::default();
 
 
         let mut batch = [Packets::default()];
         let mut batch = [Packets::default()];
         let mut shred = Shred::new_from_data(
         let mut shred = Shred::new_from_data(
@@ -624,7 +624,7 @@ pub mod tests {
 
 
     fn run_test_sigverify_shreds_sign_gpu(slot: Slot) {
     fn run_test_sigverify_shreds_sign_gpu(slot: Slot) {
         solana_logger::setup();
         solana_logger::setup();
-        let recycler_cache = RecyclerCache::new("", "");
+        let recycler_cache = RecyclerCache::default();
 
 
         let mut packets = Packets::default();
         let mut packets = Packets::default();
         let num_packets = 32;
         let num_packets = 32;

+ 0 - 1
perf/Cargo.toml

@@ -22,7 +22,6 @@ solana-sdk = { path = "../sdk", version = "=1.7.0" }
 solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.7.0" }
 solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.7.0" }
 solana-budget-program = { path = "../programs/budget", version = "=1.7.0" }
 solana-budget-program = { path = "../programs/budget", version = "=1.7.0" }
 solana-logger = { path = "../logger", version = "=1.7.0" }
 solana-logger = { path = "../logger", version = "=1.7.0" }
-solana-measure = { path = "../measure", version = "=1.7.0" }
 solana-metrics = { path = "../metrics", version = "=1.7.0" }
 solana-metrics = { path = "../metrics", version = "=1.7.0" }
 curve25519-dalek = { version = "2" }
 curve25519-dalek = { version = "2" }
 
 

+ 3 - 3
perf/benches/recycler.rs

@@ -10,13 +10,13 @@ use test::Bencher;
 fn bench_recycler(bencher: &mut Bencher) {
 fn bench_recycler(bencher: &mut Bencher) {
     solana_logger::setup();
     solana_logger::setup();
 
 
-    let recycler: PacketsRecycler = Recycler::new_without_limit("me");
+    let recycler: PacketsRecycler = Recycler::default();
 
 
     for _ in 0..1000 {
     for _ in 0..1000 {
-        recycler.recycle_for_test(recycler.allocate().expect("There is no limit"));
+        let _packet = recycler.allocate("");
     }
     }
 
 
     bencher.iter(move || {
     bencher.iter(move || {
-        recycler.recycle_for_test(recycler.allocate().expect("There is no limit"));
+        let _packet = recycler.allocate("");
     });
     });
 }
 }

+ 3 - 3
perf/benches/sigverify.rs

@@ -15,8 +15,8 @@ fn bench_sigverify(bencher: &mut Bencher) {
     // generate packet vector
     // generate packet vector
     let mut batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::<Vec<_>>(), 128);
     let mut batches = to_packets_chunked(&std::iter::repeat(tx).take(128).collect::<Vec<_>>(), 128);
 
 
-    let recycler = Recycler::new_without_limit("");
-    let recycler_out = Recycler::new_without_limit("");
+    let recycler = Recycler::default();
+    let recycler_out = Recycler::default();
     // verify packets
     // verify packets
     bencher.iter(|| {
     bencher.iter(|| {
         let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
         let _ans = sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
@@ -30,7 +30,7 @@ fn bench_get_offsets(bencher: &mut Bencher) {
     // generate packet vector
     // generate packet vector
     let batches = to_packets_chunked(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 1024);
     let batches = to_packets_chunked(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 1024);
 
 
-    let recycler = Recycler::new_without_limit("");
+    let recycler = Recycler::default();
     // verify packets
     // verify packets
     bencher.iter(|| {
     bencher.iter(|| {
         let _ans = sigverify::generate_offsets(&batches, &recycler);
         let _ans = sigverify::generate_offsets(&batches, &recycler);

+ 0 - 3
perf/src/cuda_runtime.rs

@@ -70,9 +70,6 @@ impl<T: Default + Clone + Sized> Reset for PinnedVec<T> {
     fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>) {
     fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>) {
         self.recycler = recycler;
         self.recycler = recycler;
     }
     }
-    fn unset_recycler(&mut self) {
-        self.recycler = Weak::default();
-    }
 }
 }
 
 
 impl<T: Clone + Default + Sized> From<PinnedVec<T>> for Vec<T> {
 impl<T: Clone + Default + Sized> From<PinnedVec<T>> for Vec<T> {

+ 16 - 14
perf/src/packet.rs

@@ -28,21 +28,19 @@ impl Packets {
         Packets { packets }
         Packets { packets }
     }
     }
 
 
-    pub fn new_with_recycler(recycler: PacketsRecycler, size: usize) -> Option<Self> {
-        let maybe_packets = recycler.allocate();
-        maybe_packets.map(|mut packets| {
-            packets.reserve_and_pin(size);
-            Packets { packets }
-        })
+    pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self {
+        let mut packets = recycler.allocate(name);
+        packets.reserve_and_pin(size);
+        Packets { packets }
     }
     }
     pub fn new_with_recycler_data(
     pub fn new_with_recycler_data(
         recycler: &PacketsRecycler,
         recycler: &PacketsRecycler,
+        name: &'static str,
         mut packets: Vec<Packet>,
         mut packets: Vec<Packet>,
-    ) -> Option<Self> {
-        Self::new_with_recycler(recycler.clone(), packets.len()).map(|mut vec| {
-            vec.packets.append(&mut packets);
-            vec
-        })
+    ) -> Self {
+        let mut vec = Self::new_with_recycler(recycler.clone(), packets.len(), name);
+        vec.packets.append(&mut packets);
+        vec
     }
     }
 
 
     pub fn set_addr(&mut self, addr: &SocketAddr) {
     pub fn set_addr(&mut self, addr: &SocketAddr) {
@@ -78,7 +76,11 @@ pub fn to_packets_with_destination<T: Serialize>(
     recycler: PacketsRecycler,
     recycler: PacketsRecycler,
     dests_and_data: &[(SocketAddr, T)],
     dests_and_data: &[(SocketAddr, T)],
 ) -> Packets {
 ) -> Packets {
-    let mut out = Packets::new_with_recycler(recycler, dests_and_data.len()).unwrap();
+    let mut out = Packets::new_with_recycler(
+        recycler,
+        dests_and_data.len(),
+        "to_packets_with_destination",
+    );
     out.packets.resize(dests_and_data.len(), Packet::default());
     out.packets.resize(dests_and_data.len(), Packet::default());
     for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) {
     for (dest_and_data, o) in dests_and_data.iter().zip(out.packets.iter_mut()) {
         if !dest_and_data.0.ip().is_unspecified() && dest_and_data.0.port() != 0 {
         if !dest_and_data.0.ip().is_unspecified() && dest_and_data.0.port() != 0 {
@@ -136,9 +138,9 @@ mod tests {
 
 
     #[test]
     #[test]
     fn test_to_packets_pinning() {
     fn test_to_packets_pinning() {
-        let recycler = PacketsRecycler::new_without_limit("");
+        let recycler = PacketsRecycler::default();
         for i in 0..2 {
         for i in 0..2 {
-            let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1);
+            let _first_packets = Packets::new_with_recycler(recycler.clone(), i + 1, "first one");
         }
         }
     }
     }
 }
 }

+ 42 - 378
perf/src/recycler.rs

@@ -1,24 +1,7 @@
 use rand::{thread_rng, Rng};
 use rand::{thread_rng, Rng};
-use solana_measure::measure::Measure;
-use std::{
-    convert::TryFrom,
-    sync::{
-        atomic::{AtomicBool, AtomicUsize, Ordering},
-        Arc, Mutex, Weak,
-    },
-    time::Instant,
-};
-
-pub const DEFAULT_MINIMUM_OBJECT_COUNT: u32 = 1000;
-pub const DEFAULT_SHRINK_PCT: u32 = 80;
-pub const DEFAULT_MAX_ABOVE_SHRINK_PCT_COUNT: u32 = 10;
-pub const DEFAULT_CHECK_SHRINK_INTERVAL_MS: u32 = 10000;
-
-enum AllocationDecision<T> {
-    Reuse(T),
-    Allocate(u32, usize),
-    AllocationLimitReached,
-}
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex, Weak};
 
 
 #[derive(Debug, Default)]
 #[derive(Debug, Default)]
 struct RecyclerStats {
 struct RecyclerStats {
@@ -28,226 +11,36 @@ struct RecyclerStats {
     max_gc: AtomicUsize,
     max_gc: AtomicUsize,
 }
 }
 
 
-#[derive(Debug, Default)]
-struct RecyclerShrinkStats {
-    resulting_size: u32,
-    target_size: u32,
-    ideal_num_to_remove: u32,
-    shrink_elapsed: u64,
-    drop_elapsed: u64,
-}
-
-impl RecyclerShrinkStats {
-    fn report(&self, shrink_metric_name: &'static str) {
-        datapoint_info!(
-            shrink_metric_name,
-            ("target_size", self.target_size as i64, i64),
-            ("resulting_size", self.resulting_size as i64, i64),
-            ("ideal_num_to_remove", self.ideal_num_to_remove as i64, i64),
-            ("recycler_shrink_elapsed", self.shrink_elapsed as i64, i64),
-            ("drop_elapsed", self.drop_elapsed as i64, i64)
-        );
-    }
-}
-
-#[derive(Clone)]
-pub struct Recycler<T: Reset> {
+#[derive(Clone, Default)]
+pub struct Recycler<T> {
     recycler: Arc<RecyclerX<T>>,
     recycler: Arc<RecyclerX<T>>,
-    shrink_metric_name: &'static str,
-}
-
-impl<T: Default + Reset> Recycler<T> {
-    pub fn new_without_limit(shrink_metric_name: &'static str) -> Self {
-        Self {
-            recycler: Arc::new(RecyclerX::default()),
-            shrink_metric_name,
-        }
-    }
-
-    pub fn new_with_limit(shrink_metric_name: &'static str, limit: u32) -> Self {
-        Self {
-            recycler: Arc::new(RecyclerX::new(Some(limit))),
-            shrink_metric_name,
-        }
-    }
-}
-
-#[derive(Debug)]
-pub struct ObjectPool<T: Reset> {
-    objects: Vec<T>,
-    shrink_pct: u32,
-    minimum_object_count: u32,
-    above_shrink_pct_count: u32,
-    max_above_shrink_pct_count: u32,
-    check_shrink_interval_ms: u32,
-    last_shrink_check_time: Instant,
-    pub total_allocated_count: u32,
-    limit: Option<u32>,
-}
-impl<T: Default + Reset> Default for ObjectPool<T> {
-    fn default() -> Self {
-        ObjectPool {
-            objects: vec![],
-            shrink_pct: DEFAULT_SHRINK_PCT,
-            minimum_object_count: DEFAULT_MINIMUM_OBJECT_COUNT,
-            above_shrink_pct_count: 0,
-            max_above_shrink_pct_count: DEFAULT_MAX_ABOVE_SHRINK_PCT_COUNT,
-            check_shrink_interval_ms: DEFAULT_CHECK_SHRINK_INTERVAL_MS,
-            last_shrink_check_time: Instant::now(),
-            total_allocated_count: 0,
-            limit: None,
-        }
-    }
-}
-
-impl<T: Default + Reset> ObjectPool<T> {
-    fn new(limit: Option<u32>) -> Self {
-        Self {
-            limit,
-            ..Self::default()
-        }
-    }
-
-    fn len(&self) -> usize {
-        self.objects.len()
-    }
-
-    fn get_shrink_target(shrink_pct: u32, current_size: u32) -> u32 {
-        let shrink_pct = u64::from(shrink_pct);
-        let current_size = u64::from(current_size);
-        let shrink_target = shrink_pct
-            .saturating_mul(current_size)
-            .saturating_add(99)
-            .checked_div(100)
-            .unwrap_or(0);
-        u32::try_from(shrink_target).unwrap_or(u32::MAX)
-    }
-
-    fn shrink_if_necessary(
-        &mut self,
-        recycler_name: &'static str,
-    ) -> Option<(RecyclerShrinkStats, Vec<T>)> {
-        let is_consistent = self.total_allocated_count as usize >= self.len();
-        assert!(
-            is_consistent,
-            "Object pool inconsistent: {} {} {}",
-            self.total_allocated_count,
-            self.len(),
-            recycler_name
-        );
-        if self.last_shrink_check_time.elapsed().as_millis() > self.check_shrink_interval_ms as u128
-        {
-            self.last_shrink_check_time = Instant::now();
-            let shrink_threshold_count =
-                Self::get_shrink_target(self.shrink_pct, self.total_allocated_count);
-
-            // If more than the shrink threshold of all allocated objects are sitting doing nothing,
-            // increment the `above_shrink_pct_count`.
-            if self.len() > self.minimum_object_count as usize
-                && self.len() > shrink_threshold_count as usize
-            {
-                self.above_shrink_pct_count = self.above_shrink_pct_count.saturating_add(1);
-            } else {
-                self.above_shrink_pct_count = 0;
-            }
-
-            if self.above_shrink_pct_count as usize >= self.max_above_shrink_pct_count as usize {
-                let mut recycler_shrink_elapsed = Measure::start("recycler_shrink");
-                // Do the shrink
-                let target_size = std::cmp::max(self.minimum_object_count, shrink_threshold_count);
-                let ideal_num_to_remove = self.total_allocated_count.saturating_sub(target_size);
-                let mut shrink_removed_objects = Vec::with_capacity(ideal_num_to_remove as usize);
-                for _ in 0..ideal_num_to_remove {
-                    if let Some(mut expired_object) = self.objects.pop() {
-                        expired_object.unset_recycler();
-                        // Drop these outside of the lock because the Drop() implmentation for
-                        // certain objects like PinnedVec's can be expensive
-                        shrink_removed_objects.push(expired_object);
-                        // May not be able to shrink exactly `ideal_num_to_remove` objects since
-                        // in the case of new allocations, `total_allocated_count` is incremented
-                        // before the object is allocated (see `should_allocate_new` logic below).
-                        // This race allows a difference of up to the number of threads allocating
-                        // with this recycler.
-                        self.total_allocated_count = self.total_allocated_count.saturating_sub(1);
-                    } else {
-                        break;
-                    }
-                }
-                recycler_shrink_elapsed.stop();
-                self.above_shrink_pct_count = 0;
-                Some((
-                    RecyclerShrinkStats {
-                        resulting_size: self.total_allocated_count,
-                        target_size,
-                        ideal_num_to_remove,
-                        shrink_elapsed: recycler_shrink_elapsed.as_us(),
-                        // Filled in later
-                        drop_elapsed: 0,
-                    },
-                    shrink_removed_objects,
-                ))
-            } else {
-                None
-            }
-        } else {
-            None
-        }
-    }
-
-    fn make_allocation_decision(&mut self) -> AllocationDecision<T> {
-        if let Some(reused_object) = self.objects.pop() {
-            AllocationDecision::Reuse(reused_object)
-        } else if let Some(limit) = self.limit {
-            if self.total_allocated_count < limit {
-                self.total_allocated_count = self.total_allocated_count.saturating_add(1);
-                AllocationDecision::Allocate(self.total_allocated_count, self.len())
-            } else {
-                AllocationDecision::AllocationLimitReached
-            }
-        } else {
-            self.total_allocated_count = self.total_allocated_count.saturating_add(1);
-            AllocationDecision::Allocate(self.total_allocated_count, self.len())
-        }
-    }
 }
 }
 
 
 #[derive(Debug)]
 #[derive(Debug)]
-pub struct RecyclerX<T: Reset> {
-    gc: Mutex<ObjectPool<T>>,
+pub struct RecyclerX<T> {
+    gc: Mutex<Vec<T>>,
     stats: RecyclerStats,
     stats: RecyclerStats,
     id: usize,
     id: usize,
 }
 }
 
 
-impl<T: Default + Reset> Default for RecyclerX<T> {
+impl<T: Default> Default for RecyclerX<T> {
     fn default() -> RecyclerX<T> {
     fn default() -> RecyclerX<T> {
         let id = thread_rng().gen_range(0, 1000);
         let id = thread_rng().gen_range(0, 1000);
         trace!("new recycler..{}", id);
         trace!("new recycler..{}", id);
         RecyclerX {
         RecyclerX {
-            gc: Mutex::new(ObjectPool::default()),
+            gc: Mutex::new(vec![]),
             stats: RecyclerStats::default(),
             stats: RecyclerStats::default(),
             id,
             id,
         }
         }
     }
     }
 }
 }
 
 
-impl<T: Default + Reset> RecyclerX<T> {
-    fn new(limit: Option<u32>) -> Self {
-        RecyclerX {
-            gc: Mutex::new(ObjectPool::new(limit)),
-            ..Self::default()
-        }
-    }
-}
-
 pub trait Reset {
 pub trait Reset {
     fn reset(&mut self);
     fn reset(&mut self);
     fn warm(&mut self, size_hint: usize);
     fn warm(&mut self, size_hint: usize);
     fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>)
     fn set_recycler(&mut self, recycler: Weak<RecyclerX<Self>>)
     where
     where
         Self: std::marker::Sized;
         Self: std::marker::Sized;
-    fn unset_recycler(&mut self)
-    where
-        Self: std::marker::Sized;
 }
 }
 
 
 lazy_static! {
 lazy_static! {
@@ -263,21 +56,12 @@ fn warm_recyclers() -> bool {
 }
 }
 
 
 impl<T: Default + Reset + Sized> Recycler<T> {
 impl<T: Default + Reset + Sized> Recycler<T> {
-    pub fn warmed(
-        num: u32,
-        size_hint: usize,
-        limit: Option<u32>,
-        shrink_metric_name: &'static str,
-    ) -> Self {
-        assert!(num <= limit.unwrap_or(std::u32::MAX));
-        let new = Self {
-            recycler: Arc::new(RecyclerX::new(limit)),
-            shrink_metric_name,
-        };
+    pub fn warmed(num: usize, size_hint: usize) -> Self {
+        let new = Self::default();
         if warm_recyclers() {
         if warm_recyclers() {
             let warmed_items: Vec<_> = (0..num)
             let warmed_items: Vec<_> = (0..num)
                 .map(|_| {
                 .map(|_| {
-                    let mut item = new.allocate().unwrap();
+                    let mut item = new.allocate("warming");
                     item.warm(size_hint);
                     item.warm(size_hint);
                     item
                     item
                 })
                 })
@@ -289,55 +73,33 @@ impl<T: Default + Reset + Sized> Recycler<T> {
         new
         new
     }
     }
 
 
-    pub fn allocate(&self) -> Option<T> {
-        let (allocation_decision, shrink_output) = {
-            let mut object_pool = self
-                .recycler
-                .gc
-                .lock()
-                .expect("recycler lock in pb fn allocate");
-
-            let shrink_output = object_pool.shrink_if_necessary(self.shrink_metric_name);
-
-            // Grab the allocation decision and shrinking stats, do the expensive
-            // allocations/deallocations outside of the lock.
-            (object_pool.make_allocation_decision(), shrink_output)
-        };
-
-        if let Some((mut shrink_stats, shrink_removed_objects)) = shrink_output {
-            let mut shrink_removed_object_elapsed = Measure::start("shrink_removed_object_elapsed");
-            drop(shrink_removed_objects);
-            shrink_removed_object_elapsed.stop();
-            shrink_stats.drop_elapsed = shrink_removed_object_elapsed.as_us();
-            shrink_stats.report(self.shrink_metric_name);
-        }
-
-        match allocation_decision {
-            AllocationDecision::Reuse(mut reused_object) => {
-                self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
-                reused_object.reset();
-                Some(reused_object)
-            }
-            AllocationDecision::Allocate(total_allocated_count, recycled_len) => {
-                let mut t = T::default();
-                t.set_recycler(Arc::downgrade(&self.recycler));
-                if total_allocated_count % 1000 == 0 {
-                    datapoint_info!(
-                        "recycler_total_allocated_count",
-                        ("name", self.shrink_metric_name, String),
-                        ("count", total_allocated_count as i64, i64),
-                        ("recycled_len", recycled_len as i64, i64),
-                    )
-                }
-                Some(t)
-            }
-
-            AllocationDecision::AllocationLimitReached => None,
-        }
-    }
+    pub fn allocate(&self, name: &'static str) -> T {
+        let new = self
+            .recycler
+            .gc
+            .lock()
+            .expect("recycler lock in pb fn allocate")
+            .pop();
+
+        if let Some(mut x) = new {
+            self.recycler.stats.reuse.fetch_add(1, Ordering::Relaxed);
+            x.reset();
+            return x;
+        }
+
+        let total = self.recycler.stats.total.fetch_add(1, Ordering::Relaxed);
+        trace!(
+            "allocating new: total {} {:?} id: {} reuse: {} max_gc: {}",
+            total,
+            name,
+            self.recycler.id,
+            self.recycler.stats.reuse.load(Ordering::Relaxed),
+            self.recycler.stats.max_gc.load(Ordering::Relaxed),
+        );
 
 
-    pub fn recycle_for_test(&self, x: T) {
-        self.recycler.recycle(x);
+        let mut t = T::default();
+        t.set_recycler(Arc::downgrade(&self.recycler));
+        t
     }
     }
 }
 }
 
 
@@ -345,7 +107,7 @@ impl<T: Default + Reset> RecyclerX<T> {
     pub fn recycle(&self, x: T) {
     pub fn recycle(&self, x: T) {
         let len = {
         let len = {
             let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
             let mut gc = self.gc.lock().expect("recycler lock in pub fn recycle");
-            gc.objects.push(x);
+            gc.push(x);
             gc.len()
             gc.len()
         };
         };
 
 
@@ -375,8 +137,6 @@ impl<T: Default + Reset> RecyclerX<T> {
 #[cfg(test)]
 #[cfg(test)]
 mod tests {
 mod tests {
     use super::*;
     use super::*;
-    use crate::packet::PacketsRecycler;
-    use std::{thread::sleep, time::Duration};
 
 
     impl Reset for u64 {
     impl Reset for u64 {
         fn reset(&mut self) {
         fn reset(&mut self) {
@@ -384,115 +144,19 @@ mod tests {
         }
         }
         fn warm(&mut self, _size_hint: usize) {}
         fn warm(&mut self, _size_hint: usize) {}
         fn set_recycler(&mut self, _recycler: Weak<RecyclerX<Self>>) {}
         fn set_recycler(&mut self, _recycler: Weak<RecyclerX<Self>>) {}
-        fn unset_recycler(&mut self) {}
     }
     }
 
 
     #[test]
     #[test]
     fn test_recycler() {
     fn test_recycler() {
-        let recycler = Recycler::new_without_limit("");
-        let mut y: u64 = recycler.allocate().unwrap();
+        let recycler = Recycler::default();
+        let mut y: u64 = recycler.allocate("test_recycler1");
         assert_eq!(y, 0);
         assert_eq!(y, 0);
         y = 20;
         y = 20;
         let recycler2 = recycler.clone();
         let recycler2 = recycler.clone();
         recycler2.recycler.recycle(y);
         recycler2.recycler.recycle(y);
         assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1);
         assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 1);
-        let z = recycler.allocate().unwrap();
+        let z = recycler.allocate("test_recycler2");
         assert_eq!(z, 10);
         assert_eq!(z, 10);
         assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
         assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
     }
     }
-
-    #[test]
-    fn test_recycler_limit() {
-        let limit = 10;
-        assert!(limit <= DEFAULT_MINIMUM_OBJECT_COUNT);
-        // Use PacketRecycler so that dropping the allocated object
-        // actually recycles
-        let recycler = PacketsRecycler::new_with_limit("", limit);
-        let mut allocated_items = vec![];
-        for i in 0..limit * 2 {
-            let x = recycler.allocate();
-            if i < limit {
-                allocated_items.push(x.unwrap());
-            } else {
-                assert!(x.is_none());
-            }
-        }
-        assert_eq!(
-            recycler.recycler.gc.lock().unwrap().total_allocated_count,
-            limit
-        );
-        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0_usize);
-        drop(allocated_items);
-        assert_eq!(
-            recycler.recycler.gc.lock().unwrap().total_allocated_count,
-            limit
-        );
-        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), limit as usize);
-    }
-
-    #[test]
-    fn test_recycler_shrink() {
-        let limit = DEFAULT_MINIMUM_OBJECT_COUNT * 2;
-        let max_above_shrink_pct_count = 2;
-        let shrink_pct = 80;
-        let recycler = PacketsRecycler::new_with_limit("", limit);
-        {
-            let mut locked_recycler = recycler.recycler.gc.lock().unwrap();
-            // Make the shrink interval a long time so shrinking doesn't happen yet
-            locked_recycler.check_shrink_interval_ms = std::u32::MAX;
-            // Set the count to one so that we shrink on every other allocation later.
-            locked_recycler.max_above_shrink_pct_count = max_above_shrink_pct_count;
-            locked_recycler.shrink_pct = shrink_pct;
-        }
-
-        let mut allocated_items = vec![];
-        for _ in 0..limit {
-            allocated_items.push(recycler.allocate().unwrap());
-        }
-        assert_eq!(
-            recycler.recycler.gc.lock().unwrap().total_allocated_count,
-            limit
-        );
-        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), 0);
-        drop(allocated_items);
-        assert_eq!(recycler.recycler.gc.lock().unwrap().len(), limit as usize);
-
-        let shrink_interval = 10;
-        {
-            let mut locked_recycler = recycler.recycler.gc.lock().unwrap();
-            locked_recycler.check_shrink_interval_ms = shrink_interval;
-        }
-
-        let mut current_total_allocated_count =
-            recycler.recycler.gc.lock().unwrap().total_allocated_count;
-
-        // Shrink the recycler until it hits the minimum
-        let mut i = 0;
-        while current_total_allocated_count != DEFAULT_MINIMUM_OBJECT_COUNT {
-            sleep(Duration::from_millis(shrink_interval as u64 * 2));
-            recycler.allocate().unwrap();
-            let expected_above_shrink_pct_count = (i + 1) % max_above_shrink_pct_count;
-            assert_eq!(
-                recycler.recycler.gc.lock().unwrap().above_shrink_pct_count,
-                (i + 1) % max_above_shrink_pct_count
-            );
-            if expected_above_shrink_pct_count == 0 {
-                // Shrink happened, update the expected `current_total_allocated_count`;
-                current_total_allocated_count = std::cmp::max(
-                    ObjectPool::<u64>::get_shrink_target(shrink_pct, current_total_allocated_count),
-                    DEFAULT_MINIMUM_OBJECT_COUNT,
-                );
-                assert_eq!(
-                    recycler.recycler.gc.lock().unwrap().total_allocated_count,
-                    current_total_allocated_count
-                );
-                assert_eq!(
-                    recycler.recycler.gc.lock().unwrap().len(),
-                    current_total_allocated_count as usize
-                );
-            }
-
-            i += 1;
-        }
-    }
 }
 }

+ 4 - 11
perf/src/recycler_cache.rs

@@ -2,24 +2,17 @@ use crate::cuda_runtime::PinnedVec;
 use crate::recycler::Recycler;
 use crate::recycler::Recycler;
 use crate::sigverify::TxOffset;
 use crate::sigverify::TxOffset;
 
 
-#[derive(Clone)]
+#[derive(Default, Clone)]
 pub struct RecyclerCache {
 pub struct RecyclerCache {
     recycler_offsets: Recycler<TxOffset>,
     recycler_offsets: Recycler<TxOffset>,
     recycler_buffer: Recycler<PinnedVec<u8>>,
     recycler_buffer: Recycler<PinnedVec<u8>>,
 }
 }
 
 
 impl RecyclerCache {
 impl RecyclerCache {
-    pub fn new(offsets_shrink_name: &'static str, buffer_shrink_name: &'static str) -> Self {
+    pub fn warmed() -> Self {
         Self {
         Self {
-            recycler_offsets: Recycler::new_without_limit(offsets_shrink_name),
-            recycler_buffer: Recycler::new_without_limit(buffer_shrink_name),
-        }
-    }
-
-    pub fn warmed(offsets_shrink_name: &'static str, buffer_shrink_name: &'static str) -> Self {
-        Self {
-            recycler_offsets: Recycler::warmed(50, 4096, None, offsets_shrink_name),
-            recycler_buffer: Recycler::warmed(50, 4096, None, buffer_shrink_name),
+            recycler_offsets: Recycler::warmed(50, 4096),
+            recycler_buffer: Recycler::warmed(50, 4096),
         }
         }
     }
     }
     pub fn offsets(&self) -> &Recycler<TxOffset> {
     pub fn offsets(&self) -> &Recycler<TxOffset> {

+ 13 - 13
perf/src/sigverify.rs

@@ -247,13 +247,13 @@ fn get_packet_offsets(packet: &Packet, current_offset: usize) -> PacketOffsets {
 
 
 pub fn generate_offsets(batches: &[Packets], recycler: &Recycler<TxOffset>) -> TxOffsets {
 pub fn generate_offsets(batches: &[Packets], recycler: &Recycler<TxOffset>) -> TxOffsets {
     debug!("allocating..");
     debug!("allocating..");
-    let mut signature_offsets: PinnedVec<_> = recycler.allocate().unwrap();
+    let mut signature_offsets: PinnedVec<_> = recycler.allocate("sig_offsets");
     signature_offsets.set_pinnable();
     signature_offsets.set_pinnable();
-    let mut pubkey_offsets: PinnedVec<_> = recycler.allocate().unwrap();
+    let mut pubkey_offsets: PinnedVec<_> = recycler.allocate("pubkey_offsets");
     pubkey_offsets.set_pinnable();
     pubkey_offsets.set_pinnable();
-    let mut msg_start_offsets: PinnedVec<_> = recycler.allocate().unwrap();
+    let mut msg_start_offsets: PinnedVec<_> = recycler.allocate("msg_start_offsets");
     msg_start_offsets.set_pinnable();
     msg_start_offsets.set_pinnable();
-    let mut msg_sizes: PinnedVec<_> = recycler.allocate().unwrap();
+    let mut msg_sizes: PinnedVec<_> = recycler.allocate("msg_size_offsets");
     msg_sizes.set_pinnable();
     msg_sizes.set_pinnable();
     let mut current_offset: usize = 0;
     let mut current_offset: usize = 0;
     let mut v_sig_lens = Vec::new();
     let mut v_sig_lens = Vec::new();
@@ -405,7 +405,7 @@ pub fn ed25519_verify(
 
 
     debug!("CUDA ECDSA for {}", batch_size(batches));
     debug!("CUDA ECDSA for {}", batch_size(batches));
     debug!("allocating out..");
     debug!("allocating out..");
-    let mut out = recycler_out.allocate().unwrap();
+    let mut out = recycler_out.allocate("out_buffer");
     out.set_pinnable();
     out.set_pinnable();
     let mut elems = Vec::new();
     let mut elems = Vec::new();
     let mut rvs = Vec::new();
     let mut rvs = Vec::new();
@@ -748,8 +748,8 @@ mod tests {
 
 
         let mut batches = generate_packet_vec(&packet, n, 2);
         let mut batches = generate_packet_vec(&packet, n, 2);
 
 
-        let recycler = Recycler::new_without_limit("");
-        let recycler_out = Recycler::new_without_limit("");
+        let recycler = Recycler::default();
+        let recycler_out = Recycler::default();
         // verify packets
         // verify packets
         sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
         sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
 
 
@@ -770,8 +770,8 @@ mod tests {
 
 
         let mut batches = generate_packet_vec(&packet, 1, 1);
         let mut batches = generate_packet_vec(&packet, 1, 1);
 
 
-        let recycler = Recycler::new_without_limit("");
-        let recycler_out = Recycler::new_without_limit("");
+        let recycler = Recycler::default();
+        let recycler_out = Recycler::default();
         // verify packets
         // verify packets
         sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
         sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
         assert!(batches
         assert!(batches
@@ -810,8 +810,8 @@ mod tests {
 
 
         batches[0].packets.push(packet);
         batches[0].packets.push(packet);
 
 
-        let recycler = Recycler::new_without_limit("");
-        let recycler_out = Recycler::new_without_limit("");
+        let recycler = Recycler::default();
+        let recycler_out = Recycler::default();
         // verify packets
         // verify packets
         sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
         sigverify::ed25519_verify(&mut batches, &recycler, &recycler_out);
 
 
@@ -840,8 +840,8 @@ mod tests {
         let tx = test_multisig_tx();
         let tx = test_multisig_tx();
         let packet = sigverify::make_packet_from_transaction(tx);
         let packet = sigverify::make_packet_from_transaction(tx);
 
 
-        let recycler = Recycler::new_without_limit("");
-        let recycler_out = Recycler::new_without_limit("");
+        let recycler = Recycler::default();
+        let recycler_out = Recycler::default();
         for _ in 0..50 {
         for _ in 0..50 {
             let n = thread_rng().gen_range(1, 30);
             let n = thread_rng().gen_range(1, 30);
             let num_batches = thread_rng().gen_range(2, 30);
             let num_batches = thread_rng().gen_range(2, 30);

+ 3 - 6
streamer/src/streamer.rs

@@ -42,10 +42,7 @@ fn recv_loop(
     let mut now = Instant::now();
     let mut now = Instant::now();
     let mut num_max_received = 0; // Number of times maximum packets were received
     let mut num_max_received = 0; // Number of times maximum packets were received
     loop {
     loop {
-        let (mut msgs, should_send) =
-            Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH)
-                .map(|allocated| (allocated, true))
-                .unwrap_or((Packets::with_capacity(PACKETS_PER_BATCH), false));
+        let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name);
         loop {
         loop {
             // Check for exit signal, even if socket is busy
             // Check for exit signal, even if socket is busy
             // (for instance the leader transaction socket)
             // (for instance the leader transaction socket)
@@ -58,7 +55,7 @@ fn recv_loop(
                 }
                 }
                 recv_count += len;
                 recv_count += len;
                 call_count += 1;
                 call_count += 1;
-                if len > 0 && should_send {
+                if len > 0 {
                     channel.send(msgs)?;
                     channel.send(msgs)?;
                 }
                 }
                 break;
                 break;
@@ -211,7 +208,7 @@ mod test {
             Arc::new(read),
             Arc::new(read),
             &exit,
             &exit,
             s_reader,
             s_reader,
-            Recycler::new_without_limit(""),
+            Recycler::default(),
             "test",
             "test",
             1,
             1,
         );
         );