Преглед на файлове

feat(fortuna-v2): implement a keeper service for entropy v2 (#1366)

* extract code to run api

* save

* some changes

* add exit checks

* retry for sub threads and fetch events

* handle events

* remove unused

* compiling

* add logs to keeper

* add simulation and some fixed

* refactoring keeper

* backlog refactoring works

* extract handle event

* extract watch blocks in a method

* handle events extracted

* remove res block from backlog method

* remove res block from watch_blocks

* remove res block from process events

* load private key from file

* add gas limit to blockchain config

* remove unused imports

* remove a log

* gas param u256

* spell simulate

* rename keeper private keeper file

* wait for only api to exit

* remove exit check from keeper

* remove is valid request method as simulate will cover things

* remove some parameters

* remove exit check from keeper

* use saturating sub

* correct condition

* update logging statement

* combine logs

* use nonce manager to send transaction

* poll instead of stream and add nonce middleware

* remove unused

* fix tests

* add ws support to streaming

* Refactor and improve error handling

* replace simulation with gas estimation

* add polling support for when no wss url

* version update

* test check

* update comment

* update key comment

* rename chain_config to chain_state

* update version

* pad gas estimate

* add comments

---------

Co-authored-by: Amin Moghaddam <amin@pyth.network>
Dev Kalra преди 1 година
родител
ревизия
6295674efa
променени са 10 файла, в които са добавени 710 реда и са изтрити 75 реда
  1. 68 45
      fortuna/Cargo.lock
  2. 4 2
      fortuna/Cargo.toml
  3. 4 0
      fortuna/src/api.rs
  4. 60 2
      fortuna/src/chain/ethereum.rs
  5. 46 1
      fortuna/src/chain/reader.rs
  6. 92 23
      fortuna/src/command/run.rs
  7. 10 1
      fortuna/src/config.rs
  8. 18 1
      fortuna/src/config/run.rs
  9. 407 0
      fortuna/src/keeper.rs
  10. 1 0
      fortuna/src/main.rs

+ 68 - 45
fortuna/Cargo.lock

@@ -522,9 +522,9 @@ dependencies = [
 
 [[package]]
 name = "cargo_metadata"
-version = "0.17.0"
+version = "0.18.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e7daec1a2a2129eeba1644b220b4647ec537b0b5d4bfd6876fcc5a540056b592"
+checksum = "2d886547e41f740c616ae73108f6eb70afe6d940c7bc697cb30f13daec073037"
 dependencies = [
  "camino",
  "cargo-platform",
@@ -1031,9 +1031,9 @@ dependencies = [
 
 [[package]]
 name = "enr"
-version = "0.9.1"
+version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fe81b5c06ecfdbc71dd845216f225f53b62a10cb8a16c946836a3467f701d05b"
+checksum = "2a3d8dc56e02f954cac8eb489772c552c473346fc34f67412bb6244fd647f7e4"
 dependencies = [
  "base64 0.21.4",
  "bytes",
@@ -1146,9 +1146,9 @@ dependencies = [
 
 [[package]]
 name = "ethers"
-version = "2.0.10"
+version = "2.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1ad13497f6e0a24292fc7b408e30d22fe9dc262da1f40d7b542c3a44e7fc0476"
+checksum = "816841ea989f0c69e459af1cf23a6b0033b19a55424a1ea3a30099becdb8dec0"
 dependencies = [
  "ethers-addressbook",
  "ethers-contract",
@@ -1162,9 +1162,9 @@ dependencies = [
 
 [[package]]
 name = "ethers-addressbook"
-version = "2.0.10"
+version = "2.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c6e9e8acd0ed348403cc73a670c24daba3226c40b98dc1a41903766b3ab6240a"
+checksum = "5495afd16b4faa556c3bba1f21b98b4983e53c1755022377051a975c3b021759"
 dependencies = [
  "ethers-core",
  "once_cell",
@@ -1174,9 +1174,9 @@ dependencies = [
 
 [[package]]
 name = "ethers-contract"
-version = "2.0.10"
+version = "2.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d79269278125006bb0552349c03593ffa9702112ca88bc7046cc669f148fb47c"
+checksum = "6fceafa3578c836eeb874af87abacfb041f92b4da0a78a5edd042564b8ecdaaa"
 dependencies = [
  "const-hex",
  "ethers-contract-abigen",
@@ -1193,9 +1193,9 @@ dependencies = [
 
 [[package]]
 name = "ethers-contract-abigen"
-version = "2.0.10"
+version = "2.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ce95a43c939b2e4e2f3191c5ad4a1f279780b8a39139c9905b43a7433531e2ab"
+checksum = "04ba01fbc2331a38c429eb95d4a570166781f14290ef9fdb144278a90b5a739b"
 dependencies = [
  "Inflector",
  "const-hex",
@@ -1211,15 +1211,15 @@ dependencies = [
  "serde",
  "serde_json",
  "syn 2.0.38",
- "toml 0.7.8",
+ "toml 0.8.12",
  "walkdir",
 ]
 
 [[package]]
 name = "ethers-contract-derive"
-version = "2.0.10"
+version = "2.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8e9ce44906fc871b3ee8c69a695ca7ec7f70e50cb379c9b9cb5e532269e492f6"
+checksum = "87689dcabc0051cde10caaade298f9e9093d65f6125c14575db3fd8c669a168f"
 dependencies = [
  "Inflector",
  "const-hex",
@@ -1233,9 +1233,9 @@ dependencies = [
 
 [[package]]
 name = "ethers-core"
-version = "2.0.10"
+version = "2.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c0a17f0708692024db9956b31d7a20163607d2745953f5ae8125ab368ba280ad"
+checksum = "82d80cc6ad30b14a48ab786523af33b37f28a8623fc06afd55324816ef18fb1f"
 dependencies = [
  "arrayvec",
  "bytes",
@@ -1253,7 +1253,7 @@ dependencies = [
  "rlp",
  "serde",
  "serde_json",
- "strum 0.25.0",
+ "strum 0.26.2",
  "syn 2.0.38",
  "tempfile",
  "thiserror",
@@ -1263,10 +1263,11 @@ dependencies = [
 
 [[package]]
 name = "ethers-etherscan"
-version = "2.0.10"
+version = "2.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0e53451ea4a8128fbce33966da71132cf9e1040dcfd2a2084fd7733ada7b2045"
+checksum = "e79e5973c26d4baf0ce55520bd732314328cabe53193286671b47144145b9649"
 dependencies = [
+ "chrono",
  "ethers-core",
  "reqwest",
  "semver",
@@ -1278,9 +1279,9 @@ dependencies = [
 
 [[package]]
 name = "ethers-middleware"
-version = "2.0.10"
+version = "2.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "473f1ccd0c793871bbc248729fa8df7e6d2981d6226e4343e3bbaa9281074d5d"
+checksum = "48f9fdf09aec667c099909d91908d5eaf9be1bd0e2500ba4172c1d28bfaa43de"
 dependencies = [
  "async-trait",
  "auto_impl",
@@ -1305,9 +1306,9 @@ dependencies = [
 
 [[package]]
 name = "ethers-providers"
-version = "2.0.10"
+version = "2.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6838fa110e57d572336178b7c79e94ff88ef976306852d8cb87d9e5b1fc7c0b5"
+checksum = "6434c9a33891f1effc9c75472e12666db2fa5a0fec4b29af6221680a6fe83ab2"
 dependencies = [
  "async-trait",
  "auto_impl",
@@ -1316,6 +1317,7 @@ dependencies = [
  "const-hex",
  "enr",
  "ethers-core",
+ "futures-channel",
  "futures-core",
  "futures-timer",
  "futures-util",
@@ -1342,9 +1344,9 @@ dependencies = [
 
 [[package]]
 name = "ethers-signers"
-version = "2.0.10"
+version = "2.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ea44bec930f12292866166f9ddbea6aa76304850e4d8dcd66dc492b43d00ff1"
+checksum = "228875491c782ad851773b652dd8ecac62cda8571d3bc32a5853644dd26766c2"
 dependencies = [
  "async-trait",
  "coins-bip32",
@@ -1361,9 +1363,9 @@ dependencies = [
 
 [[package]]
 name = "ethers-solc"
-version = "2.0.10"
+version = "2.0.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "de34e484e7ae3cab99fbfd013d6c5dc7f9013676a4e0e414d8b12e1213e8b3ba"
+checksum = "66244a771d9163282646dbeffe0e6eca4dda4146b6498644e678ac6089b11edd"
 dependencies = [
  "cfg-if",
  "const-hex",
@@ -1486,7 +1488,7 @@ dependencies = [
 
 [[package]]
 name = "fortuna"
-version = "3.3.4"
+version = "4.0.0"
 dependencies = [
  "anyhow",
  "axum",
@@ -1498,6 +1500,7 @@ dependencies = [
  "clap",
  "ethabi",
  "ethers",
+ "futures",
  "hex",
  "lazy_static",
  "once_cell",
@@ -2758,7 +2761,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919"
 dependencies = [
  "once_cell",
- "toml_edit",
+ "toml_edit 0.19.15",
 ]
 
 [[package]]
@@ -3387,9 +3390,9 @@ dependencies = [
 
 [[package]]
 name = "serde_spanned"
-version = "0.6.3"
+version = "0.6.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "96426c9936fd7a0124915f9185ea1d20aa9445cc9821142f0a73bc9207a2e186"
+checksum = "eb3622f419d1296904700073ea6cc23ad690adbd66f13ea683df73298736f0c1"
 dependencies = [
  "serde",
 ]
@@ -3581,9 +3584,9 @@ dependencies = [
 
 [[package]]
 name = "solang-parser"
-version = "0.3.2"
+version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7cb9fa2fa2fa6837be8a2495486ff92e3ffe68a99b6eeba288e139efdd842457"
+checksum = "c425ce1c59f4b154717592f0bdf4715c3a1d55058883622d3157e1f0908a5b26"
 dependencies = [
  "itertools 0.11.0",
  "lalrpop",
@@ -3645,11 +3648,11 @@ dependencies = [
 
 [[package]]
 name = "strum"
-version = "0.25.0"
+version = "0.26.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
+checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29"
 dependencies = [
- "strum_macros 0.25.2",
+ "strum_macros 0.26.2",
 ]
 
 [[package]]
@@ -3667,9 +3670,9 @@ dependencies = [
 
 [[package]]
 name = "strum_macros"
-version = "0.25.2"
+version = "0.26.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059"
+checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946"
 dependencies = [
  "heck",
  "proc-macro2",
@@ -3955,21 +3958,21 @@ dependencies = [
 
 [[package]]
 name = "toml"
-version = "0.7.8"
+version = "0.8.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd79e69d3b627db300ff956027cc6c3798cef26d22526befdfcd12feeb6d2257"
+checksum = "e9dd1545e8208b4a5af1aa9bbd0b4cf7e9ea08fabc5d0a5c67fcaafa17433aa3"
 dependencies = [
  "serde",
  "serde_spanned",
  "toml_datetime",
- "toml_edit",
+ "toml_edit 0.22.9",
 ]
 
 [[package]]
 name = "toml_datetime"
-version = "0.6.3"
+version = "0.6.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b"
+checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1"
 dependencies = [
  "serde",
 ]
@@ -3979,12 +3982,23 @@ name = "toml_edit"
 version = "0.19.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
+dependencies = [
+ "indexmap 2.0.2",
+ "toml_datetime",
+ "winnow 0.5.16",
+]
+
+[[package]]
+name = "toml_edit"
+version = "0.22.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8e40bb779c5187258fd7aad0eb68cb8706a0a81fa712fbea808ab43c4b8374c4"
 dependencies = [
  "indexmap 2.0.2",
  "serde",
  "serde_spanned",
  "toml_datetime",
- "winnow",
+ "winnow 0.6.5",
 ]
 
 [[package]]
@@ -4512,6 +4526,15 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "winnow"
+version = "0.6.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dffa400e67ed5a4dd237983829e66475f0a4a26938c4b04c21baede6262215b8"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "winreg"
 version = "0.50.0"

+ 4 - 2
fortuna/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name    = "fortuna"
-version = "3.3.4"
+version = "4.0.0"
 edition = "2021"
 
 [dependencies]
@@ -12,7 +12,8 @@ bincode = "1.3.3"
 byteorder   = "1.5.0"
 clap        = { version = "4.4.6", features = ["derive", "cargo", "env"] }
 ethabi      = "18.0.0"
-ethers      = "2.0.10"
+ethers      = { version = "2.0.14", features = ["ws"] }
+futures            = { version = "0.3.28" }
 hex         = "0.4.3"
 prometheus-client  = { version = "0.21.2" }
 pythnet-sdk = { path = "../pythnet/pythnet_sdk", features = ["strum"] }
@@ -34,5 +35,6 @@ once_cell = "1.18.0"
 lazy_static = "1.4.0"
 url = "2.5.0"
 
+
 [dev-dependencies]
 axum-test = "13.1.1"

+ 4 - 0
fortuna/src/api.rs

@@ -73,6 +73,8 @@ impl ApiState {
 /// The state of the randomness service for a single blockchain.
 #[derive(Clone)]
 pub struct BlockchainState {
+    /// The chain id for this blockchain, useful for logging
+    pub id:                     ChainId,
     /// The hash chain(s) required to serve random numbers for this blockchain
     pub state:                  Arc<HashChainState>,
     /// The contract that the server is fulfilling requests for.
@@ -245,6 +247,7 @@ mod test {
         let eth_read = Arc::new(MockEntropyReader::with_requests(10, &[]));
 
         let eth_state = BlockchainState {
+            id:                     "ethereum".into(),
             state:                  ETH_CHAIN.clone(),
             contract:               eth_read.clone(),
             provider_address:       PROVIDER,
@@ -255,6 +258,7 @@ mod test {
         let avax_read = Arc::new(MockEntropyReader::with_requests(10, &[]));
 
         let avax_state = BlockchainState {
+            id:                     "avalanche".into(),
             state:                  AVAX_CHAIN.clone(),
             contract:               avax_read.clone(),
             provider_address:       PROVIDER,

+ 60 - 2
fortuna/src/chain/ethereum.rs

@@ -5,6 +5,7 @@ use {
             BlockNumber,
             BlockStatus,
             EntropyReader,
+            RequestedWithCallbackEvent,
         },
         config::EthereumConfig,
     },
@@ -18,6 +19,7 @@ use {
         abi::RawLog,
         contract::{
             abigen,
+            ContractError,
             EthLogDecode,
         },
         core::types::Address,
@@ -27,6 +29,7 @@ use {
                 TransformerError,
                 TransformerMiddleware,
             },
+            NonceManagerMiddleware,
             SignerMiddleware,
         },
         prelude::TransactionRequest,
@@ -42,6 +45,7 @@ use {
         types::{
             transaction::eip2718::TypedTransaction,
             BlockNumber as EthersBlockNumber,
+            U256,
         },
     },
     sha3::{
@@ -59,7 +63,10 @@ abigen!(
 );
 
 pub type SignablePythContract = PythRandom<
-    TransformerMiddleware<SignerMiddleware<Provider<Http>, LocalWallet>, LegacyTxTransformer>,
+    TransformerMiddleware<
+        NonceManagerMiddleware<SignerMiddleware<Provider<Http>, LocalWallet>>,
+        LegacyTxTransformer,
+    >,
 >;
 pub type PythContract = PythRandom<Provider<Http>>;
 
@@ -97,10 +104,12 @@ impl SignablePythContract {
             .parse::<LocalWallet>()?
             .with_chain_id(chain_id.as_u64());
 
+        let address = wallet__.address();
+
         Ok(PythRandom::new(
             chain_config.contract_addr,
             Arc::new(TransformerMiddleware::new(
-                SignerMiddleware::new(provider, wallet__),
+                NonceManagerMiddleware::new(SignerMiddleware::new(provider, wallet__), address),
                 transformer,
             )),
         ))
@@ -225,4 +234,53 @@ impl EntropyReader for PythContract {
             .ok_or_else(|| Error::msg("pending confirmation"))?
             .as_u64())
     }
+
+    async fn get_request_with_callback_events(
+        &self,
+        from_block: BlockNumber,
+        to_block: BlockNumber,
+    ) -> Result<Vec<RequestedWithCallbackEvent>> {
+        let mut event = self.requested_with_callback_filter();
+        event.filter = event.filter.from_block(from_block).to_block(to_block);
+
+        let res: Vec<RequestedWithCallbackFilter> = event.query().await?;
+
+        Ok(res
+            .iter()
+            .map(|r| RequestedWithCallbackEvent {
+                sequence_number:    r.sequence_number,
+                user_random_number: r.user_random_number,
+                provider_address:   r.request.provider,
+            })
+            .collect())
+    }
+
+    async fn estimate_reveal_with_callback_gas(
+        &self,
+        provider: Address,
+        sequence_number: u64,
+        user_random_number: [u8; 32],
+        provider_revelation: [u8; 32],
+    ) -> Result<Option<U256>> {
+        let result: Result<U256, ContractError<Provider<Http>>> = self
+            .reveal_with_callback(
+                provider,
+                sequence_number,
+                user_random_number,
+                provider_revelation,
+            )
+            .estimate_gas()
+            .await;
+
+        match result {
+            Ok(gas) => Ok(Some(gas)),
+            Err(e) => match e {
+                ContractError::ProviderError { e } => Err(anyhow!(e)),
+                _ => {
+                    tracing::info!("Gas estimation for reveal with callback failed: {:?}", e);
+                    Ok(None)
+                }
+            },
+        }
+    }
 }

+ 46 - 1
fortuna/src/chain/reader.rs

@@ -4,6 +4,7 @@ use {
     ethers::types::{
         Address,
         BlockNumber as EthersBlockNumber,
+        U256,
     },
 };
 
@@ -32,6 +33,13 @@ impl Into<EthersBlockNumber> for BlockStatus {
     }
 }
 
+#[derive(Clone)]
+pub struct RequestedWithCallbackEvent {
+    pub sequence_number:    u64,
+    pub user_random_number: [u8; 32],
+    pub provider_address:   Address,
+}
+
 /// EntropyReader is the read-only interface of the Entropy contract.
 #[async_trait]
 pub trait EntropyReader: Send + Sync {
@@ -42,6 +50,22 @@ pub trait EntropyReader: Send + Sync {
         -> Result<Option<Request>>;
 
     async fn get_block_number(&self, confirmed_block_status: BlockStatus) -> Result<BlockNumber>;
+
+    async fn get_request_with_callback_events(
+        &self,
+        from_block: BlockNumber,
+        to_block: BlockNumber,
+    ) -> Result<Vec<RequestedWithCallbackEvent>>;
+
+    /// Simulate a reveal with callback. Returns Some(gas) if the estimation was successful.
+    /// Returns None otherwise. Returns an error if the gas could not be estimated.
+    async fn estimate_reveal_with_callback_gas(
+        &self,
+        provider: Address,
+        sequence_number: u64,
+        user_random_number: [u8; 32],
+        provider_revelation: [u8; 32],
+    ) -> Result<Option<U256>>;
 }
 
 /// An in-flight request stored in the contract.
@@ -68,7 +92,10 @@ pub mod mock {
         },
         anyhow::Result,
         axum::async_trait,
-        ethers::types::Address,
+        ethers::types::{
+            Address,
+            U256,
+        },
         std::sync::RwLock,
     };
 
@@ -147,5 +174,23 @@ pub mod mock {
         ) -> Result<BlockNumber> {
             Ok(*self.block_number.read().unwrap())
         }
+
+        async fn get_request_with_callback_events(
+            &self,
+            _from_block: BlockNumber,
+            _to_block: BlockNumber,
+        ) -> Result<Vec<super::RequestedWithCallbackEvent>> {
+            Ok(vec![])
+        }
+
+        async fn estimate_reveal_with_callback_gas(
+            &self,
+            provider: Address,
+            sequence_number: u64,
+            user_random_number: [u8; 32],
+            provider_revelation: [u8; 32],
+        ) -> Result<Option<U256>> {
+            Ok(Some(U256::from(5)))
+        }
     }
 }

+ 92 - 23
fortuna/src/command/run.rs

@@ -1,12 +1,17 @@
 use {
     crate::{
-        api,
+        api::{
+            self,
+            BlockchainState,
+            ChainId,
+        },
         chain::ethereum::PythContract,
         command::register_provider::CommitmentMetadata,
         config::{
             Config,
             RunOptions,
         },
+        keeper,
         state::{
             HashChainState,
             PebbleHashChain,
@@ -14,19 +19,30 @@ use {
     },
     anyhow::{
         anyhow,
+        Error,
         Result,
     },
     axum::Router,
     std::{
         collections::HashMap,
+        net::SocketAddr,
         sync::Arc,
+        vec,
+    },
+    tokio::{
+        spawn,
+        sync::watch,
     },
     tower_http::cors::CorsLayer,
     utoipa::OpenApi,
     utoipa_swagger_ui::SwaggerUi,
 };
 
-pub async fn run(opts: &RunOptions) -> Result<()> {
+pub async fn run_api(
+    socket_addr: SocketAddr,
+    chains: HashMap<String, api::BlockchainState>,
+    mut rx_exit: watch::Receiver<bool>,
+) -> Result<()> {
     #[derive(OpenApi)]
     #[openapi(
     paths(
@@ -46,11 +62,70 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
     )]
     struct ApiDoc;
 
+    let metrics_registry = api::Metrics::new();
+    let api_state = api::ApiState {
+        chains:  Arc::new(chains),
+        metrics: Arc::new(metrics_registry),
+    };
+
+    // Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
+    // `with_state` method which replaces `Body` with `State` in the type signature.
+    let app = Router::new();
+    let app = app
+        .merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi()))
+        .merge(api::routes(api_state))
+        // Permissive CORS layer to allow all origins
+        .layer(CorsLayer::permissive());
+
+    tracing::info!("Starting server on: {:?}", &socket_addr);
+    // Binds the axum's server to the configured address and port. This is a blocking call and will
+    // not return until the server is shutdown.
+    axum::Server::try_bind(&socket_addr)?
+        .serve(app.into_make_service())
+        .with_graceful_shutdown(async {
+            // It can return an error or an Ok(()). In both cases, we would shut down.
+            // As Ok(()) means, exit signal (ctrl + c) was received.
+            // And Err(e) means, the sender was dropped which should not be the case.
+            let _ = rx_exit.changed().await;
+
+            tracing::info!("Shutting down RPC server...");
+        })
+        .await?;
+
+    Ok(())
+}
+
+
+pub async fn run_keeper(
+    chains: HashMap<String, api::BlockchainState>,
+    config: Config,
+    private_key: String,
+) -> Result<()> {
+    let mut handles = Vec::new();
+    for (chain_id, chain_config) in chains {
+        let chain_eth_config = config
+            .chains
+            .get(&chain_id)
+            .expect("All chains should be present in the config file")
+            .clone();
+        let private_key = private_key.clone();
+        handles.push(spawn(keeper::run_keeper_threads(
+            private_key,
+            chain_eth_config,
+            chain_config.clone(),
+        )));
+    }
+
+    Ok(())
+}
+
+pub async fn run(opts: &RunOptions) -> Result<()> {
     let config = Config::load(&opts.config.config)?;
+    let private_key = opts.load_private_key()?;
     let secret = opts.randomness.load_secret()?;
+    let (tx_exit, rx_exit) = watch::channel(false);
 
-
-    let mut chains = HashMap::new();
+    let mut chains: HashMap<ChainId, BlockchainState> = HashMap::new();
     for (chain_id, chain_config) in &config.chains {
         let contract = Arc::new(PythContract::from_config(&chain_config)?);
         let provider_info = contract.get_provider_info(opts.provider).call().await?;
@@ -89,6 +164,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         }
 
         let state = api::BlockchainState {
+            id: chain_id.clone(),
             state: Arc::new(chain_state),
             contract,
             provider_address: opts.provider,
@@ -99,28 +175,21 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         chains.insert(chain_id.clone(), state);
     }
 
-    let metrics_registry = api::Metrics::new();
-    let api_state = api::ApiState {
-        chains:  Arc::new(chains),
-        metrics: Arc::new(metrics_registry),
-    };
 
-    // Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
-    // `with_state` method which replaces `Body` with `State` in the type signature.
-    let app = Router::new();
-    let app = app
-        .merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi()))
-        .merge(api::routes(api_state))
-        // Permissive CORS layer to allow all origins
-        .layer(CorsLayer::permissive());
+    // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
+    spawn(async move {
+        tracing::info!("Registered shutdown signal handler...");
+        tokio::signal::ctrl_c().await.unwrap();
+        tracing::info!("Shut down signal received, waiting for tasks...");
+        // no need to handle error here, as it will only occur when all the
+        // receiver has been dropped and that's what we want to do
+        tx_exit.send(true)?;
 
+        Ok::<(), Error>(())
+    });
+    spawn(run_keeper(chains.clone(), config, private_key));
 
-    tracing::info!("Starting server on: {:?}", &opts.addr);
-    // Binds the axum's server to the configured address and port. This is a blocking call and will
-    // not return until the server is shutdown.
-    axum::Server::try_bind(&opts.addr)?
-        .serve(app.into_make_service())
-        .await?;
+    run_api(opts.addr.clone(), chains, rx_exit).await?;
 
     Ok(())
 }

+ 10 - 1
fortuna/src/config.rs

@@ -18,7 +18,10 @@ use {
         Args,
         Parser,
     },
-    ethers::types::Address,
+    ethers::types::{
+        Address,
+        U256,
+    },
     std::{
         collections::HashMap,
         fs,
@@ -131,6 +134,9 @@ pub struct EthereumConfig {
     /// URL of a Geth RPC endpoint to use for interacting with the blockchain.
     pub geth_rpc_addr: String,
 
+    /// URL of a Geth RPC wss endpoint to use for subscribing to blockchain events.
+    pub geth_rpc_wss: Option<String>,
+
     /// Address of a Pyth Randomness contract to interact with.
     pub contract_addr: Address,
 
@@ -148,4 +154,7 @@ pub struct EthereumConfig {
     /// For example, Finalized, Safe, Latest
     #[serde(default)]
     pub confirmed_block_status: BlockStatus,
+
+    /// The gas limit to use for entropy callback transactions.
+    pub gas_limit: U256,
 }

+ 18 - 1
fortuna/src/config/run.rs

@@ -3,9 +3,13 @@ use {
         ConfigOptions,
         RandomnessOptions,
     },
+    anyhow::Result,
     clap::Args,
     ethers::types::Address,
-    std::net::SocketAddr,
+    std::{
+        fs,
+        net::SocketAddr,
+    },
 };
 
 /// Run the webservice
@@ -27,4 +31,17 @@ pub struct RunOptions {
     #[arg(long = "provider")]
     #[arg(env = "FORTUNA_PROVIDER")]
     pub provider: Address,
+
+    /// Path to a file containing a 20-byte (40 char) hex encoded Ethereum private key.
+    /// This key is required to submit transactions for entropy callback requests.
+    /// This key should not be a registered provider.
+    #[arg(long = "keeper-private-key")]
+    #[arg(env = "KEEPER_PRIVATE_KEY")]
+    pub keeper_private_key_file: String,
+}
+
+impl RunOptions {
+    pub fn load_private_key(&self) -> Result<String> {
+        return Ok((fs::read_to_string(&self.keeper_private_key_file))?);
+    }
 }

+ 407 - 0
fortuna/src/keeper.rs

@@ -0,0 +1,407 @@
+use {
+    crate::{
+        api::{
+            self,
+            BlockchainState,
+        },
+        chain::{
+            ethereum::SignablePythContract,
+            reader::{
+                BlockNumber,
+                RequestedWithCallbackEvent,
+            },
+        },
+        config::EthereumConfig,
+    },
+    anyhow::Result,
+    ethers::{
+        providers::{
+            Middleware,
+            Provider,
+            Ws,
+        },
+        types::U256,
+    },
+    futures::StreamExt,
+    std::sync::Arc,
+    tokio::{
+        spawn,
+        sync::mpsc,
+        time::{
+            self,
+            Duration,
+        },
+    },
+    tracing,
+};
+
+
+/// How much to wait before retrying in case of an RPC error
+const RETRY_INTERVAL: Duration = Duration::from_secs(5);
+/// How many blocks to look back for events that might be missed when starting the keeper
+const BACKLOG_RANGE: u64 = 1000;
+/// How many blocks to fetch events for in a single rpc call
+const BLOCK_BATCH_SIZE: u64 = 100;
+/// How much to wait before polling the next latest block
+const POLL_INTERVAL: Duration = Duration::from_secs(5);
+
+
+/// Get the latest safe block number for the chain. Retry internally if there is an error.
+async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
+    loop {
+        match chain_state
+            .contract
+            .get_block_number(chain_state.confirmed_block_status)
+            .await
+        {
+            Ok(latest_confirmed_block) => {
+                return latest_confirmed_block - chain_state.reveal_delay_blocks
+            }
+            Err(e) => {
+                tracing::error!("Error while getting block number. error: {:?}", e);
+                time::sleep(RETRY_INTERVAL).await;
+            }
+        }
+    }
+}
+
+/// Run threads to handle events for the last `BACKLOG_RANGE` blocks. Watch for new blocks and
+/// handle any events for the new blocks.
+pub async fn run_keeper_threads(
+    private_key: String,
+    chain_eth_config: EthereumConfig,
+    chain_state: BlockchainState,
+) {
+    tracing::info!("Starting keeper for chain: {}", &chain_state.id);
+
+    let latest_safe_block = get_latest_safe_block(&chain_state).await;
+
+    tracing::info!(
+        "Latest safe block for chain {}: {} ",
+        &chain_state.id,
+        &latest_safe_block
+    );
+
+    let contract = Arc::new(
+        SignablePythContract::from_config(&chain_eth_config, &private_key)
+            .await
+            .expect("Chain config should be valid"),
+    );
+
+    let backlog_chain_state = chain_state.clone();
+    let backlog_contract = contract.clone();
+    // Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
+    spawn(async move {
+        let from_block = latest_safe_block.saturating_sub(BACKLOG_RANGE);
+        process_block_range(
+            BlockRange {
+                from: from_block,
+                to:   latest_safe_block,
+            },
+            backlog_contract,
+            chain_eth_config.gas_limit,
+            backlog_chain_state.clone(),
+        )
+        .await;
+        tracing::info!(
+            "Backlog processing for chain: {} completed",
+            &backlog_chain_state.id
+        );
+    });
+
+    let (tx, rx) = mpsc::channel::<BlockRange>(1000);
+
+    let watch_blocks_chain_state = chain_state.clone();
+    // Spawn a thread to watch for new blocks and send the range of blocks for which events has not been handled to the `tx` channel.
+    spawn(async move {
+        loop {
+            if let Err(e) = watch_blocks(
+                watch_blocks_chain_state.clone(),
+                latest_safe_block,
+                tx.clone(),
+                chain_eth_config.geth_rpc_wss.clone(),
+            )
+            .await
+            {
+                tracing::error!(
+                    "Error in watching blocks for chain: {}, {:?}",
+                    &watch_blocks_chain_state.id,
+                    e
+                );
+                time::sleep(RETRY_INTERVAL).await;
+            }
+        }
+    });
+    // Spawn a thread that listens for block ranges on the `rx` channel and processes the events for those blocks.
+    spawn(process_new_blocks(
+        chain_state.clone(),
+        rx,
+        Arc::clone(&contract),
+        chain_eth_config.gas_limit,
+    ));
+}
+
+
+// Process an event for a chain. It estimates the gas for the reveal with callback and
+// submits the transaction if the gas estimate is below the gas limit.
+// It will return an Error if the gas estimation failed with a provider error or if the
+// reveal with callback failed with a provider error.
+pub async fn process_event(
+    event: RequestedWithCallbackEvent,
+    chain_config: &BlockchainState,
+    contract: &Arc<SignablePythContract>,
+    gas_limit: U256,
+) -> Result<()> {
+    if chain_config.provider_address != event.provider_address {
+        return Ok(());
+    }
+    let provider_revelation = match chain_config.state.reveal(event.sequence_number) {
+        Ok(result) => result,
+        Err(e) => {
+            tracing::error!(
+                "Error while revealing for provider: {} and sequence number: {} with error: {:?}",
+                event.provider_address,
+                event.sequence_number,
+                e
+            );
+            return Ok(());
+        }
+    };
+
+    let gas_estimate_res = chain_config
+        .contract
+        .estimate_reveal_with_callback_gas(
+            event.provider_address,
+            event.sequence_number,
+            event.user_random_number,
+            provider_revelation,
+        )
+        .await;
+
+    match gas_estimate_res {
+        Ok(gas_estimate_option) => match gas_estimate_option {
+            Some(gas_estimate) => {
+                // Pad the gas estimate by 33%
+                let (gas_estimate, _) = gas_estimate
+                    .saturating_mul(U256::from(4))
+                    .div_mod(U256::from(3));
+
+                if gas_estimate > gas_limit {
+                    tracing::error!(
+                        "Gas estimate for reveal with callback is higher than the gas limit for chain: {}",
+                        &chain_config.id
+                    );
+                    return Ok(());
+                }
+
+                let res = contract
+                    .reveal_with_callback(
+                        event.provider_address,
+                        event.sequence_number,
+                        event.user_random_number,
+                        provider_revelation,
+                    )
+                    .gas(gas_estimate)
+                    .send()
+                    .await?
+                    .await;
+
+                match res {
+                    Ok(_) => {
+                        tracing::info!(
+                            "Revealed on chain: {} for provider: {} and sequence number: {} with res: {:?}",
+                            &chain_config.id,
+                            event.provider_address,
+                            event.sequence_number,
+                            res
+                        );
+                        Ok(())
+                    }
+                    Err(e) => {
+                        tracing::error!(
+                            "Error while revealing for provider: {} and sequence number: {} with error: {:?}",
+                            event.provider_address,
+                            event.sequence_number,
+                            e
+                        );
+                        Err(e.into())
+                    }
+                }
+            }
+            None => Ok(()),
+        },
+        Err(e) => {
+            tracing::error!(
+                "Error while simulating reveal for provider: {} and sequence number: {} \n error: {:?}",
+                event.provider_address,
+                event.sequence_number,
+                e
+            );
+            Err(e)
+        }
+    }
+}
+
+
+/// Process a range of blocks for a chain. It will fetch events for the blocks in the provided range
+/// and then try to process them one by one. If the process fails, it will retry indefinitely.
+pub async fn process_block_range(
+    block_range: BlockRange,
+    contract: Arc<SignablePythContract>,
+    gas_limit: U256,
+    chain_state: api::BlockchainState,
+) {
+    tracing::info!(
+        "Processing blocks for chain: {} from block: {} to block: {}",
+        &chain_state.id,
+        block_range.from,
+        block_range.to
+    );
+
+    let BlockRange {
+        from: first_block,
+        to: last_block,
+    } = block_range;
+    let mut current_block = first_block;
+    while current_block <= last_block {
+        let mut to_block = current_block + BLOCK_BATCH_SIZE;
+        if to_block > last_block {
+            to_block = last_block;
+        }
+        let events_res = chain_state
+            .contract
+            .get_request_with_callback_events(current_block, to_block)
+            .await;
+
+        match events_res {
+            Ok(events) => {
+                for event in events {
+                    while let Err(e) =
+                        process_event(event.clone(), &chain_state, &contract, gas_limit).await
+                    {
+                        tracing::error!(
+                            "Error while processing event for chain: {} and sequence number: {}. Waiting for {} seconds before retry. error: {:?}",
+                            &chain_state.id,
+                            &event.sequence_number,
+                            RETRY_INTERVAL.as_secs(),
+                            e
+                        );
+                        time::sleep(RETRY_INTERVAL).await;
+                    }
+                }
+                tracing::info!(
+                    "Backlog processed for chain: {} from block: {} to block: {}",
+                    &chain_state.id,
+                    &current_block,
+                    &to_block
+                );
+                current_block = to_block + 1;
+            }
+            Err(e) => {
+                tracing::error!(
+                    "Error while getting events for chain: {} from block: {} to block: {}. Waiting for {} seconds before retry.  error: {:?}",
+                    &chain_state.id,
+                    &current_block,
+                    &to_block,
+                    RETRY_INTERVAL.as_secs(),
+                    e
+                );
+                time::sleep(RETRY_INTERVAL).await;
+            }
+        }
+    }
+}
+
+pub struct BlockRange {
+    pub from: BlockNumber,
+    pub to:   BlockNumber,
+}
+
+/// Watch for new blocks and send the range of blocks for which events have not been handled to the `tx` channel.
+/// We are subscribing to new blocks instead of events. If we miss some blocks, it will be fine as we are sending
+/// block ranges to the `tx` channel. If we have subscribed to events, we could have missed those and won't even
+/// know about it.
+pub async fn watch_blocks(
+    chain_state: BlockchainState,
+    latest_safe_block: BlockNumber,
+    tx: mpsc::Sender<BlockRange>,
+    geth_rpc_wss: Option<String>,
+) -> Result<()> {
+    tracing::info!(
+        "Watching blocks to handle new events for chain: {}",
+        &chain_state.id
+    );
+    let mut last_safe_block_processed = latest_safe_block;
+
+    let provider_option = match geth_rpc_wss {
+        Some(wss) => Some(Provider::<Ws>::connect(wss).await?),
+        None => {
+            tracing::info!("No wss provided for chain: {}", &chain_state.id);
+            None
+        }
+    };
+
+    let mut stream_option = match provider_option {
+        Some(ref provider) => Some(provider.subscribe_blocks().await?),
+        None => None,
+    };
+
+    loop {
+        match stream_option {
+            Some(ref mut stream) => {
+                stream.next().await;
+            }
+            None => {
+                time::sleep(POLL_INTERVAL).await;
+            }
+        }
+
+        let latest_safe_block = get_latest_safe_block(&chain_state).await;
+        if latest_safe_block > last_safe_block_processed {
+            match tx
+                .send(BlockRange {
+                    from: last_safe_block_processed + 1,
+                    to:   latest_safe_block,
+                })
+                .await
+            {
+                Ok(_) => {
+                    tracing::info!(
+                        "Block range sent to handle events for chain {}: {} to {}",
+                        &chain_state.id,
+                        &last_safe_block_processed + 1,
+                        &latest_safe_block
+                    );
+                    last_safe_block_processed = latest_safe_block;
+                }
+                Err(e) => {
+                    tracing::error!("Error while sending block range to handle events for chain {}. These will be handled in next call. error: {:?}",&chain_state.id,e);
+                }
+            };
+        }
+    }
+}
+
+/// It waits on rx channel to receive block ranges and then calls process_block_range to process them.
+pub async fn process_new_blocks(
+    chain_state: BlockchainState,
+    mut rx: mpsc::Receiver<BlockRange>,
+    contract: Arc<SignablePythContract>,
+    gas_limit: U256,
+) {
+    loop {
+        tracing::info!(
+            "Waiting for new block ranges to process for chain: {}",
+            &chain_state.id
+        );
+        if let Some(block_range) = rx.recv().await {
+            process_block_range(
+                block_range,
+                Arc::clone(&contract),
+                gas_limit,
+                chain_state.clone(),
+            )
+            .await;
+        }
+    }
+}

+ 1 - 0
fortuna/src/main.rs

@@ -11,6 +11,7 @@ pub mod api;
 pub mod chain;
 pub mod command;
 pub mod config;
+pub mod keeper;
 pub mod state;
 
 // Server TODO list: