keeper.rs 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243
  1. use {
  2. crate::{
  3. api::{
  4. self,
  5. BlockchainState,
  6. ChainId,
  7. },
  8. chain::{
  9. eth_gas_oracle::eip1559_default_estimator,
  10. ethereum::{
  11. InstrumentedPythContract,
  12. InstrumentedSignablePythContract,
  13. PythContractCall,
  14. },
  15. reader::{
  16. BlockNumber,
  17. RequestedWithCallbackEvent,
  18. },
  19. traced_client::{
  20. RpcMetrics,
  21. TracedClient,
  22. },
  23. },
  24. config::EthereumConfig,
  25. },
  26. anyhow::{
  27. anyhow,
  28. Result,
  29. },
  30. backoff::ExponentialBackoff,
  31. ethers::{
  32. providers::{
  33. Middleware,
  34. Provider,
  35. Ws,
  36. },
  37. signers::Signer,
  38. types::{
  39. Address,
  40. U256,
  41. },
  42. },
  43. futures::StreamExt,
  44. prometheus_client::{
  45. encoding::EncodeLabelSet,
  46. metrics::{
  47. counter::Counter,
  48. family::Family,
  49. gauge::Gauge,
  50. },
  51. registry::Registry,
  52. },
  53. std::{
  54. collections::HashSet,
  55. sync::{
  56. atomic::AtomicU64,
  57. Arc,
  58. },
  59. },
  60. tokio::{
  61. spawn,
  62. sync::{
  63. mpsc,
  64. RwLock,
  65. },
  66. time::{
  67. self,
  68. Duration,
  69. },
  70. },
  71. tracing::{
  72. self,
  73. Instrument,
  74. },
  75. };
  76. /// How much to wait before retrying in case of an RPC error
  77. const RETRY_INTERVAL: Duration = Duration::from_secs(5);
  78. /// How many blocks to look back for events that might be missed when starting the keeper
  79. const BACKLOG_RANGE: u64 = 1000;
  80. /// How many blocks to fetch events for in a single rpc call
  81. const BLOCK_BATCH_SIZE: u64 = 100;
  82. /// How much to wait before polling the next latest block
  83. const POLL_INTERVAL: Duration = Duration::from_secs(2);
  84. /// Track metrics in this interval
  85. const TRACK_INTERVAL: Duration = Duration::from_secs(10);
  86. /// Check whether we need to conduct a withdrawal at this interval.
  87. const WITHDRAW_INTERVAL: Duration = Duration::from_secs(300);
  88. /// Check whether we need to adjust the fee at this interval.
  89. const ADJUST_FEE_INTERVAL: Duration = Duration::from_secs(30);
  90. /// Check whether we need to manually update the commitments to reduce numHashes for future
  91. /// requests and reduce the gas cost of the reveal.
  92. const UPDATE_COMMITMENTS_INTERVAL: Duration = Duration::from_secs(30);
  93. const UPDATE_COMMITMENTS_THRESHOLD_FACTOR: f64 = 0.95;
  94. /// Rety last N blocks
  95. const RETRY_PREVIOUS_BLOCKS: u64 = 100;
/// Prometheus label set attached to every keeper metric: the chain id plus an address.
///
/// Depending on the metric, `address` is either the provider address (requests, reveals,
/// fees, sequence numbers) or the keeper wallet address (balance, total gas spent).
/// Field order here is the label order in the exported metrics.
///
/// NOTE(review): callers fill `address` with `Address::to_string()`; ethers' `Display` for
/// `H160` prints an abbreviated form (`0x1234…abcd`). Confirm that is intended for labels —
/// `format!("{:?}", address)` would give the full hex address.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct AccountLabel {
    /// Chain identifier, as configured for the keeper.
    pub chain_id: String,
    /// Provider or keeper address, as produced by `Address::to_string()`.
    pub address: String,
}
/// Prometheus metric families maintained by the keeper. Create with [`KeeperMetrics::new`],
/// which also registers every family with a registry.
#[derive(Default)]
pub struct KeeperMetrics {
    /// Provider's current sequence number (next request), from the on-chain provider info.
    pub current_sequence_number: Family<AccountLabel, Gauge>,
    /// Provider's end sequence number, from the on-chain provider info.
    pub end_sequence_number: Family<AccountLabel, Gauge>,
    /// Keeper wallet balance, converted from wei to whole tokens (wei / 1e18).
    pub balance: Family<AccountLabel, Gauge<f64, AtomicU64>>,
    /// Fees accrued by the provider on the contract, in whole tokens (wei / 1e18).
    pub collected_fee: Family<AccountLabel, Gauge<f64, AtomicU64>>,
    /// Fee currently charged by the provider, in whole tokens (wei / 1e18).
    pub current_fee: Family<AccountLabel, Gauge<f64, AtomicU64>>,
    /// Accumulated `gas_used / 1e18` of reveal transaction receipts, labelled by keeper address.
    pub total_gas_spent: Family<AccountLabel, Gauge<f64, AtomicU64>>,
    /// Request events picked up for processing (incremented before the first reveal attempt).
    pub requests: Family<AccountLabel, Counter>,
    /// Request events whose processing finished, whether the reveal succeeded or gave up.
    pub requests_processed: Family<AccountLabel, Counter>,
    /// Requests processed again.
    /// NOTE(review): nothing in this part of the file increments this counter — confirm it is
    /// still used.
    pub requests_reprocessed: Family<AccountLabel, Counter>,
    /// Reveal transactions confirmed by `process_event`.
    pub reveals: Family<AccountLabel, Counter>,
}
  114. impl KeeperMetrics {
  115. pub async fn new(registry: Arc<RwLock<Registry>>) -> Self {
  116. let mut writable_registry = registry.write().await;
  117. let keeper_metrics = KeeperMetrics::default();
  118. writable_registry.register(
  119. "current_sequence_number",
  120. "The sequence number for a new request",
  121. keeper_metrics.current_sequence_number.clone(),
  122. );
  123. writable_registry.register(
  124. "end_sequence_number",
  125. "The sequence number for the end request",
  126. keeper_metrics.end_sequence_number.clone(),
  127. );
  128. writable_registry.register(
  129. "requests",
  130. "Number of requests received through events",
  131. keeper_metrics.requests.clone(),
  132. );
  133. writable_registry.register(
  134. "requests_processed",
  135. "Number of requests processed",
  136. keeper_metrics.requests_processed.clone(),
  137. );
  138. writable_registry.register(
  139. "reveal",
  140. "Number of reveals",
  141. keeper_metrics.reveals.clone(),
  142. );
  143. writable_registry.register(
  144. "balance",
  145. "Balance of the keeper",
  146. keeper_metrics.balance.clone(),
  147. );
  148. writable_registry.register(
  149. "collected_fee",
  150. "Collected fee on the contract",
  151. keeper_metrics.collected_fee.clone(),
  152. );
  153. writable_registry.register(
  154. "current_fee",
  155. "Current fee charged by the provider",
  156. keeper_metrics.current_fee.clone(),
  157. );
  158. writable_registry.register(
  159. "total_gas_spent",
  160. "Total gas spent revealing requests",
  161. keeper_metrics.total_gas_spent.clone(),
  162. );
  163. writable_registry.register(
  164. "requests_reprocessed",
  165. "Number of requests reprocessed",
  166. keeper_metrics.requests_reprocessed.clone(),
  167. );
  168. keeper_metrics
  169. }
  170. }
