瀏覽代碼

feat(scheduler-bindings): ExternalWorker::execute_batch (#8742)

* ExternalWorker::execute_batch

* test_reason_from_packet_handling_error

* test_consume_response_iterator

* consistent use of ok_or

* clean remote free lists for worker
Andrew Fitzgerald 3 周之前
父節點
當前提交
cba137ed44

+ 1 - 0
core/Cargo.toml

@@ -192,6 +192,7 @@ sysctl = { workspace = true }
 [dev-dependencies]
 agave-logger = { workspace = true }
 agave-reserved-account-keys = { workspace = true }
+agave-scheduler-bindings = { workspace = true, features = ["dev-context-only-utils"] }
 bencher = { workspace = true }
 criterion = { workspace = true }
 fs_extra = { workspace = true }

+ 1 - 0
core/src/banking_stage.rs

@@ -726,6 +726,7 @@ mod external {
                     ),
                     worker_to_pack,
                     allocator,
+                    context.poh_recorder.read().unwrap().shared_leader_state(),
                 );
 
                 worker_metrics.push(consume_worker.metrics_handle());

+ 406 - 9
core/src/banking_stage/consume_worker.rs

@@ -176,12 +176,27 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
 pub(crate) mod external {
     use {
         super::*,
+        crate::banking_stage::{
+            committer::CommitTransactionDetails,
+            scheduler_messages::MaxAge,
+            transaction_scheduler::receive_and_buffer::{
+                translate_to_runtime_view, PacketHandlingError,
+            },
+        },
         agave_scheduler_bindings::{
+            pack_message_flags,
+            worker_message_types::{not_included_reasons, ExecutionResponse},
             PackToWorkerMessage, TransactionResponseRegion, WorkerToPackMessage,
             MAX_TRANSACTIONS_PER_MESSAGE,
         },
-        agave_scheduling_utils::transaction_ptr::TransactionPtr,
+        agave_scheduling_utils::{
+            error::transaction_error_to_not_included_reason,
+            responses_region::execution_responses_from_iter,
+            transaction_ptr::{TransactionPtr, TransactionPtrBatch},
+        },
         agave_transaction_view::resolved_transaction_view::ResolvedTransactionView,
+        solana_cost_model::cost_model::CostModel,
+        solana_runtime::bank::Bank,
         solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
     };
 
@@ -189,6 +204,8 @@ pub(crate) mod external {
     pub enum ExternalConsumeWorkerError {
         #[error("Sender disconnected")]
         SenderDisconnected,
+        #[error("Allocation failed")]
+        AllocationFailure,
     }
 
     pub(crate) struct ExternalWorker {
@@ -198,6 +215,7 @@ pub(crate) mod external {
         sender: shaq::Producer<WorkerToPackMessage>,
         allocator: rts_alloc::Allocator,
 
+        shared_leader_state: SharedLeaderState,
         metrics: Arc<ConsumeWorkerMetrics>,
     }
 
@@ -211,6 +229,7 @@ pub(crate) mod external {
             consumer: Consumer,
             sender: shaq::Producer<WorkerToPackMessage>,
             allocator: rts_alloc::Allocator,
+            shared_leader_state: SharedLeaderState,
         ) -> Self {
             Self {
                 exit,
@@ -218,6 +237,7 @@ pub(crate) mod external {
                 consumer,
                 sender,
                 allocator,
+                shared_leader_state,
                 metrics: Arc::new(ConsumeWorkerMetrics::new(id)),
             }
         }
@@ -232,6 +252,7 @@ pub(crate) mod external {
             let mut sleep_duration = STARTING_SLEEP_DURATION;
 
             while !self.exit.load(Ordering::Relaxed) {
+                self.allocator.clean_remote_free_lists();
                 if self.receiver.is_empty() {
                     self.receiver.sync();
                 }
@@ -274,7 +295,170 @@ pub(crate) mod external {
                 .count_metrics
                 .num_messages_processed
                 .fetch_add(1, Ordering::Relaxed);
-            unimplemented!("No flags are currently valid");
+
+            match message.flags {
+                pack_message_flags::NONE => self.execute_batch(message),
+                _ => unreachable!("flags verified earlier"),
+            }
+        }
+
+        fn execute_batch(
+            &mut self,
+            message: &PackToWorkerMessage,
+        ) -> Result<(), ExternalConsumeWorkerError> {
+            // Loop here to avoid exposing internal error to external scheduler.
+            // In the vast majority of cases, this will iterate a single time;
+            // If we began execution when a slot was still in process, and could
+            // not record at the end because the slot has ended, we will retry
+            // on the next slot.
+            for _ in 0..1 {
+                let Some(leader_state) =
+                    active_leader_state_with_timeout(&self.shared_leader_state)
+                else {
+                    return self.return_not_included_with_reason(
+                        message,
+                        not_included_reasons::BANK_NOT_AVAILABLE,
+                    );
+                };
+
+                let bank = leader_state
+                    .working_bank()
+                    .expect("active_leader_state_with_timeout should only return an active bank");
+                if bank.slot() > message.max_execution_slot {
+                    return self.return_not_included_with_reason(
+                        message,
+                        not_included_reasons::SLOT_MISMATCH,
+                    );
+                }
+
+                // SAFETY: Assumption that external scheduler does not pass messages with batch regions
+                //         not pointing to valid regions in the allocator.
+                let batch = unsafe {
+                    TransactionPtrBatch::from_sharable_transaction_batch_region(
+                        &message.batch,
+                        &self.allocator,
+                    )
+                };
+                let (translation_results, transactions, max_ages) =
+                    Self::translate_transaction_batch(&batch, bank);
+
+                let output = self.consumer.process_and_record_aged_transactions(
+                    bank,
+                    &transactions,
+                    &max_ages,
+                );
+
+                self.metrics.update_for_consume(&output);
+                self.metrics.has_data.store(true, Ordering::Relaxed);
+
+                let Ok(commit_results) = output
+                    .execute_and_commit_transactions_output
+                    .commit_transactions_result
+                else {
+                    // If already ON the last possible execution slot,
+                    // immediately give up instead of trying on next slot.
+                    if bank.slot() == message.max_execution_slot {
+                        break;
+                    }
+                    continue; // recording failed, try again on next slot if possible.
+                };
+
+                let responses = execution_responses_from_iter(
+                    &self.allocator,
+                    Self::consume_response_iterator(
+                        &translation_results,
+                        &transactions,
+                        &commit_results,
+                        bank,
+                    ),
+                )
+                .ok_or(ExternalConsumeWorkerError::AllocationFailure)?;
+                let response = WorkerToPackMessage {
+                    batch: message.batch,
+                    processed: 1,
+                    responses,
+                };
+
+                let send_ptr = self
+                    .sender
+                    .reserve()
+                    .ok_or(ExternalConsumeWorkerError::SenderDisconnected)?;
+
+                // `reserve` returns valid aligned pointer
+                unsafe { send_ptr.write(response) };
+                return Ok(());
+            }
+
+            // If not successfully recorded even after second attempt, then we
+            // just return immediately as if a bank is not available.
+            self.return_not_included_with_reason(message, not_included_reasons::BANK_NOT_AVAILABLE)
+        }
+
+        fn consume_response_iterator<'a>(
+            translation_results: &'a [Result<(), PacketHandlingError>],
+            transactions: &'a [impl TransactionWithMeta],
+            commit_results: &'a [CommitTransactionDetails],
+            bank: &'a Bank,
+        ) -> impl ExactSizeIterator<Item = ExecutionResponse> + 'a {
+            assert_eq!(transactions.len(), commit_results.len());
+            let mut transactions_iterator = transactions.iter();
+            let mut commit_result_iterator = commit_results.iter();
+
+            translation_results
+                .iter()
+                .map(move |translation_result| match translation_result {
+                    Ok(()) => {
+                        let tx = transactions_iterator.next().expect(
+                            "transactions must contain element for each successfully translated \
+                             result",
+                        );
+                        let commit_details = commit_result_iterator.next().expect(
+                            "commit result iterator must contain element for each sent transaction",
+                        );
+                        Self::response_from_commit_details(tx, commit_details, bank)
+                    }
+                    Err(err) => ExecutionResponse {
+                        not_included_reason: Self::reason_from_packet_handling_error(err),
+                        cost_units: 0,
+                        fee_payer_balance: 0,
+                    },
+                })
+        }
+
+        /// Return all transactions in the batch as not included with the provided
+        /// reason.
+        fn return_not_included_with_reason(
+            &mut self,
+            message: &PackToWorkerMessage,
+            reason: u8,
+        ) -> Result<(), ExternalConsumeWorkerError> {
+            let response_region = execution_responses_from_iter(
+                &self.allocator,
+                (0..message.batch.num_transactions).map(|_| ExecutionResponse {
+                    not_included_reason: reason,
+                    cost_units: 0,
+                    fee_payer_balance: 0,
+                }),
+            )
+            .ok_or(ExternalConsumeWorkerError::AllocationFailure)?;
+
+            let response_message = WorkerToPackMessage {
+                batch: message.batch,
+                processed: 1,
+                responses: response_region,
+            };
+
+            // Should de-allocate the memory, but this is a non-recoverable
+            // error and so it's not needed.
+            let send_message = self
+                .sender
+                .reserve()
+                .ok_or(ExternalConsumeWorkerError::SenderDisconnected)?;
+
+            unsafe {
+                send_message.write(response_message);
+            }
+            Ok(())
         }
 
         fn return_invalid_message(
@@ -291,9 +475,10 @@ pub(crate) mod external {
                 },
             };
 
-            let Some(send_ptr) = self.sender.reserve() else {
-                return Err(ExternalConsumeWorkerError::SenderDisconnected);
-            };
+            let send_ptr = self
+                .sender
+                .reserve()
+                .ok_or(ExternalConsumeWorkerError::SenderDisconnected)?;
 
             // SAFETY: `reserve` guarantees a properly aligned space
             //         for a `WorkerToPackMessage`
@@ -302,6 +487,61 @@ pub(crate) mod external {
             Ok(())
         }
 
+        /// Translate batch of transactions into usable
+        fn translate_transaction_batch(
+            batch: &TransactionPtrBatch,
+            bank: &Bank,
+        ) -> (Vec<Result<(), PacketHandlingError>>, Vec<Tx>, Vec<MaxAge>) {
+            let enable_static_instruction_limit = bank
+                .feature_set
+                .is_active(&agave_feature_set::static_instruction_limit::ID);
+            let transaction_account_lock_limit = bank.get_transaction_account_lock_limit();
+
+            let mut translation_results = Vec::with_capacity(MAX_TRANSACTIONS_PER_MESSAGE);
+            let mut transactions = Vec::with_capacity(MAX_TRANSACTIONS_PER_MESSAGE);
+            let mut max_ages = Vec::with_capacity(MAX_TRANSACTIONS_PER_MESSAGE);
+            for transaction_ptr in batch.iter() {
+                match Self::translate_transaction(
+                    transaction_ptr,
+                    bank,
+                    enable_static_instruction_limit,
+                    transaction_account_lock_limit,
+                ) {
+                    Ok((tx, max_age)) => {
+                        transactions.push(tx);
+                        max_ages.push(max_age);
+                        translation_results.push(Ok(()));
+                    }
+                    Err(err) => translation_results.push(Err(err)),
+                }
+            }
+
+            (translation_results, transactions, max_ages)
+        }
+
+        fn translate_transaction(
+            transaction_ptr: TransactionPtr,
+            bank: &Bank,
+            enable_static_instruction_limit: bool,
+            transaction_account_lock_limit: usize,
+        ) -> Result<(Tx, MaxAge), PacketHandlingError> {
+            translate_to_runtime_view(
+                transaction_ptr,
+                bank,
+                enable_static_instruction_limit,
+                transaction_account_lock_limit,
+            )
+            .map(|(view, deactivation_slot)| {
+                (
+                    view,
+                    MaxAge {
+                        sanitized_epoch: bank.epoch(),
+                        alt_invalidation_slot: deactivation_slot,
+                    },
+                )
+            })
+        }
+
         /// Returns `true` if a message is valid and can be processed.
         fn validate_message(message: &PackToWorkerMessage) -> bool {
             message.batch.num_transactions > 0
@@ -309,14 +549,55 @@ pub(crate) mod external {
                 && Self::validate_message_flags(message.flags)
         }
 
-        fn validate_message_flags(_flags: u16) -> bool {
-            false // no flags are valid currently
+        fn validate_message_flags(flags: u16) -> bool {
+            flags == pack_message_flags::NONE
+        }
+
+        fn response_from_commit_details(
+            tx: &impl TransactionWithMeta,
+            commit_details: &CommitTransactionDetails,
+            bank: &Bank,
+        ) -> ExecutionResponse {
+            match commit_details {
+                CommitTransactionDetails::Committed {
+                    compute_units,
+                    loaded_accounts_data_size,
+                    fee_payer_post_balance,
+                    ..
+                } => ExecutionResponse {
+                    not_included_reason: not_included_reasons::NONE,
+                    cost_units: CostModel::calculate_cost_for_executed_transaction(
+                        tx,
+                        *compute_units,
+                        *loaded_accounts_data_size,
+                        &bank.feature_set,
+                    )
+                    .sum(),
+                    fee_payer_balance: *fee_payer_post_balance,
+                },
+                CommitTransactionDetails::NotCommitted(transaction_error) => ExecutionResponse {
+                    not_included_reason: transaction_error_to_not_included_reason(
+                        transaction_error,
+                    ),
+                    cost_units: 0,
+                    fee_payer_balance: 0,
+                },
+            }
+        }
+
+        fn reason_from_packet_handling_error(err: &PacketHandlingError) -> u8 {
+            match err {
+                PacketHandlingError::ALTResolution => {
+                    not_included_reasons::ADDRESS_LOOKUP_TABLE_NOT_FOUND
+                }
+                _ => not_included_reasons::SANITIZE_FAILURE,
+            }
         }
     }
 
     #[cfg(test)]
     mod tests {
-        use super::*;
+        use {super::*, solana_system_transaction::transfer, solana_transaction::TransactionError};
 
         #[test]
         fn test_validate_message() {
@@ -338,18 +619,134 @@ pub(crate) mod external {
 
             // Bad flags = invalid
             message.batch.num_transactions = 1;
+            message.flags = u16::MAX;
             assert!(!ExternalWorker::validate_message(&message));
+
+            message.flags = pack_message_flags::NONE;
+            assert!(ExternalWorker::validate_message(&message));
         }
 
         #[test]
         fn test_validate_message_flags() {
-            assert!(!ExternalWorker::validate_message_flags(
+            assert!(ExternalWorker::validate_message_flags(
                 agave_scheduler_bindings::pack_message_flags::NONE
             ));
             assert!(!ExternalWorker::validate_message_flags(
                 agave_scheduler_bindings::pack_message_flags::RESOLVE
             ));
         }
+
+        #[test]
+        fn test_consume_response_iterator() {
+            let simple_tx = bincode::serialize(&transfer(
+                &solana_keypair::Keypair::new(),
+                &solana_pubkey::Pubkey::new_unique(),
+                1,
+                solana_hash::Hash::default(),
+            ))
+            .unwrap();
+            let bank = Bank::default_for_tests();
+            let txs = (0..3)
+                .map(|_| {
+                    translate_to_runtime_view(
+                        &simple_tx[..],
+                        &bank,
+                        true,
+                        bank.get_transaction_account_lock_limit(),
+                    )
+                    .ok()
+                    .unwrap()
+                    .0
+                })
+                .collect::<Vec<_>>();
+
+            let responses = ExternalWorker::consume_response_iterator(
+                &[
+                    Err(PacketHandlingError::Sanitization),
+                    Ok(()),
+                    Ok(()),
+                    Ok(()),
+                ],
+                &txs,
+                &[
+                    CommitTransactionDetails::Committed {
+                        compute_units: 6,
+                        loaded_accounts_data_size: 1024,
+                        fee_payer_post_balance: 1_000_000,
+                        result: Err(TransactionError::InstructionError(
+                            0,
+                            solana_transaction::InstructionError::Custom(0),
+                        )),
+                    },
+                    CommitTransactionDetails::Committed {
+                        compute_units: 10,
+                        loaded_accounts_data_size: 2048,
+                        fee_payer_post_balance: 2_000_000,
+                        result: Ok(()),
+                    },
+                    CommitTransactionDetails::NotCommitted(
+                        TransactionError::InsufficientFundsForFee,
+                    ),
+                ],
+                &bank,
+            )
+            .collect::<Vec<_>>();
+
+            assert_eq!(
+                responses,
+                &[
+                    ExecutionResponse {
+                        not_included_reason: not_included_reasons::SANITIZE_FAILURE,
+                        cost_units: 0,
+                        fee_payer_balance: 0
+                    },
+                    ExecutionResponse {
+                        not_included_reason: not_included_reasons::NONE,
+                        cost_units: 1337,
+                        fee_payer_balance: 1_000_000,
+                    },
+                    ExecutionResponse {
+                        not_included_reason: not_included_reasons::NONE,
+                        cost_units: 1341,
+                        fee_payer_balance: 2_000_000,
+                    },
+                    ExecutionResponse {
+                        not_included_reason: not_included_reasons::INSUFFICIENT_FUNDS_FOR_FEE,
+                        cost_units: 0,
+                        fee_payer_balance: 0,
+                    }
+                ]
+            )
+        }
+
+        #[test]
+        fn test_reason_from_packet_handling_error() {
+            assert_eq!(
+                ExternalWorker::reason_from_packet_handling_error(
+                    &PacketHandlingError::Sanitization
+                ),
+                not_included_reasons::SANITIZE_FAILURE
+            );
+            assert_eq!(
+                ExternalWorker::reason_from_packet_handling_error(
+                    &PacketHandlingError::LockValidation
+                ),
+                not_included_reasons::SANITIZE_FAILURE
+            );
+            assert_eq!(
+                ExternalWorker::reason_from_packet_handling_error(
+                    &PacketHandlingError::ComputeBudget
+                ),
+                not_included_reasons::SANITIZE_FAILURE
+            );
+
+            assert_eq!(
+                ExternalWorker::reason_from_packet_handling_error(
+                    &PacketHandlingError::ALTResolution
+                ),
+                not_included_reasons::ADDRESS_LOOKUP_TABLE_NOT_FOUND
+            );
+        }
     }
 }
 /// Helper function to create an non-blocking iterator over work in the receiver,

+ 6 - 2
core/src/banking_stage/transaction_scheduler/receive_and_buffer.rs

@@ -226,6 +226,7 @@ pub(crate) enum PacketHandlingError {
     Sanitization,
     LockValidation,
     ComputeBudget,
+    ALTResolution,
 }
 
 impl TransactionViewReceiveAndBuffer {
@@ -350,7 +351,10 @@ impl TransactionViewReceiveAndBuffer {
                             transaction_account_lock_limit,
                         ) {
                             Ok(state) => Ok(state),
-                            Err(PacketHandlingError::Sanitization) => {
+                            Err(
+                                PacketHandlingError::Sanitization
+                                | PacketHandlingError::ALTResolution,
+                            ) => {
                                 num_dropped_on_parsing_and_sanitization += 1;
                                 Err(())
                             }
@@ -495,7 +499,7 @@ pub(crate) fn load_addresses_for_view<D: TransactionData>(
             .map(|(loaded_addresses, deactivation_slot)| {
                 (Some(loaded_addresses), deactivation_slot)
             })
-            .map_err(|_| PacketHandlingError::Sanitization),
+            .map_err(|_| PacketHandlingError::ALTResolution),
     }
 }
 

+ 2 - 2
scheduling-utils/src/error.rs

@@ -4,14 +4,14 @@ use {
 };
 
 /// Translate
-pub fn transaction_result_to_not_included_reason(result: Result<(), TransactionError>) -> u8 {
+pub fn transaction_result_to_not_included_reason(result: &Result<(), TransactionError>) -> u8 {
     match result {
         Ok(()) => not_included_reasons::NONE,
         Err(err) => transaction_error_to_not_included_reason(err),
     }
 }
 
-pub fn transaction_error_to_not_included_reason(error: TransactionError) -> u8 {
+pub fn transaction_error_to_not_included_reason(error: &TransactionError) -> u8 {
     match error {
         TransactionError::AccountInUse => not_included_reasons::ACCOUNT_IN_USE,
         TransactionError::AccountLoadedTwice => not_included_reasons::ACCOUNT_LOADED_TWICE,

+ 2 - 0
scheduling-utils/src/lib.rs

@@ -13,4 +13,6 @@ pub mod error;
 #[cfg(unix)]
 pub mod handshake;
 #[cfg(unix)]
+pub mod responses_region;
+#[cfg(unix)]
 pub mod transaction_ptr;

+ 50 - 0
scheduling-utils/src/responses_region.rs

@@ -0,0 +1,50 @@
+use {
+    agave_scheduler_bindings::{
+        worker_message_types::{ExecutionResponse, EXECUTION_RESPONSE},
+        TransactionResponseRegion,
+    },
+    rts_alloc::Allocator,
+};
+
+/// Prepare a [`TransactionResponseRegion`] with [`ExecutionResponse`].
+pub fn execution_responses_from_iter(
+    allocator: &Allocator,
+    iter: impl ExactSizeIterator<Item = ExecutionResponse>,
+) -> Option<TransactionResponseRegion> {
+    // SAFETY: EXECUTION_RESPONSE -> ExecutionResponse
+    unsafe { from_iterator(allocator, EXECUTION_RESPONSE, iter) }
+}
+
+/// Prepare a [`TransactionResponseRegion`] from an iterator.
+///
+/// # Safety
+/// - T must be a valid response type
+/// - `tag` must match the `T`
+unsafe fn from_iterator<T: Sized>(
+    allocator: &Allocator,
+    tag: u8,
+    iter: impl ExactSizeIterator<Item = T>,
+) -> Option<TransactionResponseRegion> {
+    let num_transaction_responses = iter.len();
+    let size = num_transaction_responses.wrapping_mul(core::mem::size_of::<T>());
+    let response_ptr = allocator.allocate(size as u32)?.cast::<T>();
+
+    for (index, response) in iter.enumerate() {
+        debug_assert!(
+            response_ptr.is_aligned(),
+            "allocator should guarantee alignment for the response types of interest"
+        );
+
+        // SAFETY: `response_ptr` is sufficiently sized to fit the response vector.
+        unsafe { response_ptr.add(index).write(response) };
+    }
+
+    // SAFETY: `response_ptr` was allocated from the allocator.
+    let transaction_responses_offset = unsafe { allocator.offset(response_ptr.cast()) };
+
+    Some(TransactionResponseRegion {
+        tag,
+        num_transaction_responses: num_transaction_responses as u8,
+        transaction_responses_offset,
+    })
+}