|
@@ -164,7 +164,7 @@ use {
|
|
|
atomic::{AtomicBool, AtomicU64, Ordering},
|
|
atomic::{AtomicBool, AtomicU64, Ordering},
|
|
|
Arc, Mutex, RwLock,
|
|
Arc, Mutex, RwLock,
|
|
|
},
|
|
},
|
|
|
- thread::{sleep, Builder, JoinHandle},
|
|
|
|
|
|
|
+ thread::{self, Builder, JoinHandle},
|
|
|
time::{Duration, Instant},
|
|
time::{Duration, Instant},
|
|
|
},
|
|
},
|
|
|
strum::VariantNames,
|
|
strum::VariantNames,
|
|
@@ -308,6 +308,8 @@ pub struct GeneratorConfig {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
pub struct ValidatorConfig {
|
|
pub struct ValidatorConfig {
|
|
|
|
|
+ /// The destination file for validator logs; `stderr` is used if `None`
|
|
|
|
|
+ pub logfile: Option<PathBuf>,
|
|
|
pub halt_at_slot: Option<Slot>,
|
|
pub halt_at_slot: Option<Slot>,
|
|
|
pub expected_genesis_hash: Option<Hash>,
|
|
pub expected_genesis_hash: Option<Hash>,
|
|
|
pub expected_bank_hash: Option<Hash>,
|
|
pub expected_bank_hash: Option<Hash>,
|
|
@@ -391,6 +393,7 @@ impl ValidatorConfig {
|
|
|
NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero");
|
|
NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero");
|
|
|
|
|
|
|
|
Self {
|
|
Self {
|
|
|
|
|
+ logfile: None,
|
|
|
halt_at_slot: None,
|
|
halt_at_slot: None,
|
|
|
expected_genesis_hash: None,
|
|
expected_genesis_hash: None,
|
|
|
expected_bank_hash: None,
|
|
expected_bank_hash: None,
|
|
@@ -615,6 +618,11 @@ impl ValidatorTpuConfig {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
pub struct Validator {
|
|
pub struct Validator {
|
|
|
|
|
+ /// The destination file for validator logs; `stderr` is used if `None`
|
|
|
|
|
+ #[cfg_attr(not(unix), allow(dead_code))]
|
|
|
|
|
+ logfile: Option<PathBuf>,
|
|
|
|
|
+ /// A global flag used to communicate shutdown between threads
|
|
|
|
|
+ exit: Arc<AtomicBool>,
|
|
|
validator_exit: Arc<RwLock<Exit>>,
|
|
validator_exit: Arc<RwLock<Exit>>,
|
|
|
json_rpc_service: Option<JsonRpcService>,
|
|
json_rpc_service: Option<JsonRpcService>,
|
|
|
pubsub_service: Option<PubSubService>,
|
|
pubsub_service: Option<PubSubService>,
|
|
@@ -1728,7 +1736,7 @@ impl Validator {
|
|
|
blockstore.clone(),
|
|
blockstore.clone(),
|
|
|
&config.broadcast_stage_type,
|
|
&config.broadcast_stage_type,
|
|
|
xdp_sender,
|
|
xdp_sender,
|
|
|
- exit,
|
|
|
|
|
|
|
+ exit.clone(),
|
|
|
node.info.shred_version(),
|
|
node.info.shred_version(),
|
|
|
vote_tracker,
|
|
vote_tracker,
|
|
|
bank_forks.clone(),
|
|
bank_forks.clone(),
|
|
@@ -1803,6 +1811,8 @@ impl Validator {
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
Ok(Self {
|
|
Ok(Self {
|
|
|
|
|
+ logfile: config.logfile.clone(),
|
|
|
|
|
+ exit,
|
|
|
stats_reporter_service,
|
|
stats_reporter_service,
|
|
|
gossip_service,
|
|
gossip_service,
|
|
|
serve_repair_service,
|
|
serve_repair_service,
|
|
@@ -1839,6 +1849,53 @@ impl Validator {
|
|
|
})
|
|
})
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /// Register and listen for signals that the validator will act on. Also,
|
|
|
|
|
+ /// monitor the validator's exit flag in case a shutdown has been initiated
|
|
|
|
|
+ /// by one of the validator threads
|
|
|
|
|
+ pub fn listen_for_signals(&self) -> Result<()> {
|
|
|
|
|
+ // Reopen the logfile when the SIGUSR1 signal is received; this provides
|
|
|
|
|
+ // a hook for working with logrotate
|
|
|
|
|
+ let sigusr1_flag = Arc::new(AtomicBool::new(false));
|
|
|
|
|
+ #[cfg(unix)]
|
|
|
|
|
+ {
|
|
|
|
|
+ if self.logfile.is_some() {
|
|
|
|
|
+ signal_hook::flag::register(libc::SIGUSR1, sigusr1_flag.clone())?;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ info!("Validator::listen_for_signals() has started");
|
|
|
|
|
+ loop {
|
|
|
|
|
+ if self.exit.load(Ordering::Relaxed) {
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if sigusr1_flag.load(Ordering::Relaxed) {
|
|
|
|
|
+ #[cfg(unix)]
|
|
|
|
|
+ {
|
|
|
|
|
+ if let Some(logfile) = self.logfile.as_ref() {
|
|
|
|
|
+ info!("Received SIGUSR1, reopening {}", logfile.display());
|
|
|
|
|
+ agave_logger::redirect_stderr(logfile);
|
|
|
|
|
+ // Reset the flag to `false` to allow detection of the
|
|
|
|
|
+ // signal again and to avoid hitting this case every
|
|
|
|
|
+ // iteration
|
|
|
|
|
+ sigusr1_flag.store(false, Ordering::Relaxed);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ #[cfg(not(unix))]
|
|
|
|
|
+ {
|
|
|
|
|
+ unreachable!("The SIGUSR1 signal is only handled on unix systems");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // One second is a reasonable response time for these signals and
|
|
|
|
|
+ // keeps this thread from being overly active
|
|
|
|
|
+ thread::sleep(Duration::from_secs(1));
|
|
|
|
|
+ }
|
|
|
|
|
+ info!("Validator::listen_for_signals() has stopped");
|
|
|
|
|
+
|
|
|
|
|
+ Ok(())
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// Used for notifying many nodes in parallel to exit
|
|
// Used for notifying many nodes in parallel to exit
|
|
|
pub fn exit(&mut self) {
|
|
pub fn exit(&mut self) {
|
|
|
self.validator_exit.write().unwrap().exit();
|
|
self.validator_exit.write().unwrap().exit();
|
|
@@ -2340,7 +2397,7 @@ impl<'a> ProcessBlockStore<'a> {
|
|
|
let slot = bank_forks.read().unwrap().working_bank().slot();
|
|
let slot = bank_forks.read().unwrap().working_bank().slot();
|
|
|
*start_progress.write().unwrap() =
|
|
*start_progress.write().unwrap() =
|
|
|
ValidatorStartProgress::ProcessingLedger { slot, max_slot };
|
|
ValidatorStartProgress::ProcessingLedger { slot, max_slot };
|
|
|
- sleep(Duration::from_secs(2));
|
|
|
|
|
|
|
+ thread::sleep(Duration::from_secs(2));
|
|
|
}
|
|
}
|
|
|
})
|
|
})
|
|
|
.unwrap();
|
|
.unwrap();
|
|
@@ -2777,7 +2834,7 @@ fn wait_for_supermajority(
|
|
|
// prevent load balancers from removing the node from their list of candidates during a
|
|
// prevent load balancers from removing the node from their list of candidates during a
|
|
|
// manual restart.
|
|
// manual restart.
|
|
|
rpc_override_health_check.store(true, Ordering::Relaxed);
|
|
rpc_override_health_check.store(true, Ordering::Relaxed);
|
|
|
- sleep(Duration::new(1, 0));
|
|
|
|
|
|
|
+ thread::sleep(Duration::new(1, 0));
|
|
|
}
|
|
}
|
|
|
rpc_override_health_check.store(false, Ordering::Relaxed);
|
|
rpc_override_health_check.store(false, Ordering::Relaxed);
|
|
|
Ok(true)
|
|
Ok(true)
|