/// An inclusive range of block numbers: both `from` and `to` are processed.
/// A range with `from > to` is treated as empty by `process_block_range`.
#[derive(Debug)]
pub struct BlockRange {
    /// First block of the range (inclusive).
    pub from: BlockNumber,
    /// Last block of the range (inclusive).
    pub to: BlockNumber,
}
/// Processing state of a randomness request.
///
/// NOTE(review): this enum is not referenced in this part of the file; the request cache used
/// by the keeper here is a plain `HashSet<u64>` of sequence numbers. Confirm whether it is still
/// needed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestState {
    /// Fulfilled means that the request was either revealed or we are sure we
    /// will not be able to reveal it.
    Fulfilled,
    /// We have already processed the request but couldn't fulfill it and we are
    /// unsure if we can fulfill it or not.
    Processed,
}
  185. /// Get the latest safe block number for the chain. Retry internally if there is an error.
  186. async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
  187. loop {
  188. match chain_state
  189. .contract
  190. .get_block_number(chain_state.confirmed_block_status)
  191. .await
  192. {
  193. Ok(latest_confirmed_block) => {
  194. tracing::info!(
  195. "Fetched latest safe block {}",
  196. latest_confirmed_block - chain_state.reveal_delay_blocks
  197. );
  198. return latest_confirmed_block - chain_state.reveal_delay_blocks;
  199. }
  200. Err(e) => {
  201. tracing::error!("Error while getting block number. error: {:?}", e);
  202. time::sleep(RETRY_INTERVAL).await;
  203. }
  204. }
  205. }
  206. }
  207. /// Run threads to handle events for the last `BACKLOG_RANGE` blocks, watch for new blocks and
  208. /// handle any events for the new blocks.
  209. #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))]
  210. pub async fn run_keeper_threads(
  211. private_key: String,
  212. chain_eth_config: EthereumConfig,
  213. chain_state: BlockchainState,
  214. metrics: Arc<KeeperMetrics>,
  215. rpc_metrics: Arc<RpcMetrics>,
  216. ) {
  217. tracing::info!("starting keeper");
  218. let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
  219. tracing::info!("latest safe block: {}", &latest_safe_block);
  220. let contract = Arc::new(
  221. InstrumentedSignablePythContract::from_config(
  222. &chain_eth_config,
  223. &private_key,
  224. chain_state.id.clone(),
  225. rpc_metrics.clone(),
  226. )
  227. .await
  228. .expect("Chain config should be valid"),
  229. );
  230. let keeper_address = contract.wallet().address();
  231. let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::<u64>::new()));
  232. // Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
  233. let gas_limit: U256 = chain_eth_config.gas_limit.into();
  234. spawn(
  235. process_backlog(
  236. BlockRange {
  237. from: latest_safe_block.saturating_sub(BACKLOG_RANGE),
  238. to: latest_safe_block,
  239. },
  240. contract.clone(),
  241. gas_limit,
  242. chain_state.clone(),
  243. metrics.clone(),
  244. fulfilled_requests_cache.clone(),
  245. )
  246. .in_current_span(),
  247. );
  248. let (tx, rx) = mpsc::channel::<BlockRange>(1000);
  249. // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
  250. spawn(
  251. watch_blocks_wrapper(
  252. chain_state.clone(),
  253. latest_safe_block,
  254. tx,
  255. chain_eth_config.geth_rpc_wss.clone(),
  256. )
  257. .in_current_span(),
  258. );
  259. // Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.
  260. spawn(
  261. process_new_blocks(
  262. chain_state.clone(),
  263. rx,
  264. Arc::clone(&contract),
  265. gas_limit,
  266. metrics.clone(),
  267. fulfilled_requests_cache.clone(),
  268. )
  269. .in_current_span(),
  270. );
  271. // Spawn a thread that watches the keeper wallet balance and submits withdrawal transactions as needed to top-up the balance.
  272. spawn(
  273. withdraw_fees_wrapper(
  274. contract.clone(),
  275. chain_state.provider_address.clone(),
  276. WITHDRAW_INTERVAL,
  277. U256::from(chain_eth_config.min_keeper_balance),
  278. )
  279. .in_current_span(),
  280. );
  281. // Spawn a thread that periodically adjusts the provider fee.
  282. spawn(
  283. adjust_fee_wrapper(
  284. contract.clone(),
  285. chain_state.provider_address.clone(),
  286. ADJUST_FEE_INTERVAL,
  287. chain_eth_config.legacy_tx,
  288. chain_eth_config.gas_limit,
  289. chain_eth_config.min_profit_pct,
  290. chain_eth_config.target_profit_pct,
  291. chain_eth_config.max_profit_pct,
  292. chain_eth_config.fee,
  293. )
  294. .in_current_span(),
  295. );
  296. spawn(update_commitments_loop(contract.clone(), chain_state.clone()).in_current_span());
  297. // Spawn a thread to track the provider info and the balance of the keeper
  298. spawn(
  299. async move {
  300. let chain_id = chain_state.id.clone();
  301. let chain_config = chain_eth_config.clone();
  302. let provider_address = chain_state.provider_address.clone();
  303. let keeper_metrics = metrics.clone();
  304. let contract = match InstrumentedPythContract::from_config(
  305. &chain_config,
  306. chain_id.clone(),
  307. rpc_metrics,
  308. ) {
  309. Ok(r) => r,
  310. Err(e) => {
  311. tracing::error!("Error while connecting to pythnet contract. error: {:?}", e);
  312. return;
  313. }
  314. };
  315. loop {
  316. // There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds.
  317. // If rpc start fails all of these threads will just exit, instead of retrying.
  318. // We are tracking rpc failures elsewhere, so it's fine.
  319. spawn(
  320. track_provider(
  321. chain_id.clone(),
  322. contract.clone(),
  323. provider_address.clone(),
  324. keeper_metrics.clone(),
  325. )
  326. .in_current_span(),
  327. );
  328. spawn(
  329. track_balance(
  330. chain_id.clone(),
  331. contract.client(),
  332. keeper_address.clone(),
  333. keeper_metrics.clone(),
  334. )
  335. .in_current_span(),
  336. );
  337. time::sleep(TRACK_INTERVAL).await;
  338. }
  339. }
  340. .in_current_span(),
  341. );
  342. }
  343. /// Process an event with backoff. It will retry the reveal on failure for 5 minutes.
  344. #[tracing::instrument(name = "process_event_with_backoff", skip_all, fields(
  345. sequence_number = event.sequence_number
  346. ))]
  347. pub async fn process_event_with_backoff(
  348. event: RequestedWithCallbackEvent,
  349. chain_state: BlockchainState,
  350. contract: Arc<InstrumentedSignablePythContract>,
  351. gas_limit: U256,
  352. metrics: Arc<KeeperMetrics>,
  353. ) {
  354. metrics
  355. .requests
  356. .get_or_create(&AccountLabel {
  357. chain_id: chain_state.id.clone(),
  358. address: chain_state.provider_address.to_string(),
  359. })
  360. .inc();
  361. tracing::info!("Started processing event");
  362. let mut backoff = ExponentialBackoff::default();
  363. backoff.max_elapsed_time = Some(Duration::from_secs(300)); // retry for 5 minutes
  364. match backoff::future::retry_notify(
  365. backoff,
  366. || async {
  367. process_event(&event, &chain_state, &contract, gas_limit, metrics.clone()).await
  368. },
  369. |e, dur| {
  370. tracing::error!("Error happened at {:?}: {}", dur, e);
  371. },
  372. )
  373. .await
  374. {
  375. Ok(()) => {
  376. tracing::info!("Processed event",);
  377. }
  378. Err(e) => {
  379. tracing::error!("Failed to process event: {:?}", e);
  380. }
  381. }
  382. metrics
  383. .requests_processed
  384. .get_or_create(&AccountLabel {
  385. chain_id: chain_state.id.clone(),
  386. address: chain_state.provider_address.to_string(),
  387. })
  388. .inc();
  389. }
  390. /// Process a callback on a chain. It estimates the gas for the reveal with callback and
  391. /// submits the transaction if the gas estimate is below the gas limit.
  392. /// It will return a permanent or transient error depending on the error type and whether
  393. /// retry is possible or not.
  394. pub async fn process_event(
  395. event: &RequestedWithCallbackEvent,
  396. chain_config: &BlockchainState,
  397. contract: &InstrumentedSignablePythContract,
  398. gas_limit: U256,
  399. metrics: Arc<KeeperMetrics>,
  400. ) -> Result<(), backoff::Error<anyhow::Error>> {
  401. // ignore requests that are not for the configured provider
  402. if chain_config.provider_address != event.provider_address {
  403. return Ok(());
  404. }
  405. let provider_revelation = chain_config
  406. .state
  407. .reveal(event.sequence_number)
  408. .map_err(|e| backoff::Error::permanent(anyhow!("Error revealing: {:?}", e)))?;
  409. let gas_estimate_res = chain_config
  410. .contract
  411. .estimate_reveal_with_callback_gas(
  412. event.provider_address,
  413. event.sequence_number,
  414. event.user_random_number,
  415. provider_revelation,
  416. )
  417. .in_current_span()
  418. .await;
  419. let gas_estimate = gas_estimate_res.map_err(|e| {
  420. // we consider the error transient even if it is a contract revert since
  421. // it can be because of routing to a lagging RPC node. Retrying such errors will
  422. // incur a few additional RPC calls, but it is fine.
  423. backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e))
  424. })?;
  425. if gas_estimate > gas_limit {
  426. return Err(backoff::Error::permanent(anyhow!(
  427. "Gas estimate for reveal with callback is higher than the gas limit {} > {}",
  428. gas_estimate,
  429. gas_limit
  430. )));
  431. }
  432. // Pad the gas estimate by 25% after checking it against the gas limit
  433. let gas_estimate = gas_estimate.saturating_mul(5.into()) / 4;
  434. let contract_call = contract
  435. .reveal_with_callback(
  436. event.provider_address,
  437. event.sequence_number,
  438. event.user_random_number,
  439. provider_revelation,
  440. )
  441. .gas(gas_estimate);
  442. let client = contract.client();
  443. let mut transaction = contract_call.tx.clone();
  444. // manually fill the tx with the gas info, so we can log the details in case of error
  445. client
  446. .fill_transaction(&mut transaction, None)
  447. .await
  448. .map_err(|e| {
  449. backoff::Error::transient(anyhow!("Error filling the reveal transaction: {:?}", e))
  450. })?;
  451. let pending_tx = client
  452. .send_transaction(transaction.clone(), None)
  453. .await
  454. .map_err(|e| {
  455. backoff::Error::transient(anyhow!(
  456. "Error submitting the reveal transaction. Tx:{:?}, Error:{:?}",
  457. transaction,
  458. e
  459. ))
  460. })?;
  461. let receipt = pending_tx
  462. .await
  463. .map_err(|e| {
  464. backoff::Error::transient(anyhow!(
  465. "Error waiting for transaction receipt. Tx:{:?} Error:{:?}",
  466. transaction,
  467. e
  468. ))
  469. })?
  470. .ok_or_else(|| {
  471. // RPC may not return an error on tx submission if the nonce is too high.
  472. // But we will never get a receipt. So we reset the nonce manager to get the correct nonce.
  473. let nonce_manager = contract.client_ref().inner().inner();
  474. nonce_manager.reset();
  475. backoff::Error::transient(anyhow!(
  476. "Can't verify the reveal, probably dropped from mempool Tx:{:?}",
  477. transaction
  478. ))
  479. })?;
  480. tracing::info!(
  481. sequence_number = &event.sequence_number,
  482. transaction_hash = &receipt.transaction_hash.to_string(),
  483. gas_used = ?receipt.gas_used,
  484. "Revealed with res: {:?}",
  485. receipt
  486. );
  487. if let Some(gas_used) = receipt.gas_used {
  488. let gas_used = gas_used.as_u128() as f64 / 1e18;
  489. metrics
  490. .total_gas_spent
  491. .get_or_create(&AccountLabel {
  492. chain_id: chain_config.id.clone(),
  493. address: client
  494. .inner()
  495. .inner()
  496. .inner()
  497. .signer()
  498. .address()
  499. .to_string(),
  500. })
  501. .inc_by(gas_used);
  502. }
  503. metrics
  504. .reveals
  505. .get_or_create(&AccountLabel {
  506. chain_id: chain_config.id.clone(),
  507. address: chain_config.provider_address.to_string(),
  508. })
  509. .inc();
  510. Ok(())
  511. }
  512. /// Process a range of blocks in batches. It calls the `process_single_block_batch` method for each batch.
  513. #[tracing::instrument(skip_all, fields(
  514. range_from_block = block_range.from, range_to_block = block_range.to
  515. ))]
  516. pub async fn process_block_range(
  517. block_range: BlockRange,
  518. contract: Arc<InstrumentedSignablePythContract>,
  519. gas_limit: U256,
  520. chain_state: api::BlockchainState,
  521. metrics: Arc<KeeperMetrics>,
  522. fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
  523. ) {
  524. let BlockRange {
  525. from: first_block,
  526. to: last_block,
  527. } = block_range;
  528. let mut current_block = first_block;
  529. while current_block <= last_block {
  530. let mut to_block = current_block + BLOCK_BATCH_SIZE;
  531. if to_block > last_block {
  532. to_block = last_block;
  533. }
  534. // TODO: this is handling all blocks sequentially we might want to handle them in parallel in future.
  535. process_single_block_batch(
  536. BlockRange {
  537. from: current_block,
  538. to: to_block,
  539. },
  540. contract.clone(),
  541. gas_limit,
  542. chain_state.clone(),
  543. metrics.clone(),
  544. fulfilled_requests_cache.clone(),
  545. )
  546. .in_current_span()
  547. .await;
  548. current_block = to_block + 1;
  549. }
  550. }
  551. /// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch
  552. /// and then try to process them one by one. It checks the `fulfilled_request_cache`. If the request was already fulfilled.
  553. /// It won't reprocess it. If the request was already processed, it will reprocess it.
  554. /// If the process fails, it will retry indefinitely.
  555. #[tracing::instrument(name = "batch", skip_all, fields(
  556. batch_from_block = block_range.from, batch_to_block = block_range.to
  557. ))]
  558. pub async fn process_single_block_batch(
  559. block_range: BlockRange,
  560. contract: Arc<InstrumentedSignablePythContract>,
  561. gas_limit: U256,
  562. chain_state: api::BlockchainState,
  563. metrics: Arc<KeeperMetrics>,
  564. fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
  565. ) {
  566. loop {
  567. let events_res = chain_state
  568. .contract
  569. .get_request_with_callback_events(block_range.from, block_range.to)
  570. .await;
  571. match events_res {
  572. Ok(events) => {
  573. tracing::info!(num_of_events = &events.len(), "Processing",);
  574. for event in &events {
  575. // the write lock guarantees we spawn only one task per sequence number
  576. let newly_inserted = fulfilled_requests_cache
  577. .write()
  578. .await
  579. .insert(event.sequence_number);
  580. if newly_inserted {
  581. spawn(
  582. process_event_with_backoff(
  583. event.clone(),
  584. chain_state.clone(),
  585. contract.clone(),
  586. gas_limit,
  587. metrics.clone(),
  588. )
  589. .in_current_span(),
  590. );
  591. }
  592. }
  593. tracing::info!(num_of_events = &events.len(), "Processed",);
  594. break;
  595. }
  596. Err(e) => {
  597. tracing::error!(
  598. "Error while getting events. Waiting for {} seconds before retry. error: {:?}",
  599. RETRY_INTERVAL.as_secs(),
  600. e
  601. );
  602. time::sleep(RETRY_INTERVAL).await;
  603. }
  604. }
  605. }
  606. }
  607. /// Wrapper for the `watch_blocks` method. If there was an error while watching, it will retry after a delay.
  608. /// It retries indefinitely.
  609. #[tracing::instrument(name = "watch_blocks", skip_all, fields(
  610. initial_safe_block = latest_safe_block
  611. ))]
  612. pub async fn watch_blocks_wrapper(
  613. chain_state: BlockchainState,
  614. latest_safe_block: BlockNumber,
  615. tx: mpsc::Sender<BlockRange>,
  616. geth_rpc_wss: Option<String>,
  617. ) {
  618. let mut last_safe_block_processed = latest_safe_block;
  619. loop {
  620. if let Err(e) = watch_blocks(
  621. chain_state.clone(),
  622. &mut last_safe_block_processed,
  623. tx.clone(),
  624. geth_rpc_wss.clone(),
  625. )
  626. .in_current_span()
  627. .await
  628. {
  629. tracing::error!("watching blocks. error: {:?}", e);
  630. time::sleep(RETRY_INTERVAL).await;
  631. }
  632. }
  633. }
  634. /// Watch for new blocks and send the range of blocks for which events have not been handled to the `tx` channel.
  635. /// We are subscribing to new blocks instead of events. If we miss some blocks, it will be fine as we are sending
  636. /// block ranges to the `tx` channel. If we have subscribed to events, we could have missed those and won't even
  637. /// know about it.
  638. pub async fn watch_blocks(
  639. chain_state: BlockchainState,
  640. last_safe_block_processed: &mut BlockNumber,
  641. tx: mpsc::Sender<BlockRange>,
  642. geth_rpc_wss: Option<String>,
  643. ) -> Result<()> {
  644. tracing::info!("Watching blocks to handle new events");
  645. let provider_option = match geth_rpc_wss {
  646. Some(wss) => Some(match Provider::<Ws>::connect(wss.clone()).await {
  647. Ok(provider) => provider,
  648. Err(e) => {
  649. tracing::error!("Error while connecting to wss: {}. error: {:?}", wss, e);
  650. return Err(e.into());
  651. }
  652. }),
  653. None => {
  654. tracing::info!("No wss provided");
  655. None
  656. }
  657. };
  658. let mut stream_option = match provider_option {
  659. Some(ref provider) => Some(match provider.subscribe_blocks().await {
  660. Ok(client) => client,
  661. Err(e) => {
  662. tracing::error!("Error while subscribing to blocks. error {:?}", e);
  663. return Err(e.into());
  664. }
  665. }),
  666. None => None,
  667. };
  668. loop {
  669. match stream_option {
  670. Some(ref mut stream) => {
  671. if let None = stream.next().await {
  672. tracing::error!("Error blocks subscription stream ended");
  673. return Err(anyhow!("Error blocks subscription stream ended"));
  674. }
  675. }
  676. None => {
  677. time::sleep(POLL_INTERVAL).await;
  678. }
  679. }
  680. let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
  681. if latest_safe_block > *last_safe_block_processed {
  682. let mut from = latest_safe_block
  683. .checked_sub(RETRY_PREVIOUS_BLOCKS)
  684. .unwrap_or(0);
  685. // In normal situation, the difference between latest and last safe block should not be more than 2-3 (for arbitrum it can be 10)
  686. // TODO: add a metric for this in separate PR. We need alerts
  687. // But in extreme situation, where we were unable to send the block range multiple times, the difference between latest_safe_block and
  688. // last_safe_block_processed can grow. It is fine to not have the retry mechanisms for those earliest blocks as we expect the rpc
  689. // to be in consistency after this much time.
  690. if from > *last_safe_block_processed {
  691. from = *last_safe_block_processed;
  692. }
  693. match tx
  694. .send(BlockRange {
  695. from,
  696. to: latest_safe_block,
  697. })
  698. .await
  699. {
  700. Ok(_) => {
  701. tracing::info!(
  702. from_block = from,
  703. to_block = &latest_safe_block,
  704. "Block range sent to handle events",
  705. );
  706. *last_safe_block_processed = latest_safe_block;
  707. }
  708. Err(e) => {
  709. tracing::error!(
  710. from_block = from,
  711. to_block = &latest_safe_block,
  712. "Error while sending block range to handle events. These will be handled in next call. error: {:?}",
  713. e
  714. );
  715. }
  716. };
  717. }
  718. }
  719. }
  720. /// It waits on rx channel to receive block ranges and then calls process_block_range to process them.
  721. #[tracing::instrument(skip_all)]
  722. pub async fn process_new_blocks(
  723. chain_state: BlockchainState,
  724. mut rx: mpsc::Receiver<BlockRange>,
  725. contract: Arc<InstrumentedSignablePythContract>,
  726. gas_limit: U256,
  727. metrics: Arc<KeeperMetrics>,
  728. fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
  729. ) {
  730. tracing::info!("Waiting for new block ranges to process");
  731. loop {
  732. if let Some(block_range) = rx.recv().await {
  733. process_block_range(
  734. block_range,
  735. Arc::clone(&contract),
  736. gas_limit,
  737. chain_state.clone(),
  738. metrics.clone(),
  739. fulfilled_requests_cache.clone(),
  740. )
  741. .in_current_span()
  742. .await;
  743. }
  744. }
  745. }
  746. /// Processes the backlog_range for a chain.
  747. #[tracing::instrument(skip_all)]
  748. pub async fn process_backlog(
  749. backlog_range: BlockRange,
  750. contract: Arc<InstrumentedSignablePythContract>,
  751. gas_limit: U256,
  752. chain_state: BlockchainState,
  753. metrics: Arc<KeeperMetrics>,
  754. fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
  755. ) {
  756. tracing::info!("Processing backlog");
  757. process_block_range(
  758. backlog_range,
  759. contract,
  760. gas_limit,
  761. chain_state,
  762. metrics,
  763. fulfilled_requests_cache,
  764. )
  765. .in_current_span()
  766. .await;
  767. tracing::info!("Backlog processed");
  768. }
  769. /// tracks the balance of the given address on the given chain
  770. /// if there was an error, the function will just return
  771. #[tracing::instrument(skip_all)]
  772. pub async fn track_balance(
  773. chain_id: String,
  774. provider: Arc<Provider<TracedClient>>,
  775. address: Address,
  776. metrics: Arc<KeeperMetrics>,
  777. ) {
  778. let balance = match provider.get_balance(address, None).await {
  779. // This conversion to u128 is fine as the total balance will never cross the limits
  780. // of u128 practically.
  781. Ok(r) => r.as_u128(),
  782. Err(e) => {
  783. tracing::error!("Error while getting balance. error: {:?}", e);
  784. return;
  785. }
  786. };
  787. // The f64 conversion is made to be able to serve metrics within the constraints of Prometheus.
  788. // The balance is in wei, so we need to divide by 1e18 to convert it to eth.
  789. let balance = balance as f64 / 1e18;
  790. metrics
  791. .balance
  792. .get_or_create(&AccountLabel {
  793. chain_id: chain_id.clone(),
  794. address: address.to_string(),
  795. })
  796. .set(balance);
  797. }
  798. /// tracks the collected fees and the hashchain data of the given provider address on the given chain
  799. /// if there is a error the function will just return
  800. #[tracing::instrument(skip_all)]
  801. pub async fn track_provider(
  802. chain_id: ChainId,
  803. contract: InstrumentedPythContract,
  804. provider_address: Address,
  805. metrics: Arc<KeeperMetrics>,
  806. ) {
  807. let provider_info = match contract.get_provider_info(provider_address).call().await {
  808. Ok(info) => info,
  809. Err(e) => {
  810. tracing::error!("Error while getting provider info. error: {:?}", e);
  811. return;
  812. }
  813. };
  814. // The f64 conversion is made to be able to serve metrics with the constraints of Prometheus.
  815. // The fee is in wei, so we divide by 1e18 to convert it to eth.
  816. let collected_fee = provider_info.accrued_fees_in_wei as f64 / 1e18;
  817. let current_fee: f64 = provider_info.fee_in_wei as f64 / 1e18;
  818. let current_sequence_number = provider_info.sequence_number;
  819. let end_sequence_number = provider_info.end_sequence_number;
  820. metrics
  821. .collected_fee
  822. .get_or_create(&AccountLabel {
  823. chain_id: chain_id.clone(),
  824. address: provider_address.to_string(),
  825. })
  826. .set(collected_fee);
  827. metrics
  828. .current_fee
  829. .get_or_create(&AccountLabel {
  830. chain_id: chain_id.clone(),
  831. address: provider_address.to_string(),
  832. })
  833. .set(current_fee);
  834. metrics
  835. .current_sequence_number
  836. .get_or_create(&AccountLabel {
  837. chain_id: chain_id.clone(),
  838. address: provider_address.to_string(),
  839. })
  840. // sequence_number type on chain is u64 but practically it will take
  841. // a long time for it to cross the limits of i64.
  842. // currently prometheus only supports i64 for Gauge types
  843. .set(current_sequence_number as i64);
  844. metrics
  845. .end_sequence_number
  846. .get_or_create(&AccountLabel {
  847. chain_id: chain_id.clone(),
  848. address: provider_address.to_string(),
  849. })
  850. .set(end_sequence_number as i64);
  851. }
  852. #[tracing::instrument(name = "withdraw_fees", skip_all, fields())]
  853. pub async fn withdraw_fees_wrapper(
  854. contract: Arc<InstrumentedSignablePythContract>,
  855. provider_address: Address,
  856. poll_interval: Duration,
  857. min_balance: U256,
  858. ) {
  859. loop {
  860. if let Err(e) = withdraw_fees_if_necessary(contract.clone(), provider_address, min_balance)
  861. .in_current_span()
  862. .await
  863. {
  864. tracing::error!("Withdrawing fees. error: {:?}", e);
  865. }
  866. time::sleep(poll_interval).await;
  867. }
  868. }
  869. /// Withdraws accumulated fees in the contract as needed to maintain the balance of the keeper wallet.
  870. pub async fn withdraw_fees_if_necessary(
  871. contract: Arc<InstrumentedSignablePythContract>,
  872. provider_address: Address,
  873. min_balance: U256,
  874. ) -> Result<()> {
  875. let provider = contract.provider();
  876. let wallet = contract.wallet();
  877. let keeper_balance = provider
  878. .get_balance(wallet.address(), None)
  879. .await
  880. .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?;
  881. let provider_info = contract
  882. .get_provider_info(provider_address)
  883. .call()
  884. .await
  885. .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?;
  886. if provider_info.fee_manager != wallet.address() {
  887. return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", provider, provider_info.fee_manager, wallet.address()));
  888. }
  889. let fees = provider_info.accrued_fees_in_wei;
  890. if keeper_balance < min_balance && U256::from(fees) > min_balance {
  891. tracing::info!("Claiming accrued fees...");
  892. let contract_call = contract.withdraw_as_fee_manager(provider_address, fees);
  893. send_and_confirm(contract_call).await?;
  894. } else if keeper_balance < min_balance {
  895. tracing::warn!("Keeper balance {:?} is too low (< {:?}) but provider fees are not sufficient to top-up.", keeper_balance, min_balance)
  896. }
  897. Ok(())
  898. }
  899. pub async fn send_and_confirm(contract_call: PythContractCall) -> Result<()> {
  900. let call_name = contract_call.function.name.as_str();
  901. let pending_tx = contract_call
  902. .send()
  903. .await
  904. .map_err(|e| anyhow!("Error submitting transaction({}) {:?}", call_name, e))?;
  905. let tx_result = pending_tx
  906. .await
  907. .map_err(|e| {
  908. anyhow!(
  909. "Error waiting for transaction({}) receipt: {:?}",
  910. call_name,
  911. e
  912. )
  913. })?
  914. .ok_or_else(|| {
  915. anyhow!(
  916. "Can't verify the transaction({}), probably dropped from mempool",
  917. call_name
  918. )
  919. })?;
  920. tracing::info!(
  921. transaction_hash = &tx_result.transaction_hash.to_string(),
  922. "Confirmed transaction({}). Receipt: {:?}",
  923. call_name,
  924. tx_result,
  925. );
  926. Ok(())
  927. }
  928. #[tracing::instrument(name = "adjust_fee", skip_all)]
  929. pub async fn adjust_fee_wrapper(
  930. contract: Arc<InstrumentedSignablePythContract>,
  931. provider_address: Address,
  932. poll_interval: Duration,
  933. legacy_tx: bool,
  934. gas_limit: u64,
  935. min_profit_pct: u64,
  936. target_profit_pct: u64,
  937. max_profit_pct: u64,
  938. min_fee_wei: u128,
  939. ) {
  940. // The maximum balance of accrued fees + provider wallet balance. None if we haven't observed a value yet.
  941. let mut high_water_pnl: Option<U256> = None;
  942. // The sequence number where the keeper last updated the on-chain fee. None if we haven't observed it yet.
  943. let mut sequence_number_of_last_fee_update: Option<u64> = None;
  944. loop {
  945. if let Err(e) = adjust_fee_if_necessary(
  946. contract.clone(),
  947. provider_address,
  948. legacy_tx,
  949. gas_limit,
  950. min_profit_pct,
  951. target_profit_pct,
  952. max_profit_pct,
  953. min_fee_wei,
  954. &mut high_water_pnl,
  955. &mut sequence_number_of_last_fee_update,
  956. )
  957. .in_current_span()
  958. .await
  959. {
  960. tracing::error!("Withdrawing fees. error: {:?}", e);
  961. }
  962. time::sleep(poll_interval).await;
  963. }
  964. }
  965. #[tracing::instrument(name = "update_commitments", skip_all)]
  966. pub async fn update_commitments_loop(
  967. contract: Arc<InstrumentedSignablePythContract>,
  968. chain_state: BlockchainState,
  969. ) {
  970. loop {
  971. if let Err(e) = update_commitments_if_necessary(contract.clone(), &chain_state)
  972. .in_current_span()
  973. .await
  974. {
  975. tracing::error!("Update commitments. error: {:?}", e);
  976. }
  977. time::sleep(UPDATE_COMMITMENTS_INTERVAL).await;
  978. }
  979. }
  980. pub async fn update_commitments_if_necessary(
  981. contract: Arc<InstrumentedSignablePythContract>,
  982. chain_state: &BlockchainState,
  983. ) -> Result<()> {
  984. //TODO: we can reuse the result from the last call from the watch_blocks thread to reduce RPCs
  985. let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
  986. let provider_address = chain_state.provider_address;
  987. let provider_info = contract
  988. .get_provider_info(provider_address)
  989. .block(latest_safe_block) // To ensure we are not revealing sooner than we should
  990. .call()
  991. .await
  992. .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?;
  993. if provider_info.max_num_hashes == 0 {
  994. return Ok(());
  995. }
  996. let threshold =
  997. ((provider_info.max_num_hashes as f64) * UPDATE_COMMITMENTS_THRESHOLD_FACTOR) as u64;
  998. if provider_info.sequence_number - provider_info.current_commitment_sequence_number > threshold
  999. {
  1000. let seq_number = provider_info.sequence_number - 1;
  1001. let provider_revelation = chain_state
  1002. .state
  1003. .reveal(seq_number)
  1004. .map_err(|e| anyhow!("Error revealing: {:?}", e))?;
  1005. let contract_call =
  1006. contract.advance_provider_commitment(provider_address, seq_number, provider_revelation);
  1007. send_and_confirm(contract_call).await?;
  1008. }
  1009. Ok(())
  1010. }
  1011. /// Adjust the fee charged by the provider to ensure that it is profitable at the prevailing gas price.
  1012. /// This method targets a fee as a function of the maximum cost of the callback,
  1013. /// c = (gas_limit) * (current gas price), with min_fee_wei as a lower bound on the fee.
  1014. ///
  1015. /// The method then updates the on-chain fee if all of the following are satisfied:
  1016. /// - the on-chain fee does not fall into an interval [c*min_profit, c*max_profit]. The tolerance
  1017. /// factor prevents the on-chain fee from changing with every single gas price fluctuation.
  1018. /// Profit scalars are specified in percentage units, min_profit = (min_profit_pct + 100) / 100
  1019. /// - either the fee is increasing or the keeper is earning a profit -- i.e., fees only decrease when the keeper is profitable
  1020. /// - at least one random number has been requested since the last fee update
  1021. ///
  1022. /// These conditions are intended to make sure that the keeper is profitable while also minimizing the number of fee
  1023. /// update transactions.
  1024. pub async fn adjust_fee_if_necessary(
  1025. contract: Arc<InstrumentedSignablePythContract>,
  1026. provider_address: Address,
  1027. legacy_tx: bool,
  1028. gas_limit: u64,
  1029. min_profit_pct: u64,
  1030. target_profit_pct: u64,
  1031. max_profit_pct: u64,
  1032. min_fee_wei: u128,
  1033. high_water_pnl: &mut Option<U256>,
  1034. sequence_number_of_last_fee_update: &mut Option<u64>,
  1035. ) -> Result<()> {
  1036. let provider_info = contract
  1037. .get_provider_info(provider_address)
  1038. .call()
  1039. .await
  1040. .map_err(|e| anyhow!("Error while getting provider info. error: {:?}", e))?;
  1041. if provider_info.fee_manager != contract.wallet().address() {
  1042. return Err(anyhow!("Fee manager for provider {:?} is not the keeper wallet. Fee manager: {:?} Keeper: {:?}", contract.provider(), provider_info.fee_manager, contract.wallet().address()));
  1043. }
  1044. // Calculate target window for the on-chain fee.
  1045. let max_callback_cost: u128 = estimate_tx_cost(contract.clone(), legacy_tx, gas_limit.into())
  1046. .await
  1047. .map_err(|e| anyhow!("Could not estimate transaction cost. error {:?}", e))?;
  1048. let target_fee_min = std::cmp::max(
  1049. (max_callback_cost * (100 + u128::from(min_profit_pct))) / 100,
  1050. min_fee_wei,
  1051. );
  1052. let target_fee = std::cmp::max(
  1053. (max_callback_cost * (100 + u128::from(target_profit_pct))) / 100,
  1054. min_fee_wei,
  1055. );
  1056. let target_fee_max = std::cmp::max(
  1057. (max_callback_cost * (100 + u128::from(max_profit_pct))) / 100,
  1058. min_fee_wei,
  1059. );
  1060. // Calculate current P&L to determine if we can reduce fees.
  1061. let current_keeper_balance = contract
  1062. .provider()
  1063. .get_balance(contract.wallet().address(), None)
  1064. .await
  1065. .map_err(|e| anyhow!("Error while getting balance. error: {:?}", e))?;
  1066. let current_keeper_fees = U256::from(provider_info.accrued_fees_in_wei);
  1067. let current_pnl = current_keeper_balance + current_keeper_fees;
  1068. let can_reduce_fees = match high_water_pnl {
  1069. Some(x) => current_pnl >= *x,
  1070. None => false,
  1071. };
  1072. // Determine if the chain has seen activity since the last fee update.
  1073. let is_chain_active: bool = match sequence_number_of_last_fee_update {
  1074. Some(n) => provider_info.sequence_number > *n,
  1075. None => {
  1076. // We don't want to adjust the fees on server start for unused chains, hence false here.
  1077. false
  1078. }
  1079. };
  1080. let provider_fee: u128 = provider_info.fee_in_wei;
  1081. if is_chain_active
  1082. && ((provider_fee > target_fee_max && can_reduce_fees) || provider_fee < target_fee_min)
  1083. {
  1084. tracing::info!(
  1085. "Adjusting fees. Current: {:?} Target: {:?}",
  1086. provider_fee,
  1087. target_fee
  1088. );
  1089. let contract_call = contract.set_provider_fee_as_fee_manager(provider_address, target_fee);
  1090. send_and_confirm(contract_call).await?;
  1091. *sequence_number_of_last_fee_update = Some(provider_info.sequence_number);
  1092. } else {
  1093. tracing::info!(
  1094. "Skipping fee adjustment. Current: {:?} Target: {:?} [{:?}, {:?}] Current Sequence Number: {:?} Last updated sequence number {:?} Current pnl: {:?} High water pnl: {:?}",
  1095. provider_fee,
  1096. target_fee,
  1097. target_fee_min,
  1098. target_fee_max,
  1099. provider_info.sequence_number,
  1100. sequence_number_of_last_fee_update,
  1101. current_pnl,
  1102. high_water_pnl
  1103. )
  1104. }
  1105. // Update high water pnl
  1106. *high_water_pnl = Some(std::cmp::max(
  1107. current_pnl,
  1108. high_water_pnl.unwrap_or(U256::from(0)),
  1109. ));
  1110. // Update sequence number on server start.
  1111. match sequence_number_of_last_fee_update {
  1112. Some(_) => (),
  1113. None => {
  1114. *sequence_number_of_last_fee_update = Some(provider_info.sequence_number);
  1115. }
  1116. };
  1117. Ok(())
  1118. }
  1119. /// Estimate the cost (in wei) of a transaction consuming gas_used gas.
  1120. pub async fn estimate_tx_cost(
  1121. contract: Arc<InstrumentedSignablePythContract>,
  1122. use_legacy_tx: bool,
  1123. gas_used: u128,
  1124. ) -> Result<u128> {
  1125. let middleware = contract.client();
  1126. let gas_price: u128 = if use_legacy_tx {
  1127. middleware
  1128. .get_gas_price()
  1129. .await
  1130. .map_err(|e| anyhow!("Failed to fetch gas price. error: {:?}", e))?
  1131. .try_into()
  1132. .map_err(|e| anyhow!("gas price doesn't fit into 128 bits. error: {:?}", e))?
  1133. } else {
  1134. let (max_fee_per_gas, max_priority_fee_per_gas) = middleware
  1135. .estimate_eip1559_fees(Some(eip1559_default_estimator))
  1136. .await?;
  1137. (max_fee_per_gas + max_priority_fee_per_gas)
  1138. .try_into()
  1139. .map_err(|e| anyhow!("gas price doesn't fit into 128 bits. error: {:?}", e))?
  1140. };
  1141. Ok(gas_price * gas_used)
  1142. }