client.rs 20 KB


  1. use {
  2. futures_util::StreamExt,
  3. serde_json::{json, Value},
  4. solana_clock::Slot,
  5. solana_commitment_config::{CommitmentConfig, CommitmentLevel},
  6. solana_keypair::Keypair,
  7. solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path_auto_delete},
  8. solana_native_token::LAMPORTS_PER_SOL,
  9. solana_pubkey::Pubkey,
  10. solana_pubsub_client::{nonblocking, pubsub_client::PubsubClient},
  11. solana_rpc::{
  12. optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
  13. rpc::{create_test_transaction_entries, populate_blockstore_for_tests},
  14. rpc_pubsub_service::{PubSubConfig, PubSubService},
  15. rpc_subscriptions::RpcSubscriptions,
  16. },
  17. solana_rpc_client::rpc_client::RpcClient,
  18. solana_rpc_client_api::{
  19. config::{
  20. RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
  21. RpcProgramAccountsConfig,
  22. },
  23. response::SlotInfo,
  24. },
  25. solana_runtime::{
  26. bank::Bank,
  27. bank_forks::BankForks,
  28. commitment::{BlockCommitmentCache, CommitmentSlots},
  29. genesis_utils::{create_genesis_config, GenesisConfigInfo},
  30. },
  31. solana_signer::Signer,
  32. solana_streamer::socket::SocketAddrSpace,
  33. solana_system_interface::program as system_program,
  34. solana_system_transaction as system_transaction,
  35. solana_test_validator::TestValidator,
  36. solana_transaction_status::{
  37. BlockEncodingOptions, ConfirmedBlock, TransactionDetails, UiTransactionEncoding,
  38. },
  39. std::{
  40. collections::HashSet,
  41. net::{IpAddr, SocketAddr},
  42. sync::{
  43. atomic::{AtomicBool, AtomicU64, Ordering},
  44. Arc, RwLock,
  45. },
  46. thread::sleep,
  47. time::{Duration, Instant},
  48. },
  49. systemstat::Ipv4Addr,
  50. tungstenite::connect,
  51. };
  52. fn pubsub_addr() -> SocketAddr {
  53. let port_range = solana_net_utils::sockets::localhost_port_range_for_tests();
  54. SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port_range.0)
  55. }
  56. #[test]
  57. fn test_rpc_client() {
  58. solana_logger::setup();
  59. let alice = Keypair::new();
  60. let test_validator =
  61. TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
  62. let bob_pubkey = solana_pubkey::new_rand();
  63. let client = RpcClient::new(test_validator.rpc_url());
  64. assert_eq!(
  65. client.get_version().unwrap().solana_core,
  66. solana_version::semver!()
  67. );
  68. assert!(client.get_account(&bob_pubkey).is_err());
  69. assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 0);
  70. let original_alice_balance = client.get_balance(&alice.pubkey()).unwrap();
  71. let blockhash = client.get_latest_blockhash().unwrap();
  72. let tx = system_transaction::transfer(&alice, &bob_pubkey, 20 * LAMPORTS_PER_SOL, blockhash);
  73. let signature = client.send_transaction(&tx).unwrap();
  74. let mut confirmed_tx = false;
  75. let now = Instant::now();
  76. while now.elapsed().as_secs() <= 20 {
  77. let response = client
  78. .confirm_transaction_with_commitment(&signature, CommitmentConfig::processed())
  79. .unwrap();
  80. if response.value {
  81. confirmed_tx = true;
  82. break;
  83. }
  84. sleep(Duration::from_millis(500));
  85. }
  86. assert!(confirmed_tx);
  87. assert_eq!(
  88. client
  89. .get_balance_with_commitment(&bob_pubkey, CommitmentConfig::processed())
  90. .unwrap()
  91. .value,
  92. 20 * LAMPORTS_PER_SOL
  93. );
  94. assert_eq!(
  95. client
  96. .get_balance_with_commitment(&alice.pubkey(), CommitmentConfig::processed())
  97. .unwrap()
  98. .value,
  99. original_alice_balance - 20 * LAMPORTS_PER_SOL
  100. );
  101. }
  102. #[test]
  103. fn test_account_subscription() {
  104. let pubsub_addr = pubsub_addr();
  105. let exit = Arc::new(AtomicBool::new(false));
  106. let GenesisConfigInfo {
  107. genesis_config,
  108. mint_keypair: alice,
  109. ..
  110. } = create_genesis_config(10_000);
  111. let bank = Bank::new_for_tests(&genesis_config);
  112. let blockhash = bank.last_blockhash();
  113. let bank_forks = BankForks::new_rw_arc(bank);
  114. let bank0 = bank_forks.read().unwrap().get(0).unwrap();
  115. let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
  116. bank_forks.write().unwrap().insert(bank1);
  117. let bob = Keypair::new();
  118. let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
  119. let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
  120. exit.clone(),
  121. max_complete_transaction_status_slot,
  122. bank_forks.clone(),
  123. Arc::new(RwLock::new(BlockCommitmentCache::default())),
  124. OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
  125. ));
  126. let (trigger, pubsub_service) =
  127. PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
  128. check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));
  129. let config = Some(RpcAccountInfoConfig {
  130. commitment: Some(CommitmentConfig::finalized()),
  131. encoding: None,
  132. data_slice: None,
  133. min_context_slot: None,
  134. });
  135. let (mut client, receiver) = PubsubClient::account_subscribe(
  136. &format!("ws://0.0.0.0:{}/", pubsub_addr.port()),
  137. &bob.pubkey(),
  138. config,
  139. )
  140. .unwrap();
  141. // Transfer 100 lamports from alice to bob
  142. let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash);
  143. bank_forks
  144. .read()
  145. .unwrap()
  146. .get(1)
  147. .unwrap()
  148. .process_transaction(&tx)
  149. .unwrap();
  150. let commitment_slots = CommitmentSlots {
  151. slot: 1,
  152. ..CommitmentSlots::default()
  153. };
  154. subscriptions.notify_subscribers(commitment_slots);
  155. let commitment_slots = CommitmentSlots {
  156. slot: 2,
  157. root: 1,
  158. highest_confirmed_slot: 1,
  159. highest_super_majority_root: 1,
  160. };
  161. subscriptions.notify_subscribers(commitment_slots);
  162. let expected = json!({
  163. "context": { "slot": 1 },
  164. "value": {
  165. "owner": system_program::id().to_string(),
  166. "lamports": 100,
  167. "data": "",
  168. "executable": false,
  169. "rentEpoch": u64::MAX,
  170. "space": 0,
  171. },
  172. });
  173. // Read notification
  174. let mut errors: Vec<(Value, Value)> = Vec::new();
  175. let response = receiver.recv();
  176. match response {
  177. Ok(response) => {
  178. let actual = serde_json::to_value(response).unwrap();
  179. if expected != actual {
  180. errors.push((expected, actual));
  181. }
  182. }
  183. Err(_) => eprintln!("unexpected websocket receive timeout"),
  184. }
  185. exit.store(true, Ordering::Relaxed);
  186. trigger.cancel();
  187. client.shutdown().unwrap();
  188. pubsub_service.close().unwrap();
  189. assert_eq!(errors, [].to_vec());
  190. }
  191. #[test]
  192. fn test_block_subscription() {
  193. // setup BankForks
  194. let exit = Arc::new(AtomicBool::new(false));
  195. let GenesisConfigInfo {
  196. genesis_config,
  197. mint_keypair: alice,
  198. ..
  199. } = create_genesis_config(10_000);
  200. let bank = Bank::new_for_tests(&genesis_config);
  201. let rent_exempt_amount = bank.get_minimum_balance_for_rent_exemption(0);
  202. let bank_forks = BankForks::new_rw_arc(bank);
  203. // setup Blockstore
  204. let ledger_path = get_tmp_ledger_path_auto_delete!();
  205. let blockstore = Blockstore::open(ledger_path.path()).unwrap();
  206. let blockstore = Arc::new(blockstore);
  207. // populate ledger with test txs
  208. let bank = bank_forks.read().unwrap().working_bank();
  209. let keypair1 = Keypair::new();
  210. let keypair2 = Keypair::new();
  211. let keypair3 = Keypair::new();
  212. let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
  213. bank.transfer(rent_exempt_amount, &alice, &keypair2.pubkey())
  214. .unwrap();
  215. populate_blockstore_for_tests(
  216. create_test_transaction_entries(
  217. vec![&alice, &keypair1, &keypair2, &keypair3],
  218. bank.clone(),
  219. )
  220. .0,
  221. bank,
  222. blockstore.clone(),
  223. max_complete_transaction_status_slot,
  224. );
  225. let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
  226. // setup RpcSubscriptions && PubSubService
  227. let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
  228. exit.clone(),
  229. max_complete_transaction_status_slot,
  230. blockstore.clone(),
  231. bank_forks.clone(),
  232. Arc::new(RwLock::new(BlockCommitmentCache::default())),
  233. OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
  234. ));
  235. let pubsub_addr = pubsub_addr();
  236. let pub_cfg = PubSubConfig {
  237. enable_block_subscription: true,
  238. ..PubSubConfig::default()
  239. };
  240. let (trigger, pubsub_service) = PubSubService::new(pub_cfg, &subscriptions, pubsub_addr);
  241. check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));
  242. // setup PubsubClient
  243. let (mut client, receiver) = PubsubClient::block_subscribe(
  244. &format!("ws://0.0.0.0:{}/", pubsub_addr.port()),
  245. RpcBlockSubscribeFilter::All,
  246. Some(RpcBlockSubscribeConfig {
  247. commitment: Some(CommitmentConfig {
  248. commitment: CommitmentLevel::Confirmed,
  249. }),
  250. encoding: Some(UiTransactionEncoding::Json),
  251. transaction_details: Some(TransactionDetails::Signatures),
  252. show_rewards: None,
  253. max_supported_transaction_version: None,
  254. }),
  255. )
  256. .unwrap();
  257. // trigger Gossip notification
  258. let slot = bank_forks.read().unwrap().highest_slot();
  259. subscriptions.notify_gossip_subscribers(slot);
  260. let maybe_actual = receiver.recv_timeout(Duration::from_millis(400));
  261. match maybe_actual {
  262. Ok(actual) => {
  263. let versioned_block = blockstore.get_complete_block(slot, false).unwrap();
  264. let confirmed_block = ConfirmedBlock::from(versioned_block);
  265. let block = confirmed_block
  266. .encode_with_options(
  267. UiTransactionEncoding::Json,
  268. BlockEncodingOptions {
  269. transaction_details: TransactionDetails::Signatures,
  270. show_rewards: false,
  271. max_supported_transaction_version: None,
  272. },
  273. )
  274. .unwrap();
  275. assert_eq!(actual.value.slot, slot);
  276. assert!(block.eq(&actual.value.block.unwrap()));
  277. }
  278. Err(e) => {
  279. eprintln!("unexpected websocket receive timeout");
  280. assert_eq!(Some(e), None);
  281. }
  282. }
  283. // cleanup
  284. exit.store(true, Ordering::Relaxed);
  285. trigger.cancel();
  286. client.shutdown().unwrap();
  287. pubsub_service.close().unwrap();
  288. }
  289. #[test]
  290. fn test_program_subscription() {
  291. let pubsub_addr = pubsub_addr();
  292. let exit = Arc::new(AtomicBool::new(false));
  293. let GenesisConfigInfo {
  294. genesis_config,
  295. mint_keypair: alice,
  296. ..
  297. } = create_genesis_config(10_000);
  298. let bank = Bank::new_for_tests(&genesis_config);
  299. let blockhash = bank.last_blockhash();
  300. let bank_forks = BankForks::new_rw_arc(bank);
  301. let bank0 = bank_forks.read().unwrap().get(0).unwrap();
  302. let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
  303. bank_forks.write().unwrap().insert(bank1);
  304. let bob = Keypair::new();
  305. let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
  306. let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
  307. exit.clone(),
  308. max_complete_transaction_status_slot,
  309. bank_forks.clone(),
  310. Arc::new(RwLock::new(BlockCommitmentCache::default())),
  311. OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
  312. ));
  313. let (trigger, pubsub_service) =
  314. PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
  315. check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));
  316. let config = Some(RpcProgramAccountsConfig {
  317. ..RpcProgramAccountsConfig::default()
  318. });
  319. let program_id = Pubkey::new_unique();
  320. let (mut client, receiver) = PubsubClient::program_subscribe(
  321. &format!("ws://0.0.0.0:{}/", pubsub_addr.port()),
  322. &program_id,
  323. config,
  324. )
  325. .unwrap();
  326. // Create new program account at bob's address
  327. let tx = system_transaction::create_account(&alice, &bob, blockhash, 100, 0, &program_id);
  328. bank_forks
  329. .read()
  330. .unwrap()
  331. .get(1)
  332. .unwrap()
  333. .process_transaction(&tx)
  334. .unwrap();
  335. let commitment_slots = CommitmentSlots {
  336. slot: 1,
  337. ..CommitmentSlots::default()
  338. };
  339. subscriptions.notify_subscribers(commitment_slots);
  340. let commitment_slots = CommitmentSlots {
  341. slot: 2,
  342. root: 1,
  343. highest_confirmed_slot: 1,
  344. highest_super_majority_root: 1,
  345. };
  346. subscriptions.notify_subscribers(commitment_slots);
  347. // Poll notifications generated by the transfer
  348. let mut notifications = Vec::new();
  349. let mut pubkeys = HashSet::new();
  350. loop {
  351. let response = receiver.recv_timeout(Duration::from_millis(100));
  352. match response {
  353. Ok(response) => {
  354. notifications.push(response.clone());
  355. pubkeys.insert(response.value.pubkey);
  356. }
  357. Err(_) => {
  358. break;
  359. }
  360. }
  361. }
  362. // Shutdown
  363. exit.store(true, Ordering::Relaxed);
  364. trigger.cancel();
  365. client.shutdown().unwrap();
  366. pubsub_service.close().unwrap();
  367. assert_eq!(notifications.len(), 1);
  368. assert!(pubkeys.contains(&bob.pubkey().to_string()));
  369. }
  370. #[test]
  371. fn test_root_subscription() {
  372. let pubsub_addr = pubsub_addr();
  373. let exit = Arc::new(AtomicBool::new(false));
  374. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
  375. let bank = Bank::new_for_tests(&genesis_config);
  376. let bank_forks = BankForks::new_rw_arc(bank);
  377. let bank0 = bank_forks.read().unwrap().get(0).unwrap();
  378. let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
  379. bank_forks.write().unwrap().insert(bank1);
  380. let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
  381. let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
  382. exit.clone(),
  383. max_complete_transaction_status_slot,
  384. bank_forks.clone(),
  385. Arc::new(RwLock::new(BlockCommitmentCache::default())),
  386. OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
  387. ));
  388. let (trigger, pubsub_service) =
  389. PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
  390. check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));
  391. let (mut client, receiver) =
  392. PubsubClient::root_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();
  393. let roots = vec![1, 2, 3];
  394. subscriptions.notify_roots(roots.clone());
  395. // Read notifications
  396. let mut errors: Vec<(Slot, Slot)> = Vec::new();
  397. for expected in roots {
  398. let response = receiver.recv();
  399. match response {
  400. Ok(response) => {
  401. if expected != response {
  402. errors.push((expected, response));
  403. }
  404. }
  405. Err(_) => eprintln!("unexpected websocket receive timeout"),
  406. }
  407. }
  408. exit.store(true, Ordering::Relaxed);
  409. trigger.cancel();
  410. client.shutdown().unwrap();
  411. pubsub_service.close().unwrap();
  412. assert_eq!(errors, [].to_vec());
  413. }
  414. #[test]
  415. fn test_slot_subscription() {
  416. let pubsub_addr = pubsub_addr();
  417. let exit = Arc::new(AtomicBool::new(false));
  418. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
  419. let bank = Bank::new_for_tests(&genesis_config);
  420. let bank_forks = BankForks::new_rw_arc(bank);
  421. let optimistically_confirmed_bank =
  422. OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
  423. let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
  424. let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
  425. exit.clone(),
  426. max_complete_transaction_status_slot,
  427. bank_forks,
  428. Arc::new(RwLock::new(BlockCommitmentCache::default())),
  429. optimistically_confirmed_bank,
  430. ));
  431. let (trigger, pubsub_service) =
  432. PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
  433. check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));
  434. let (mut client, receiver) =
  435. PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();
  436. let mut errors: Vec<(SlotInfo, SlotInfo)> = Vec::new();
  437. for i in 0..3 {
  438. subscriptions.notify_slot(i + 1, i, i);
  439. let maybe_actual = receiver.recv_timeout(Duration::from_millis(400));
  440. match maybe_actual {
  441. Ok(actual) => {
  442. let expected = SlotInfo {
  443. slot: i + 1,
  444. parent: i,
  445. root: i,
  446. };
  447. if actual != expected {
  448. errors.push((actual, expected));
  449. }
  450. }
  451. Err(_err) => {
  452. eprintln!("unexpected websocket receive timeout");
  453. break;
  454. }
  455. }
  456. }
  457. exit.store(true, Ordering::Relaxed);
  458. trigger.cancel();
  459. client.shutdown().unwrap();
  460. pubsub_service.close().unwrap();
  461. assert_eq!(errors, [].to_vec());
  462. }
  463. #[tokio::test]
  464. async fn test_slot_subscription_async() {
  465. let sync_service = Arc::new(AtomicU64::new(0));
  466. let sync_client = Arc::clone(&sync_service);
  467. fn wait_until(atomic: &Arc<AtomicU64>, value: u64) {
  468. let now = Instant::now();
  469. while atomic.load(Ordering::Relaxed) != value {
  470. if now.elapsed() > Duration::from_secs(5) {
  471. panic!("wait for too long")
  472. }
  473. sleep(Duration::from_millis(1))
  474. }
  475. }
  476. let pubsub_addr = pubsub_addr();
  477. tokio::task::spawn_blocking(move || {
  478. let exit = Arc::new(AtomicBool::new(false));
  479. let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
  480. let bank = Bank::new_for_tests(&genesis_config);
  481. let bank_forks = BankForks::new_rw_arc(bank);
  482. let optimistically_confirmed_bank =
  483. OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
  484. let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
  485. let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
  486. exit.clone(),
  487. max_complete_transaction_status_slot,
  488. bank_forks,
  489. Arc::new(RwLock::new(BlockCommitmentCache::default())),
  490. optimistically_confirmed_bank,
  491. ));
  492. let (trigger, pubsub_service) =
  493. PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
  494. check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(100));
  495. sync_service.store(1, Ordering::Relaxed);
  496. wait_until(&sync_service, 2);
  497. subscriptions.notify_slot(1, 0, 0);
  498. sync_service.store(3, Ordering::Relaxed);
  499. wait_until(&sync_service, 4);
  500. subscriptions.notify_slot(2, 1, 1);
  501. sync_service.store(5, Ordering::Relaxed);
  502. wait_until(&sync_service, 6);
  503. exit.store(true, Ordering::Relaxed);
  504. trigger.cancel();
  505. pubsub_service.close().unwrap();
  506. });
  507. wait_until(&sync_client, 1);
  508. let url = format!("ws://0.0.0.0:{}/", pubsub_addr.port());
  509. let pubsub_client = nonblocking::pubsub_client::PubsubClient::new(&url)
  510. .await
  511. .unwrap();
  512. let (mut notifications, unsubscribe) = pubsub_client.slot_subscribe().await.unwrap();
  513. sync_client.store(2, Ordering::Relaxed);
  514. wait_until(&sync_client, 3);
  515. assert_eq!(
  516. tokio::time::timeout(Duration::from_millis(25), notifications.next()).await,
  517. Ok(Some(SlotInfo {
  518. slot: 1,
  519. parent: 0,
  520. root: 0,
  521. }))
  522. );
  523. sync_client.store(4, Ordering::Relaxed);
  524. wait_until(&sync_client, 5);
  525. assert_eq!(
  526. tokio::time::timeout(Duration::from_millis(25), notifications.next()).await,
  527. Ok(Some(SlotInfo {
  528. slot: 2,
  529. parent: 1,
  530. root: 1,
  531. }))
  532. );
  533. sync_client.store(6, Ordering::Relaxed);
  534. unsubscribe().await;
  535. }
  536. fn check_server_is_ready_or_panic(
  537. socket_addr: &SocketAddr,
  538. mut retry: u8,
  539. sleep_duration: Duration,
  540. ) {
  541. loop {
  542. if retry == 0 {
  543. break;
  544. } else {
  545. retry = retry.checked_sub(1).unwrap();
  546. }
  547. if connect(format!("ws://{socket_addr}")).is_ok() {
  548. return;
  549. }
  550. sleep(sleep_duration);
  551. }
  552. panic!("server hasn't been ready");
  553. }