Quellcode durchsuchen

feat(scheduler-bindings): ExternalWorker skeleton loop (#8703)

* SharableTransactionBatchRegion unconditionally Clone/Copy

* ExternalWorker - always responds with invalid_message

* test_backoff

* test_validate_message

* deal with windows

* separate module for external worker
Andrew Fitzgerald vor 3 Wochen
Ursprung
Commit
acced30f4f
3 geänderte Dateien mit 209 neuen und 11 gelöschten Zeilen
  1. 1 0
      core/src/banking_stage.rs
  2. 206 7
      core/src/banking_stage/consume_worker.rs
  3. 2 4
      scheduler-bindings/src/lib.rs

+ 1 - 0
core/src/banking_stage.rs

@@ -64,6 +64,7 @@ pub mod leader_slot_metrics;
 pub mod qos_service;
 pub mod vote_storage;
 
+#[allow(dead_code)]
 mod consume_worker;
 mod vote_worker;
 

+ 206 - 7
core/src/banking_stage/consume_worker.rs

@@ -28,6 +28,12 @@ pub enum ConsumeWorkerError<Tx> {
     Send(#[from] SendError<FinishedConsumeWork<Tx>>),
 }
 
+#[derive(Debug, Error)]
+pub enum ExternalConsumeWorkerError {
+    #[error("Sender disconnected")]
+    SenderDisconnected,
+}
+
 enum ProcessingStatus<Tx> {
     Processed,
     /// Work could not be processed due to lack of bank.
@@ -68,8 +74,6 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
     }
 
     pub fn run(self) -> Result<(), ConsumeWorkerError<Tx>> {
-        const STARTING_SLEEP_DURATION: Duration = Duration::from_micros(250);
-
         let mut did_work = false;
         let mut last_empty_time = Instant::now();
         let mut sleep_duration = STARTING_SLEEP_DURATION;
@@ -93,7 +97,7 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
                     }
                     did_work = false;
                     let idle_duration = now.duration_since(last_empty_time);
-                    backoff(idle_duration, &mut sleep_duration);
+                    sleep_duration = backoff(idle_duration, &sleep_duration);
                 }
                 Err(TryRecvError::Disconnected) => {
                     return Err(ConsumeWorkerError::Recv(TryRecvError::Disconnected))
@@ -173,6 +177,180 @@ impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
     }
 }
 
+#[cfg(unix)]
+mod external {
+    use {
+        super::*,
+        agave_scheduler_bindings::{
+            PackToWorkerMessage, TransactionResponseRegion, WorkerToPackMessage,
+            MAX_TRANSACTIONS_PER_MESSAGE,
+        },
+        agave_scheduling_utils::transaction_ptr::TransactionPtr,
+        agave_transaction_view::resolved_transaction_view::ResolvedTransactionView,
+        solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
+    };
+
+    pub(crate) struct ExternalWorker {
+        exit: Arc<AtomicBool>,
+        receiver: shaq::Consumer<PackToWorkerMessage>,
+        consumer: Consumer,
+        sender: shaq::Producer<WorkerToPackMessage>,
+        allocator: rts_alloc::Allocator,
+
+        metrics: Arc<ConsumeWorkerMetrics>,
+    }
+
+    type Tx = RuntimeTransaction<ResolvedTransactionView<TransactionPtr>>;
+
+    impl ExternalWorker {
+        pub fn new(
+            id: u32,
+            exit: Arc<AtomicBool>,
+            receiver: shaq::Consumer<PackToWorkerMessage>,
+            consumer: Consumer,
+            sender: shaq::Producer<WorkerToPackMessage>,
+            allocator: rts_alloc::Allocator,
+        ) -> Self {
+            Self {
+                exit,
+                receiver,
+                consumer,
+                sender,
+                allocator,
+                metrics: Arc::new(ConsumeWorkerMetrics::new(id)),
+            }
+        }
+
+        pub fn metrics_handle(&self) -> Arc<ConsumeWorkerMetrics> {
+            self.metrics.clone()
+        }
+
+        pub fn run(mut self) -> Result<(), ExternalConsumeWorkerError> {
+            let mut did_work = false;
+            let mut last_empty_time = Instant::now();
+            let mut sleep_duration = STARTING_SLEEP_DURATION;
+
+            while !self.exit.load(Ordering::Relaxed) {
+                if self.receiver.is_empty() {
+                    self.receiver.sync();
+                }
+
+                match self.receiver.try_read() {
+                    Some(message) => {
+                        did_work = true;
+                        self.sender.sync();
+                        // SAFETY: `try_read` gives a ptr to a properly aligned
+                        //         region for a `PackToWorkerMessage`
+                        self.process_message(unsafe { message.as_ref() })?;
+                        self.sender.commit();
+                        self.receiver.finalize();
+                    }
+                    None => {
+                        let now = Instant::now();
+
+                        if did_work {
+                            last_empty_time = now;
+                        }
+                        did_work = false;
+                        let idle_duration = now.duration_since(last_empty_time);
+                        sleep_duration = backoff(idle_duration, &sleep_duration);
+                    }
+                }
+            }
+
+            Ok(())
+        }
+
+        fn process_message(
+            &mut self,
+            message: &PackToWorkerMessage,
+        ) -> Result<(), ExternalConsumeWorkerError> {
+            if !Self::validate_message(message) {
+                return self.return_invalid_message(message);
+            }
+
+            self.metrics
+                .count_metrics
+                .num_messages_processed
+                .fetch_add(1, Ordering::Relaxed);
+            unimplemented!("No flags are currently valid");
+        }
+
+        fn return_invalid_message(
+            &mut self,
+            message: &PackToWorkerMessage,
+        ) -> Result<(), ExternalConsumeWorkerError> {
+            let invalid_message = WorkerToPackMessage {
+                batch: message.batch,
+                processed: 0,
+                responses: TransactionResponseRegion {
+                    tag: 0,
+                    num_transaction_responses: 0,
+                    transaction_responses_offset: 0,
+                },
+            };
+
+            let Some(send_ptr) = self.sender.reserve() else {
+                return Err(ExternalConsumeWorkerError::SenderDisconnected);
+            };
+
+            // SAFETY: `reserve` guarantees a properly aligned space
+            //         for a `WorkerToPackMessage`
+            unsafe { send_ptr.write(invalid_message) };
+
+            Ok(())
+        }
+
+        /// Returns `true` if a message is valid and can be processed.
+        fn validate_message(message: &PackToWorkerMessage) -> bool {
+            message.batch.num_transactions > 0
+                && usize::from(message.batch.num_transactions) <= MAX_TRANSACTIONS_PER_MESSAGE
+                && Self::validate_message_flags(message.flags)
+        }
+
+        fn validate_message_flags(_flags: u16) -> bool {
+            false // no flags are valid currently
+        }
+    }
+
+    #[cfg(test)]
+    mod tests {
+        use super::*;
+
+        #[test]
+        fn test_validate_message() {
+            let mut message = PackToWorkerMessage {
+                flags: agave_scheduler_bindings::pack_message_flags::NONE,
+                max_execution_slot: u64::MAX,
+                batch: agave_scheduler_bindings::SharableTransactionBatchRegion {
+                    num_transactions: 0,
+                    transactions_offset: 0,
+                },
+            };
+
+            // No transactions = invalid
+            assert!(!ExternalWorker::validate_message(&message));
+
+            // Too many transactions = invalid.
+            message.batch.num_transactions = MAX_TRANSACTIONS_PER_MESSAGE as u8 + 1;
+            assert!(!ExternalWorker::validate_message(&message));
+
+            // Bad flags = invalid
+            message.batch.num_transactions = 1;
+            assert!(!ExternalWorker::validate_message(&message));
+        }
+
+        #[test]
+        fn test_validate_message_flags() {
+            assert!(!ExternalWorker::validate_message_flags(
+                agave_scheduler_bindings::pack_message_flags::NONE
+            ));
+            assert!(!ExternalWorker::validate_message_flags(
+                agave_scheduler_bindings::pack_message_flags::RESOLVE
+            ));
+        }
+    }
+}
 /// Helper function to create an non-blocking iterator over work in the receiver,
 /// starting with the given work item.
 fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T> + '_ {
