Jayant Krishnamurthy 8 tháng trước
mục cha
commit
5f24838e20

+ 1 - 1
apps/fortuna/src/chain/ethereum.rs

@@ -200,7 +200,7 @@ impl InstrumentedPythContract {
 }
 
 #[async_trait]
-impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
+impl<T: Middleware + 'static> EntropyReader for PythRandom<T> {
     async fn get_request(
         &self,
         provider_address: Address,

+ 1 - 1
apps/fortuna/src/chain/reader.rs

@@ -29,7 +29,7 @@ impl From<BlockStatus> for EthersBlockNumber {
     }
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
 pub struct RequestedWithCallbackEvent {
     pub sequence_number: u64,
     pub user_random_number: [u8; 32],

+ 14 - 11
apps/fortuna/src/keeper.rs

@@ -2,6 +2,7 @@ use {
     crate::{
         api::{BlockchainState, ChainId},
         chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract},
+        chain::reader::RequestedWithCallbackEvent,
         config::EthereumConfig,
         eth_utils::traced_client::RpcMetrics,
         keeper::block::{
@@ -11,6 +12,7 @@ use {
         keeper::commitment::update_commitments_loop,
         keeper::fee::adjust_fee_wrapper,
         keeper::fee::withdraw_fees_wrapper,
+        keeper::keeper_state::KeeperState,
         keeper::track::track_accrued_pyth_fees,
         keeper::track::track_balance,
         keeper::track::track_provider,
@@ -77,7 +79,16 @@ pub async fn run_keeper_threads(
     );
     let keeper_address = contract.wallet().address();
 
-    let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::<u64>::new()));
+    let keeper_state = Arc::new(KeeperState {
+        contract: contract.clone(),
+        gas_limit: chain_eth_config.gas_limit.into(),
+        chain_state: chain_state.clone(),
+        metrics: metrics.clone(),
+        escalation_policy: chain_eth_config.escalation_policy.to_policy(),
+    });
+
+    let fulfilled_requests_cache =
+        Arc::new(RwLock::new(HashSet::<RequestedWithCallbackEvent>::new()));
 
     // Spawn a thread to handle the events from last backlog_range blocks.
     let gas_limit: U256 = chain_eth_config.gas_limit.into();
@@ -87,11 +98,7 @@ pub async fn run_keeper_threads(
                 from: latest_safe_block.saturating_sub(chain_eth_config.backlog_range),
                 to: latest_safe_block,
             },
-            contract.clone(),
-            gas_limit,
-            chain_eth_config.escalation_policy.to_policy(),
-            chain_state.clone(),
-            metrics.clone(),
+            keeper_state.clone(),
             fulfilled_requests_cache.clone(),
             chain_eth_config.block_delays.clone(),
         )
@@ -113,12 +120,8 @@ pub async fn run_keeper_threads(
     // Spawn a thread for block processing with configured delays
     spawn(
         process_new_blocks(
-            chain_state.clone(),
             rx,
-            Arc::clone(&contract),
-            gas_limit,
-            chain_eth_config.escalation_policy.to_policy(),
-            metrics.clone(),
+            keeper_state.clone(),
             fulfilled_requests_cache.clone(),
             chain_eth_config.block_delays.clone(),
         )

+ 14 - 19
apps/fortuna/src/keeper/block.rs

@@ -67,8 +67,7 @@ pub async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber
 pub async fn process_block_range<H: EventHandler>(
     block_range: BlockRange,
     state: Arc<H>,
-    escalation_policy: EscalationPolicy,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
+    fulfilled_requests_cache: Arc<RwLock<HashSet<H::Event>>>,
 ) {
     let BlockRange {
         from: first_block,
@@ -88,7 +87,6 @@ pub async fn process_block_range<H: EventHandler>(
                 to: to_block,
             },
             state.clone(),
-            escalation_policy.clone(),
             fulfilled_requests_cache.clone(),
         )
         .in_current_span()
@@ -108,23 +106,26 @@ pub async fn process_block_range<H: EventHandler>(
 pub async fn process_single_block_batch<H: EventHandler>(
     block_range: BlockRange,
     state: Arc<H>,
-    escalation_policy: EscalationPolicy,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
+    fulfilled_requests_cache: Arc<RwLock<HashSet<H::Event>>>,
 ) {
     loop {
-        let events_res = state.get_events(block_range).await;
+        let events_res = state.get_events(&block_range).await;
 
         match events_res {
             Ok(events) => {
                 tracing::info!(num_of_events = &events.len(), "Processing",);
                 for event in &events {
+                    // TODO: should we be clearing these out?
                    // the write lock guarantees we spawn only one task per unique event
-                    let newly_inserted = fulfilled_requests_cache
-                        .write()
-                        .await
-                        .insert(event.sequence_number);
+                    let newly_inserted =
+                        fulfilled_requests_cache.write().await.insert(event.clone());
+
                     if newly_inserted {
-                        spawn(state.process_event(event.clone()).in_current_span());
+                        let s = state.clone();
+                        let e = event.clone();
+                        spawn(async move {
+                            s.process_event(e).in_current_span().await;
+                        });
                     }
                 }
                 tracing::info!(num_of_events = &events.len(), "Processed",);
@@ -267,8 +268,7 @@ pub async fn watch_blocks(
 pub async fn process_new_blocks<H: EventHandler>(
     mut rx: mpsc::Receiver<BlockRange>,
     state: Arc<H>,
-    escalation_policy: EscalationPolicy,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
+    fulfilled_requests_cache: Arc<RwLock<HashSet<H::Event>>>,
     block_delays: Vec<u64>,
 ) {
     tracing::info!("Waiting for new block ranges to process");
@@ -278,7 +278,6 @@ pub async fn process_new_blocks<H: EventHandler>(
             process_block_range(
                 block_range.clone(),
                 state.clone(),
-                escalation_policy.clone(),
                 fulfilled_requests_cache.clone(),
             )
             .in_current_span()
@@ -293,7 +292,6 @@ pub async fn process_new_blocks<H: EventHandler>(
                 process_block_range(
                     adjusted_range,
                     state.clone(),
-                    escalation_policy.clone(),
                     fulfilled_requests_cache.clone(),
                 )
                 .in_current_span()
@@ -310,8 +308,7 @@ pub async fn process_new_blocks<H: EventHandler>(
 pub async fn process_backlog<H: EventHandler>(
     backlog_range: BlockRange,
     state: Arc<H>,
-    escalation_policy: EscalationPolicy,
-    fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,
+    fulfilled_requests_cache: Arc<RwLock<HashSet<H::Event>>>,
     block_delays: Vec<u64>,
 ) {
     tracing::info!("Processing backlog");
@@ -319,7 +316,6 @@ pub async fn process_backlog<H: EventHandler>(
     process_block_range(
         backlog_range.clone(),
         state.clone(),
-        escalation_policy.clone(),
         fulfilled_requests_cache.clone(),
     )
     .in_current_span()
@@ -334,7 +330,6 @@ pub async fn process_backlog<H: EventHandler>(
         process_block_range(
             adjusted_range,
             state.clone(),
-            escalation_policy.clone(),
             fulfilled_requests_cache.clone(),
         )
         .in_current_span()