lib.rs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. //! `anchor_client` provides an RPC client to send transactions and fetch
  2. //! deserialized accounts from Solana programs written in `anchor_lang`.
  3. use anchor_lang::solana_program::hash::Hash;
  4. use anchor_lang::solana_program::instruction::{AccountMeta, Instruction};
  5. use anchor_lang::solana_program::program_error::ProgramError;
  6. use anchor_lang::solana_program::pubkey::Pubkey;
  7. use anchor_lang::{AccountDeserialize, Discriminator, InstructionData, ToAccountMetas};
  8. use futures::{Future, StreamExt};
  9. use regex::Regex;
  10. use solana_account_decoder::UiAccountEncoding;
  11. use solana_client::rpc_config::{
  12. RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSendTransactionConfig,
  13. RpcTransactionLogsConfig, RpcTransactionLogsFilter,
  14. };
  15. use solana_client::rpc_filter::{Memcmp, RpcFilterType};
  16. use solana_client::{
  17. client_error::ClientError as SolanaClientError,
  18. nonblocking::{
  19. pubsub_client::{PubsubClient, PubsubClientError},
  20. rpc_client::RpcClient as AsyncRpcClient,
  21. },
  22. rpc_client::RpcClient,
  23. rpc_response::{Response as RpcResponse, RpcLogsResponse},
  24. };
  25. use solana_sdk::account::Account;
  26. use solana_sdk::commitment_config::CommitmentConfig;
  27. use solana_sdk::signature::{Signature, Signer};
  28. use solana_sdk::transaction::Transaction;
  29. use std::iter::Map;
  30. use std::marker::PhantomData;
  31. use std::ops::Deref;
  32. use std::pin::Pin;
  33. use std::sync::Arc;
  34. use std::vec::IntoIter;
  35. use thiserror::Error;
  36. use tokio::{
  37. runtime::Handle,
  38. sync::{
  39. mpsc::{unbounded_channel, UnboundedReceiver},
  40. RwLock,
  41. },
  42. task::JoinHandle,
  43. };
  44. pub use anchor_lang;
  45. pub use cluster::Cluster;
  46. pub use solana_client;
  47. pub use solana_sdk;
  48. mod cluster;
  49. #[cfg(not(feature = "async"))]
  50. mod blocking;
  51. #[cfg(feature = "async")]
  52. mod nonblocking;
  /// Prefix that `handle_program_log` strips from a log line before trying to
  /// base64-decode the remainder.
  const PROGRAM_LOG: &str = "Program log: ";

  /// Alternative prefix accepted by `handle_program_log` when the line does not
  /// start with `PROGRAM_LOG`.
  const PROGRAM_DATA: &str = "Program data: ";

  /// Boxed, sendable future that resolves to `()`.
  type BoxedUnitFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

  /// One-shot callback that, once called, returns the future performing the
  /// unsubscribe. Values of this type travel over the channel created in
  /// `Program::on_internal`.
  type UnsubscribeFn = Box<dyn FnOnce() -> BoxedUnitFuture + Send>;
  56. /// Client defines the base configuration for building RPC clients to
  57. /// communicate with Anchor programs running on a Solana cluster. It's
  58. /// primary use is to build a `Program` client via the `program` method.
  59. pub struct Client<C> {
  60. cfg: Config<C>,
  61. }
  62. impl<C: Clone + Deref<Target = impl Signer>> Client<C> {
  63. pub fn new(cluster: Cluster, payer: C) -> Self {
  64. Self {
  65. cfg: Config {
  66. cluster,
  67. payer,
  68. options: None,
  69. },
  70. }
  71. }
  72. pub fn new_with_options(cluster: Cluster, payer: C, options: CommitmentConfig) -> Self {
  73. Self {
  74. cfg: Config {
  75. cluster,
  76. payer,
  77. options: Some(options),
  78. },
  79. }
  80. }
  81. pub fn program(&self, program_id: Pubkey) -> Result<Program<C>, ClientError> {
  82. let cfg = Config {
  83. cluster: self.cfg.cluster.clone(),
  84. options: self.cfg.options,
  85. payer: self.cfg.payer.clone(),
  86. };
  87. Program::new(program_id, cfg)
  88. }
  89. }
  90. /// Auxiliary data structure to align the types of the Solana CLI utils with Anchor client.
  91. /// Client<C> implementation requires <C: Clone + Deref<Target = impl Signer>> which does not comply with Box<dyn Signer>
  92. /// that's used when loaded Signer from keypair file. This struct is used to wrap the usage.
  93. pub struct DynSigner(pub Arc<dyn Signer>);
  94. impl Signer for DynSigner {
  95. fn pubkey(&self) -> Pubkey {
  96. self.0.pubkey()
  97. }
  98. fn try_pubkey(&self) -> Result<Pubkey, solana_sdk::signer::SignerError> {
  99. self.0.try_pubkey()
  100. }
  101. fn sign_message(&self, message: &[u8]) -> solana_sdk::signature::Signature {
  102. self.0.sign_message(message)
  103. }
  104. fn try_sign_message(
  105. &self,
  106. message: &[u8],
  107. ) -> Result<solana_sdk::signature::Signature, solana_sdk::signer::SignerError> {
  108. self.0.try_sign_message(message)
  109. }
  110. fn is_interactive(&self) -> bool {
  111. self.0.is_interactive()
  112. }
  113. }
  114. // Internal configuration for a client.
  115. #[derive(Debug)]
  116. pub struct Config<C> {
  117. cluster: Cluster,
  118. payer: C,
  119. options: Option<CommitmentConfig>,
  120. }
  121. pub struct EventUnsubscriber<'a> {
  122. handle: JoinHandle<Result<(), ClientError>>,
  123. rx: UnboundedReceiver<UnsubscribeFn>,
  124. #[cfg(not(feature = "async"))]
  125. runtime_handle: &'a Handle,
  126. _lifetime_marker: PhantomData<&'a Handle>,
  127. }
  128. impl<'a> EventUnsubscriber<'a> {
  129. async fn unsubscribe_internal(mut self) {
  130. if let Some(unsubscribe) = self.rx.recv().await {
  131. unsubscribe().await;
  132. }
  133. let _ = self.handle.await;
  134. }
  135. }
  136. /// Program is the primary client handle to be used to build and send requests.
  137. pub struct Program<C> {
  138. program_id: Pubkey,
  139. cfg: Config<C>,
  140. sub_client: Arc<RwLock<Option<PubsubClient>>>,
  141. #[cfg(not(feature = "async"))]
  142. rt: tokio::runtime::Runtime,
  143. }
  144. impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
  145. pub fn payer(&self) -> Pubkey {
  146. self.cfg.payer.pubkey()
  147. }
  148. /// Returns a request builder.
  149. pub fn request(&self) -> RequestBuilder<C> {
  150. RequestBuilder::from(
  151. self.program_id,
  152. self.cfg.cluster.url(),
  153. self.cfg.payer.clone(),
  154. self.cfg.options,
  155. #[cfg(not(feature = "async"))]
  156. self.rt.handle(),
  157. )
  158. }
  159. pub fn id(&self) -> Pubkey {
  160. self.program_id
  161. }
  162. pub fn rpc(&self) -> RpcClient {
  163. RpcClient::new_with_commitment(
  164. self.cfg.cluster.url().to_string(),
  165. self.cfg.options.unwrap_or_default(),
  166. )
  167. }
  168. pub fn async_rpc(&self) -> AsyncRpcClient {
  169. AsyncRpcClient::new_with_commitment(
  170. self.cfg.cluster.url().to_string(),
  171. self.cfg.options.unwrap_or_default(),
  172. )
  173. }
  174. async fn account_internal<T: AccountDeserialize>(
  175. &self,
  176. address: Pubkey,
  177. ) -> Result<T, ClientError> {
  178. let rpc_client = AsyncRpcClient::new_with_commitment(
  179. self.cfg.cluster.url().to_string(),
  180. self.cfg.options.unwrap_or_default(),
  181. );
  182. let account = rpc_client
  183. .get_account_with_commitment(&address, CommitmentConfig::processed())
  184. .await?
  185. .value
  186. .ok_or(ClientError::AccountNotFound)?;
  187. let mut data: &[u8] = &account.data;
  188. T::try_deserialize(&mut data).map_err(Into::into)
  189. }
  190. async fn accounts_lazy_internal<T: AccountDeserialize + Discriminator>(
  191. &self,
  192. filters: Vec<RpcFilterType>,
  193. ) -> Result<ProgramAccountsIterator<T>, ClientError> {
  194. let account_type_filter =
  195. RpcFilterType::Memcmp(Memcmp::new_base58_encoded(0, &T::discriminator()));
  196. let config = RpcProgramAccountsConfig {
  197. filters: Some([vec![account_type_filter], filters].concat()),
  198. account_config: RpcAccountInfoConfig {
  199. encoding: Some(UiAccountEncoding::Base64),
  200. ..RpcAccountInfoConfig::default()
  201. },
  202. ..RpcProgramAccountsConfig::default()
  203. };
  204. Ok(ProgramAccountsIterator {
  205. inner: self
  206. .async_rpc()
  207. .get_program_accounts_with_config(&self.id(), config)
  208. .await?
  209. .into_iter()
  210. .map(|(key, account)| {
  211. Ok((key, T::try_deserialize(&mut (&account.data as &[u8]))?))
  212. }),
  213. })
  214. }
  215. async fn init_sub_client_if_needed(&self) -> Result<(), ClientError> {
  216. let lock = &self.sub_client;
  217. let mut client = lock.write().await;
  218. if client.is_none() {
  219. let sub_client = PubsubClient::new(self.cfg.cluster.ws_url()).await?;
  220. *client = Some(sub_client);
  221. }
  222. Ok(())
  223. }
  224. async fn on_internal<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
  225. &self,
  226. f: impl Fn(&EventContext, T) + Send + 'static,
  227. ) -> Result<
  228. (
  229. JoinHandle<Result<(), ClientError>>,
  230. UnboundedReceiver<UnsubscribeFn>,
  231. ),
  232. ClientError,
  233. > {
  234. self.init_sub_client_if_needed().await?;
  235. let (tx, rx) = unbounded_channel::<_>();
  236. let config = RpcTransactionLogsConfig {
  237. commitment: self.cfg.options,
  238. };
  239. let program_id_str = self.program_id.to_string();
  240. let filter = RpcTransactionLogsFilter::Mentions(vec![program_id_str.clone()]);
  241. let lock = Arc::clone(&self.sub_client);
  242. let handle = tokio::spawn(async move {
  243. if let Some(ref client) = *lock.read().await {
  244. let (mut notifications, unsubscribe) =
  245. client.logs_subscribe(filter, config).await?;
  246. tx.send(unsubscribe).map_err(|e| {
  247. ClientError::SolanaClientPubsubError(PubsubClientError::RequestFailed {
  248. message: "Unsubscribe failed".to_string(),
  249. reason: e.to_string(),
  250. })
  251. })?;
  252. while let Some(logs) = notifications.next().await {
  253. let ctx = EventContext {
  254. signature: logs.value.signature.parse().unwrap(),
  255. slot: logs.context.slot,
  256. };
  257. let events = parse_logs_response(logs, &program_id_str);
  258. for e in events {
  259. f(&ctx, e);
  260. }
  261. }
  262. }
  263. Ok::<(), ClientError>(())
  264. });
  265. Ok((handle, rx))
  266. }
  267. }
  268. /// Iterator with items of type (Pubkey, T). Used to lazily deserialize account structs.
  269. /// Wrapper type hides the inner type from usages so the implementation can be changed.
  270. pub struct ProgramAccountsIterator<T> {
  271. inner: Map<IntoIter<(Pubkey, Account)>, AccountConverterFunction<T>>,
  272. }
  273. /// Function type that accepts solana accounts and returns deserialized anchor accounts
  274. type AccountConverterFunction<T> = fn((Pubkey, Account)) -> Result<(Pubkey, T), ClientError>;
  275. impl<T> Iterator for ProgramAccountsIterator<T> {
  276. type Item = Result<(Pubkey, T), ClientError>;
  277. fn next(&mut self) -> Option<Self::Item> {
  278. self.inner.next()
  279. }
  280. }
  281. pub fn handle_program_log<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
  282. self_program_str: &str,
  283. l: &str,
  284. ) -> Result<(Option<T>, Option<String>, bool), ClientError> {
  285. use anchor_lang::__private::base64;
  286. use base64::engine::general_purpose::STANDARD;
  287. use base64::Engine;
  288. // Log emitted from the current program.
  289. if let Some(log) = l
  290. .strip_prefix(PROGRAM_LOG)
  291. .or_else(|| l.strip_prefix(PROGRAM_DATA))
  292. {
  293. let borsh_bytes = match STANDARD.decode(log) {
  294. Ok(borsh_bytes) => borsh_bytes,
  295. _ => {
  296. #[cfg(feature = "debug")]
  297. println!("Could not base64 decode log: {}", log);
  298. return Ok((None, None, false));
  299. }
  300. };
  301. let mut slice: &[u8] = &borsh_bytes[..];
  302. let disc: [u8; 8] = {
  303. let mut disc = [0; 8];
  304. disc.copy_from_slice(&borsh_bytes[..8]);
  305. slice = &slice[8..];
  306. disc
  307. };
  308. let mut event = None;
  309. if disc == T::discriminator() {
  310. let e: T = anchor_lang::AnchorDeserialize::deserialize(&mut slice)
  311. .map_err(|e| ClientError::LogParseError(e.to_string()))?;
  312. event = Some(e);
  313. }
  314. Ok((event, None, false))
  315. }
  316. // System log.
  317. else {
  318. let (program, did_pop) = handle_system_log(self_program_str, l);
  319. Ok((None, program, did_pop))
  320. }
  321. }
  322. pub fn handle_system_log(this_program_str: &str, log: &str) -> (Option<String>, bool) {
  323. if log.starts_with(&format!("Program {this_program_str} log:")) {
  324. (Some(this_program_str.to_string()), false)
  325. } else if log.contains("invoke") {
  326. (Some("cpi".to_string()), false) // Any string will do.
  327. } else {
  328. let re = Regex::new(r"^Program (.*) success*$").unwrap();
  329. if re.is_match(log) {
  330. (None, true)
  331. } else {
  332. (None, false)
  333. }
  334. }
  335. }
  336. pub struct Execution {
  337. stack: Vec<String>,
  338. }
  339. impl Execution {
  340. pub fn new(logs: &mut &[String]) -> Result<Self, ClientError> {
  341. let l = &logs[0];
  342. *logs = &logs[1..];
  343. let re = Regex::new(r"^Program (.*) invoke.*$").unwrap();
  344. let c = re
  345. .captures(l)
  346. .ok_or_else(|| ClientError::LogParseError(l.to_string()))?;
  347. let program = c
  348. .get(1)
  349. .ok_or_else(|| ClientError::LogParseError(l.to_string()))?
  350. .as_str()
  351. .to_string();
  352. Ok(Self {
  353. stack: vec![program],
  354. })
  355. }
  356. pub fn program(&self) -> String {
  357. assert!(!self.stack.is_empty());
  358. self.stack[self.stack.len() - 1].clone()
  359. }
  360. pub fn push(&mut self, new_program: String) {
  361. self.stack.push(new_program);
  362. }
  363. pub fn pop(&mut self) {
  364. assert!(!self.stack.is_empty());
  365. self.stack.pop().unwrap();
  366. }
  367. }
  368. #[derive(Debug)]
  369. pub struct EventContext {
  370. pub signature: Signature,
  371. pub slot: u64,
  372. }
  373. #[derive(Debug, Error)]
  374. pub enum ClientError {
  375. #[error("Account not found")]
  376. AccountNotFound,
  377. #[error("{0}")]
  378. AnchorError(#[from] anchor_lang::error::Error),
  379. #[error("{0}")]
  380. ProgramError(#[from] ProgramError),
  381. #[error("{0}")]
  382. SolanaClientError(#[from] SolanaClientError),
  383. #[error("{0}")]
  384. SolanaClientPubsubError(#[from] PubsubClientError),
  385. #[error("Unable to parse log: {0}")]
  386. LogParseError(String),
  387. #[error(transparent)]
  388. IOError(#[from] std::io::Error),
  389. }
  390. /// `RequestBuilder` provides a builder interface to create and send
  391. /// transactions to a cluster.
  392. pub struct RequestBuilder<'a, C> {
  393. cluster: String,
  394. program_id: Pubkey,
  395. accounts: Vec<AccountMeta>,
  396. options: CommitmentConfig,
  397. instructions: Vec<Instruction>,
  398. payer: C,
  399. // Serialized instruction data for the target RPC.
  400. instruction_data: Option<Vec<u8>>,
  401. signers: Vec<&'a dyn Signer>,
  402. #[cfg(not(feature = "async"))]
  403. handle: &'a Handle,
  404. }
  405. impl<'a, C: Deref<Target = impl Signer> + Clone> RequestBuilder<'a, C> {
  406. #[must_use]
  407. pub fn payer(mut self, payer: C) -> Self {
  408. self.payer = payer;
  409. self
  410. }
  411. #[must_use]
  412. pub fn cluster(mut self, url: &str) -> Self {
  413. self.cluster = url.to_string();
  414. self
  415. }
  416. #[must_use]
  417. pub fn instruction(mut self, ix: Instruction) -> Self {
  418. self.instructions.push(ix);
  419. self
  420. }
  421. #[must_use]
  422. pub fn program(mut self, program_id: Pubkey) -> Self {
  423. self.program_id = program_id;
  424. self
  425. }
  426. #[must_use]
  427. pub fn accounts(mut self, accounts: impl ToAccountMetas) -> Self {
  428. let mut metas = accounts.to_account_metas(None);
  429. self.accounts.append(&mut metas);
  430. self
  431. }
  432. #[must_use]
  433. pub fn options(mut self, options: CommitmentConfig) -> Self {
  434. self.options = options;
  435. self
  436. }
  437. #[must_use]
  438. pub fn args(mut self, args: impl InstructionData) -> Self {
  439. self.instruction_data = Some(args.data());
  440. self
  441. }
  442. #[must_use]
  443. pub fn signer(mut self, signer: &'a dyn Signer) -> Self {
  444. self.signers.push(signer);
  445. self
  446. }
  447. pub fn instructions(&self) -> Result<Vec<Instruction>, ClientError> {
  448. let mut instructions = self.instructions.clone();
  449. if let Some(ix_data) = &self.instruction_data {
  450. instructions.push(Instruction {
  451. program_id: self.program_id,
  452. data: ix_data.clone(),
  453. accounts: self.accounts.clone(),
  454. });
  455. }
  456. Ok(instructions)
  457. }
  458. fn signed_transaction_with_blockhash(
  459. &self,
  460. latest_hash: Hash,
  461. ) -> Result<Transaction, ClientError> {
  462. let instructions = self.instructions()?;
  463. let mut signers = self.signers.clone();
  464. signers.push(&*self.payer);
  465. let tx = Transaction::new_signed_with_payer(
  466. &instructions,
  467. Some(&self.payer.pubkey()),
  468. &signers,
  469. latest_hash,
  470. );
  471. Ok(tx)
  472. }
  473. pub fn transaction(&self) -> Result<Transaction, ClientError> {
  474. let instructions = &self.instructions;
  475. let tx = Transaction::new_with_payer(instructions, Some(&self.payer.pubkey()));
  476. Ok(tx)
  477. }
  478. async fn signed_transaction_internal(&self) -> Result<Transaction, ClientError> {
  479. let latest_hash =
  480. AsyncRpcClient::new_with_commitment(self.cluster.to_owned(), self.options)
  481. .get_latest_blockhash()
  482. .await?;
  483. let tx = self.signed_transaction_with_blockhash(latest_hash)?;
  484. Ok(tx)
  485. }
  486. async fn send_internal(&self) -> Result<Signature, ClientError> {
  487. let rpc_client = AsyncRpcClient::new_with_commitment(self.cluster.to_owned(), self.options);
  488. let latest_hash = rpc_client.get_latest_blockhash().await?;
  489. let tx = self.signed_transaction_with_blockhash(latest_hash)?;
  490. rpc_client
  491. .send_and_confirm_transaction(&tx)
  492. .await
  493. .map_err(Into::into)
  494. }
  495. async fn send_with_spinner_and_config_internal(
  496. &self,
  497. config: RpcSendTransactionConfig,
  498. ) -> Result<Signature, ClientError> {
  499. let rpc_client = AsyncRpcClient::new_with_commitment(self.cluster.to_owned(), self.options);
  500. let latest_hash = rpc_client.get_latest_blockhash().await?;
  501. let tx = self.signed_transaction_with_blockhash(latest_hash)?;
  502. rpc_client
  503. .send_and_confirm_transaction_with_spinner_and_config(
  504. &tx,
  505. rpc_client.commitment(),
  506. config,
  507. )
  508. .await
  509. .map_err(Into::into)
  510. }
  511. }
  512. fn parse_logs_response<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
  513. logs: RpcResponse<RpcLogsResponse>,
  514. program_id_str: &str,
  515. ) -> Vec<T> {
  516. let mut logs = &logs.value.logs[..];
  517. let mut events: Vec<T> = Vec::new();
  518. if !logs.is_empty() {
  519. if let Ok(mut execution) = Execution::new(&mut logs) {
  520. for l in logs {
  521. // Parse the log.
  522. let (event, new_program, did_pop) = {
  523. if program_id_str == execution.program() {
  524. handle_program_log(program_id_str, l).unwrap_or_else(|e| {
  525. println!("Unable to parse log: {e}");
  526. std::process::exit(1);
  527. })
  528. } else {
  529. let (program, did_pop) = handle_system_log(program_id_str, l);
  530. (None, program, did_pop)
  531. }
  532. };
  533. // Emit the event.
  534. if let Some(e) = event {
  535. events.push(e);
  536. }
  537. // Switch program context on CPI.
  538. if let Some(new_program) = new_program {
  539. execution.push(new_program);
  540. }
  541. // Program returned.
  542. if did_pop {
  543. execution.pop();
  544. }
  545. }
  546. }
  547. }
  548. events
  549. }
  #[cfg(test)]
  mod tests {
      use super::*;

      /// `Execution::new` seeds the stack with the program named on the
      /// leading `invoke` line.
      #[test]
      fn new_execution() {
          let lines =
              ["Program 7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw invoke [1]".to_string()];
          let mut logs: &[String] = &lines;

          let exe = Execution::new(&mut logs).unwrap();

          assert_eq!(
              exe.stack.first().map(String::as_str),
              Some("7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw")
          );
      }

      /// A `success` line reports a pop and no program.
      #[test]
      fn handle_system_log_pop() {
          let outcome = handle_system_log(
              "asdf",
              "Program 7Y8VDzehoewALqJfyxZYMgYCnMTCDhWuGfJKUvjYWATw success",
          );
          assert_eq!(outcome, (None, true));
      }

      /// An unrelated line reports neither a pop nor a program.
      #[test]
      fn handle_system_log_no_pop() {
          let outcome = handle_system_log(
              "asdf",
              "Program 7swsTUiQ6KUK4uFYquQKg4epFRsBnvbrTf2fZQCa2sTJ qwer",
          );
          assert_eq!(outcome, (None, false));
      }
  }