| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627 |
- use {
- futures_util::StreamExt,
- serde_json::{json, Value},
- solana_clock::Slot,
- solana_commitment_config::{CommitmentConfig, CommitmentLevel},
- solana_keypair::Keypair,
- solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path_auto_delete},
- solana_native_token::LAMPORTS_PER_SOL,
- solana_pubkey::Pubkey,
- solana_pubsub_client::{nonblocking, pubsub_client::PubsubClient},
- solana_rpc::{
- optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank,
- rpc::{create_test_transaction_entries, populate_blockstore_for_tests},
- rpc_pubsub_service::{PubSubConfig, PubSubService},
- rpc_subscriptions::RpcSubscriptions,
- },
- solana_rpc_client::rpc_client::RpcClient,
- solana_rpc_client_api::{
- config::{
- RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter,
- RpcProgramAccountsConfig,
- },
- response::SlotInfo,
- },
- solana_runtime::{
- bank::Bank,
- bank_forks::BankForks,
- commitment::{BlockCommitmentCache, CommitmentSlots},
- genesis_utils::{create_genesis_config, GenesisConfigInfo},
- },
- solana_signer::Signer,
- solana_streamer::socket::SocketAddrSpace,
- solana_system_interface::program as system_program,
- solana_system_transaction as system_transaction,
- solana_test_validator::TestValidator,
- solana_transaction_status::{
- BlockEncodingOptions, ConfirmedBlock, TransactionDetails, UiTransactionEncoding,
- },
- std::{
- collections::HashSet,
- net::{IpAddr, SocketAddr},
- sync::{
- atomic::{AtomicBool, AtomicU64, Ordering},
- Arc, RwLock,
- },
- thread::sleep,
- time::{Duration, Instant},
- },
- systemstat::Ipv4Addr,
- tungstenite::connect,
- };
- fn pubsub_addr() -> SocketAddr {
- let port_range = solana_net_utils::sockets::localhost_port_range_for_tests();
- SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port_range.0)
- }
/// Exercises the blocking `RpcClient` against a `TestValidator`.
///
/// Checks the reported version, that an unfunded account is absent with a zero
/// balance, and that a 20 SOL transfer from alice to bob is confirmed at
/// `processed` commitment within roughly 20 seconds and is reflected in both
/// balances.
#[test]
fn test_rpc_client() {
    solana_logger::setup();

    let alice = Keypair::new();
    let test_validator =
        TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
    let bob_pubkey = solana_pubkey::new_rand();
    let transfer_amount = 20 * LAMPORTS_PER_SOL;

    let client = RpcClient::new(test_validator.rpc_url());

    let reported_version = client.get_version().unwrap().solana_core;
    assert_eq!(reported_version, solana_version::semver!());

    // Bob has never been funded: no account, zero balance.
    assert!(client.get_account(&bob_pubkey).is_err());
    assert_eq!(client.get_balance(&bob_pubkey).unwrap(), 0);

    let original_alice_balance = client.get_balance(&alice.pubkey()).unwrap();

    let blockhash = client.get_latest_blockhash().unwrap();
    let tx = system_transaction::transfer(&alice, &bob_pubkey, transfer_amount, blockhash);
    let signature = client.send_transaction(&tx).unwrap();

    // Poll every 500ms until the transaction is confirmed or the time budget is spent.
    let started = Instant::now();
    let confirmed_tx = loop {
        if started.elapsed().as_secs() > 20 {
            break false;
        }
        let response = client
            .confirm_transaction_with_commitment(&signature, CommitmentConfig::processed())
            .unwrap();
        if response.value {
            break true;
        }
        sleep(Duration::from_millis(500));
    };
    assert!(confirmed_tx);

    let processed_balance = |pubkey: &Pubkey| {
        client
            .get_balance_with_commitment(pubkey, CommitmentConfig::processed())
            .unwrap()
            .value
    };
    assert_eq!(processed_balance(&bob_pubkey), transfer_amount);
    assert_eq!(
        processed_balance(&alice.pubkey()),
        original_alice_balance - transfer_amount
    );
}
/// Verifies that an account subscription with `finalized` commitment delivers
/// a notification describing bob's account after alice transfers 100 lamports
/// to it in slot 1.
///
/// The received notification is judged only after the pubsub service has been
/// shut down. A failed receive fails the test: previously it was only logged,
/// which let the test pass without any notification ever arriving.
#[test]
fn test_account_subscription() {
    let pubsub_addr = pubsub_addr();
    let exit = Arc::new(AtomicBool::new(false));

    let GenesisConfigInfo {
        genesis_config,
        mint_keypair: alice,
        ..
    } = create_genesis_config(10_000);
    let bank = Bank::new_for_tests(&genesis_config);
    let blockhash = bank.last_blockhash();
    let bank_forks = BankForks::new_rw_arc(bank);
    let bank0 = bank_forks.read().unwrap().get(0).unwrap();
    let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
    bank_forks.write().unwrap().insert(bank1);
    let bob = Keypair::new();

    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        exit.clone(),
        max_complete_transaction_status_slot,
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::default())),
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
    ));
    let (trigger, pubsub_service) =
        PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
    check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));

    let config = Some(RpcAccountInfoConfig {
        commitment: Some(CommitmentConfig::finalized()),
        encoding: None,
        data_slice: None,
        min_context_slot: None,
    });
    let (mut client, receiver) = PubsubClient::account_subscribe(
        &format!("ws://0.0.0.0:{}/", pubsub_addr.port()),
        &bob.pubkey(),
        config,
    )
    .unwrap();

    // Transfer 100 lamports from alice to bob in the slot-1 bank.
    let tx = system_transaction::transfer(&alice, &bob.pubkey(), 100, blockhash);
    bank_forks
        .read()
        .unwrap()
        .get(1)
        .unwrap()
        .process_transaction(&tx)
        .unwrap();

    // First announce slot 1 alone, then announce slot 2 with slot 1 as root.
    // NOTE(review): presumably the second update is what makes slot 1 visible
    // to a `finalized` subscriber -- confirm against `RpcSubscriptions`.
    subscriptions.notify_subscribers(CommitmentSlots {
        slot: 1,
        ..CommitmentSlots::default()
    });
    subscriptions.notify_subscribers(CommitmentSlots {
        slot: 2,
        root: 1,
        highest_confirmed_slot: 1,
        highest_super_majority_root: 1,
    });

    let expected = json!({
        "context": { "slot": 1 },
        "value": {
            "owner": system_program::id().to_string(),
            "lamports": 100,
            "data": "",
            "executable": false,
            "rentEpoch": u64::MAX,
            "space": 0,
        },
    });

    // Read the notification, but defer the verdict until after shutdown so the
    // service is always torn down first.
    let received = receiver
        .recv()
        .map(|response| serde_json::to_value(response).unwrap());

    exit.store(true, Ordering::Relaxed);
    trigger.cancel();
    client.shutdown().unwrap();
    pubsub_service.close().unwrap();

    match received {
        Ok(actual) => assert_eq!(expected, actual),
        Err(err) => panic!("account notification was never received: {err:?}"),
    }
}
/// Verifies that a block subscription (filter `All`, `confirmed` commitment,
/// JSON encoding, signature-level transaction details) receives the block for
/// the highest slot once gossip subscribers are notified, and that the
/// received block equals the one read back from the blockstore.
#[test]
fn test_block_subscription() {
    // Bank forks built from a fresh genesis.
    let exit = Arc::new(AtomicBool::new(false));
    let GenesisConfigInfo {
        genesis_config,
        mint_keypair: alice,
        ..
    } = create_genesis_config(10_000);
    let genesis_bank = Bank::new_for_tests(&genesis_config);
    let rent_exempt_amount = genesis_bank.get_minimum_balance_for_rent_exemption(0);
    let bank_forks = BankForks::new_rw_arc(genesis_bank);

    // Blockstore opened at a temporary ledger path.
    let ledger_path = get_tmp_ledger_path_auto_delete!();
    let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());

    // Fill the ledger with test transactions.
    let bank = bank_forks.read().unwrap().working_bank();
    let keypair1 = Keypair::new();
    let keypair2 = Keypair::new();
    let keypair3 = Keypair::new();
    let populate_status_slot = Arc::new(AtomicU64::new(blockstore.max_root()));
    bank.transfer(rent_exempt_amount, &alice, &keypair2.pubkey())
        .unwrap();
    let entries = create_test_transaction_entries(
        vec![&alice, &keypair1, &keypair2, &keypair3],
        bank.clone(),
    )
    .0;
    populate_blockstore_for_tests(entries, bank, blockstore.clone(), populate_status_slot);

    // Subscriptions and the pubsub service, with block subscriptions enabled.
    let subscription_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests_with_blockstore(
        exit.clone(),
        subscription_status_slot,
        blockstore.clone(),
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::default())),
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
    ));
    let pubsub_addr = pubsub_addr();
    let pub_cfg = PubSubConfig {
        enable_block_subscription: true,
        ..PubSubConfig::default()
    };
    let (trigger, pubsub_service) = PubSubService::new(pub_cfg, &subscriptions, pubsub_addr);
    check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));

    // Subscribing client.
    let url = format!("ws://0.0.0.0:{}/", pubsub_addr.port());
    let subscribe_config = RpcBlockSubscribeConfig {
        commitment: Some(CommitmentConfig {
            commitment: CommitmentLevel::Confirmed,
        }),
        encoding: Some(UiTransactionEncoding::Json),
        transaction_details: Some(TransactionDetails::Signatures),
        show_rewards: None,
        max_supported_transaction_version: None,
    };
    let (mut client, receiver) =
        PubsubClient::block_subscribe(&url, RpcBlockSubscribeFilter::All, Some(subscribe_config))
            .unwrap();

    // Trigger the gossip notification for the highest slot and wait for the block.
    let slot = bank_forks.read().unwrap().highest_slot();
    subscriptions.notify_gossip_subscribers(slot);
    match receiver.recv_timeout(Duration::from_millis(400)) {
        Err(e) => {
            eprintln!("unexpected websocket receive timeout");
            assert_eq!(Some(e), None);
        }
        Ok(actual) => {
            // Re-encode the stored block the same way the subscription asked for it.
            let encoding_options = BlockEncodingOptions {
                transaction_details: TransactionDetails::Signatures,
                show_rewards: false,
                max_supported_transaction_version: None,
            };
            let stored_block = blockstore.get_complete_block(slot, false).unwrap();
            let block = ConfirmedBlock::from(stored_block)
                .encode_with_options(UiTransactionEncoding::Json, encoding_options)
                .unwrap();
            assert_eq!(actual.value.slot, slot);
            assert!(block.eq(&actual.value.block.unwrap()));
        }
    }

    // Cleanup.
    exit.store(true, Ordering::Relaxed);
    trigger.cancel();
    client.shutdown().unwrap();
    pubsub_service.close().unwrap();
}
/// Verifies that a program subscription yields exactly one notification, for
/// bob's address, after an account owned by the subscribed program is created
/// there in slot 1.
#[test]
fn test_program_subscription() {
    let pubsub_addr = pubsub_addr();
    let exit = Arc::new(AtomicBool::new(false));

    let GenesisConfigInfo {
        genesis_config,
        mint_keypair: alice,
        ..
    } = create_genesis_config(10_000);
    let genesis_bank = Bank::new_for_tests(&genesis_config);
    let blockhash = genesis_bank.last_blockhash();
    let bank_forks = BankForks::new_rw_arc(genesis_bank);
    let parent_bank = bank_forks.read().unwrap().get(0).unwrap();
    let child_bank = Bank::new_from_parent(parent_bank, &Pubkey::default(), 1);
    bank_forks.write().unwrap().insert(child_bank);
    let bob = Keypair::new();

    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        exit.clone(),
        max_complete_transaction_status_slot,
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::default())),
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
    ));
    let (trigger, pubsub_service) =
        PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
    check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));

    let program_id = Pubkey::new_unique();
    let url = format!("ws://0.0.0.0:{}/", pubsub_addr.port());
    let (mut client, receiver) = PubsubClient::program_subscribe(
        &url,
        &program_id,
        Some(RpcProgramAccountsConfig::default()),
    )
    .unwrap();

    // Create a new account at bob's address, owned by the subscribed program.
    let tx = system_transaction::create_account(&alice, &bob, blockhash, 100, 0, &program_id);
    let slot1_bank = bank_forks.read().unwrap().get(1).unwrap();
    slot1_bank.process_transaction(&tx).unwrap();

    // Announce slot 1 alone, then slot 2 with slot 1 as root.
    subscriptions.notify_subscribers(CommitmentSlots {
        slot: 1,
        ..CommitmentSlots::default()
    });
    subscriptions.notify_subscribers(CommitmentSlots {
        slot: 2,
        root: 1,
        highest_confirmed_slot: 1,
        highest_super_majority_root: 1,
    });

    // Drain notifications until the channel stays quiet for 100ms.
    let mut notifications = Vec::new();
    let mut pubkeys = HashSet::new();
    while let Ok(response) = receiver.recv_timeout(Duration::from_millis(100)) {
        notifications.push(response.clone());
        pubkeys.insert(response.value.pubkey);
    }

    // Shutdown before asserting.
    exit.store(true, Ordering::Relaxed);
    trigger.cancel();
    client.shutdown().unwrap();
    pubsub_service.close().unwrap();

    assert_eq!(notifications.len(), 1);
    assert!(pubkeys.contains(&bob.pubkey().to_string()));
}
/// Verifies that a root subscription receives the roots `1, 2, 3`, in order,
/// after they are passed to `notify_roots`.
///
/// The outcome is judged only after the pubsub service has been shut down. A
/// failed receive fails the test: previously it was only logged, which let
/// the test pass without any root ever being delivered.
#[test]
fn test_root_subscription() {
    let pubsub_addr = pubsub_addr();
    let exit = Arc::new(AtomicBool::new(false));

    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
    let bank = Bank::new_for_tests(&genesis_config);
    let bank_forks = BankForks::new_rw_arc(bank);
    let bank0 = bank_forks.read().unwrap().get(0).unwrap();
    let bank1 = Bank::new_from_parent(bank0, &Pubkey::default(), 1);
    bank_forks.write().unwrap().insert(bank1);

    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        exit.clone(),
        max_complete_transaction_status_slot,
        bank_forks.clone(),
        Arc::new(RwLock::new(BlockCommitmentCache::default())),
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
    ));
    let (trigger, pubsub_service) =
        PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
    check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));

    let (mut client, receiver) =
        PubsubClient::root_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();

    let roots: Vec<Slot> = vec![1, 2, 3];
    subscriptions.notify_roots(roots.clone());

    // Receive one notification per announced root; stop at the first failed
    // receive. The verdict is deferred so the service is always torn down first.
    let received: Result<Vec<Slot>, _> = roots.iter().map(|_| receiver.recv()).collect();

    exit.store(true, Ordering::Relaxed);
    trigger.cancel();
    client.shutdown().unwrap();
    pubsub_service.close().unwrap();

    match received {
        Ok(actual) => assert_eq!(actual, roots),
        Err(err) => panic!("root notification was never received: {err:?}"),
    }
}
/// Verifies that a slot subscription receives one `SlotInfo` per
/// `notify_slot` call, for slots 1 through 3, each within 400ms.
///
/// The outcome is judged only after the pubsub service has been shut down. A
/// receive failure fails the test: previously it only logged and left the
/// loop, which let the test pass with no notifications delivered at all.
#[test]
fn test_slot_subscription() {
    let pubsub_addr = pubsub_addr();
    let exit = Arc::new(AtomicBool::new(false));

    let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
    let bank = Bank::new_for_tests(&genesis_config);
    let bank_forks = BankForks::new_rw_arc(bank);
    let optimistically_confirmed_bank =
        OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);

    let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
    let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
        exit.clone(),
        max_complete_transaction_status_slot,
        bank_forks,
        Arc::new(RwLock::new(BlockCommitmentCache::default())),
        optimistically_confirmed_bank,
    ));
    let (trigger, pubsub_service) =
        PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
    check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));

    let (mut client, receiver) =
        PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();

    // Mismatching notifications as (actual, expected), plus the first receive
    // failure, if any. Both are asserted on after shutdown.
    let mut errors: Vec<(SlotInfo, SlotInfo)> = Vec::new();
    let mut receive_failure: Option<String> = None;
    for i in 0..3 {
        subscriptions.notify_slot(i + 1, i, i);
        match receiver.recv_timeout(Duration::from_millis(400)) {
            Ok(actual) => {
                let expected = SlotInfo {
                    slot: i + 1,
                    parent: i,
                    root: i,
                };
                if actual != expected {
                    errors.push((actual, expected));
                }
            }
            Err(err) => {
                // Later notifications are not awaited once one has gone missing.
                receive_failure = Some(format!("slot {}: {err:?}", i + 1));
                break;
            }
        }
    }

    exit.store(true, Ordering::Relaxed);
    trigger.cancel();
    client.shutdown().unwrap();
    pubsub_service.close().unwrap();

    assert_eq!(errors, [].to_vec());
    assert_eq!(
        receive_failure, None,
        "slot notification was never received"
    );
}
- #[tokio::test]
- async fn test_slot_subscription_async() {
- let sync_service = Arc::new(AtomicU64::new(0));
- let sync_client = Arc::clone(&sync_service);
- fn wait_until(atomic: &Arc<AtomicU64>, value: u64) {
- let now = Instant::now();
- while atomic.load(Ordering::Relaxed) != value {
- if now.elapsed() > Duration::from_secs(5) {
- panic!("wait for too long")
- }
- sleep(Duration::from_millis(1))
- }
- }
- let pubsub_addr = pubsub_addr();
- tokio::task::spawn_blocking(move || {
- let exit = Arc::new(AtomicBool::new(false));
- let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
- let bank = Bank::new_for_tests(&genesis_config);
- let bank_forks = BankForks::new_rw_arc(bank);
- let optimistically_confirmed_bank =
- OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
- let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
- let subscriptions = Arc::new(RpcSubscriptions::new_for_tests(
- exit.clone(),
- max_complete_transaction_status_slot,
- bank_forks,
- Arc::new(RwLock::new(BlockCommitmentCache::default())),
- optimistically_confirmed_bank,
- ));
- let (trigger, pubsub_service) =
- PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
- check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(100));
- sync_service.store(1, Ordering::Relaxed);
- wait_until(&sync_service, 2);
- subscriptions.notify_slot(1, 0, 0);
- sync_service.store(3, Ordering::Relaxed);
- wait_until(&sync_service, 4);
- subscriptions.notify_slot(2, 1, 1);
- sync_service.store(5, Ordering::Relaxed);
- wait_until(&sync_service, 6);
- exit.store(true, Ordering::Relaxed);
- trigger.cancel();
- pubsub_service.close().unwrap();
- });
- wait_until(&sync_client, 1);
- let url = format!("ws://0.0.0.0:{}/", pubsub_addr.port());
- let pubsub_client = nonblocking::pubsub_client::PubsubClient::new(&url)
- .await
- .unwrap();
- let (mut notifications, unsubscribe) = pubsub_client.slot_subscribe().await.unwrap();
- sync_client.store(2, Ordering::Relaxed);
- wait_until(&sync_client, 3);
- assert_eq!(
- tokio::time::timeout(Duration::from_millis(25), notifications.next()).await,
- Ok(Some(SlotInfo {
- slot: 1,
- parent: 0,
- root: 0,
- }))
- );
- sync_client.store(4, Ordering::Relaxed);
- wait_until(&sync_client, 5);
- assert_eq!(
- tokio::time::timeout(Duration::from_millis(25), notifications.next()).await,
- Ok(Some(SlotInfo {
- slot: 2,
- parent: 1,
- root: 1,
- }))
- );
- sync_client.store(6, Ordering::Relaxed);
- unsubscribe().await;
- }
- fn check_server_is_ready_or_panic(
- socket_addr: &SocketAddr,
- mut retry: u8,
- sleep_duration: Duration,
- ) {
- loop {
- if retry == 0 {
- break;
- } else {
- retry = retry.checked_sub(1).unwrap();
- }
- if connect(format!("ws://{socket_addr}")).is_ok() {
- return;
- }
- sleep(sleep_duration);
- }
- panic!("server hasn't been ready");
- }
|