Преглед на файлове

Avoid large block for unified scheduler bp testing (#7223)

Ryo Onodera преди 3 месеца
родител
ревизия
cfdfc391d0

+ 1 - 0
Cargo.lock

@@ -11651,6 +11651,7 @@ dependencies = [
  "solana-keypair",
  "solana-keypair",
  "solana-ledger",
  "solana-ledger",
  "solana-logger",
  "solana-logger",
+ "solana-metrics",
  "solana-poh",
  "solana-poh",
  "solana-pubkey",
  "solana-pubkey",
  "solana-runtime",
  "solana-runtime",

+ 4 - 3
core/src/banking_stage/packet_deserializer.rs

@@ -145,13 +145,14 @@ impl PacketDeserializer {
         Ok((num_packets_received, messages))
         Ok((num_packets_received, messages))
     }
     }
 
 
-    pub(crate) fn deserialize_packets_with_indexes(
+    pub(crate) fn deserialize_packets_for_unified_scheduler(
         packet_batch: &PacketBatch,
         packet_batch: &PacketBatch,
-    ) -> impl Iterator<Item = (ImmutableDeserializedPacket, usize)> + '_ {
+    ) -> impl Iterator<Item = (ImmutableDeserializedPacket, usize, usize)> + '_ {
         packet_batch.iter().enumerate().filter_map(|(index, pkt)| {
         packet_batch.iter().enumerate().filter_map(|(index, pkt)| {
             if !pkt.meta().discard() {
             if !pkt.meta().discard() {
+                let pkt_size = pkt.meta().size;
                 let pkt = ImmutableDeserializedPacket::new(pkt).ok()?;
                 let pkt = ImmutableDeserializedPacket::new(pkt).ok()?;
-                Some((pkt, index))
+                Some((pkt, index, pkt_size))
             } else {
             } else {
                 None
                 None
             }
             }

+ 3 - 3
core/src/banking_stage/unified_scheduler.rs

@@ -68,9 +68,9 @@ pub(crate) fn ensure_banking_stage_setup(
             for batch in batches.iter() {
             for batch in batches.iter() {
                 // over-provision task ids, even though some of the packets could be invalid.
                 // over-provision task ids, even though some of the packets could be invalid.
                 let task_id_base = helper.generate_task_ids(batch.len());
                 let task_id_base = helper.generate_task_ids(batch.len());
-                let packets = PacketDeserializer::deserialize_packets_with_indexes(batch);
+                let packets = PacketDeserializer::deserialize_packets_for_unified_scheduler(batch);
 
 
-                for (packet, packet_index) in packets {
+                for (packet, packet_index, packet_size) in packets {
                     let Some((transaction, _deactivation_slot)) = packet
                     let Some((transaction, _deactivation_slot)) = packet
                         .build_sanitized_transaction(
                         .build_sanitized_transaction(
                             bank.vote_only_bank(),
                             bank.vote_only_bank(),
@@ -83,7 +83,7 @@ pub(crate) fn ensure_banking_stage_setup(
 
 
                     let index = task_id_base + packet_index;
                     let index = task_id_base + packet_index;
 
 
-                    let task = helper.create_new_task(transaction, index);
+                    let task = helper.create_new_task(transaction, index, packet_size);
                     helper.send_new_task(task);
                     helper.send_new_task(task);
                 }
                 }
             }
             }

+ 1 - 0
programs/sbf/Cargo.lock

@@ -9715,6 +9715,7 @@ dependencies = [
  "solana-clock",
  "solana-clock",
  "solana-cost-model",
  "solana-cost-model",
  "solana-ledger",
  "solana-ledger",
+ "solana-metrics",
  "solana-poh",
  "solana-poh",
  "solana-pubkey",
  "solana-pubkey",
  "solana-runtime",
  "solana-runtime",

+ 1 - 0
svm/examples/Cargo.lock

@@ -8802,6 +8802,7 @@ dependencies = [
  "solana-clock",
  "solana-clock",
  "solana-cost-model",
  "solana-cost-model",
  "solana-ledger",
  "solana-ledger",
+ "solana-metrics",
  "solana-poh",
  "solana-poh",
  "solana-pubkey",
  "solana-pubkey",
  "solana-runtime",
  "solana-runtime",

+ 32 - 0
unified-scheduler-logic/src/lib.rs

@@ -422,6 +422,9 @@ const_assert_eq!(mem::size_of::<LockResult>(), 1);
 pub type Task = Arc<TaskInner>;
 pub type Task = Arc<TaskInner>;
 const_assert_eq!(mem::size_of::<Task>(), 8);
 const_assert_eq!(mem::size_of::<Task>(), 8);
 
 
+pub type BlockSize = usize;
+pub const NO_CONSUMED_BLOCK_SIZE: BlockSize = 0;
+
 /// [`Token`] for [`UsageQueue`].
 /// [`Token`] for [`UsageQueue`].
 type UsageQueueToken = Token<UsageQueueInner>;
 type UsageQueueToken = Token<UsageQueueInner>;
 const_assert_eq!(mem::size_of::<UsageQueueToken>(), 0);
 const_assert_eq!(mem::size_of::<UsageQueueToken>(), 0);
@@ -440,6 +443,7 @@ pub struct TaskInner {
     index: usize,
     index: usize,
     lock_contexts: Vec<LockContext>,
     lock_contexts: Vec<LockContext>,
     blocked_usage_count: TokenCell<ShortCounter>,
     blocked_usage_count: TokenCell<ShortCounter>,
+    consumed_block_size: BlockSize,
 }
 }
 
 
 impl TaskInner {
 impl TaskInner {
@@ -447,6 +451,10 @@ impl TaskInner {
         self.index
         self.index
     }
     }
 
 
+    pub fn consumed_block_size(&self) -> BlockSize {
+        self.consumed_block_size
+    }
+
     pub fn transaction(&self) -> &RuntimeTransaction<SanitizedTransaction> {
     pub fn transaction(&self) -> &RuntimeTransaction<SanitizedTransaction> {
         &self.transaction
         &self.transaction
     }
     }
@@ -870,6 +878,29 @@ impl SchedulingStateMachine {
         transaction: RuntimeTransaction<SanitizedTransaction>,
         transaction: RuntimeTransaction<SanitizedTransaction>,
         index: usize,
         index: usize,
         usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
         usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
+    ) -> Task {
+        Self::do_create_task(
+            transaction,
+            index,
+            NO_CONSUMED_BLOCK_SIZE,
+            usage_queue_loader,
+        )
+    }
+
+    pub fn create_block_production_task(
+        transaction: RuntimeTransaction<SanitizedTransaction>,
+        index: usize,
+        consumed_block_size: BlockSize,
+        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
+    ) -> Task {
+        Self::do_create_task(transaction, index, consumed_block_size, usage_queue_loader)
+    }
+
+    fn do_create_task(
+        transaction: RuntimeTransaction<SanitizedTransaction>,
+        index: usize,
+        consumed_block_size: BlockSize,
+        usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
     ) -> Task {
     ) -> Task {
         // It's crucial for tasks to be validated with
         // It's crucial for tasks to be validated with
         // `account_locks::validate_account_locks()` prior to the creation.
         // `account_locks::validate_account_locks()` prior to the creation.
@@ -924,6 +955,7 @@ impl SchedulingStateMachine {
             index,
             index,
             lock_contexts,
             lock_contexts,
             blocked_usage_count: TokenCell::new(ShortCounter::zero()),
             blocked_usage_count: TokenCell::new(ShortCounter::zero()),
+            consumed_block_size,
         })
         })
     }
     }
 
 

+ 1 - 0
unified-scheduler-pool/Cargo.toml

@@ -27,6 +27,7 @@ scopeguard = { workspace = true }
 solana-clock = { workspace = true }
 solana-clock = { workspace = true }
 solana-cost-model = { workspace = true }
 solana-cost-model = { workspace = true }
 solana-ledger = { workspace = true }
 solana-ledger = { workspace = true }
+solana-metrics = { workspace = true }
 solana-poh = { workspace = true }
 solana-poh = { workspace = true }
 solana-pubkey = { workspace = true }
 solana-pubkey = { workspace = true }
 solana-runtime = { workspace = true }
 solana-runtime = { workspace = true }

+ 63 - 7
unified-scheduler-pool/src/lib.rs

@@ -30,6 +30,7 @@ use {
     solana_ledger::blockstore_processor::{
     solana_ledger::blockstore_processor::{
         execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
         execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
     },
     },
+    solana_metrics::datapoint_info,
     solana_poh::transaction_recorder::{RecordTransactionsSummary, TransactionRecorder},
     solana_poh::transaction_recorder::{RecordTransactionsSummary, TransactionRecorder},
     solana_pubkey::Pubkey,
     solana_pubkey::Pubkey,
     solana_runtime::{
     solana_runtime::{
@@ -48,6 +49,7 @@ use {
     solana_transaction::sanitized::SanitizedTransaction,
     solana_transaction::sanitized::SanitizedTransaction,
     solana_transaction_error::{TransactionError, TransactionResult as Result},
     solana_transaction_error::{TransactionError, TransactionResult as Result},
     solana_unified_scheduler_logic::{
     solana_unified_scheduler_logic::{
+        BlockSize,
         SchedulingMode::{self, BlockProduction, BlockVerification},
         SchedulingMode::{self, BlockProduction, BlockVerification},
         SchedulingStateMachine, Task, UsageQueue,
         SchedulingStateMachine, Task, UsageQueue,
     },
     },
@@ -69,6 +71,10 @@ use {
     vec_extract_if_polyfill::MakeExtractIf,
     vec_extract_if_polyfill::MakeExtractIf,
 };
 };
 
 
+// For now, cap bandwidth use to just half of a 1 Gbps link, which should be a pretty
+// conservative assumption these days...
+const MAX_BLOCK_SIZE_THRESHOLD: BlockSize = 20 * 1024 * 1024;
+
 mod sleepless_testing;
 mod sleepless_testing;
 use crate::sleepless_testing::BuilderTracked;
 use crate::sleepless_testing::BuilderTracked;
 
 
@@ -371,16 +377,21 @@ impl BankingStageHelper {
         &self,
         &self,
         transaction: RuntimeTransaction<SanitizedTransaction>,
         transaction: RuntimeTransaction<SanitizedTransaction>,
         index: usize,
         index: usize,
+        consumed_block_size: BlockSize,
     ) -> Task {
     ) -> Task {
-        SchedulingStateMachine::create_task(transaction, index, &mut |pubkey| {
-            self.usage_queue_loader.load(pubkey)
-        })
+        SchedulingStateMachine::create_block_production_task(
+            transaction,
+            index,
+            consumed_block_size,
+            &mut |pubkey| self.usage_queue_loader.load(pubkey),
+        )
     }
     }
 
 
     fn recreate_task(&self, executed_task: Box<ExecutedTask>) -> Task {
     fn recreate_task(&self, executed_task: Box<ExecutedTask>) -> Task {
         let new_index = self.generate_task_ids(1);
         let new_index = self.generate_task_ids(1);
+        let consumed_block_size = executed_task.consumed_block_size();
         let transaction = executed_task.into_transaction();
         let transaction = executed_task.into_transaction();
-        self.create_new_task(transaction, new_index)
+        self.create_new_task(transaction, new_index, consumed_block_size)
     }
     }
 
 
     pub fn send_new_task(&self, task: Task) {
     pub fn send_new_task(&self, task: Task) {
@@ -1146,6 +1157,10 @@ impl ExecutedTask {
         })
         })
     }
     }
 
 
+    fn consumed_block_size(&self) -> BlockSize {
+        self.task.consumed_block_size()
+    }
+
     fn into_transaction(self) -> RuntimeTransaction<SanitizedTransaction> {
     fn into_transaction(self) -> RuntimeTransaction<SanitizedTransaction> {
         self.task.into_transaction()
         self.task.into_transaction()
     }
     }
@@ -1707,6 +1722,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         (result, timings): &mut ResultWithTimings,
         (result, timings): &mut ResultWithTimings,
         executed_task: Box<ExecutedTask>,
         executed_task: Box<ExecutedTask>,
         state_machine: &mut SchedulingStateMachine,
         state_machine: &mut SchedulingStateMachine,
+        block_size_estimate: &mut usize,
         session_ending: &mut bool,
         session_ending: &mut bool,
         handler_context: &HandlerContext,
         handler_context: &HandlerContext,
     ) -> bool {
     ) -> bool {
@@ -1720,6 +1736,9 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
             BlockVerification => match executed_task.result_with_timings.0 {
             BlockVerification => match executed_task.result_with_timings.0 {
                 Ok(()) => {
                 Ok(()) => {
                     // The most normal case
                     // The most normal case
+                    // This is only for block production.
+                    assert_eq!(executed_task.consumed_block_size(), 0);
+
                     false
                     false
                 }
                 }
                 // This should never be observed because the scheduler thread makes all running
                 // This should never be observed because the scheduler thread makes all running
@@ -1742,6 +1761,20 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                 match executed_task.result_with_timings.0 {
                 match executed_task.result_with_timings.0 {
                     Ok(()) => {
                     Ok(()) => {
                         // The most normal case
                         // The most normal case
+                        *block_size_estimate = block_size_estimate
+                            .checked_add(executed_task.consumed_block_size())
+                            .unwrap();
+
+                        // Avoid blocks that are too large byte-wise, which could destabilize the
+                        // cluster.
+                        //
+                        // While this check is very light-weight, it isn't rigid or deterministic.
+                        // That's why it's called an estimate-based _threshold_, not a _limit_, to
+                        // indicate these implications. This lenient behavior is acceptable.
+                        if *block_size_estimate > MAX_BLOCK_SIZE_THRESHOLD {
+                            sleepless_testing::at(CheckPoint::SessionEnding);
+                            *session_ending = true;
+                        }
                     }
                     }
                     Err(TransactionError::CommitCancelled)
                     Err(TransactionError::CommitCancelled)
                     | Err(TransactionError::WouldExceedMaxBlockCostLimit)
                     | Err(TransactionError::WouldExceedMaxBlockCostLimit)
@@ -1817,6 +1850,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
     ) {
     ) {
         let scheduling_mode = context.mode();
         let scheduling_mode = context.mode();
         let mut current_slot = context.slot();
         let mut current_slot = context.slot();
+        let mut block_size_estimate = 0;
         let (mut is_finished, mut session_ending) = match scheduling_mode {
         let (mut is_finished, mut session_ending) = match scheduling_mode {
             BlockVerification => (false, false),
             BlockVerification => (false, false),
             BlockProduction => {
             BlockProduction => {
@@ -2028,6 +2062,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                                     &mut result_with_timings,
                                     &mut result_with_timings,
                                     executed_task,
                                     executed_task,
                                     &mut state_machine,
                                     &mut state_machine,
+                                    &mut block_size_estimate,
                                     &mut session_ending,
                                     &mut session_ending,
                                     &handler_context,
                                     &handler_context,
                                 ) {
                                 ) {
@@ -2086,6 +2121,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                                     &mut result_with_timings,
                                     &mut result_with_timings,
                                     executed_task,
                                     executed_task,
                                     &mut state_machine,
                                     &mut state_machine,
+                                    &mut block_size_estimate,
                                     &mut session_ending,
                                     &mut session_ending,
                                     &handler_context,
                                     &handler_context,
                                 ) {
                                 ) {
@@ -2108,6 +2144,14 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                     session_result_sender
                     session_result_sender
                         .send(result_with_timings)
                         .send(result_with_timings)
                         .expect("always outlived receiver");
                         .expect("always outlived receiver");
+
+                    if matches!(scheduling_mode, BlockProduction) {
+                        datapoint_info!(
+                            "unified_scheduler-bp_session_stats",
+                            ("slot", current_slot.unwrap_or_default(), i64),
+                            ("block_size_estimate", block_size_estimate, i64),
+                        );
+                    }
                     if matches!(scheduling_mode, BlockVerification) {
                     if matches!(scheduling_mode, BlockVerification) {
                         state_machine.reinitialize();
                         state_machine.reinitialize();
                     }
                     }
@@ -2154,6 +2198,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                                 assert_eq!(scheduling_mode, new_context.mode());
                                 assert_eq!(scheduling_mode, new_context.mode());
                                 assert!(!new_context.is_preallocated());
                                 assert!(!new_context.is_preallocated());
                                 current_slot = new_context.slot();
                                 current_slot = new_context.slot();
+                                block_size_estimate = 0;
                                 runnable_task_sender
                                 runnable_task_sender
                                     .send_chained_channel(
                                     .send_chained_channel(
                                         &new_context,
                                         &new_context,
@@ -2698,6 +2743,7 @@ mod tests {
         solana_timings::ExecuteTimingType,
         solana_timings::ExecuteTimingType,
         solana_transaction::sanitized::SanitizedTransaction,
         solana_transaction::sanitized::SanitizedTransaction,
         solana_transaction_error::TransactionError,
         solana_transaction_error::TransactionError,
+        solana_unified_scheduler_logic::NO_CONSUMED_BLOCK_SIZE,
         std::{
         std::{
             num::Saturating,
             num::Saturating,
             sync::{atomic::Ordering, Arc, RwLock},
             sync::{atomic::Ordering, Arc, RwLock},
@@ -4624,6 +4670,16 @@ mod tests {
         poh_service.join().unwrap();
         poh_service.join().unwrap();
     }
     }
 
 
+    impl BankingStageHelper {
+        fn create_new_unconstrained_task(
+            &self,
+            transaction: RuntimeTransaction<SanitizedTransaction>,
+            index: usize,
+        ) -> Task {
+            self.create_new_task(transaction, index, NO_CONSUMED_BLOCK_SIZE)
+        }
+    }
+
     #[test]
     #[test]
     fn test_block_production_scheduler_buffering_on_spawn() {
     fn test_block_production_scheduler_buffering_on_spawn() {
         solana_logger::setup();
         solana_logger::setup();
@@ -4672,7 +4728,7 @@ mod tests {
         ));
         ));
         let fixed_banking_packet_handler =
         let fixed_banking_packet_handler =
             Box::new(move |helper: &BankingStageHelper, _banking_packet| {
             Box::new(move |helper: &BankingStageHelper, _banking_packet| {
-                helper.send_new_task(helper.create_new_task(tx0.clone(), 17))
+                helper.send_new_task(helper.create_new_unconstrained_task(tx0.clone(), 17))
             });
             });
         pool.register_banking_stage(
         pool.register_banking_stage(
             None,
             None,
@@ -4739,7 +4795,7 @@ mod tests {
         ));
         ));
         let fixed_banking_packet_handler =
         let fixed_banking_packet_handler =
             Box::new(move |helper: &BankingStageHelper, _banking_packet| {
             Box::new(move |helper: &BankingStageHelper, _banking_packet| {
-                helper.send_new_task(helper.create_new_task(tx0.clone(), 18))
+                helper.send_new_task(helper.create_new_unconstrained_task(tx0.clone(), 18))
             });
             });
 
 
         let (banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
         let (banking_packet_sender, banking_packet_receiver) = crossbeam_channel::unbounded();
@@ -5097,7 +5153,7 @@ mod tests {
         let fixed_banking_packet_handler =
         let fixed_banking_packet_handler =
             Box::new(move |helper: &BankingStageHelper, _banking_packet| {
             Box::new(move |helper: &BankingStageHelper, _banking_packet| {
                 for index in 0..DISCARDED_TASK_COUNT {
                 for index in 0..DISCARDED_TASK_COUNT {
-                    helper.send_new_task(helper.create_new_task(tx0.clone(), index))
+                    helper.send_new_task(helper.create_new_unconstrained_task(tx0.clone(), index))
                 }
                 }
             });
             });