run.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. use {
  2. crate::{
  3. api::{
  4. self,
  5. BlockchainState,
  6. ChainId,
  7. },
  8. chain::ethereum::PythContract,
  9. command::register_provider::CommitmentMetadata,
  10. config::{
  11. Commitment,
  12. Config,
  13. EthereumConfig,
  14. RunOptions,
  15. },
  16. keeper,
  17. state::{
  18. HashChainState,
  19. PebbleHashChain,
  20. },
  21. },
  22. anyhow::{
  23. anyhow,
  24. Error,
  25. Result,
  26. },
  27. axum::Router,
  28. ethers::{
  29. middleware::Middleware,
  30. providers::{
  31. Http,
  32. Provider,
  33. },
  34. types::{
  35. Address,
  36. BlockNumber,
  37. },
  38. },
  39. prometheus_client::{
  40. encoding::EncodeLabelSet,
  41. metrics::{
  42. family::Family,
  43. gauge::Gauge,
  44. },
  45. registry::Registry,
  46. },
  47. std::{
  48. collections::HashMap,
  49. net::SocketAddr,
  50. sync::Arc,
  51. time::{
  52. Duration,
  53. SystemTime,
  54. UNIX_EPOCH,
  55. },
  56. },
  57. tokio::{
  58. spawn,
  59. sync::{
  60. watch,
  61. RwLock,
  62. },
  63. time,
  64. },
  65. tower_http::cors::CorsLayer,
  66. utoipa::OpenApi,
  67. utoipa_swagger_ui::SwaggerUi,
  68. };
  69. /// Track metrics in this interval
  70. const TRACK_INTERVAL: Duration = Duration::from_secs(10);
  71. pub async fn run_api(
  72. socket_addr: SocketAddr,
  73. chains: HashMap<String, api::BlockchainState>,
  74. metrics_registry: Arc<RwLock<Registry>>,
  75. mut rx_exit: watch::Receiver<bool>,
  76. ) -> Result<()> {
  77. #[derive(OpenApi)]
  78. #[openapi(
  79. paths(
  80. crate::api::revelation,
  81. crate::api::chain_ids,
  82. ),
  83. components(
  84. schemas(
  85. crate::api::GetRandomValueResponse,
  86. crate::api::Blob,
  87. crate::api::BinaryEncoding,
  88. )
  89. ),
  90. tags(
  91. (name = "fortuna", description = "Random number service for the Pyth Entropy protocol")
  92. )
  93. )]
  94. struct ApiDoc;
  95. let api_state = api::ApiState::new(chains, metrics_registry).await;
  96. // Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
  97. // `with_state` method which replaces `Body` with `State` in the type signature.
  98. let app = Router::new();
  99. let app = app
  100. .merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi()))
  101. .merge(api::routes(api_state))
  102. // Permissive CORS layer to allow all origins
  103. .layer(CorsLayer::permissive());
  104. tracing::info!("Starting server on: {:?}", &socket_addr);
  105. // Binds the axum's server to the configured address and port. This is a blocking call and will
  106. // not return until the server is shutdown.
  107. axum::Server::try_bind(&socket_addr)?
  108. .serve(app.into_make_service())
  109. .with_graceful_shutdown(async {
  110. // It can return an error or an Ok(()). In both cases, we would shut down.
  111. // As Ok(()) means, exit signal (ctrl + c) was received.
  112. // And Err(e) means, the sender was dropped which should not be the case.
  113. let _ = rx_exit.changed().await;
  114. tracing::info!("Shutting down RPC server...");
  115. })
  116. .await?;
  117. Ok(())
  118. }
  119. pub async fn run_keeper(
  120. chains: HashMap<String, api::BlockchainState>,
  121. config: Config,
  122. private_key: String,
  123. metrics_registry: Arc<RwLock<Registry>>,
  124. ) -> Result<()> {
  125. let mut handles = Vec::new();
  126. for (chain_id, chain_config) in chains {
  127. let chain_eth_config = config
  128. .chains
  129. .get(&chain_id)
  130. .expect("All chains should be present in the config file")
  131. .clone();
  132. let private_key = private_key.clone();
  133. handles.push(spawn(keeper::run_keeper_threads(
  134. private_key,
  135. chain_eth_config,
  136. chain_config.clone(),
  137. metrics_registry.clone(),
  138. )));
  139. }
  140. Ok(())
  141. }
  142. pub async fn run(opts: &RunOptions) -> Result<()> {
  143. let config = Config::load(&opts.config.config)?;
  144. let secret = config.provider.secret.load()?.ok_or(anyhow!(
  145. "Please specify a provider secret in the config file."
  146. ))?;
  147. let (tx_exit, rx_exit) = watch::channel(false);
  148. let mut chains: HashMap<ChainId, BlockchainState> = HashMap::new();
  149. for (chain_id, chain_config) in &config.chains {
  150. let state =
  151. setup_chain_state(&config.provider.address, &secret, chain_id, chain_config).await;
  152. match state {
  153. Ok(state) => {
  154. chains.insert(chain_id.clone(), state);
  155. }
  156. Err(e) => {
  157. tracing::error!("Failed to setup {} {}", chain_id, e);
  158. }
  159. }
  160. }
  161. if chains.is_empty() {
  162. return Err(anyhow!("No chains were successfully setup"));
  163. }
  164. // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
  165. spawn(async move {
  166. tracing::info!("Registered shutdown signal handler...");
  167. tokio::signal::ctrl_c().await.unwrap();
  168. tracing::info!("Shut down signal received, waiting for tasks...");
  169. // no need to handle error here, as it will only occur when all the
  170. // receiver has been dropped and that's what we want to do
  171. tx_exit.send(true)?;
  172. Ok::<(), Error>(())
  173. });
  174. let metrics_registry = Arc::new(RwLock::new(Registry::default()));
  175. if let Some(keeper_private_key) = config.keeper.private_key.load()? {
  176. spawn(run_keeper(
  177. chains.clone(),
  178. config.clone(),
  179. keeper_private_key,
  180. metrics_registry.clone(),
  181. ));
  182. } else {
  183. tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
  184. }
  185. // Spawn a thread to track latest block lag. This helps us know if the rpc is up and updated with the latest block.
  186. spawn(track_block_timestamp_lag(config, metrics_registry.clone()));
  187. run_api(opts.addr.clone(), chains, metrics_registry, rx_exit).await?;
  188. Ok(())
  189. }
  190. async fn setup_chain_state(
  191. provider: &Address,
  192. secret: &String,
  193. chain_id: &ChainId,
  194. chain_config: &EthereumConfig,
  195. ) -> Result<BlockchainState> {
  196. let contract = Arc::new(PythContract::from_config(&chain_config)?);
  197. let mut provider_commitments = chain_config.commitments.clone().unwrap_or(Vec::new());
  198. provider_commitments.sort_by(|c1, c2| {
  199. c1.original_commitment_sequence_number
  200. .cmp(&c2.original_commitment_sequence_number)
  201. });
  202. let provider_info = contract.get_provider_info(*provider).call().await?;
  203. let latest_metadata = bincode::deserialize::<CommitmentMetadata>(
  204. &provider_info.commitment_metadata,
  205. )
  206. .map_err(|e| {
  207. anyhow!(
  208. "Chain: {} - Failed to deserialize commitment metadata: {}",
  209. &chain_id,
  210. e
  211. )
  212. })?;
  213. let last_prior_commitment = provider_commitments.last();
  214. if last_prior_commitment.is_some()
  215. && last_prior_commitment
  216. .unwrap()
  217. .original_commitment_sequence_number
  218. >= provider_info.original_commitment_sequence_number
  219. {
  220. return Err(anyhow!("The current hash chain for chain id {} has configured commitments for sequence numbers greater than the current on-chain sequence number. Are the commitments configured correctly?", &chain_id));
  221. }
  222. provider_commitments.push(Commitment {
  223. seed: latest_metadata.seed,
  224. chain_length: latest_metadata.chain_length,
  225. original_commitment_sequence_number: provider_info.original_commitment_sequence_number,
  226. });
  227. // TODO: we may want to load the hash chain in a lazy/fault-tolerant way. If there are many blockchains,
  228. // then it's more likely that some RPC fails. We should tolerate these faults and generate the hash chain
  229. // later when a user request comes in for that chain.
  230. let mut offsets = Vec::<usize>::new();
  231. let mut hash_chains = Vec::<PebbleHashChain>::new();
  232. for commitment in &provider_commitments {
  233. let offset = commitment.original_commitment_sequence_number.try_into()?;
  234. offsets.push(offset);
  235. let pebble_hash_chain = PebbleHashChain::from_config(
  236. &secret,
  237. &chain_id,
  238. &provider,
  239. &chain_config.contract_addr,
  240. &commitment.seed,
  241. commitment.chain_length,
  242. )?;
  243. hash_chains.push(pebble_hash_chain);
  244. }
  245. let chain_state = HashChainState {
  246. offsets,
  247. hash_chains,
  248. };
  249. if chain_state.reveal(provider_info.original_commitment_sequence_number)?
  250. != provider_info.original_commitment
  251. {
  252. return Err(anyhow!("The root of the generated hash chain for chain id {} does not match the commitment. Are the secret and chain length configured correctly?", &chain_id).into());
  253. } else {
  254. tracing::info!("Root of chain id {} matches commitment", &chain_id);
  255. }
  256. let state = BlockchainState {
  257. id: chain_id.clone(),
  258. state: Arc::new(chain_state),
  259. contract,
  260. provider_address: provider.clone(),
  261. reveal_delay_blocks: chain_config.reveal_delay_blocks,
  262. confirmed_block_status: chain_config.confirmed_block_status,
  263. };
  264. Ok(state)
  265. }
  266. #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
  267. pub struct ChainLabel {
  268. pub chain_id: String,
  269. }
  270. #[tracing::instrument(name = "block_timestamp_lag", skip_all, fields(chain_id = chain_id))]
  271. pub async fn check_block_timestamp_lag(
  272. chain_id: String,
  273. chain_config: EthereumConfig,
  274. metrics: Family<ChainLabel, Gauge>,
  275. ) {
  276. let provider = match Provider::<Http>::try_from(&chain_config.geth_rpc_addr) {
  277. Ok(r) => r,
  278. Err(e) => {
  279. tracing::error!("Failed to create provider for chain id - {:?}", e);
  280. return;
  281. }
  282. };
  283. const INF_LAG: i64 = 1000000; // value that definitely triggers an alert
  284. let lag = match provider.get_block(BlockNumber::Latest).await {
  285. Ok(block) => match block {
  286. Some(block) => {
  287. let block_timestamp = block.timestamp;
  288. let server_timestamp = SystemTime::now()
  289. .duration_since(UNIX_EPOCH)
  290. .unwrap()
  291. .as_secs();
  292. let lag: i64 = (server_timestamp as i64) - (block_timestamp.as_u64() as i64);
  293. lag
  294. }
  295. None => {
  296. tracing::error!("Block is None");
  297. INF_LAG
  298. }
  299. },
  300. Err(e) => {
  301. tracing::error!("Failed to get block - {:?}", e);
  302. INF_LAG
  303. }
  304. };
  305. metrics
  306. .get_or_create(&ChainLabel {
  307. chain_id: chain_id.clone(),
  308. })
  309. .set(lag);
  310. }
  311. /// Tracks the difference between the server timestamp and the latest block timestamp for each chain
  312. pub async fn track_block_timestamp_lag(config: Config, metrics_registry: Arc<RwLock<Registry>>) {
  313. let metrics = Family::<ChainLabel, Gauge>::default();
  314. metrics_registry.write().await.register(
  315. "block_timestamp_lag",
  316. "The difference between server timestamp and latest block timestamp",
  317. metrics.clone(),
  318. );
  319. loop {
  320. for (chain_id, chain_config) in &config.chains {
  321. spawn(check_block_timestamp_lag(
  322. chain_id.clone(),
  323. chain_config.clone(),
  324. metrics.clone(),
  325. ));
  326. }
  327. time::sleep(TRACK_INTERVAL).await;
  328. }
  329. }