admin_rpc_service.rs 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659
  1. use {
  2. crossbeam_channel::Sender,
  3. jsonrpc_core::{BoxFuture, ErrorCode, MetaIoHandler, Metadata, Result},
  4. jsonrpc_core_client::{transports::ipc, RpcError},
  5. jsonrpc_derive::rpc,
  6. jsonrpc_ipc_server::{
  7. tokio::sync::oneshot::channel as oneshot_channel, RequestContext, ServerBuilder,
  8. },
  9. log::*,
  10. serde::{de::Deserializer, Deserialize, Serialize},
  11. solana_accounts_db::accounts_index::AccountIndex,
  12. solana_core::{
  13. admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
  14. banking_stage::{
  15. transaction_scheduler::scheduler_controller::SchedulerConfig, BankingControlMsg,
  16. BankingStage,
  17. },
  18. consensus::{tower_storage::TowerStorage, Tower},
  19. repair::repair_service,
  20. validator::{
  21. BlockProductionMethod, SchedulerPacing, TransactionStructure, ValidatorStartProgress,
  22. },
  23. },
  24. solana_geyser_plugin_manager::GeyserPluginManagerRequest,
  25. solana_gossip::contact_info::{ContactInfo, Protocol, SOCKET_ADDR_UNSPECIFIED},
  26. solana_keypair::{read_keypair_file, Keypair},
  27. solana_pubkey::Pubkey,
  28. solana_rpc::rpc::verify_pubkey,
  29. solana_rpc_client_api::{config::RpcAccountIndex, custom_error::RpcCustomError},
  30. solana_signer::Signer,
  31. solana_validator_exit::Exit,
  32. std::{
  33. collections::{HashMap, HashSet},
  34. env, error,
  35. fmt::{self, Display},
  36. net::{IpAddr, SocketAddr},
  37. num::NonZeroUsize,
  38. path::{Path, PathBuf},
  39. sync::{
  40. atomic::{AtomicBool, Ordering},
  41. Arc, RwLock,
  42. },
  43. thread::{self, Builder},
  44. time::{Duration, SystemTime},
  45. },
  46. tokio::runtime::Runtime,
  47. };
  48. #[derive(Clone)]
  49. pub struct AdminRpcRequestMetadata {
  50. pub rpc_addr: Option<SocketAddr>,
  51. pub start_time: SystemTime,
  52. pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
  53. pub validator_exit: Arc<RwLock<Exit>>,
  54. pub validator_exit_backpressure: HashMap<String, Arc<AtomicBool>>,
  55. pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
  56. pub tower_storage: Arc<dyn TowerStorage>,
  57. pub staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
  58. pub post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
  59. pub rpc_to_plugin_manager_sender: Option<Sender<GeyserPluginManagerRequest>>,
  60. }
  61. impl Metadata for AdminRpcRequestMetadata {}
  62. impl AdminRpcRequestMetadata {
  63. fn with_post_init<F, R>(&self, func: F) -> Result<R>
  64. where
  65. F: FnOnce(&AdminRpcRequestMetadataPostInit) -> Result<R>,
  66. {
  67. if let Some(post_init) = self.post_init.read().unwrap().as_ref() {
  68. func(post_init)
  69. } else {
  70. Err(jsonrpc_core::error::Error::invalid_params(
  71. "Retry once validator start up is complete",
  72. ))
  73. }
  74. }
  75. }
  76. #[derive(Debug, Deserialize, Serialize)]
  77. pub struct AdminRpcContactInfo {
  78. pub id: String,
  79. pub gossip: SocketAddr,
  80. pub tvu: SocketAddr,
  81. pub tvu_quic: SocketAddr,
  82. pub serve_repair_quic: SocketAddr,
  83. pub tpu: SocketAddr,
  84. pub tpu_forwards: SocketAddr,
  85. pub tpu_vote: SocketAddr,
  86. pub rpc: SocketAddr,
  87. pub rpc_pubsub: SocketAddr,
  88. pub serve_repair: SocketAddr,
  89. pub last_updated_timestamp: u64,
  90. pub shred_version: u16,
  91. }
  92. #[derive(Debug, Deserialize, Serialize)]
  93. pub struct AdminRpcRepairWhitelist {
  94. pub whitelist: Vec<Pubkey>,
  95. }
  96. impl From<ContactInfo> for AdminRpcContactInfo {
  97. fn from(node: ContactInfo) -> Self {
  98. macro_rules! unwrap_socket {
  99. ($name:ident) => {
  100. node.$name().unwrap_or(SOCKET_ADDR_UNSPECIFIED)
  101. };
  102. ($name:ident, $protocol:expr) => {
  103. node.$name($protocol).unwrap_or(SOCKET_ADDR_UNSPECIFIED)
  104. };
  105. }
  106. Self {
  107. id: node.pubkey().to_string(),
  108. last_updated_timestamp: node.wallclock(),
  109. gossip: unwrap_socket!(gossip),
  110. tvu: unwrap_socket!(tvu, Protocol::UDP),
  111. tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
  112. serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
  113. tpu: unwrap_socket!(tpu, Protocol::UDP),
  114. tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
  115. tpu_vote: unwrap_socket!(tpu_vote, Protocol::UDP),
  116. rpc: unwrap_socket!(rpc),
  117. rpc_pubsub: unwrap_socket!(rpc_pubsub),
  118. serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
  119. shred_version: node.shred_version(),
  120. }
  121. }
  122. }
  123. impl Display for AdminRpcContactInfo {
  124. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  125. writeln!(f, "Identity: {}", self.id)?;
  126. writeln!(f, "Gossip: {}", self.gossip)?;
  127. writeln!(f, "TVU: {}", self.tvu)?;
  128. writeln!(f, "TVU QUIC: {}", self.tvu_quic)?;
  129. writeln!(f, "TPU: {}", self.tpu)?;
  130. writeln!(f, "TPU Forwards: {}", self.tpu_forwards)?;
  131. writeln!(f, "TPU Votes: {}", self.tpu_vote)?;
  132. writeln!(f, "RPC: {}", self.rpc)?;
  133. writeln!(f, "RPC Pubsub: {}", self.rpc_pubsub)?;
  134. writeln!(f, "Serve Repair: {}", self.serve_repair)?;
  135. writeln!(f, "Last Updated Timestamp: {}", self.last_updated_timestamp)?;
  136. writeln!(f, "Shred Version: {}", self.shred_version)
  137. }
  138. }
  139. impl solana_cli_output::VerboseDisplay for AdminRpcContactInfo {}
  140. impl solana_cli_output::QuietDisplay for AdminRpcContactInfo {}
  141. impl Display for AdminRpcRepairWhitelist {
  142. fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
  143. writeln!(f, "Repair whitelist: {:?}", &self.whitelist)
  144. }
  145. }
  146. impl solana_cli_output::VerboseDisplay for AdminRpcRepairWhitelist {}
  147. impl solana_cli_output::QuietDisplay for AdminRpcRepairWhitelist {}
  148. #[rpc]
  149. pub trait AdminRpc {
  150. type Metadata;
  151. /// Initiates validator exit; exit is asynchronous so the validator
  152. /// will almost certainly still be running when this method returns
  153. #[rpc(meta, name = "exit")]
  154. fn exit(&self, meta: Self::Metadata) -> Result<()>;
  155. /// Return the process id (pid)
  156. #[rpc(meta, name = "pid")]
  157. fn pid(&self, meta: Self::Metadata) -> Result<u32>;
  158. #[rpc(meta, name = "reloadPlugin")]
  159. fn reload_plugin(
  160. &self,
  161. meta: Self::Metadata,
  162. name: String,
  163. config_file: String,
  164. ) -> BoxFuture<Result<()>>;
  165. #[rpc(meta, name = "unloadPlugin")]
  166. fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>>;
  167. #[rpc(meta, name = "loadPlugin")]
  168. fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>>;
  169. #[rpc(meta, name = "listPlugins")]
  170. fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>>;
  171. #[rpc(meta, name = "rpcAddress")]
  172. fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>>;
  173. #[rpc(name = "setLogFilter")]
  174. fn set_log_filter(&self, filter: String) -> Result<()>;
  175. #[rpc(meta, name = "startTime")]
  176. fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime>;
  177. #[rpc(meta, name = "startProgress")]
  178. fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress>;
  179. #[rpc(meta, name = "addAuthorizedVoter")]
  180. fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;
  181. #[rpc(meta, name = "addAuthorizedVoterFromBytes")]
  182. fn add_authorized_voter_from_bytes(&self, meta: Self::Metadata, keypair: Vec<u8>)
  183. -> Result<()>;
  184. #[rpc(meta, name = "removeAllAuthorizedVoters")]
  185. fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;
  186. #[rpc(meta, name = "setIdentity")]
  187. fn set_identity(
  188. &self,
  189. meta: Self::Metadata,
  190. keypair_file: String,
  191. require_tower: bool,
  192. ) -> Result<()>;
  193. #[rpc(meta, name = "setIdentityFromBytes")]
  194. fn set_identity_from_bytes(
  195. &self,
  196. meta: Self::Metadata,
  197. identity_keypair: Vec<u8>,
  198. require_tower: bool,
  199. ) -> Result<()>;
  200. #[rpc(meta, name = "setStakedNodesOverrides")]
  201. fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()>;
  202. #[rpc(meta, name = "contactInfo")]
  203. fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo>;
  204. #[rpc(meta, name = "selectActiveInterface")]
  205. fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()>;
  206. #[rpc(meta, name = "repairShredFromPeer")]
  207. fn repair_shred_from_peer(
  208. &self,
  209. meta: Self::Metadata,
  210. pubkey: Option<Pubkey>,
  211. slot: u64,
  212. shred_index: u64,
  213. ) -> Result<()>;
  214. #[rpc(meta, name = "repairWhitelist")]
  215. fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist>;
  216. #[rpc(meta, name = "setRepairWhitelist")]
  217. fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()>;
  218. #[rpc(meta, name = "getSecondaryIndexKeySize")]
  219. fn get_secondary_index_key_size(
  220. &self,
  221. meta: Self::Metadata,
  222. pubkey_str: String,
  223. ) -> Result<HashMap<RpcAccountIndex, usize>>;
  224. #[rpc(meta, name = "setPublicTpuAddress")]
  225. fn set_public_tpu_address(
  226. &self,
  227. meta: Self::Metadata,
  228. public_tpu_addr: SocketAddr,
  229. ) -> Result<()>;
  230. #[rpc(meta, name = "setPublicTpuForwardsAddress")]
  231. fn set_public_tpu_forwards_address(
  232. &self,
  233. meta: Self::Metadata,
  234. public_tpu_forwards_addr: SocketAddr,
  235. ) -> Result<()>;
  236. #[rpc(meta, name = "setPublicTvuAddress")]
  237. fn set_public_tvu_address(
  238. &self,
  239. meta: Self::Metadata,
  240. public_tvu_addr: SocketAddr,
  241. ) -> Result<()>;
  242. #[rpc(meta, name = "manageBlockProduction")]
  243. fn manage_block_production(
  244. &self,
  245. meta: Self::Metadata,
  246. block_production_method: BlockProductionMethod,
  247. transaction_struct: TransactionStructure,
  248. num_workers: NonZeroUsize,
  249. scheduler_pacing: SchedulerPacing,
  250. ) -> Result<()>;
  251. }
  /// Stateless implementor of [`AdminRpc`]; all state arrives through the request metadata.
  pub struct AdminRpcImpl;
  253. impl AdminRpc for AdminRpcImpl {
  254. type Metadata = AdminRpcRequestMetadata;
  255. fn exit(&self, meta: Self::Metadata) -> Result<()> {
  256. debug!("exit admin rpc request received");
  257. thread::Builder::new()
  258. .name("solProcessExit".into())
  259. .spawn(move || {
  260. // Delay exit signal until this RPC request completes, otherwise the caller of `exit` might
  261. // receive a confusing error as the validator shuts down before a response is sent back.
  262. thread::sleep(Duration::from_millis(100));
  263. info!("validator exit requested");
  264. meta.validator_exit.write().unwrap().exit();
  265. if !meta.validator_exit_backpressure.is_empty() {
  266. let service_names = meta.validator_exit_backpressure.keys();
  267. info!("Wait for these services to complete: {service_names:?}");
  268. loop {
  269. // The initial sleep is a grace period to allow the services to raise their
  270. // backpressure flags.
  271. // Subsequent sleeps are to throttle how often we check and log.
  272. thread::sleep(Duration::from_secs(1));
  273. let mut any_flags_raised = false;
  274. for (name, flag) in meta.validator_exit_backpressure.iter() {
  275. let is_flag_raised = flag.load(Ordering::Relaxed);
  276. if is_flag_raised {
  277. info!("{name}'s exit backpressure flag is raised");
  278. any_flags_raised = true;
  279. }
  280. }
  281. if !any_flags_raised {
  282. break;
  283. }
  284. }
  285. info!("All services have completed");
  286. }
  287. // TODO: Debug why Exit doesn't always cause the validator to fully exit
  288. // (rocksdb background processing or some other stuck thread perhaps?).
  289. //
  290. // If the process is still alive after five seconds, exit harder
  291. thread::sleep(Duration::from_secs(
  292. env::var("SOLANA_VALIDATOR_EXIT_TIMEOUT")
  293. .ok()
  294. .and_then(|x| x.parse().ok())
  295. .unwrap_or(5),
  296. ));
  297. warn!("validator exit timeout");
  298. std::process::exit(0);
  299. })
  300. .unwrap();
  301. Ok(())
  302. }
  303. fn pid(&self, _meta: Self::Metadata) -> Result<u32> {
  304. Ok(std::process::id())
  305. }
  306. fn reload_plugin(
  307. &self,
  308. meta: Self::Metadata,
  309. name: String,
  310. config_file: String,
  311. ) -> BoxFuture<Result<()>> {
  312. Box::pin(async move {
  313. // Construct channel for plugin to respond to this particular rpc request instance
  314. let (response_sender, response_receiver) = oneshot_channel();
  315. // Send request to plugin manager if there is a geyser service
  316. if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
  317. rpc_to_manager_sender
  318. .send(GeyserPluginManagerRequest::ReloadPlugin {
  319. name,
  320. config_file,
  321. response_sender,
  322. })
  323. .expect("GeyerPluginService should never drop request receiver");
  324. } else {
  325. return Err(jsonrpc_core::Error {
  326. code: ErrorCode::InvalidRequest,
  327. message: "No geyser plugin service".to_string(),
  328. data: None,
  329. });
  330. }
  331. // Await response from plugin manager
  332. response_receiver
  333. .await
  334. .expect("GeyerPluginService's oneshot sender shouldn't drop early")
  335. })
  336. }
  337. fn load_plugin(&self, meta: Self::Metadata, config_file: String) -> BoxFuture<Result<String>> {
  338. Box::pin(async move {
  339. // Construct channel for plugin to respond to this particular rpc request instance
  340. let (response_sender, response_receiver) = oneshot_channel();
  341. // Send request to plugin manager if there is a geyser service
  342. if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
  343. rpc_to_manager_sender
  344. .send(GeyserPluginManagerRequest::LoadPlugin {
  345. config_file,
  346. response_sender,
  347. })
  348. .expect("GeyerPluginService should never drop request receiver");
  349. } else {
  350. return Err(jsonrpc_core::Error {
  351. code: ErrorCode::InvalidRequest,
  352. message: "No geyser plugin service".to_string(),
  353. data: None,
  354. });
  355. }
  356. // Await response from plugin manager
  357. response_receiver
  358. .await
  359. .expect("GeyerPluginService's oneshot sender shouldn't drop early")
  360. })
  361. }
  362. fn unload_plugin(&self, meta: Self::Metadata, name: String) -> BoxFuture<Result<()>> {
  363. Box::pin(async move {
  364. // Construct channel for plugin to respond to this particular rpc request instance
  365. let (response_sender, response_receiver) = oneshot_channel();
  366. // Send request to plugin manager if there is a geyser service
  367. if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
  368. rpc_to_manager_sender
  369. .send(GeyserPluginManagerRequest::UnloadPlugin {
  370. name,
  371. response_sender,
  372. })
  373. .expect("GeyerPluginService should never drop request receiver");
  374. } else {
  375. return Err(jsonrpc_core::Error {
  376. code: ErrorCode::InvalidRequest,
  377. message: "No geyser plugin service".to_string(),
  378. data: None,
  379. });
  380. }
  381. // Await response from plugin manager
  382. response_receiver
  383. .await
  384. .expect("GeyerPluginService's oneshot sender shouldn't drop early")
  385. })
  386. }
  387. fn list_plugins(&self, meta: Self::Metadata) -> BoxFuture<Result<Vec<String>>> {
  388. Box::pin(async move {
  389. // Construct channel for plugin to respond to this particular rpc request instance
  390. let (response_sender, response_receiver) = oneshot_channel();
  391. // Send request to plugin manager
  392. if let Some(ref rpc_to_manager_sender) = meta.rpc_to_plugin_manager_sender {
  393. rpc_to_manager_sender
  394. .send(GeyserPluginManagerRequest::ListPlugins { response_sender })
  395. .expect("GeyerPluginService should never drop request receiver");
  396. } else {
  397. return Err(jsonrpc_core::Error {
  398. code: ErrorCode::InvalidRequest,
  399. message: "No geyser plugin service".to_string(),
  400. data: None,
  401. });
  402. }
  403. // Await response from plugin manager
  404. response_receiver
  405. .await
  406. .expect("GeyerPluginService's oneshot sender shouldn't drop early")
  407. })
  408. }
  409. fn rpc_addr(&self, meta: Self::Metadata) -> Result<Option<SocketAddr>> {
  410. debug!("rpc_addr admin rpc request received");
  411. Ok(meta.rpc_addr)
  412. }
  413. fn set_log_filter(&self, filter: String) -> Result<()> {
  414. debug!("set_log_filter admin rpc request received");
  415. agave_logger::setup_with(&filter);
  416. Ok(())
  417. }
  418. fn start_time(&self, meta: Self::Metadata) -> Result<SystemTime> {
  419. debug!("start_time admin rpc request received");
  420. Ok(meta.start_time)
  421. }
  422. fn start_progress(&self, meta: Self::Metadata) -> Result<ValidatorStartProgress> {
  423. debug!("start_progress admin rpc request received");
  424. Ok(*meta.start_progress.read().unwrap())
  425. }
  426. fn add_authorized_voter(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
  427. debug!("add_authorized_voter request received");
  428. let authorized_voter = read_keypair_file(keypair_file)
  429. .map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{err}")))?;
  430. AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
  431. }
  432. fn add_authorized_voter_from_bytes(
  433. &self,
  434. meta: Self::Metadata,
  435. keypair: Vec<u8>,
  436. ) -> Result<()> {
  437. debug!("add_authorized_voter_from_bytes request received");
  438. let authorized_voter = Keypair::try_from(keypair.as_ref()).map_err(|err| {
  439. jsonrpc_core::error::Error::invalid_params(format!(
  440. "Failed to read authorized voter keypair from provided byte array: {err}"
  441. ))
  442. })?;
  443. AdminRpcImpl::add_authorized_voter_keypair(meta, authorized_voter)
  444. }
  445. fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()> {
  446. debug!("remove_all_authorized_voters received");
  447. meta.authorized_voter_keypairs.write().unwrap().clear();
  448. Ok(())
  449. }
  450. fn set_identity(
  451. &self,
  452. meta: Self::Metadata,
  453. keypair_file: String,
  454. require_tower: bool,
  455. ) -> Result<()> {
  456. debug!("set_identity request received");
  457. let identity_keypair = read_keypair_file(&keypair_file).map_err(|err| {
  458. jsonrpc_core::error::Error::invalid_params(format!(
  459. "Failed to read identity keypair from {keypair_file}: {err}"
  460. ))
  461. })?;
  462. AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
  463. }
  464. fn set_identity_from_bytes(
  465. &self,
  466. meta: Self::Metadata,
  467. identity_keypair: Vec<u8>,
  468. require_tower: bool,
  469. ) -> Result<()> {
  470. debug!("set_identity_from_bytes request received");
  471. let identity_keypair = Keypair::try_from(identity_keypair.as_ref()).map_err(|err| {
  472. jsonrpc_core::error::Error::invalid_params(format!(
  473. "Failed to read identity keypair from provided byte array: {err}"
  474. ))
  475. })?;
  476. AdminRpcImpl::set_identity_keypair(meta, identity_keypair, require_tower)
  477. }
  478. fn set_staked_nodes_overrides(&self, meta: Self::Metadata, path: String) -> Result<()> {
  479. let loaded_config = load_staked_nodes_overrides(&path)
  480. .map_err(|err| {
  481. error!(
  482. "Failed to load staked nodes overrides from {}: {}",
  483. &path, err
  484. );
  485. jsonrpc_core::error::Error::internal_error()
  486. })?
  487. .staked_map_id;
  488. let mut write_staked_nodes = meta.staked_nodes_overrides.write().unwrap();
  489. write_staked_nodes.clear();
  490. write_staked_nodes.extend(loaded_config);
  491. info!("Staked nodes overrides loaded from {path}");
  492. debug!("overrides map: {write_staked_nodes:?}");
  493. Ok(())
  494. }
  495. fn contact_info(&self, meta: Self::Metadata) -> Result<AdminRpcContactInfo> {
  496. meta.with_post_init(|post_init| Ok(post_init.cluster_info.my_contact_info().into()))
  497. }
  498. fn select_active_interface(&self, meta: Self::Metadata, interface: IpAddr) -> Result<()> {
  499. debug!("select_active_interface received: {interface}");
  500. meta.with_post_init(|post_init| {
  501. let node = post_init.node.as_ref().ok_or_else(|| {
  502. jsonrpc_core::Error::invalid_params("`Node` not initialized in post_init")
  503. })?;
  504. node.switch_active_interface(interface, &post_init.cluster_info)
  505. .map_err(|e| {
  506. jsonrpc_core::Error::invalid_params(format!(
  507. "Switching failed due to error {e}"
  508. ))
  509. })?;
  510. info!("Switched primary interface to {interface}");
  511. Ok(())
  512. })
  513. }
  514. fn repair_shred_from_peer(
  515. &self,
  516. meta: Self::Metadata,
  517. pubkey: Option<Pubkey>,
  518. slot: u64,
  519. shred_index: u64,
  520. ) -> Result<()> {
  521. debug!("repair_shred_from_peer request received");
  522. meta.with_post_init(|post_init| {
  523. repair_service::RepairService::request_repair_for_shred_from_peer(
  524. post_init.cluster_info.clone(),
  525. post_init.cluster_slots.clone(),
  526. pubkey,
  527. slot,
  528. shred_index,
  529. &post_init.repair_socket.clone(),
  530. post_init.outstanding_repair_requests.clone(),
  531. );
  532. Ok(())
  533. })
  534. }
  535. fn repair_whitelist(&self, meta: Self::Metadata) -> Result<AdminRpcRepairWhitelist> {
  536. debug!("repair_whitelist request received");
  537. meta.with_post_init(|post_init| {
  538. let whitelist: Vec<_> = post_init
  539. .repair_whitelist
  540. .read()
  541. .unwrap()
  542. .iter()
  543. .copied()
  544. .collect();
  545. Ok(AdminRpcRepairWhitelist { whitelist })
  546. })
  547. }
  548. fn set_repair_whitelist(&self, meta: Self::Metadata, whitelist: Vec<Pubkey>) -> Result<()> {
  549. debug!("set_repair_whitelist request received");
  550. let whitelist: HashSet<Pubkey> = whitelist.into_iter().collect();
  551. meta.with_post_init(|post_init| {
  552. *post_init.repair_whitelist.write().unwrap() = whitelist;
  553. warn!(
  554. "Repair whitelist set to {:?}",
  555. &post_init.repair_whitelist.read().unwrap()
  556. );
  557. Ok(())
  558. })
  559. }
  560. fn get_secondary_index_key_size(
  561. &self,
  562. meta: Self::Metadata,
  563. pubkey_str: String,
  564. ) -> Result<HashMap<RpcAccountIndex, usize>> {
  565. debug!("get_secondary_index_key_size rpc request received: {pubkey_str:?}");
  566. let index_key = verify_pubkey(&pubkey_str)?;
  567. meta.with_post_init(|post_init| {
  568. let bank = post_init.bank_forks.read().unwrap().root_bank();
  569. // Take ref to enabled AccountSecondaryIndexes
  570. let enabled_account_indexes = &bank.accounts().accounts_db.account_indexes;
  571. // Exit if secondary indexes are not enabled
  572. if enabled_account_indexes.is_empty() {
  573. debug!("get_secondary_index_key_size: secondary index not enabled.");
  574. return Ok(HashMap::new());
  575. };
  576. // Make sure the requested key is not explicitly excluded
  577. if !enabled_account_indexes.include_key(&index_key) {
  578. return Err(RpcCustomError::KeyExcludedFromSecondaryIndex {
  579. index_key: index_key.to_string(),
  580. }
  581. .into());
  582. }
  583. // Grab a ref to the AccountsDbfor this Bank
  584. let accounts_index = &bank.accounts().accounts_db.accounts_index;
  585. // Find the size of the key in every index where it exists
  586. let found_sizes = enabled_account_indexes
  587. .indexes
  588. .iter()
  589. .filter_map(|index| {
  590. accounts_index
  591. .get_index_key_size(index, &index_key)
  592. .map(|size| (rpc_account_index_from_account_index(index), size))
  593. })
  594. .collect::<HashMap<_, _>>();
  595. // Note: Will return an empty HashMap if no keys are found.
  596. if found_sizes.is_empty() {
  597. debug!("get_secondary_index_key_size: key not found in the secondary index.");
  598. }
  599. Ok(found_sizes)
  600. })
  601. }
  602. fn set_public_tpu_address(
  603. &self,
  604. meta: Self::Metadata,
  605. public_tpu_addr: SocketAddr,
  606. ) -> Result<()> {
  607. debug!("set_public_tpu_address rpc request received: {public_tpu_addr}");
  608. meta.with_post_init(|post_init| {
  609. post_init
  610. .cluster_info
  611. .my_contact_info()
  612. .tpu(Protocol::QUIC)
  613. .ok_or_else(|| {
  614. error!(
  615. "The public TPU QUIC address isn't being published. The node is likely in \
  616. repair mode. See help for --restricted-repair-only-mode for more \
  617. information."
  618. );
  619. jsonrpc_core::error::Error::internal_error()
  620. })?;
  621. post_init
  622. .cluster_info
  623. .set_tpu_quic(public_tpu_addr)
  624. .map_err(|err| {
  625. error!("Failed to set public TPU QUIC address to {public_tpu_addr}: {err}");
  626. jsonrpc_core::error::Error::internal_error()
  627. })?;
  628. let my_contact_info = post_init.cluster_info.my_contact_info();
  629. warn!(
  630. "Public TPU addresses set to {:?} (quic)",
  631. my_contact_info.tpu(Protocol::QUIC),
  632. );
  633. Ok(())
  634. })
  635. }
  636. fn set_public_tpu_forwards_address(
  637. &self,
  638. meta: Self::Metadata,
  639. public_tpu_forwards_addr: SocketAddr,
  640. ) -> Result<()> {
  641. debug!("set_public_tpu_forwards_address rpc request received: {public_tpu_forwards_addr}");
  642. meta.with_post_init(|post_init| {
  643. post_init
  644. .cluster_info
  645. .my_contact_info()
  646. .tpu_forwards(Protocol::QUIC)
  647. .ok_or_else(|| {
  648. error!(
  649. "The public TPU Forwards address isn't being published. The node is \
  650. likely in repair mode. See help for --restricted-repair-only-mode for \
  651. more information."
  652. );
  653. jsonrpc_core::error::Error::internal_error()
  654. })?;
  655. post_init
  656. .cluster_info
  657. .set_tpu_forwards_quic(public_tpu_forwards_addr)
  658. .map_err(|err| {
  659. error!(
  660. "Failed to set public TPU QUIC address to {public_tpu_forwards_addr}: \
  661. {err}"
  662. );
  663. jsonrpc_core::error::Error::internal_error()
  664. })?;
  665. let my_contact_info = post_init.cluster_info.my_contact_info();
  666. warn!(
  667. "Public TPU Forwards address set to {:?} (quic)",
  668. my_contact_info.tpu_forwards(Protocol::QUIC),
  669. );
  670. Ok(())
  671. })
  672. }
  673. fn set_public_tvu_address(
  674. &self,
  675. meta: Self::Metadata,
  676. public_tvu_addr: SocketAddr,
  677. ) -> Result<()> {
  678. debug!("set_public_tvu_address rpc request received: {public_tvu_addr}");
  679. meta.with_post_init(|post_init| {
  680. post_init
  681. .cluster_info
  682. .my_contact_info()
  683. .tvu(Protocol::UDP)
  684. .ok_or_else(|| {
  685. error!(
  686. "The public TVU address isn't being published. The node is likely in \
  687. repair mode. See help for --restricted-repair-only-mode for more \
  688. information."
  689. );
  690. jsonrpc_core::error::Error::internal_error()
  691. })?;
  692. post_init
  693. .cluster_info
  694. .set_tvu_socket(public_tvu_addr)
  695. .map_err(|err| {
  696. error!("Failed to set public TVU address to {public_tvu_addr}: {err}");
  697. jsonrpc_core::error::Error::internal_error()
  698. })?;
  699. let my_contact_info = post_init.cluster_info.my_contact_info();
  700. warn!(
  701. "Public TVU addresses set to {:?}",
  702. my_contact_info.tvu(Protocol::UDP),
  703. );
  704. Ok(())
  705. })
  706. }
  707. fn manage_block_production(
  708. &self,
  709. meta: Self::Metadata,
  710. block_production_method: BlockProductionMethod,
  711. transaction_struct: TransactionStructure,
  712. num_workers: NonZeroUsize,
  713. scheduler_pacing: SchedulerPacing,
  714. ) -> Result<()> {
  715. debug!("manage_block_production rpc request received");
  716. if num_workers > BankingStage::max_num_workers() {
  717. return Err(jsonrpc_core::error::Error::invalid_params(format!(
  718. "Number of workers ({}) exceeds maximum allowed ({})",
  719. num_workers,
  720. BankingStage::max_num_workers()
  721. )));
  722. }
  723. if transaction_struct != TransactionStructure::View {
  724. warn!("TransactionStructure::Sdk has no effect on block production");
  725. }
  726. meta.with_post_init(|post_init| {
  727. if post_init
  728. .banking_control_sender
  729. .try_send(BankingControlMsg::Internal {
  730. block_production_method,
  731. num_workers,
  732. config: SchedulerConfig { scheduler_pacing },
  733. })
  734. .is_err()
  735. {
  736. error!("Banking stage already switching schedulers");
  737. return Err(jsonrpc_core::error::Error::internal_error());
  738. }
  739. Ok(())
  740. })
  741. }
  742. }
  743. impl AdminRpcImpl {
  744. fn add_authorized_voter_keypair(
  745. meta: AdminRpcRequestMetadata,
  746. authorized_voter: Keypair,
  747. ) -> Result<()> {
  748. let mut authorized_voter_keypairs = meta.authorized_voter_keypairs.write().unwrap();
  749. if authorized_voter_keypairs
  750. .iter()
  751. .any(|x| x.pubkey() == authorized_voter.pubkey())
  752. {
  753. Err(jsonrpc_core::error::Error::invalid_params(
  754. "Authorized voter already present",
  755. ))
  756. } else {
  757. authorized_voter_keypairs.push(Arc::new(authorized_voter));
  758. Ok(())
  759. }
  760. }
  761. fn set_identity_keypair(
  762. meta: AdminRpcRequestMetadata,
  763. identity_keypair: Keypair,
  764. require_tower: bool,
  765. ) -> Result<()> {
  766. meta.with_post_init(|post_init| {
  767. if require_tower {
  768. let _ = Tower::restore(meta.tower_storage.as_ref(), &identity_keypair.pubkey())
  769. .map_err(|err| {
  770. jsonrpc_core::error::Error::invalid_params(format!(
  771. "Unable to load tower file for identity {}: {}",
  772. identity_keypair.pubkey(),
  773. err
  774. ))
  775. })?;
  776. }
  777. for (key, notifier) in &*post_init.notifies.read().unwrap() {
  778. if let Err(err) = notifier.update_key(&identity_keypair) {
  779. error!("Error updating network layer keypair: {err} on {key:?}");
  780. }
  781. }
  782. solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
  783. post_init
  784. .cluster_info
  785. .set_keypair(Arc::new(identity_keypair));
  786. warn!("Identity set to {}", post_init.cluster_info.id());
  787. Ok(())
  788. })
  789. }
  790. }
  791. fn rpc_account_index_from_account_index(account_index: &AccountIndex) -> RpcAccountIndex {
  792. match account_index {
  793. AccountIndex::ProgramId => RpcAccountIndex::ProgramId,
  794. AccountIndex::SplTokenOwner => RpcAccountIndex::SplTokenOwner,
  795. AccountIndex::SplTokenMint => RpcAccountIndex::SplTokenMint,
  796. }
  797. }
  798. // Start the Admin RPC interface
  799. pub fn run(ledger_path: &Path, metadata: AdminRpcRequestMetadata) {
  800. let admin_rpc_path = admin_rpc_path(ledger_path);
  801. let event_loop = tokio::runtime::Builder::new_multi_thread()
  802. .thread_name("solAdminRpcEl")
  803. .worker_threads(3) // Three still seems like a lot, and better than the default of available core count
  804. .enable_all()
  805. .build()
  806. .unwrap();
  807. Builder::new()
  808. .name("solAdminRpc".to_string())
  809. .spawn(move || {
  810. let mut io = MetaIoHandler::default();
  811. io.extend_with(AdminRpcImpl.to_delegate());
  812. let validator_exit = metadata.validator_exit.clone();
  813. let server = ServerBuilder::with_meta_extractor(io, move |_req: &RequestContext| {
  814. metadata.clone()
  815. })
  816. .event_loop_executor(event_loop.handle().clone())
  817. .start(&format!("{}", admin_rpc_path.display()));
  818. match server {
  819. Err(err) => {
  820. warn!("Unable to start admin rpc service: {err:?}");
  821. }
  822. Ok(server) => {
  823. info!("started admin rpc service!");
  824. let close_handle = server.close_handle();
  825. validator_exit
  826. .write()
  827. .unwrap()
  828. .register_exit(Box::new(move || {
  829. close_handle.close();
  830. }));
  831. server.wait();
  832. }
  833. }
  834. })
  835. .unwrap();
  836. }
  837. fn admin_rpc_path(ledger_path: &Path) -> PathBuf {
  838. #[cfg(target_family = "windows")]
  839. {
  840. // More information about the wackiness of pipe names over at
  841. // https://docs.microsoft.com/en-us/windows/win32/ipc/pipe-names
  842. if let Some(ledger_filename) = ledger_path.file_name() {
  843. PathBuf::from(format!(
  844. "\\\\.\\pipe\\{}-admin.rpc",
  845. ledger_filename.to_string_lossy()
  846. ))
  847. } else {
  848. PathBuf::from("\\\\.\\pipe\\admin.rpc")
  849. }
  850. }
  851. #[cfg(not(target_family = "windows"))]
  852. {
  853. ledger_path.join("admin.rpc")
  854. }
  855. }
  856. // Connect to the Admin RPC interface
  857. pub async fn connect(ledger_path: &Path) -> std::result::Result<gen_client::Client, RpcError> {
  858. let admin_rpc_path = admin_rpc_path(ledger_path);
  859. if !admin_rpc_path.exists() {
  860. Err(RpcError::Client(format!(
  861. "{} does not exist",
  862. admin_rpc_path.display()
  863. )))
  864. } else {
  865. ipc::connect::<_, gen_client::Client>(&format!("{}", admin_rpc_path.display())).await
  866. }
  867. }
  868. // Create a runtime for use by client side admin RPC interface calls
  869. pub fn runtime() -> Runtime {
  870. tokio::runtime::Builder::new_multi_thread()
  871. .thread_name("solAdminRpcRt")
  872. .enable_all()
  873. // The agave-validator subcommands make few admin RPC calls and block
  874. // on the results so two workers is plenty
  875. .worker_threads(2)
  876. .build()
  877. .expect("new tokio runtime")
  878. }
  879. #[derive(Default, Deserialize, Clone)]
  880. pub struct StakedNodesOverrides {
  881. #[serde(deserialize_with = "deserialize_pubkey_map")]
  882. pub staked_map_id: HashMap<Pubkey, u64>,
  883. }
  884. pub fn deserialize_pubkey_map<'de, D>(des: D) -> std::result::Result<HashMap<Pubkey, u64>, D::Error>
  885. where
  886. D: Deserializer<'de>,
  887. {
  888. let container: HashMap<String, u64> = serde::Deserialize::deserialize(des)?;
  889. let mut container_typed: HashMap<Pubkey, u64> = HashMap::new();
  890. for (key, value) in container.iter() {
  891. let typed_key = Pubkey::try_from(key.as_str())
  892. .map_err(|_| serde::de::Error::invalid_type(serde::de::Unexpected::Map, &"PubKey"))?;
  893. container_typed.insert(typed_key, *value);
  894. }
  895. Ok(container_typed)
  896. }
  897. pub fn load_staked_nodes_overrides(
  898. path: &String,
  899. ) -> std::result::Result<StakedNodesOverrides, Box<dyn error::Error>> {
  900. debug!("Loading staked nodes overrides configuration from {path}");
  901. if Path::new(&path).exists() {
  902. let file = std::fs::File::open(path)?;
  903. Ok(serde_yaml::from_reader(file)?)
  904. } else {
  905. Err(format!("Staked nodes overrides provided '{path}' a non-existing file path.").into())
  906. }
  907. }
  908. #[cfg(test)]
  909. mod tests {
  910. use {
  911. super::*,
  912. serde_json::Value,
  913. solana_account::{Account, AccountSharedData},
  914. solana_accounts_db::{
  915. accounts_db::{AccountsDbConfig, ACCOUNTS_DB_CONFIG_FOR_TESTING},
  916. accounts_index::AccountSecondaryIndexes,
  917. },
  918. solana_core::{
  919. admin_rpc_post_init::{KeyUpdaterType, KeyUpdaters},
  920. consensus::tower_storage::NullTowerStorage,
  921. validator::{Validator, ValidatorConfig, ValidatorTpuConfig},
  922. },
  923. solana_gossip::{cluster_info::ClusterInfo, node::Node},
  924. solana_ledger::{
  925. create_new_tmp_ledger,
  926. genesis_utils::{
  927. create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo,
  928. },
  929. },
  930. solana_net_utils::{sockets::bind_to_localhost_unique, SocketAddrSpace},
  931. solana_program_option::COption,
  932. solana_program_pack::Pack,
  933. solana_pubkey::Pubkey,
  934. solana_rpc::rpc::create_validator_exit,
  935. solana_runtime::{
  936. bank::{Bank, BankTestConfig},
  937. bank_forks::BankForks,
  938. },
  939. solana_system_interface::program as system_program,
  940. solana_tpu_client::tpu_client::DEFAULT_TPU_ENABLE_UDP,
  941. spl_generic_token::token,
  942. spl_token_2022_interface::state::{
  943. Account as TokenAccount, AccountState as TokenAccountState, Mint,
  944. },
  945. std::{collections::HashSet, fs::remove_dir_all, sync::atomic::AtomicBool},
  946. tokio::sync::mpsc,
  947. };
  948. #[derive(Default)]
  949. struct TestConfig {
  950. account_indexes: AccountSecondaryIndexes,
  951. }
  952. struct RpcHandler {
  953. io: MetaIoHandler<AdminRpcRequestMetadata>,
  954. meta: AdminRpcRequestMetadata,
  955. bank_forks: Arc<RwLock<BankForks>>,
  956. }
  957. impl RpcHandler {
  958. fn _start() -> Self {
  959. Self::start_with_config(TestConfig::default())
  960. }
  961. fn start_with_config(config: TestConfig) -> Self {
  962. let keypair = Arc::new(Keypair::new());
  963. let cluster_info = Arc::new(ClusterInfo::new(
  964. ContactInfo::new(
  965. keypair.pubkey(),
  966. solana_time_utils::timestamp(), // wallclock
  967. 0u16, // shred_version
  968. ),
  969. keypair,
  970. SocketAddrSpace::Unspecified,
  971. ));
  972. let exit = Arc::new(AtomicBool::new(false));
  973. let validator_exit = create_validator_exit(exit);
  974. let (bank_forks, vote_keypair) = new_bank_forks_with_config(BankTestConfig {
  975. accounts_db_config: AccountsDbConfig {
  976. account_indexes: Some(config.account_indexes),
  977. ..ACCOUNTS_DB_CONFIG_FOR_TESTING
  978. },
  979. });
  980. let vote_account = vote_keypair.pubkey();
  981. let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
  982. let repair_whitelist = Arc::new(RwLock::new(HashSet::new()));
  983. let meta = AdminRpcRequestMetadata {
  984. rpc_addr: None,
  985. start_time: SystemTime::now(),
  986. start_progress,
  987. validator_exit,
  988. validator_exit_backpressure: HashMap::default(),
  989. authorized_voter_keypairs: Arc::new(RwLock::new(vec![vote_keypair])),
  990. tower_storage: Arc::new(NullTowerStorage {}),
  991. post_init: Arc::new(RwLock::new(Some(AdminRpcRequestMetadataPostInit {
  992. cluster_info,
  993. bank_forks: bank_forks.clone(),
  994. vote_account,
  995. repair_whitelist,
  996. notifies: Arc::new(RwLock::new(KeyUpdaters::default())),
  997. repair_socket: Arc::new(bind_to_localhost_unique().expect("should bind")),
  998. outstanding_repair_requests: Arc::<
  999. RwLock<repair_service::OutstandingShredRepairs>,
  1000. >::default(),
  1001. cluster_slots: Arc::new(
  1002. solana_core::cluster_slots_service::cluster_slots::ClusterSlots::default_for_tests(),
  1003. ),
  1004. node: None,
  1005. banking_control_sender: mpsc::channel(1).0,
  1006. }))),
  1007. staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
  1008. rpc_to_plugin_manager_sender: None,
  1009. };
  1010. let mut io = MetaIoHandler::default();
  1011. io.extend_with(AdminRpcImpl.to_delegate());
  1012. Self {
  1013. io,
  1014. meta,
  1015. bank_forks,
  1016. }
  1017. }
  1018. fn root_bank(&self) -> Arc<Bank> {
  1019. self.bank_forks.read().unwrap().root_bank()
  1020. }
  1021. }
  1022. fn new_bank_forks_with_config(
  1023. config: BankTestConfig,
  1024. ) -> (Arc<RwLock<BankForks>>, Arc<Keypair>) {
  1025. let GenesisConfigInfo {
  1026. genesis_config,
  1027. voting_keypair,
  1028. ..
  1029. } = create_genesis_config(1_000_000_000);
  1030. let bank = Bank::new_with_config_for_tests(&genesis_config, config);
  1031. (BankForks::new_rw_arc(bank), Arc::new(voting_keypair))
  1032. }
  1033. #[test]
  1034. fn test_secondary_index_key_sizes() {
  1035. for secondary_index_enabled in [true, false] {
  1036. let account_indexes = if secondary_index_enabled {
  1037. AccountSecondaryIndexes {
  1038. keys: None,
  1039. indexes: HashSet::from([
  1040. AccountIndex::ProgramId,
  1041. AccountIndex::SplTokenMint,
  1042. AccountIndex::SplTokenOwner,
  1043. ]),
  1044. }
  1045. } else {
  1046. AccountSecondaryIndexes::default()
  1047. };
  1048. // RPC & Bank Setup
  1049. let rpc = RpcHandler::start_with_config(TestConfig { account_indexes });
  1050. let bank = rpc.root_bank();
  1051. let RpcHandler { io, meta, .. } = rpc;
  1052. // Pubkeys
  1053. let token_account1_pubkey = Pubkey::new_unique();
  1054. let token_account2_pubkey = Pubkey::new_unique();
  1055. let token_account3_pubkey = Pubkey::new_unique();
  1056. let mint1_pubkey = Pubkey::new_unique();
  1057. let mint2_pubkey = Pubkey::new_unique();
  1058. let wallet1_pubkey = Pubkey::new_unique();
  1059. let wallet2_pubkey = Pubkey::new_unique();
  1060. let non_existent_pubkey = Pubkey::new_unique();
  1061. let delegate = Pubkey::new_unique();
  1062. let mut num_default_spl_token_program_accounts = 0;
  1063. let mut num_default_system_program_accounts = 0;
  1064. if !secondary_index_enabled {
  1065. // Test first with no accounts added & no secondary indexes enabled:
  1066. let req = format!(
  1067. r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account1_pubkey}"]}}"#,
  1068. );
  1069. let res = io.handle_request_sync(&req, meta.clone());
  1070. let result: Value = serde_json::from_str(&res.expect("actual response"))
  1071. .expect("actual response deserialization");
  1072. let sizes: HashMap<RpcAccountIndex, usize> =
  1073. serde_json::from_value(result["result"].clone()).unwrap();
  1074. assert!(sizes.is_empty());
  1075. } else {
  1076. // Count SPL Token Program Default Accounts
  1077. let req = format!(
  1078. r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
  1079. token::id(),
  1080. );
  1081. let res = io.handle_request_sync(&req, meta.clone());
  1082. let result: Value = serde_json::from_str(&res.expect("actual response"))
  1083. .expect("actual response deserialization");
  1084. let sizes: HashMap<RpcAccountIndex, usize> =
  1085. serde_json::from_value(result["result"].clone()).unwrap();
  1086. assert_eq!(sizes.len(), 1);
  1087. num_default_spl_token_program_accounts =
  1088. *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
  1089. // Count System Program Default Accounts
  1090. let req = format!(
  1091. r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
  1092. system_program::id(),
  1093. );
  1094. let res = io.handle_request_sync(&req, meta.clone());
  1095. let result: Value = serde_json::from_str(&res.expect("actual response"))
  1096. .expect("actual response deserialization");
  1097. let sizes: HashMap<RpcAccountIndex, usize> =
  1098. serde_json::from_value(result["result"].clone()).unwrap();
  1099. assert_eq!(sizes.len(), 1);
  1100. num_default_system_program_accounts =
  1101. *sizes.get(&RpcAccountIndex::ProgramId).unwrap();
  1102. }
  1103. // Add 2 basic wallet accounts
  1104. let wallet1_account = AccountSharedData::from(Account {
  1105. lamports: 11111111,
  1106. owner: system_program::id(),
  1107. ..Account::default()
  1108. });
  1109. bank.store_account(&wallet1_pubkey, &wallet1_account);
  1110. let wallet2_account = AccountSharedData::from(Account {
  1111. lamports: 11111111,
  1112. owner: system_program::id(),
  1113. ..Account::default()
  1114. });
  1115. bank.store_account(&wallet2_pubkey, &wallet2_account);
  1116. // Add a token account
  1117. let mut account1_data = vec![0; TokenAccount::get_packed_len()];
  1118. let token_account1 = TokenAccount {
  1119. mint: mint1_pubkey,
  1120. owner: wallet1_pubkey,
  1121. delegate: COption::Some(delegate),
  1122. amount: 420,
  1123. state: TokenAccountState::Initialized,
  1124. is_native: COption::None,
  1125. delegated_amount: 30,
  1126. close_authority: COption::Some(wallet1_pubkey),
  1127. };
  1128. TokenAccount::pack(token_account1, &mut account1_data).unwrap();
  1129. let token_account1 = AccountSharedData::from(Account {
  1130. lamports: 111,
  1131. data: account1_data.to_vec(),
  1132. owner: token::id(),
  1133. ..Account::default()
  1134. });
  1135. bank.store_account(&token_account1_pubkey, &token_account1);
  1136. // Add the mint
  1137. let mut mint1_data = vec![0; Mint::get_packed_len()];
  1138. let mint1_state = Mint {
  1139. mint_authority: COption::Some(wallet1_pubkey),
  1140. supply: 500,
  1141. decimals: 2,
  1142. is_initialized: true,
  1143. freeze_authority: COption::Some(wallet1_pubkey),
  1144. };
  1145. Mint::pack(mint1_state, &mut mint1_data).unwrap();
  1146. let mint_account1 = AccountSharedData::from(Account {
  1147. lamports: 222,
  1148. data: mint1_data.to_vec(),
  1149. owner: token::id(),
  1150. ..Account::default()
  1151. });
  1152. bank.store_account(&mint1_pubkey, &mint_account1);
  1153. // Add another token account with the different owner, but same delegate, and mint
  1154. let mut account2_data = vec![0; TokenAccount::get_packed_len()];
  1155. let token_account2 = TokenAccount {
  1156. mint: mint1_pubkey,
  1157. owner: wallet2_pubkey,
  1158. delegate: COption::Some(delegate),
  1159. amount: 420,
  1160. state: TokenAccountState::Initialized,
  1161. is_native: COption::None,
  1162. delegated_amount: 30,
  1163. close_authority: COption::Some(wallet2_pubkey),
  1164. };
  1165. TokenAccount::pack(token_account2, &mut account2_data).unwrap();
  1166. let token_account2 = AccountSharedData::from(Account {
  1167. lamports: 333,
  1168. data: account2_data.to_vec(),
  1169. owner: token::id(),
  1170. ..Account::default()
  1171. });
  1172. bank.store_account(&token_account2_pubkey, &token_account2);
  1173. // Add another token account with the same owner and delegate but different mint
  1174. let mut account3_data = vec![0; TokenAccount::get_packed_len()];
  1175. let token_account3 = TokenAccount {
  1176. mint: mint2_pubkey,
  1177. owner: wallet2_pubkey,
  1178. delegate: COption::Some(delegate),
  1179. amount: 42,
  1180. state: TokenAccountState::Initialized,
  1181. is_native: COption::None,
  1182. delegated_amount: 30,
  1183. close_authority: COption::Some(wallet2_pubkey),
  1184. };
  1185. TokenAccount::pack(token_account3, &mut account3_data).unwrap();
  1186. let token_account3 = AccountSharedData::from(Account {
  1187. lamports: 444,
  1188. data: account3_data.to_vec(),
  1189. owner: token::id(),
  1190. ..Account::default()
  1191. });
  1192. bank.store_account(&token_account3_pubkey, &token_account3);
  1193. // Add the new mint
  1194. let mut mint2_data = vec![0; Mint::get_packed_len()];
  1195. let mint2_state = Mint {
  1196. mint_authority: COption::Some(wallet2_pubkey),
  1197. supply: 200,
  1198. decimals: 3,
  1199. is_initialized: true,
  1200. freeze_authority: COption::Some(wallet2_pubkey),
  1201. };
  1202. Mint::pack(mint2_state, &mut mint2_data).unwrap();
  1203. let mint_account2 = AccountSharedData::from(Account {
  1204. lamports: 555,
  1205. data: mint2_data.to_vec(),
  1206. owner: token::id(),
  1207. ..Account::default()
  1208. });
  1209. bank.store_account(&mint2_pubkey, &mint_account2);
  1210. // Accounts should now look like the following:
  1211. //
  1212. // -----system_program------
  1213. // / \
  1214. // /-(owns) \-(owns)
  1215. // / \
  1216. // wallet1 ---wallet2---
  1217. // / / \
  1218. // /-(SPL::owns) /-(SPL::owns) \-(SPL::owns)
  1219. // / / \
  1220. // token_account1 token_account2 token_account3
  1221. // \ / /
  1222. // \-(SPL::mint) /-(SPL::mint) /-(SPL::mint)
  1223. // \ / /
  1224. // --mint_account1-- mint_account2
  1225. if secondary_index_enabled {
  1226. // ----------- Test for a non-existent key -----------
  1227. let req = format!(
  1228. r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{non_existent_pubkey}"]}}"#,
  1229. );
  1230. let res = io.handle_request_sync(&req, meta.clone());
  1231. let result: Value = serde_json::from_str(&res.expect("actual response"))
  1232. .expect("actual response deserialization");
  1233. let sizes: HashMap<RpcAccountIndex, usize> =
  1234. serde_json::from_value(result["result"].clone()).unwrap();
  1235. assert!(sizes.is_empty());
  1236. // --------------- Test Queries ---------------
  1237. // 1) Wallet1 - Owns 1 SPL Token
  1238. let req = format!(
  1239. r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet1_pubkey}"]}}"#,
  1240. );
  1241. let res = io.handle_request_sync(&req, meta.clone());
  1242. let result: Value = serde_json::from_str(&res.expect("actual response"))
  1243. .expect("actual response deserialization");
  1244. let sizes: HashMap<RpcAccountIndex, usize> =
  1245. serde_json::from_value(result["result"].clone()).unwrap();
  1246. assert_eq!(sizes.len(), 1);
  1247. assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 1);
  1248. // 2) Wallet2 - Owns 2 SPL Tokens
  1249. let req = format!(
  1250. r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{wallet2_pubkey}"]}}"#,
  1251. );
  1252. let res = io.handle_request_sync(&req, meta.clone());
  1253. let result: Value = serde_json::from_str(&res.expect("actual response"))
  1254. .expect("actual response deserialization");
  1255. let sizes: HashMap<RpcAccountIndex, usize> =
  1256. serde_json::from_value(result["result"].clone()).unwrap();
  1257. assert_eq!(sizes.len(), 1);
  1258. assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenOwner).unwrap(), 2);
  1259. // 3) Mint1 - Is in 2 SPL Accounts
  1260. let req = format!(
  1261. r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint1_pubkey}"]}}"#,
  1262. );
  1263. let res = io.handle_request_sync(&req, meta.clone());
  1264. let result: Value = serde_json::from_str(&res.expect("actual response"))
  1265. .expect("actual response deserialization");
  1266. let sizes: HashMap<RpcAccountIndex, usize> =
  1267. serde_json::from_value(result["result"].clone()).unwrap();
  1268. assert_eq!(sizes.len(), 1);
  1269. assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 2);
  1270. // 4) Mint2 - Is in 1 SPL Account
  1271. let req = format!(
  1272. r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{mint2_pubkey}"]}}"#,
  1273. );
  1274. let res = io.handle_request_sync(&req, meta.clone());
  1275. let result: Value = serde_json::from_str(&res.expect("actual response"))
  1276. .expect("actual response deserialization");
  1277. let sizes: HashMap<RpcAccountIndex, usize> =
  1278. serde_json::from_value(result["result"].clone()).unwrap();
  1279. assert_eq!(sizes.len(), 1);
  1280. assert_eq!(*sizes.get(&RpcAccountIndex::SplTokenMint).unwrap(), 1);
  1281. // 5) SPL Token Program Owns 6 Accounts - 1 Default, 5 created above.
  1282. let req = format!(
  1283. r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
  1284. token::id(),
  1285. );
  1286. let res = io.handle_request_sync(&req, meta.clone());
  1287. let result: Value = serde_json::from_str(&res.expect("actual response"))
  1288. .expect("actual response deserialization");
  1289. let sizes: HashMap<RpcAccountIndex, usize> =
  1290. serde_json::from_value(result["result"].clone()).unwrap();
  1291. assert_eq!(sizes.len(), 1);
  1292. assert_eq!(
  1293. *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
  1294. (num_default_spl_token_program_accounts + 5)
  1295. );
  1296. // 5) System Program Owns 4 Accounts + 2 Default, 2 created above.
  1297. let req = format!(
  1298. r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{}"]}}"#,
  1299. system_program::id(),
  1300. );
  1301. let res = io.handle_request_sync(&req, meta.clone());
  1302. let result: Value = serde_json::from_str(&res.expect("actual response"))
  1303. .expect("actual response deserialization");
  1304. let sizes: HashMap<RpcAccountIndex, usize> =
  1305. serde_json::from_value(result["result"].clone()).unwrap();
  1306. assert_eq!(sizes.len(), 1);
  1307. assert_eq!(
  1308. *sizes.get(&RpcAccountIndex::ProgramId).unwrap(),
  1309. (num_default_system_program_accounts + 2)
  1310. );
  1311. } else {
  1312. // ------------ Secondary Indexes Disabled ------------
  1313. let req = format!(
  1314. r#"{{"jsonrpc":"2.0","id":1,"method":"getSecondaryIndexKeySize","params":["{token_account2_pubkey}"]}}"#,
  1315. );
  1316. let res = io.handle_request_sync(&req, meta.clone());
  1317. let result: Value = serde_json::from_str(&res.expect("actual response"))
  1318. .expect("actual response deserialization");
  1319. let sizes: HashMap<RpcAccountIndex, usize> =
  1320. serde_json::from_value(result["result"].clone()).unwrap();
  1321. assert!(sizes.is_empty());
  1322. }
  1323. }
  1324. }
  1325. // This test checks that the rpc call to `set_identity` works a expected with
  1326. // Bank but without validator.
  1327. #[test]
  1328. fn test_set_identity() {
  1329. let rpc = RpcHandler::start_with_config(TestConfig::default());
  1330. let RpcHandler { io, meta, .. } = rpc;
  1331. let expected_validator_id = Keypair::new();
  1332. let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
  1333. let set_id_request = format!(
  1334. r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
  1335. );
  1336. let response = io.handle_request_sync(&set_id_request, meta.clone());
  1337. let actual_parsed_response: Value =
  1338. serde_json::from_str(&response.expect("actual response"))
  1339. .expect("actual response deserialization");
  1340. let expected_parsed_response: Value = serde_json::from_str(
  1341. r#"{
  1342. "id": 1,
  1343. "jsonrpc": "2.0",
  1344. "result": null
  1345. }"#,
  1346. )
  1347. .expect("Failed to parse expected response");
  1348. assert_eq!(actual_parsed_response, expected_parsed_response);
  1349. let contact_info_request =
  1350. r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
  1351. let response = io.handle_request_sync(&contact_info_request, meta.clone());
  1352. let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
  1353. .expect("actual response deserialization");
  1354. let actual_validator_id = parsed_response["result"]["id"]
  1355. .as_str()
  1356. .expect("Expected a string");
  1357. assert_eq!(
  1358. actual_validator_id,
  1359. expected_validator_id.pubkey().to_string()
  1360. );
  1361. }
  1362. struct TestValidatorWithAdminRpc {
  1363. meta: AdminRpcRequestMetadata,
  1364. io: MetaIoHandler<AdminRpcRequestMetadata>,
  1365. validator_ledger_path: PathBuf,
  1366. }
  1367. impl TestValidatorWithAdminRpc {
  1368. fn new() -> Self {
  1369. let leader_keypair = Keypair::new();
  1370. let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
  1371. let validator_keypair = Keypair::new();
  1372. let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
  1373. let genesis_config =
  1374. create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1000)
  1375. .genesis_config;
  1376. let (validator_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
  1377. let voting_keypair = Arc::new(Keypair::new());
  1378. let voting_pubkey = voting_keypair.pubkey();
  1379. let authorized_voter_keypairs = Arc::new(RwLock::new(vec![voting_keypair]));
  1380. let validator_config = ValidatorConfig {
  1381. rpc_addrs: Some((
  1382. validator_node.info.rpc().unwrap(),
  1383. validator_node.info.rpc_pubsub().unwrap(),
  1384. )),
  1385. ..ValidatorConfig::default_for_test()
  1386. };
  1387. let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
  1388. let post_init = Arc::new(RwLock::new(None));
  1389. let meta = AdminRpcRequestMetadata {
  1390. rpc_addr: validator_config.rpc_addrs.map(|(rpc_addr, _)| rpc_addr),
  1391. start_time: SystemTime::now(),
  1392. start_progress: start_progress.clone(),
  1393. validator_exit: validator_config.validator_exit.clone(),
  1394. validator_exit_backpressure: HashMap::default(),
  1395. authorized_voter_keypairs: authorized_voter_keypairs.clone(),
  1396. tower_storage: Arc::new(NullTowerStorage {}),
  1397. post_init: post_init.clone(),
  1398. staked_nodes_overrides: Arc::new(RwLock::new(HashMap::new())),
  1399. rpc_to_plugin_manager_sender: None,
  1400. };
  1401. let _validator = Validator::new(
  1402. validator_node,
  1403. Arc::new(validator_keypair),
  1404. &validator_ledger_path,
  1405. &voting_pubkey,
  1406. authorized_voter_keypairs,
  1407. vec![leader_node.info],
  1408. &validator_config,
  1409. true, // should_check_duplicate_instance
  1410. None, // rpc_to_plugin_manager_receiver
  1411. start_progress.clone(),
  1412. SocketAddrSpace::Unspecified,
  1413. ValidatorTpuConfig::new_for_tests(DEFAULT_TPU_ENABLE_UDP),
  1414. post_init.clone(),
  1415. )
  1416. .expect("assume successful validator start");
  1417. assert_eq!(
  1418. *start_progress.read().unwrap(),
  1419. ValidatorStartProgress::Running
  1420. );
  1421. let post_init = post_init.read().unwrap();
  1422. assert!(post_init.is_some());
  1423. let post_init = post_init.as_ref().unwrap();
  1424. let notifies = post_init.notifies.read().unwrap();
  1425. let updater_keys: HashSet<KeyUpdaterType> =
  1426. notifies.into_iter().map(|(key, _)| key.clone()).collect();
  1427. assert_eq!(
  1428. updater_keys,
  1429. HashSet::from_iter(vec![
  1430. KeyUpdaterType::Tpu,
  1431. KeyUpdaterType::TpuForwards,
  1432. KeyUpdaterType::TpuVote,
  1433. KeyUpdaterType::Forward,
  1434. KeyUpdaterType::RpcService
  1435. ])
  1436. );
  1437. let mut io = MetaIoHandler::default();
  1438. io.extend_with(AdminRpcImpl.to_delegate());
  1439. Self {
  1440. meta,
  1441. io,
  1442. validator_ledger_path,
  1443. }
  1444. }
  1445. fn handle_request(&self, request: &str) -> Option<String> {
  1446. self.io.handle_request_sync(request, self.meta.clone())
  1447. }
  1448. }
  1449. impl Drop for TestValidatorWithAdminRpc {
  1450. fn drop(&mut self) {
  1451. remove_dir_all(self.validator_ledger_path.clone()).unwrap();
  1452. }
  1453. }
  1454. // This test checks that `set_identity` call works with working validator and client.
  1455. #[test]
  1456. fn test_set_identity_with_validator() {
  1457. let test_validator = TestValidatorWithAdminRpc::new();
  1458. let expected_validator_id = Keypair::new();
  1459. let validator_id_bytes = format!("{:?}", expected_validator_id.to_bytes());
  1460. let set_id_request = format!(
  1461. r#"{{"jsonrpc":"2.0","id":1,"method":"setIdentityFromBytes","params":[{validator_id_bytes}, false]}}"#,
  1462. );
  1463. let response = test_validator.handle_request(&set_id_request);
  1464. let actual_parsed_response: Value =
  1465. serde_json::from_str(&response.expect("actual response"))
  1466. .expect("actual response deserialization");
  1467. let expected_parsed_response: Value = serde_json::from_str(
  1468. r#"{
  1469. "id": 1,
  1470. "jsonrpc": "2.0",
  1471. "result": null
  1472. }"#,
  1473. )
  1474. .expect("Failed to parse expected response");
  1475. assert_eq!(actual_parsed_response, expected_parsed_response);
  1476. let contact_info_request =
  1477. r#"{"jsonrpc":"2.0","id":1,"method":"contactInfo","params":[]}"#.to_string();
  1478. let response = test_validator.handle_request(&contact_info_request);
  1479. let parsed_response: Value = serde_json::from_str(&response.expect("actual response"))
  1480. .expect("actual response deserialization");
  1481. let actual_validator_id = parsed_response["result"]["id"]
  1482. .as_str()
  1483. .expect("Expected a string");
  1484. assert_eq!(
  1485. actual_validator_id,
  1486. expected_validator_id.pubkey().to_string()
  1487. );
  1488. let contact_info_request =
  1489. r#"{"jsonrpc":"2.0","id":1,"method":"exit","params":[]}"#.to_string();
  1490. let exit_response = test_validator.handle_request(&contact_info_request);
  1491. let actual_parsed_response: Value =
  1492. serde_json::from_str(&exit_response.expect("actual response"))
  1493. .expect("actual response deserialization");
  1494. assert_eq!(actual_parsed_response, expected_parsed_response);
  1495. }
  1496. }