Quellcode durchsuchen

feat(fortuna): Use v2 events, add new fields to History API for Explorer (#2845)

* use AnyPool for generic SQL cxn, add postgres migrations

* migrations

* fix sqlite issues

* fix sqlite issues

* use requested V2 event

* merge

* feat(fortuna): use requestV2 event, parse revealed event from tx logs, add gas and callback fields to history API

* fix(fortuna): validate request v2 seq_num

* chore(fortuna): better name
Tejas Badadare vor 3 Monaten
Ursprung
Commit
5a1c980350

+ 1 - 1
Cargo.lock

@@ -3071,7 +3071,7 @@ dependencies = [
 
 [[package]]
 name = "fortuna"
-version = "8.2.6"
+version = "9.0.0"
 dependencies = [
  "anyhow",
  "axum 0.6.20",

+ 1 - 1
apps/fortuna/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "fortuna"
-version = "8.2.6"
+version = "9.0.0"
 edition = "2021"
 
 [lib]

+ 4 - 0
apps/fortuna/migrations/20250801232728_add_callback_fields.down.sql

@@ -0,0 +1,4 @@
+-- Remove callback_failed, callback_return_value, and callback_gas_used from the requests table.
+ALTER TABLE request DROP COLUMN callback_failed;
+ALTER TABLE request DROP COLUMN callback_return_value;
+ALTER TABLE request DROP COLUMN callback_gas_used;

+ 4 - 0
apps/fortuna/migrations/20250801232728_add_callback_fields.up.sql

@@ -0,0 +1,4 @@
+-- Add callback_failed, callback_return_value, and callback_gas_used to the requests table.
+ALTER TABLE request ADD COLUMN callback_failed INTEGER;
+ALTER TABLE request ADD COLUMN callback_return_value VARCHAR;
+ALTER TABLE request ADD COLUMN callback_gas_used VARCHAR(100);

+ 10 - 21
apps/fortuna/src/chain/ethereum.rs

@@ -3,10 +3,7 @@
 use {
     crate::{
         api::ChainId,
-        chain::reader::{
-            self, BlockNumber, BlockStatus, EntropyReader, EntropyRequestInfo,
-            RequestedWithCallbackEvent,
-        },
+        chain::reader::{self, BlockNumber, BlockStatus, EntropyReader, RequestedV2Event},
         config::EthereumConfig,
         eth_utils::{
             eth_gas_oracle::EthProviderOracle,
@@ -283,6 +280,7 @@ impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
                 block_number: request.block_number,
                 use_blockhash: request.use_blockhash,
                 callback_status: reader::RequestCallbackStatus::try_from(request.callback_status)?,
+                gas_limit_10k: request.gas_limit_1_0k,
             }))
         }
     }
@@ -306,8 +304,8 @@ impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
         from_block: BlockNumber,
         to_block: BlockNumber,
         provider: Address,
-    ) -> Result<Vec<RequestedWithCallbackEvent>> {
-        let mut event = self.requested_with_callback_filter();
+    ) -> Result<Vec<RequestedV2Event>> {
+        let mut event = self.requested_2_filter();
         event.filter = event
             .filter
             .address(self.address())
@@ -315,24 +313,15 @@ impl<T: JsonRpcClient + 'static> EntropyReader for PythRandom<Provider<T>> {
             .to_block(to_block)
             .topic1(provider);
 
-        let res: Vec<(RequestedWithCallbackFilter, LogMeta)> = event.query_with_meta().await?;
+        let res: Vec<(Requested2Filter, LogMeta)> = event.query_with_meta().await?;
         Ok(res
             .into_iter()
-            .map(|(r, meta)| RequestedWithCallbackEvent {
+            .map(|(r, meta)| RequestedV2Event {
                 sequence_number: r.sequence_number,
-                user_random_number: r.user_random_number,
-                provider_address: r.request.provider,
-                requestor: r.requestor,
-                request: EntropyRequestInfo {
-                    provider: r.request.provider,
-                    sequence_number: r.request.sequence_number,
-                    num_hashes: r.request.num_hashes,
-                    commitment: r.request.commitment,
-                    block_number: r.request.block_number,
-                    requester: r.request.requester,
-                    use_blockhash: r.request.use_blockhash,
-                    is_request_with_callback: r.request.is_request_with_callback,
-                },
+                user_random_number: r.user_contribution,
+                provider_address: r.provider,
+                sender: r.caller,
+                gas_limit: r.gas_limit,
                 log_meta: meta,
             })
             .filter(|r| r.provider_address == provider)

+ 9 - 5
apps/fortuna/src/chain/reader.rs

@@ -45,12 +45,12 @@ pub struct EntropyRequestInfo {
 }
 
 #[derive(Clone)]
-pub struct RequestedWithCallbackEvent {
+pub struct RequestedV2Event {
     pub sequence_number: u64,
     pub user_random_number: [u8; 32],
     pub provider_address: Address,
-    pub requestor: Address,
-    pub request: EntropyRequestInfo,
+    pub sender: Address,
+    pub gas_limit: u32,
     pub log_meta: LogMeta,
 }
 
@@ -73,7 +73,7 @@ pub trait EntropyReader: Send + Sync {
         from_block: BlockNumber,
         to_block: BlockNumber,
         provider: Address,
-    ) -> Result<Vec<RequestedWithCallbackEvent>>;
+    ) -> Result<Vec<RequestedV2Event>>;
 
     /// Estimate the gas required to reveal a random number with a callback.
     async fn estimate_reveal_with_callback_gas(
@@ -97,6 +97,8 @@ pub struct Request {
     pub block_number: BlockNumber,
     pub use_blockhash: bool,
     pub callback_status: RequestCallbackStatus,
+    /// The gas limit for the request, in 10k gas units. (i.e., 2 = 20k gas).
+    pub gas_limit_10k: u16,
 }
 
 /// Status values for Request.callback_status
@@ -169,6 +171,7 @@ pub mod mock {
                             block_number: b,
                             use_blockhash: u,
                             callback_status: RequestCallbackStatus::CallbackNotNecessary,
+                            gas_limit_10k: 0,
                         })
                         .collect(),
                 ),
@@ -189,6 +192,7 @@ pub mod mock {
                 block_number,
                 use_blockhash,
                 callback_status: RequestCallbackStatus::CallbackNotNecessary,
+                gas_limit_10k: 0,
             });
             self
         }
@@ -227,7 +231,7 @@ pub mod mock {
             _from_block: BlockNumber,
             _to_block: BlockNumber,
             _provider: Address,
-        ) -> Result<Vec<super::RequestedWithCallbackEvent>> {
+        ) -> Result<Vec<super::RequestedV2Event>> {
             Ok(vec![])
         }
 

+ 21 - 4
apps/fortuna/src/eth_utils/utils.rs

@@ -1,12 +1,13 @@
 use {
     crate::{
-        chain::ethereum::InstrumentedSignablePythContract, eth_utils::nonce_manager::NonceManaged,
+        chain::ethereum::{InstrumentedSignablePythContract, PythRandomEvents, Revealed2Filter},
+        eth_utils::nonce_manager::NonceManaged,
     },
     anyhow::{anyhow, Result},
     backoff::ExponentialBackoff,
     ethabi::ethereum_types::U64,
     ethers::{
-        contract::{ContractCall, ContractError},
+        contract::{ContractCall, ContractError, EthLogDecode},
         middleware::Middleware,
         providers::{MiddlewareError, ProviderError},
         signers::Signer,
@@ -30,6 +31,7 @@ pub struct SubmitTxResult {
     pub fee_multiplier: u64,
     pub duration: Duration,
     pub receipt: TransactionReceipt,
+    pub revealed_event: Revealed2Filter,
 }
 
 #[derive(Clone, Debug)]
@@ -157,7 +159,7 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
 
     let num_retries = Arc::new(AtomicU64::new(0));
 
-    let success = backoff::future::retry_notify(
+    let receipt = backoff::future::retry_notify(
         backoff,
         || async {
             let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed);
@@ -186,11 +188,26 @@ pub async fn submit_tx_with_backoff<T: Middleware + NonceManaged + 'static>(
     let duration = start_time.elapsed();
     let num_retries = num_retries.load(std::sync::atomic::Ordering::Relaxed);
 
+    let revealed_event: Revealed2Filter = {
+        let mut found_event = None;
+        for log in &receipt.logs {
+            if let Ok(PythRandomEvents::Revealed2Filter(decoded_event)) =
+                PythRandomEvents::decode_log(&log.clone().into())
+            {
+                found_event = Some(decoded_event);
+                break;
+            }
+        }
+        // A successful reveal will always emit a Revealed v2 event, so theoretically we should never get here.
+        found_event.ok_or_else(|| SubmitTxError::ReceiptError(call.tx.clone(), receipt.clone()))?
+    };
+
     Ok(SubmitTxResult {
         num_retries,
         fee_multiplier: escalation_policy.get_fee_multiplier_pct(num_retries),
         duration,
-        receipt: success,
+        receipt,
+        revealed_event,
     })
 }
 

+ 50 - 6
apps/fortuna/src/history.rs

@@ -5,7 +5,7 @@ use {
     ethers::{
         core::utils::hex::ToHex,
         prelude::TxHash,
-        types::{Address, U256},
+        types::{Address, Bytes, U256},
         utils::keccak256,
     },
     serde::Serialize,
@@ -45,6 +45,17 @@ pub enum RequestEntryState {
         #[schema(example = "a905ab56567d31a7fda38ed819d97bc257f3ebe385fc5c72ce226d3bb855f0fe")]
         #[serde_as(as = "serde_with::hex::Hex")]
         combined_random_number: [u8; 32],
+        /// Whether the callback to the caller failed.
+        callback_failed: bool,
+        /// Return value from the callback. If the callback failed, this field contains
+        /// the error code and any additional returned data. Note that "" often indicates an out-of-gas error.
+        /// If the callback returns more than 256 bytes, only the first 256 bytes of the callback return value are included.
+        /// NOTE: This field is the raw bytes returned from the callback, not hex-decoded. The client should decode it as needed.
+        callback_return_value: Bytes,
+        /// How much gas the callback used.
+        #[schema(example = "567890", value_type = String)]
+        #[serde(with = "crate::serde::u32")]
+        callback_gas_used: u32,
     },
     Failed {
         reason: String,
@@ -78,8 +89,7 @@ pub struct RequestStatus {
     /// Gas limit for the callback in the smallest unit of the chain.
     /// For example, if the native currency is ETH, this will be in wei.
     #[schema(example = "500000", value_type = String)]
-    #[serde(with = "crate::serde::u256")]
-    pub gas_limit: U256,
+    pub gas_limit: u32,
     /// The user contribution to the random number.
     #[schema(example = "a905ab56567d31a7fda38ed819d97bc257f3ebe385fc5c72ce226d3bb855f0fe")]
     #[serde_as(as = "serde_with::hex::Hex")]
@@ -121,6 +131,9 @@ struct RequestRow {
     provider_random_number: Option<String>,
     gas_used: Option<String>,
     info: Option<String>,
+    callback_failed: Option<i64>,
+    callback_return_value: Option<String>,
+    callback_gas_used: Option<String>,
 }
 
 impl TryFrom<RequestRow> for RequestStatus {
@@ -139,7 +152,9 @@ impl TryFrom<RequestRow> for RequestStatus {
         let user_random_number = hex::FromHex::from_hex(row.user_random_number)?;
         let request_tx_hash = row.request_tx_hash.parse()?;
         let sender = row.sender.parse()?;
-        let gas_limit = U256::from_dec_str(&row.gas_limit)
+        let gas_limit = row
+            .gas_limit
+            .parse::<u32>()
             .map_err(|_| anyhow::anyhow!("Failed to parse gas limit"))?;
 
         let state = match row.state.as_str() {
@@ -173,6 +188,18 @@ impl TryFrom<RequestRow> for RequestStatus {
                         &user_random_number,
                         &provider_random_number,
                     ),
+                    //  Sqlx::Any doesn't support boolean types, so we need to convert from integer
+                    //  https://github.com/launchbadge/sqlx/issues/2778
+                    callback_failed: row.callback_failed.unwrap_or(0) == 1,
+                    callback_return_value: row
+                        .callback_return_value
+                        .map(|s| s.parse::<Bytes>().unwrap_or_default())
+                        .unwrap_or_default(),
+                    callback_gas_used: row
+                        .callback_gas_used
+                        .unwrap_or_default()
+                        .parse::<u32>()
+                        .map_err(|_| anyhow::anyhow!("Failed to parse callback_gas_used"))?,
                 }
             }
             "Failed" => RequestEntryState::Failed {
@@ -312,18 +339,29 @@ impl History {
                 provider_random_number,
                 gas_used,
                 combined_random_number: _,
+                callback_failed,
+                callback_return_value,
+                callback_gas_used,
             } => {
                 let reveal_block_number = reveal_block_number as i64;
                 let reveal_tx_hash: String = reveal_tx_hash.encode_hex();
                 let provider_random_number: String = provider_random_number.encode_hex();
                 let gas_used: String = gas_used.to_string();
-                let result = sqlx::query("UPDATE request SET state = $1, last_updated_at = $2, reveal_block_number = $3, reveal_tx_hash = $4, provider_random_number = $5, gas_used = $6 WHERE network_id = $7 AND sequence = $8 AND provider = $9 AND request_tx_hash = $10")
+                //  Sqlx::Any doesn't support boolean types, so we need to convert to integer
+                //  https://github.com/launchbadge/sqlx/issues/2778
+                let callback_failed: i64 = if callback_failed { 1 } else { 0 };
+                let callback_return_value: String = callback_return_value.encode_hex();
+                let callback_gas_used: String = callback_gas_used.to_string();
+                let result = sqlx::query("UPDATE request SET state = $1, last_updated_at = $2, reveal_block_number = $3, reveal_tx_hash = $4, provider_random_number = $5, gas_used = $6, callback_failed = $7, callback_return_value = $8, callback_gas_used = $9 WHERE network_id = $10 AND sequence = $11 AND provider = $12 AND request_tx_hash = $13")
                     .bind("Completed")
                     .bind(new_status.last_updated_at.timestamp())
                     .bind(reveal_block_number)
                     .bind(reveal_tx_hash)
                     .bind(provider_random_number)
                     .bind(gas_used)
+                    .bind(callback_failed)
+                    .bind(callback_return_value)
+                    .bind(callback_gas_used)
                     .bind(network_id)
                     .bind(sequence)
                     .bind(provider.clone())
@@ -646,7 +684,7 @@ mod test {
             user_random_number: [20; 32],
             sender: Address::random(),
             state: RequestEntryState::Pending,
-            gas_limit: U256::from(500_000),
+            gas_limit: 500_000,
         }
     }
 
@@ -665,6 +703,9 @@ mod test {
                 &status.user_random_number,
                 &[40; 32],
             ),
+            callback_failed: false,
+            callback_return_value: Default::default(),
+            callback_gas_used: 100_000,
         };
         History::update_request_status(&history.pool, status.clone()).await;
 
@@ -876,6 +917,9 @@ mod test {
                 &status.user_random_number,
                 &[40; 32],
             ),
+            callback_failed: false,
+            callback_return_value: Default::default(),
+            callback_gas_used: 0,
         };
         History::update_request_status(&history.pool, status.clone()).await;
         let mut failed_status = status.clone();

+ 8 - 5
apps/fortuna/src/keeper/process_event.rs

@@ -3,14 +3,14 @@ use {
     crate::{
         chain::{
             ethereum::PythRandomErrorsErrors,
-            reader::{RequestCallbackStatus, RequestedWithCallbackEvent},
+            reader::{RequestCallbackStatus, RequestedV2Event},
         },
         eth_utils::utils::{submit_tx_with_backoff, SubmitTxError},
         history::{RequestEntryState, RequestStatus},
         keeper::block::ProcessParams,
     },
     anyhow::{anyhow, Result},
-    ethers::{abi::AbiDecode, contract::ContractError, types::U256},
+    ethers::{abi::AbiDecode, contract::ContractError},
     std::time::Duration,
     tracing,
 };
@@ -20,7 +20,7 @@ use {
     sequence_number = event.sequence_number
 ))]
 pub async fn process_event_with_backoff(
-    event: RequestedWithCallbackEvent,
+    event: RequestedV2Event,
     process_param: ProcessParams,
 ) -> Result<()> {
     let ProcessParams {
@@ -110,10 +110,10 @@ pub async fn process_event_with_backoff(
         last_updated_at: chrono::Utc::now(),
         request_block_number: event.log_meta.block_number.as_u64(),
         request_tx_hash: event.log_meta.transaction_hash,
-        sender: event.requestor,
+        sender: event.sender,
         user_random_number: event.user_random_number,
         state: RequestEntryState::Pending,
-        gas_limit: U256::from(0), // FIXME(Tejas): set this properly
+        gas_limit: event.gas_limit,
     };
     history.add(&status);
 
@@ -192,6 +192,9 @@ pub async fn process_event_with_backoff(
                     &event.user_random_number,
                     &provider_revelation,
                 ),
+                callback_failed: result.revealed_event.callback_failed,
+                callback_return_value: result.revealed_event.callback_return_value,
+                callback_gas_used: result.revealed_event.callback_gas_used,
             };
             history.add(&status);
             tracing::info!(

+ 20 - 0
apps/fortuna/src/serde.rs

@@ -19,3 +19,23 @@ pub mod u256 {
         U256::from_dec_str(s.as_str()).map_err(|err| D::Error::custom(err.to_string()))
     }
 }
+
+pub mod u32 {
+    use serde::{de::Error, Deserialize, Deserializer, Serializer};
+
+    pub fn serialize<S>(n: &u32, s: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        s.serialize_str(&n.to_string())
+    }
+
+    pub fn deserialize<'de, D>(d: D) -> Result<u32, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let s: String = Deserialize::deserialize(d)?;
+        s.parse::<u32>()
+            .map_err(|err| D::Error::custom(err.to_string()))
+    }
+}