Răsfoiți Sursa

client: Add async to anchor-client (#2488)

Co-authored-by: acheron <acheroncrypto@gmail.com>
Jean Marchand (Exotic Markets) 2 ani în urmă
părinte
comite
65c9d6e9b9

+ 1 - 0
CHANGELOG.md

@@ -12,6 +12,7 @@ The minor version will be incremented upon a breaking change and the patch versi
 
 ### Features
 
+- client: Add `async` feature flag to use an asynchronous anchor-client ([#2488](https://github.com/coral-xyz/anchor/pull/2488)).
 - spl: Add metadata wrappers `approve_collection_authority`, `bubblegum_set_collection_size`, `burn_edition_nft`, `burn_nft`, `revoke_collection_authority`, `set_token_standard`, `utilize`, `unverify_sized_collection_item`, `unverify_collection` ([#2430](https://github.com/coral-xyz/anchor/pull/2430))
 - spl: Add `token_program` constraint to `Token`, `Mint`, and `AssociatedToken` accounts in order to override required `token_program` fields and use different token interface implementations in the same instruction ([#2460](https://github.com/coral-xyz/anchor/pull/2460))
 - cli: Add support for Solidity programs. `anchor init` and `anchor new` take an option `--solidity` which creates solidity code rather than rust. `anchor build` and `anchor test` work accordingly ([#2421](https://github.com/coral-xyz/anchor/pull/2421))

+ 2 - 0
Cargo.lock

@@ -216,12 +216,14 @@ version = "0.27.0"
 dependencies = [
  "anchor-lang",
  "anyhow",
+ "futures",
  "regex",
  "serde",
  "solana-account-decoder",
  "solana-client",
  "solana-sdk",
  "thiserror",
+ "tokio",
  "url",
 ]
 

+ 3 - 0
client/Cargo.toml

@@ -9,6 +9,7 @@ description = "Rust client for Anchor programs"
 
 [features]
 debug = []
+async = []
 
 [dependencies]
 anchor-lang = { path = "../lang", version = "0.27.0" }
@@ -20,3 +21,5 @@ solana-sdk = "<1.17.0"
 solana-account-decoder = "<1.17.0"
 thiserror = "1.0.20"
 url = "2.2.2"
+tokio = { version = "1", features = ["rt", "sync"] }
+futures = { version = "0.3.28" }

+ 4 - 0
client/example/Cargo.toml

@@ -7,6 +7,9 @@ edition = "2021"
 
 [workspace]
 
+[features]
+async = ["anchor-client/async"]
+
 [dependencies]
 anchor-client = { path = "../", features = ["debug"] }
 basic-2 = { path = "../../examples/tutorial/basic-2/programs/basic-2", features = ["no-entrypoint"] }
@@ -17,4 +20,5 @@ events = { path = "../../tests/events/programs/events", features = ["no-entrypoi
 shellexpand = "2.1.0"
 anyhow = "1.0.32"
 clap = { version = "4.2.4", features = ["derive"] }
+tokio = { version = "1", features = ["full"] }
 solana-sdk = "<1.17.0"

+ 24 - 0
client/example/run-test.sh

@@ -74,6 +74,30 @@ main() {
         --optional-pid $optional_pid \
         --multithreaded
 
+    #
+    # Restart validator for async test
+    #
+    cleanup
+    solana-test-validator -r \
+				--bpf-program $composite_pid ../../tests/composite/target/deploy/composite.so \
+				--bpf-program $basic_2_pid ../../examples/tutorial/basic-2/target/deploy/basic_2.so \
+				--bpf-program $basic_4_pid ../../examples/tutorial/basic-4/target/deploy/basic_4.so \
+				--bpf-program $events_pid ../../tests/events/target/deploy/events.so \
+				--bpf-program $optional_pid ../../tests/optional/target/deploy/optional.so \
+				> test-validator.log &
+    sleep 5
+
+    #
+    # Run async test.
+    #
+    cargo run --features async -- \
+        --composite-pid $composite_pid \
+        --basic-2-pid $basic_2_pid \
+        --basic-4-pid $basic_4_pid \
+        --events-pid $events_pid \
+        --optional-pid $optional_pid \
+        --multithreaded
+
 }
 
 cleanup() {

+ 329 - 0
client/example/src/blocking.rs

@@ -0,0 +1,329 @@
+use anchor_client::solana_sdk::pubkey::Pubkey;
+use anchor_client::solana_sdk::signature::{Keypair, Signer};
+use anchor_client::solana_sdk::system_instruction;
+use anchor_client::{Client, Cluster};
+use anyhow::Result;
+use clap::Parser;
+use solana_sdk::commitment_config::CommitmentConfig;
+use solana_sdk::signature::read_keypair_file;
+use solana_sdk::system_program;
+// The `accounts` and `instructions` modules are generated by the framework.
+use basic_2::accounts as basic_2_accounts;
+use basic_2::instruction as basic_2_instruction;
+use basic_2::Counter;
+use events::instruction as events_instruction;
+use events::MyEvent;
+use optional::accounts::Initialize as OptionalInitialize;
+use optional::instruction as optional_instruction;
+// The `accounts` and `instructions` modules are generated by the framework.
+use basic_4::accounts as basic_4_accounts;
+use basic_4::instruction as basic_4_instruction;
+use basic_4::Counter as CounterAccount;
+// The `accounts` and `instructions` modules are generated by the framework.
+use crate::Opts;
+use composite::accounts::{Bar, CompositeUpdate, Foo, Initialize};
+use composite::instruction as composite_instruction;
+use composite::{DummyA, DummyB};
+use optional::account::{DataAccount, DataPda};
+use std::ops::Deref;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::thread::sleep;
+use std::time::Duration;
+
+type TestFn<C> = &'static (dyn Fn(&Client<C>, Pubkey) -> Result<()> + Send + Sync);
+
+pub fn main() -> Result<()> {
+    let opts = Opts::parse();
+
+    // Wallet and cluster params.
+    let payer = read_keypair_file(&*shellexpand::tilde("~/.config/solana/id.json"))
+        .expect("Example requires a keypair file");
+    let url = Cluster::Custom(
+        "http://localhost:8899".to_string(),
+        "ws://127.0.0.1:8900".to_string(),
+    );
+
+    if !opts.multithreaded {
+        // Client.
+        let payer = Rc::new(payer);
+        let client =
+            Client::new_with_options(url.clone(), payer.clone(), CommitmentConfig::processed());
+
+        // Run tests on single thread with a single client using an Rc
+        println!("\nStarting single thread test...");
+        composite(&client, opts.composite_pid)?;
+        basic_2(&client, opts.basic_2_pid)?;
+        basic_4(&client, opts.basic_4_pid)?;
+
+        // Can also use references, since they deref to a signer
+        let payer: &Keypair = &payer;
+        let client = Client::new_with_options(url, payer, CommitmentConfig::processed());
+        events(&client, opts.events_pid)?;
+        optional(&client, opts.optional_pid)?;
+    } else {
+        // Client.
+        let payer = Arc::new(payer);
+        let client = Client::new_with_options(url, payer, CommitmentConfig::processed());
+        let client = Arc::new(client);
+
+        // Run tests multithreaded while sharing a client
+        println!("\nStarting multithread test...");
+        let client = Arc::new(client);
+        let tests: Vec<(TestFn<Arc<Keypair>>, Pubkey)> = vec![
+            (&composite, opts.composite_pid),
+            (&basic_2, opts.basic_2_pid),
+            (&basic_4, opts.basic_4_pid),
+            (&events, opts.events_pid),
+            (&optional, opts.optional_pid),
+        ];
+        let mut handles = vec![];
+        for (test, arg) in tests {
+            let local_client = Arc::clone(&client);
+            handles.push(std::thread::spawn(move || test(&local_client, arg)));
+        }
+        for handle in handles {
+            assert!(handle.join().unwrap().is_ok());
+        }
+    }
+
+    // Success.
+    Ok(())
+}
+
+// Runs a client for examples/tutorial/composite.
+//
+// Make sure to run a localnet with the program deploy to run this example.
+pub fn composite<C: Deref<Target = impl Signer> + Clone>(
+    client: &Client<C>,
+    pid: Pubkey,
+) -> Result<()> {
+    // Program client.
+    let program = client.program(pid)?;
+
+    // `Initialize` parameters.
+    let dummy_a = Keypair::new();
+    let dummy_b = Keypair::new();
+
+    // Build and send a transaction.
+    program
+        .request()
+        .instruction(system_instruction::create_account(
+            &program.payer(),
+            &dummy_a.pubkey(),
+            program.rpc().get_minimum_balance_for_rent_exemption(500)?,
+            500,
+            &program.id(),
+        ))
+        .instruction(system_instruction::create_account(
+            &program.payer(),
+            &dummy_b.pubkey(),
+            program.rpc().get_minimum_balance_for_rent_exemption(500)?,
+            500,
+            &program.id(),
+        ))
+        .signer(&dummy_a)
+        .signer(&dummy_b)
+        .accounts(Initialize {
+            dummy_a: dummy_a.pubkey(),
+            dummy_b: dummy_b.pubkey(),
+        })
+        .args(composite_instruction::Initialize)
+        .send()?;
+
+    // Assert the transaction worked.
+    let dummy_a_account: DummyA = program.account(dummy_a.pubkey())?;
+    let dummy_b_account: DummyB = program.account(dummy_b.pubkey())?;
+    assert_eq!(dummy_a_account.data, 0);
+    assert_eq!(dummy_b_account.data, 0);
+
+    // Build and send another transaction, using composite account parameters.
+    program
+        .request()
+        .accounts(CompositeUpdate {
+            foo: Foo {
+                dummy_a: dummy_a.pubkey(),
+            },
+            bar: Bar {
+                dummy_b: dummy_b.pubkey(),
+            },
+        })
+        .args(composite_instruction::CompositeUpdate {
+            dummy_a: 1234,
+            dummy_b: 4321,
+        })
+        .send()?;
+
+    // Assert the transaction worked.
+    let dummy_a_account: DummyA = program.account(dummy_a.pubkey())?;
+    let dummy_b_account: DummyB = program.account(dummy_b.pubkey())?;
+    assert_eq!(dummy_a_account.data, 1234);
+    assert_eq!(dummy_b_account.data, 4321);
+
+    println!("Composite success!");
+
+    Ok(())
+}
+
+// Runs a client for examples/tutorial/basic-2.
+//
+// Make sure to run a localnet with the program deploy to run this example.
+pub fn basic_2<C: Deref<Target = impl Signer> + Clone>(
+    client: &Client<C>,
+    pid: Pubkey,
+) -> Result<()> {
+    let program = client.program(pid)?;
+
+    // `Create` parameters.
+    let counter = Keypair::new();
+    let authority = program.payer();
+
+    // Build and send a transaction.
+    program
+        .request()
+        .signer(&counter)
+        .accounts(basic_2_accounts::Create {
+            counter: counter.pubkey(),
+            user: authority,
+            system_program: system_program::ID,
+        })
+        .args(basic_2_instruction::Create { authority })
+        .send()?;
+
+    let counter_account: Counter = program.account(counter.pubkey())?;
+
+    assert_eq!(counter_account.authority, authority);
+    assert_eq!(counter_account.count, 0);
+
+    println!("Basic 2 success!");
+
+    Ok(())
+}
+
+pub fn events<C: Deref<Target = impl Signer> + Clone>(
+    client: &Client<C>,
+    pid: Pubkey,
+) -> Result<()> {
+    let program = client.program(pid)?;
+
+    let (sender, receiver) = std::sync::mpsc::channel();
+    let event_unsubscriber = program.on(move |_, event: MyEvent| {
+        if sender.send(event).is_err() {
+            println!("Error while transferring the event.")
+        }
+    })?;
+
+    sleep(Duration::from_millis(1000));
+
+    program
+        .request()
+        .args(events_instruction::Initialize {})
+        .send()?;
+
+    let event = receiver.recv().unwrap();
+    assert_eq!(event.data, 5);
+    assert_eq!(event.label, "hello".to_string());
+
+    event_unsubscriber.unsubscribe();
+
+    println!("Events success!");
+
+    Ok(())
+}
+
+pub fn basic_4<C: Deref<Target = impl Signer> + Clone>(
+    client: &Client<C>,
+    pid: Pubkey,
+) -> Result<()> {
+    let program = client.program(pid)?;
+    let authority = program.payer();
+    let (counter, _) = Pubkey::find_program_address(&[b"counter"], &pid);
+
+    program
+        .request()
+        .accounts(basic_4_accounts::Initialize {
+            counter,
+            authority,
+            system_program: system_program::ID,
+        })
+        .args(basic_4_instruction::Initialize {})
+        .send()?;
+    let counter_account: CounterAccount = program.account(counter)?;
+    assert_eq!(counter_account.authority, authority);
+    assert_eq!(counter_account.count, 0);
+
+    program
+        .request()
+        .accounts(basic_4_accounts::Increment { counter, authority })
+        .args(basic_4_instruction::Increment {})
+        .send()?;
+
+    let counter_account: CounterAccount = program.account(counter)?;
+    assert_eq!(counter_account.authority, authority);
+    assert_eq!(counter_account.count, 1);
+
+    println!("Basic 4 success!");
+
+    Ok(())
+}
+
+// Runs a client for tests/optional.
+//
+// Make sure to run a localnet with the program deploy to run this example.
+pub fn optional<C: Deref<Target = impl Signer> + Clone>(
+    client: &Client<C>,
+    pid: Pubkey,
+) -> Result<()> {
+    // Program client.
+    let program = client.program(pid)?;
+
+    // `Initialize` parameters.
+    let data_account_keypair = Keypair::new();
+
+    let data_account_key = data_account_keypair.pubkey();
+
+    let data_pda_seeds = &[DataPda::PREFIX.as_ref(), data_account_key.as_ref()];
+    let data_pda_key = Pubkey::find_program_address(data_pda_seeds, &pid).0;
+    let required_keypair = Keypair::new();
+    let value: u64 = 10;
+
+    // Build and send a transaction.
+
+    program
+        .request()
+        .instruction(system_instruction::create_account(
+            &program.payer(),
+            &required_keypair.pubkey(),
+            program
+                .rpc()
+                .get_minimum_balance_for_rent_exemption(DataAccount::LEN)?,
+            DataAccount::LEN as u64,
+            &program.id(),
+        ))
+        .signer(&data_account_keypair)
+        .signer(&required_keypair)
+        .accounts(OptionalInitialize {
+            payer: Some(program.payer()),
+            required: required_keypair.pubkey(),
+            system_program: Some(system_program::id()),
+            optional_account: Some(data_account_keypair.pubkey()),
+            optional_pda: None,
+        })
+        .args(optional_instruction::Initialize { value, key: pid })
+        .send()
+        .unwrap();
+
+    // Assert the transaction worked.
+    let required: DataAccount = program.account(required_keypair.pubkey())?;
+    assert_eq!(required.data, 0);
+
+    let optional_pda = program.account::<DataPda>(data_pda_key);
+    assert!(optional_pda.is_err());
+
+    let optional_account: DataAccount = program.account(data_account_keypair.pubkey())?;
+    assert_eq!(optional_account.data, value * 2);
+
+    println!("Optional success!");
+
+    Ok(())
+}

+ 12 - 314
client/example/src/main.rs

@@ -1,33 +1,12 @@
-use anchor_client::solana_sdk::commitment_config::CommitmentConfig;
 use anchor_client::solana_sdk::pubkey::Pubkey;
-use anchor_client::solana_sdk::signature::read_keypair_file;
-use anchor_client::solana_sdk::signature::{Keypair, Signer};
-use anchor_client::solana_sdk::system_instruction;
-use anchor_client::{Client, Cluster, EventContext};
 use anyhow::Result;
-use solana_sdk::system_program;
-// The `accounts` and `instructions` modules are generated by the framework.
-use basic_2::accounts as basic_2_accounts;
-use basic_2::instruction as basic_2_instruction;
-use basic_2::Counter;
-use events::instruction as events_instruction;
-use events::MyEvent;
-use optional::accounts::Initialize as OptionalInitialize;
-use optional::instruction as optional_instruction;
-// The `accounts` and `instructions` modules are generated by the framework.
-use basic_4::accounts as basic_4_accounts;
-use basic_4::instruction as basic_4_instruction;
-use basic_4::Counter as CounterAccount;
 use clap::Parser;
-// The `accounts` and `instructions` modules are generated by the framework.
-use composite::accounts::{Bar, CompositeUpdate, Foo, Initialize};
-use composite::instruction as composite_instruction;
-use composite::{DummyA, DummyB};
-use optional::account::{DataAccount, DataPda};
-use std::ops::Deref;
-use std::rc::Rc;
-use std::sync::Arc;
-use std::time::Duration;
+
+#[cfg(not(feature = "async"))]
+mod blocking;
+
+#[cfg(feature = "async")]
+mod nonblocking;
 
 #[derive(Parser, Debug)]
 pub struct Opts {
@@ -45,296 +24,15 @@ pub struct Opts {
     multithreaded: bool,
 }
 
-type TestFn<C> = &'static (dyn Fn(&Client<C>, Pubkey) -> Result<()> + Send + Sync);
-
 // This example assumes a local validator is running with the programs
 // deployed at the addresses given by the CLI args.
+#[cfg(not(feature = "async"))]
 fn main() -> Result<()> {
-    let opts = Opts::parse();
-
-    // Wallet and cluster params.
-    let payer = read_keypair_file(&*shellexpand::tilde("~/.config/solana/id.json"))
-        .expect("Example requires a keypair file");
-    let url = Cluster::Custom(
-        "http://localhost:8899".to_string(),
-        "ws://127.0.0.1:8900".to_string(),
-    );
-
-    if !opts.multithreaded {
-        // Client.
-        let payer = Rc::new(payer);
-        let client =
-            Client::new_with_options(url.clone(), payer.clone(), CommitmentConfig::processed());
-
-        // Run tests on single thread with a single client using an Rc
-        println!("\nStarting single thread test...");
-        composite(&client, opts.composite_pid)?;
-        basic_2(&client, opts.basic_2_pid)?;
-        basic_4(&client, opts.basic_4_pid)?;
-
-        // Can also use references, since they deref to a signer
-        let payer: &Keypair = &payer;
-        let client = Client::new_with_options(url, payer, CommitmentConfig::processed());
-        events(&client, opts.events_pid)?;
-        optional(&client, opts.optional_pid)?;
-    } else {
-        // Client.
-        let payer = Arc::new(payer);
-        let client = Client::new_with_options(url, payer, CommitmentConfig::processed());
-        let client = Arc::new(client);
-
-        // Run tests multithreaded while sharing a client
-        println!("\nStarting multithread test...");
-        let client = Arc::new(client);
-        let tests: Vec<(TestFn<Arc<Keypair>>, Pubkey)> = vec![
-            (&composite, opts.composite_pid),
-            (&basic_2, opts.basic_2_pid),
-            (&basic_4, opts.basic_4_pid),
-            (&events, opts.events_pid),
-            (&optional, opts.optional_pid),
-        ];
-        let mut handles = vec![];
-        for (test, arg) in tests {
-            let local_client = Arc::clone(&client);
-            handles.push(std::thread::spawn(move || test(&local_client, arg)));
-        }
-        for handle in handles {
-            assert!(handle.join().unwrap().is_ok());
-        }
-    }
-
-    // Success.
-    Ok(())
-}
-
-// Runs a client for examples/tutorial/composite.
-//
-// Make sure to run a localnet with the program deploy to run this example.
-fn composite<C: Deref<Target = impl Signer> + Clone>(
-    client: &Client<C>,
-    pid: Pubkey,
-) -> Result<()> {
-    // Program client.
-    let program = client.program(pid);
-
-    // `Initialize` parameters.
-    let dummy_a = Keypair::new();
-    let dummy_b = Keypair::new();
-
-    // Build and send a transaction.
-    program
-        .request()
-        .instruction(system_instruction::create_account(
-            &program.payer(),
-            &dummy_a.pubkey(),
-            program.rpc().get_minimum_balance_for_rent_exemption(500)?,
-            500,
-            &program.id(),
-        ))
-        .instruction(system_instruction::create_account(
-            &program.payer(),
-            &dummy_b.pubkey(),
-            program.rpc().get_minimum_balance_for_rent_exemption(500)?,
-            500,
-            &program.id(),
-        ))
-        .signer(&dummy_a)
-        .signer(&dummy_b)
-        .accounts(Initialize {
-            dummy_a: dummy_a.pubkey(),
-            dummy_b: dummy_b.pubkey(),
-        })
-        .args(composite_instruction::Initialize)
-        .send()?;
-
-    // Assert the transaction worked.
-    let dummy_a_account: DummyA = program.account(dummy_a.pubkey())?;
-    let dummy_b_account: DummyB = program.account(dummy_b.pubkey())?;
-    assert_eq!(dummy_a_account.data, 0);
-    assert_eq!(dummy_b_account.data, 0);
-
-    // Build and send another transaction, using composite account parameters.
-    program
-        .request()
-        .accounts(CompositeUpdate {
-            foo: Foo {
-                dummy_a: dummy_a.pubkey(),
-            },
-            bar: Bar {
-                dummy_b: dummy_b.pubkey(),
-            },
-        })
-        .args(composite_instruction::CompositeUpdate {
-            dummy_a: 1234,
-            dummy_b: 4321,
-        })
-        .send()?;
-
-    // Assert the transaction worked.
-    let dummy_a_account: DummyA = program.account(dummy_a.pubkey())?;
-    let dummy_b_account: DummyB = program.account(dummy_b.pubkey())?;
-    assert_eq!(dummy_a_account.data, 1234);
-    assert_eq!(dummy_b_account.data, 4321);
-
-    println!("Composite success!");
-
-    Ok(())
-}
-
-// Runs a client for examples/tutorial/basic-2.
-//
-// Make sure to run a localnet with the program deploy to run this example.
-fn basic_2<C: Deref<Target = impl Signer> + Clone>(client: &Client<C>, pid: Pubkey) -> Result<()> {
-    let program = client.program(pid);
-
-    // `Create` parameters.
-    let counter = Keypair::new();
-    let authority = program.payer();
-
-    // Build and send a transaction.
-    program
-        .request()
-        .signer(&counter)
-        .accounts(basic_2_accounts::Create {
-            counter: counter.pubkey(),
-            user: authority,
-            system_program: system_program::ID,
-        })
-        .args(basic_2_instruction::Create { authority })
-        .send()?;
-
-    let counter_account: Counter = program.account(counter.pubkey())?;
-
-    assert_eq!(counter_account.authority, authority);
-    assert_eq!(counter_account.count, 0);
-
-    println!("Basic 2 success!");
-
-    Ok(())
+    blocking::main()
 }
 
-fn events<C: Deref<Target = impl Signer> + Clone>(client: &Client<C>, pid: Pubkey) -> Result<()> {
-    let program = client.program(pid);
-
-    let (sender, receiver) = std::sync::mpsc::channel();
-    let handle = program.on(move |_ctx: &EventContext, event: MyEvent| {
-        sender.send(event).unwrap();
-    })?;
-
-    std::thread::sleep(Duration::from_millis(1000));
-
-    program
-        .request()
-        .args(events_instruction::Initialize {})
-        .send()?;
-
-    let event = receiver.recv().unwrap();
-    assert_eq!(event.data, 5);
-    assert_eq!(event.label, "hello".to_string());
-
-    // TODO: remove once https://github.com/solana-labs/solana/issues/16102
-    //       is addressed. Until then, drop the subscription handle in another
-    //       thread so that we deadlock in the other thread as to not block
-    //       this thread.
-    std::thread::spawn(move || {
-        drop(handle);
-    });
-
-    println!("Events success!");
-
-    Ok(())
-}
-
-pub fn basic_4<C: Deref<Target = impl Signer> + Clone>(
-    client: &Client<C>,
-    pid: Pubkey,
-) -> Result<()> {
-    let program = client.program(pid);
-    let authority = program.payer();
-    let (counter, _) = Pubkey::find_program_address(&[b"counter"], &pid);
-
-    program
-        .request()
-        .accounts(basic_4_accounts::Initialize {
-            counter,
-            authority,
-            system_program: system_program::ID,
-        })
-        .args(basic_4_instruction::Initialize {})
-        .send()?;
-    let counter_account: CounterAccount = program.account(counter)?;
-    assert_eq!(counter_account.authority, authority);
-    assert_eq!(counter_account.count, 0);
-
-    program
-        .request()
-        .accounts(basic_4_accounts::Increment { counter, authority })
-        .args(basic_4_instruction::Increment {})
-        .send()?;
-
-    let counter_account: CounterAccount = program.account(counter)?;
-    assert_eq!(counter_account.authority, authority);
-    assert_eq!(counter_account.count, 1);
-
-    println!("Basic 4 success!");
-
-    Ok(())
-}
-
-// Runs a client for tests/optional.
-//
-// Make sure to run a localnet with the program deploy to run this example.
-fn optional<C: Deref<Target = impl Signer> + Clone>(client: &Client<C>, pid: Pubkey) -> Result<()> {
-    // Program client.
-    let program = client.program(pid);
-
-    // `Initialize` parameters.
-    let data_account_keypair = Keypair::new();
-
-    let data_account_key = data_account_keypair.pubkey();
-
-    let data_pda_seeds = &[DataPda::PREFIX.as_ref(), data_account_key.as_ref()];
-    let data_pda_key = Pubkey::find_program_address(data_pda_seeds, &pid).0;
-    let required_keypair = Keypair::new();
-    let value: u64 = 10;
-
-    // Build and send a transaction.
-
-    program
-        .request()
-        .instruction(system_instruction::create_account(
-            &program.payer(),
-            &required_keypair.pubkey(),
-            program
-                .rpc()
-                .get_minimum_balance_for_rent_exemption(DataAccount::LEN)?,
-            DataAccount::LEN as u64,
-            &program.id(),
-        ))
-        .signer(&data_account_keypair)
-        .signer(&required_keypair)
-        .accounts(OptionalInitialize {
-            payer: Some(program.payer()),
-            required: required_keypair.pubkey(),
-            system_program: Some(system_program::id()),
-            optional_account: Some(data_account_keypair.pubkey()),
-            optional_pda: None,
-        })
-        .args(optional_instruction::Initialize { value, key: pid })
-        .send()
-        .unwrap();
-
-    // Assert the transaction worked.
-    let required: DataAccount = program.account(required_keypair.pubkey())?;
-    assert_eq!(required.data, 0);
-
-    let optional_pda = program.account::<DataPda>(data_pda_key);
-    assert!(optional_pda.is_err());
-
-    let optional_account: DataAccount = program.account(data_account_keypair.pubkey())?;
-    assert_eq!(optional_account.data, value * 2);
-
-    println!("Optional success!");
-
-    Ok(())
+#[cfg(feature = "async")]
+#[tokio::main]
+async fn main() -> Result<()> {
+    nonblocking::main().await
 }

+ 306 - 0
client/example/src/nonblocking.rs

@@ -0,0 +1,306 @@
+use anchor_client::solana_sdk::pubkey::Pubkey;
+use anchor_client::solana_sdk::signature::{Keypair, Signer};
+use anchor_client::solana_sdk::system_instruction;
+use anchor_client::{Client, Cluster};
+use anyhow::Result;
+use clap::Parser;
+use solana_sdk::commitment_config::CommitmentConfig;
+use solana_sdk::signature::read_keypair_file;
+use solana_sdk::system_program;
+// The `accounts` and `instructions` modules are generated by the framework.
+use basic_2::accounts as basic_2_accounts;
+use basic_2::instruction as basic_2_instruction;
+use basic_2::Counter;
+use events::instruction as events_instruction;
+use events::MyEvent;
+use optional::accounts::Initialize as OptionalInitialize;
+use optional::instruction as optional_instruction;
+// The `accounts` and `instructions` modules are generated by the framework.
+use basic_4::accounts as basic_4_accounts;
+use basic_4::instruction as basic_4_instruction;
+use basic_4::Counter as CounterAccount;
+// The `accounts` and `instructions` modules are generated by the framework.
+use crate::Opts;
+use composite::accounts::{Bar, CompositeUpdate, Foo, Initialize};
+use composite::instruction as composite_instruction;
+use composite::{DummyA, DummyB};
+use optional::account::{DataAccount, DataPda};
+use std::ops::Deref;
+use std::rc::Rc;
+use std::time::Duration;
+use tokio::sync::mpsc;
+use tokio::time::sleep;
+
+pub async fn main() -> Result<()> {
+    let opts = Opts::parse();
+
+    // Wallet and cluster params.
+    let payer = read_keypair_file(&*shellexpand::tilde("~/.config/solana/id.json"))
+        .expect("Example requires a keypair file");
+    let url = Cluster::Custom(
+        "http://localhost:8899".to_string(),
+        "ws://127.0.0.1:8900".to_string(),
+    );
+
+    // Client.
+    let payer = Rc::new(payer);
+    let client =
+        Client::new_with_options(url.clone(), payer.clone(), CommitmentConfig::processed());
+
+    println!("\nStarting async test...");
+    composite(&client, opts.composite_pid).await?;
+    basic_2(&client, opts.basic_2_pid).await?;
+    basic_4(&client, opts.basic_4_pid).await?;
+
+    // Can also use references, since they deref to a signer
+    let payer: &Keypair = &payer;
+    let client = Client::new_with_options(url, payer, CommitmentConfig::processed());
+    events(&client, opts.events_pid).await?;
+    optional(&client, opts.optional_pid).await?;
+    // Success.
+    Ok(())
+}
+
+pub async fn composite<C: Deref<Target = impl Signer> + Clone>(
+    client: &Client<C>,
+    pid: Pubkey,
+) -> Result<()> {
+    // Program client.
+    let program = client.program(pid)?;
+
+    // `Initialize` parameters.
+    let dummy_a = Keypair::new();
+    let dummy_b = Keypair::new();
+
+    // Build and send a transaction.
+    program
+        .request()
+        .instruction(system_instruction::create_account(
+            &program.payer(),
+            &dummy_a.pubkey(),
+            program
+                .async_rpc()
+                .get_minimum_balance_for_rent_exemption(500)
+                .await?,
+            500,
+            &program.id(),
+        ))
+        .instruction(system_instruction::create_account(
+            &program.payer(),
+            &dummy_b.pubkey(),
+            program
+                .async_rpc()
+                .get_minimum_balance_for_rent_exemption(500)
+                .await?,
+            500,
+            &program.id(),
+        ))
+        .signer(&dummy_a)
+        .signer(&dummy_b)
+        .accounts(Initialize {
+            dummy_a: dummy_a.pubkey(),
+            dummy_b: dummy_b.pubkey(),
+        })
+        .args(composite_instruction::Initialize)
+        .send()
+        .await?;
+
+    // Assert the transaction worked.
+    let dummy_a_account: DummyA = program.account(dummy_a.pubkey()).await?;
+    let dummy_b_account: DummyB = program.account(dummy_b.pubkey()).await?;
+    assert_eq!(dummy_a_account.data, 0);
+    assert_eq!(dummy_b_account.data, 0);
+
+    // Build and send another transaction, using composite account parameters.
+    program
+        .request()
+        .accounts(CompositeUpdate {
+            foo: Foo {
+                dummy_a: dummy_a.pubkey(),
+            },
+            bar: Bar {
+                dummy_b: dummy_b.pubkey(),
+            },
+        })
+        .args(composite_instruction::CompositeUpdate {
+            dummy_a: 1234,
+            dummy_b: 4321,
+        })
+        .send()
+        .await?;
+
+    // Assert the transaction worked.
+    let dummy_a_account: DummyA = program.account(dummy_a.pubkey()).await?;
+    let dummy_b_account: DummyB = program.account(dummy_b.pubkey()).await?;
+    assert_eq!(dummy_a_account.data, 1234);
+    assert_eq!(dummy_b_account.data, 4321);
+
+    println!("Composite success!");
+
+    Ok(())
+}
+
+pub async fn basic_2<C: Deref<Target = impl Signer> + Clone>(
+    client: &Client<C>,
+    pid: Pubkey,
+) -> Result<()> {
+    let program = client.program(pid)?;
+
+    // `Create` parameters.
+    let counter = Keypair::new();
+    let authority = program.payer();
+
+    // Build and send a transaction.
+    program
+        .request()
+        .signer(&counter)
+        .accounts(basic_2_accounts::Create {
+            counter: counter.pubkey(),
+            user: authority,
+            system_program: system_program::ID,
+        })
+        .args(basic_2_instruction::Create { authority })
+        .send()
+        .await?;
+
+    let counter_account: Counter = program.account(counter.pubkey()).await?;
+
+    assert_eq!(counter_account.authority, authority);
+    assert_eq!(counter_account.count, 0);
+
+    println!("Basic 2 success!");
+
+    Ok(())
+}
+
+pub async fn events<C: Deref<Target = impl Signer> + Clone>(
+    client: &Client<C>,
+    pid: Pubkey,
+) -> Result<()> {
+    let program = client.program(pid)?;
+
+    let (sender, mut receiver) = mpsc::unbounded_channel();
+    let event_unsubscriber = program
+        .on(move |_, event: MyEvent| {
+            if sender.send(event).is_err() {
+                println!("Error while transferring the event.")
+            }
+        })
+        .await?;
+
+    sleep(Duration::from_millis(1000)).await;
+
+    program
+        .request()
+        .args(events_instruction::Initialize {})
+        .send()
+        .await?;
+
+    let event = receiver.recv().await.unwrap();
+    assert_eq!(event.data, 5);
+    assert_eq!(event.label, "hello".to_string());
+
+    event_unsubscriber.unsubscribe().await;
+
+    println!("Events success!");
+
+    Ok(())
+}
+
+pub async fn basic_4<C: Deref<Target = impl Signer> + Clone>(
+    client: &Client<C>,
+    pid: Pubkey,
+) -> Result<()> {
+    let program = client.program(pid)?;
+    let authority = program.payer();
+    let (counter, _) = Pubkey::find_program_address(&[b"counter"], &pid);
+
+    program
+        .request()
+        .accounts(basic_4_accounts::Initialize {
+            counter,
+            authority,
+            system_program: system_program::ID,
+        })
+        .args(basic_4_instruction::Initialize {})
+        .send()
+        .await?;
+    let counter_account: CounterAccount = program.account(counter).await?;
+    assert_eq!(counter_account.authority, authority);
+    assert_eq!(counter_account.count, 0);
+
+    program
+        .request()
+        .accounts(basic_4_accounts::Increment { counter, authority })
+        .args(basic_4_instruction::Increment {})
+        .send()
+        .await?;
+
+    let counter_account: CounterAccount = program.account(counter).await?;
+    assert_eq!(counter_account.authority, authority);
+    assert_eq!(counter_account.count, 1);
+
+    println!("Basic 4 success!");
+
+    Ok(())
+}
+
+pub async fn optional<C: Deref<Target = impl Signer> + Clone>(
+    client: &Client<C>,
+    pid: Pubkey,
+) -> Result<()> {
+    // Program client.
+    let program = client.program(pid)?;
+
+    // `Initialize` parameters.
+    let data_account_keypair = Keypair::new();
+
+    let data_account_key = data_account_keypair.pubkey();
+
+    let data_pda_seeds = &[DataPda::PREFIX.as_ref(), data_account_key.as_ref()];
+    let data_pda_key = Pubkey::find_program_address(data_pda_seeds, &pid).0;
+    let required_keypair = Keypair::new();
+    let value: u64 = 10;
+
+    // Build and send a transaction.
+
+    program
+        .request()
+        .instruction(system_instruction::create_account(
+            &program.payer(),
+            &required_keypair.pubkey(),
+            program
+                .async_rpc()
+                .get_minimum_balance_for_rent_exemption(DataAccount::LEN)
+                .await?,
+            DataAccount::LEN as u64,
+            &program.id(),
+        ))
+        .signer(&data_account_keypair)
+        .signer(&required_keypair)
+        .accounts(OptionalInitialize {
+            payer: Some(program.payer()),
+            required: required_keypair.pubkey(),
+            system_program: Some(system_program::id()),
+            optional_account: Some(data_account_keypair.pubkey()),
+            optional_pda: None,
+        })
+        .args(optional_instruction::Initialize { value, key: pid })
+        .send()
+        .await
+        .unwrap();
+
+    // Assert the transaction worked.
+    let required: DataAccount = program.account(required_keypair.pubkey()).await?;
+    assert_eq!(required.data, 0);
+
+    let optional_pda = program.account::<DataPda>(data_pda_key).await;
+    assert!(optional_pda.is_err());
+
+    let optional_account: DataAccount = program.account(data_account_keypair.pubkey()).await?;
+    assert_eq!(optional_account.data, value * 2);
+
+    println!("Optional success!");
+
+    Ok(())
+}

+ 109 - 0
client/src/blocking.rs

@@ -0,0 +1,109 @@
+use crate::{
+    ClientError, Config, EventContext, EventUnsubscriber, Program, ProgramAccountsIterator,
+    RequestBuilder,
+};
+use anchor_lang::{prelude::Pubkey, AccountDeserialize, Discriminator};
+use solana_client::{rpc_config::RpcSendTransactionConfig, rpc_filter::RpcFilterType};
+use solana_sdk::{
+    commitment_config::CommitmentConfig, signature::Signature, signer::Signer,
+    transaction::Transaction,
+};
+use std::{marker::PhantomData, ops::Deref, sync::Arc};
+use tokio::{
+    runtime::{Builder, Handle},
+    sync::RwLock,
+};
+
+impl<'a> EventUnsubscriber<'a> {
+    /// Unsubscribe gracefully.
+    pub fn unsubscribe(self) {
+        self.runtime_handle.block_on(self.unsubscribe_internal())
+    }
+}
+
+impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
+    pub fn new(program_id: Pubkey, cfg: Config<C>) -> Result<Self, ClientError> {
+        let rt: tokio::runtime::Runtime = Builder::new_multi_thread().enable_all().build()?;
+
+        Ok(Self {
+            program_id,
+            cfg,
+            sub_client: Arc::new(RwLock::new(None)),
+            rt,
+        })
+    }
+
+    /// Returns the account at the given address.
+    pub fn account<T: AccountDeserialize>(&self, address: Pubkey) -> Result<T, ClientError> {
+        self.rt.block_on(self.account_internal(address))
+    }
+
+    /// Returns all program accounts of the given type matching the given filters
+    pub fn accounts<T: AccountDeserialize + Discriminator>(
+        &self,
+        filters: Vec<RpcFilterType>,
+    ) -> Result<Vec<(Pubkey, T)>, ClientError> {
+        self.accounts_lazy(filters)?.collect()
+    }
+
+    /// Returns all program accounts of the given type matching the given filters as an iterator
+    /// Deserialization is executed lazily
+    pub fn accounts_lazy<T: AccountDeserialize + Discriminator>(
+        &self,
+        filters: Vec<RpcFilterType>,
+    ) -> Result<ProgramAccountsIterator<T>, ClientError> {
+        self.rt.block_on(self.accounts_lazy_internal(filters))
+    }
+
+    pub fn on<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
+        &self,
+        f: impl Fn(&EventContext, T) + Send + 'static,
+    ) -> Result<EventUnsubscriber, ClientError> {
+        let (handle, rx) = self.rt.block_on(self.on_internal(f))?;
+
+        Ok(EventUnsubscriber {
+            handle,
+            rx,
+            runtime_handle: self.rt.handle(),
+            _lifetime_marker: PhantomData,
+        })
+    }
+}
+
+impl<'a, C: Deref<Target = impl Signer> + Clone> RequestBuilder<'a, C> {
+    pub fn from(
+        program_id: Pubkey,
+        cluster: &str,
+        payer: C,
+        options: Option<CommitmentConfig>,
+        handle: &'a Handle,
+    ) -> Self {
+        Self {
+            program_id,
+            payer,
+            cluster: cluster.to_string(),
+            accounts: Vec::new(),
+            options: options.unwrap_or_default(),
+            instructions: Vec::new(),
+            instruction_data: None,
+            signers: Vec::new(),
+            handle,
+        }
+    }
+
+    pub fn signed_transaction(&self) -> Result<Transaction, ClientError> {
+        self.handle.block_on(self.signed_transaction_internal())
+    }
+
+    pub fn send(&self) -> Result<Signature, ClientError> {
+        self.handle.block_on(self.send_internal())
+    }
+
+    pub fn send_with_spinner_and_config(
+        &self,
+        config: RpcSendTransactionConfig,
+    ) -> Result<Signature, ClientError> {
+        self.handle
+            .block_on(self.send_with_spinner_and_config_internal(config))
+    }
+}

+ 196 - 138
client/src/lib.rs

@@ -6,27 +6,42 @@ use anchor_lang::solana_program::instruction::{AccountMeta, Instruction};
 use anchor_lang::solana_program::program_error::ProgramError;
 use anchor_lang::solana_program::pubkey::Pubkey;
 use anchor_lang::{AccountDeserialize, Discriminator, InstructionData, ToAccountMetas};
+use futures::{Future, StreamExt};
 use regex::Regex;
 use solana_account_decoder::UiAccountEncoding;
-use solana_client::client_error::ClientError as SolanaClientError;
-use solana_client::nonblocking::rpc_client::RpcClient as AsyncRpcClient;
-use solana_client::pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription};
-use solana_client::rpc_client::RpcClient;
 use solana_client::rpc_config::{
     RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSendTransactionConfig,
     RpcTransactionLogsConfig, RpcTransactionLogsFilter,
 };
 use solana_client::rpc_filter::{Memcmp, RpcFilterType};
-use solana_client::rpc_response::{Response as RpcResponse, RpcLogsResponse};
+use solana_client::{
+    client_error::ClientError as SolanaClientError,
+    nonblocking::{
+        pubsub_client::{PubsubClient, PubsubClientError},
+        rpc_client::RpcClient as AsyncRpcClient,
+    },
+    rpc_client::RpcClient,
+    rpc_response::{Response as RpcResponse, RpcLogsResponse},
+};
 use solana_sdk::account::Account;
 use solana_sdk::commitment_config::CommitmentConfig;
 use solana_sdk::signature::{Signature, Signer};
 use solana_sdk::transaction::Transaction;
-use std::convert::Into;
 use std::iter::Map;
+use std::marker::PhantomData;
 use std::ops::Deref;
+use std::pin::Pin;
+use std::sync::Arc;
 use std::vec::IntoIter;
 use thiserror::Error;
+use tokio::{
+    runtime::Handle,
+    sync::{
+        mpsc::{unbounded_channel, UnboundedReceiver},
+        RwLock,
+    },
+    task::JoinHandle,
+};
 
 pub use anchor_lang;
 pub use cluster::Cluster;
@@ -35,12 +50,15 @@ pub use solana_sdk;
 
 mod cluster;
 
+#[cfg(not(feature = "async"))]
+mod blocking;
+#[cfg(feature = "async")]
+mod nonblocking;
+
 const PROGRAM_LOG: &str = "Program log: ";
 const PROGRAM_DATA: &str = "Program data: ";
 
-/// EventHandle unsubscribes from a program event stream on drop.
-pub type EventHandle = PubsubClientSubscription<RpcResponse<RpcLogsResponse>>;
-
+type UnsubscribeFn = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;
 /// Client defines the base configuration for building RPC clients to
 /// communicate with Anchor programs running on a Solana cluster. It's
 /// primary use is to build a `Program` client via the `program` method.
@@ -69,31 +87,50 @@ impl<C: Clone + Deref<Target = impl Signer>> Client<C> {
         }
     }
 
-    pub fn program(&self, program_id: Pubkey) -> Program<C> {
-        Program {
-            program_id,
-            cfg: Config {
-                cluster: self.cfg.cluster.clone(),
-                options: self.cfg.options,
-                payer: self.cfg.payer.clone(),
-            },
-        }
+    pub fn program(&self, program_id: Pubkey) -> Result<Program<C>, ClientError> {
+        let cfg = Config {
+            cluster: self.cfg.cluster.clone(),
+            options: self.cfg.options,
+            payer: self.cfg.payer.clone(),
+        };
+
+        Program::new(program_id, cfg)
     }
 }
 
 // Internal configuration for a client.
 #[derive(Debug)]
-struct Config<C> {
+pub struct Config<C> {
     cluster: Cluster,
     payer: C,
     options: Option<CommitmentConfig>,
 }
 
+pub struct EventUnsubscriber<'a> {
+    handle: JoinHandle<Result<(), ClientError>>,
+    rx: UnboundedReceiver<UnsubscribeFn>,
+    #[cfg(not(feature = "async"))]
+    runtime_handle: &'a Handle,
+    _lifetime_marker: PhantomData<&'a Handle>,
+}
+
+impl<'a> EventUnsubscriber<'a> {
+    async fn unsubscribe_internal(mut self) {
+        if let Some(unsubscribe) = self.rx.recv().await {
+            unsubscribe().await;
+        }
+
+        let _ = self.handle.await;
+    }
+}
+
 /// Program is the primary client handle to be used to build and send requests.
-#[derive(Debug)]
 pub struct Program<C> {
     program_id: Pubkey,
     cfg: Config<C>,
+    sub_client: Arc<RwLock<Option<PubsubClient>>>,
+    #[cfg(not(feature = "async"))]
+    rt: tokio::runtime::Runtime,
 }
 
 impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
@@ -108,34 +145,47 @@ impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
             self.cfg.cluster.url(),
             self.cfg.payer.clone(),
             self.cfg.options,
+            #[cfg(not(feature = "async"))]
+            self.rt.handle(),
+        )
+    }
+
+    pub fn id(&self) -> Pubkey {
+        self.program_id
+    }
+
+    pub fn rpc(&self) -> RpcClient {
+        RpcClient::new_with_commitment(
+            self.cfg.cluster.url().to_string(),
+            self.cfg.options.unwrap_or_default(),
         )
     }
 
-    /// Returns the account at the given address.
-    pub fn account<T: AccountDeserialize>(&self, address: Pubkey) -> Result<T, ClientError> {
-        let rpc_client = RpcClient::new_with_commitment(
+    pub fn async_rpc(&self) -> AsyncRpcClient {
+        AsyncRpcClient::new_with_commitment(
+            self.cfg.cluster.url().to_string(),
+            self.cfg.options.unwrap_or_default(),
+        )
+    }
+
+    async fn account_internal<T: AccountDeserialize>(
+        &self,
+        address: Pubkey,
+    ) -> Result<T, ClientError> {
+        let rpc_client = AsyncRpcClient::new_with_commitment(
             self.cfg.cluster.url().to_string(),
             self.cfg.options.unwrap_or_default(),
         );
         let account = rpc_client
-            .get_account_with_commitment(&address, CommitmentConfig::processed())?
+            .get_account_with_commitment(&address, CommitmentConfig::processed())
+            .await?
             .value
             .ok_or(ClientError::AccountNotFound)?;
         let mut data: &[u8] = &account.data;
         T::try_deserialize(&mut data).map_err(Into::into)
     }
 
-    /// Returns all program accounts of the given type matching the given filters
-    pub fn accounts<T: AccountDeserialize + Discriminator>(
-        &self,
-        filters: Vec<RpcFilterType>,
-    ) -> Result<Vec<(Pubkey, T)>, ClientError> {
-        self.accounts_lazy(filters)?.collect()
-    }
-
-    /// Returns all program accounts of the given type matching the given filters as an iterator
-    /// Deserialization is executed lazily
-    pub fn accounts_lazy<T: AccountDeserialize + Discriminator>(
+    async fn accounts_lazy_internal<T: AccountDeserialize + Discriminator>(
         &self,
         filters: Vec<RpcFilterType>,
     ) -> Result<ProgramAccountsIterator<T>, ClientError> {
@@ -151,8 +201,9 @@ impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
         };
         Ok(ProgramAccountsIterator {
             inner: self
-                .rpc()
-                .get_program_accounts_with_config(&self.id(), config)?
+                .async_rpc()
+                .get_program_accounts_with_config(&self.id(), config)
+                .await?
                 .into_iter()
                 .map(|(key, account)| {
                     Ok((key, T::try_deserialize(&mut (&account.data as &[u8]))?))
@@ -160,86 +211,64 @@ impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
         })
     }
 
-    pub fn rpc(&self) -> RpcClient {
-        RpcClient::new_with_commitment(
-            self.cfg.cluster.url().to_string(),
-            self.cfg.options.unwrap_or_default(),
-        )
-    }
+    async fn init_sub_client_if_needed(&self) -> Result<(), ClientError> {
+        let lock = &self.sub_client;
+        let mut client = lock.write().await;
 
-    pub fn async_rpc(&self) -> AsyncRpcClient {
-        AsyncRpcClient::new_with_commitment(
-            self.cfg.cluster.url().to_string(),
-            self.cfg.options.unwrap_or_default(),
-        )
-    }
+        if client.is_none() {
+            let sub_client = PubsubClient::new(self.cfg.cluster.ws_url()).await?;
+            *client = Some(sub_client);
+        }
 
-    pub fn id(&self) -> Pubkey {
-        self.program_id
+        Ok(())
     }
 
-    pub fn on<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
+    async fn on_internal<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
         &self,
         f: impl Fn(&EventContext, T) + Send + 'static,
-    ) -> Result<EventHandle, ClientError> {
-        let addresses = vec![self.program_id.to_string()];
-        let filter = RpcTransactionLogsFilter::Mentions(addresses);
-        let ws_url = self.cfg.cluster.ws_url().to_string();
-        let cfg = RpcTransactionLogsConfig {
+    ) -> Result<
+        (
+            JoinHandle<Result<(), ClientError>>,
+            UnboundedReceiver<UnsubscribeFn>,
+        ),
+        ClientError,
+    > {
+        self.init_sub_client_if_needed().await?;
+        let (tx, rx) = unbounded_channel::<_>();
+        let config = RpcTransactionLogsConfig {
             commitment: self.cfg.options,
         };
-        let self_program_str = self.program_id.to_string();
-        let (client, receiver) = PubsubClient::logs_subscribe(&ws_url, filter, cfg)?;
-        std::thread::spawn(move || {
-            loop {
-                match receiver.recv() {
-                    Ok(logs) => {
-                        let ctx = EventContext {
-                            signature: logs.value.signature.parse().unwrap(),
-                            slot: logs.context.slot,
-                        };
-                        let mut logs = &logs.value.logs[..];
-                        if !logs.is_empty() {
-                            if let Ok(mut execution) = Execution::new(&mut logs) {
-                                for l in logs {
-                                    // Parse the log.
-                                    let (event, new_program, did_pop) = {
-                                        if self_program_str == execution.program() {
-                                            handle_program_log(&self_program_str, l).unwrap_or_else(
-                                                |e| {
-                                                    println!("Unable to parse log: {e}");
-                                                    std::process::exit(1);
-                                                },
-                                            )
-                                        } else {
-                                            let (program, did_pop) =
-                                                handle_system_log(&self_program_str, l);
-                                            (None, program, did_pop)
-                                        }
-                                    };
-                                    // Emit the event.
-                                    if let Some(e) = event {
-                                        f(&ctx, e);
-                                    }
-                                    // Switch program context on CPI.
-                                    if let Some(new_program) = new_program {
-                                        execution.push(new_program);
-                                    }
-                                    // Program returned.
-                                    if did_pop {
-                                        execution.pop();
-                                    }
-                                }
-                            }
-                        }
-                    }
-                    Err(_err) => {
-                        return;
+        let program_id_str = self.program_id.to_string();
+        let filter = RpcTransactionLogsFilter::Mentions(vec![program_id_str.clone()]);
+
+        let lock = Arc::clone(&self.sub_client);
+
+        let handle = tokio::spawn(async move {
+            if let Some(ref client) = *lock.read().await {
+                let (mut notifications, unsubscribe) =
+                    client.logs_subscribe(filter, config).await?;
+
+                tx.send(unsubscribe).map_err(|e| {
+                    ClientError::SolanaClientPubsubError(PubsubClientError::UnexpectedMessageError(
+                        e.to_string(),
+                    ))
+                })?;
+
+                while let Some(logs) = notifications.next().await {
+                    let ctx = EventContext {
+                        signature: logs.value.signature.parse().unwrap(),
+                        slot: logs.context.slot,
+                    };
+                    let events = parse_logs_response(logs, &program_id_str);
+                    for e in events {
+                        f(&ctx, e);
                     }
                 }
             }
+            Ok::<(), ClientError>(())
         });
-        Ok(client)
+
+        Ok((handle, rx))
     }
 }
 
@@ -373,6 +402,8 @@ pub enum ClientError {
     SolanaClientPubsubError(#[from] PubsubClientError),
     #[error("Unable to parse log: {0}")]
     LogParseError(String),
+    #[error(transparent)]
+    IOError(#[from] std::io::Error),
 }
 
 /// `RequestBuilder` provides a builder interface to create and send
@@ -387,27 +418,11 @@ pub struct RequestBuilder<'a, C> {
     // Serialized instruction data for the target RPC.
     instruction_data: Option<Vec<u8>>,
     signers: Vec<&'a dyn Signer>,
+    #[cfg(not(feature = "async"))]
+    handle: &'a Handle,
 }
 
 impl<'a, C: Deref<Target = impl Signer> + Clone> RequestBuilder<'a, C> {
-    pub fn from(
-        program_id: Pubkey,
-        cluster: &str,
-        payer: C,
-        options: Option<CommitmentConfig>,
-    ) -> Self {
-        Self {
-            program_id,
-            payer,
-            cluster: cluster.to_string(),
-            accounts: Vec::new(),
-            options: options.unwrap_or_default(),
-            instructions: Vec::new(),
-            instruction_data: None,
-            signers: Vec::new(),
-        }
-    }
-
     #[must_use]
     pub fn payer(mut self, payer: C) -> Self {
         self.payer = payer;
@@ -488,36 +503,39 @@ impl<'a, C: Deref<Target = impl Signer> + Clone> RequestBuilder<'a, C> {
         Ok(tx)
     }
 
-    pub fn signed_transaction(&self) -> Result<Transaction, ClientError> {
-        let latest_hash =
-            RpcClient::new_with_commitment(&self.cluster, self.options).get_latest_blockhash()?;
-        let tx = self.signed_transaction_with_blockhash(latest_hash)?;
-
-        Ok(tx)
-    }
-
     pub fn transaction(&self) -> Result<Transaction, ClientError> {
         let instructions = &self.instructions;
         let tx = Transaction::new_with_payer(instructions, Some(&self.payer.pubkey()));
         Ok(tx)
     }
 
-    pub fn send(self) -> Result<Signature, ClientError> {
-        let rpc_client = RpcClient::new_with_commitment(&self.cluster, self.options);
-        let latest_hash = rpc_client.get_latest_blockhash()?;
+    async fn signed_transaction_internal(&self) -> Result<Transaction, ClientError> {
+        let latest_hash =
+            AsyncRpcClient::new_with_commitment(self.cluster.to_owned(), self.options)
+                .get_latest_blockhash()
+                .await?;
+        let tx = self.signed_transaction_with_blockhash(latest_hash)?;
+
+        Ok(tx)
+    }
+
+    async fn send_internal(&self) -> Result<Signature, ClientError> {
+        let rpc_client = AsyncRpcClient::new_with_commitment(self.cluster.to_owned(), self.options);
+        let latest_hash = rpc_client.get_latest_blockhash().await?;
         let tx = self.signed_transaction_with_blockhash(latest_hash)?;
 
         rpc_client
             .send_and_confirm_transaction(&tx)
+            .await
             .map_err(Into::into)
     }
 
-    pub fn send_with_spinner_and_config(
-        self,
+    async fn send_with_spinner_and_config_internal(
+        &self,
         config: RpcSendTransactionConfig,
     ) -> Result<Signature, ClientError> {
-        let rpc_client = RpcClient::new_with_commitment(&self.cluster, self.options);
-        let latest_hash = rpc_client.get_latest_blockhash()?;
+        let rpc_client = AsyncRpcClient::new_with_commitment(self.cluster.to_owned(), self.options);
+        let latest_hash = rpc_client.get_latest_blockhash().await?;
         let tx = self.signed_transaction_with_blockhash(latest_hash)?;
 
         rpc_client
@@ -526,10 +544,50 @@ impl<'a, C: Deref<Target = impl Signer> + Clone> RequestBuilder<'a, C> {
                 rpc_client.commitment(),
                 config,
             )
+            .await
             .map_err(Into::into)
     }
 }
 
+fn parse_logs_response<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
+    logs: RpcResponse<RpcLogsResponse>,
+    program_id_str: &str,
+) -> Vec<T> {
+    let mut logs = &logs.value.logs[..];
+    let mut events: Vec<T> = Vec::new();
+    if !logs.is_empty() {
+        if let Ok(mut execution) = Execution::new(&mut logs) {
+            for l in logs {
+                // Parse the log.
+                let (event, new_program, did_pop) = {
+                    if program_id_str == execution.program() {
+                        handle_program_log(program_id_str, l).unwrap_or_else(|e| {
+                            println!("Unable to parse log: {e}");
+                            std::process::exit(1);
+                        })
+                    } else {
+                        let (program, did_pop) = handle_system_log(program_id_str, l);
+                        (None, program, did_pop)
+                    }
+                };
+                // Emit the event.
+                if let Some(e) = event {
+                    events.push(e);
+                }
+                // Switch program context on CPI.
+                if let Some(new_program) = new_program {
+                    execution.push(new_program);
+                }
+                // Program returned.
+                if did_pop {
+                    execution.pop();
+                }
+            }
+        }
+    }
+    events
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

+ 102 - 0
client/src/nonblocking.rs

@@ -0,0 +1,102 @@
+use crate::{
+    ClientError, Config, EventContext, EventUnsubscriber, Program, ProgramAccountsIterator,
+    RequestBuilder,
+};
+use anchor_lang::{prelude::Pubkey, AccountDeserialize, Discriminator};
+use solana_client::{rpc_config::RpcSendTransactionConfig, rpc_filter::RpcFilterType};
+use solana_sdk::{
+    commitment_config::CommitmentConfig, signature::Signature, signer::Signer,
+    transaction::Transaction,
+};
+use std::{marker::PhantomData, ops::Deref, sync::Arc};
+use tokio::sync::RwLock;
+
+impl<'a> EventUnsubscriber<'a> {
+    /// Unsubscribe gracefully.
+    pub async fn unsubscribe(self) {
+        self.unsubscribe_internal().await
+    }
+}
+
+impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
+    pub fn new(program_id: Pubkey, cfg: Config<C>) -> Result<Self, ClientError> {
+        Ok(Self {
+            program_id,
+            cfg,
+            sub_client: Arc::new(RwLock::new(None)),
+        })
+    }
+
+    /// Returns the account at the given address.
+    pub async fn account<T: AccountDeserialize>(&self, address: Pubkey) -> Result<T, ClientError> {
+        self.account_internal(address).await
+    }
+
+    /// Returns all program accounts of the given type matching the given filters
+    pub async fn accounts<T: AccountDeserialize + Discriminator>(
+        &self,
+        filters: Vec<RpcFilterType>,
+    ) -> Result<Vec<(Pubkey, T)>, ClientError> {
+        self.accounts_lazy(filters).await?.collect()
+    }
+
+    /// Returns all program accounts of the given type matching the given filters as an iterator
+    /// Deserialization is executed lazily
+    pub async fn accounts_lazy<T: AccountDeserialize + Discriminator>(
+        &self,
+        filters: Vec<RpcFilterType>,
+    ) -> Result<ProgramAccountsIterator<T>, ClientError> {
+        self.accounts_lazy_internal(filters).await
+    }
+
+    /// Subscribe to program logs.
+    ///
+    /// Returns an [`EventUnsubscriber`] to unsubscribe and close connection gracefully.
+    pub async fn on<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
+        &self,
+        f: impl Fn(&EventContext, T) + Send + 'static,
+    ) -> Result<EventUnsubscriber, ClientError> {
+        let (handle, rx) = self.on_internal(f).await?;
+
+        Ok(EventUnsubscriber {
+            handle,
+            rx,
+            _lifetime_marker: PhantomData,
+        })
+    }
+}
+
+impl<'a, C: Deref<Target = impl Signer> + Clone> RequestBuilder<'a, C> {
+    pub fn from(
+        program_id: Pubkey,
+        cluster: &str,
+        payer: C,
+        options: Option<CommitmentConfig>,
+    ) -> Self {
+        Self {
+            program_id,
+            payer,
+            cluster: cluster.to_string(),
+            accounts: Vec::new(),
+            options: options.unwrap_or_default(),
+            instructions: Vec::new(),
+            instruction_data: None,
+            signers: Vec::new(),
+        }
+    }
+
+    pub async fn signed_transaction(&self) -> Result<Transaction, ClientError> {
+        self.signed_transaction_internal().await
+    }
+
+    pub async fn send(self) -> Result<Signature, ClientError> {
+        self.send_internal().await
+    }
+
+    pub async fn send_with_spinner_and_config(
+        self,
+        config: RpcSendTransactionConfig,
+    ) -> Result<Signature, ClientError> {
+        self.send_with_spinner_and_config_internal(config).await
+    }
+}

+ 1 - 1
tests/zero-copy/programs/zero-copy/Cargo.toml

@@ -21,5 +21,5 @@ anchor-lang = { path = "../../../../lang" }
 bytemuck = {version = "1.4.0", features = ["derive", "min_const_generics"]}
 
 [dev-dependencies]
-anchor-client = { path = "../../../../client", features = ["debug"] }
+anchor-client = { path = "../../../../client", features = ["debug", "async"] }
 solana-program-test = "<1.17.0"

+ 1 - 1
tests/zero-copy/programs/zero-copy/tests/compute_unit_test.rs

@@ -46,7 +46,7 @@ async fn update_foo() {
         Rc::new(Keypair::new()),
         CommitmentConfig::processed(),
     );
-    let program = client.program(zero_copy::id());
+    let program = client.program(zero_copy::id()).unwrap();
     let update_ix = program
         .request()
         .accounts(zero_copy::accounts::UpdateFoo {