Sfoglia il codice sorgente

[accumulator-updater 5/x] Single PDA (#760)

* feat(accumulator-updater): write accumulator messages into one account

* chore(accumulator-updater): clean up commented out code

* chore(accumulator-updater): remove unused feature for cargo

* chore(accumulator-updater): minor comment fix

* feat(accumulator-updater): update implementation of AccumulatorInput header

Removed InputIndex in header, updated to use end_offsets & include header_len

* feat(accumulator-updater): add MessageHeader to all PriceMessages and include in serialization

* fix(accumulator-updater): fix AccumulatorInput size & unit tests

* feat(accumulator-updater): update put_all to write as much data as possible

put_all will write up to the limit of the accumulator_input.data.len(). this ix must not fail even
when the data being written exceeds the size of the account. in this situation, we will write as
many *COMPLETE* messages as possible

* docs(accumulator-updater): update put_all ix documentation

* chore(accumulator-updater): addressed PR feedback

fixed comments, added test, renamed accumulatorInput.data to messages

* fix(accumulator-updater): fix ts test
swimricky 2 anni fa
parent
commit
0ac771266c

+ 1 - 1
accumulator_updater/programs/accumulator_updater/Cargo.toml

@@ -16,6 +16,6 @@ cpi = ["no-entrypoint"]
 default = []
 
 [dependencies]
-anchor-lang = "0.27.0"
+anchor-lang = { version = "0.27.0" }
 # needed for the new #[account(zero_copy)] in anchor 0.27.0
 bytemuck = { version = "1.4.0", features = ["derive", "min_const_generics"]}

+ 49 - 75
accumulator_updater/programs/accumulator_updater/src/instructions/put_all.rs

@@ -12,99 +12,73 @@ use {
     },
 };
 
-#[derive(AnchorSerialize, AnchorDeserialize)]
-pub struct InputSchemaAndData {
-    pub schema: u8,
-    pub data:   Vec<u8>,
-}
 
+pub const ACCUMULATOR: &[u8; 11] = b"accumulator";
 
 pub fn put_all<'info>(
     ctx: Context<'_, '_, '_, 'info, PutAll<'info>>,
     base_account_key: Pubkey,
