|
|
@@ -183,10 +183,9 @@ use {
|
|
|
RpcTransactionLogsFilter,
|
|
|
},
|
|
|
error_object::RpcErrorObject,
|
|
|
- filter::maybe_map_filters,
|
|
|
response::{
|
|
|
Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
|
|
|
- RpcSignatureResult, RpcVersionInfo, RpcVote, SlotInfo, SlotUpdate,
|
|
|
+ RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
|
|
|
},
|
|
|
},
|
|
|
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
|
|
|
@@ -194,7 +193,7 @@ use {
|
|
|
thiserror::Error,
|
|
|
tokio::{
|
|
|
net::TcpStream,
|
|
|
- sync::{mpsc, oneshot, RwLock},
|
|
|
+ sync::{mpsc, oneshot},
|
|
|
task::JoinHandle,
|
|
|
time::{sleep, Duration},
|
|
|
},
|
|
|
@@ -265,9 +264,8 @@ type RequestMsg = (
|
|
|
#[derive(Debug)]
|
|
|
pub struct PubsubClient {
|
|
|
subscribe_sender: mpsc::UnboundedSender<SubscribeRequestMsg>,
|
|
|
- request_sender: mpsc::UnboundedSender<RequestMsg>,
|
|
|
+ _request_sender: mpsc::UnboundedSender<RequestMsg>,
|
|
|
shutdown_sender: oneshot::Sender<()>,
|
|
|
- node_version: RwLock<Option<semver::Version>>,
|
|
|
ws: JoinHandle<PubsubClientResult>,
|
|
|
}
|
|
|
|
|
|
@@ -279,14 +277,14 @@ impl PubsubClient {
|
|
|
.map_err(PubsubClientError::ConnectionError)?;
|
|
|
|
|
|
let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
|
|
|
- let (request_sender, request_receiver) = mpsc::unbounded_channel();
|
|
|
+ let (_request_sender, request_receiver) = mpsc::unbounded_channel();
|
|
|
let (shutdown_sender, shutdown_receiver) = oneshot::channel();
|
|
|
|
|
|
+ #[allow(clippy::used_underscore_binding)]
|
|
|
Ok(Self {
|
|
|
subscribe_sender,
|
|
|
- request_sender,
|
|
|
+ _request_sender,
|
|
|
shutdown_sender,
|
|
|
- node_version: RwLock::new(None),
|
|
|
ws: tokio::spawn(PubsubClient::run_ws(
|
|
|
ws,
|
|
|
subscribe_receiver,
|
|
|
@@ -301,43 +299,11 @@ impl PubsubClient {
|
|
|
self.ws.await.unwrap() // WS future should not be cancelled or panicked
|
|
|
}
|
|
|
|
|
|
- pub async fn set_node_version(&self, version: semver::Version) -> Result<(), ()> {
|
|
|
- let mut w_node_version = self.node_version.write().await;
|
|
|
- *w_node_version = Some(version);
|
|
|
+ #[deprecated(since = "2.0.2", note = "PubsubClient::node_version is no longer used")]
|
|
|
+ pub async fn set_node_version(&self, _version: semver::Version) -> Result<(), ()> {
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
- async fn get_node_version(&self) -> PubsubClientResult<semver::Version> {
|
|
|
- let r_node_version = self.node_version.read().await;
|
|
|
- if let Some(version) = &*r_node_version {
|
|
|
- Ok(version.clone())
|
|
|
- } else {
|
|
|
- drop(r_node_version);
|
|
|
- let mut w_node_version = self.node_version.write().await;
|
|
|
- let node_version = self.get_version().await?;
|
|
|
- *w_node_version = Some(node_version.clone());
|
|
|
- Ok(node_version)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- async fn get_version(&self) -> PubsubClientResult<semver::Version> {
|
|
|
- let (response_sender, response_receiver) = oneshot::channel();
|
|
|
- self.request_sender
|
|
|
- .send(("getVersion".to_string(), Value::Null, response_sender))
|
|
|
- .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?;
|
|
|
- let result = response_receiver
|
|
|
- .await
|
|
|
- .map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??;
|
|
|
- let node_version: RpcVersionInfo = serde_json::from_value(result)?;
|
|
|
- let node_version = semver::Version::parse(&node_version.solana_core).map_err(|e| {
|
|
|
- PubsubClientError::RequestFailed {
|
|
|
- reason: format!("failed to parse cluster version: {e}"),
|
|
|
- message: "getVersion".to_string(),
|
|
|
- }
|
|
|
- })?;
|
|
|
- Ok(node_version)
|
|
|
- }
|
|
|
-
|
|
|
async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T>
|
|
|
where
|
|
|
T: DeserializeOwned + Send + 'a,
|
|
|
@@ -426,22 +392,8 @@ impl PubsubClient {
|
|
|
pub async fn program_subscribe(
|
|
|
&self,
|
|
|
pubkey: &Pubkey,
|
|
|
- mut config: Option<RpcProgramAccountsConfig>,
|
|
|
+ config: Option<RpcProgramAccountsConfig>,
|
|
|
) -> SubscribeResult<'_, RpcResponse<RpcKeyedAccount>> {
|
|
|
- if let Some(ref mut config) = config {
|
|
|
- if let Some(ref mut filters) = config.filters {
|
|
|
- let node_version = self.get_node_version().await.ok();
|
|
|
- // If node does not support the pubsub `getVersion` method, assume version is old
|
|
|
- // and filters should be mapped (node_version.is_none()).
|
|
|
- maybe_map_filters(node_version, filters).map_err(|e| {
|
|
|
- PubsubClientError::RequestFailed {
|
|
|
- reason: e,
|
|
|
- message: "maybe_map_filters".to_string(),
|
|
|
- }
|
|
|
- })?;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
let params = json!([pubkey.to_string(), config]);
|
|
|
self.subscribe("program", params).await
|
|
|
}
|