keeper.rs 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242
  1. use {
  2. crate::{
  3. api::{
  4. self,
  5. BlockchainState,
  6. ChainId,
  7. },
  8. chain::{
  9. eth_gas_oracle::eip1559_default_estimator,
  10. ethereum::{
  11. InstrumentedPythContract,
  12. InstrumentedSignablePythContract,
  13. PythContractCall,
  14. },
  15. reader::{
  16. BlockNumber,
  17. RequestedWithCallbackEvent,
  18. },
  19. traced_client::{
  20. RpcMetrics,
  21. TracedClient,
  22. },
  23. },
  24. config::EthereumConfig,
  25. },
  26. anyhow::{
  27. anyhow,
  28. Result,
  29. },
  30. backoff::ExponentialBackoff,
  31. ethers::{
  32. providers::{
  33. Middleware,
  34. Provider,
  35. Ws,
  36. },
  37. signers::Signer,
  38. types::{
  39. Address,
  40. U256,
  41. },
  42. },
  43. futures::StreamExt,
  44. prometheus_client::{
  45. encoding::EncodeLabelSet,
  46. metrics::{
  47. counter::Counter,
  48. family::Family,
  49. gauge::Gauge,
  50. },
  51. registry::Registry,
  52. },
  53. std::{
  54. collections::HashSet,
  55. sync::{
  56. atomic::AtomicU64,
  57. Arc,
  58. },
  59. },
  60. tokio::{
  61. spawn,
  62. sync::{
  63. mpsc,
  64. RwLock,
  65. },
  66. time::{
  67. self,
  68. Duration,
  69. },
  70. },
  71. tracing::{
  72. self,
  73. Instrument,
  74. },
  75. };
  /// How long to wait before retrying after an RPC error.
  const RETRY_INTERVAL: Duration = Duration::from_secs(5);
  /// How many blocks to look back for events that might have been missed when starting the keeper.
  const BACKLOG_RANGE: u64 = 1000;
  /// How many blocks to fetch events for in a single RPC call.
  const BLOCK_BATCH_SIZE: u64 = 100;
  /// How long to wait before polling for the next latest block (used when no websocket
  /// subscription is available).
  const POLL_INTERVAL: Duration = Duration::from_secs(2);
  /// Interval at which the provider and balance metrics are refreshed.
  const TRACK_INTERVAL: Duration = Duration::from_secs(10);
  /// Check whether we need to conduct a withdrawal at this interval.
  const WITHDRAW_INTERVAL: Duration = Duration::from_secs(300);
  /// Check whether we need to adjust the fee at this interval.
  const ADJUST_FEE_INTERVAL: Duration = Duration::from_secs(30);
  /// Check whether we need to manually update the commitments to reduce numHashes for future
  /// requests and reduce the gas cost of the reveal.
  const UPDATE_COMMITMENTS_INTERVAL: Duration = Duration::from_secs(30);
  // NOTE(review): the use of this factor is not visible in this part of the file; presumably it is
  // the fraction of some hash-count limit at which commitments get updated -- confirm against
  // the commitment-update logic.
  const UPDATE_COMMITMENTS_THRESHOLD_FACTOR: f64 = 0.95;
  /// Retry the last N blocks: every block range sent for processing is extended backwards so
  /// that it also covers this many blocks before the latest safe block.
  const RETRY_PREVIOUS_BLOCKS: u64 = 100;
  /// Label set attached to every keeper metric sample: a chain plus an account on that chain.
  #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
  pub struct AccountLabel {
      /// Identifier of the chain the sample belongs to.
      pub chain_id: String,
      /// String form of the account address the sample refers to. Callers in this file pass
      /// either the provider address or the keeper wallet address.
      pub address: String,
  }
  /// Prometheus metric families maintained by the keeper, each keyed by [`AccountLabel`].
  ///
  /// Use [`KeeperMetrics::new`] to obtain an instance whose families are registered in a registry.
  #[derive(Default)]
  pub struct KeeperMetrics {
      /// The sequence number for a new request, as reported by the provider info on chain.
      pub current_sequence_number: Family<AccountLabel, Gauge>,
      /// The sequence number for the end request, as reported by the provider info on chain.
      pub end_sequence_number: Family<AccountLabel, Gauge>,
      /// Balance of the keeper wallet (wei divided by 1e18).
      pub balance: Family<AccountLabel, Gauge<f64, AtomicU64>>,
      /// Fees accrued on the contract for the provider (wei divided by 1e18).
      pub collected_fee: Family<AccountLabel, Gauge<f64, AtomicU64>>,
      /// Fee currently charged by the provider (wei divided by 1e18).
      pub current_fee: Family<AccountLabel, Gauge<f64, AtomicU64>>,
      /// Running total of the gas reported in reveal transaction receipts.
      pub total_gas_spent: Family<AccountLabel, Gauge<f64, AtomicU64>>,
      /// Number of requests received through events.
      pub requests: Family<AccountLabel, Counter>,
      /// Number of requests for which processing has finished (successfully or not).
      pub requests_processed: Family<AccountLabel, Counter>,
      /// Number of requests reprocessed.
      pub requests_reprocessed: Family<AccountLabel, Counter>,
      /// Number of reveals confirmed by a transaction receipt.
      pub reveals: Family<AccountLabel, Counter>,
  }
  114. impl KeeperMetrics {
  115. pub async fn new(registry: Arc<RwLock<Registry>>) -> Self {
  116. let mut writable_registry = registry.write().await;
  117. let keeper_metrics = KeeperMetrics::default();
  118. writable_registry.register(
  119. "current_sequence_number",
  120. "The sequence number for a new request",
  121. keeper_metrics.current_sequence_number.clone(),
  122. );
  123. writable_registry.register(
  124. "end_sequence_number",
  125. "The sequence number for the end request",
  126. keeper_metrics.end_sequence_number.clone(),
  127. );
  128. writable_registry.register(
  129. "requests",
  130. "Number of requests received through events",
  131. keeper_metrics.requests.clone(),
  132. );
  133. writable_registry.register(
  134. "requests_processed",
  135. "Number of requests processed",
  136. keeper_metrics.requests_processed.clone(),
  137. );
  138. writable_registry.register(
  139. "reveal",
  140. "Number of reveals",
  141. keeper_metrics.reveals.clone(),
  142. );
  143. writable_registry.register(
  144. "balance",
  145. "Balance of the keeper",
  146. keeper_metrics.balance.clone(),
  147. );
  148. writable_registry.register(
  149. "collected_fee",
  150. "Collected fee on the contract",
  151. keeper_metrics.collected_fee.clone(),
  152. );
  153. writable_registry.register(
  154. "current_fee",
  155. "Current fee charged by the provider",
  156. keeper_metrics.current_fee.clone(),
  157. );
  158. writable_registry.register(
  159. "total_gas_spent",
  160. "Total gas spent revealing requests",
  161. keeper_metrics.total_gas_spent.clone(),
  162. );
  163. writable_registry.register(
  164. "requests_reprocessed",
  165. "Number of requests reprocessed",
  166. keeper_metrics.requests_reprocessed.clone(),
  167. );
  168. keeper_metrics
  169. }
  170. }
  /// A contiguous range of blocks to scan for events.
  ///
  /// `process_block_range` treats both ends as inclusive: it iterates while the current block
  /// is `<= to` and resumes each batch at the previous batch's `to + 1`.
  #[derive(Debug)]
  pub struct BlockRange {
      /// First block of the range.
      pub from: BlockNumber,
      /// Last block of the range.
      pub to: BlockNumber,
  }
  /// Processing state of a request.
  // NOTE(review): this enum is not referenced in the visible portion of the file -- confirm
  // whether it is still used elsewhere.
  #[derive(Debug, Clone, Copy, PartialEq, Eq)]
  pub enum RequestState {
      /// Fulfilled means that the request was either revealed or we are sure we
      /// will not be able to reveal it.
      Fulfilled,
      /// We have already processed the request but couldn't fulfill it and we are
      /// unsure if we can fulfill it or not.
      Processed,
  }
  185. /// Get the latest safe block number for the chain. Retry internally if there is an error.
  186. async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
  187. loop {
  188. match chain_state
  189. .contract
  190. .get_block_number(chain_state.confirmed_block_status)
  191. .await
  192. {
  193. Ok(latest_confirmed_block) => {
  194. tracing::info!(
  195. "Fetched latest safe block {}",
  196. latest_confirmed_block - chain_state.reveal_delay_blocks
  197. );
  198. return latest_confirmed_block - chain_state.reveal_delay_blocks;
  199. }
  200. Err(e) => {
  201. tracing::error!("Error while getting block number. error: {:?}", e);
  202. time::sleep(RETRY_INTERVAL).await;
  203. }
  204. }
  205. }
  206. }
  207. /// Run threads to handle events for the last `BACKLOG_RANGE` blocks, watch for new blocks and
  208. /// handle any events for the new blocks.
  209. #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))]
  210. pub async fn run_keeper_threads(
  211. private_key: String,
  212. chain_eth_config: EthereumConfig,
  213. chain_state: BlockchainState,
  214. metrics: Arc<KeeperMetrics>,
  215. rpc_metrics: Arc<RpcMetrics>,
  216. ) {
  217. tracing::info!("starting keeper");
  218. let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
  219. tracing::info!("latest safe block: {}", &latest_safe_block);
  220. let contract = Arc::new(
  221. InstrumentedSignablePythContract::from_config(
  222. &chain_eth_config,
  223. &private_key,
  224. chain_state.id.clone(),
  225. rpc_metrics.clone(),
  226. )
  227. .await
  228. .expect("Chain config should be valid"),
  229. );
  230. let keeper_address = contract.wallet().address();
  231. let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::<u64>::new()));
  232. // Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
  233. let gas_limit: U256 = chain_eth_config.gas_limit.into();
  234. spawn(
  235. process_backlog(
  236. BlockRange {
  237. from: latest_safe_block.saturating_sub(BACKLOG_RANGE),
  238. to: latest_safe_block,
  239. },
  240. contract.clone(),
  241. gas_limit,
  242. chain_state.clone(),
  243. metrics.clone(),
  244. fulfilled_requests_cache.clone(),
  245. )
  246. .in_current_span(),
  247. );
  248. let (tx, rx) = mpsc::channel::<BlockRange>(1000);
  249. // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
  250. spawn(
  251. watch_blocks_wrapper(
  252. chain_state.clone(),
  253. latest_safe_block,
  254. tx,
  255. chain_eth_config.geth_rpc_wss.clone(),
  256. )
  257. .in_current_span(),
  258. );
  259. // Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.
  260. spawn(
  261. process_new_blocks(
  262. chain_state.clone(),
  263. rx,
  264. Arc::clone(&contract),
  265. gas_limit,
  266. metrics.clone(),
  267. fulfilled_requests_cache.clone(),
  268. )
  269. .in_current_span(),
  270. );
  271. // Spawn a thread that watches the keeper wallet balance and submits withdrawal transactions as needed to top-up the balance.
  272. spawn(
  273. withdraw_fees_wrapper(
  274. contract.clone(),
  275. chain_state.provider_address,
  276. WITHDRAW_INTERVAL,
  277. U256::from(chain_eth_config.min_keeper_balance),
  278. )
  279. .in_current_span(),
  280. );
  281. // Spawn a thread that periodically adjusts the provider fee.
  282. spawn(
  283. adjust_fee_wrapper(
  284. contract.clone(),
  285. chain_state.provider_address,
  286. ADJUST_FEE_INTERVAL,
  287. chain_eth_config.legacy_tx,
  288. chain_eth_config.gas_limit,
  289. chain_eth_config.min_profit_pct,
  290. chain_eth_config.target_profit_pct,
  291. chain_eth_config.max_profit_pct,
  292. chain_eth_config.fee,
  293. )
  294. .in_current_span(),
  295. );
  296. spawn(update_commitments_loop(contract.clone(), chain_state.clone()).in_current_span());
  297. // Spawn a thread to track the provider info and the balance of the keeper
  298. spawn(
  299. async move {
  300. let chain_id = chain_state.id.clone();
  301. let chain_config = chain_eth_config.clone();
  302. let provider_address = chain_state.provider_address;
  303. let keeper_metrics = metrics.clone();
  304. let contract = match InstrumentedPythContract::from_config(
  305. &chain_config,
  306. chain_id.clone(),
  307. rpc_metrics,
  308. ) {
  309. Ok(r) => r,
  310. Err(e) => {
  311. tracing::error!("Error while connecting to pythnet contract. error: {:?}", e);
  312. return;
  313. }
  314. };
  315. loop {
  316. // There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
  317. // If rpc start fails all of these threads will just exit, instead of retrying.
  318. // We are tracking rpc failures elsewhere, so it's fine.
  319. spawn(
  320. track_provider(
  321. chain_id.clone(),
  322. contract.clone(),
  323. provider_address,
  324. keeper_metrics.clone(),
  325. )
  326. .in_current_span(),
  327. );
  328. spawn(
  329. track_balance(
  330. chain_id.clone(),
  331. contract.client(),
  332. keeper_address,
  333. keeper_metrics.clone(),
  334. )
  335. .in_current_span(),
  336. );
  337. time::sleep(TRACK_INTERVAL).await;
  338. }
  339. }
  340. .in_current_span(),
  341. );
  342. }
  343. /// Process an event with backoff. It will retry the reveal on failure for 5 minutes.
  344. #[tracing::instrument(name = "process_event_with_backoff", skip_all, fields(
  345. sequence_number = event.sequence_number
  346. ))]
  347. pub async fn process_event_with_backoff(
  348. event: RequestedWithCallbackEvent,
  349. chain_state: BlockchainState,
  350. contract: Arc<InstrumentedSignablePythContract>,
  351. gas_limit: U256,
  352. metrics: Arc<KeeperMetrics>,
  353. ) {
  354. metrics
  355. .requests
  356. .get_or_create(&AccountLabel {
  357. chain_id: chain_state.id.clone(),
  358. address: chain_state.provider_address.to_string(),
  359. })
  360. .inc();
  361. tracing::info!("Started processing event");
  362. let mut backoff = ExponentialBackoff::default();
  363. backoff.max_elapsed_time = Some(Duration::from_secs(300)); // retry for 5 minutes
  364. match backoff::future::retry_notify(
  365. backoff,
  366. || async {
  367. process_event(&event, &chain_state, &contract, gas_limit, metrics.clone()).await
  368. },
  369. |e, dur| {
  370. tracing::error!("Error happened at {:?}: {}", dur, e);
  371. },
  372. )
  373. .await
  374. {
  375. Ok(()) => {
  376. tracing::info!("Processed event",);
  377. }
  378. Err(e) => {
  379. tracing::error!("Failed to process event: {:?}", e);
  380. }
  381. }
  382. metrics
  383. .requests_processed
  384. .get_or_create(&AccountLabel {
  385. chain_id: chain_state.id.clone(),
  386. address: chain_state.provider_address.to_string(),
  387. })
  388. .inc();
  389. }
  390. /// Process a callback on a chain. It estimates the gas for the reveal with callback and
  391. /// submits the transaction if the gas estimate is below the gas limit.
  392. /// It will return a permanent or transient error depending on the error type and whether
  393. /// retry is possible or not.
  394. pub async fn process_event(
  395. event: &RequestedWithCallbackEvent,
  396. chain_config: &BlockchainState,
  397. contract: &InstrumentedSignablePythContract,
  398. gas_limit: U256,
  399. metrics: Arc<KeeperMetrics>,
  400. ) -> Result<(), backoff::Error<anyhow::Error>> {
  401. // ignore requests that are not for the configured provider
  402. if chain_config.provider_address != event.provider_address {
  403. return Ok(());
  404. }
  405. let provider_revelation = chain_config
  406. .state
  407. .reveal(event.sequence_number)
  408. .map_err(|e| backoff::Error::permanent(anyhow!("Error revealing: {:?}", e)))?;
  409. let gas_estimate_res = chain_config
  410. .contract
  411. .estimate_reveal_with_callback_gas(
  412. contract.wallet().address(),
  413. event.provider_address,
  414. event.sequence_number,
  415. event.user_random_number,
  416. provider_revelation,
  417. )
  418. .in_current_span()
  419. .await;
  420. let gas_estimate = gas_estimate_res.map_err(|e| {
  421. // we consider the error transient even if it is a contract revert since
  422. // it can be because of routing to a lagging RPC node. Retrying such errors will
  423. // incur a few additional RPC calls, but it is fine.
  424. backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e))
  425. })?;
  426. if gas_estimate > gas_limit {
  427. return Err(backoff::Error::permanent(anyhow!(
  428. "Gas estimate for reveal with callback is higher than the gas limit {} > {}",
  429. gas_estimate,
  430. gas_limit
  431. )));
  432. }
  433. // Pad the gas estimate by 25% after checking it against the gas limit
  434. let gas_estimate = gas_estimate.saturating_mul(5.into()) / 4;
  435. let contract_call = contract
  436. .reveal_with_callback(
  437. event.provider_address,
  438. event.sequence_number,
  439. event.user_random_number,
  440. provider_revelation,
  441. )
  442. .gas(gas_estimate);
  443. let client = contract.client();
  444. let mut transaction = contract_call.tx.clone();
  445. // manually fill the tx with the gas info, so we can log the details in case of error
  446. client
  447. .fill_transaction(&mut transaction, None)
  448. .await
  449. .map_err(|e| {
  450. backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e))
  451. })?;
  452. let pending_tx = client
  453. .send_transaction(transaction.clone(), None)
  454. .await
  455. .map_err(|e| {
  456. backoff::Error::transient(anyhow!(
  457. "Error submitting the reveal transaction. Tx:{:?}, Error:{:?}",
  458. transaction,
  459. e
  460. ))
  461. })?;
  462. let receipt = pending_tx
  463. .await
  464. .map_err(|e| {
  465. backoff::Error::transient(anyhow!(
  466. "Error waiting for transaction receipt. Tx:{:?} Error:{:?}",
  467. transaction,
  468. e
  469. ))
  470. })?
  471. .ok_or_else(|| {
  472. // RPC may not return an error on tx submission if the nonce is too high.
  473. // But we will never get a receipt. So we reset the nonce manager to get the correct nonce.
  474. let nonce_manager = contract.client_ref().inner().inner();
  475. nonce_manager.reset();
  476. backoff::Error::transient(anyhow!(
  477. "Can't verify the reveal, probably dropped from mempool Tx:{:?}",
  478. transaction
  479. ))
  480. })?;
  481. tracing::info!(
  482. sequence_number = &event.sequence_number,
  483. transaction_hash = &receipt.transaction_hash.to_string(),
  484. gas_used = ?receipt.gas_used,
  485. "Revealed with res: {:?}",
  486. receipt
  487. );
  488. if let Some(gas_used) = receipt.gas_used {
  489. let gas_used = gas_used.as_u128() as f64 / 1e18;
  490. metrics
  491. .total_gas_spent
  492. .get_or_create(&AccountLabel {
  493. chain_id: chain_config.id.clone(),
  494. address: client
  495. .inner()
  496. .inner()
  497. .inner()
  498. .signer()
  499. .address()
  500. .to_string(),
  501. })
  502. .inc_by(gas_used);
  503. }
  504. metrics
  505. .reveals
  506. .get_or_create(&AccountLabel {
  507. chain_id: chain_config.id.clone(),
  508. address: chain_config.provider_address.to_string(),
  509. })
  510. .inc();
  511. Ok(())
  512. }
  513. /// Process a range of blocks in batches. It calls the `process_single_block_batch` method for each batch.
  514. #[tracing::instrument(skip_all, fields(
  515. range_from_block = block_range.from, range_to_block = block_range.to
  516. ))]
  517. pub async fn process_block_range(
  518. block_range: BlockRange,
  519. contract: Arc<InstrumentedSignablePythContract>,
  520. gas_limit: U256,
  521. chain_state: api::BlockchainState,
  522. metrics: Arc<KeeperMetrics>,
  523. fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
  524. ) {
  525. let BlockRange {
  526. from: first_block,
  527. to: last_block,
  528. } = block_range;
  529. let mut current_block = first_block;
  530. while current_block <= last_block {
  531. let mut to_block = current_block + BLOCK_BATCH_SIZE;
  532. if to_block > last_block {
  533. to_block = last_block;
  534. }
  535. // TODO: this is handling all blocks sequentially we might want to handle them in parallel in future.
  536. process_single_block_batch(
  537. BlockRange {
  538. from: current_block,
  539. to: to_block,
  540. },
  541. contract.clone(),
  542. gas_limit,
  543. chain_state.clone(),
  544. metrics.clone(),
  545. fulfilled_requests_cache.clone(),
  546. )
  547. .in_current_span()
  548. .await;
  549. current_block = to_block + 1;
  550. }
  551. }
  552. /// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch
  553. /// and then try to process them one by one. It checks the `fulfilled_request_cache`. If the request was already fulfilled.
  554. /// It won't reprocess it. If the request was already processed, it will reprocess it.
  555. /// If the process fails, it will retry indefinitely.
  556. #[tracing::instrument(name = "batch", skip_all, fields(
  557. batch_from_block = block_range.from, batch_to_block = block_range.to
  558. ))]
  559. pub async fn process_single_block_batch(
  560. block_range: BlockRange,
  561. contract: Arc<InstrumentedSignablePythContract>,
  562. gas_limit: U256,
  563. chain_state: api::BlockchainState,
  564. metrics: Arc<KeeperMetrics>,
  565. fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
  566. ) {
  567. loop {
  568. let events_res = chain_state
  569. .contract
  570. .get_request_with_callback_events(block_range.from, block_range.to)
  571. .await;
  572. match events_res {
  573. Ok(events) => {
  574. tracing::info!(num_of_events = &events.len(), "Processing",);
  575. for event in &events {
  576. // the write lock guarantees we spawn only one task per sequence number
  577. let newly_inserted = fulfilled_requests_cache
  578. .write()
  579. .await
  580. .insert(event.sequence_number);
  581. if newly_inserted {
  582. spawn(
  583. process_event_with_backoff(
  584. event.clone(),
  585. chain_state.clone(),
  586. contract.clone(),
  587. gas_limit,
  588. metrics.clone(),
  589. )
  590. .in_current_span(),
  591. );
  592. }
  593. }
  594. tracing::info!(num_of_events = &events.len(), "Processed",);
  595. break;
  596. }
  597. Err(e) => {
  598. tracing::error!(
  599. "Error while getting events. Waiting for {} seconds before retry. error: {:?}",
  600. RETRY_INTERVAL.as_secs(),
  601. e
  602. );
  603. time::sleep(RETRY_INTERVAL).await;
  604. }
  605. }
  606. }
  607. }
  608. /// Wrapper for the `watch_blocks` method. If there was an error while watching, it will retry after a delay.
  609. /// It retries indefinitely.
  610. #[tracing::instrument(name = "watch_blocks", skip_all, fields(
  611. initial_safe_block = latest_safe_block
  612. ))]
  613. pub async fn watch_blocks_wrapper(
  614. chain_state: BlockchainState,
  615. latest_safe_block: BlockNumber,
  616. tx: mpsc::Sender<BlockRange>,
  617. geth_rpc_wss: Option<String>,
  618. ) {
  619. let mut last_safe_block_processed = latest_safe_block;
  620. loop {
  621. if let Err(e) = watch_blocks(
  622. chain_state.clone(),
  623. &mut last_safe_block_processed,
  624. tx.clone(),
  625. geth_rpc_wss.clone(),
  626. )
  627. .in_current_span()
  628. .await
  629. {
  630. tracing::error!("watching blocks. error: {:?}", e);
  631. time::sleep(RETRY_INTERVAL).await;
  632. }
  633. }
  634. }
  635. /// Watch for new blocks and send the range of blocks for which events have not been handled to the `tx` channel.
  636. /// We are subscribing to new blocks instead of events. If we miss some blocks, it will be fine as we are sending
  637. /// block ranges to the `tx` channel. If we have subscribed to events, we could have missed those and won't even
  638. /// know about it.
  639. pub async fn watch_blocks(
  640. chain_state: BlockchainState,
  641. last_safe_block_processed: &mut BlockNumber,
  642. tx: mpsc::Sender<BlockRange>,
  643. geth_rpc_wss: Option<String>,
  644. ) -> Result<()> {
  645. tracing::info!("Watching blocks to handle new events");
  646. let provider_option = match geth_rpc_wss {
  647. Some(wss) => Some(match Provider::<Ws>::connect(wss.clone()).await {
  648. Ok(provider) => provider,
  649. Err(e) => {
  650. tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e);
  651. return Err(e.into());
  652. }
  653. }),
  654. None => {
  655. tracing::info!("No wss provided");
  656. None
  657. }
  658. };
  659. let mut stream_option = match provider_option {
  660. Some(ref provider) => Some(match provider.subscribe_blocks().await {
  661. Ok(client) => client,
  662. Err(e) => {
  663. tracing::error!("Error while subscribing to blocks. error {:?}", e);
  664. return Err(e.into());
  665. }
  666. }),
  667. None => None,
  668. };
  669. loop {
  670. match stream_option {
  671. Some(ref mut stream) => {
  672. if let None = stream.next().await {
  673. tracing::error!("Error blocks subscription stream ended");
  674. return Err(anyhow!("Error blocks subscription stream ended"));
  675. }
  676. }
  677. None => {
  678. time::sleep(POLL_INTERVAL).await;
  679. }
  680. }
  681. let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
  682. if latest_safe_block > *last_safe_block_processed {
  683. let mut from = latest_safe_block.saturating_sub(RETRY_PREVIOUS_BLOCKS);
  684. // In normal situation, the difference between latest and last safe block should not be more than 2-3 (for arbitrum it can be 10)
  685. // TODO: add a metric for this in separate PR. We need alerts
  686. // But in extreme situation, where we were unable to send the block range multiple times, the difference between latest_safe_block and
  687. // last_safe_block_processed can grow. It is fine to not have the retry mechanisms for those earliest blocks as we expect the rpc
  688. // to be in consistency after this much time.
  689. if from > *last_safe_block_processed {
  690. from = *last_safe_block_processed;
  691. }
  692. match tx
  693. .send(BlockRange {
  694. from,
  695. to: latest_safe_block,
  696. })
  697. .await
  698. {
  699. Ok(_) => {
  700. tracing::info!(
  701. from_block = from,
  702. to_block = &latest_safe_block,
  703. "Block range sent to handle events",
  704. );
  705. *last_safe_block_processed = latest_safe_block;
  706. }
  707. Err(e) => {
  708. tracing::error!(
  709. from_block = from,
  710. to_block = &latest_safe_block,
  711. "Error while sending block range to handle events. These will be handled in next call. error: {:?}",
  712. e
  713. );
  714. }
  715. };
  716. }
  717. }
  718. }
  719. /// It waits on rx channel to receive block ranges and then calls process_block_range to process them.
  720. #[tracing::instrument(skip_all)]
  721. pub async fn process_new_blocks(
  722. chain_state: BlockchainState,
  723. mut rx: mpsc::Receiver<BlockRange>,
  724. contract: Arc<InstrumentedSignablePythContract>,
  725. gas_limit: U256,
  726. metrics: Arc<KeeperMetrics>,
  727. fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
  728. ) {
  729. tracing::info!("Waiting for new block ranges to process");
  730. loop {
  731. if let Some(block_range) = rx.recv().await {
  732. process_block_range(
  733. block_range,
  734. Arc::clone(&contract),
  735. gas_limit,
  736. chain_state.clone(),
  737. metrics.clone(),
  738. fulfilled_requests_cache.clone(),
  739. )
  740. .in_current_span()
  741. .await;
  742. }
  743. }
  744. }
  745. /// Processes the backlog_range for a chain.
  746. #[tracing::instrument(skip_all)]
  747. pub async fn process_backlog(
  748. backlog_range: BlockRange,
  749. contract: Arc<InstrumentedSignablePythContract>,
  750. gas_limit: U256,
  751. chain_state: BlockchainState,
  752. metrics: Arc<KeeperMetrics>,
  753. fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
  754. ) {
  755. tracing::info!("Processing backlog");
  756. process_block_range(
  757. backlog_range,
  758. contract,
  759. gas_limit,
  760. chain_state,
  761. metrics,
  762. fulfilled_requests_cache,
  763. )
  764. .in_current_span()
  765. .await;
  766. tracing::info!("Backlog processed");
  767. }
  768. /// tracks the balance of the given address on the given chain
  769. /// if there was an error, the function will just return
  770. #[tracing::instrument(skip_all)]
  771. pub async fn track_balance(
  772. chain_id: String,
  773. provider: Arc<Provider<TracedClient>>,
  774. address: Address,
  775. metrics: Arc<KeeperMetrics>,
  776. ) {
  777. let balance = match provider.get_balance(address, None).await {
  778. // This conversion to u128 is fine as the total balance will never cross the limits
  779. // of u128 practically.
  780. Ok(r) => r.as_u128(),
  781. Err(e) => {
  782. tracing::error!("Error while getting balance. error: {:?}", e);
  783. return;
  784. }
  785. };
  786. // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus.
  787. // The balance is in wei, so we need to divide by 1e18 to convert it to eth.
  788. let balance = balance as f64 / 1e18;
  789. metrics
  790. .balance
  791. .get_or_create(&AccountLabel {
  792. chain_id: chain_id.clone(),
  793. address: address.to_string(),
  794. })
  795. .set(balance);
  796. }
  797. /// tracks the collected fees and the hashchain data of the given provider address on the given chain
  798. /// if there is a error the function will just return
  799. #[tracing::instrument(skip_all)]
  800. pub async fn track_provider(
  801. chain_id: ChainId,
  802. contract: InstrumentedPythContract,
  803. provider_address: Address,
  804. metrics: Arc<KeeperMetrics>,
  805. ) {
  806. let provider_info = match contract.get_provider_info(provider_address).call().await {
  807. Ok(info) => info,
  808. Err(e) => {
  809. tracing::error!("Error while getting provider info. error: {:?}", e);
  810. return;
  811. }
  812. };
  813. // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
  814. // The fee is in wei, so we divide by 1e18 to convert it to eth.
  815. let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18;
  816. let current_fee: f64 = provider_info.fee_in_wei as f64 / 1e18;
  817. let current_sequence_number = provider_info.sequence_number;
  818. let end_sequence_number = provider_info.end_sequence_number;
  819. metrics
  820. .collected_fee
  821. .get_or_create(&AccountLabel {
  822. chain_id: chain_id.clone(),
  823. address: provider_address.to_string(),
  824. })
  825. .set(collected_fee);
  826. metrics
  827. .current_fee
  828. .get_or_create(&AccountLabel {
  829. chain_id: chain_id.clone(),
  830. address: provider_address.to_string(),
  831. })
  832. .set(current_fee);
  833. metrics
  834. .current_sequence_number
  835. .get_or_create(&AccountLabel {
  836. chain_id: chain_id.clone(),
  837. address: provider_address.to_string(),
  838. })
  839. // sequence_number type on chain is u64 but practically it will take
  840. // a long time for it to cross the limits of i64.
  841. // currently prometheus only supports i64 for Gauge types
  842. .set(current_sequence_number as i64);
  843. metrics
  844. .end_sequence_number
  845. .get_or_create(&AccountLabel {
  846. chain_id: chain_id.clone(),
  847. address: provider_address.to_string(),
  848. })
  849. .set(end_sequence_number as i64);
  850. }
  851. #[tracing::instrument(name = "withdraw_fees", skip_all, fields())]
  852. pub async fn withdraw_fees_wrapper(
  853. contract: Arc<InstrumentedSignablePythContract>,
  854. provider_address: Address,
  855. poll_interval: Duration,
  856. min_balance: U256,
  857. ) {
  858. loop {
  859. if let Err(e) = withdraw_fees_if_necessary(contract.clone(), provider_address, min_balance)
  860. .in_current_span()
  861. .await
  862. {
  863. tracing::error!("Withdrawing fees. error: {:?}", e);
  864. }
  865. time::sleep(poll_interval).await;
  866. }
  867. }
  868. /// Withdraws accumulated fees in the contract as needed to maintain the balance of the keeper wallet.
  869. pub async fn withdraw_fees_if_necessary(
  870. contract: Arc<InstrumentedSignablePythContract>,
  871. provider_address: Address,
  872. min_balance: U256,
  873. ) -> Result<()> {
  874. let provider = contract.provider();
  875. let wallet = contract.wallet();
  876. let keeper_balance = provider
  877. .get_balance(wallet.address(), None)
  878. .await
  879. .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?;
  880. let provider_info = contract
  881. .get_provider_info(provider_address)
  882. .call()
  883. .await
  884. .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?;
  885. if provider_info.fee_manager != wallet.address() {
  886. return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", provider, provider_info.fee_manager, wallet.address()));
  887. }
  888. let fees = provider_info.accrued_fees_in_wei;
  889. if keeper_balance < min_balance && U256::from(fees) > min_balance {
  890. tracing::info!("Claiming accrued fees...");
  891. let contract_call = contract.withdraw_as_fee_manager(provider_address, fees);
  892. send_and_confirm(contract_call).await?;
  893. } else if keeper_balance < min_balance {
  894. tracing::warn!("Keeper balance {:?} is too low (< {:?}) but provider fees are not sufficient to top-up.", keeper_balance, min_balance)
  895. }
  896. Ok(())
  897. }
  898. pub async fn send_and_confirm(contract_call: PythContractCall) -> Result<()> {
  899. let call_name = contract_call.function.name.as_str();
  900. let pending_tx = contract_call
  901. .send()
  902. .await
  903. .map_err(|e| anyhow!("Error submitting transaction({}) {:?}", call_name, e))?;
  904. let tx_result = pending_tx
  905. .await
  906. .map_err(|e| {
  907. anyhow!(
  908. "Error waiting for transaction({}) receipt: {:?}",
  909. call_name,
  910. e
  911. )
  912. })?
  913. .ok_or_else(|| {
  914. anyhow!(
  915. "Can't verify the transaction({}), probably dropped from mempool",
  916. call_name
  917. )
  918. })?;
  919. tracing::info!(
  920. transaction_hash = &tx_result.transaction_hash.to_string(),
  921. "Confirmed transaction({}). Receipt: {:?}",
  922. call_name,
  923. tx_result,
  924. );
  925. Ok(())
  926. }
  927. #[tracing::instrument(name = "adjust_fee", skip_all)]
  928. pub async fn adjust_fee_wrapper(
  929. contract: Arc<InstrumentedSignablePythContract>,
  930. provider_address: Address,
  931. poll_interval: Duration,
  932. legacy_tx: bool,
  933. gas_limit: u64,
  934. min_profit_pct: u64,
  935. target_profit_pct: u64,
  936. max_profit_pct: u64,
  937. min_fee_wei: u128,
  938. ) {
  939. // The maximum balance of accrued fees + provider wallet balance. None if we haven't observed a value yet.
  940. let mut high_water_pnl: Option<U256> = None;
  941. // The sequence number where the keeper last updated the on-chain fee. None if we haven't observed it yet.
  942. let mut sequence_number_of_last_fee_update: Option<u64> = None;
  943. loop {
  944. if let Err(e) = adjust_fee_if_necessary(
  945. contract.clone(),
  946. provider_address,
  947. legacy_tx,
  948. gas_limit,
  949. min_profit_pct,
  950. target_profit_pct,
  951. max_profit_pct,
  952. min_fee_wei,
  953. &mut high_water_pnl,
  954. &mut sequence_number_of_last_fee_update,
  955. )
  956. .in_current_span()
  957. .await
  958. {
  959. tracing::error!("Withdrawing fees. error: {:?}", e);
  960. }
  961. time::sleep(poll_interval).await;
  962. }
  963. }
  964. #[tracing::instrument(name = "update_commitments", skip_all)]
  965. pub async fn update_commitments_loop(
  966. contract: Arc<InstrumentedSignablePythContract>,
  967. chain_state: BlockchainState,
  968. ) {
  969. loop {
  970. if let Err(e) = update_commitments_if_necessary(contract.clone(), &chain_state)
  971. .in_current_span()
  972. .await
  973. {
  974. tracing::error!("Update commitments. error: {:?}", e);
  975. }
  976. time::sleep(UPDATE_COMMITMENTS_INTERVAL).await;
  977. }
  978. }
  979. pub async fn update_commitments_if_necessary(
  980. contract: Arc<InstrumentedSignablePythContract>,
  981. chain_state: &BlockchainState,
  982. ) -> Result<()> {
  983. //TODO: we can reuse the result from the last call from the watch_blocks thread to reduce RPCs
  984. let latest_safe_block = get_latest_safe_block(chain_state).in_current_span().await;
  985. let provider_address = chain_state.provider_address;
  986. let provider_info = contract
  987. .get_provider_info(provider_address)
  988. .block(latest_safe_block) // To ensure we are not revealing sooner than we should
  989. .call()
  990. .await
  991. .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?;
  992. if provider_info.max_num_hashes == 0 {
  993. return Ok(());
  994. }
  995. let threshold =
  996. ((provider_info.max_num_hashes as f64) * UPDATE_COMMITMENTS_THRESHOLD_FACTOR) as u64;
  997. if provider_info.sequence_number - provider_info.current_commitment_sequence_number > threshold
  998. {
  999. let seq_number = provider_info.sequence_number - 1;
  1000. let provider_revelation = chain_state
  1001. .state
  1002. .reveal(seq_number)
  1003. .map_err(|e| anyhow!("Error revealing: {:?}", e))?;
  1004. let contract_call =
  1005. contract.advance_provider_commitment(provider_address, seq_number, provider_revelation);
  1006. send_and_confirm(contract_call).await?;
  1007. }
  1008. Ok(())
  1009. }
  1010. /// Adjust the fee charged by the provider to ensure that it is profitable at the prevailing gas price.
  1011. /// This method targets a fee as a function of the maximum cost of the callback,
  1012. /// c = (gas_limit) * (current gas price), with min_fee_wei as a lower bound on the fee.
  1013. ///
  1014. /// The method then updates the on-chain fee if all of the following are satisfied:
  1015. /// - the on-chain fee does not fall into an interval [c*min_profit, c*max_profit]. The tolerance
  1016. /// factor prevents the on-chain fee from changing with every single gas price fluctuation.
  1017. /// Profit scalars are specified in percentage units, min_profit = (min_profit_pct + 100) / 100
  1018. /// - either the fee is increasing or the keeper is earning a profit -- i.e., fees only decrease when the keeper is profitable
  1019. /// - at least one random number has been requested since the last fee update
  1020. ///
  1021. /// These conditions are intended to make sure that the keeper is profitable while also minimizing the number of fee
  1022. /// update transactions.
  1023. pub async fn adjust_fee_if_necessary(
  1024. contract: Arc<InstrumentedSignablePythContract>,
  1025. provider_address: Address,
  1026. legacy_tx: bool,
  1027. gas_limit: u64,
  1028. min_profit_pct: u64,
  1029. target_profit_pct: u64,
  1030. max_profit_pct: u64,
  1031. min_fee_wei: u128,
  1032. high_water_pnl: &mut Option<U256>,
  1033. sequence_number_of_last_fee_update: &mut Option<u64>,
  1034. ) -> Result<()> {
  1035. let provider_info = contract
  1036. .get_provider_info(provider_address)
  1037. .call()
  1038. .await
  1039. .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?;
  1040. if provider_info.fee_manager != contract.wallet().address() {
  1041. return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", contract.provider(), provider_info.fee_manager, contract.wallet().address()));
  1042. }
  1043. // Calculate target window for the on-chain fee.
  1044. let max_callback_cost: u128 = estimate_tx_cost(contract.clone(), legacy_tx, gas_limit.into())
  1045. .await
  1046. .map_err(|e| anyhow!("Could not estimate transaction cost. error {:?}", e))?;
  1047. let target_fee_min = std::cmp::max(
  1048. (max_callback_cost * (100 + u128::from(min_profit_pct))) / 100,
  1049. min_fee_wei,
  1050. );
  1051. let target_fee = std::cmp::max(
  1052. (max_callback_cost * (100 + u128::from(target_profit_pct))) / 100,
  1053. min_fee_wei,
  1054. );
  1055. let target_fee_max = std::cmp::max(
  1056. (max_callback_cost * (100 + u128::from(max_profit_pct))) / 100,
  1057. min_fee_wei,
  1058. );
  1059. // Calculate current P&L to determine if we can reduce fees.
  1060. let current_keeper_balance = contract
  1061. .provider()
  1062. .get_balance(contract.wallet().address(), None)
  1063. .await
  1064. .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?;
  1065. let current_keeper_fees = U256::from(provider_info.accrued_fees_in_wei);
  1066. let current_pnl = current_keeper_balance + current_keeper_fees;
  1067. let can_reduce_fees = match high_water_pnl {
  1068. Some(x) => current_pnl >= *x,
  1069. None => false,
  1070. };
  1071. // Determine if the chain has seen activity since the last fee update.
  1072. let is_chain_active: bool = match sequence_number_of_last_fee_update {
  1073. Some(n) => provider_info.sequence_number > *n,
  1074. None => {
  1075. // We don't want to adjust the fees on server start for unused chains, hence false here.
  1076. false
  1077. }
  1078. };
  1079. let provider_fee: u128 = provider_info.fee_in_wei;
  1080. if is_chain_active
  1081. && ((provider_fee > target_fee_max && can_reduce_fees) || provider_fee < target_fee_min)
  1082. {
  1083. tracing::info!(
  1084. "Adjusting fees. Current: {:?} Target: {:?}",
  1085. provider_fee,
  1086. target_fee
  1087. );
  1088. let contract_call = contract.set_provider_fee_as_fee_manager(provider_address, target_fee);
  1089. send_and_confirm(contract_call).await?;
  1090. *sequence_number_of_last_fee_update = Some(provider_info.sequence_number);
  1091. } else {
  1092. tracing::info!(
  1093. "Skipping fee adjustment. Current: {:?} Target: {:?} [{:?}, {:?}] Current Sequence Number: {:?} Last updated sequence number {:?} Current pnl: {:?} High water pnl: {:?}",
  1094. provider_fee,
  1095. target_fee,
  1096. target_fee_min,
  1097. target_fee_max,
  1098. provider_info.sequence_number,
  1099. sequence_number_of_last_fee_update,
  1100. current_pnl,
  1101. high_water_pnl
  1102. )
  1103. }
  1104. // Update high water pnl
  1105. *high_water_pnl = Some(std::cmp::max(
  1106. current_pnl,
  1107. high_water_pnl.unwrap_or(U256::from(0)),
  1108. ));
  1109. // Update sequence number on server start.
  1110. match sequence_number_of_last_fee_update {
  1111. Some(_) => (),
  1112. None => {
  1113. *sequence_number_of_last_fee_update = Some(provider_info.sequence_number);
  1114. }
  1115. };
  1116. Ok(())
  1117. }
  1118. /// Estimate the cost (in wei) of a transaction consuming gas_used gas.
  1119. pub async fn estimate_tx_cost(
  1120. contract: Arc<InstrumentedSignablePythContract>,
  1121. use_legacy_tx: bool,
  1122. gas_used: u128,
  1123. ) -> Result<u128> {
  1124. let middleware = contract.client();
  1125. let gas_price: u128 = if use_legacy_tx {
  1126. middleware
  1127. .get_gas_price()
  1128. .await
  1129. .map_err(|e| anyhow!("Failed to fetch gas price. error: {:?}", e))?
  1130. .try_into()
  1131. .map_err(|e| anyhow!("gas price doesn't fit into 128 bits. error: {:?}", e))?
  1132. } else {
  1133. let (max_fee_per_gas, max_priority_fee_per_gas) = middleware
  1134. .estimate_eip1559_fees(Some(eip1559_default_estimator))
  1135. .await?;
  1136. (max_fee_per_gas + max_priority_fee_per_gas)
  1137. .try_into()
  1138. .map_err(|e| anyhow!("gas price doesn't fit into 128 bits. error: {:?}", e))?
  1139. };
  1140. Ok(gas_price * gas_used)
  1141. }