|
|
@@ -10,6 +10,7 @@ use {
|
|
|
MessageType,
|
|
|
PriceFeedsWithUpdateData,
|
|
|
RequestTime,
|
|
|
+ Slot,
|
|
|
Update,
|
|
|
},
|
|
|
},
|
|
|
@@ -32,10 +33,7 @@ use {
|
|
|
derive_builder::Builder,
|
|
|
moka::future::Cache,
|
|
|
pyth_sdk::PriceIdentifier,
|
|
|
- std::{
|
|
|
- ops::Rem,
|
|
|
- time::Duration,
|
|
|
- },
|
|
|
+ std::time::Duration,
|
|
|
wormhole_sdk::Vaa,
|
|
|
};
|
|
|
|
|
|
@@ -43,8 +41,6 @@ pub mod proof;
|
|
|
pub mod storage;
|
|
|
pub mod types;
|
|
|
|
|
|
-pub type RingIndex = u32;
|
|
|
-
|
|
|
#[derive(Clone, PartialEq, Debug, Builder)]
|
|
|
#[builder(derive(Debug), pattern = "immutable")]
|
|
|
pub struct AccumulatorState {
|
|
|
@@ -55,7 +51,7 @@ pub struct AccumulatorState {
|
|
|
#[derive(Clone)]
|
|
|
pub struct Store {
|
|
|
pub storage: StorageInstance,
|
|
|
- pub pending_accumulations: Cache<RingIndex, AccumulatorStateBuilder>,
|
|
|
+ pub pending_accumulations: Cache<Slot, AccumulatorStateBuilder>,
|
|
|
}
|
|
|
|
|
|
impl Store {
|
|
|
@@ -73,7 +69,7 @@ impl Store {
|
|
|
|
|
|
/// Stores the update data in the store
|
|
|
pub async fn store_update(&self, update: Update) -> Result<()> {
|
|
|
- let ring_index = match update {
|
|
|
+ let slot = match update {
|
|
|
Update::Vaa(vaa_bytes) => {
|
|
|
let vaa = serde_wormhole::from_slice::<Vaa<Vec<u8>>>(&vaa_bytes)?;
|
|
|
let payload = WormholePayload::try_from_bytes(&vaa.payload, &vaa_bytes)?;
|
|
|
@@ -83,41 +79,42 @@ impl Store {
|
|
|
|
|
|
match payload {
|
|
|
WormholePayload::Merkle(proof) => {
|
|
|
- log::info!("Storing merkle proof for state index {:?}", proof);
|
|
|
+ log::info!(
|
|
|
+ "Storing merkle proof for slot {:?}: {:?}",
|
|
|
+ proof.slot,
|
|
|
+ proof
|
|
|
+ );
|
|
|
store_wormhole_merkle_verified_message(self, proof.clone()).await?;
|
|
|
- proof.state_index
|
|
|
+ proof.slot
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
Update::AccumulatorMessages(accumulator_messages) => {
|
|
|
// FIXME: Move this constant to a better place
|
|
|
- const RING_SIZE: u32 = 10_000;
|
|
|
- let ring_index = accumulator_messages.slot.rem(RING_SIZE as u64) as u32;
|
|
|
+
|
|
|
+ let slot = accumulator_messages.slot;
|
|
|
|
|
|
log::info!(
|
|
|
- "Storing accumulator messages for ring index {:?}: {:?}",
|
|
|
- ring_index,
|
|
|
+ "Storing accumulator messages for slot {:?}: {:?}",
|
|
|
+ slot,
|
|
|
accumulator_messages
|
|
|
);
|
|
|
|
|
|
let pending_acc = self
|
|
|
.pending_accumulations
|
|
|
- .entry(ring_index)
|
|
|
+ .entry(slot)
|
|
|
.or_default()
|
|
|
.await
|
|
|
.into_value();
|
|
|
self.pending_accumulations
|
|
|
- .insert(
|
|
|
- ring_index,
|
|
|
- pending_acc.accumulator_messages(accumulator_messages),
|
|
|
- )
|
|
|
+ .insert(slot, pending_acc.accumulator_messages(accumulator_messages))
|
|
|
.await;
|
|
|
|
|
|
- ring_index
|
|
|
+ slot
|
|
|
}
|
|
|
};
|
|
|
|
|
|
- let pending_state = self.pending_accumulations.get(&ring_index);
|
|
|
+ let pending_state = self.pending_accumulations.get(&slot);
|
|
|
let pending_state = match pending_state {
|
|
|
Some(pending_state) => pending_state,
|
|
|
// Due to some race conditions this might happen when it's processed before
|
|
|
@@ -159,7 +156,7 @@ impl Store {
|
|
|
|
|
|
self.storage.store_message_states(message_states)?;
|
|
|
|
|
|
- self.pending_accumulations.invalidate(&ring_index).await;
|
|
|
+ self.pending_accumulations.invalidate(&slot).await;
|
|
|
|
|
|
Ok(())
|
|
|
}
|