@@ -49,7 +49,7 @@ use {
     solana_transaction::sanitized::SanitizedTransaction,
     solana_transaction_error::{TransactionError, TransactionResult as Result},
     solana_unified_scheduler_logic::{
-        BlockSize,
+        BlockSize, Capability, OrderedTaskId,
         SchedulingMode::{self, BlockProduction, BlockVerification},
         SchedulingStateMachine, Task, UsageQueue,
     },
@@ -82,11 +82,11 @@ use crate::sleepless_testing::BuilderTracked;
 #[allow(dead_code)]
 #[derive(Debug)]
 enum CheckPoint<'a> {
-    NewTask(usize),
-    NewBufferedTask(usize),
-    BufferedTask(usize),
-    TaskHandled(usize),
-    TaskAccumulated(usize, &'a Result<()>),
+    NewTask(OrderedTaskId),
+    NewBufferedTask(OrderedTaskId),
+    BufferedTask(OrderedTaskId),
+    TaskHandled(OrderedTaskId),
+    TaskAccumulated(OrderedTaskId, &'a Result<()>),
     SessionEnding,
     SessionFinished(Option<Slot>),
     SchedulerThreadAborted,
@@ -230,7 +230,7 @@ impl HandlerContext {
     fn usage_queue_loader_for_newly_spawned(&self) -> UsageQueueLoader {
         match self.banking_stage_helper.clone() {
             None => UsageQueueLoader::OwnedBySelf {
-                usage_queue_loader_inner: UsageQueueLoaderInner::default(),
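+                // Without a banking stage helper, this loader serves block
+                // verification, where transactions are replayed in recorded
+                // order; plain FIFO queueing suffices.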
+                usage_queue_loader_inner: UsageQueueLoaderInner::new(Capability::FifoQueueing),
             },
             Some(helper) => UsageQueueLoader::SharedWithBankingStage {
                 banking_stage_helper: helper,
@@ -347,7 +347,7 @@ const BANKING_STAGE_MAX_TASK_ID: usize = usize::MAX / 2;
 impl BankingStageHelper {
     fn new(new_task_sender: Sender<NewTaskPayload>) -> Self {
         Self {
-            usage_queue_loader: UsageQueueLoaderInner::default(),
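+            // The banking stage schedules by transaction priority, so its
+            // shared loader needs priority-aware queueing.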
+            usage_queue_loader: UsageQueueLoaderInner::new(Capability::PriorityQueueing),
             next_task_id: AtomicUsize::default(),
             new_task_sender,
         }
@@ -376,22 +376,22 @@ impl BankingStageHelper {
     pub fn create_new_task(
         &self,
         transaction: RuntimeTransaction<SanitizedTransaction>,
-        index: usize,
+        task_id: OrderedTaskId,
         consumed_block_size: BlockSize,
     ) -> Task {
         SchedulingStateMachine::create_block_production_task(
             transaction,
-            index,
+            task_id,
             consumed_block_size,
             &mut |pubkey| self.usage_queue_loader.load(pubkey),
         )
     }

     fn recreate_task(&self, executed_task: Box<ExecutedTask>) -> Task {
-        let new_index = self.generate_task_ids(1);
+        let new_task_id = self.regenerated_task_id(executed_task.task.task_id());
         let consumed_block_size = executed_task.consumed_block_size();
         let transaction = executed_task.into_transaction();
-        self.create_new_task(transaction, new_index, consumed_block_size)
+        self.create_new_task(transaction, new_task_id, consumed_block_size)
     }

     pub fn send_new_task(&self, task: Task) {
@@ -399,6 +399,18 @@ impl BankingStageHelper {
             .send(NewTaskPayload::Payload(task))
             .unwrap();
     }
+
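+    /// Packs a priority and a sequence number into a single `OrderedTaskId`:
+    /// the upper half of the bits holds the inverted priority
+    /// (`u64::MAX - priority`), so higher-priority tasks get numerically
+    /// smaller ids and thus sort first; the lower half holds `task_id` as a
+    /// FIFO tie-breaker among tasks of equal priority.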
+    pub fn new_task_id(task_id: usize, priority: u64) -> OrderedTaskId {
+        // Use wrapping_sub to avoid a clippy::arithmetic_side_effects false positive;
+        // this can never actually wrap, because priority is at most u64::MAX.
+        let reversed_priority = u64::MAX.wrapping_sub(priority) as OrderedTaskId;
+        (reversed_priority << const { OrderedTaskId::BITS / 2 }) | (task_id as OrderedTaskId)
+    }
+
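+    // Retain the executed task's inverted-priority bits while assigning a
+    // fresh sequence number, so a retried transaction keeps its priority but
+    // orders after earlier-issued ids of the same priority.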
+    fn regenerated_task_id(&self, executed_task_id: OrderedTaskId) -> OrderedTaskId {
+        const REVERSED_PRIORITY_MASK: OrderedTaskId = 0xffff_ffff_ffff_ffff_0000_0000_0000_0000;
+        (executed_task_id & REVERSED_PRIORITY_MASK) | (self.generate_task_ids(1) as OrderedTaskId)
+    }
 }

 pub type DefaultSchedulerPool =
@@ -1015,7 +1027,7 @@ impl TaskHandler for DefaultTaskHandler {
     ) {
         let bank = scheduling_context.bank().unwrap();
         let transaction = task.transaction();
-        let index = task.task_index();
+        let task_id = task.task_id();

         let batch = match scheduling_context.mode() {
             BlockVerification => {
@@ -1061,7 +1073,10 @@ impl TaskHandler for DefaultTaskHandler {
             }
         };
         let transaction_indexes = match scheduling_context.mode() {
-            BlockVerification => vec![index],
+            BlockVerification => {
+                // Block verification's task_id always fits within usize.
+                vec![task_id.try_into().unwrap()]
+            }
             BlockProduction => {
                 // Create a placeholder vec, which will be populated later if
                 // transaction_status_sender is Some(_).
@@ -1141,7 +1156,7 @@ impl TaskHandler for DefaultTaskHandler {
             &handler_context.prioritization_fee_cache,
             pre_commit_callback,
         );
-        sleepless_testing::at(CheckPoint::TaskHandled(index));
+        sleepless_testing::at(CheckPoint::TaskHandled(task_id));
     }
 }

@@ -1346,14 +1361,25 @@ mod chained_channel {
 /// instance destruction is managed via `solScCleaner`. This struct is here to be put outside
 /// `solana-unified-scheduler-logic` for the crate's original intent (separation of concerns from
 /// the pure-logic-only crate). Some practical and mundane pruning will be implemented in this type.
-#[derive(Default, Debug)]
+#[derive(Debug)]
 struct UsageQueueLoaderInner {
+    capability: Capability,
     usage_queues: DashMap<Pubkey, UsageQueue>,
 }

 impl UsageQueueLoaderInner {
+    fn new(capability: Capability) -> Self {
+        Self {
+            capability,
+            usage_queues: DashMap::default(),
+        }
+    }
+
     fn load(&self, address: Pubkey) -> UsageQueue {
-        self.usage_queues.entry(address).or_default().clone()
+        self.usage_queues
+            .entry(address)
+            .or_insert_with(|| UsageQueue::new(&self.capability))
+            .clone()
     }

     fn count(&self) -> usize {
@@ -1728,7 +1754,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         handler_context: &HandlerContext,
     ) -> bool {
         sleepless_testing::at(CheckPoint::TaskAccumulated(
-            executed_task.task.task_index(),
+            executed_task.task.task_id(),
             &executed_task.result_with_timings.0,
         ));
         timings.accumulate(&executed_task.result_with_timings.1);
@@ -2083,13 +2109,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {

         match message {
             Ok(NewTaskPayload::Payload(task)) => {
-                let task_index = task.task_index();
-                sleepless_testing::at(CheckPoint::NewTask(task_index));
+                let task_id = task.task_id();
+                sleepless_testing::at(CheckPoint::NewTask(task_id));

                 if let Some(task) = state_machine.schedule_or_buffer_task(task, session_ending) {
                     runnable_task_sender.send_aux_payload(task).unwrap();
                 } else {
-                    sleepless_testing::at(CheckPoint::BufferedTask(task_index));
+                    sleepless_testing::at(CheckPoint::BufferedTask(task_id));
                 }
             }
             Ok(NewTaskPayload::CloseSubchannel) => {
@@ -2182,9 +2208,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                 // Prepare for the new session.
                 match new_task_receiver.recv() {
                     Ok(NewTaskPayload::Payload(task)) => {
-                        sleepless_testing::at(CheckPoint::NewBufferedTask(
-                            task.task_index(),
-                        ));
+                        sleepless_testing::at(CheckPoint::NewBufferedTask(task.task_id()));
                         assert_matches!(scheduling_mode, BlockProduction);
                         state_machine.buffer_task(task);
                     }
@@ -2634,9 +2658,9 @@ impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
     fn schedule_execution(
         &self,
         transaction: RuntimeTransaction<SanitizedTransaction>,
-        index: usize,
+        task_id: OrderedTaskId,
     ) -> ScheduleResult {
-        let task = SchedulingStateMachine::create_task(transaction, index, &mut |pubkey| {
+        let task = SchedulingStateMachine::create_task(transaction, task_id, &mut |pubkey| {
             self.inner.usage_queue_loader.load(pubkey)
         });
         self.inner.thread_manager.send_task(task)
@@ -3311,7 +3335,7 @@ mod tests {
             &TestCheckPoint::AfterSchedulerThreadAborted,
         ]);

-        static TASK_COUNT: Mutex<usize> = Mutex::new(0);
+        static TASK_COUNT: Mutex<OrderedTaskId> = Mutex::new(0);

         #[derive(Debug)]
         struct CountingHandler;
@@ -3351,7 +3375,7 @@ mod tests {
         // That's because the scheduler needs to be aborted quickly as an expected behavior,
         // leaving some readily-available work untouched. So, schedule rather large number of tasks
         // to make the short-cutting abort code-path win the race easily.
-        const MAX_TASK_COUNT: usize = 100;
+        const MAX_TASK_COUNT: OrderedTaskId = 100;

         for i in 0..MAX_TASK_COUNT {
             let tx = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
@@ -3659,10 +3683,10 @@ mod tests {
             task: &Task,
             _handler_context: &HandlerContext,
         ) {
-            let index = task.task_index();
-            if index == 0 {
+            let task_id = task.task_id();
+            if task_id == 0 {
                 sleepless_testing::at(PanickingHanlderCheckPoint::BeforeNotifiedPanic);
-            } else if index == 1 {
+            } else if task_id == 1 {
                 sleepless_testing::at(PanickingHanlderCheckPoint::BeforeIgnoredPanic);
             } else {
                 unreachable!();
@@ -3679,11 +3703,11 @@ mod tests {
         // Use 2 transactions with different timings to deliberately cover the two code paths of
         // notifying panics in the handler threads, taken conditionally depending on whether the
         // scheduler thread has been aborted already or not.
-        const TX_COUNT: usize = 2;
+        const TX_COUNT: OrderedTaskId = 2;

         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
         let pool = SchedulerPool::<PooledScheduler<PanickingHandler>, _>::new_dyn(
-            Some(TX_COUNT), // fix to use exactly 2 handlers
+            Some(TX_COUNT.try_into().unwrap()), // fix to use exactly 2 handlers
             None,
             None,
             None,
@@ -3693,7 +3717,7 @@ mod tests {

         let scheduler = pool.take_scheduler(context);

-        for index in 0..TX_COUNT {
+        for task_id in 0..TX_COUNT {
             // Use 2 non-conflicting txes to exercise the channel disconnected case as well.
             let tx = RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
                 &Keypair::new(),
@@ -3701,7 +3725,7 @@ mod tests {
                 1,
                 genesis_config.hash(),
             ));
-            scheduler.schedule_execution(tx, index).unwrap();
+            scheduler.schedule_execution(tx, task_id).unwrap();
         }
         // finally unblock the scheduler thread; otherwise the above schedule_execution could
         // return SchedulerAborted...
@@ -3739,12 +3763,12 @@ mod tests {
             task: &Task,
             _handler_context: &HandlerContext,
         ) {
-            let index = task.task_index();
+            let task_id = task.task_id();
             *TASK_COUNT.lock().unwrap() += 1;
-            if index == 1 {
+            if task_id == 1 {
                 *result = Err(TransactionError::AccountNotFound);
             }
-            sleepless_testing::at(CheckPoint::TaskHandled(index));
+            sleepless_testing::at(CheckPoint::TaskHandled(task_id));
         }
     }

@@ -3808,8 +3832,8 @@ mod tests {
     ) {
         solana_logger::setup();

-        const STALLED_TRANSACTION_INDEX: usize = 0;
-        const BLOCKED_TRANSACTION_INDEX: usize = 1;
+        const STALLED_TRANSACTION_INDEX: OrderedTaskId = 0;
+        const BLOCKED_TRANSACTION_INDEX: OrderedTaskId = 1;

         let _progress = sleepless_testing::setup(&[
             &CheckPoint::BufferedTask(BLOCKED_TRANSACTION_INDEX),
@@ -3828,8 +3852,8 @@ mod tests {
             task: &Task,
             handler_context: &HandlerContext,
         ) {
-            let index = task.task_index();
-            match index {
+            let task_id = task.task_id();
+            match task_id {
                 STALLED_TRANSACTION_INDEX => {
                     sleepless_testing::at(TestCheckPoint::AfterSessionEnding);
                 }
@@ -3981,9 +4005,9 @@ mod tests {
     fn test_block_production_scheduler_schedule_execution_retry() {
         solana_logger::setup();

-        const ORIGINAL_TRANSACTION_INDEX: usize = 999;
+        const ORIGINAL_TRANSACTION_INDEX: OrderedTaskId = 999;
         // This is 0 because it's the first task id assigned internally by BankingStageHelper
-        const RETRIED_TRANSACTION_INDEX: usize = 0;
+        const RETRIED_TRANSACTION_INDEX: OrderedTaskId = 0;
         const FULL_BLOCK_SLOT: Slot = 1;

         let _progress = sleepless_testing::setup(&[
@@ -4124,11 +4148,8 @@ mod tests {
             task: &Task,
             _handler_context: &HandlerContext,
         ) {
-            // The task index must always be matched to the slot.
-            assert_eq!(
-                task.task_index() as Slot,
-                scheduling_context.slot().unwrap()
-            );
+            // The task's task_id must always match the slot.
+            assert_eq!(task.task_id() as Slot, scheduling_context.slot().unwrap());
         }
     }

@@ -4168,14 +4189,14 @@ mod tests {
         let context1 = &SchedulingContext::for_verification(bank1.clone());

         // Exercise the scheduler by busy-looping to expose the race condition
-        for (context, index) in [(context0, 0), (context1, 1)]
+        for (context, task_id) in [(context0, 0), (context1, 1)]
             .into_iter()
             .cycle()
             .take(10000)
         {
             let scheduler = pool.take_scheduler(context.clone());
             scheduler
-                .schedule_execution(dummy_tx.clone(), index)
+                .schedule_execution(dummy_tx.clone(), task_id)
                 .unwrap();
             scheduler.wait_for_termination(false).1.return_to_pool();
         }
@@ -4219,7 +4240,7 @@ mod tests {
     fn schedule_execution(
         &self,
         transaction: RuntimeTransaction<SanitizedTransaction>,
-        index: usize,
+        task_id: OrderedTaskId,
     ) -> ScheduleResult {
         let context = self.context().clone();
         let pool = self.3.clone();
@@ -4232,8 +4253,8 @@ mod tests {
         let mut result = Ok(());
         let mut timings = ExecuteTimings::default();

-        let task = SchedulingStateMachine::create_task(transaction, index, &mut |_| {
-            UsageQueue::default()
+        let task = SchedulingStateMachine::create_task(transaction, task_id, &mut |_| {
+            UsageQueue::new(&Capability::FifoQueueing)
         });

         <DefaultTaskHandler as TaskHandler>::handle(
@@ -4472,7 +4493,9 @@ mod tests {
             transaction_recorder: None,
         };

-        let task = SchedulingStateMachine::create_task(tx, 0, &mut |_| UsageQueue::default());
+        let task = SchedulingStateMachine::create_task(tx, 0, &mut |_| {
+            UsageQueue::new(&Capability::FifoQueueing)
+        });
         DefaultTaskHandler::handle(result, timings, scheduling_context, &task, handler_context);
         assert_matches!(result, Err(TransactionError::AccountLoadedTwice));
     }
@@ -4566,8 +4589,9 @@ mod tests {
             transaction_recorder: Some(transaction_recorder),
         };

-        let task =
-            SchedulingStateMachine::create_task(tx.clone(), 0, &mut |_| UsageQueue::default());
+        let task = SchedulingStateMachine::create_task(tx.clone(), 0, &mut |_| {
+            UsageQueue::new(&Capability::FifoQueueing)
+        });

         // wait until the poh's working bank is cleared.
         // also flush signal_receiver after that.
@@ -4701,9 +4725,9 @@ mod tests {
     fn create_new_unconstrained_task(
         &self,
         transaction: RuntimeTransaction<SanitizedTransaction>,
-        index: usize,
+        task_id: OrderedTaskId,
     ) -> Task {
-        self.create_new_task(transaction, index, NO_CONSUMED_BLOCK_SIZE)
+        self.create_new_task(transaction, task_id, NO_CONSUMED_BLOCK_SIZE)
     }
 }

@@ -5183,11 +5207,11 @@ mod tests {
             ..
         } = create_genesis_config_for_block_production(10_000);

-        const DISCARDED_TASK_COUNT: usize = 3;
+        const DISCARDED_TASK_COUNT: OrderedTaskId = 3;
         let _progress = sleepless_testing::setup(&[
             &CheckPoint::NewBufferedTask(DISCARDED_TASK_COUNT - 1),
             &CheckPoint::DiscardRequested,
-            &CheckPoint::Discarded(DISCARDED_TASK_COUNT),
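+            // `Discarded` still takes a `usize`, hence the conversion back
+            // from the now OrderedTaskId-typed constant.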
+            &CheckPoint::Discarded(DISCARDED_TASK_COUNT.try_into().unwrap()),
             &TestCheckPoint::AfterDiscarded,
         ]);

@@ -5215,8 +5239,8 @@ mod tests {
         ));
         let fixed_banking_packet_handler =
             Box::new(move |helper: &BankingStageHelper, _banking_packet| {
-                for index in 0..DISCARDED_TASK_COUNT {
-                    helper.send_new_task(helper.create_new_unconstrained_task(tx0.clone(), index))
+                for task_id in 0..DISCARDED_TASK_COUNT {
+                    helper.send_new_task(helper.create_new_unconstrained_task(tx0.clone(), task_id))
                 }
             });