main.rs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. #![feature(never_type)]
  2. #![feature(btree_cursors)]
  3. use {
  4. anyhow::Result,
  5. clap::{
  6. CommandFactory,
  7. Parser,
  8. },
  9. futures::future::join_all,
  10. lazy_static::lazy_static,
  11. state::State,
  12. std::io::IsTerminal,
  13. tokio::{
  14. spawn,
  15. sync::watch,
  16. },
  17. };
  18. mod aggregate;
  19. mod api;
  20. mod config;
  21. mod metrics_server;
  22. mod network;
  23. mod price_feeds_metadata;
  24. mod serde;
  25. mod state;
  26. lazy_static! {
  27. /// A static exit flag to indicate to running threads that we're shutting down. This is used to
  28. /// gracefully shutdown the application.
  29. ///
  30. /// We make this global based on the fact the:
  31. /// - The `Sender` side does not rely on any async runtime.
  32. /// - Exit logic doesn't really require carefully threading this value through the app.
  33. /// - The `Receiver` side of a watch channel performs the detection based on if the change
  34. /// happened after the subscribe, so it means all listeners should always be notified
  35. /// currectly.
  36. pub static ref EXIT: watch::Sender<bool> = watch::channel(false).0;
  37. }
  38. /// Initialize the Application. This can be invoked either by real main, or by the Geyser plugin.
  39. #[tracing::instrument]
  40. async fn init() -> Result<()> {
  41. tracing::info!("Initializing Hermes...");
  42. // Parse the command line arguments with StructOpt, will exit automatically on `--help` or
  43. // with invalid arguments.
  44. match config::Options::parse() {
  45. config::Options::Run(opts) => {
  46. tracing::info!("Starting hermes service...");
  47. // The update broadcast channel is used to send store update notifications to the public API.
  48. let (update_tx, _) = tokio::sync::broadcast::channel(1000);
  49. // Initialize a cache store with a 1000 element circular buffer.
  50. let store = State::new(update_tx.clone(), 1000, opts.benchmarks.endpoint.clone());
  51. // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
  52. spawn(async move {
  53. tracing::info!("Registered shutdown signal handler...");
  54. tokio::signal::ctrl_c().await.unwrap();
  55. tracing::info!("Shut down signal received, waiting for tasks...");
  56. let _ = EXIT.send(true);
  57. });
  58. // Spawn all worker tasks, and wait for all to complete (which will happen if a shutdown
  59. // signal has been observed).
  60. let tasks = join_all([
  61. Box::pin(spawn(network::wormhole::spawn(opts.clone(), store.clone()))),
  62. Box::pin(spawn(network::pythnet::spawn(opts.clone(), store.clone()))),
  63. Box::pin(spawn(metrics_server::run(opts.clone(), store.clone()))),
  64. Box::pin(spawn(api::spawn(opts.clone(), store.clone(), update_tx))),
  65. ])
  66. .await;
  67. for task in tasks {
  68. task??;
  69. }
  70. }
  71. config::Options::ShowEnv(opts) => {
  72. // For each subcommand, scan for arguments that allow overriding with an ENV variable
  73. // and print that variable.
  74. for subcommand in config::Options::command().get_subcommands() {
  75. for arg in subcommand.get_arguments() {
  76. if let Some(env) = arg.get_env().and_then(|env| env.to_str()) {
  77. // Find the defaults for this argument, if present.
  78. let defaults = arg
  79. .get_default_values()
  80. .iter()
  81. .map(|v| v.to_str().unwrap())
  82. .collect::<Vec<_>>()
  83. .join(",");
  84. println!(
  85. "{}={}",
  86. env,
  87. match opts.defaults {
  88. true => defaults,
  89. false => std::env::var(env).unwrap_or(defaults),
  90. }
  91. );
  92. }
  93. }
  94. }
  95. }
  96. }
  97. Ok(())
  98. }
  99. #[tokio::main]
  100. #[tracing::instrument]
  101. async fn main() -> Result<()> {
  102. // Initialize a Tracing Subscriber
  103. let fmt_builder = tracing_subscriber::fmt()
  104. .with_file(false)
  105. .with_line_number(true)
  106. .with_thread_ids(true)
  107. .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
  108. .with_ansi(std::io::stderr().is_terminal());
  109. // Use the compact formatter if we're in a terminal, otherwise use the JSON formatter.
  110. if std::io::stderr().is_terminal() {
  111. tracing::subscriber::set_global_default(fmt_builder.compact().finish())?;
  112. } else {
  113. tracing::subscriber::set_global_default(fmt_builder.json().finish())?;
  114. }
  115. // Launch the application. If it fails, print the full backtrace and exit. RUST_BACKTRACE
  116. // should be set to 1 for this otherwise it will only print the top-level error.
  117. if let Err(result) = init().await {
  118. eprintln!("{}", result.backtrace());
  119. result.chain().for_each(|cause| eprintln!("{cause}"));
  120. std::process::exit(1);
  121. }
  122. Ok(())
  123. }