|
|
@@ -258,9 +258,9 @@ type RequestMsg = (
|
|
|
/// See the [module documentation][self].
|
|
|
#[derive(Debug)]
|
|
|
pub struct PubsubClient {
|
|
|
- subscribe_tx: mpsc::UnboundedSender<SubscribeRequestMsg>,
|
|
|
- request_tx: mpsc::UnboundedSender<RequestMsg>,
|
|
|
- shutdown_tx: oneshot::Sender<()>,
|
|
|
+ subscribe_sender: mpsc::UnboundedSender<SubscribeRequestMsg>,
|
|
|
+ request_sender: mpsc::UnboundedSender<RequestMsg>,
|
|
|
+ shutdown_sender: oneshot::Sender<()>,
|
|
|
node_version: RwLock<Option<semver::Version>>,
|
|
|
ws: JoinHandle<PubsubClientResult>,
|
|
|
}
|
|
|
@@ -272,26 +272,26 @@ impl PubsubClient {
|
|
|
.await
|
|
|
.map_err(PubsubClientError::ConnectionError)?;
|
|
|
|
|
|
- let (subscribe_tx, subscribe_rx) = mpsc::unbounded_channel();
|
|
|
- let (request_tx, request_rx) = mpsc::unbounded_channel();
|
|
|
- let (shutdown_tx, shutdown_rx) = oneshot::channel();
|
|
|
+ let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
|
|
|
+ let (request_sender, request_receiver) = mpsc::unbounded_channel();
|
|
|
+ let (shutdown_sender, shutdown_receiver) = oneshot::channel();
|
|
|
|
|
|
Ok(Self {
|
|
|
- subscribe_tx,
|
|
|
- request_tx,
|
|
|
- shutdown_tx,
|
|
|
+ subscribe_sender,
|
|
|
+ request_sender,
|
|
|
+ shutdown_sender,
|
|
|
node_version: RwLock::new(None),
|
|
|
ws: tokio::spawn(PubsubClient::run_ws(
|
|
|
ws,
|
|
|
- subscribe_rx,
|
|
|
- request_rx,
|
|
|
- shutdown_rx,
|
|
|
+ subscribe_receiver,
|
|
|
+ request_receiver,
|
|
|
+ shutdown_receiver,
|
|
|
)),
|
|
|
})
|
|
|
}
|
|
|
|
|
|
pub async fn shutdown(self) -> PubsubClientResult {
|
|
|
- let _ = self.shutdown_tx.send(());
|
|
|
+ let _ = self.shutdown_sender.send(());
|
|
|
self.ws.await.unwrap() // WS future should not be cancelled or panicked
|
|
|
}
|
|
|
|
|
|
@@ -315,11 +315,11 @@ impl PubsubClient {
|
|
|
}
|
|
|
|
|
|
async fn get_version(&self) -> PubsubClientResult<semver::Version> {
|
|
|
- let (response_tx, response_rx) = oneshot::channel();
|
|
|
- self.request_tx
|
|
|
- .send(("getVersion".to_string(), Value::Null, response_tx))
|
|
|
+ let (response_sender, response_receiver) = oneshot::channel();
|
|
|
+ self.request_sender
|
|
|
+ .send(("getVersion".to_string(), Value::Null, response_sender))
|
|
|
.map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?;
|
|
|
- let result = response_rx
|
|
|
+ let result = response_receiver
|
|
|
.await
|
|
|
.map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??;
|
|
|
let node_version: RpcVersionInfo = serde_json::from_value(result)?;
|
|
|
@@ -336,12 +336,12 @@ impl PubsubClient {
|
|
|
where
|
|
|
T: DeserializeOwned + Send + 'a,
|
|
|
{
|
|
|
- let (response_tx, response_rx) = oneshot::channel();
|
|
|
- self.subscribe_tx
|
|
|
- .send((operation.to_string(), params, response_tx))
|
|
|
+ let (response_sender, response_receiver) = oneshot::channel();
|
|
|
+ self.subscribe_sender
|
|
|
+ .send((operation.to_string(), params, response_sender))
|
|
|
.map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?;
|
|
|
|
|
|
- let (notifications, unsubscribe) = response_rx
|
|
|
+ let (notifications, unsubscribe) = response_receiver
|
|
|
.await
|
|
|
.map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??;
|
|
|
Ok((
|
|
|
@@ -528,9 +528,9 @@ impl PubsubClient {
|
|
|
|
|
|
async fn run_ws(
|
|
|
mut ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
|
|
|
- mut subscribe_rx: mpsc::UnboundedReceiver<SubscribeRequestMsg>,
|
|
|
- mut request_rx: mpsc::UnboundedReceiver<RequestMsg>,
|
|
|
- mut shutdown_rx: oneshot::Receiver<()>,
|
|
|
+ mut subscribe_receiver: mpsc::UnboundedReceiver<SubscribeRequestMsg>,
|
|
|
+ mut request_receiver: mpsc::UnboundedReceiver<RequestMsg>,
|
|
|
+ mut shutdown_receiver: oneshot::Receiver<()>,
|
|
|
) -> PubsubClientResult {
|
|
|
let mut request_id: u64 = 0;
|
|
|
|
|
|
@@ -538,12 +538,12 @@ impl PubsubClient {
|
|
|
let mut requests_unsubscribe = BTreeMap::<u64, oneshot::Sender<()>>::new();
|
|
|
let mut other_requests = BTreeMap::new();
|
|
|
let mut subscriptions = BTreeMap::new();
|
|
|
- let (unsubscribe_tx, mut unsubscribe_rx) = mpsc::unbounded_channel();
|
|
|
+ let (unsubscribe_sender, mut unsubscribe_receiver) = mpsc::unbounded_channel();
|
|
|
|
|
|
loop {
|
|
|
tokio::select! {
|
|
|
// Send close on shutdown signal
|
|
|
- _ = (&mut shutdown_rx) => {
|
|
|
+ _ = (&mut shutdown_receiver) => {
|
|
|
let frame = CloseFrame { code: CloseCode::Normal, reason: "".into() };
|
|
|
ws.send(Message::Close(Some(frame))).await?;
|
|
|
ws.flush().await?;
|
|
|
@@ -554,28 +554,28 @@ impl PubsubClient {
|
|
|
ws.send(Message::Ping(Vec::new())).await?;
|
|
|
},
|
|
|
// Read message for subscribe
|
|
|
- Some((operation, params, response_tx)) = subscribe_rx.recv() => {
|
|
|
+ Some((operation, params, response_sender)) = subscribe_receiver.recv() => {
|
|
|
request_id += 1;
|
|
|
let method = format!("{operation}Subscribe");
|
|
|
let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
|
|
|
ws.send(Message::Text(text)).await?;
|
|
|
- requests_subscribe.insert(request_id, (operation, response_tx));
|
|
|
+ requests_subscribe.insert(request_id, (operation, response_sender));
|
|
|
},
|
|
|
// Read message for unsubscribe
|
|
|
- Some((operation, sid, response_tx)) = unsubscribe_rx.recv() => {
|
|
|
+ Some((operation, sid, response_sender)) = unsubscribe_receiver.recv() => {
|
|
|
subscriptions.remove(&sid);
|
|
|
request_id += 1;
|
|
|
let method = format!("{operation}Unsubscribe");
|
|
|
let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":[sid]}).to_string();
|
|
|
ws.send(Message::Text(text)).await?;
|
|
|
- requests_unsubscribe.insert(request_id, response_tx);
|
|
|
+ requests_unsubscribe.insert(request_id, response_sender);
|
|
|
},
|
|
|
// Read message for other requests
|
|
|
- Some((method, params, response_tx)) = request_rx.recv() => {
|
|
|
+ Some((method, params, response_sender)) = request_receiver.recv() => {
|
|
|
request_id += 1;
|
|
|
let text = json!({"jsonrpc":"2.0","id":request_id,"method":method,"params":params}).to_string();
|
|
|
ws.send(Message::Text(text)).await?;
|
|
|
- other_requests.insert(request_id, response_tx);
|
|
|
+ other_requests.insert(request_id, response_sender);
|
|
|
}
|
|
|
// Read incoming WebSocket message
|
|
|
next_msg = ws.next() => {
|
|
|
@@ -621,26 +621,26 @@ impl PubsubClient {
|
|
|
}
|
|
|
});
|
|
|
|
|
|
- if let Some(response_tx) = other_requests.remove(&id) {
|
|
|
+ if let Some(response_sender) = other_requests.remove(&id) {
|
|
|
match err {
|
|
|
Some(reason) => {
|
|
|
- let _ = response_tx.send(Err(PubsubClientError::RequestFailed { reason, message: text.clone()}));
|
|
|
+ let _ = response_sender.send(Err(PubsubClientError::RequestFailed { reason, message: text.clone()}));
|
|
|
},
|
|
|
None => {
|
|
|
let json_result = json.get("result").ok_or_else(|| {
|
|
|
PubsubClientError::RequestFailed { reason: "missing `result` field".into(), message: text.clone() }
|
|
|
})?;
|
|
|
- if response_tx.send(Ok(json_result.clone())).is_err() {
|
|
|
+ if response_sender.send(Ok(json_result.clone())).is_err() {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- } else if let Some(response_tx) = requests_unsubscribe.remove(&id) {
|
|
|
- let _ = response_tx.send(()); // do not care if receiver is closed
|
|
|
- } else if let Some((operation, response_tx)) = requests_subscribe.remove(&id) {
|
|
|
+ } else if let Some(response_sender) = requests_unsubscribe.remove(&id) {
|
|
|
+ let _ = response_sender.send(()); // do not care if receiver is closed
|
|
|
+ } else if let Some((operation, response_sender)) = requests_subscribe.remove(&id) {
|
|
|
match err {
|
|
|
Some(reason) => {
|
|
|
- let _ = response_tx.send(Err(PubsubClientError::SubscribeFailed { reason, message: text.clone()}));
|
|
|
+ let _ = response_sender.send(Err(PubsubClientError::SubscribeFailed { reason, message: text.clone()}));
|
|
|
},
|
|
|
None => {
|
|
|
// Subscribe Id
|
|
|
@@ -649,20 +649,20 @@ impl PubsubClient {
|
|
|
})?;
|
|
|
|
|
|
// Create notifications channel and unsubscribe function
|
|
|
- let (notifications_tx, notifications_rx) = mpsc::unbounded_channel();
|
|
|
- let unsubscribe_tx = unsubscribe_tx.clone();
|
|
|
+ let (notifications_sender, notifications_receiver) = mpsc::unbounded_channel();
|
|
|
+ let unsubscribe_sender = unsubscribe_sender.clone();
|
|
|
let unsubscribe = Box::new(move || async move {
|
|
|
- let (response_tx, response_rx) = oneshot::channel();
|
|
|
+ let (response_sender, response_receiver) = oneshot::channel();
|
|
|
// do nothing if ws already closed
|
|
|
- if unsubscribe_tx.send((operation, sid, response_tx)).is_ok() {
|
|
|
- let _ = response_rx.await; // channel can be closed only if ws is closed
|
|
|
+ if unsubscribe_sender.send((operation, sid, response_sender)).is_ok() {
|
|
|
+ let _ = response_receiver.await; // channel can be closed only if ws is closed
|
|
|
}
|
|
|
}.boxed());
|
|
|
|
|
|
- if response_tx.send(Ok((notifications_rx, unsubscribe))).is_err() {
|
|
|
+ if response_sender.send(Ok((notifications_receiver, unsubscribe))).is_err() {
|
|
|
break;
|
|
|
}
|
|
|
- subscriptions.insert(sid, notifications_tx);
|
|
|
+ subscriptions.insert(sid, notifications_sender);
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
@@ -678,9 +678,9 @@ impl PubsubClient {
|
|
|
if let Some(sid) = params.get("subscription").and_then(Value::as_u64) {
|
|
|
let mut unsubscribe_required = false;
|
|
|
|
|
|
- if let Some(notifications_tx) = subscriptions.get(&sid) {
|
|
|
+ if let Some(notifications_sender) = subscriptions.get(&sid) {
|
|
|
if let Some(result) = params.remove("result") {
|
|
|
- if notifications_tx.send(result).is_err() {
|
|
|
+ if notifications_sender.send(result).is_err() {
|
|
|
unsubscribe_required = true;
|
|
|
}
|
|
|
}
|
|
|
@@ -691,8 +691,8 @@ impl PubsubClient {
|
|
|
if unsubscribe_required {
|
|
|
if let Some(Value::String(method)) = json.remove("method") {
|
|
|
if let Some(operation) = method.strip_suffix("Notification") {
|
|
|
- let (response_tx, _response_rx) = oneshot::channel();
|
|
|
- let _ = unsubscribe_tx.send((operation.to_string(), sid, response_tx));
|
|
|
+ let (response_sender, _response_receiver) = oneshot::channel();
|
|
|
+ let _ = unsubscribe_sender.send((operation.to_string(), sid, response_sender));
|
|
|
}
|
|
|
}
|
|
|
}
|