Sfoglia il codice sorgente

Validate and expire bp unified scheduler tasks (#8664)

* Validate and expire bp unified scheduler tasks

* Try hard to handle stale tasks instead of dropping

* Update dev-bins/Cargo.lock

* Rename

* Move MaxAge back to solana-core
Ryo Onodera 2 settimane fa
parent
commit
77b01e0470

+ 1 - 0
Cargo.lock

@@ -11640,6 +11640,7 @@ name = "solana-unified-scheduler-logic"
 version = "4.0.0-alpha.0"
 dependencies = [
  "assert_matches",
+ "solana-clock",
  "solana-instruction",
  "solana-message",
  "solana-pubkey",

+ 5 - 21
core/src/banking_stage/consumer.rs

@@ -175,27 +175,11 @@ impl Consumer {
         // This means that the transaction may cross and epoch boundary (not allowed),
         //  or account lookup tables may have been closed.
         let pre_results = txs.iter().zip(max_ages).map(|(tx, max_age)| {
-            // If the transaction was sanitized before this bank's epoch,
-            // additional checks are necessary.
-            if bank.epoch() != max_age.sanitized_epoch {
-                // Reserved key set may have changed, so we must verify that
-                // no writable keys are reserved.
-                bank.check_reserved_keys(tx)?;
-            }
-
-            if bank.slot() > max_age.alt_invalidation_slot {
-                // The address table lookup **may** have expired, but the
-                // expiration is not guaranteed since there may have been
-                // skipped slot.
-                // If the addresses still resolve here, then the transaction is still
-                // valid, and we can continue with processing.
-                // If they do not, then the ATL has expired and the transaction
-                // can be dropped.
-                let (_addresses, _deactivation_slot) =
-                    bank.load_addresses_from_ref(tx.message_address_table_lookups())?;
-            }
-
-            Ok(())
+            bank.resanitize_transaction_minimally(
+                tx,
+                max_age.sanitized_epoch,
+                max_age.alt_invalidation_slot,
+            )
         });
         self.process_and_record_transactions_with_pre_results(bank, txs, pre_results, flags)
     }

+ 18 - 3
core/src/banking_stage/unified_scheduler.rs

@@ -35,6 +35,8 @@ use {
     },
     crate::banking_trace::Channels,
     agave_banking_stage_ingress_types::BankingPacketBatch,
+    solana_accounts_db::account_locks::validate_account_locks,
+    solana_address_lookup_table_interface::state::estimate_last_valid_slot,
     solana_clock::Slot,
     solana_message::{v0::LoadedAddresses, SimpleAddressLoader},
     solana_poh::{poh_recorder::PohRecorder, transaction_recorder::TransactionRecorder},
@@ -42,7 +44,9 @@ use {
     solana_runtime_transaction::{
         runtime_transaction::RuntimeTransaction, transaction_meta::StaticMeta,
     },
-    solana_svm_transaction::message_address_table_lookup::SVMMessageAddressTableLookup,
+    solana_svm_transaction::{
+        message_address_table_lookup::SVMMessageAddressTableLookup, svm_message::SVMMessage,
+    },
     solana_transaction::{
         sanitized::{MessageHash, SanitizedTransaction},
         versioned::{sanitized::SanitizedVersionedTransaction, VersionedTransaction},
@@ -107,7 +111,7 @@ pub(crate) fn ensure_banking_stage_setup(
 
                     // WARN: Ignoring deactivation slot here can lead to the production of invalid
                     // blocks. Currently, this code is not used in prod.
-                    let (loaded_addresses, _deactivation_slot) =
+                    let (loaded_addresses, deactivation_slot) =
                         resolve_addresses_with_deactivation(&tx, &bank).ok()?;
                     let tx = RuntimeTransaction::<SanitizedTransaction>::try_from(
                         tx,
@@ -115,6 +119,11 @@ pub(crate) fn ensure_banking_stage_setup(
                         bank.get_reserved_account_keys(),
                     )
                     .ok()?;
+                    validate_account_locks(
+                        tx.account_keys(),
+                        bank.get_transaction_account_lock_limit(),
+                    )
+                    .ok()?;
 
                     // Determine priority.
                     let compute_budget_limits = tx
@@ -125,7 +134,13 @@ pub(crate) fn ensure_banking_stage_setup(
                         calculate_priority_and_cost(&tx, &compute_budget_limits.into(), &bank);
                     let task_id = BankingStageHelper::new_task_id(task_id_base + i, priority);
 
-                    Some(helper.create_new_task(tx, task_id, packet.meta().size))
+                    Some(helper.create_new_task(
+                        tx,
+                        task_id,
+                        packet.meta().size,
+                        bank.epoch(),
+                        estimate_last_valid_slot(bank.slot().min(deactivation_slot)),
+                    ))
                 });
 
                 for task in tasks {

+ 1 - 0
dev-bins/Cargo.lock

@@ -9611,6 +9611,7 @@ name = "solana-unified-scheduler-logic"
 version = "4.0.0-alpha.0"
 dependencies = [
  "assert_matches",
+ "solana-clock",
  "solana-pubkey",
  "solana-runtime-transaction",
  "solana-transaction",

+ 1 - 0
programs/sbf/Cargo.lock

@@ -10136,6 +10136,7 @@ name = "solana-unified-scheduler-logic"
 version = "4.0.0-alpha.0"
 dependencies = [
  "assert_matches",
+ "solana-clock",
  "solana-pubkey",
  "solana-runtime-transaction",
  "solana-transaction",

+ 29 - 0
runtime/src/bank.rs

@@ -3065,6 +3065,35 @@ impl Bank {
         self.prepare_sanitized_batch(slice::from_ref(transaction))
     }
 
+    pub fn resanitize_transaction_minimally(
+        &self,
+        transaction: &impl TransactionWithMeta,
+        sanitized_epoch: Epoch,
+        alt_invalidation_slot: Slot,
+    ) -> Result<()> {
+        // If the transaction was sanitized before this bank's epoch,
+        // additional checks are necessary.
+        if self.epoch() != sanitized_epoch {
+            // Reserved key set may have changed, so we must verify that
+            // no writable keys are reserved.
+            self.check_reserved_keys(transaction)?;
+        }
+
+        if self.slot() > alt_invalidation_slot {
+            // The address table lookup **may** have expired, but the
+            // expiration is not guaranteed since there may have been
+            // skipped slot.
+            // If the addresses still resolve here, then the transaction is still
+            // valid, and we can continue with processing.
+            // If they do not, then the ATL has expired and the transaction
+            // can be dropped.
+            let (_addresses, _deactivation_slot) =
+                self.load_addresses_from_ref(transaction.message_address_table_lookups())?;
+        }
+
+        Ok(())
+    }
+
     /// Run transactions against a frozen bank without committing the results
     pub fn simulate_transaction(
         &self,

+ 1 - 0
unified-scheduler-logic/Cargo.toml

@@ -14,6 +14,7 @@ agave-unstable-api = []
 
 [dependencies]
 assert_matches = { workspace = true }
+solana-clock = { workspace = true }
 solana-pubkey = { workspace = true }
 solana-runtime-transaction = { workspace = true }
 solana-transaction = { workspace = true }

+ 23 - 0
unified-scheduler-logic/src/lib.rs

@@ -107,6 +107,7 @@
 use {
     crate::utils::{ShortCounter, Token, TokenCell},
     assert_matches::assert_matches,
+    solana_clock::{Epoch, Slot},
     solana_pubkey::Pubkey,
     solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
     solana_transaction::sanitized::SanitizedTransaction,
@@ -451,6 +452,8 @@ const_assert_eq!(mem::size_of::<Task>(), 8);
 
 pub type BlockSize = usize;
 pub const NO_CONSUMED_BLOCK_SIZE: BlockSize = 0;
+pub const MAX_SANITIZED_EPOCH: Epoch = Epoch::MAX;
+pub const MAX_ALT_INVALIDATION_SLOT: Slot = Slot::MAX;
 
 /// [`Token`] for [`UsageQueue`].
 type UsageQueueToken = Token<UsageQueueInner>;
@@ -476,6 +479,8 @@ pub struct TaskInner {
     /// before running.
     blocked_usage_count: TokenCell<ShortCounter>,
     consumed_block_size: BlockSize,
+    sanitized_epoch: Epoch,
+    alt_invalidation_slot: Slot,
 }
 
 impl TaskInner {
@@ -495,6 +500,14 @@ impl TaskInner {
         self.consumed_block_size
     }
 
+    pub fn sanitized_epoch(&self) -> Epoch {
+        self.sanitized_epoch
+    }
+
+    pub fn alt_invalidation_slot(&self) -> Slot {
+        self.alt_invalidation_slot
+    }
+
     pub fn transaction(&self) -> &RuntimeTransaction<SanitizedTransaction> {
         &self.transaction
     }
@@ -1254,6 +1267,8 @@ impl SchedulingStateMachine {
             transaction,
             task_id,
             NO_CONSUMED_BLOCK_SIZE,
+            MAX_SANITIZED_EPOCH,
+            MAX_ALT_INVALIDATION_SLOT,
             usage_queue_loader,
         )
     }
@@ -1262,12 +1277,16 @@ impl SchedulingStateMachine {
         transaction: RuntimeTransaction<SanitizedTransaction>,
         task_id: OrderedTaskId,
         consumed_block_size: BlockSize,
+        sanitized_epoch: Epoch,
+        alt_invalidation_slot: Slot,
         usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
     ) -> Task {
         Self::do_create_task(
             transaction,
             task_id,
             consumed_block_size,
+            sanitized_epoch,
+            alt_invalidation_slot,
             usage_queue_loader,
         )
     }
@@ -1276,6 +1295,8 @@ impl SchedulingStateMachine {
         transaction: RuntimeTransaction<SanitizedTransaction>,
         task_id: OrderedTaskId,
         consumed_block_size: BlockSize,
+        sanitized_epoch: Epoch,
+        alt_invalidation_slot: Slot,
         usage_queue_loader: &mut impl FnMut(Pubkey) -> UsageQueue,
     ) -> Task {
         // It's crucial for tasks to be validated with
@@ -1332,6 +1353,8 @@ impl SchedulingStateMachine {
             lock_contexts,
             blocked_usage_count: TokenCell::new(ShortCounter::zero()),
             consumed_block_size,
+            sanitized_epoch,
+            alt_invalidation_slot,
         })
     }
 

+ 41 - 4
unified-scheduler-pool/src/lib.rs

@@ -34,7 +34,7 @@ use {
     dyn_clone::{clone_trait_object, DynClone},
     log::*,
     scopeguard::defer,
-    solana_clock::Slot,
+    solana_clock::{Epoch, Slot},
     solana_cost_model::cost_model::CostModel,
     solana_ledger::blockstore_processor::{
         execute_batch, TransactionBatchWithIndexes, TransactionStatusSender,
@@ -387,11 +387,15 @@ impl BankingStageHelper {
         transaction: RuntimeTransaction<SanitizedTransaction>,
         task_id: OrderedTaskId,
         consumed_block_size: BlockSize,
+        sanitized_epoch: Epoch,
+        alt_invalidation_slot: Slot,
     ) -> Task {
         SchedulingStateMachine::create_block_production_task(
             transaction,
             task_id,
             consumed_block_size,
+            sanitized_epoch,
+            alt_invalidation_slot,
             &mut |pubkey| self.usage_queue_loader.load(pubkey),
         )
     }
@@ -399,8 +403,16 @@ impl BankingStageHelper {
     fn recreate_task(&self, executed_task: Box<ExecutedTask>) -> Task {
         let new_task_id = self.regenerated_task_id(executed_task.task.task_id());
         let consumed_block_size = executed_task.consumed_block_size();
+        let sanitized_epoch = executed_task.sanitized_epoch();
+        let alt_invalidation_slot = executed_task.alt_invalidation_slot();
         let transaction = executed_task.into_transaction();
-        self.create_new_task(transaction, new_task_id, consumed_block_size)
+        self.create_new_task(
+            transaction,
+            new_task_id,
+            consumed_block_size,
+            sanitized_epoch,
+            alt_invalidation_slot,
+        )
     }
 
     pub fn send_new_task(&self, task: Task) {
@@ -1045,6 +1057,15 @@ impl TaskHandler for DefaultTaskHandler {
                 bank.prepare_unlocked_batch_from_single_tx(transaction)
             }
             BlockProduction => {
+                if let Err(error) = bank.resanitize_transaction_minimally(
+                    transaction,
+                    task.sanitized_epoch(),
+                    task.alt_invalidation_slot(),
+                ) {
+                    *result = Err(error);
+                    return;
+                }
+
                 // Due to the probable presence of an independent banking thread (like the jito
                 // thread), we are forced to lock the addresses unlike block verification. The
                 // scheduling thread isn't appropriate for these kinds of work; so, instead do that
@@ -1189,6 +1210,14 @@ impl ExecutedTask {
         self.task.consumed_block_size()
     }
 
+    fn sanitized_epoch(&self) -> Epoch {
+        self.task.sanitized_epoch()
+    }
+
+    fn alt_invalidation_slot(&self) -> Slot {
+        self.task.alt_invalidation_slot()
+    }
+
     fn into_transaction(self) -> RuntimeTransaction<SanitizedTransaction> {
         self.task.into_transaction()
     }
@@ -2775,7 +2804,9 @@ mod tests {
         solana_system_transaction as system_transaction,
         solana_transaction::sanitized::SanitizedTransaction,
         solana_transaction_error::TransactionError,
-        solana_unified_scheduler_logic::NO_CONSUMED_BLOCK_SIZE,
+        solana_unified_scheduler_logic::{
+            MAX_ALT_INVALIDATION_SLOT, MAX_SANITIZED_EPOCH, NO_CONSUMED_BLOCK_SIZE,
+        },
         std::{
             num::Saturating,
             sync::{Arc, RwLock},
@@ -4654,7 +4685,13 @@ mod tests {
             transaction: RuntimeTransaction<SanitizedTransaction>,
             task_id: OrderedTaskId,
         ) -> Task {
-            self.create_new_task(transaction, task_id, NO_CONSUMED_BLOCK_SIZE)
+            self.create_new_task(
+                transaction,
+                task_id,
+                NO_CONSUMED_BLOCK_SIZE,
+                MAX_SANITIZED_EPOCH,
+                MAX_ALT_INVALIDATION_SLOT,
+            )
         }
     }