Răsfoiți Sursa

pyth-lazer-agent fixes, tests, ci

Mike Rolish 5 luni în urmă
părinte
comite
6fd4328f82

+ 37 - 0
.github/workflows/ci-pyth-lazer-agent.yml

@@ -0,0 +1,37 @@
+name: "pyth-lazer-agent Rust Test Suite"
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    paths:
+      - .github/workflows/ci-pyth-lazer-agent.yml
+      - apps/pyth-lazer-agent/**
+
+jobs:
+  pyth-lazer-agent-rust-test-suite:
+    name: pyth-lazer-agent Rust Test Suite
+    runs-on: ubuntu-22.04
+    defaults:
+      run:
+        working-directory: apps/pyth-lazer-agent
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          submodules: recursive
+      - uses: actions-rust-lang/setup-rust-toolchain@v1
+        with:
+          toolchain: 1.87.0
+          components: clippy,rustfmt
+      - uses: Swatinem/rust-cache@v2
+        with:
+          workspaces: "apps/pyth-lazer-agent -> target"
+      - name: Format check
+        run: cargo fmt --all -- --check
+        if: success() || failure()
+      - name: Clippy check
+        run: cargo clippy --all-targets -- --deny warnings
+        if: success() || failure()
+      - name: test
+        run: cargo test
+        if: success() || failure()

+ 55 - 0
.github/workflows/docker-pyth-lazer-agent.yml

@@ -0,0 +1,55 @@
+name: Build and Push pyth-lazer-agent Image
+on:
+  push:
+    tags:
+      - pyth-lazer-agent-v*
+  pull_request:
+    paths:
+      - "apps/pyth-lazer-agent/**"
+  workflow_dispatch:
+    inputs:
+      dispatch_description:
+        description: "Dispatch description"
+        required: true
+        type: string
+permissions:
+  contents: read
+  id-token: write
+  packages: write
+env:
+  REGISTRY: ghcr.io
+  IMAGE_NAME: pyth-network/pyth-lazer-agent
+jobs:
+  pyth-lazer-agent-image:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set image tag to version of the git tag
+        if: ${{ startsWith(github.ref, 'refs/tags/pyth-lazer-agent-v') }}
+        run: |
+          PREFIX="refs/tags/pyth-lazer-agent-"
+          VERSION="${GITHUB_REF:${#PREFIX}}"
+          echo "IMAGE_TAG=${VERSION}" >> "${GITHUB_ENV}"
+      - name: Set image tag to the git commit hash
+        if: ${{ !startsWith(github.ref, 'refs/tags/pyth-lazer-agent-v') }}
+        run: |
+          echo "IMAGE_TAG=${{ github.sha }}" >> "${GITHUB_ENV}"
+      - name: Log in to the Container registry
+        uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
+        with:
+          registry: ${{ env.REGISTRY }}
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+      - name: Extract metadata (tags, labels) for Docker
+        id: metadata_pyth_lazer_agent
+        uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
+        with:
+          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
+      - name: Build and push server docker image
+        uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4
+        with:
+          context: .
+          file: "./apps/pyth-lazer-agent/Dockerfile"
+          push: ${{ github.event_name != 'pull_request' }}
+          tags: ${{ steps.metadata_pyth_lazer_agent.outputs.tags }}
+          labels: ${{ steps.metadata_pyth_lazer_agent.outputs.labels }}

+ 0 - 0
pyth-lazer-agent/.dockerignore → apps/pyth-lazer-agent/.dockerignore


+ 0 - 0
pyth-lazer-agent/.gitignore → apps/pyth-lazer-agent/.gitignore


+ 2 - 0
pyth-lazer-agent/Cargo.lock → apps/pyth-lazer-agent/Cargo.lock

@@ -1660,6 +1660,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "backoff",
+ "base64 0.22.1",
  "bincode",
  "clap",
  "config",
@@ -1679,6 +1680,7 @@ dependencies = [
  "serde_json",
  "soketto",
  "solana-keypair",
+ "tempfile",
  "tokio",
  "tokio-tungstenite",
  "tokio-util",

+ 4 - 0
pyth-lazer-agent/Cargo.toml → apps/pyth-lazer-agent/Cargo.toml

@@ -9,6 +9,7 @@ pyth-lazer-protocol = "0.7.2"
 
 anyhow = "1.0.98"
 backoff = "0.4.0"
+base64 = "0.22.1"
 bincode = { version = "2.0.1", features = ["serde"] }
 clap = { version = "4.5.32", features = ["derive"] }
 config = "0.15.11"
@@ -32,3 +33,6 @@ tokio-util = { version = "0.7.14", features = ["compat"] }
 tracing = "0.1.41"
 tracing-subscriber = { version = "0.3.19", features = ["env-filter", "json"] }
 url = { version = "2.5.4", features = ["serde"] }
+
+[dev-dependencies]
+tempfile = "3.20.0"

+ 0 - 0
pyth-lazer-agent/Dockerfile → apps/pyth-lazer-agent/Dockerfile


+ 0 - 0
pyth-lazer-agent/config/config.toml → apps/pyth-lazer-agent/config/config.toml


+ 0 - 0
pyth-lazer-agent/rust-toolchain.toml → apps/pyth-lazer-agent/rust-toolchain.toml


+ 0 - 2
pyth-lazer-agent/src/config.rs → apps/pyth-lazer-agent/src/config.rs

@@ -13,8 +13,6 @@ pub struct Config {
     pub listen_address: SocketAddr,
     pub relayer_urls: Vec<Url>,
     #[derivative(Debug = "ignore")]
-    pub authorization_token: String,
-    #[derivative(Debug = "ignore")]
     pub publish_keypair_path: PathBuf,
     #[serde(with = "humantime_serde", default = "default_publish_interval")]
     pub publish_interval_duration: Duration,

+ 0 - 0
pyth-lazer-agent/src/http_server.rs → apps/pyth-lazer-agent/src/http_server.rs


+ 258 - 0
apps/pyth-lazer-agent/src/lazer_publisher.rs

@@ -0,0 +1,258 @@
+use crate::config::{CHANNEL_CAPACITY, Config};
+use crate::relayer_session::RelayerSessionTask;
+use anyhow::{Context, Result, bail};
+use base64::Engine;
+use base64::prelude::BASE64_STANDARD;
+use ed25519_dalek::{Signer, SigningKey};
+use protobuf::well_known_types::timestamp::Timestamp;
+use protobuf::{Message, MessageField};
+use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PublisherUpdate};
+use pyth_lazer_publisher_sdk::transaction::lazer_transaction::Payload;
+use pyth_lazer_publisher_sdk::transaction::signature_data::Data::Ed25519;
+use pyth_lazer_publisher_sdk::transaction::{
+    Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
+};
+use solana_keypair::read_keypair_file;
+use std::path::PathBuf;
+use tokio::sync::broadcast;
+use tokio::{
+    select,
+    sync::mpsc::{self, Receiver, Sender},
+    time::interval,
+};
+use tracing::error;
+
+#[derive(Clone)]
+pub struct LazerPublisher {
+    sender: Sender<FeedUpdate>,
+}
+
+impl LazerPublisher {
+    fn load_signing_key(publish_keypair_path: &PathBuf) -> Result<SigningKey> {
+        // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
+        let publish_keypair = match read_keypair_file(publish_keypair_path) {
+            Ok(k) => k,
+            Err(e) => {
+                tracing::error!(
+                    error = ?e,
+                    publish_keypair_path = publish_keypair_path.display().to_string(),
+                    "Reading publish keypair returned an error. ",
+                );
+                bail!("Reading publish keypair returned an error.");
+            }
+        };
+
+        SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
+            .context("Failed to create signing key from keypair")
+    }
+
+    pub async fn new(config: &Config) -> Self {
+        let signing_key = match Self::load_signing_key(&config.publish_keypair_path) {
+            Ok(signing_key) => signing_key,
+            Err(e) => {
+                tracing::error!("Failed to load signing key: {e:?}");
+                // Can't proceed on key failure
+                panic!("Failed to load signing key: {e:?}");
+            }
+        };
+
+        let (relayer_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
+        for url in config.relayer_urls.iter() {
+            let mut task = RelayerSessionTask {
+                url: url.clone(),
+                token: BASE64_STANDARD.encode(signing_key.verifying_key().to_bytes()),
+                receiver: relayer_sender.subscribe(),
+            };
+            tokio::spawn(async move { task.run().await });
+        }
+
+        let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
+        let mut task = LazerPublisherTask {
+            config: config.clone(),
+            receiver,
+            pending_updates: Vec::new(),
+            relayer_sender,
+            signing_key,
+        };
+        tokio::spawn(async move { task.run().await });
+        Self { sender }
+    }
+
+    pub async fn push_feed_update(&self, feed_update: FeedUpdate) -> Result<()> {
+        self.sender.send(feed_update).await?;
+        Ok(())
+    }
+}
+
+struct LazerPublisherTask {
+    // connection state
+    config: Config,
+    receiver: Receiver<FeedUpdate>,
+    pending_updates: Vec<FeedUpdate>,
+    relayer_sender: broadcast::Sender<SignedLazerTransaction>,
+    signing_key: SigningKey,
+}
+
+impl LazerPublisherTask {
+    pub async fn run(&mut self) {
+        let mut publish_interval = interval(self.config.publish_interval_duration);
+        loop {
+            select! {
+                Some(feed_update) = self.receiver.recv() => {
+                    self.pending_updates.push(feed_update);
+                }
+                _ = publish_interval.tick() => {
+                    if let Err(err) = self.batch_transaction().await {
+                        error!("Failed to publish updates: {}", err);
+                    }
+                }
+            }
+        }
+    }
+
+    async fn batch_transaction(&mut self) -> Result<()> {
+        if self.pending_updates.is_empty() {
+            return Ok(());
+        }
+
+        let publisher_update = PublisherUpdate {
+            updates: self.pending_updates.drain(..).collect(),
+            publisher_timestamp: MessageField::some(Timestamp::now()),
+            special_fields: Default::default(),
+        };
+        let lazer_transaction = LazerTransaction {
+            payload: Some(Payload::PublisherUpdate(publisher_update)),
+            special_fields: Default::default(),
+        };
+        let buf = match lazer_transaction.write_to_bytes() {
+            Ok(buf) => buf,
+            Err(e) => {
+                tracing::warn!("Failed to encode Lazer transaction to bytes: {:?}", e);
+                bail!("Failed to encode Lazer transaction")
+            }
+        };
+        let signature = self.signing_key.sign(&buf);
+        let signature_data = SignatureData {
+            data: Some(Ed25519(Ed25519SignatureData {
+                signature: Some(signature.to_bytes().into()),
+                public_key: Some(self.signing_key.verifying_key().to_bytes().into()),
+                special_fields: Default::default(),
+            })),
+            special_fields: Default::default(),
+        };
+        let signed_lazer_transaction = SignedLazerTransaction {
+            signature_data: MessageField::some(signature_data),
+            payload: Some(buf),
+            special_fields: Default::default(),
+        };
+        match self.relayer_sender.send(signed_lazer_transaction.clone()) {
+            Ok(_) => (),
+            Err(e) => {
+                tracing::error!("Error sending transaction to relayer receivers: {e}");
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::config::{CHANNEL_CAPACITY, Config};
+    use crate::lazer_publisher::LazerPublisherTask;
+    use ed25519_dalek::SigningKey;
+    use protobuf::well_known_types::timestamp::Timestamp;
+    use protobuf::{Message, MessageField};
+    use pyth_lazer_publisher_sdk::publisher_update::feed_update::Update;
+    use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PriceUpdate};
+    use pyth_lazer_publisher_sdk::transaction::{LazerTransaction, lazer_transaction};
+    use std::io::Write;
+    use std::path::PathBuf;
+    use std::time::Duration;
+    use tempfile::NamedTempFile;
+    use tokio::sync::broadcast::error::TryRecvError;
+    use tokio::sync::{broadcast, mpsc};
+    use url::Url;
+
+    fn get_private_key() -> SigningKey {
+        SigningKey::from_keypair_bytes(&[
+            105, 175, 146, 91, 32, 145, 164, 199, 37, 111, 139, 255, 44, 225, 5, 247, 154, 170,
+            238, 70, 47, 15, 9, 48, 102, 87, 180, 50, 50, 38, 148, 243, 62, 148, 219, 72, 222, 170,
+            8, 246, 176, 33, 205, 29, 118, 11, 220, 163, 214, 204, 46, 49, 132, 94, 170, 173, 244,
+            39, 179, 211, 177, 70, 252, 31,
+        ])
+        .unwrap()
+    }
+
+    fn get_private_key_file() -> NamedTempFile {
+        let private_key_string = "[105,175,146,91,32,145,164,199,37,111,139,255,44,225,5,247,154,170,238,70,47,15,9,48,102,87,180,50,50,38,148,243,62,148,219,72,222,170,8,246,176,33,205,29,118,11,220,163,214,204,46,49,132,94,170,173,244,39,179,211,177,70,252,31]";
+        let mut temp_file = NamedTempFile::new().unwrap();
+        temp_file
+            .as_file_mut()
+            .write(private_key_string.as_bytes())
+            .unwrap();
+        temp_file.flush().unwrap();
+        temp_file
+    }
+
+    #[tokio::test]
+    async fn test_lazer_exporter_task() {
+        let signing_key_file = get_private_key_file();
+        let signing_key = get_private_key();
+
+        let config = Config {
+            listen_address: "0.0.0.0:12345".parse().unwrap(),
+            relayer_urls: vec![Url::parse("http://127.0.0.1:12346").unwrap()],
+            publish_keypair_path: PathBuf::from(signing_key_file.path()),
+            publish_interval_duration: Duration::from_millis(25),
+        };
+
+        let (relayer_sender, mut relayer_receiver) = broadcast::channel(CHANNEL_CAPACITY);
+        let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
+        let mut task = LazerPublisherTask {
+            config: config.clone(),
+            receiver,
+            pending_updates: Vec::new(),
+            relayer_sender,
+            signing_key,
+        };
+        tokio::spawn(async move { task.run().await });
+
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        match relayer_receiver.try_recv() {
+            Err(TryRecvError::Empty) => (),
+            _ => panic!("channel should be empty"),
+        }
+
+        let feed_update = FeedUpdate {
+            feed_id: Some(1),
+            source_timestamp: MessageField::some(Timestamp::now()),
+            update: Some(Update::PriceUpdate(PriceUpdate {
+                price: Some(100_000_00000000),
+                ..PriceUpdate::default()
+            })),
+            special_fields: Default::default(),
+        };
+        sender.send(feed_update.clone()).await.unwrap();
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+        match relayer_receiver.try_recv() {
+            Ok(transaction) => {
+                let lazer_transaction =
+                    LazerTransaction::parse_from_bytes(transaction.payload.unwrap().as_slice())
+                        .unwrap();
+                let publisher_update =
+                    if let lazer_transaction::Payload::PublisherUpdate(publisher_update) =
+                        lazer_transaction.payload.unwrap()
+                    {
+                        publisher_update
+                    } else {
+                        panic!("expected publisher_update")
+                    };
+                assert_eq!(publisher_update.updates.len(), 1);
+                assert_eq!(publisher_update.updates[0], feed_update);
+            }
+            _ => panic!("channel should have a transaction waiting"),
+        }
+    }
+}

+ 0 - 0
pyth-lazer-agent/src/main.rs → apps/pyth-lazer-agent/src/main.rs


+ 0 - 0
pyth-lazer-agent/src/publisher_handle.rs → apps/pyth-lazer-agent/src/publisher_handle.rs


+ 128 - 0
pyth-lazer-agent/src/relayer_session.rs → apps/pyth-lazer-agent/src/relayer_session.rs

@@ -157,3 +157,131 @@ impl RelayerSessionTask {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::relayer_session::RelayerSessionTask;
+    use ed25519_dalek::{Signer, SigningKey};
+    use futures_util::StreamExt;
+    use protobuf::well_known_types::timestamp::Timestamp;
+    use protobuf::{Message, MessageField};
+    use pyth_lazer_publisher_sdk::publisher_update::feed_update::Update;
+    use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PriceUpdate, PublisherUpdate};
+    use pyth_lazer_publisher_sdk::transaction::lazer_transaction::Payload;
+    use pyth_lazer_publisher_sdk::transaction::signature_data::Data::Ed25519;
+    use pyth_lazer_publisher_sdk::transaction::{
+        Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
+    };
+    use std::net::SocketAddr;
+    use tokio::net::TcpListener;
+    use tokio::sync::{broadcast, mpsc};
+    use url::Url;
+
+    pub const RELAYER_CHANNEL_CAPACITY: usize = 1000;
+
+    fn get_private_key() -> SigningKey {
+        SigningKey::from_keypair_bytes(&[
+            105, 175, 146, 91, 32, 145, 164, 199, 37, 111, 139, 255, 44, 225, 5, 247, 154, 170,
+            238, 70, 47, 15, 9, 48, 102, 87, 180, 50, 50, 38, 148, 243, 62, 148, 219, 72, 222, 170,
+            8, 246, 176, 33, 205, 29, 118, 11, 220, 163, 214, 204, 46, 49, 132, 94, 170, 173, 244,
+            39, 179, 211, 177, 70, 252, 31,
+        ])
+        .unwrap()
+    }
+
+    pub async fn run_mock_relayer(
+        addr: SocketAddr,
+        back_sender: mpsc::Sender<SignedLazerTransaction>,
+    ) {
+        let listener = TcpListener::bind(addr).await.unwrap();
+
+        tokio::spawn(async move {
+            let Ok((stream, _)) = listener.accept().await else {
+                panic!("failed to accept mock relayer websocket connection");
+            };
+            let ws_stream = tokio_tungstenite::accept_async(stream)
+                .await
+                .expect("handshake failed");
+            let (_, mut read) = ws_stream.split();
+            while let Some(msg) = read.next().await {
+                if let Ok(msg) = msg {
+                    if msg.is_binary() {
+                        tracing::info!("Received binary message: {msg:?}");
+                        let transaction =
+                            SignedLazerTransaction::parse_from_bytes(msg.into_data().as_ref())
+                                .unwrap();
+                        back_sender.clone().send(transaction).await.unwrap();
+                    }
+                } else {
+                    tracing::error!("Received a malformed message: {msg:?}");
+                }
+            }
+        });
+    }
+
+    #[tokio::test]
+    async fn test_relayer_session() {
+        let (back_sender, mut back_receiver) = mpsc::channel(RELAYER_CHANNEL_CAPACITY);
+        let relayer_addr = "127.0.0.1:12346".parse().unwrap();
+        run_mock_relayer(relayer_addr, back_sender).await;
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+        let (relayer_sender, relayer_receiver) = broadcast::channel(RELAYER_CHANNEL_CAPACITY);
+
+        let mut relayer_session_task = RelayerSessionTask {
+            // connection state
+            url: Url::parse("ws://127.0.0.1:12346").unwrap(),
+            token: "token1".to_string(),
+            receiver: relayer_receiver,
+        };
+        tokio::spawn(async move { relayer_session_task.run().await });
+        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
+
+        let transaction = get_signed_lazer_transaction();
+        relayer_sender
+            .send(transaction.clone())
+            .expect("relayer_sender.send failed");
+        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
+        let received_transaction = back_receiver
+            .recv()
+            .await
+            .expect("back_receiver.recv failed");
+        assert_eq!(transaction, received_transaction);
+    }
+
+    fn get_signed_lazer_transaction() -> SignedLazerTransaction {
+        let publisher_update = PublisherUpdate {
+            updates: vec![FeedUpdate {
+                feed_id: Some(1),
+                source_timestamp: MessageField::some(Timestamp::now()),
+                update: Some(Update::PriceUpdate(PriceUpdate {
+                    price: Some(1_000_000_000i64),
+                    ..PriceUpdate::default()
+                })),
+                special_fields: Default::default(),
+            }],
+            publisher_timestamp: MessageField::some(Timestamp::now()),
+            special_fields: Default::default(),
+        };
+        let lazer_transaction = LazerTransaction {
+            payload: Some(Payload::PublisherUpdate(publisher_update)),
+            special_fields: Default::default(),
+        };
+        let buf = lazer_transaction.write_to_bytes().unwrap();
+        let signing_key = get_private_key();
+        let signature = signing_key.sign(&buf);
+        let signature_data = SignatureData {
+            data: Some(Ed25519(Ed25519SignatureData {
+                signature: Some(signature.to_bytes().into()),
+                public_key: Some(signing_key.verifying_key().to_bytes().into()),
+                special_fields: Default::default(),
+            })),
+            special_fields: Default::default(),
+        };
+        SignedLazerTransaction {
+            signature_data: MessageField::some(signature_data),
+            payload: Some(buf),
+            special_fields: Default::default(),
+        }
+    }
+}

+ 0 - 0
pyth-lazer-agent/src/websocket_utils.rs → apps/pyth-lazer-agent/src/websocket_utils.rs


+ 0 - 152
pyth-lazer-agent/src/lazer_publisher.rs

@@ -1,152 +0,0 @@
-use crate::config::{CHANNEL_CAPACITY, Config};
-use crate::relayer_session::RelayerSessionTask;
-use anyhow::{Context, Result, bail};
-use ed25519_dalek::{Signer, SigningKey};
-use protobuf::well_known_types::timestamp::Timestamp;
-use protobuf::{Message, MessageField};
-use pyth_lazer_publisher_sdk::publisher_update::{FeedUpdate, PublisherUpdate};
-use pyth_lazer_publisher_sdk::transaction::lazer_transaction::Payload;
-use pyth_lazer_publisher_sdk::transaction::signature_data::Data::Ed25519;
-use pyth_lazer_publisher_sdk::transaction::{
-    Ed25519SignatureData, LazerTransaction, SignatureData, SignedLazerTransaction,
-};
-use solana_keypair::read_keypair_file;
-use tokio::sync::broadcast;
-use tokio::{
-    select,
-    sync::mpsc::{self, Receiver, Sender},
-    time::interval,
-};
-use tracing::error;
-
-#[derive(Clone)]
-pub struct LazerPublisher {
-    sender: Sender<FeedUpdate>,
-}
-
-impl LazerPublisher {
-    pub async fn new(config: &Config) -> Self {
-        let (relayer_sender, _) = broadcast::channel(CHANNEL_CAPACITY);
-        for url in config.relayer_urls.iter() {
-            let mut task = RelayerSessionTask {
-                url: url.clone(),
-                token: config.authorization_token.to_owned(),
-                receiver: relayer_sender.subscribe(),
-            };
-            tokio::spawn(async move { task.run().await });
-        }
-
-        let (sender, receiver) = mpsc::channel(CHANNEL_CAPACITY);
-        let mut task = LazerPublisherTask {
-            config: config.clone(),
-            receiver,
-            pending_updates: Vec::new(),
-            relayer_sender,
-        };
-        tokio::spawn(async move { task.run().await });
-        Self { sender }
-    }
-
-    pub async fn push_feed_update(&self, feed_update: FeedUpdate) -> Result<()> {
-        self.sender.send(feed_update).await?;
-        Ok(())
-    }
-}
-
-struct LazerPublisherTask {
-    // connection state
-    config: Config,
-    receiver: Receiver<FeedUpdate>,
-    pending_updates: Vec<FeedUpdate>,
-    relayer_sender: broadcast::Sender<SignedLazerTransaction>,
-}
-
-impl LazerPublisherTask {
-    fn load_signing_key(&self) -> Result<SigningKey> {
-        // Read the keypair from the file using Solana SDK because it's the same key used by the Pythnet publisher
-        let publish_keypair = match read_keypair_file(&self.config.publish_keypair_path) {
-            Ok(k) => k,
-            Err(e) => {
-                tracing::error!(
-                    error = ?e,
-                    publish_keypair_path = self.config.publish_keypair_path.display().to_string(),
-                    "Reading publish keypair returned an error. ",
-                );
-                bail!("Reading publish keypair returned an error.");
-            }
-        };
-
-        SigningKey::from_keypair_bytes(&publish_keypair.to_bytes())
-            .context("Failed to create signing key from keypair")
-    }
-
-    pub async fn run(&mut self) {
-        let signing_key = match self.load_signing_key() {
-            Ok(signing_key) => signing_key,
-            Err(e) => {
-                tracing::error!("Failed to load signing key: {e:?}");
-                // Can't proceed on key failure
-                panic!("Failed to load signing key: {e:?}");
-            }
-        };
-
-        let mut publish_interval = interval(self.config.publish_interval_duration);
-        loop {
-            select! {
-                Some(feed_update) = self.receiver.recv() => {
-                    self.pending_updates.push(feed_update);
-                }
-                _ = publish_interval.tick() => {
-                    if let Err(err) = self.batch_transaction(&signing_key).await {
-                        error!("Failed to publish updates: {}", err);
-                    }
-                }
-            }
-        }
-    }
-
-    async fn batch_transaction(&mut self, signing_key: &SigningKey) -> Result<()> {
-        if self.pending_updates.is_empty() {
-            return Ok(());
-        }
-
-        let publisher_update = PublisherUpdate {
-            updates: self.pending_updates.drain(..).collect(),
-            publisher_timestamp: MessageField::some(Timestamp::now()),
-            special_fields: Default::default(),
-        };
-        let lazer_transaction = LazerTransaction {
-            payload: Some(Payload::PublisherUpdate(publisher_update)),
-            special_fields: Default::default(),
-        };
-        let buf = match lazer_transaction.write_to_bytes() {
-            Ok(buf) => buf,
-            Err(e) => {
-                tracing::warn!("Failed to encode Lazer transaction to bytes: {:?}", e);
-                bail!("Failed to encode Lazer transaction")
-            }
-        };
-        let signature = signing_key.sign(&buf);
-        let signature_data = SignatureData {
-            data: Some(Ed25519(Ed25519SignatureData {
-                signature: Some(signature.to_bytes().into()),
-                public_key: Some(signing_key.verifying_key().to_bytes().into()),
-                special_fields: Default::default(),
-            })),
-            special_fields: Default::default(),
-        };
-        let signed_lazer_transaction = SignedLazerTransaction {
-            signature_data: MessageField::some(signature_data),
-            payload: Some(buf),
-            special_fields: Default::default(),
-        };
-        match self.relayer_sender.send(signed_lazer_transaction.clone()) {
-            Ok(_) => (),
-            Err(e) => {
-                tracing::error!("Error sending transaction to relayer receivers: {e}");
-            }
-        }
-
-        Ok(())
-    }
-}