
refactor(hermes): clean exit

Reisen, 2 years ago
parent commit 5edcce4530
6 changed files with 127 additions and 68 deletions
  1. hermes/src/api.rs (+20 -14)
  2. hermes/src/main.rs (+42 -20)
  3. hermes/src/network/p2p.rs (+27 -14)
  4. hermes/src/network/pythnet.rs (+36 -19)
  5. hermes/src/store.rs (+1 -1)
  6. hermes/src/store/storage.rs (+1 -0)

+ 20 - 14
hermes/src/api.rs

@@ -11,7 +11,10 @@ use {
         Router,
     },
     serde_qs::axum::QsQueryConfig,
-    std::sync::Arc,
+    std::sync::{
+        atomic::Ordering,
+        Arc,
+    },
     tokio::{
         signal,
         sync::mpsc::Receiver,
@@ -95,24 +98,26 @@ pub async fn run(opts: RunOptions, store: Arc<Store>, mut update_rx: Receiver<()
         .with_state(state.clone())
         // Permissive CORS layer to allow all origins
         .layer(CorsLayer::permissive())
-        // non-strict mode permits escaped [] in URL parameters.
-        // 5 is the allowed depth (also the default value for this parameter).
+        // Non-strict mode permits escaped [] in URL parameters. 5 is the allowed depth (also the
+        // default value for this parameter).
         .layer(Extension(QsQueryConfig::new(5, false)));
 
-
     // Call dispatch updates to websocket every 1 seconds
     // FIXME use a channel to get updates from the store
     tokio::spawn(async move {
-        loop {
-            // Panics if the update channel is closed, which should never happen.
-            // If it happens we have no way to recover, so we just panic.
-            update_rx
-                .recv()
-                .await
-                .expect("state update channel is closed");
+        while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
+            // Causes a full application shutdown if an error occurs, we can't recover from this so
+            // we just quit.
+            if update_rx.recv().await.is_none() {
+                log::error!("Failed to receive update from store.");
+                crate::SHOULD_EXIT.store(true, Ordering::Release);
+                break;
+            }
 
             notify_updates(state.ws.clone()).await;
         }
+
+        log::info!("Shutting down websocket updates...")
     });
 
     // Binds the axum's server to the configured address and port. This is a blocking call and will
@@ -120,9 +125,10 @@ pub async fn run(opts: RunOptions, store: Arc<Store>, mut update_rx: Receiver<()
     axum::Server::try_bind(&opts.api_addr)?
         .serve(app.into_make_service())
         .with_graceful_shutdown(async {
-            signal::ctrl_c()
-                .await
-                .expect("Ctrl-c signal handler failed.");
+            // Ignore Ctrl+C errors, either way we need to shut down. The main Ctrl+C handler
+            // should also have triggered so we will let that one print the shutdown warning.
+            let _ = signal::ctrl_c().await;
+            crate::SHOULD_EXIT.store(true, Ordering::Release);
         })
         .await?;
 

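Note on the api.rs changes above: the update loop and axum's graceful-shutdown hook now cooperate through a single process-wide atomic flag. The sketch below is a simplified, tokio-only illustration of that handshake, not the actual Hermes code; SHOULD_EXIT mirrors the commit, while the channel setup and worker are invented. The worker polls the flag and raises it itself when its input channel closes, so every other task sees the same signal.

use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::mpsc;

// Process-wide exit flag, mirroring crate::SHOULD_EXIT in the commit.
static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<()>(16);

    // Worker: keep handling updates until the flag is raised or the channel closes.
    let worker = tokio::spawn(async move {
        while !SHOULD_EXIT.load(Ordering::Acquire) {
            if rx.recv().await.is_none() {
                // Sender dropped: raise the flag so every other task also winds down.
                SHOULD_EXIT.store(true, Ordering::Release);
                break;
            }
            // Handle the update here, e.g. notify websocket clients.
        }
        println!("worker: shutting down");
    });

    tx.send(()).await.unwrap();
    drop(tx); // Closing the channel is what lets the worker observe shutdown.

    worker.await.unwrap();
}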
+ 42 - 20
hermes/src/main.rs

@@ -5,7 +5,10 @@
 use {
     crate::store::Store,
     anyhow::Result,
+    futures::future::join_all,
+    std::sync::atomic::AtomicBool,
     structopt::StructOpt,
+    tokio::spawn,
 };
 
 mod api;
@@ -15,6 +18,14 @@ mod macros;
 mod network;
 mod store;
 
+// A static exit flag to indicate to running threads that we're shutting down. This is used to
+// gracefully shutdown the application.
+//
+// NOTE: A more idiomatic approach would be to use a tokio::sync::broadcast channel, and to send a
+// shutdown signal to all running tasks. However, this is a bit more complicated to implement and
+// we don't rely on global state for anything else.
+pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
+
 /// Initialize the Application. This can be invoked either by real main, or by the Geyser plugin.
 async fn init() -> Result<()> {
     log::info!("Initializing Hermes...");
@@ -29,11 +40,29 @@ async fn init() -> Result<()> {
             let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
 
             // Initialize a cache store with a 1000 element circular buffer.
-            let store = Store::new(update_tx, 1000);
+            let store = Store::new(update_tx.clone(), 1000);
+
+            // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. We
+            // also send off any notifications needed to close off any waiting tasks.
+            spawn(async move {
+                tokio::signal::ctrl_c().await.unwrap();
+                log::info!("Shut down signal received, waiting for tasks...");
+                SHOULD_EXIT.store(true, std::sync::atomic::Ordering::Release);
+                let _ = update_tx.send(()).await;
+            });
 
-            network::p2p::spawn(opts.clone(), store.clone()).await?;
-            network::pythnet::spawn(opts.clone(), store.clone()).await?;
-            api::run(opts.clone(), store.clone(), update_rx).await?;
+            // Spawn all worker tasks, and wait for all to complete (which will happen if a shutdown
+            // signal has been observed).
+            let tasks = join_all([
+                Box::pin(spawn(network::p2p::spawn(opts.clone(), store.clone()))),
+                Box::pin(spawn(network::pythnet::spawn(opts.clone(), store.clone()))),
+                Box::pin(spawn(api::run(opts.clone(), store.clone(), update_rx))),
+            ])
+            .await;
+
+            for task in tasks {
+                task??;
+            }
         }
     }
 
@@ -41,23 +70,16 @@ async fn init() -> Result<()> {
 }
 
 #[tokio::main]
-async fn main() -> Result<!> {
+async fn main() -> Result<()> {
     env_logger::init();
 
-    tokio::spawn(async move {
-        // Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE
-        // should be set to 1 for this otherwise it will only print the top-level error.
-        if let Err(result) = init().await {
-            eprintln!("{}", result.backtrace());
-            for cause in result.chain() {
-                eprintln!("{cause}");
-            }
-            std::process::exit(1);
-        }
-    });
+    // Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE
+    // should be set to 1 for this otherwise it will only print the top-level error.
+    if let Err(result) = init().await {
+        eprintln!("{}", result.backtrace());
+        result.chain().for_each(|cause| eprintln!("{cause}"));
+        std::process::exit(1);
+    }
 
-    // TODO: Setup a Ctrl-C handler that waits. We use process::exit(0) for now but we should have
-    // a graceful shutdown with an AtomicBool or similar before production.
-    tokio::signal::ctrl_c().await?;
-    std::process::exit(0);
+    Ok(())
 }

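Note on the main.rs changes above: instead of awaiting each subsystem in sequence, the commit spawns every long-running future and then waits on all of the join handles, propagating failures with a double question mark (the outer one for the JoinError, the inner one for the task's own Result). Below is a minimal sketch of that pattern; the worker function is invented for illustration, and because all handles share one type here, no Box::pin is needed.

use anyhow::Result;
use futures::future::join_all;
use tokio::spawn;

async fn worker(id: u32) -> Result<()> {
    // Run until the shared exit flag is set, then return.
    println!("worker {id} finished");
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    // Spawn every long-running task up front, then wait for all of them to complete.
    let tasks = join_all([spawn(worker(1)), spawn(worker(2)), spawn(worker(3))]).await;

    // Each element is a Result<Result<()>, JoinError>: the first ? surfaces a
    // panicked or cancelled task, the second surfaces the task's own error.
    for task in tasks {
        task??;
    }

    Ok(())
}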
+ 27 - 14
hermes/src/network/p2p.rs

@@ -25,6 +25,7 @@ use {
             CString,
         },
         sync::{
+            atomic::Ordering,
             mpsc::{
                 Receiver,
                 Sender,
@@ -76,13 +77,15 @@ extern "C" fn proxy(o: ObservationC) {
     let vaa = unsafe { std::slice::from_raw_parts(o.vaa, o.vaa_len) }.to_owned();
     // The chances of the mutex getting poisioned is very low and if it happens
     // there is no way for us to recover from it.
-    if let Err(e) = OBSERVATIONS
+    if OBSERVATIONS
         .0
         .lock()
-        .expect("Cannot acquire p2p channel lock")
-        .send(vaa)
+        .map_err(|_| ())
+        .and_then(|tx| tx.send(vaa).map_err(|_| ()))
+        .is_err()
     {
-        log::error!("Failed to send observation: {}", e);
+        log::error!("Failed to lock p2p observation channel or to send observation.");
+        crate::SHOULD_EXIT.store(true, Ordering::Release);
     }
 }
 
@@ -129,18 +132,22 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
     log::info!("Starting P2P server on {:?}", opts.wh_listen_addrs);
 
     std::thread::spawn(|| {
-        bootstrap(
+        if bootstrap(
             opts.wh_network_id,
             opts.wh_bootstrap_addrs,
             opts.wh_listen_addrs,
         )
-        .unwrap()
+        .is_err()
+        {
+            log::error!("Failed to bootstrap P2P server.");
+            crate::SHOULD_EXIT.store(true, Ordering::Release);
+        }
     });
 
     tokio::spawn(async move {
         // Listen in the background for new VAA's from the p2p layer
         // and update the state accordingly.
-        loop {
+        while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
             let vaa_bytes = tokio::task::spawn_blocking(|| {
                 let observation = OBSERVATIONS.1.lock();
                 let observation = match observation {
@@ -148,21 +155,24 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
                     Err(e) => {
                         // This should never happen, but if it does, we want to panic and crash
                         // as it is not recoverable.
-                        panic!("Failed to lock p2p observation channel: {e}");
+                        log::error!("Failed to lock p2p observation channel: {e}");
+                        crate::SHOULD_EXIT.store(true, Ordering::Release);
+                        return Err(anyhow::anyhow!("Failed to lock p2p observation channel"));
                     }
                 };
 
                 match observation.recv() {
-                    Ok(vaa_bytes) => vaa_bytes,
+                    Ok(vaa_bytes) => Ok(vaa_bytes),
                     Err(e) => {
-                        // This should never happen, but if it does, we want to panic and crash
-                        // as it is not recoverable.
-                        panic!("Failed to receive p2p observation: {e}");
+                        // This should never happen, but if it does, we want to shutdown the
+                        // application as it is unrecoverable.
+                        log::error!("Failed to receive p2p observation: {e}");
+                        crate::SHOULD_EXIT.store(true, Ordering::Release);
+                        Err(anyhow::anyhow!("Failed to receive p2p observation."))
                     }
                 }
             })
-            .await
-            .unwrap();
+            .await??;
 
             let store = store.clone();
             tokio::spawn(async move {
@@ -171,6 +181,9 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
                 }
             });
         }
+
+        log::info!("Shutting down P2P server...");
+        Ok::<(), anyhow::Error>(())
     });
 
     Ok(())

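Note on the p2p.rs changes above: the panics inside the blocking receiver are replaced by logging, raising the exit flag, and returning an error, which the async caller then propagates with .await?? . The following is a stripped-down sketch of that error path, not the Hermes code itself: a std mpsc channel stands in for the Wormhole observation channel and the names are illustrative.

use anyhow::{anyhow, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;

static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);

#[tokio::main]
async fn main() -> Result<()> {
    // A std (blocking) channel stands in for the Wormhole observation channel.
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    tx.send(vec![1, 2, 3]).ok();

    // The blocking closure returns a Result instead of panicking; on failure it
    // raises the exit flag so the rest of the application also winds down.
    let vaa_bytes = tokio::task::spawn_blocking(move || match rx.recv() {
        Ok(bytes) => Ok(bytes),
        Err(e) => {
            eprintln!("failed to receive observation: {e}");
            SHOULD_EXIT.store(true, Ordering::Release);
            Err(anyhow!("failed to receive observation"))
        }
    })
    .await??; // First ? unwraps the JoinError, second ? the closure's own error.

    println!("received {} bytes", vaa_bytes.len());
    Ok(())
}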
+ 36 - 19
hermes/src/network/pythnet.rs

@@ -47,7 +47,10 @@ use {
         system_program,
     },
     std::{
-        sync::Arc,
+        sync::{
+            atomic::Ordering,
+            Arc,
+        },
         time::Duration,
     },
     tokio::time::Instant,
@@ -125,7 +128,7 @@ async fn fetch_bridge_data(
     }
 }
 
-pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<!> {
+pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<()> {
     let client = PubsubClient::new(pythnet_ws_endpoint.as_ref()).await?;
 
     let config = RpcProgramAccountsConfig {
@@ -147,7 +150,7 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<!> {
         .program_subscribe(&system_program::id(), Some(config))
         .await?;
 
-    loop {
+    while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
         match notif.next().await {
             Some(update) => {
                 let account: Account = match update.value.account.decode() {
@@ -198,6 +201,8 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<!> {
             }
         }
     }
+
+    Ok(())
 }
 
 /// Fetch existing GuardianSet accounts from Wormhole.
@@ -264,33 +269,42 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
     )
     .await?;
 
-    {
+    let task_listener = {
         let store = store.clone();
         let pythnet_ws_endpoint = opts.pythnet_ws_endpoint.clone();
         tokio::spawn(async move {
-            loop {
+            while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
                 let current_time = Instant::now();
 
                 if let Err(ref e) = run(store.clone(), pythnet_ws_endpoint.clone()).await {
                     log::error!("Error in Pythnet network listener: {:?}", e);
-                }
-
-                if current_time.elapsed() < Duration::from_secs(30) {
-                    log::error!(
-                        "Pythnet network listener is restarting too quickly. Sleeping for 1s"
-                    );
-                    tokio::time::sleep(Duration::from_secs(1)).await;
+                    if current_time.elapsed() < Duration::from_secs(30) {
+                        log::error!(
+                            "Pythnet network listener restarting too quickly. Sleeping for 1s"
+                        );
+                        tokio::time::sleep(Duration::from_secs(1)).await;
+                    }
                 }
             }
-        });
-    }
 
-    {
+            log::info!("Shutting down Pythnet listener...");
+        })
+    };
+
+    let task_guadian_watcher = {
         let store = store.clone();
         let pythnet_http_endpoint = opts.pythnet_http_endpoint.clone();
         tokio::spawn(async move {
-            loop {
-                tokio::time::sleep(Duration::from_secs(60)).await;
+            while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
+                // Poll for new guardian sets every 60 seconds. We use a short wait time so we can
+                // properly exit if a quit signal was received. This isn't a perfect solution, but
+                // it's good enough for now.
+                for _ in 0..60 {
+                    if crate::SHOULD_EXIT.load(Ordering::Acquire) {
+                        break;
+                    }
+                    tokio::time::sleep(Duration::from_secs(1)).await;
+                }
 
                 match fetch_existing_guardian_sets(
                     store.clone(),
@@ -305,8 +319,11 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
                     }
                 }
             }
-        });
-    }
 
+            log::info!("Shutting down Pythnet guardian set poller...");
+        })
+    };
+
+    let _ = tokio::join!(task_listener, task_guadian_watcher);
     Ok(())
 }

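Note on the pythnet.rs changes above: the guardian-set poller used to sleep a full 60 seconds between polls, so a shutdown request could go unnoticed for up to a minute. The commit breaks the wait into sixty one-second sleeps with a flag check in between, capping shutdown latency at roughly one second. A small sketch of that interruptible wait follows; the helper name is invented, and the commit simply inlines the loop.

use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);

/// Sleep for roughly `secs` seconds, waking early if the exit flag is raised.
/// Returns false when the wait was interrupted by a shutdown request.
async fn interruptible_sleep(secs: u64) -> bool {
    for _ in 0..secs {
        if SHOULD_EXIT.load(Ordering::Acquire) {
            return false;
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    true
}

#[tokio::main]
async fn main() {
    // Pretend some other task requests a shutdown after three seconds.
    tokio::spawn(async {
        tokio::time::sleep(Duration::from_secs(3)).await;
        SHOULD_EXIT.store(true, Ordering::Release);
    });

    while !SHOULD_EXIT.load(Ordering::Acquire) {
        if !interruptible_sleep(60).await {
            break;
        }
        // Poll for new guardian sets here.
    }
    println!("poller: shutting down");
}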
+ 1 - 1
hermes/src/store.rs

@@ -341,7 +341,7 @@ mod test {
     /// Generate list of updates for the given list of messages at a given slot with given sequence
     ///
     /// Sequence in Vaas is used to filter duplicate messages (as by wormhole design there is only
-    /// one message per sequence)
+    /// one message per sequence).
     pub fn generate_update(messages: Vec<Message>, slot: Slot, sequence: u64) -> Vec<Update> {
         let mut updates = Vec::new();
 

+ 1 - 0
hermes/src/store/storage.rs

@@ -296,6 +296,7 @@ mod test {
         }
     }
 
+    #[cfg(test)]
     pub async fn create_and_store_dummy_price_feed_message_state(
         storage: &Storage,
         feed_id: FeedId,