
feat(fortuna): Multiple replica support (#2812)

* feat(fortuna): implement multiple replica support with sequence number modulo filtering

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>

* feat(fortuna): add delayed processing for backup replicas

- Add configurable time delay before backup replicas check request status
- Backup replicas now wait backup_delay_seconds before attempting fulfillment
- Add backup_delay_seconds field to ReplicaConfig with a default of 30 seconds
- Improves reliability by reducing race conditions between replicas

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>

* docs(fortuna): add comprehensive multi-replica setup documentation

- Add Multiple Replica Setup section to README.md with modulo assignment explanation
- Add replica_config examples to config.sample.yaml for 2, 3, and 5 replica setups
- Include deployment considerations, failover behavior, and wallet separation requirements
- Add validation for backup_delay_seconds > 0 to prevent race conditions

Co-Authored-By: Tejas Badadare <tejas@dourolabs.xyz>

* update sample config

* refactor(fortuna): streamline keeper configuration and improve event processing

- Updated config.sample.yaml by removing unnecessary blank lines.
- Changed backup_delay_seconds in README.md from 45 to 30 for consistency.
- Refactored run_keeper_threads to accept KeeperConfig directly instead of private_key.
- Updated the run function to load the replica configuration and pass it through to the keeper threads.
- Added comments in process_event_with_backoff to clarify primary and backup replica logic.

* feat(fortuna): add provider arg to entropy load testing script

* fix(fortuna): improve config and replica logic

* fix(fortuna): names, config check

* bump version

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
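
For orientation, here is a minimal sketch of the assignment rule the commit message describes (illustrative only; the actual `ReplicaConfig` and the modulo check appear in `apps/fortuna/src/config.rs` and `apps/fortuna/src/keeper/process_event.rs` in the diff below):

```rust
// Minimal, self-contained sketch of the modulo-based replica assignment; the
// real ReplicaConfig lives in apps/fortuna/src/config.rs and the check in
// apps/fortuna/src/keeper/process_event.rs.
struct ReplicaConfig {
    replica_id: u64,
    total_replicas: u64,
    backup_delay_seconds: u64,
}

// A replica is primary for a request when the sequence number maps to its id;
// otherwise it waits backup_delay_seconds before acting as a backup.
fn is_primary(cfg: &ReplicaConfig, sequence_number: u64) -> bool {
    sequence_number % cfg.total_replicas == cfg.replica_id
}

fn main() {
    let cfg = ReplicaConfig {
        replica_id: 1,
        total_replicas: 2,
        backup_delay_seconds: 30,
    };
    // With two replicas, replica 1 is primary for odd sequence numbers.
    assert!(is_primary(&cfg, 5));
    assert!(!is_primary(&cfg, 4));
    println!(
        "replica {} backs up other requests after {}s",
        cfg.replica_id, cfg.backup_delay_seconds
    );
}
```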
Tejas Badadare authored 4 months ago
commit c0666aa24c

+ 1 - 1
apps/fortuna/Cargo.lock

@@ -1647,7 +1647,7 @@ dependencies = [
 
 [[package]]
 name = "fortuna"
-version = "7.6.5"
+version = "7.7.0"
 dependencies = [
  "anyhow",
  "axum",

+ 3 - 4
apps/fortuna/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "fortuna"
-version = "7.6.5"
+version = "7.7.0"
 edition = "2021"
 
 [lib]
@@ -41,13 +41,12 @@ url = "2.5.0"
 chrono = { version = "0.4.38", features = [
   "clock",
   "std",
-  "serde"
+  "serde",
 ], default-features = false }
 backoff = { version = "0.4.0", features = ["futures", "tokio"] }
 thiserror = "1.0.61"
 futures-locks = "0.7.1"
-sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite", "chrono" ] }
-
+sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "chrono"] }
 
 
 [dev-dependencies]

+ 59 - 0
apps/fortuna/README.md

@@ -17,6 +17,11 @@ a database to be available at build time. Create a `.env` file in the root of th
 DATABASE_URL="sqlite:fortuna.db?mode=rwc"
 ```
 
+Install the `sqlx` command-line tool with:
+```bash
+cargo install sqlx-cli
+```
+
 Next, you need to create the database and apply the schema migrations. You can do this by running:
 
 ```bash
@@ -40,6 +45,60 @@ Please add the changed files in the `.sqlx` folder to your git commit.
 The Fortuna binary has a command-line interface to perform useful operations on the contract, such as
 registering a new randomness provider, or drawing a random value. To see the available commands, simply run `cargo run`.
 
+## Multiple Replica Setup
+
+Fortuna supports running multiple replica instances for high availability and reliability. This prevents service interruption if one instance goes down and distributes the workload across multiple instances.
+
+### How Replica Assignment Works
+
+- Each replica is assigned a unique `replica_id` (0, 1, 2, etc.)
+- Requests are distributed using modulo assignment: `sequence_number % total_replicas`
+- Each replica primarily handles requests assigned to its ID
+- After a configurable delay, replicas also process requests assigned to other replicas, acting as a backup (failover)
+
+### Example Configurations
+
+**Two Replica Setup (Blue/Green):**
+```yaml
+# Replica 0 (Blue) - handles even sequence numbers (0, 2, 4, ...)
+keeper:
+  replica_config:
+    replica_id: 0
+    total_replicas: 2
+    backup_delay_seconds: 30
+
+# Replica 1 (Green) - handles odd sequence numbers (1, 3, 5, ...)
+keeper:
+  replica_config:
+    replica_id: 1
+    total_replicas: 2
+    backup_delay_seconds: 30
+```
+
+**Three Replica Setup:**
+```yaml
+# Replica 0 - handles sequence numbers 0, 3, 6, 9, ...
+keeper:
+  replica_config:
+    replica_id: 0
+    total_replicas: 3
+    backup_delay_seconds: 30
+```
+
+### Deployment Considerations
+
+1. **Separate Wallets**: Each replica MUST use a different private key to avoid nonce conflicts
+2. **Backup Delay**: Set `backup_delay_seconds` long enough to allow primary replica to process requests, but short enough for acceptable failover time (recommended: 30-60 seconds)
+3. **Monitoring**: Monitor each replica's processing metrics to ensure proper load distribution
+4. **Gas Management**: Each replica needs sufficient ETH balance for gas fees
+
+### Failover Behavior
+
+- Primary replica processes requests immediately
+- Backup replicas wait for `backup_delay_seconds` before checking if request is still unfulfilled
+- If request is already fulfilled during the delay, backup replica skips processing
+- This prevents duplicate transactions and wasted gas while ensuring reliability
+
 ## Local Development
 
 To start an instance of the webserver for local testing, you first need to perform a few setup steps:

+ 16 - 0
apps/fortuna/config.sample.yaml

@@ -86,3 +86,19 @@ keeper:
     value: 0xabcd
     # For production, you can store the private key in a file.
     # file: keeper-key.txt
+
+  # Multi-replica configuration
+  # Optional: Multi-replica configuration for high availability and load distribution
+  # Uncomment and configure for production deployments with multiple Fortuna instances
+  # replica_config:
+  #   replica_id: 0              # Unique identifier for this replica (0, 1, 2, ...)
+  #   total_replicas: 2          # Total number of replica instances running
+  #   backup_delay_seconds: 30   # Seconds to wait before processing other replicas' requests
+  #
+  # Example configurations:
+  #
+  # Two-replica setup (Blue/Green):
+  # - Replica 0: handles even sequence numbers (0, 2, 4, ...)
+  # - Replica 1: handles odd sequence numbers (1, 3, 5, ...)
+  #
+  # IMPORTANT: Each replica must use a different private_key to avoid nonce conflicts!

+ 9 - 1
apps/fortuna/src/command/run.rs

@@ -3,7 +3,7 @@ use {
         api::{self, ApiBlockChainState, BlockchainState, ChainId},
         chain::ethereum::InstrumentedPythContract,
         command::register_provider::CommitmentMetadata,
-        config::{Commitment, Config, EthereumConfig, ProviderConfig, RunOptions},
+        config::{Commitment, Config, EthereumConfig, ProviderConfig, ReplicaConfig, RunOptions},
         eth_utils::traced_client::RpcMetrics,
         history::History,
         keeper::{self, keeper_metrics::KeeperMetrics},
@@ -94,10 +94,14 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
 
     let keeper_metrics: Arc<KeeperMetrics> =
         Arc::new(KeeperMetrics::new(metrics_registry.clone()).await);
+
     let keeper_private_key_option = config.keeper.private_key.load()?;
     if keeper_private_key_option.is_none() {
         tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.")
     }
+
+    let keeper_replica_config = config.keeper.replica_config.clone();
+
     let chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>> = Arc::new(RwLock::new(
         config
             .chains
@@ -110,6 +114,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         keeper_metrics.add_chain(chain_id.clone(), config.provider.address);
         let keeper_metrics = keeper_metrics.clone();
         let keeper_private_key_option = keeper_private_key_option.clone();
+        let keeper_replica_config = keeper_replica_config.clone();
         let chains = chains.clone();
         let secret_copy = secret.clone();
         let rpc_metrics = rpc_metrics.clone();
@@ -123,6 +128,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
                     chain_config.clone(),
                     keeper_metrics.clone(),
                     keeper_private_key_option.clone(),
+                    keeper_replica_config.clone(),
                     chains.clone(),
                     &secret_copy,
                     history.clone(),
@@ -173,6 +179,7 @@ async fn setup_chain_and_run_keeper(
     chain_config: EthereumConfig,
     keeper_metrics: Arc<KeeperMetrics>,
     keeper_private_key_option: Option<String>,
+    keeper_replica_config: Option<ReplicaConfig>,
     chains: Arc<RwLock<HashMap<ChainId, ApiBlockChainState>>>,
     secret_copy: &str,
     history: Arc<History>,
@@ -195,6 +202,7 @@ async fn setup_chain_and_run_keeper(
     if let Some(keeper_private_key) = keeper_private_key_option {
         keeper::run_keeper_threads(
             keeper_private_key,
+            keeper_replica_config,
             chain_config,
             state,
             keeper_metrics.clone(),

+ 32 - 0
apps/fortuna/src/config.rs

@@ -94,6 +94,23 @@ impl Config {
             }
         }
 
+        if let Some(replica_config) = &config.keeper.replica_config {
+            if replica_config.total_replicas == 0 {
+                return Err(anyhow!("Keeper replica configuration is invalid. total_replicas must be greater than 0."));
+            }
+            if config.keeper.private_key.load()?.is_none() {
+                return Err(anyhow!(
+                    "Keeper replica configuration requires a keeper private key to be specified."
+                ));
+            }
+            if replica_config.replica_id >= replica_config.total_replicas {
+                return Err(anyhow!("Keeper replica configuration is invalid. replica_id must be less than total_replicas."));
+            }
+            if replica_config.backup_delay_seconds == 0 {
+                return Err(anyhow!("Keeper replica configuration is invalid. backup_delay_seconds must be greater than 0 to prevent race conditions."));
+            }
+        }
+
         Ok(config)
     }
 
@@ -333,6 +350,18 @@ fn default_chain_sample_interval() -> u64 {
     1
 }
 
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct ReplicaConfig {
+    pub replica_id: u64,
+    pub total_replicas: u64,
+    #[serde(default = "default_backup_delay_seconds")]
+    pub backup_delay_seconds: u64,
+}
+
+fn default_backup_delay_seconds() -> u64 {
+    30
+}
+
 /// Configuration values for the keeper service that are shared across chains.
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 pub struct KeeperConfig {
@@ -342,6 +371,9 @@ pub struct KeeperConfig {
     /// This key *does not need to be a registered provider*. In particular, production deployments
     /// should ensure this is a different key in order to reduce the severity of security breaches.
     pub private_key: SecretString,
+
+    #[serde(default)]
+    pub replica_config: Option<ReplicaConfig>,
 }
 
 // A secret is a string that can be provided either as a literal in the config,

+ 5 - 3
apps/fortuna/src/keeper.rs

@@ -2,7 +2,7 @@ use {
     crate::{
         api::{BlockchainState, ChainId},
         chain::ethereum::{InstrumentedPythContract, InstrumentedSignablePythContract},
-        config::EthereumConfig,
+        config::{EthereumConfig, ReplicaConfig},
         eth_utils::traced_client::RpcMetrics,
         history::History,
         keeper::{
@@ -56,7 +56,8 @@ pub enum RequestState {
 /// handle any events for the new blocks.
 #[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))]
 pub async fn run_keeper_threads(
-    private_key: String,
+    keeper_private_key: String,
+    keeper_replica_config: Option<ReplicaConfig>,
     chain_eth_config: EthereumConfig,
     chain_state: BlockchainState,
     metrics: Arc<KeeperMetrics>,
@@ -69,7 +70,7 @@ pub async fn run_keeper_threads(
 
     let contract = Arc::new(InstrumentedSignablePythContract::from_config(
         &chain_eth_config,
-        &private_key,
+        &keeper_private_key,
         chain_state.id.clone(),
         rpc_metrics.clone(),
         chain_state.network_id,
@@ -85,6 +86,7 @@ pub async fn run_keeper_threads(
         contract: contract.clone(),
         gas_limit,
         escalation_policy: chain_eth_config.escalation_policy.to_policy(),
+        replica_config: keeper_replica_config,
         metrics: metrics.clone(),
         fulfilled_requests_cache,
         history,

+ 2 - 0
apps/fortuna/src/keeper/block.rs

@@ -2,6 +2,7 @@ use {
     crate::{
         api::BlockchainState,
         chain::{ethereum::InstrumentedSignablePythContract, reader::BlockNumber},
+        config::ReplicaConfig,
         eth_utils::utils::EscalationPolicy,
         history::History,
         keeper::{
@@ -45,6 +46,7 @@ pub struct ProcessParams {
     pub gas_limit: U256,
     pub escalation_policy: EscalationPolicy,
     pub chain_state: BlockchainState,
+    pub replica_config: Option<ReplicaConfig>,
     pub metrics: Arc<KeeperMetrics>,
     pub history: Arc<History>,
     pub fulfilled_requests_cache: Arc<RwLock<HashSet<u64>>>,

+ 49 - 0
apps/fortuna/src/keeper/process_event.rs

@@ -35,6 +35,55 @@ pub async fn process_event_with_backoff(
         return Ok(());
     }
 
+    // If replica config is present, we're running with multiple instances.
+    // The incoming request is assigned by modulo operation on the sequence number
+    // and the total number of replicas. If our replica_id is the primary for this sequence number,
+    // we process the request directly. If our replica_id is a backup, we wait for the delay and
+    // then check if the request is still open. If it is, we process it as a backup replica.
+    if let Some(replica_config) = &process_param.replica_config {
+        let assigned_replica = event.sequence_number % replica_config.total_replicas;
+        let is_primary_replica = assigned_replica == replica_config.replica_id;
+
+        if is_primary_replica {
+            tracing::debug!("Processing request as primary replica");
+        } else {
+            tracing::debug!("Processing request as backup replica");
+
+            tracing::info!("Waiting before processing as backup replica");
+            tokio::time::sleep(tokio::time::Duration::from_secs(
+                replica_config.backup_delay_seconds,
+            ))
+            .await;
+
+            // Check if the request is still open after the delay.
+            // If it is, we will process it as a backup replica.
+            match chain_state
+                .contract
+                .get_request(event.provider_address, event.sequence_number)
+                .await
+            {
+                Ok(Some(_)) => {
+                    tracing::info!(
+                        delay_seconds = replica_config.backup_delay_seconds,
+                        "Request still open after delay, processing as backup replica"
+                    );
+                }
+                Ok(None) => {
+                    tracing::debug!(
+                        "Request already fulfilled by primary replica during delay, skipping"
+                    );
+                    return Ok(());
+                }
+                Err(e) => {
+                    tracing::warn!(
+                        error = ?e,
+                        "Error checking request status after delay, processing as backup replica"
+                    );
+                }
+            }
+        }
+    }
+
     let account_label = AccountLabel {
         chain_id: chain_state.id.clone(),
         address: chain_state.provider_address.to_string(),

+ 6 - 2
contract_manager/scripts/load_test_entropy.ts

@@ -9,7 +9,7 @@ const parser = yargs(hideBin(process.argv))
   .usage(
     "Load tests the entropy contract using the EntropyTester contract with many requests in a single transaction\n" +
       "it does not monitor whether the callbacks are actually submitted or not.\n" +
-      "Usage: $0 --private-key <private-key> --chain <chain-id> --tester-address <tester-address>",
+      "Usage: $0 --private-key <private-key> --chain <chain-id> --tester-address <tester-address> --provider-address <provider-address>",
   )
   .options({
     chain: {
@@ -22,6 +22,10 @@ const parser = yargs(hideBin(process.argv))
       demandOption: true,
       desc: "Address of the EntropyTester contract",
     },
+    provider: {
+      type: "string",
+      desc: "Address of the entropy provider to use for requests (defaults to default provider)",
+    },
     "success-count": {
       type: "number",
       default: 100,
@@ -66,7 +70,7 @@ async function main() {
   const privateKey = toPrivateKey(argv.privateKey);
   const chain = DefaultStore.getChainOrThrow(argv.chain, EvmChain);
   const contract = findEntropyContract(chain);
-  const provider = await contract.getDefaultProvider();
+  const provider = argv.provider || (await contract.getDefaultProvider());
   const fee = await contract.getFee(provider);
   const web3 = contract.chain.getWeb3();
   const testerContract = new web3.eth.Contract(ABI, argv.testerAddress);
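
For context, a hypothetical invocation of the updated load-testing script with the new provider option (assuming the script is run via ts-node from the `contract_manager` package; flag names follow the yargs option keys in the script, and the placeholders must be substituted):

```bash
# Hypothetical usage; --provider is optional and falls back to the default provider.
npx ts-node scripts/load_test_entropy.ts \
  --chain <chain-id> \
  --private-key <private-key> \
  --tester-address <tester-address> \
  --provider <provider-address>
```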