keeper.rs 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. use crate::keeper::track::track_block_timestamp_lag;
  2. use {
  3. crate::{
  4. api::{BlockchainState, ChainId},
  5. chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract},
  6. config::EthereumConfig,
  7. eth_utils::traced_client::RpcMetrics,
  8. keeper::block::{
  9. get_latest_safe_block, process_backlog, process_new_blocks, watch_blocks_wrapper,
  10. BlockRange,
  11. },
  12. keeper::commitment::update_commitments_loop,
  13. keeper::fee::adjust_fee_wrapper,
  14. keeper::fee::withdraw_fees_wrapper,
  15. keeper::track::track_accrued_pyth_fees,
  16. keeper::track::track_balance,
  17. keeper::track::track_provider,
  18. },
  19. ethers::{signers::Signer, types::U256},
  20. keeper_metrics::{AccountLabel, KeeperMetrics},
  21. std::{collections::HashSet, sync::Arc},
  22. tokio::{
  23. spawn,
  24. sync::{mpsc, RwLock},
  25. time::{self, Duration},
  26. },
  27. tracing::{self, Instrument},
  28. };
  29. pub(crate) mod block;
  30. pub(crate) mod commitment;
  31. pub(crate) mod fee;
  32. pub(crate) mod keeper_metrics;
  33. pub(crate) mod process_event;
  34. pub(crate) mod track;
  35. /// Track metrics in this interval
  36. const TRACK_INTERVAL: Duration = Duration::from_secs(10);
  37. /// Check whether we need to conduct a withdrawal at this interval.
  38. const WITHDRAW_INTERVAL: Duration = Duration::from_secs(300);
  39. /// Check whether we need to adjust the fee at this interval.
  40. const ADJUST_FEE_INTERVAL: Duration = Duration::from_secs(30);
  41. #[derive(Debug, Clone, Copy, PartialEq, Eq)]
  42. pub enum RequestState {
  43. /// Fulfilled means that the request was either revealed or we are sure we
  44. /// will not be able to reveal it.
  45. Fulfilled,
  46. /// We have already processed the request but couldn't fulfill it and we are
  47. /// unsure if we can fulfill it or not.
  48. Processed,
  49. }
  50. /// Run threads to handle events for the last `BACKLOG_RANGE` blocks, watch for new blocks and
  51. /// handle any events for the new blocks.
  52. #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))]
  53. pub async fn run_keeper_threads(
  54. private_key: String,
  55. chain_eth_config: EthereumConfig,
  56. chain_state: BlockchainState,
  57. metrics: Arc<KeeperMetrics>,
  58. rpc_metrics: Arc<RpcMetrics>,
  59. ) -> anyhow::Result<()> {
  60. tracing::info!("Starting keeper");
  61. let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
  62. tracing::info!("Latest safe block: {}", &latest_safe_block);
  63. let contract = Arc::new(
  64. InstrumentedSignablePythContract::from_config(
  65. &chain_eth_config,
  66. &private_key,
  67. chain_state.id.clone(),
  68. rpc_metrics.clone(),
  69. )
  70. .await?,
  71. );
  72. let keeper_address = contract.wallet().address();
  73. let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::<u64>::new()));
  74. // Spawn a thread to handle the events from last backlog_range blocks.
  75. let gas_limit: U256 = chain_eth_config.gas_limit.into();
  76. spawn(
  77. process_backlog(
  78. BlockRange {
  79. from: latest_safe_block.saturating_sub(chain_eth_config.backlog_range),
  80. to: latest_safe_block,
  81. },
  82. contract.clone(),
  83. gas_limit,
  84. chain_eth_config.escalation_policy.to_policy(),
  85. chain_state.clone(),
  86. metrics.clone(),
  87. fulfilled_requests_cache.clone(),
  88. chain_eth_config.block_delays.clone(),
  89. )
  90. .in_current_span(),
  91. );
  92. let (tx, rx) = mpsc::channel::<BlockRange>(1000);
  93. // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
  94. spawn(watch_blocks_wrapper(chain_state.clone(), latest_safe_block, tx).in_current_span());
  95. // Spawn a thread for block processing with configured delays
  96. spawn(
  97. process_new_blocks(
  98. chain_state.clone(),
  99. rx,
  100. Arc::clone(&contract),
  101. gas_limit,
  102. chain_eth_config.escalation_policy.to_policy(),
  103. metrics.clone(),
  104. fulfilled_requests_cache.clone(),
  105. chain_eth_config.block_delays.clone(),
  106. )
  107. .in_current_span(),
  108. );
  109. // Spawn a thread that watches the keeper wallet balance and submits withdrawal transactions as needed to top-up the balance.
  110. spawn(
  111. withdraw_fees_wrapper(
  112. contract.clone(),
  113. chain_state.provider_address,
  114. WITHDRAW_INTERVAL,
  115. U256::from(chain_eth_config.min_keeper_balance),
  116. )
  117. .in_current_span(),
  118. );
  119. // Spawn a thread that periodically adjusts the provider fee.
  120. spawn(
  121. adjust_fee_wrapper(
  122. contract.clone(),
  123. chain_state.clone(),
  124. chain_state.provider_address,
  125. ADJUST_FEE_INTERVAL,
  126. chain_eth_config.legacy_tx,
  127. // NOTE: we are adjusting the fees based on the maximum configured gas for user transactions.
  128. // However, the keeper will pad the gas limit for transactions (per the escalation policy) to ensure reliable submission.
  129. // Consequently, fees can be adjusted such that transactions are still unprofitable.
  130. // While we could scale up this value based on the padding, that ends up overcharging users as most transactions cost nowhere
  131. // near the maximum gas limit.
  132. // In the unlikely event that the keeper fees aren't sufficient, the solution to this is to configure the target
  133. // fee percentage to be higher on that specific chain.
  134. chain_eth_config.gas_limit,
  135. // NOTE: unwrap() here so we panic early if someone configures these values below -100.
  136. u64::try_from(100 + chain_eth_config.min_profit_pct)
  137. .expect("min_profit_pct must be >= -100"),
  138. u64::try_from(100 + chain_eth_config.target_profit_pct)
  139. .expect("target_profit_pct must be >= -100"),
  140. u64::try_from(100 + chain_eth_config.max_profit_pct)
  141. .expect("max_profit_pct must be >= -100"),
  142. chain_eth_config.fee,
  143. metrics.clone(),
  144. )
  145. .in_current_span(),
  146. );
  147. spawn(update_commitments_loop(contract.clone(), chain_state.clone()).in_current_span());
  148. // Spawn a thread to track the provider info and the balance of the keeper
  149. spawn(
  150. async move {
  151. let chain_id = chain_state.id.clone();
  152. let chain_config = chain_eth_config.clone();
  153. let provider_address = chain_state.provider_address;
  154. let keeper_metrics = metrics.clone();
  155. let contract = match InstrumentedPythContract::from_config(
  156. &chain_config,
  157. chain_id.clone(),
  158. rpc_metrics,
  159. ) {
  160. Ok(r) => r,
  161. Err(e) => {
  162. tracing::error!("Error while connecting to pythnet contract. error: {:?}", e);
  163. return;
  164. }
  165. };
  166. loop {
  167. time::sleep(TRACK_INTERVAL).await;
  168. // Track provider info and balance sequentially. Note that the tracking is done sequentially with the
  169. // timestamp last. If there is a persistent error in any of these methods, the timestamp will lag behind
  170. // current time and trigger an alert.
  171. if let Err(e) = track_provider(
  172. chain_id.clone(),
  173. contract.clone(),
  174. provider_address,
  175. keeper_metrics.clone(),
  176. )
  177. .await
  178. {
  179. tracing::error!("Error tracking provider: {:?}", e);
  180. continue;
  181. }
  182. if let Err(e) = track_balance(
  183. chain_id.clone(),
  184. contract.client(),
  185. keeper_address,
  186. keeper_metrics.clone(),
  187. )
  188. .await
  189. {
  190. tracing::error!("Error tracking balance: {:?}", e);
  191. continue;
  192. }
  193. if let Err(e) = track_accrued_pyth_fees(
  194. chain_id.clone(),
  195. contract.clone(),
  196. keeper_metrics.clone(),
  197. )
  198. .await
  199. {
  200. tracing::error!("Error tracking accrued pyth fees: {:?}", e);
  201. continue;
  202. }
  203. if let Err(e) = track_block_timestamp_lag(
  204. chain_id.clone(),
  205. contract.client(),
  206. keeper_metrics.clone(),
  207. )
  208. .await
  209. {
  210. tracing::error!("Error tracking block timestamp lag: {:?}", e);
  211. continue;
  212. }
  213. }
  214. }
  215. .in_current_span(),
  216. );
  217. Ok(())
  218. }