瀏覽代碼

traced client implementation

0xfirefist 1 年之前
父節點
當前提交
ef5924b201

+ 1 - 0
apps/fortuna/src/chain.rs

@@ -1,2 +1,3 @@
 pub(crate) mod ethereum;
 pub(crate) mod reader;
+pub(crate) mod traced_client;

+ 20 - 7
apps/fortuna/src/chain/ethereum.rs

@@ -1,5 +1,7 @@
 use {
+    super::traced_client::TracedClient,
     crate::{
+        api::ChainId,
         chain::reader::{
             self,
             BlockNumber,
@@ -34,7 +36,6 @@ use {
         },
         prelude::TransactionRequest,
         providers::{
-            Http,
             Middleware,
             Provider,
         },
@@ -48,11 +49,13 @@ use {
             U256,
         },
     },
+    prometheus_client::registry::Registry,
     sha3::{
         Digest,
         Keccak256,
     },
     std::sync::Arc,
+    tokio::sync::RwLock,
 };
 
 // TODO: Programmatically generate this so we don't have to keep committed ABI in sync with the
@@ -64,11 +67,11 @@ abigen!(
 
 pub type SignablePythContract = PythRandom<
     TransformerMiddleware<
-        NonceManagerMiddleware<SignerMiddleware<Provider<Http>, LocalWallet>>,
+        NonceManagerMiddleware<SignerMiddleware<Provider<TracedClient>, LocalWallet>>,
         LegacyTxTransformer,
     >,
 >;
-pub type PythContract = PythRandom<Provider<Http>>;
+pub type PythContract = PythRandom<Provider<TracedClient>>;
 
 /// Transformer that converts a transaction into a legacy transaction if use_legacy_tx is true.
 #[derive(Clone, Debug)]
@@ -90,10 +93,14 @@ impl Transformer for LegacyTxTransformer {
 
 impl SignablePythContract {
     pub async fn from_config(
+        chain_id: ChainId,
         chain_config: &EthereumConfig,
         private_key: &str,
+        metrics_registry: Arc<RwLock<Registry>>,
     ) -> Result<SignablePythContract> {
-        let provider = Provider::<Http>::try_from(&chain_config.geth_rpc_addr)?;
+        let provider =
+            TracedClient::new_provider(chain_id, &chain_config.geth_rpc_addr, metrics_registry)
+                .await?;
         let chain_id = provider.get_chainid().await?;
 
         let transformer = LegacyTxTransformer {
@@ -184,8 +191,14 @@ impl SignablePythContract {
 }
 
 impl PythContract {
-    pub fn from_config(chain_config: &EthereumConfig) -> Result<PythContract> {
-        let provider = Provider::<Http>::try_from(&chain_config.geth_rpc_addr)?;
+    pub async fn from_config(
+        chain_id: ChainId,
+        chain_config: &EthereumConfig,
+        metrics_registry: Arc<RwLock<Registry>>,
+    ) -> Result<PythContract> {
+        let provider =
+            TracedClient::new_provider(chain_id, &chain_config.geth_rpc_addr, metrics_registry)
+                .await?;
 
         Ok(PythRandom::new(
             chain_config.contract_addr,
@@ -262,7 +275,7 @@ impl EntropyReader for PythContract {
         user_random_number: [u8; 32],
         provider_revelation: [u8; 32],
     ) -> Result<Option<U256>> {
-        let result: Result<U256, ContractError<Provider<Http>>> = self
+        let result: Result<U256, ContractError<Provider<TracedClient>>> = self
             .reveal_with_callback(
                 provider,
                 sequence_number,

+ 133 - 0
apps/fortuna/src/chain/traced_client.rs

@@ -0,0 +1,133 @@
+use {
+    crate::api::ChainId,
+    anyhow::Result,
+    axum::async_trait,
+    ethers::{
+        prelude::Http,
+        providers::{
+            HttpClientError,
+            JsonRpcClient,
+            Provider,
+        },
+    },
+    prometheus_client::{
+        encoding::EncodeLabelSet,
+        metrics::{
+            counter::Counter,
+            family::Family,
+            histogram::Histogram,
+        },
+        registry::Registry,
+    },
+    std::sync::Arc,
+    tokio::{
+        sync::RwLock,
+        time::Instant,
+    },
+};
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)]
+pub struct ChainLabel {
+    chain_id: ChainId,
+}
+
+#[derive(Debug)]
+pub struct TracedClient {
+    inner: Http,
+
+    chain_id:                  ChainId,
+    rpc_requests_count:        Family<ChainLabel, Counter>,
+    rpc_requests_latency:      Family<ChainLabel, Histogram>,
+    rpc_requests_errors_count: Family<ChainLabel, Counter>,
+}
+
+#[async_trait]
+impl JsonRpcClient for TracedClient {
+    type Error = HttpClientError;
+
+    async fn request<
+        T: serde::Serialize + Send + Sync + std::fmt::Debug,
+        R: serde::de::DeserializeOwned + Send,
+    >(
+        &self,
+        method: &str,
+        params: T,
+    ) -> Result<R, HttpClientError> {
+        let start = Instant::now();
+        self.rpc_requests_count
+            .get_or_create(&ChainLabel {
+                chain_id: self.chain_id.clone(),
+            })
+            .inc();
+        let res = match self.inner.request(method, params).await {
+            Ok(result) => Ok(result),
+            Err(e) => {
+                self.rpc_requests_errors_count
+                    .get_or_create(&ChainLabel {
+                        chain_id: self.chain_id.clone(),
+                    })
+                    .inc();
+                Err(e)
+            }
+        };
+
+        let latency = start.elapsed().as_secs_f64();
+        println!(
+            "RPC request to {:?} took {:.2} seconds",
+            self.chain_id, latency
+        );
+        self.rpc_requests_latency
+            .get_or_create(&ChainLabel {
+                chain_id: self.chain_id.clone(),
+            })
+            .observe(latency);
+        res
+    }
+}
+
+impl TracedClient {
+    pub async fn new_provider(
+        chain_id: ChainId,
+        url: &str,
+        metrics_registry: Arc<RwLock<Registry>>,
+    ) -> Result<Provider<TracedClient>> {
+        let mut writable_registry = metrics_registry.write().await;
+
+        let rpc_requests_count = Family::default();
+        writable_registry.register(
+            "rpc_requests_count",
+            "The number of RPC requests made to the chain.",
+            rpc_requests_count.clone(),
+        );
+
+        let rpc_requests_latency = Family::<ChainLabel, Histogram>::new_with_constructor(|| {
+            Histogram::new(
+                [
+                    0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
+                ]
+                .into_iter(),
+            )
+        });
+        writable_registry.register(
+            "rpc_requests_latency",
+            "The latency of RPC requests to the chain.",
+            rpc_requests_latency.clone(),
+        );
+
+        let rpc_requests_errors_count = Family::default();
+        writable_registry.register(
+            "rpc_requests_errors_count",
+            "The number of RPC requests made to the chain that failed.",
+            rpc_requests_errors_count.clone(),
+        );
+
+        let url = url::Url::parse(url)?;
+        Ok(Provider::new(TracedClient {
+            inner: Http::new(url),
+            chain_id,
+            rpc_requests_count,
+            rpc_requests_latency,
+            rpc_requests_errors_count,
+        }))
+    }
+}

+ 6 - 6
apps/fortuna/src/command.rs

@@ -1,15 +1,15 @@
-mod generate;
-mod get_request;
+// mod generate;
+// mod get_request;
 mod register_provider;
-mod request_randomness;
+// mod request_randomness;
 mod run;
 mod setup_provider;
 
 pub use {
-    generate::generate,
-    get_request::get_request,
+    // generate::generate,
+    // get_request::get_request,
     register_provider::register_provider,
-    request_randomness::request_randomness,
+    // request_randomness::request_randomness,
     run::run,
     setup_provider::setup_provider,
 };

+ 11 - 2
apps/fortuna/src/command/register_provider.rs

@@ -16,7 +16,9 @@ use {
         },
         types::U256,
     },
+    prometheus_client::registry::Registry,
     std::sync::Arc,
+    tokio::sync::RwLock,
 };
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
@@ -31,8 +33,15 @@ pub async fn register_provider(opts: &RegisterProviderOptions) -> Result<()> {
     let chain_config = Config::load(&opts.config.config)?.get_chain_config(&opts.chain_id)?;
 
     // Initialize a Provider to interface with the EVM contract.
-    let contract =
-        Arc::new(SignablePythContract::from_config(&chain_config, &opts.private_key).await?);
+    let contract = Arc::new(
+        SignablePythContract::from_config(
+            opts.chain_id.clone(),
+            &chain_config,
+            &opts.private_key,
+            Arc::new(RwLock::new(Registry::default())),
+        )
+        .await?,
+    );
     // Create a new random hash chain.
     let random = rand::random::<[u8; 32]>();
     let secret = opts.randomness.load_secret()?;

+ 6 - 3
apps/fortuna/src/command/run.rs

@@ -132,10 +132,15 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         .map(|path| ProviderConfig::load(&path).expect("Failed to load provider config"));
     let secret = opts.randomness.load_secret()?;
     let (tx_exit, rx_exit) = watch::channel(false);
+    let metrics_registry = Arc::new(RwLock::new(Registry::default()));
 
     let mut chains: HashMap<ChainId, BlockchainState> = HashMap::new();
     for (chain_id, chain_config) in &config.chains {
-        let contract = Arc::new(PythContract::from_config(&chain_config)?);
+        let contract = Arc::new(
+            PythContract::from_config(chain_id.clone(), &chain_config, metrics_registry.clone())
+                .await
+                .unwrap(),
+        );
         let provider_chain_config = provider_config
             .as_ref()
             .and_then(|c| c.get_chain_config(chain_id));
@@ -221,8 +226,6 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
         Ok::<(), Error>(())
     });
 
-    let metrics_registry = Arc::new(RwLock::new(Registry::default()));
-
     if let Some(keeper_private_key) = opts.load_keeper_private_key()? {
         spawn(run_keeper(
             chains.clone(),

+ 15 - 6
apps/fortuna/src/command/setup_provider.rs

@@ -2,9 +2,9 @@ use {
     crate::{
         api::get_register_uri,
         chain::ethereum::SignablePythContract,
-        command::{
-            register_provider,
-            register_provider::CommitmentMetadata,
+        command::register_provider::{
+            self,
+            CommitmentMetadata,
         },
         config::{
             Config,
@@ -28,7 +28,9 @@ use {
         },
         types::Bytes,
     },
+    prometheus_client::registry::Registry,
     std::sync::Arc,
+    tokio::sync::RwLock,
 };
 
 /// Setup provider for all the chains.
@@ -45,8 +47,15 @@ pub async fn setup_provider(opts: &SetupProviderOptions) -> Result<()> {
 
     for (chain_id, chain_config) in &config.chains {
         // Initialize a Provider to interface with the EVM contract.
-        let contract =
-            Arc::new(SignablePythContract::from_config(&chain_config, &private_key).await?);
+        let contract = Arc::new(
+            SignablePythContract::from_config(
+                chain_id.clone(),
+                &chain_config,
+                &private_key,
+                Arc::new(RwLock::new(Registry::default())),
+            )
+            .await?,
+        );
 
         tracing::info!("{}: fetching provider info", chain_id);
         let provider_info = contract.get_provider_info(provider_address).call().await?;
@@ -107,7 +116,7 @@ pub async fn setup_provider(opts: &SetupProviderOptions) -> Result<()> {
 
         if register {
             tracing::info!("{}: registering", &chain_id);
-            register_provider(&RegisterProviderOptions {
+            register_provider::register_provider(&RegisterProviderOptions {
                 config: opts.config.clone(),
                 chain_id: chain_id.clone(),
                 private_key: private_key.clone(),

+ 6 - 7
apps/fortuna/src/config.rs

@@ -62,15 +62,14 @@ pub enum Options {
     /// Set up the provider for all the provided chains.
     /// It registers, re-registers, or updates provider config on chain.
     SetupProvider(SetupProviderOptions),
+    // / Request a random number from the contract.
+    // RequestRandomness(RequestRandomnessOptions),
 
-    /// Request a random number from the contract.
-    RequestRandomness(RequestRandomnessOptions),
+    // / Generate a random number by running the entire protocol end-to-end
+    // Generate(GenerateOptions),
 
-    /// Generate a random number by running the entire protocol end-to-end
-    Generate(GenerateOptions),
-
-    /// Get the status of a pending request for a random number.
-    GetRequest(GetRequestOptions),
+    // / Get the status of a pending request for a random number.
+    // GetRequest(GetRequestOptions),
 }
 
 #[derive(Args, Clone, Debug)]

+ 46 - 37
apps/fortuna/src/keeper.rs

@@ -3,6 +3,7 @@ use {
         api::{
             self,
             BlockchainState,
+            ChainId,
         },
         chain::{
             ethereum::{
@@ -13,6 +14,7 @@ use {
                 BlockNumber,
                 RequestedWithCallbackEvent,
             },
+            traced_client::TracedClient,
         },
         config::EthereumConfig,
     },
@@ -23,7 +25,6 @@ use {
     ethers::{
         contract::ContractError,
         providers::{
-            Http,
             Middleware,
             Provider,
             Ws,
@@ -95,6 +96,7 @@ pub struct ChainLabel {
 }
 
 pub struct KeeperMetrics {
+    pub metrics_registry:        Arc<RwLock<Registry>>,
     pub current_sequence_number: Family<AccountLabel, Gauge>,
     pub end_sequence_number:     Family<AccountLabel, Gauge>,
     pub balance:                 Family<AccountLabel, Gauge<f64, AtomicU64>>,
@@ -165,7 +167,10 @@ impl KeeperMetrics {
             block_timestamp_lag.clone(),
         );
 
+        drop(writable_registry);
+
         KeeperMetrics {
+            metrics_registry: registry,
             current_sequence_number,
             end_sequence_number,
             requests,
@@ -225,9 +230,14 @@ pub async fn run_keeper_threads(
     tracing::info!("latest safe block: {}", &latest_safe_block);
 
     let contract = Arc::new(
-        SignablePythContract::from_config(&chain_eth_config, &private_key)
-            .await
-            .expect("Chain config should be valid"),
+        SignablePythContract::from_config(
+            chain_state.id.clone(),
+            &chain_eth_config,
+            &private_key,
+            metrics.clone(),
+        )
+        .await
+        .expect("Chain config should be valid"),
     );
     let keeper_address = contract.client().inner().inner().signer().address();
 
@@ -720,20 +730,20 @@ pub async fn process_backlog(
 
 /// tracks the balance of the given address on the given chain periodically
 pub async fn track_balance(
-    chain_id: String,
+    chain_id: ChainId,
     chain_config: EthereumConfig,
     address: Address,
-    metrics_registry: Arc<KeeperMetrics>,
+    metrics: Arc<KeeperMetrics>,
 ) {
-    loop {
-        let provider = match Provider::<Http>::try_from(&chain_config.geth_rpc_addr) {
-            Ok(r) => r,
-            Err(_e) => {
-                time::sleep(RETRY_INTERVAL).await;
-                continue;
-            }
-        };
+    let provider = TracedClient::new_provider(
+        chain_id.clone(),
+        &chain_config.geth_rpc_addr,
+        metrics.metrics_registry.clone(),
+    )
+    .await
+    .unwrap();
 
+    loop {
         let balance = match provider.get_balance(address, None).await {
             // This conversion to u128 is fine as the total balance will never cross the limits
             // of u128 practically.
@@ -747,7 +757,7 @@ pub async fn track_balance(
         // The balance is in wei, so we need to divide by 1e18 to convert it to eth.
         let balance = balance as f64 / 1e18;
 
-        metrics_registry
+        metrics
             .balance
             .get_or_create(&AccountLabel {
                 chain_id: chain_id.clone(),
@@ -764,17 +774,16 @@ pub async fn track_provider(
     chain_id: String,
     chain_config: EthereumConfig,
     provider_address: Address,
-    metrics_registry: Arc<KeeperMetrics>,
+    metrics: Arc<KeeperMetrics>,
 ) {
+    let contract = PythContract::from_config(
+        chain_id.clone(),
+        &chain_config,
+        metrics.metrics_registry.clone(),
+    )
+    .await
+    .unwrap();
     loop {
-        let contract = match PythContract::from_config(&chain_config) {
-            Ok(r) => r,
-            Err(_e) => {
-                time::sleep(RETRY_INTERVAL).await;
-                continue;
-            }
-        };
-
         let provider_info = match contract.get_provider_info(provider_address).call().await {
             Ok(info) => info,
             Err(_e) => {
@@ -790,7 +799,7 @@ pub async fn track_provider(
         let current_sequence_number = provider_info.sequence_number;
         let end_sequence_number = provider_info.end_sequence_number;
 
-        metrics_registry
+        metrics
             .collected_fee
             .get_or_create(&AccountLabel {
                 chain_id: chain_id.clone(),
@@ -798,7 +807,7 @@ pub async fn track_provider(
             })
             .set(collected_fee);
 
-        metrics_registry
+        metrics
             .current_sequence_number
             .get_or_create(&AccountLabel {
                 chain_id: chain_id.clone(),
@@ -808,7 +817,7 @@ pub async fn track_provider(
             // a long time for it to cross the limits of i64.
             // currently prometheus only supports i64 for Gauge types
             .set(current_sequence_number as i64);
-        metrics_registry
+        metrics
             .end_sequence_number
             .get_or_create(&AccountLabel {
                 chain_id: chain_id.clone(),
@@ -823,17 +832,17 @@ pub async fn track_provider(
 pub async fn track_block_timestamp_lag(
     chain_id: String,
     chain_config: EthereumConfig,
-    metrics_registry: Arc<KeeperMetrics>,
+    metrics: Arc<KeeperMetrics>,
 ) {
-    loop {
-        let provider = match Provider::<Http>::try_from(&chain_config.geth_rpc_addr) {
-            Ok(r) => r,
-            Err(_e) => {
-                time::sleep(RETRY_INTERVAL).await;
-                continue;
-            }
-        };
+    let provider = TracedClient::new_provider(
+        chain_id.clone(),
+        &chain_config.geth_rpc_addr,
+        metrics.metrics_registry.clone(),
+    )
+    .await
+    .unwrap();
 
+    loop {
         match provider.get_block(EthereumBlockNumber::Latest).await {
             Ok(b) => {
                 if let Some(block) = b {
@@ -844,7 +853,7 @@ pub async fn track_block_timestamp_lag(
                         .as_secs();
                     let lag = server_timestamp - block_timestamp.as_u64();
 
-                    metrics_registry
+                    metrics
                         .block_timestamp_lag
                         .get_or_create(&ChainLabel {
                             chain_id: chain_id.clone(),

+ 3 - 3
apps/fortuna/src/main.rs

@@ -36,11 +36,11 @@ async fn main() -> Result<()> {
     )?;
 
     match config::Options::parse() {
-        config::Options::GetRequest(opts) => command::get_request(&opts).await,
-        config::Options::Generate(opts) => command::generate(&opts).await,
+        // config::Options::GetRequest(opts) => command::get_request(&opts).await,
+        // config::Options::Generate(opts) => command::generate(&opts).await,
         config::Options::Run(opts) => command::run(&opts).await,
         config::Options::RegisterProvider(opts) => command::register_provider(&opts).await,
         config::Options::SetupProvider(opts) => command::setup_provider(&opts).await,
-        config::Options::RequestRandomness(opts) => command::request_randomness(&opts).await,
+        // config::Options::RequestRandomness(opts) => command::request_randomness(&opts).await,
     }
 }