-    values: Vec<InputSchemaAndData>,
+    messages: Vec<Vec<u8>>,
 ) -> Result<()> {
     let cpi_caller = ctx.accounts.whitelist_verifier.is_allowed()?;
-    let account_infos = ctx.remaining_accounts;
-    require_eq!(account_infos.len(), values.len());
+    let accumulator_input_ai = ctx
+        .remaining_accounts
+        .first()
+        .ok_or(AccumulatorUpdaterError::AccumulatorInputNotProvided)?;
 
-    let rent = Rent::get()?;
-    let (mut initialized, mut updated) = (vec![], vec![]);
+    let loader;
 
-    for (
-        ai,
-        InputSchemaAndData {
-            schema: account_schema,
-            data: account_data,
-        },
-    ) in account_infos.iter().zip(values)
     {
-        let bump = if is_uninitialized_account(ai) {
-            let seeds = &[
+        let accumulator_input = &mut (if is_uninitialized_account(accumulator_input_ai) {
+            let (pda, bump) = Pubkey::find_program_address(
+                &[
+                    cpi_caller.as_ref(),
+                    ACCUMULATOR.as_ref(),
+                    base_account_key.as_ref(),
+                ],
+                &crate::ID,
+            );
+            require_keys_eq!(accumulator_input_ai.key(), pda);
+            let signer_seeds = &[
                 cpi_caller.as_ref(),
-                b"accumulator".as_ref(),
+                ACCUMULATOR.as_ref(),
                 base_account_key.as_ref(),
-                &account_schema.to_le_bytes(),
+                &[bump],
             ];
-            let (pda, bump) = Pubkey::find_program_address(seeds, &crate::ID);
-            require_keys_eq!(ai.key(), pda);
-
-
-            //TODO: Update this with serialization logic
-            // 8 for anchor discriminator
-            let accumulator_size = 8 + AccumulatorInput::size(&account_data);
             PutAll::create_account(
-                ai,
-                accumulator_size,
+                accumulator_input_ai,
+                8 + AccumulatorInput::INIT_SPACE,
                 &ctx.accounts.payer,
-                &[
-                    cpi_caller.as_ref(),
-                    b"accumulator".as_ref(),
-                    base_account_key.as_ref(),
-                    &account_schema.to_le_bytes(),
-                    &[bump],
-                ],
-                &rent,
+                signer_seeds,
                 &ctx.accounts.system_program,
             )?;
-            initialized.push(ai.key());
-
-            bump
-        } else {
-            let accumulator_input = <AccumulatorInput as AccountDeserialize>::try_deserialize(
-                &mut &**ai.try_borrow_mut_data()?,
+            loader = AccountLoader::<AccumulatorInput>::try_from_unchecked(
+                &crate::ID,
+                accumulator_input_ai,
             )?;
-            {
-                // TODO: allow re-sizing?
-                require_gte!(
-                    accumulator_input.data.len(),
-                    account_data.len(),
-                    AccumulatorUpdaterError::CurrentDataLengthExceeded
-                );
-
-                accumulator_input.validate(
-                    ai.key(),
-                    cpi_caller,
-                    base_account_key,
-                    account_schema,
-                )?;
-            }
-
+            let mut accumulator_input = loader.load_init()?;
+            accumulator_input.header = AccumulatorHeader::new(bump);
+            accumulator_input
+        } else {
+            loader = AccountLoader::<AccumulatorInput>::try_from(accumulator_input_ai)?;
+            let mut accumulator_input = loader.load_mut()?;
+            accumulator_input.header.set_version();
+            accumulator_input
+        });
+        // note: redundant for uninitialized code path but safer to check here.
+        // compute budget cost should be minimal
+        accumulator_input.validate(accumulator_input_ai.key(), cpi_caller, base_account_key)?;
+
+
+        let (num_msgs, num_bytes) = accumulator_input.put_all(&messages);
+        if num_msgs != messages.len() {
+            msg!("unable to fit all messages in accumulator input account. Wrote {}/{} messages and {} bytes", num_msgs, messages.len(), num_bytes);
+        }
+    }
 
-            updated.push(ai.key());
-            accumulator_input.header.bump
-        };
 
-        let accumulator_input =
-            AccumulatorInput::new(AccumulatorHeader::new(bump, account_schema), account_data);
-        accumulator_input.persist(ai)?;
-    }
+    loader.exit(&crate::ID)?;
 
-    msg!(
-        "[emit-updates]: initialized: {:?}, updated: {:?}",
-        initialized,
-        updated
-    );
     Ok(())
 }
 
@@ -113,12 +87,13 @@ pub fn is_uninitialized_account(ai: &AccountInfo) -> bool {
 }
 
 #[derive(Accounts)]
+#[instruction( base_account_key: Pubkey)]
 pub struct PutAll<'info> {
     #[account(mut)]
     pub payer:              Signer<'info>,
     pub whitelist_verifier: WhitelistVerifier<'info>,
     pub system_program:     Program<'info, System>,
-    // remaining_accounts:  - [AccumulatorInput PDAs]
+    // remaining_accounts:  - [AccumulatorInput PDA]
 }
 
 impl<'info> PutAll<'info> {
@@ -127,10 +102,9 @@ impl<'info> PutAll<'info> {
         space: usize,
         payer: &AccountInfo<'a>,
         seeds: &[&[u8]],
-        rent: &Rent,
         system_program: &AccountInfo<'a>,
     ) -> Result<()> {
-        let lamports = rent.minimum_balance(space);
+        let lamports = Rent::get()?.minimum_balance(space);
 
         system_program::create_account(
             CpiContext::new_with_signer(

+ 24 - 10
accumulator_updater/programs/accumulator_updater/src/lib.rs

@@ -52,24 +52,36 @@ pub mod accumulator_updater {
     }
 
 
-    /// Create or update inputs for the Accumulator. Each input is written
-    /// into a separate PDA account derived with
-    /// seeds = [cpi_caller, b"accumulator", base_account_key, schema]
+    /// Insert messages/inputs for the Accumulator. All inputs derived from the
+    /// `base_account_key` will go into the same PDA. The PDA is derived with
+    /// seeds = [cpi_caller, b"accumulator", base_account_key]
     ///
-    /// The caller of this instruction must pass those PDAs
-    /// while calling this function in the same order as elements.
     ///
     ///
     /// * `base_account_key`    - Pubkey of the original account the
-    ///                         AccumulatorInput(s) are derived from
-    /// * `values`              - Vec of (schema, account_data) in same respective
-    ///                           order `ctx.remaining_accounts`
+    ///                           AccumulatorInput is derived from
+    /// * `messages`            - Vec of vec of bytes, each representing a message
+    ///                           to be hashed and accumulated
+    ///
+    /// This ix will write as many of the messages up to the length
+    /// of the `accumulator_input.data`.
+    /// If `accumulator_input.data.len() < messages.map(|x| x.len()).sum()`
+    /// then the remaining messages will be ignored.
+    ///
+    /// The current implementation assumes that each invocation of this
+    /// ix is independent of any previous invocations. It will overwrite
+    /// any existing contents.
+    ///
+    /// TODO:
+    ///     - try handling re-allocation of the accumulator_input space
+    ///     - handle updates ("paging/batches of messages")
+    ///
     pub fn put_all<'info>(
         ctx: Context<'_, '_, '_, 'info, PutAll<'info>>,
         base_account_key: Pubkey,
-        values: Vec<InputSchemaAndData>,
+        messages: Vec<Vec<u8>>,
     ) -> Result<()> {
-        instructions::put_all(ctx, base_account_key, values)
+        instructions::put_all(ctx, base_account_key, messages)
     }
 }
 
@@ -128,4 +140,6 @@ pub enum AccumulatorUpdaterError {
     CurrentDataLengthExceeded,
     #[msg("Accumulator Input not writable")]
     AccumulatorInputNotWritable,
+    #[msg("Accumulator Input not provided")]
+    AccumulatorInputNotProvided,
 }

+ 0 - 1
accumulator_updater/programs/accumulator_updater/src/macros.rs

@@ -5,7 +5,6 @@ macro_rules! accumulator_input_seeds {
             $cpi_caller_pid.as_ref(),
             b"accumulator".as_ref(),
             $base_account.as_ref(),
-            &$accumulator_input.header.account_schema.to_le_bytes(),
             &[$accumulator_input.header.bump],
         ]
     };

+ 245 - 42
accumulator_updater/programs/accumulator_updater/src/state/accumulator_input.rs

@@ -14,26 +14,101 @@ use {
 /// the CPI calling program (e.g. Pyth Oracle)
 ///
 /// TODO: implement custom serialization & set alignment
-#[account]
+#[account(zero_copy)]
+#[derive(Debug, InitSpace)]
 pub struct AccumulatorInput {
-    pub header: AccumulatorHeader,
-    pub data:   Vec<u8>,
+    pub header:   AccumulatorHeader,
+    // 10KB - 8 (discriminator) - 514 (header)
+    // TODO: do we want to initialize this to the max size?
+    //   - will lead to more data being passed around for validators
+    pub messages: [u8; 9_718],
 }
 
+//TODO:
+// - implement custom serialization & set alignment
+// - what other fields are needed?
+#[zero_copy]
+#[derive(InitSpace, Debug)]
+pub struct AccumulatorHeader {
+    pub bump:        u8, // 1
+    pub version:     u8, // 1
+    // byte offset of accounts where data starts
+    // e.g. account_info.data[offset + header_len]
+    pub header_len:  u16, // 2
+    /// endpoints of every message.
+    /// ex: [10, 14]
+    /// => msg1 = account_info.data[(header_len + 0)..(header_len + 10)]
+    /// => msg2 = account_info.data[(header_len + 10)..(header_len + 14)]
+    pub end_offsets: [u16; 255], // 510
+}
+
+impl AccumulatorHeader {
+    // HEADER_LEN allows for append-only forward-compatibility for the header.
+    // this is the number of bytes from the beginning of the account_info.data
+    // to the start of the `AccumulatorInput` data.
+    pub const HEADER_LEN: u16 = 8 + AccumulatorHeader::INIT_SPACE as u16;
+
+    pub const CURRENT_VERSION: u8 = 1;
+
+    pub fn new(bump: u8) -> Self {
+        Self {
+            bump,
+            header_len: Self::HEADER_LEN,
+            version: Self::CURRENT_VERSION,
+            end_offsets: [0u16; u8::MAX as usize],
+        }
+    }
+
+    pub fn set_version(&mut self) {
+        self.version = Self::CURRENT_VERSION;
+    }
+}
 impl AccumulatorInput {
-    pub fn size(data: &Vec<u8>) -> usize {
-        AccumulatorHeader::SIZE + 4 + data.len()
+    pub fn size(&self) -> usize {
+        AccumulatorHeader::INIT_SPACE + 4 + self.messages.len()
     }
 
-    pub fn new(header: AccumulatorHeader, data: Vec<u8>) -> Self {
-        Self { header, data }
+    pub fn new(bump: u8) -> Self {
+        let header = AccumulatorHeader::new(bump);
+        Self {
+            header,
+            messages: [0u8; 9_718],
+        }
     }
 
-    pub fn update(&mut self, new_data: Vec<u8>) {
-        self.header = AccumulatorHeader::new(self.header.bump, self.header.account_schema);
-        self.data = new_data;
+    /// `put_all` writes all the messages to the `AccumulatorInput` account
+    /// and updates the `end_offsets` array.
+    ///
+    /// Returns tuple of the number of messages written and the end_offset
+    /// of the last message
+    ///
+    // TODO: add a end_offsets index parameter for "continuation"
+    // TODO: test max size of parameters that can be passed into CPI call
+    pub fn put_all(&mut self, values: &Vec<Vec<u8>>) -> (usize, u16) {
+        let mut offset = 0u16;
+
+        for (i, v) in values.iter().enumerate() {
+            let start = offset;
+            let len = u16::try_from(v.len());
+            if len.is_err() {
+                return (i, start);
+            }
+            let end = offset.checked_add(len.unwrap());
+            if end.is_none() {
+                return (i, start);
+            }
+            let end = end.unwrap();
+            if end > self.messages.len() as u16 {
+                return (i, start);
+            }
+            self.header.end_offsets[i] = end;
+            self.messages[(start as usize)..(end as usize)].copy_from_slice(v);
+            offset = end
+        }
+        (values.len(), offset)
     }
 
+
     fn derive_pda(&self, cpi_caller: Pubkey, base_account: Pubkey) -> Result<Pubkey> {
         let res = Pubkey::create_program_address(
             accumulator_input_seeds!(self, cpi_caller, base_account),
@@ -43,48 +118,176 @@ impl AccumulatorInput {
         Ok(res)
     }
 
-    pub fn validate(
-        &self,
-        key: Pubkey,
-        cpi_caller: Pubkey,
-        base_account: Pubkey,
-        account_schema: u8,
-    ) -> Result<()> {
+    pub fn validate(&self, key: Pubkey, cpi_caller: Pubkey, base_account: Pubkey) -> Result<()> {
         let expected_key = self.derive_pda(cpi_caller, base_account)?;
         require_keys_eq!(expected_key, key);
-        require_eq!(self.header.account_schema, account_schema);
         Ok(())
     }
+}
 
-    pub fn persist(&self, ai: &AccountInfo) -> Result<()> {
-        AccountSerialize::try_serialize(self, &mut &mut ai.data.borrow_mut()[..]).map_err(|e| {
-            msg!("original error: {:?}", e);
-            AccumulatorUpdaterError::SerializeError
-        })?;
-        Ok(())
+
+#[cfg(test)]
+mod test {
+    use {
+        super::*,
+        bytemuck::bytes_of,
+        std::mem::{
+            align_of,
+            size_of,
+        },
+    };
+
+    fn data_bytes(data: Vec<u8>) -> Vec<u8> {
+        let mut bytes = vec![];
+        for d in data {
+            bytes.extend_from_slice(&d.to_be_bytes());
+        }
+        bytes
     }
-}
 
-//TODO:
-// - implement custom serialization & set alignment
-// - what other fields are needed?
-#[derive(AnchorSerialize, AnchorDeserialize, Clone, Debug, Default)]
-pub struct AccumulatorHeader {
-    pub bump:           u8,
-    pub version:        u8,
-    pub account_schema: u8,
-}
 
+    #[test]
+    fn test_sizes_and_alignments() {
+        let (header_idx_size, header_idx_align) = (
+            size_of::<AccumulatorHeader>(),
+            align_of::<AccumulatorHeader>(),
+        );
 
-impl AccumulatorHeader {
-    pub const SIZE: usize = 1 + 1 + 1;
-    pub const CURRENT_VERSION: u8 = 1;
+        let (input_size, input_align) = (
+            size_of::<AccumulatorInput>(),
+            align_of::<AccumulatorInput>(),
+        );
 
-    pub fn new(bump: u8, account_schema: u8) -> Self {
-        Self {
-            bump,
-            version: Self::CURRENT_VERSION,
-            account_schema,
+        assert_eq!(header_idx_size, 514);
+        assert_eq!(header_idx_align, 2);
+        assert_eq!(input_size, 10_232);
+        assert_eq!(input_align, 2);
+    }
+
+    #[test]
+    fn test_put_all() {
+        let data = vec![vec![12, 34], vec![56, 78, 90]];
+        let data_bytes: Vec<Vec<u8>> = data.into_iter().map(data_bytes).collect();
+
+        let accumulator_input = &mut AccumulatorInput::new(0);
+
+        let (num_msgs, num_bytes) = accumulator_input.put_all(&data_bytes);
+        assert_eq!(num_msgs, 2);
+        assert_eq!(num_bytes, 5);
+
+
+        assert_eq!(accumulator_input.header.end_offsets[0], 2);
+        assert_eq!(accumulator_input.header.end_offsets[1], 5);
+
+
+        let accumulator_input_bytes = bytes_of(accumulator_input);
+
+        // The header_len field represents the size of all data prior to the message bytes.
+        // This includes the account discriminator, which is not part of the header struct.
+        // Subtract the size of the discriminator (8 bytes) to compensate
+        let header_len = accumulator_input.header.header_len as usize - 8;
+
+
+        let iter = accumulator_input
+            .header
+            .end_offsets
+            .iter()
+            .take_while(|x| **x != 0);
+        let mut start = header_len;
+        let mut data_iter = data_bytes.iter();
+        for offset in iter {
+            let end_offset = header_len + *offset as usize;
+            let accumulator_input_data = &accumulator_input_bytes[start..end_offset];
+            let expected_data = data_iter.next().unwrap();
+            assert_eq!(accumulator_input_data, expected_data.as_slice());
+            start = end_offset;
+        }
+    }
+
+    #[test]
+    fn test_put_all_exceed_max() {
+        let data = vec![vec![0u8; 9_718 - 2], vec![0u8], vec![0u8; 2]];
+
+        let data_bytes: Vec<Vec<u8>> = data.into_iter().map(data_bytes).collect();
+        let accumulator_input = &mut AccumulatorInput::new(0);
+        let (num_msgs, num_bytes) = accumulator_input.put_all(&data_bytes);
+        assert_eq!(num_msgs, 2);
+        assert_eq!(
+            num_bytes,
+            data_bytes[0..2].iter().map(|x| x.len()).sum::<usize>() as u16
+        );
+
+        let accumulator_input_bytes = bytes_of(accumulator_input);
+
+        // The header_len field represents the size of all data prior to the message bytes.
+        // This includes the account discriminator, which is not part of the header struct.
+        // Subtract the size of the discriminator (8 bytes) to compensate
+        let header_len = accumulator_input.header.header_len as usize - 8;
+
+
+        let iter = accumulator_input
+            .header
+            .end_offsets
+            .iter()
+            .take_while(|x| **x != 0);
+        let mut start = header_len;
+        let mut data_iter = data_bytes.iter();
+        for offset in iter {
+            let end_offset = header_len + *offset as usize;
+            let accumulator_input_data = &accumulator_input_bytes[start..end_offset];
+            let expected_data = data_iter.next().unwrap();
+            assert_eq!(accumulator_input_data, expected_data.as_slice());
+            start = end_offset;
         }
+
+        assert_eq!(accumulator_input.header.end_offsets[2], 0);
+    }
+
+    #[test]
+    fn test_put_all_long_vec() {
+        let data = vec![
+            vec![0u8; 9_718 - 3],
+            vec![0u8],
+            vec![0u8],
+            vec![0u8; u16::MAX as usize + 2],
+            vec![0u8],
+        ];
+
+        let data_bytes: Vec<Vec<u8>> = data.into_iter().map(data_bytes).collect();
+        let accumulator_input = &mut AccumulatorInput::new(0);
+        let (num_msgs, num_bytes) = accumulator_input.put_all(&data_bytes);
+        assert_eq!(num_msgs, 3);
+        assert_eq!(
+            num_bytes,
+            data_bytes[0..3].iter().map(|x| x.len()).sum::<usize>() as u16
+        );
+
+        let accumulator_input_bytes = bytes_of(accumulator_input);
+
+        // *note* minus 8 here since no account discriminator when using
+        // `bytes_of`directly on accumulator_input
+        let header_len = accumulator_input.header.header_len as usize - 8;
+
+
+        let iter = accumulator_input
+            .header
+            .end_offsets
+            .iter()
+            .take_while(|x| **x != 0);
+        let mut start = header_len;
+        let mut data_iter = data_bytes.iter();
+        for offset in iter {
+            let end_offset = header_len + *offset as usize;
+            let accumulator_input_data = &accumulator_input_bytes[start..end_offset];
+            let expected_data = data_iter.next().unwrap();
+            assert_eq!(accumulator_input_data, expected_data.as_slice());
+            start = end_offset;
+        }
+
+        assert_eq!(accumulator_input.header.end_offsets[0], 9_715);
+        assert_eq!(accumulator_input.header.end_offsets[1], 9_716);
+        assert_eq!(accumulator_input.header.end_offsets[2], 9_717);
+        assert_eq!(accumulator_input.header.end_offsets[3], 0);
+        assert_eq!(accumulator_input.header.end_offsets[4], 0);
     }
 }

+ 7 - 15
accumulator_updater/programs/mock-cpi-caller/src/instructions/add_price.rs

@@ -28,8 +28,8 @@ pub fn add_price<'info>(
     ctx: Context<'_, '_, '_, 'info, AddPrice<'info>>,
     params: AddPriceParams,
 ) -> Result<()> {
-    let mut account_data: Vec<Vec<u8>> = vec![];
-    let schemas = get_schemas(PythAccountType::Price);
+    let mut inputs: Vec<Vec<u8>> = vec![];
+    let _schemas = get_schemas(PythAccountType::Price);
 
     {
         let pyth_price_acct = &mut ctx.accounts.pyth_price_account.load_init()?;
@@ -38,25 +38,19 @@ pub fn add_price<'info>(
 
         let price_full_data = FullPriceMessage::from(&**pyth_price_acct).accumulator_serialize()?;
 
-        account_data.push(price_full_data);
+        inputs.push(price_full_data);
 
 
         let price_compact_data =
             CompactPriceMessage::from(&**pyth_price_acct).accumulator_serialize()?;
-        account_data.push(price_compact_data);
+        inputs.push(price_compact_data);
     }
 
 
-    let values = schemas
-        .into_iter()
-        .map(|s| s.to_u8())
-        .zip(account_data)
-        .collect::<Vec<(u8, Vec<u8>)>>();
-
     // Note: normally pyth oracle add_price wouldn't call emit_accumulator_inputs
     // since add_price doesn't actually add/update any price data we would
     // want included in the accumulator anyways. This is just for testing
-    AddPrice::emit_accumulator_inputs(ctx, values)
+    AddPrice::emit_accumulator_inputs(ctx, inputs)
 }
 
 
@@ -64,7 +58,7 @@ impl<'info> AddPrice<'info> {
     /// Invoke accumulator-updater emit-inputs ix cpi call using solana
     pub fn emit_accumulator_inputs(
         ctx: Context<'_, '_, '_, 'info, AddPrice<'info>>,
-        values: Vec<(u8, Vec<u8>)>,
+        inputs: Vec<Vec<u8>>,
     ) -> anchor_lang::Result<()> {
         let mut accounts = vec![
             AccountMeta::new(ctx.accounts.payer.key(), true),
@@ -85,9 +79,7 @@ impl<'info> AddPrice<'info> {
                 //anchor ix discriminator/identifier
                 sighash("global", ACCUMULATOR_UPDATER_IX_NAME),
                 ctx.accounts.pyth_price_account.key(),
-                values,
-                // account_data,
-                // account_schemas,
+                inputs,
             )
                 .try_to_vec()
                 .unwrap(),

+ 6 - 18
accumulator_updater/programs/mock-cpi-caller/src/instructions/update_price.rs

@@ -63,8 +63,8 @@ pub fn update_price<'info>(
     ctx: Context<'_, '_, '_, 'info, UpdatePrice<'info>>,
     params: UpdatePriceParams,
 ) -> Result<()> {
-    let mut account_data = vec![];
-    let schemas = get_schemas(PythAccountType::Price);
+    let mut inputs = vec![];
+    let _schemas = get_schemas(PythAccountType::Price);
 
     {
         let pyth_price_acct = &mut ctx.accounts.pyth_price_account.load_mut()?;
@@ -72,33 +72,23 @@ pub fn update_price<'info>(
 
         let price_full_data = FullPriceMessage::from(&**pyth_price_acct).accumulator_serialize()?;
 
-        account_data.push(price_full_data);
+        inputs.push(price_full_data);
 
 
         let price_compact_data =
             CompactPriceMessage::from(&**pyth_price_acct).accumulator_serialize()?;
-        account_data.push(price_compact_data);
+        inputs.push(price_compact_data);
     }
 
 
-    // let account_schemas = schemas.into_iter().map(|s| s.to_u8()).collect::<Vec<u8>>();
-
-    let values = schemas
-        .into_iter()
-        .map(|s| s.to_u8())
-        .zip(account_data)
-        .collect::<Vec<(u8, Vec<u8>)>>();
-
-    UpdatePrice::emit_accumulator_inputs(ctx, values)
+    UpdatePrice::emit_accumulator_inputs(ctx, inputs)
 }
 
 impl<'info> UpdatePrice<'info> {
     /// Invoke accumulator-updater emit-inputs ix cpi call
     pub fn emit_accumulator_inputs(
         ctx: Context<'_, '_, '_, 'info, UpdatePrice<'info>>,
-        values: Vec<(u8, Vec<u8>)>,
-        // account_data: Vec<Vec<u8>>,
-        // account_schemas: Vec<u8>,
+        values: Vec<Vec<u8>>,
     ) -> anchor_lang::Result<()> {
         let mut accounts = vec![
             AccountMeta::new(ctx.accounts.payer.key(), true),
@@ -120,8 +110,6 @@ impl<'info> UpdatePrice<'info> {
                 sighash("global", ACCUMULATOR_UPDATER_IX_NAME),
                 ctx.accounts.pyth_price_account.key(),
                 values,
-                // account_data,
-                // account_schemas,
             )
                 .try_to_vec()
                 .unwrap(),

+ 1 - 1
accumulator_updater/programs/mock-cpi-caller/src/lib.rs

@@ -42,7 +42,7 @@ mod test {
     fn ix_discriminator() {
         let a = &(accumulator_updater::instruction::PutAll {
             base_account_key: anchor_lang::prelude::Pubkey::default(),
-            values:           vec![],
+            messages:         vec![],
         }
         .data()[..8]);
 

+ 45 - 8
accumulator_updater/programs/mock-cpi-caller/src/message/price.rs

@@ -1,30 +1,57 @@
 use {
     crate::{
-        message::AccumulatorSerializer,
+        message::{
+            AccumulatorSerializer,
+            MessageSchema,
+        },
         state::PriceAccount,
     },
     anchor_lang::prelude::*,
-    bytemuck::{
-        Pod,
-        Zeroable,
-    },
     std::io::Write,
 };
 
-// TODO: should these schemas be "external" (protobuf?)
 
 #[repr(C)]
-#[derive(Debug, Copy, Clone, Pod, Zeroable)]
+#[derive(Clone, Default, Debug, Eq, PartialEq)]
+pub struct MessageHeader {
+    pub schema:  u8,
+    pub version: u16,
+    pub size:    u32,
+}
+
+impl MessageHeader {
+    pub const CURRENT_VERSION: u8 = 2;
+
+    pub fn new(schema: MessageSchema, size: u32) -> Self {
+        Self {
+            schema: schema.to_u8(),
+            version: Self::CURRENT_VERSION as u16,
+            size,
+        }
+    }
+}
+
+#[repr(C)]
+#[derive(Clone, Default, Debug, Eq, PartialEq)]
 pub struct CompactPriceMessage {
+    pub header:     MessageHeader,
     pub price_expo: u64,
     pub price:      u64,
     pub id:         u64,
 }
 
 
+impl CompactPriceMessage {
+    // size without header
+    pub const SIZE: usize = 24;
+}
+
 impl AccumulatorSerializer for CompactPriceMessage {
     fn accumulator_serialize(&self) -> Result<Vec<u8>> {
         let mut bytes = vec![];
+        bytes.write_all(&self.header.schema.to_be_bytes())?;
+        bytes.write_all(&self.header.version.to_be_bytes())?;
+        bytes.write_all(&self.header.size.to_be_bytes())?;
         bytes.write_all(&self.id.to_be_bytes())?;
         bytes.write_all(&self.price.to_be_bytes())?;
         bytes.write_all(&self.price_expo.to_be_bytes())?;
@@ -35,6 +62,7 @@ impl AccumulatorSerializer for CompactPriceMessage {
 impl From<&PriceAccount> for CompactPriceMessage {
     fn from(other: &PriceAccount) -> Self {
         Self {
+            header:     MessageHeader::new(MessageSchema::Compact, Self::SIZE as u32),
             id:         other.id,
             price:      other.price,
             price_expo: other.price_expo,
@@ -44,8 +72,9 @@ impl From<&PriceAccount> for CompactPriceMessage {
 
 
 #[repr(C)]
-#[derive(Debug, Copy, Clone, Pod, Zeroable)]
+#[derive(Clone, Default, Debug, Eq, PartialEq)]
 pub struct FullPriceMessage {
+    pub header:     MessageHeader,
     pub id:         u64,
     pub price:      u64,
     pub price_expo: u64,
@@ -53,9 +82,14 @@ pub struct FullPriceMessage {
     pub ema_expo:   u64,
 }
 
+impl FullPriceMessage {
+    pub const SIZE: usize = 40;
+}
+
 impl From<&PriceAccount> for FullPriceMessage {
     fn from(other: &PriceAccount) -> Self {
         Self {
+            header:     MessageHeader::new(MessageSchema::Full, Self::SIZE as u32),
             id:         other.id,
             price:      other.price,
             price_expo: other.price_expo,
@@ -68,6 +102,9 @@ impl From<&PriceAccount> for FullPriceMessage {
 impl AccumulatorSerializer for FullPriceMessage {
     fn accumulator_serialize(&self) -> Result<Vec<u8>> {
         let mut bytes = vec![];
+        bytes.write_all(&self.header.schema.to_be_bytes())?;
+        bytes.write_all(&self.header.version.to_be_bytes())?;
+        bytes.write_all(&self.header.size.to_be_bytes())?;
         bytes.write_all(&self.id.to_be_bytes())?;
         bytes.write_all(&self.price.to_be_bytes())?;
         bytes.write_all(&self.price_expo.to_be_bytes())?;

+ 112 - 79
accumulator_updater/tests/accumulator_updater.ts

@@ -33,8 +33,6 @@ const [pythPriceAccountPk] = anchor.web3.PublicKey.findProgramAddressSync(
   mockCpiProg.programId
 );
 
-const PRICE_SCHEMAS = [0, 1];
-
 describe("accumulator_updater", () => {
   // Configure the client to use the local cluster.
   let provider = anchor.AnchorProvider.env();
@@ -113,26 +111,22 @@ describe("accumulator_updater", () => {
       })
       .pubkeys();
 
-    const accumulatorPdaKeys = PRICE_SCHEMAS.map((pythSchema) => {
-      return anchor.web3.PublicKey.findProgramAddressSync(
-        [
-          mockCpiProg.programId.toBuffer(),
-          Buffer.from("accumulator"),
-          // mockCpiCallerAddPriceTxPubkeys.pythPriceAccount.toBuffer(),
-          pythPriceAccountPk.toBuffer(),
-          new anchor.BN(pythSchema).toArrayLike(Buffer, "le", 1),
-        ],
-        accumulatorUpdaterProgram.programId
-      )[0];
-    });
-    const accumulatorPdaMetas = accumulatorPdaKeys.map((pda) => {
-      return {
-        pubkey: pda,
+    const accumulatorPdaKey = anchor.web3.PublicKey.findProgramAddressSync(
+      [
+        mockCpiProg.programId.toBuffer(),
+        Buffer.from("accumulator"),
+        pythPriceAccountPk.toBuffer(),
+      ],
+      accumulatorUpdaterProgram.programId
+    )[0];
+
+    const accumulatorPdaMetas = [
+      {
+        pubkey: accumulatorPdaKey,
         isSigner: false,
         isWritable: true,
-      };
-      // return pda;
-    });
+      },
+    ];
 
     const mockCpiCallerAddPriceTxPrep = await mockCpiProg.methods
       .addPrice(addPriceParams)
@@ -182,16 +176,14 @@ describe("accumulator_updater", () => {
       ...pythPriceAccount,
       data: pythPriceAccount.data.toString("hex"),
     };
-    console.log(`pythPriceAccount: ${JSON.stringify(pythPriceAcct)}`);
 
-    const accumulatorInputs =
-      await accumulatorUpdaterProgram.account.accumulatorInput.fetchMultiple(
-        accumulatorPdaKeys
+    const accumulatorInput =
+      await accumulatorUpdaterProgram.account.accumulatorInput.fetch(
+        accumulatorPdaKey
       );
 
-    const accumulatorPriceMessages = accumulatorInputs.map((ai) => {
-      return parseAccumulatorInput(ai);
-    });
+    const accumulatorPriceMessages = parseAccumulatorInput(accumulatorInput);
+
     console.log(
       `accumulatorPriceMessages: ${JSON.stringify(
         accumulatorPriceMessages,
@@ -223,12 +215,10 @@ describe("accumulator_updater", () => {
         ],
       }
     );
-    const accumulatorInputKeyStrings = accumulatorPdaKeys.map((k) =>
-      k.toString()
-    );
-    accumulatorAccounts.forEach((a) => {
-      assert.isTrue(accumulatorInputKeyStrings.includes(a.pubkey.toString()));
-    });
+
+    accumulatorAccounts
+      .map((a) => a.toString())
+      .includes(accumulatorPdaKey.toString());
   });
 
   it("Mock CPI Program - UpdatePrice", async () => {
@@ -239,10 +229,7 @@ describe("accumulator_updater", () => {
       emaExpo: new anchor.BN(8),
     };
 
-    let accumulatorPdaMetas = getAccumulatorPdaMetas(
-      pythPriceAccountPk,
-      PRICE_SCHEMAS
-    );
+    let accumulatorPdaMeta = getAccumulatorPdaMeta(pythPriceAccountPk);
     await mockCpiProg.methods
       .updatePrice(updatePriceParams)
       .accounts({
@@ -251,7 +238,7 @@ describe("accumulator_updater", () => {
         accumulatorWhitelist: whitelistPubkey,
         accumulatorProgram: accumulatorUpdaterProgram.programId,
       })
-      .remainingAccounts(accumulatorPdaMetas)
+      .remainingAccounts([accumulatorPdaMeta])
       .preInstructions([
         ComputeBudgetProgram.setComputeUnitLimit({ units: 1_000_000 }),
       ])
@@ -266,13 +253,12 @@ describe("accumulator_updater", () => {
     assert.isTrue(pythPriceAccount.priceExpo.eq(updatePriceParams.priceExpo));
     assert.isTrue(pythPriceAccount.ema.eq(updatePriceParams.ema));
     assert.isTrue(pythPriceAccount.emaExpo.eq(updatePriceParams.emaExpo));
-    const accumulatorInputs =
-      await accumulatorUpdaterProgram.account.accumulatorInput.fetchMultiple(
-        accumulatorPdaMetas.map((m) => m.pubkey)
+    const accumulatorInput =
+      await accumulatorUpdaterProgram.account.accumulatorInput.fetch(
+        accumulatorPdaMeta.pubkey
       );
-    const updatedAccumulatorPriceMessages = accumulatorInputs.map((ai) => {
-      return parseAccumulatorInput(ai);
-    });
+    const updatedAccumulatorPriceMessages =
+      parseAccumulatorInput(accumulatorInput);
 
     console.log(
       `updatedAccumulatorPriceMessages: ${JSON.stringify(
@@ -289,28 +275,22 @@ describe("accumulator_updater", () => {
   });
 });
 
-export const getAccumulatorPdaMetas = (
-  pythAccount: anchor.web3.PublicKey,
-  schemas: number[]
-): AccountMeta[] => {
-  const accumulatorPdaKeys = schemas.map((pythSchema) => {
-    return anchor.web3.PublicKey.findProgramAddressSync(
-      [
-        mockCpiProg.programId.toBuffer(),
-        Buffer.from("accumulator"),
-        pythAccount.toBuffer(),
-        new anchor.BN(pythSchema).toArrayLike(Buffer, "le", 1),
-      ],
-      accumulatorUpdaterProgram.programId
-    )[0];
-  });
-  return accumulatorPdaKeys.map((pda) => {
-    return {
-      pubkey: pda,
-      isSigner: false,
-      isWritable: true,
-    };
-  });
+export const getAccumulatorPdaMeta = (
+  pythAccount: anchor.web3.PublicKey
+): AccountMeta => {
+  const accumulatorPdaKey = anchor.web3.PublicKey.findProgramAddressSync(
+    [
+      mockCpiProg.programId.toBuffer(),
+      Buffer.from("accumulator"),
+      pythAccount.toBuffer(),
+    ],
+    accumulatorUpdaterProgram.programId
+  )[0];
+  return {
+    pubkey: accumulatorPdaKey,
+    isSigner: false,
+    isWritable: true,
+  };
 };
 
 type AccumulatorInputHeader = IdlTypes<AccumulatorUpdater>["AccumulatorHeader"];
@@ -319,19 +299,73 @@ type AccumulatorInputHeader = IdlTypes<AccumulatorUpdater>["AccumulatorHeader"];
 // accountType and accountSchema.
 function parseAccumulatorInput({
   header,
-  data,
+  messages,
 }: {
   header: AccumulatorInputHeader;
-  data: Buffer;
-}): AccumulatorPriceMessage {
-  console.log(`header: ${JSON.stringify(header)}`);
-  if (header.accountSchema === 0) {
-    console.log(`[full]data: ${data.toString("hex")}`);
-    return parseFullPriceMessage(data);
-  } else {
-    console.log(`[compact]data: ${data.toString("hex")}`);
-    return parseCompactPriceMessage(data);
+  messages: number[];
+}): AccumulatorPriceMessage[] {
+  const accumulatorMessages = [];
+  let dataBuffer = Buffer.from(messages);
+
+  let start = 0;
+  for (let i = 0; i < header.endOffsets.length; i++) {
+    const endOffset = header.endOffsets[i];
+
+    if (endOffset == 0) {
+      console.log(`endOffset = 0. breaking`);
+      break;
+    }
+
+    const messageBytes = dataBuffer.subarray(start, endOffset);
+    const { header: msgHeader, data: msgData } =
+      parseMessageBytes(messageBytes);
+    console.info(`header: ${JSON.stringify(msgHeader, null, 2)}`);
+    if (msgHeader.schema == 0) {
+      accumulatorMessages.push(parseFullPriceMessage(msgData));
+    } else if (msgHeader.schema == 1) {
+      accumulatorMessages.push(parseCompactPriceMessage(msgData));
+    } else {
+      console.warn("Unknown input index: " + i);
+      continue;
+    }
+    start = endOffset;
   }
+  return accumulatorMessages;
+}
+
+type MessageHeader = {
+  schema: number;
+  version: number;
+  size: number;
+};
+
+type Message = {
+  header: MessageHeader;
+  data: Buffer;
+};
+
+function parseMessageBytes(data: Buffer): Message {
+  let offset = 0;
+
+  const schema = data.readInt8(offset);
+  offset += 1;
+
+  const version = data.readInt16BE(offset);
+  offset += 2;
+
+  const size = data.readUInt32BE(offset);
+  offset += 4;
+
+  const messageHeader = {
+    schema,
+    version,
+    size,
+  };
+  let messageData = data.subarray(offset, offset + size);
+  return {
+    header: messageHeader,
+    data: messageData,
+  };
 }
 
 //TODO: follow wormhole sdk parsing structure?
@@ -345,8 +379,7 @@ type FullPriceMessage = {
   ema: anchor.BN;
   emaExpo: anchor.BN;
 };
-
-function parseFullPriceMessage(data: Buffer): FullPriceMessage {
+function parseFullPriceMessage(data: Uint8Array): FullPriceMessage {
   return {
     id: new anchor.BN(data.subarray(0, 8), "be"),
     price: new anchor.BN(data.subarray(8, 16), "be"),
@@ -362,7 +395,7 @@ type CompactPriceMessage = {
   priceExpo: anchor.BN;
 };
 
-function parseCompactPriceMessage(data: Buffer): CompactPriceMessage {
+function parseCompactPriceMessage(data: Uint8Array): CompactPriceMessage {
   return {
     id: new anchor.BN(data.subarray(0, 8), "be"),
     price: new anchor.BN(data.subarray(8, 16), "be"),