소스 검색

update keeper to consume chain writer

0xfirefist 1 년 전
부모
커밋
4414b1be1a
2개의 변경된 파일211개의 추가작업 그리고 321개의 파일을 삭제
  1. 4 0
      apps/fortuna/src/command/run.rs
  2. 207 321
      apps/fortuna/src/keeper.rs

+ 4 - 0
apps/fortuna/src/command/run.rs

@@ -136,7 +136,11 @@ pub async fn run_keeper(
             .expect("All chains should be present in the config file")
             .clone();
         let private_key = private_key.clone();
+        let chain_writer = chain_eth_config
+            .get_writer(chain_config.provider_address, &private_key)
+            .await?;
         handles.push(spawn(keeper::run_keeper_threads(
+            chain_writer,
             private_key,
             chain_eth_config,
             chain_config.clone(),

+ 207 - 321
apps/fortuna/src/keeper.rs

@@ -1,27 +1,25 @@
 use {
     crate::{
-        api::{
-            self,
-            BlockchainState,
-        },
+        api::BlockchainState,
         chain::{
+            chain::{
+                ChainWriter,
+                RevealError,
+            },
             ethereum::{
                 PythContract,
                 SignablePythContract,
             },
-            reader::{
-                BlockNumber,
-                RequestedWithCallbackEvent,
-            },
+            reader::BlockNumber,
         },
         config::EthereumConfig,
+        state::HashChainState,
     },
     anyhow::{
         anyhow,
         Result,
     },
     ethers::{
-        contract::ContractError,
         providers::{
             Http,
             Middleware,
@@ -29,10 +27,7 @@ use {
             Ws,
         },
         signers::Signer,
-        types::{
-            Address,
-            U256,
-        },
+        types::Address,
     },
     futures::StreamExt,
     prometheus_client::{
@@ -177,22 +172,90 @@ pub enum RequestState {
     /// We have already processed the request but couldn't fulfill it and we are
     /// unsure if we can fulfill it or not.
     Processed,
+    /// A thread is already processing this request
+    Processing,
+}
+
+/// Thread safe cache to store the RequestState for each sequence number.
+/// The user must first call `process` to know if they should process the request or not.
+pub struct RequestCache {
+    cache: RwLock<HashMap<u64, RequestState>>,
 }
+impl RequestCache {
+    /// If a request state is already processing or fulfilled it will return false,
+    /// otherwise if request state is processed or not available it will return true.
+    pub async fn process(&self, sequence_number: u64) -> bool {
+        let read_cache = self.cache.read().await;
+        let state = read_cache.get(&sequence_number);
+
+        if let Some(state) = state {
+            if *state == RequestState::Fulfilled || *state == RequestState::Processing {
+                return false;
+            }
+        }
+        drop(read_cache);
+
+        let mut write_cache = self.cache.write().await;
+        let state = write_cache.get(&sequence_number);
+        match state {
+            Some(state) => match state {
+                RequestState::Processed => {
+                    write_cache.insert(sequence_number, RequestState::Processing);
+                    true
+                }
+                _ => false,
+            },
+            None => {
+                write_cache.insert(sequence_number, RequestState::Processing);
+                true
+            }
+        }
+    }
+
+    /// It will set the request state for the given sequence number to processed.
+    /// Note: It will panic if the request state is already fulfilled or not available.
+    pub async fn processed(&self, sequence_number: u64) {
+        let mut write_cache = self.cache.write().await;
+        let state = write_cache
+            .get(&sequence_number)
+            .expect("processed should be called only when there was a previous state");
+        match *state {
+            RequestState::Fulfilled => {
+                panic!("processed should not be called for a request that was already fulfilled")
+            }
+            RequestState::Processing => {
+                let _ = write_cache.insert(sequence_number, RequestState::Processed);
+            }
+            _ => {}
+        }
+    }
+
+    /// It will set the request state for the given sequence number to fulfilled.
+    /// Note: It will panic if the request state is not available.
+    pub async fn fulfilled(&self, sequence_number: u64) {
+        let mut write_cache = self.cache.write().await;
+        let state = write_cache
+            .get(&sequence_number)
+            .expect("fulfilled should be called only when there was a previous state");
+        match *state {
+            RequestState::Processed => {
+                let _ = write_cache.insert(sequence_number, RequestState::Fulfilled);
+            }
+            RequestState::Processing => {
+                let _ = write_cache.insert(sequence_number, RequestState::Fulfilled);
+            }
+            _ => {}
+        }
+    }
+}
+
 
 /// Get the latest safe block number for the chain. Retry internally if there is an error.
-async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
+async fn get_latest_safe_block(chain_writer: Arc<dyn ChainWriter>) -> BlockNumber {
     loop {
-        match chain_state
-            .contract
-            .get_block_number(chain_state.confirmed_block_status)
-            .await
-        {
-            Ok(latest_confirmed_block) => {
-                tracing::info!(
-                    "Fetched latest safe block {}",
-                    latest_confirmed_block - chain_state.reveal_delay_blocks
-                );
-                return latest_confirmed_block - chain_state.reveal_delay_blocks;
+        match chain_writer.get_latest_safe_block().await {
+            Ok(n) => {
+                return n;
             }
             Err(e) => {
                 tracing::error!("Error while getting block number. error: {:?}", e);
@@ -206,6 +269,7 @@ async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
 /// handle any events for the new blocks.
 #[tracing::instrument(name="keeper", skip_all, fields(chain_id=chain_state.id))]
 pub async fn run_keeper_threads(
+    chain_writer: Arc<dyn ChainWriter>,
     private_key: String,
     chain_eth_config: EthereumConfig,
     chain_state: BlockchainState,
@@ -215,7 +279,9 @@ pub async fn run_keeper_threads(
     let keeper_metrics = Arc::new(KeeperMetrics::new(metrics.clone()).await);
 
     tracing::info!("starting keeper");
-    let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
+    let latest_safe_block = get_latest_safe_block(chain_writer.clone())
+        .in_current_span()
+        .await;
     tracing::info!("latest safe block: {}", &latest_safe_block);
 
     let contract = Arc::new(
@@ -225,8 +291,9 @@ pub async fn run_keeper_threads(
     );
     let keeper_address = contract.client().inner().inner().signer().address();
 
-    let fulfilled_requests_cache = Arc::new(RwLock::new(HashMap::<u64, RequestState>::new()));
-
+    let request_cache = Arc::new(RequestCache {
+        cache: RwLock::new(HashMap::<u64, RequestState>::new()),
+    });
     // Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
     spawn(
         process_backlog(
@@ -234,11 +301,10 @@ pub async fn run_keeper_threads(
                 from: latest_safe_block.saturating_sub(BACKLOG_RANGE),
                 to:   latest_safe_block,
             },
-            contract.clone(),
-            chain_eth_config.gas_limit,
-            chain_state.clone(),
+            chain_writer.clone(),
+            chain_state.state.clone(),
             keeper_metrics.clone(),
-            fulfilled_requests_cache.clone(),
+            request_cache.clone(),
         )
         .in_current_span(),
     );
@@ -247,7 +313,7 @@ pub async fn run_keeper_threads(
     // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
     spawn(
         watch_blocks_wrapper(
-            chain_state.clone(),
+            chain_writer.clone(),
             latest_safe_block,
             tx,
             chain_eth_config.geth_rpc_wss.clone(),
@@ -255,14 +321,21 @@ pub async fn run_keeper_threads(
         .in_current_span(),
     );
     // Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.
+
+    // chain_writer: Arc<dyn ChainWriter>,
+    // hash_chain: Arc<HashChainState>,
+    // mut rx: mpsc::Receiver<BlockRange>,
+    // metrics: Arc<KeeperMetrics>,
+    // request_cache: Arc<RequestCache>,
+
+
     spawn(
         process_new_blocks(
-            chain_state.clone(),
+            chain_writer.clone(),
+            chain_state.state.clone(),
             rx,
-            Arc::clone(&contract),
-            chain_eth_config.gas_limit,
             keeper_metrics.clone(),
-            fulfilled_requests_cache.clone(),
+            request_cache.clone(),
         )
         .in_current_span(),
     );
@@ -305,205 +378,14 @@ pub async fn run_keeper_threads(
     );
 }
 
-
-/// Process an event for a chain. It estimates the gas for the reveal with callback and
-/// submits the transaction if the gas estimate is below the gas limit.
-/// It will return an Error if the gas estimation failed with a provider error or if the
-/// reveal with callback failed with a provider error.
-pub async fn process_event(
-    event: RequestedWithCallbackEvent,
-    chain_config: &BlockchainState,
-    contract: &Arc<SignablePythContract>,
-    gas_limit: U256,
-    metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
-) -> Result<()> {
-    if chain_config.provider_address != event.provider_address {
-        fulfilled_requests_cache
-            .write()
-            .await
-            .insert(event.sequence_number, RequestState::Fulfilled);
-        return Ok(());
-    }
-    let provider_revelation = match chain_config.state.reveal(event.sequence_number) {
-        Ok(result) => result,
-        Err(e) => {
-            tracing::error!(
-                sequence_number = &event.sequence_number,
-                "Error while revealing with error: {:?}",
-                e
-            );
-            fulfilled_requests_cache
-                .write()
-                .await
-                .insert(event.sequence_number, RequestState::Fulfilled);
-            return Ok(());
-        }
-    };
-
-    let gas_estimate_res = chain_config
-        .contract
-        .estimate_reveal_with_callback_gas(
-            event.provider_address,
-            event.sequence_number,
-            event.user_random_number,
-            provider_revelation,
-        )
-        .in_current_span()
-        .await;
-
-    match gas_estimate_res {
-        Ok(gas_estimate_option) => match gas_estimate_option {
-            Some(gas_estimate) => {
-                // Pad the gas estimate by 33%
-                let (gas_estimate, _) = gas_estimate
-                    .saturating_mul(U256::from(4))
-                    .div_mod(U256::from(3));
-
-                if gas_estimate > gas_limit {
-                    tracing::error!(
-                        sequence_number = &event.sequence_number,
-                        "Gas estimate for reveal with callback is higher than the gas limit"
-                    );
-
-                    fulfilled_requests_cache
-                        .write()
-                        .await
-                        .insert(event.sequence_number, RequestState::Fulfilled);
-                    return Ok(());
-                }
-
-                let contract_call = contract
-                    .reveal_with_callback(
-                        event.provider_address,
-                        event.sequence_number,
-                        event.user_random_number,
-                        provider_revelation,
-                    )
-                    .gas(gas_estimate);
-
-                let res = contract_call.send().await;
-
-                let pending_tx = match res {
-                    Ok(pending_tx) => pending_tx,
-                    Err(e) => match e {
-                        // If there is a provider error, we weren't able to send the transaction.
-                        // We will return an error. So, that the caller can decide what to do (retry).
-                        ContractError::ProviderError { e } => return Err(e.into()),
-                        // For all the other errors, it is likely the case we won't be able to reveal for
-                        // ever. We will return an Ok(()) to signal that we have processed this reveal
-                        // and concluded that its Ok to not reveal.
-                        _ => {
-                            fulfilled_requests_cache
-                                .write()
-                                .await
-                                .insert(event.sequence_number, RequestState::Processed);
-                            tracing::error!(
-                                sequence_number = &event.sequence_number,
-                                "Error while revealing with error: {:?}",
-                                e
-                            );
-                            return Ok(());
-                        }
-                    },
-                };
-
-                match pending_tx.await {
-                    Ok(res) => match res {
-                        Some(res) => {
-                            tracing::info!(
-                                sequence_number = &event.sequence_number,
-                                transaction_hash = &res.transaction_hash.to_string(),
-                                gas_used = ?res.gas_used,
-                                "Revealed with res: {:?}",
-                                res
-                            );
-
-                            if let Some(gas_used) = res.gas_used {
-                                let gas_used = gas_used.as_u128() as f64 / 1e18;
-                                metrics
-                                    .total_gas_spent
-                                    .get_or_create(&AccountLabel {
-                                        chain_id: chain_config.id.clone(),
-                                        address:  contract
-                                            .client()
-                                            .inner()
-                                            .inner()
-                                            .signer()
-                                            .address()
-                                            .to_string(),
-                                    })
-                                    .inc_by(gas_used);
-                            }
-
-                            metrics
-                                .reveals
-                                .get_or_create(&AccountLabel {
-                                    chain_id: chain_config.id.clone(),
-                                    address:  chain_config.provider_address.to_string(),
-                                })
-                                .inc();
-
-                            fulfilled_requests_cache
-                                .write()
-                                .await
-                                .insert(event.sequence_number, RequestState::Fulfilled);
-                            Ok(())
-                        }
-                        None => {
-                            tracing::error!(
-                                sequence_number = &event.sequence_number,
-                                "Can't verify the reveal"
-                            );
-                            // It is better to return an error in this scenario
-                            // For the caller to retry
-                            Err(anyhow!("Can't verify the reveal"))
-                        }
-                    },
-
-                    Err(e) => {
-                        tracing::error!(
-                            sequence_number = &event.sequence_number,
-                            "Error while revealing with error: {:?}",
-                            e
-                        );
-                        Err(e.into())
-                    }
-                }
-            }
-            None => {
-                tracing::info!(
-                    sequence_number = &event.sequence_number,
-                    "Not fulfilling event"
-                );
-                fulfilled_requests_cache
-                    .write()
-                    .await
-                    .insert(event.sequence_number, RequestState::Processed);
-                Ok(())
-            }
-        },
-        Err(e) => {
-            tracing::error!(
-                sequence_number = &event.sequence_number,
-                "Error while simulating reveal with error: {:?}",
-                e
-            );
-            Err(e)
-        }
-    }
-}
-
-
 /// Process a range of blocks in batches. It calls the `process_single_block_batch` method for each batch.
 #[tracing::instrument(skip_all, fields(range_from_block=block_range.from, range_to_block=block_range.to))]
 pub async fn process_block_range(
     block_range: BlockRange,
-    contract: Arc<SignablePythContract>,
-    gas_limit: U256,
-    chain_state: api::BlockchainState,
-    metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
+    chain_writer: Arc<dyn ChainWriter>,
+    hash_chain: Arc<HashChainState>,
+    _metrics: Arc<KeeperMetrics>,
+    request_cache: Arc<RequestCache>,
 ) {
     let BlockRange {
         from: first_block,
@@ -522,11 +404,9 @@ pub async fn process_block_range(
                 from: current_block,
                 to:   to_block,
             },
-            contract.clone(),
-            gas_limit,
-            chain_state.clone(),
-            metrics.clone(),
-            fulfilled_requests_cache.clone(),
+            chain_writer.clone(),
+            hash_chain.clone(),
+            request_cache.clone(),
         )
         .in_current_span()
         .await;
@@ -542,85 +422,93 @@ pub async fn process_block_range(
 #[tracing::instrument(name="batch", skip_all, fields(batch_from_block=block_range.from, batch_to_block=block_range.to))]
 pub async fn process_single_block_batch(
     block_range: BlockRange,
-    contract: Arc<SignablePythContract>,
-    gas_limit: U256,
-    chain_state: api::BlockchainState,
-    metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
+    chain_writer: Arc<dyn ChainWriter>,
+    hash_chain: Arc<HashChainState>,
+    // metrics: Arc<KeeperMetrics>,
+    request_cache: Arc<RequestCache>,
 ) {
     loop {
-        let events_res = chain_state
-            .contract
-            .get_request_with_callback_events(block_range.from, block_range.to)
+        let events_res = chain_writer
+            .get_requests_with_callback_data(block_range.from, block_range.to)
             .await;
 
         match events_res {
             Ok(events) => {
                 tracing::info!(num_of_events = &events.len(), "Processing",);
                 for event in &events {
-                    if let Some(state) = fulfilled_requests_cache
-                        .read()
-                        .await
-                        .get(&event.sequence_number)
-                    {
-                        match state {
-                            RequestState::Fulfilled => {
-                                tracing::info!(
-                                    sequence_number = &event.sequence_number,
-                                    "Skipping already fulfilled request",
-                                );
-                                continue;
-                            }
-                            RequestState::Processed => {
+                    if false == request_cache.process(event.sequence_number).await {
+                        tracing::info!(
+                            sequence_number = &event.sequence_number,
+                            "Request is under process or already fullfilled. Skipping.",
+                        );
+                        continue;
+                    };
+
+                    tracing::info!(sequence_number = &event.sequence_number, "Processing event",);
+                    let provider_revelation = match hash_chain.reveal(event.sequence_number) {
+                        Ok(result) => result,
+                        Err(e) => {
+                            tracing::error!(
+                                sequence_number = &event.sequence_number,
+                                "Error getting provider revelation with error. Can't process: {:?}",
+                                e
+                            );
+                            continue;
+                        }
+                    };
+
+                    loop {
+                        match chain_writer
+                            .reveal_with_callback(event.clone(), provider_revelation)
+                            .await
+                        {
+                            Ok(_) => {
+                                request_cache.fulfilled(event.sequence_number).await;
                                 tracing::info!(
                                     sequence_number = &event.sequence_number,
-                                    "Reprocessing already processed request",
+                                    "Request fulfilled",
                                 );
-                                metrics
-                                    .requests_reprocessed
-                                    .get_or_create(&AccountLabel {
-                                        chain_id: chain_state.id.clone(),
-                                        address:  chain_state.provider_address.to_string(),
-                                    })
-                                    .inc();
+                                break;
                             }
+                            Err(e) => match e {
+                                RevealError::GasLimitExceeded => {
+                                    tracing::error!(
+                                        sequence_number = &event.sequence_number,
+                                        "Error gas limit exceeded Won't process"
+                                    );
+                                    request_cache.fulfilled(event.sequence_number).await;
+                                    break;
+                                }
+                                RevealError::ContractError(e) => {
+                                    tracing::error!(
+                                        sequence_number = &event.sequence_number,
+                                        "Error contract error. Won't reprocess: {:?}",
+                                        e
+                                    );
+                                    request_cache.processed(event.sequence_number).await;
+                                    break;
+                                }
+                                RevealError::RpcError(e) => {
+                                    tracing::error!(
+                                        sequence_number = &event.sequence_number,
+                                        "Error rpc error. Reprocessing: {:?}",
+                                        e
+                                    );
+                                    time::sleep(RETRY_INTERVAL).await;
+                                }
+                                RevealError::Unknown(e) => {
+                                    tracing::error!(
+                                        sequence_number = &event.sequence_number,
+                                        "Error unknown error. Won't reprocess: {:?}",
+                                        e
+                                    );
+                                    request_cache.processed(event.sequence_number).await;
+                                    break;
+                                }
+                            },
                         }
                     }
-                    metrics
-                        .requests
-                        .get_or_create(&AccountLabel {
-                            chain_id: chain_state.id.clone(),
-                            address:  chain_state.provider_address.to_string(),
-                        })
-                        .inc();
-                    tracing::info!(sequence_number = &event.sequence_number, "Processing event",);
-                    while let Err(e) = process_event(
-                        event.clone(),
-                        &chain_state,
-                        &contract,
-                        gas_limit,
-                        metrics.clone(),
-                        fulfilled_requests_cache.clone(),
-                    )
-                    .in_current_span()
-                    .await
-                    {
-                        tracing::error!(
-                            sequence_number = &event.sequence_number,
-                            "Error while processing event. Waiting for {} seconds before retry. error: {:?}",
-                            RETRY_INTERVAL.as_secs(),
-                            e
-                        );
-                        time::sleep(RETRY_INTERVAL).await;
-                    }
                     tracing::info!(sequence_number = &event.sequence_number, "Processed event",);
-                    metrics
-                        .requests_processed
-                        .get_or_create(&AccountLabel {
-                            chain_id: chain_state.id.clone(),
-                            address:  chain_state.provider_address.to_string(),
-                        })
-                        .inc();
                 }
                 tracing::info!(num_of_events = &events.len(), "Processed",);
                 break;
@@ -641,7 +529,7 @@ pub async fn process_single_block_batch(
 /// It retries indefinitely.
 #[tracing::instrument(name="watch_blocks", skip_all, fields(initial_safe_block=latest_safe_block))]
 pub async fn watch_blocks_wrapper(
-    chain_state: BlockchainState,
+    chain_writer: Arc<dyn ChainWriter>,
     latest_safe_block: BlockNumber,
     tx: mpsc::Sender<BlockRange>,
     geth_rpc_wss: Option<String>,
@@ -649,7 +537,7 @@ pub async fn watch_blocks_wrapper(
     let mut last_safe_block_processed = latest_safe_block;
     loop {
         if let Err(e) = watch_blocks(
-            chain_state.clone(),
+            chain_writer.clone(),
             &mut last_safe_block_processed,
             tx.clone(),
             geth_rpc_wss.clone(),
@@ -668,7 +556,7 @@ pub async fn watch_blocks_wrapper(
 /// block ranges to the `tx` channel. If we have subscribed to events, we could have missed those and won't even
 /// know about it.
 pub async fn watch_blocks(
-    chain_state: BlockchainState,
+    chain_writer: Arc<dyn ChainWriter>,
     last_safe_block_processed: &mut BlockNumber,
     tx: mpsc::Sender<BlockRange>,
     geth_rpc_wss: Option<String>,
@@ -713,7 +601,9 @@ pub async fn watch_blocks(
             }
         }
 
-        let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
+        let latest_safe_block = get_latest_safe_block(chain_writer.clone())
+            .in_current_span()
+            .await;
         if latest_safe_block > *last_safe_block_processed {
             let mut from = latest_safe_block
                 .checked_sub(RETRY_PREVIOUS_BLOCKS)
@@ -758,23 +648,21 @@ pub async fn watch_blocks(
 /// It waits on rx channel to receive block ranges and then calls process_block_range to process them.
 #[tracing::instrument(skip_all)]
 pub async fn process_new_blocks(
-    chain_state: BlockchainState,
+    chain_writer: Arc<dyn ChainWriter>,
+    hash_chain: Arc<HashChainState>,
     mut rx: mpsc::Receiver<BlockRange>,
-    contract: Arc<SignablePythContract>,
-    gas_limit: U256,
     metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
+    request_cache: Arc<RequestCache>,
 ) {
     tracing::info!("Waiting for new block ranges to process");
     loop {
         if let Some(block_range) = rx.recv().await {
             process_block_range(
                 block_range,
-                Arc::clone(&contract),
-                gas_limit,
-                chain_state.clone(),
+                chain_writer.clone(),
+                hash_chain.clone(),
                 metrics.clone(),
-                fulfilled_requests_cache.clone(),
+                request_cache.clone(),
             )
             .in_current_span()
             .await;
@@ -786,20 +674,18 @@ pub async fn process_new_blocks(
 #[tracing::instrument(skip_all)]
 pub async fn process_backlog(
     backlog_range: BlockRange,
-    contract: Arc<SignablePythContract>,
-    gas_limit: U256,
-    chain_state: BlockchainState,
+    chain_writer: Arc<dyn ChainWriter>,
+    hash_chain: Arc<HashChainState>,
     metrics: Arc<KeeperMetrics>,
-    fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
+    request_cache: Arc<RequestCache>,
 ) {
     tracing::info!("Processing backlog");
     process_block_range(
         backlog_range,
-        contract,
-        gas_limit,
-        chain_state,
+        chain_writer,
+        hash_chain,
         metrics,
-        fulfilled_requests_cache,
+        request_cache,
     )
     .in_current_span()
     .await;