@@ -223,15 +401,18 @@ fn active_leader_state(
     }
 }
 
-fn backoff(idle_duration: Duration, sleep_duration: &mut Duration) {
-    const MAX_SLEEP_DURATION: Duration = Duration::from_millis(1);
-    const IDLE_SLEEP_THRESHOLD: Duration = Duration::from_millis(1);
+const STARTING_SLEEP_DURATION: Duration = Duration::from_micros(250);
+const MAX_SLEEP_DURATION: Duration = Duration::from_millis(1);
+const IDLE_SLEEP_THRESHOLD: Duration = Duration::from_millis(1);
 
+/// Sleeps for the specified time. Returns the next sleep duration to use.
+fn backoff(idle_duration: Duration, sleep_duration: &Duration) -> Duration {
     if idle_duration < IDLE_SLEEP_THRESHOLD {
         core::hint::spin_loop();
+        *sleep_duration
     } else {
         std::thread::sleep(*sleep_duration);
-        *sleep_duration = sleep_duration.saturating_mul(2).min(MAX_SLEEP_DURATION);
+        sleep_duration.saturating_mul(2).min(MAX_SLEEP_DURATION)
     }
 }
 
@@ -1301,4 +1482,22 @@ mod tests {
         drop(test_frame);
         let _ = worker_thread.join().unwrap();
     }
+
+    #[test]
+    fn test_backoff() {
+        let sleep_duration = STARTING_SLEEP_DURATION;
+
+        // No idle time - does not increase duration for next sleep.
+        let sleep_duration = backoff(Duration::ZERO, &sleep_duration);
+        assert_eq!(sleep_duration, STARTING_SLEEP_DURATION);
+
+        // Longer time idling we sleep and double the next time.
+        let sleep_duration = backoff(IDLE_SLEEP_THRESHOLD, &sleep_duration);
+        assert_eq!(sleep_duration, STARTING_SLEEP_DURATION.saturating_mul(2));
+
+        // Maximum sleep time
+        let sleep_duration = Duration::from_micros(900);
+        let sleep_duration = backoff(IDLE_SLEEP_THRESHOLD, &sleep_duration);
+        assert_eq!(sleep_duration, MAX_SLEEP_DURATION);
+    }
 }

+ 2 - 4
scheduler-bindings/src/lib.rs

@@ -94,10 +94,8 @@ pub struct SharablePubkeys {
 /// 4. External pack process frees all transaction memory pointed to by the
 ///    [`SharableTransactionRegion`] in the batch, then frees the memory for
 ///    the array of [`SharableTransactionRegion`].
-#[cfg_attr(
-    feature = "dev-context-only-utils",
-    derive(Debug, Clone, Copy, PartialEq, Eq)
-)]
+#[cfg_attr(feature = "dev-context-only-utils", derive(Debug, PartialEq, Eq))]
+#[derive(Clone, Copy)]
 #[repr(C)]
 pub struct SharableTransactionBatchRegion {
     /// Number of transactions in the batch.