main.rs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. #![feature(never_type)]
  2. #![feature(btree_cursors)]
  3. use {
  4. crate::store::Store,
  5. anyhow::Result,
  6. futures::future::join_all,
  7. std::{
  8. io::IsTerminal,
  9. sync::atomic::AtomicBool,
  10. },
  11. structopt::StructOpt,
  12. tokio::spawn,
  13. };
  14. mod api;
  15. mod config;
  16. mod doc_examples;
  17. mod macros;
  18. mod network;
  19. mod store;
// A static exit flag to indicate to running threads that we're shutting down. This is used to
// gracefully shut down the application. It starts out `false` and is set to `true` (with
// `Release` ordering) by the Ctrl+C handler spawned in `init`.
//
// NOTE: A more idiomatic approach would be to use a tokio::sync::broadcast channel, and to send a
// shutdown signal to all running tasks. However, this is a bit more complicated to implement and
// we don't rely on global state for anything else.
//
// NOTE(review): readers presumably load this with `Acquire` (or stronger) ordering to pair with
// the `Release` store -- confirm at the call sites, which are outside this file.
pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
  27. /// Initialize the Application. This can be invoked either by real main, or by the Geyser plugin.
  28. #[tracing::instrument]
  29. async fn init() -> Result<()> {
  30. tracing::info!("Initializing Hermes...");
  31. // Parse the command line arguments with StructOpt, will exit automatically on `--help` or
  32. // with invalid arguments.
  33. match config::Options::from_args() {
  34. config::Options::Run(opts) => {
  35. tracing::info!("Starting hermes service...");
  36. // The update channel is used to send store update notifications to the public API.
  37. let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
  38. // Initialize a cache store with a 1000 element circular buffer.
  39. let store = Store::new(update_tx.clone(), 1000, opts.benchmarks_endpoint.clone());
  40. // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. We
  41. // also send off any notifications needed to close off any waiting tasks.
  42. spawn(async move {
  43. tracing::info!("Registered shutdown signal handler...");
  44. tokio::signal::ctrl_c().await.unwrap();
  45. tracing::info!("Shut down signal received, waiting for tasks...");
  46. SHOULD_EXIT.store(true, std::sync::atomic::Ordering::Release);
  47. let _ = update_tx.send(()).await;
  48. });
  49. // Spawn all worker tasks, and wait for all to complete (which will happen if a shutdown
  50. // signal has been observed).
  51. let tasks = join_all([
  52. Box::pin(spawn(network::p2p::spawn(opts.clone(), store.clone()))),
  53. Box::pin(spawn(network::pythnet::spawn(opts.clone(), store.clone()))),
  54. Box::pin(spawn(api::run(opts.clone(), store.clone(), update_rx))),
  55. ])
  56. .await;
  57. for task in tasks {
  58. task??;
  59. }
  60. }
  61. }
  62. Ok(())
  63. }
  64. #[tokio::main]
  65. #[tracing::instrument]
  66. async fn main() -> Result<()> {
  67. // Initialize a Tracing Subscriber
  68. tracing::subscriber::set_global_default(
  69. tracing_subscriber::fmt()
  70. .compact()
  71. .with_file(false)
  72. .with_line_number(true)
  73. .with_thread_ids(true)
  74. .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
  75. .with_ansi(std::io::stderr().is_terminal())
  76. .finish(),
  77. )?;
  78. // Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE
  79. // should be set to 1 for this otherwise it will only print the top-level error.
  80. if let Err(result) = init().await {
  81. eprintln!("{}", result.backtrace());
  82. result.chain().for_each(|cause| eprintln!("{cause}"));
  83. std::process::exit(1);
  84. }
  85. Ok(())
  86. }