Prechádzať zdrojové kódy

refactor(hermes): use tokio sync primitives in p2p

Ali Behjati 2 rokov pred
rodič
commit
518ab13e03
3 zmenil súbory, kde vykonal 22 pridanie a 30 odobranie
  1. 1 1
      hermes/Cargo.lock
  2. 1 1
      hermes/Cargo.toml
  3. 20 28
      hermes/src/network/p2p.rs

+ 1 - 1
hermes/Cargo.lock

@@ -1764,7 +1764,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
 
 [[package]]
 name = "hermes"
-version = "0.1.15"
+version = "0.1.16"
 dependencies = [
  "anyhow",
  "axum",

+ 1 - 1
hermes/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name    = "hermes"
-version = "0.1.15"
+version = "0.1.16"
 edition = "2021"
 
 [dependencies]

+ 20 - 28
hermes/src/network/p2p.rs

@@ -30,14 +30,16 @@ use {
         },
         sync::{
             atomic::Ordering,
-            mpsc::{
-                Receiver,
-                Sender,
-            },
             Arc,
-            Mutex,
         },
     },
+    tokio::sync::{
+        mpsc::{
+            Receiver,
+            Sender,
+        },
+        Mutex,
+    },
     wormhole_sdk::{
         Address,
         Chain,
@@ -63,16 +65,17 @@ pub struct ObservationC {
 
 pub type Vaa = Vec<u8>;
 
+pub const CHANNEL_SIZE: usize = 1000;
+
 // A Static Channel to pipe the `Observation` from the callback into the local Rust handler for
 // observation messages. It has to be static for now because there's no way to capture state in
 // the callback passed into Go-land.
-// TODO: Move this channel to the module level that spawns the services
 lazy_static::lazy_static! {
     pub static ref OBSERVATIONS: (
         Mutex<Sender<Vaa>>,
         Mutex<Receiver<Vaa>>,
     ) = {
-        let (tx, rc) = std::sync::mpsc::channel();
+        let (tx, rc) = tokio::sync::mpsc::channel(CHANNEL_SIZE);
         (Mutex::new(tx), Mutex::new(rc))
     };
 }
@@ -119,9 +122,9 @@ extern "C" fn proxy(o: ObservationC) {
     // us to recover from it.
     if OBSERVATIONS
         .0
-        .lock()
+        .blocking_lock()
+        .blocking_send(vaa)
         .map_err(|_| ())
-        .and_then(|tx| tx.send(vaa).map_err(|_| ()))
         .is_err()
     {
         crate::SHOULD_EXIT.store(true, Ordering::Release);
@@ -193,31 +196,20 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
         // Listen in the background for new VAA's from the p2p layer
         // and update the state accordingly.
         while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
-            let vaa = tokio::task::spawn_blocking(|| {
-                let observation = OBSERVATIONS.1.lock();
-                let observation = match observation {
-                    Ok(observation) => observation,
-                    Err(e) => {
-                        // This should never happen, but if it does, we want to panic and crash
-                        // as it is not recoverable.
-                        tracing::error!(error = ?e, "Failed to lock p2p observation channel.");
-                        crate::SHOULD_EXIT.store(true, Ordering::Release);
-                        return Err(anyhow::anyhow!("Failed to lock p2p observation channel"));
-                    }
-                };
+            let vaa = {
+                let mut observation = OBSERVATIONS.1.lock().await;
 
-                match observation.recv() {
-                    Ok(vaa_bytes) => Ok(vaa_bytes),
-                    Err(e) => {
+                match observation.recv().await {
+                    Some(vaa) => vaa,
+                    None => {
                         // This should never happen, but if it does, we want to shutdown the
                         // application as it is unrecoverable.
-                        tracing::error!(error = ?e, "Failed to receive p2p observation.");
+                        tracing::error!("Failed to receive p2p observation. Channel closed.");
                         crate::SHOULD_EXIT.store(true, Ordering::Release);
-                        Err(anyhow::anyhow!("Failed to receive p2p observation."))
+                        return Err(anyhow::anyhow!("Failed to receive p2p observation."));
                     }
                 }
-            })
-            .await??;
+            };
 
             let store = store.clone();
             tokio::spawn(async move {