run.rs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. use {
  2. crate::{
  3. api::{
  4. self,
  5. BlockchainState,
  6. ChainId,
  7. },
  8. chain::ethereum::PythContract,
  9. command::register_provider::CommitmentMetadata,
  10. config::{
  11. Commitment,
  12. Config,
  13. ProviderConfig,
  14. RunOptions,
  15. },
  16. keeper,
  17. state::{
  18. HashChainState,
  19. PebbleHashChain,
  20. },
  21. },
  22. anyhow::{
  23. anyhow,
  24. Error,
  25. Result,
  26. },
  27. axum::Router,
  28. std::{
  29. collections::HashMap,
  30. net::SocketAddr,
  31. sync::Arc,
  32. },
  33. tokio::{
  34. spawn,
  35. sync::watch,
  36. },
  37. tower_http::cors::CorsLayer,
  38. utoipa::OpenApi,
  39. utoipa_swagger_ui::SwaggerUi,
  40. };
  41. pub async fn run_api(
  42. socket_addr: SocketAddr,
  43. chains: HashMap<String, api::BlockchainState>,
  44. mut rx_exit: watch::Receiver<bool>,
  45. ) -> Result<()> {
  46. #[derive(OpenApi)]
  47. #[openapi(
  48. paths(
  49. crate::api::revelation,
  50. crate::api::chain_ids,
  51. ),
  52. components(
  53. schemas(
  54. crate::api::GetRandomValueResponse,
  55. crate::api::Blob,
  56. crate::api::BinaryEncoding,
  57. )
  58. ),
  59. tags(
  60. (name = "fortuna", description = "Random number service for the Pyth Entropy protocol")
  61. )
  62. )]
  63. struct ApiDoc;
  64. let metrics_registry = api::Metrics::new();
  65. let api_state = api::ApiState {
  66. chains: Arc::new(chains),
  67. metrics: Arc::new(metrics_registry),
  68. };
  69. // Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
  70. // `with_state` method which replaces `Body` with `State` in the type signature.
  71. let app = Router::new();
  72. let app = app
  73. .merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi()))
  74. .merge(api::routes(api_state))
  75. // Permissive CORS layer to allow all origins
  76. .layer(CorsLayer::permissive());
  77. tracing::info!("Starting server on: {:?}", &socket_addr);
  78. // Binds the axum's server to the configured address and port. This is a blocking call and will
  79. // not return until the server is shutdown.
  80. axum::Server::try_bind(&socket_addr)?
  81. .serve(app.into_make_service())
  82. .with_graceful_shutdown(async {
  83. // It can return an error or an Ok(()). In both cases, we would shut down.
  84. // As Ok(()) means, exit signal (ctrl + c) was received.
  85. // And Err(e) means, the sender was dropped which should not be the case.
  86. let _ = rx_exit.changed().await;
  87. tracing::info!("Shutting down RPC server...");
  88. })
  89. .await?;
  90. Ok(())
  91. }
  92. pub async fn run_keeper(
  93. chains: HashMap<String, api::BlockchainState>,
  94. config: Config,
  95. private_key: String,
  96. ) -> Result<()> {
  97. let mut handles = Vec::new();
  98. for (chain_id, chain_config) in chains {
  99. let chain_eth_config = config
  100. .chains
  101. .get(&chain_id)
  102. .expect("All chains should be present in the config file")
  103. .clone();
  104. let private_key = private_key.clone();
  105. handles.push(spawn(keeper::run_keeper_threads(
  106. private_key,
  107. chain_eth_config,
  108. chain_config.clone(),
  109. )));
  110. }
  111. Ok(())
  112. }
  113. pub async fn run(opts: &RunOptions) -> Result<()> {
  114. let config = Config::load(&opts.config.config)?;
  115. let provider_config = opts
  116. .provider_config
  117. .provider_config
  118. .as_ref()
  119. .map(|path| ProviderConfig::load(&path).expect("Failed to load provider config"));
  120. let secret = opts.randomness.load_secret()?;
  121. let (tx_exit, rx_exit) = watch::channel(false);
  122. let mut chains: HashMap<ChainId, BlockchainState> = HashMap::new();
  123. for (chain_id, chain_config) in &config.chains {
  124. let contract = Arc::new(PythContract::from_config(&chain_config)?);
  125. let provider_chain_config = provider_config
  126. .as_ref()
  127. .and_then(|c| c.get_chain_config(chain_id));
  128. let mut provider_commitments = provider_chain_config
  129. .as_ref()
  130. .map(|c| c.get_sorted_commitments())
  131. .unwrap_or_else(|| Vec::new());
  132. let provider_info = contract.get_provider_info(opts.provider).call().await?;
  133. let latest_metadata =
  134. bincode::deserialize::<CommitmentMetadata>(&provider_info.commitment_metadata)?;
  135. provider_commitments.push(Commitment {
  136. seed: latest_metadata.seed,
  137. chain_length: latest_metadata.chain_length,
  138. original_commitment_sequence_number: provider_info.original_commitment_sequence_number,
  139. });
  140. // TODO: we may want to load the hash chain in a lazy/fault-tolerant way. If there are many blockchains,
  141. // then it's more likely that some RPC fails. We should tolerate these faults and generate the hash chain
  142. // later when a user request comes in for that chain.
  143. let mut offsets = Vec::<usize>::new();
  144. let mut hash_chains = Vec::<PebbleHashChain>::new();
  145. for commitment in &provider_commitments {
  146. let offset = commitment.original_commitment_sequence_number.try_into()?;
  147. offsets.push(offset);
  148. let pebble_hash_chain = PebbleHashChain::from_config(
  149. &secret,
  150. &chain_id,
  151. &opts.provider,
  152. &chain_config.contract_addr,
  153. &commitment.seed,
  154. commitment.chain_length,
  155. )?;
  156. hash_chains.push(pebble_hash_chain);
  157. }
  158. let chain_state = HashChainState {
  159. offsets,
  160. hash_chains,
  161. };
  162. if chain_state.reveal(provider_info.original_commitment_sequence_number)?
  163. != provider_info.original_commitment
  164. {
  165. return Err(anyhow!("The root of the generated hash chain for chain id {} does not match the commitment. Are the secret and chain length configured correctly?", &chain_id).into());
  166. } else {
  167. tracing::info!("Root of chain id {} matches commitment", &chain_id);
  168. }
  169. let state = api::BlockchainState {
  170. id: chain_id.clone(),
  171. state: Arc::new(chain_state),
  172. contract,
  173. provider_address: opts.provider,
  174. reveal_delay_blocks: chain_config.reveal_delay_blocks,
  175. confirmed_block_status: chain_config.confirmed_block_status,
  176. };
  177. chains.insert(chain_id.clone(), state);
  178. }
  179. // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
  180. spawn(async move {
  181. tracing::info!("Registered shutdown signal handler...");
  182. tokio::signal::ctrl_c().await.unwrap();
  183. tracing::info!("Shut down signal received, waiting for tasks...");
  184. // no need to handle error here, as it will only occur when all the
  185. // receiver has been dropped and that's what we want to do
  186. tx_exit.send(true)?;
  187. Ok::<(), Error>(())
  188. });
  189. if let Some(keeper_private_key) = opts.load_keeper_private_key()? {
  190. spawn(run_keeper(chains.clone(), config, keeper_private_key));
  191. }
  192. run_api(opts.addr.clone(), chains, rx_exit).await?;
  193. Ok(())
  194. }