|
|
@@ -1,6 +1,6 @@
|
|
|
//! This module connects to the Pythnet RPC server and listens for accumulator
|
|
|
//! updates. It then sends the updates to the store module for processing and
|
|
|
-//! storage.
|
|
|
+//! storage. It also periodically fetches and stores the latest price feeds metadata.
|
|
|
|
|
|
use {
|
|
|
crate::{
|
|
|
@@ -8,6 +8,7 @@ use {
|
|
|
AccumulatorMessages,
|
|
|
Update,
|
|
|
},
|
|
|
+ api::types::PriceFeedMetadata,
|
|
|
config::RunOptions,
|
|
|
network::wormhole::{
|
|
|
update_guardian_set,
|
|
|
@@ -15,6 +16,10 @@ use {
|
|
|
GuardianSet,
|
|
|
GuardianSetData,
|
|
|
},
|
|
|
+ price_feeds_metadata::{
|
|
|
+ store_price_feeds_metadata,
|
|
|
+ DEFAULT_PRICE_FEEDS_CACHE_UPDATE_INTERVAL,
|
|
|
+ },
|
|
|
state::State,
|
|
|
},
|
|
|
anyhow::{
|
|
|
@@ -23,6 +28,11 @@ use {
|
|
|
},
|
|
|
borsh::BorshDeserialize,
|
|
|
futures::stream::StreamExt,
|
|
|
+ pyth_sdk::PriceIdentifier,
|
|
|
+ pyth_sdk_solana::state::{
|
|
|
+ load_mapping_account,
|
|
|
+ load_product_account,
|
|
|
+ },
|
|
|
solana_account_decoder::UiAccountEncoding,
|
|
|
solana_client::{
|
|
|
nonblocking::{
|
|
|
@@ -41,11 +51,13 @@ use {
|
|
|
},
|
|
|
solana_sdk::{
|
|
|
account::Account,
|
|
|
+ bs58,
|
|
|
commitment_config::CommitmentConfig,
|
|
|
pubkey::Pubkey,
|
|
|
system_program,
|
|
|
},
|
|
|
std::{
|
|
|
+ collections::BTreeMap,
|
|
|
sync::{
|
|
|
atomic::Ordering,
|
|
|
Arc,
|
|
|
@@ -136,11 +148,10 @@ pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<()> {
|
|
|
encoding: Some(UiAccountEncoding::Base64Zstd),
|
|
|
..Default::default()
|
|
|
},
|
|
|
- filters: Some(vec![RpcFilterType::Memcmp(Memcmp {
|
|
|
- offset: 0,
|
|
|
- bytes: MemcmpEncodedBytes::Bytes(b"PAS1".to_vec()),
|
|
|
- encoding: None,
|
|
|
- })]),
|
|
|
+ filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new(
|
|
|
+ 0, // offset
|
|
|
+ MemcmpEncodedBytes::Bytes(b"PAS1".to_vec()), // bytes
|
|
|
+ ))]),
|
|
|
with_context: Some(true),
|
|
|
};
|
|
|
|
|
|
@@ -257,6 +268,9 @@ async fn fetch_existing_guardian_sets(
|
|
|
pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
|
|
|
tracing::info!(endpoint = opts.pythnet.ws_addr, "Started Pythnet Listener.");
|
|
|
|
|
|
+ // Create RpcClient instance here
|
|
|
+ let rpc_client = RpcClient::new(opts.pythnet.http_addr.clone());
|
|
|
+
|
|
|
fetch_existing_guardian_sets(
|
|
|
state.clone(),
|
|
|
opts.pythnet.http_addr.clone(),
|
|
|
@@ -284,7 +298,7 @@ pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
|
|
|
})
|
|
|
};
|
|
|
|
|
|
- let task_guadian_watcher = {
|
|
|
+ let task_guardian_watcher = {
|
|
|
let store = state.clone();
|
|
|
let pythnet_http_endpoint = opts.pythnet.http_addr.clone();
|
|
|
tokio::spawn(async move {
|
|
|
@@ -317,6 +331,117 @@ pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
|
|
|
})
|
|
|
};
|
|
|
|
|
|
- let _ = tokio::join!(task_listener, task_guadian_watcher);
|
|
|
+
|
|
|
+ let task_price_feeds_metadata_updater = {
|
|
|
+ let price_feeds_state = state.clone();
|
|
|
+ tokio::spawn(async move {
|
|
|
+ while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
|
|
|
+ if let Err(e) = fetch_and_store_price_feeds_metadata(
|
|
|
+ price_feeds_state.as_ref(),
|
|
|
+ &opts.pythnet.mapping_addr,
|
|
|
+ &rpc_client,
|
|
|
+ )
|
|
|
+ .await
|
|
|
+ {
|
|
|
+ tracing::error!("Error in fetching and storing price feeds metadata: {}", e);
|
|
|
+ }
|
|
|
+ // This loop with a sleep interval of 1 second allows the task to check for an exit signal at a
|
|
|
+ // fine-grained interval. Instead of sleeping directly for the entire `price_feeds_update_interval`,
|
|
|
+ // which could delay the response to an exit signal, this approach ensures the task can exit promptly
|
|
|
+ // if `crate::SHOULD_EXIT` is set, enhancing the responsiveness of the service to shutdown requests.
|
|
|
+ for _ in 0..DEFAULT_PRICE_FEEDS_CACHE_UPDATE_INTERVAL {
|
|
|
+ if crate::SHOULD_EXIT.load(Ordering::Acquire) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ tokio::time::sleep(Duration::from_secs(1)).await;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+ };
|
|
|
+
|
|
|
+ let _ = tokio::join!(
|
|
|
+ task_listener,
|
|
|
+ task_guardian_watcher,
|
|
|
+ task_price_feeds_metadata_updater
|
|
|
+ );
|
|
|
Ok(())
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+pub async fn fetch_and_store_price_feeds_metadata(
|
|
|
+ state: &State,
|
|
|
+ mapping_address: &Pubkey,
|
|
|
+ rpc_client: &RpcClient,
|
|
|
+) -> Result<Vec<PriceFeedMetadata>> {
|
|
|
+ let price_feeds_metadata = fetch_price_feeds_metadata(&mapping_address, &rpc_client).await?;
|
|
|
+ store_price_feeds_metadata(&state, &price_feeds_metadata).await?;
|
|
|
+ Ok(price_feeds_metadata)
|
|
|
+}
|
|
|
+
|
|
|
+async fn fetch_price_feeds_metadata(
|
|
|
+ mapping_address: &Pubkey,
|
|
|
+ rpc_client: &RpcClient,
|
|
|
+) -> Result<Vec<PriceFeedMetadata>> {
|
|
|
+ let mut price_feeds_metadata = Vec::<PriceFeedMetadata>::new();
|
|
|
+ let mapping_data = rpc_client.get_account_data(mapping_address).await?;
|
|
|
+ let mapping_acct = load_mapping_account(&mapping_data)?;
|
|
|
+
|
|
|
+ // Split product keys into chunks of 150 to avoid too many open files error (error trying to connect: tcp open error: Too many open files (os error 24))
|
|
|
+ for product_keys_chunk in mapping_acct
|
|
|
+ .products
|
|
|
+ .iter()
|
|
|
+ .filter(|&prod_pkey| *prod_pkey != Pubkey::default())
|
|
|
+ .collect::<Vec<_>>()
|
|
|
+ .chunks(150)
|
|
|
+ {
|
|
|
+ // Prepare a list of futures for fetching product account data for each chunk
|
|
|
+ let fetch_product_data_futures = product_keys_chunk
|
|
|
+ .iter()
|
|
|
+ .map(|prod_pkey| rpc_client.get_account_data(prod_pkey))
|
|
|
+ .collect::<Vec<_>>();
|
|
|
+
|
|
|
+ // Await all futures concurrently within the chunk
|
|
|
+ let products_data_results = futures::future::join_all(fetch_product_data_futures).await;
|
|
|
+
|
|
|
+ for prod_data_result in products_data_results {
|
|
|
+ match prod_data_result {
|
|
|
+ Ok(prod_data) => {
|
|
|
+ let prod_acct = match load_product_account(&prod_data) {
|
|
|
+ Ok(prod_acct) => prod_acct,
|
|
|
+ Err(e) => {
|
|
|
+ println!("Error loading product account: {}", e);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // TODO: Add stricter type checking for attributes
|
|
|
+ let attributes = prod_acct
|
|
|
+ .iter()
|
|
|
+ .filter(|(key, _)| !key.is_empty())
|
|
|
+ .map(|(key, val)| (key.to_string(), val.to_string()))
|
|
|
+ .collect::<BTreeMap<String, String>>();
|
|
|
+
|
|
|
+ if prod_acct.px_acc != Pubkey::default() {
|
|
|
+ let px_pkey = prod_acct.px_acc;
|
|
|
+ let px_pkey_bytes = bs58::decode(&px_pkey.to_string()).into_vec()?;
|
|
|
+ let px_pkey_array: [u8; 32] = px_pkey_bytes
|
|
|
+ .try_into()
|
|
|
+ .expect("Invalid length for PriceIdentifier");
|
|
|
+
|
|
|
+ let price_feed_metadata = PriceFeedMetadata {
|
|
|
+ id: PriceIdentifier::new(px_pkey_array),
|
|
|
+ attributes,
|
|
|
+ };
|
|
|
+
|
|
|
+ price_feeds_metadata.push(price_feed_metadata);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Err(e) => {
|
|
|
+ println!("Error loading product account: {}", e);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Ok(price_feeds_metadata)
|
|
|
+}
|