run.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. use {
  2. crate::{
  3. api::{self, BlockchainState, ChainId},
  4. chain::{
  5. ethereum::InstrumentedPythContract,
  6. traced_client::{RpcMetrics, TracedClient},
  7. },
  8. command::register_provider::CommitmentMetadata,
  9. config::{Commitment, Config, EthereumConfig, RunOptions},
  10. keeper::{self, KeeperMetrics},
  11. state::{HashChainState, PebbleHashChain},
  12. },
  13. anyhow::{anyhow, Error, Result},
  14. axum::Router,
  15. ethers::{
  16. middleware::Middleware,
  17. types::{Address, BlockNumber},
  18. },
  19. futures::future::join_all,
  20. prometheus_client::{
  21. encoding::EncodeLabelSet,
  22. metrics::{family::Family, gauge::Gauge},
  23. registry::Registry,
  24. },
  25. std::{
  26. collections::HashMap,
  27. net::SocketAddr,
  28. sync::Arc,
  29. time::{Duration, SystemTime, UNIX_EPOCH},
  30. },
  31. tokio::{
  32. spawn,
  33. sync::{watch, RwLock},
  34. time,
  35. },
  36. tower_http::cors::CorsLayer,
  37. utoipa::OpenApi,
  38. utoipa_swagger_ui::SwaggerUi,
  39. };
  /// Pause between successive rounds of block-timestamp-lag sampling in
  /// [`track_block_timestamp_lag`].
  const TRACK_INTERVAL: Duration = Duration::from_millis(10_000);
  42. pub async fn run_api(
  43. socket_addr: SocketAddr,
  44. chains: HashMap<String, api::BlockchainState>,
  45. metrics_registry: Arc<RwLock<Registry>>,
  46. mut rx_exit: watch::Receiver<bool>,
  47. ) -> Result<()> {
  48. #[derive(OpenApi)]
  49. #[openapi(
  50. paths(
  51. crate::api::revelation,
  52. crate::api::chain_ids,
  53. ),
  54. components(
  55. schemas(
  56. crate::api::GetRandomValueResponse,
  57. crate::api::Blob,
  58. crate::api::BinaryEncoding,
  59. )
  60. ),
  61. tags(
  62. (name = "fortuna", description = "Random number service for the Pyth Entropy protocol")
  63. )
  64. )]
  65. struct ApiDoc;
  66. let api_state = api::ApiState::new(chains, metrics_registry).await;
  67. // Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
  68. // `with_state` method which replaces `Body` with `State` in the type signature.
  69. let app = Router::new();
  70. let app = app
  71. .merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi()))
  72. .merge(api::routes(api_state))
  73. // Permissive CORS layer to allow all origins
  74. .layer(CorsLayer::permissive());
  75. tracing::info!("Starting server on: {:?}", &socket_addr);
  76. // Binds the axum's server to the configured address and port. This is a blocking call and will
  77. // not return until the server is shutdown.
  78. axum::Server::try_bind(&socket_addr)?
  79. .serve(app.into_make_service())
  80. .with_graceful_shutdown(async {
  81. // It can return an error or an Ok(()). In both cases, we would shut down.
  82. // As Ok(()) means, exit signal (ctrl + c) was received.
  83. // And Err(e) means, the sender was dropped which should not be the case.
  84. let _ = rx_exit.changed().await;
  85. tracing::info!("Shutting down RPC server...");
  86. })
  87. .await?;
  88. Ok(())
  89. }
  90. pub async fn run_keeper(
  91. chains: HashMap<String, api::BlockchainState>,
  92. config: Config,
  93. private_key: String,
  94. metrics_registry: Arc<RwLock<Registry>>,
  95. rpc_metrics: Arc<RpcMetrics>,
  96. ) -> Result<()> {
  97. let mut handles = Vec::new();
  98. let keeper_metrics = Arc::new(KeeperMetrics::new(metrics_registry).await);
  99. for (chain_id, chain_config) in chains {
  100. let chain_eth_config = config
  101. .chains
  102. .get(&chain_id)
  103. .expect("All chains should be present in the config file")
  104. .clone();
  105. let private_key = private_key.clone();
  106. handles.push(spawn(keeper::run_keeper_threads(
  107. private_key,
  108. chain_eth_config,
  109. chain_config.clone(),
  110. keeper_metrics.clone(),
  111. rpc_metrics.clone(),
  112. )));
  113. }
  114. Ok(())
  115. }
  116. pub async fn run(opts: &RunOptions) -> Result<()> {
  117. let config = Config::load(&opts.config.config)?;
  118. let secret = config.provider.secret.load()?.ok_or(anyhow!(
  119. "Please specify a provider secret in the config file."
  120. ))?;
  121. let (tx_exit, rx_exit) = watch::channel(false);
  122. let metrics_registry = Arc::new(RwLock::new(Registry::default()));
  123. let rpc_metrics = Arc::new(RpcMetrics::new(metrics_registry.clone()).await);
  124. let mut tasks = Vec::new();
  125. for (chain_id, chain_config) in config.chains.clone() {
  126. let secret_copy = secret.clone();
  127. let rpc_metrics = rpc_metrics.clone();
  128. tasks.push(spawn(async move {
  129. let state = setup_chain_state(
  130. &config.provider.address,
  131. &secret_copy,
  132. config.provider.chain_sample_interval,
  133. &chain_id,
  134. &chain_config,
  135. rpc_metrics,
  136. )
  137. .await;
  138. (chain_id, state)
  139. }));
  140. }
  141. let states = join_all(tasks).await;
  142. let mut chains: HashMap<ChainId, BlockchainState> = HashMap::new();
  143. for result in states {
  144. let (chain_id, state) = result?;
  145. match state {
  146. Ok(state) => {
  147. chains.insert(chain_id.clone(), state);
  148. }
  149. Err(e) => {
  150. tracing::error!("Failed to setup {} {}", chain_id, e);
  151. }
  152. }
  153. }
  154. if chains.is_empty() {
  155. return Err(anyhow!("No chains were successfully setup"));
  156. }
  157. // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
  158. spawn(async move {
  159. tracing::info!("Registered shutdown signal handler...");
  160. tokio::signal::ctrl_c().await.unwrap();
  161. tracing::info!("Shut down signal received, waiting for tasks...");
  162. // no need to handle error here, as it will only occur when all the
  163. // receiver has been dropped and that's what we want to do
  164. tx_exit.send(true)?;
  165. Ok::<(), Error>(())
  166. });
  167. if let Some(keeper_private_key) = config.keeper.private_key.load()? {
  168. spawn(run_keeper(
  169. chains.clone(),
  170. config.clone(),
  171. keeper_private_key,
  172. metrics_registry.clone(),
  173. rpc_metrics.clone(),
  174. ));
  175. } else {
  176. tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
  177. }
  178. // Spawn a thread to track latest block lag. This helps us know if the rpc is up and updated with the latest block.
  179. spawn(track_block_timestamp_lag(
  180. config,
  181. metrics_registry.clone(),
  182. rpc_metrics.clone(),
  183. ));
  184. run_api(opts.addr, chains, metrics_registry, rx_exit).await?;
  185. Ok(())
  186. }
  187. async fn setup_chain_state(
  188. provider: &Address,
  189. secret: &str,
  190. chain_sample_interval: u64,
  191. chain_id: &ChainId,
  192. chain_config: &EthereumConfig,
  193. rpc_metrics: Arc<RpcMetrics>,
  194. ) -> Result<BlockchainState> {
  195. let contract = Arc::new(InstrumentedPythContract::from_config(
  196. chain_config,
  197. chain_id.clone(),
  198. rpc_metrics,
  199. )?);
  200. let mut provider_commitments = chain_config.commitments.clone().unwrap_or_default();
  201. provider_commitments.sort_by(|c1, c2| {
  202. c1.original_commitment_sequence_number
  203. .cmp(&c2.original_commitment_sequence_number)
  204. });
  205. let provider_info = contract.get_provider_info(*provider).call().await?;
  206. let latest_metadata = bincode::deserialize::<CommitmentMetadata>(
  207. &provider_info.commitment_metadata,
  208. )
  209. .map_err(|e| {
  210. anyhow!(
  211. "Chain: {} - Failed to deserialize commitment metadata: {}",
  212. &chain_id,
  213. e
  214. )
  215. })?;
  216. let last_prior_commitment = provider_commitments.last();
  217. if last_prior_commitment.is_some()
  218. && last_prior_commitment
  219. .unwrap()
  220. .original_commitment_sequence_number
  221. >= provider_info.original_commitment_sequence_number
  222. {
  223. return Err(anyhow!("The current hash chain for chain id {} has configured commitments for sequence numbers greater than the current on-chain sequence number. Are the commitments configured correctly?", &chain_id));
  224. }
  225. provider_commitments.push(Commitment {
  226. seed: latest_metadata.seed,
  227. chain_length: latest_metadata.chain_length,
  228. original_commitment_sequence_number: provider_info.original_commitment_sequence_number,
  229. });
  230. // TODO: we may want to load the hash chain in a lazy/fault-tolerant way. If there are many blockchains,
  231. // then it's more likely that some RPC fails. We should tolerate these faults and generate the hash chain
  232. // later when a user request comes in for that chain.
  233. let mut offsets = Vec::<usize>::new();
  234. let mut hash_chains = Vec::<PebbleHashChain>::new();
  235. for commitment in &provider_commitments {
  236. let offset = commitment.original_commitment_sequence_number.try_into()?;
  237. offsets.push(offset);
  238. let pebble_hash_chain = PebbleHashChain::from_config(
  239. secret,
  240. chain_id,
  241. provider,
  242. &chain_config.contract_addr,
  243. &commitment.seed,
  244. commitment.chain_length,
  245. chain_sample_interval,
  246. )
  247. .map_err(|e| anyhow!("Failed to create hash chain: {}", e))?;
  248. hash_chains.push(pebble_hash_chain);
  249. }
  250. let chain_state = HashChainState {
  251. offsets,
  252. hash_chains,
  253. };
  254. if chain_state.reveal(provider_info.original_commitment_sequence_number)?
  255. != provider_info.original_commitment
  256. {
  257. return Err(anyhow!("The root of the generated hash chain for chain id {} does not match the commitment. Are the secret and chain length configured correctly?", &chain_id));
  258. } else {
  259. tracing::info!("Root of chain id {} matches commitment", &chain_id);
  260. }
  261. let state = BlockchainState {
  262. id: chain_id.clone(),
  263. state: Arc::new(chain_state),
  264. contract,
  265. provider_address: *provider,
  266. reveal_delay_blocks: chain_config.reveal_delay_blocks,
  267. confirmed_block_status: chain_config.confirmed_block_status,
  268. };
  269. Ok(state)
  270. }
  271. #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
  272. pub struct ChainLabel {
  273. pub chain_id: String,
  274. }
  275. #[tracing::instrument(name = "block_timestamp_lag", skip_all, fields(chain_id = chain_id))]
  276. pub async fn check_block_timestamp_lag(
  277. chain_id: String,
  278. chain_config: EthereumConfig,
  279. metrics: Family<ChainLabel, Gauge>,
  280. rpc_metrics: Arc<RpcMetrics>,
  281. ) {
  282. let provider =
  283. match TracedClient::new(chain_id.clone(), &chain_config.geth_rpc_addr, rpc_metrics) {
  284. Ok(r) => r,
  285. Err(e) => {
  286. tracing::error!("Failed to create provider for chain id - {:?}", e);
  287. return;
  288. }
  289. };
  290. const INF_LAG: i64 = 1000000; // value that definitely triggers an alert
  291. let lag = match provider.get_block(BlockNumber::Latest).await {
  292. Ok(block) => match block {
  293. Some(block) => {
  294. let block_timestamp = block.timestamp;
  295. let server_timestamp = SystemTime::now()
  296. .duration_since(UNIX_EPOCH)
  297. .unwrap()
  298. .as_secs();
  299. let lag: i64 = (server_timestamp as i64) - (block_timestamp.as_u64() as i64);
  300. lag
  301. }
  302. None => {
  303. tracing::error!("Block is None");
  304. INF_LAG
  305. }
  306. },
  307. Err(e) => {
  308. tracing::error!("Failed to get block - {:?}", e);
  309. INF_LAG
  310. }
  311. };
  312. metrics
  313. .get_or_create(&ChainLabel {
  314. chain_id: chain_id.clone(),
  315. })
  316. .set(lag);
  317. }
  318. /// Tracks the difference between the server timestamp and the latest block timestamp for each chain
  319. pub async fn track_block_timestamp_lag(
  320. config: Config,
  321. metrics_registry: Arc<RwLock<Registry>>,
  322. rpc_metrics: Arc<RpcMetrics>,
  323. ) {
  324. let metrics = Family::<ChainLabel, Gauge>::default();
  325. metrics_registry.write().await.register(
  326. "block_timestamp_lag",
  327. "The difference between server timestamp and latest block timestamp",
  328. metrics.clone(),
  329. );
  330. loop {
  331. for (chain_id, chain_config) in &config.chains {
  332. spawn(check_block_timestamp_lag(
  333. chain_id.clone(),
  334. chain_config.clone(),
  335. metrics.clone(),
  336. rpc_metrics.clone(),
  337. ));
  338. }
  339. time::sleep(TRACK_INTERVAL).await;
  340. }
  341. }