|
- package solana
- import (
- 	"context"
- 	"github.com/certusone/wormhole/bridge/pkg/common"
- 	"github.com/certusone/wormhole/bridge/pkg/p2p"
- 	gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
- 	"github.com/certusone/wormhole/bridge/pkg/supervisor"
- 	"github.com/certusone/wormhole/bridge/pkg/vaa"
- 	"github.com/dfuse-io/solana-go"
- 	"github.com/dfuse-io/solana-go/rpc"
- 	eth_common "github.com/ethereum/go-ethereum/common"
- 	"github.com/mr-tron/base58"
- 	"github.com/near/borsh-go"
- 	"github.com/prometheus/client_golang/prometheus"
- 	"go.uber.org/zap"
- 	"time"
- )
- type SolanaWatcher struct {
- 	bridge       solana.PublicKey
- 	wsUrl        string
- 	rpcUrl       string
- 	messageEvent chan *common.MessagePublication
- }
- var (
- 	solanaConnectionErrors = prometheus.NewCounterVec(
- 		prometheus.CounterOpts{
- 			Name: "wormhole_solana_connection_errors_total",
- 			Help: "Total number of Solana connection errors",
- 		}, []string{"reason"})
- 	solanaAccountSkips = prometheus.NewCounterVec(
- 		prometheus.CounterOpts{
- 			Name: "wormhole_solana_account_updates_skipped_total",
- 			Help: "Total number of account updates skipped due to invalid data",
- 		}, []string{"reason"})
- 	solanaLockupsConfirmed = prometheus.NewCounter(
- 		prometheus.CounterOpts{
- 			Name: "wormhole_solana_lockups_confirmed_total",
- 			Help: "Total number of verified Solana lockups found",
- 		})
- 	currentSolanaHeight = prometheus.NewGauge(
- 		prometheus.GaugeOpts{
- 			Name: "wormhole_solana_current_height",
- 			Help: "Current Solana slot height (at default commitment level, not the level used for lockups)",
- 		})
- 	queryLatency = prometheus.NewHistogramVec(
- 		prometheus.HistogramOpts{
- 			Name: "wormhole_solana_query_latency",
- 			Help: "Latency histogram for Solana RPC calls",
- 		}, []string{"operation"})
- )
- func init() {
- 	prometheus.MustRegister(solanaConnectionErrors)
- 	prometheus.MustRegister(solanaAccountSkips)
- 	prometheus.MustRegister(solanaLockupsConfirmed)
- 	prometheus.MustRegister(currentSolanaHeight)
- 	prometheus.MustRegister(queryLatency)
- }
- func NewSolanaWatcher(wsUrl, rpcUrl string, bridgeAddress solana.PublicKey, messageEvents chan *common.MessagePublication) *SolanaWatcher {
- 	return &SolanaWatcher{bridge: bridgeAddress, wsUrl: wsUrl, rpcUrl: rpcUrl, messageEvent: messageEvents}
- }
- func (s *SolanaWatcher) Run(ctx context.Context) error {
- 	// Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
- 	bridgeAddr := base58.Encode(s.bridge[:])
- 	p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
- 		BridgeAddress: bridgeAddr,
- 	})
- 	rpcClient := rpc.NewClient(s.rpcUrl)
- 	logger := supervisor.Logger(ctx)
- 	errC := make(chan error)
- 	go func() {
- 		timer := time.NewTicker(time.Second * 5)
- 		defer timer.Stop()
- 		for {
- 			select {
- 			case <-ctx.Done():
- 				return
- 			case <-timer.C:
- 				func() {
- 					// Get current slot height
- 					rCtx, cancel := context.WithTimeout(ctx, time.Second*5)
- 					defer cancel()
- 					start := time.Now()
- 					slot, err := rpcClient.GetSlot(rCtx, "")
- 					queryLatency.WithLabelValues("get_slot").Observe(time.Since(start).Seconds())
- 					if err != nil {
- 						solanaConnectionErrors.WithLabelValues("get_slot_error").Inc()
- 						errC <- err
- 						return
- 					}
- 					currentSolanaHeight.Set(float64(slot))
- 					p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
- 						Height:        int64(slot),
- 						BridgeAddress: bridgeAddr,
- 					})
- 					logger.Info("current Solana height", zap.Uint64("slot", uint64(slot)))
- 					// Find MessagePublicationAccount accounts without a VAA
- 					rCtx, cancel = context.WithTimeout(ctx, time.Second*5)
- 					defer cancel()
- 					start = time.Now()
- 					accounts, err := rpcClient.GetProgramAccounts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{
- 						Commitment: rpc.CommitmentMax, // TODO: deprecated, use Finalized
- 						Filters: []rpc.RPCFilter{
- 							{
- 								Memcmp: &rpc.RPCFilterMemcmp{
- 									Offset: 0,                            // Start of the account
- 									Bytes:  solana.Base58{'m', 's', 'g'}, // Prefix of the posted message accounts
- 								},
- 							},
- 							{
- 								Memcmp: &rpc.RPCFilterMemcmp{
- 									Offset: 4,                   // Start of the Persist flag
- 									Bytes:  solana.Base58{0x01}, // Only grab messages that need to be persisted
- 								},
- 							},
- 							{
- 								Memcmp: &rpc.RPCFilterMemcmp{
- 									Offset: 5,                         // Offset of VaaTime
- 									Bytes:  solana.Base58{0, 0, 0, 0}, // This means this VAA hasn't been signed yet
- 								},
- 							},
- 						},
- 					})
- 					queryLatency.WithLabelValues("get_program_accounts").Observe(time.Since(start).Seconds())
- 					if err != nil {
- 						solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc()
- 						errC <- err
- 						return
- 					}
- 					logger.Debug("fetched transfer proposals without VAA",
- 						zap.Int("n", len(accounts)),
- 						zap.Duration("took", time.Since(start)),
- 					)
- 					for _, acc := range accounts {
- 						proposal, err := ParseTransferOutProposal(acc.Account.Data)
- 						if err != nil {
- 							solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
- 							logger.Warn(
- 								"failed to parse transfer proposal",
- 								zap.Stringer("account", acc.Pubkey),
- 								zap.Error(err),
- 							)
- 							continue
- 						}
- 						// VAA submitted
- 						if proposal.VaaTime != 0 {
- 							solanaAccountSkips.WithLabelValues("is_submitted_vaa").Inc()
- 							continue
- 						}
- 						var txHash eth_common.Hash
- 						copy(txHash[:], acc.Pubkey[:])
- 						lock := &common.MessagePublication{
- 							TxHash:         txHash,
- 							Timestamp:      time.Unix(int64(proposal.SubmissionTime), 0),
- 							Nonce:          proposal.Nonce,
- 							Sequence:       proposal.Sequence,
- 							EmitterChain:   vaa.ChainIDSolana,
- 							EmitterAddress: proposal.EmitterAddress,
- 							Payload:        proposal.Payload,
- 							Persist:        proposal.Persist == 1,
- 						}
- 						solanaLockupsConfirmed.Inc()
- 						logger.Info("found lockup without VAA", zap.Stringer("lockup_address", acc.Pubkey))
- 						s.messageEvent <- lock
- 					}
- 				}()
- 			}
- 		}
- 	}()
- 	select {
- 	case <-ctx.Done():
- 		return ctx.Err()
- 	case err := <-errC:
- 		return err
- 	}
- }
- type (
- 	MessagePublicationAccount struct {
- 		VaaVersion uint8
- 		// Borsh does not seem to support booleans, so 0=false / 1=true
- 		Persist             uint8
- 		VaaTime             uint32
- 		VaaSignatureAccount vaa.Address
- 		SubmissionTime      uint32
- 		Nonce               uint32
- 		Sequence            uint64
- 		EmitterChain        uint16
- 		EmitterAddress      vaa.Address
- 		Payload             []byte
- 	}
- )
- func ParseTransferOutProposal(data []byte) (*MessagePublicationAccount, error) {
- 	prop := &MessagePublicationAccount{}
- 	// Skip the b"msg" prefix
- 	if err := borsh.Deserialize(prop, data[3:]); err != nil {
- 		return nil, err
- 	}
- 	return prop, nil
- }
|
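
The three `Memcmp` filters in `Run` and the `data[3:]` slice in `ParseTransferOutProposal` both rely on the same fixed layout: a 3-byte `msg` prefix, then `VaaVersion` at offset 3, `Persist` at offset 4, and a little-endian `VaaTime` at offsets 5 through 8. Note that `data[3:]` panics when an account holds fewer than 3 bytes. The following is a minimal sketch of the same checks done locally with only the standard library, under the assumption that the layout is exactly what the struct and filter comments describe. The names `parseHeader`, `needsVAA`, `header`, and `errShortAccount` are hypothetical and do not appear in the original file.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// msgPrefix is the 3-byte tag matched by the first memcmp filter at offset 0.
var msgPrefix = []byte("msg")

// headerLen covers the prefix (3) + VaaVersion (1) + Persist (1) + VaaTime (4).
const headerLen = 9

// errShortAccount is a hypothetical sentinel error for truncated account data.
var errShortAccount = errors.New("account data shorter than message header")

// header holds only the fixed-offset fields that the filters inspect.
type header struct {
	VaaVersion uint8
	Persist    bool
	VaaTime    uint32
}

// parseHeader (hypothetical helper) validates length and prefix before
// touching any offset, so short or foreign accounts yield an error
// instead of a slice-bounds panic.
func parseHeader(data []byte) (header, error) {
	if len(data) < headerLen {
		return header{}, errShortAccount
	}
	if !bytes.HasPrefix(data, msgPrefix) {
		return header{}, fmt.Errorf("unexpected account prefix %q", data[:3])
	}
	return header{
		VaaVersion: data[3],
		Persist:    data[4] == 1,                          // 0=false / 1=true
		VaaTime:    binary.LittleEndian.Uint32(data[5:9]), // Borsh integers are little-endian
	}, nil
}

// needsVAA mirrors the filter set: "msg" prefix, Persist set, VaaTime zero.
func needsVAA(data []byte) bool {
	h, err := parseHeader(data)
	return err == nil && h.Persist && h.VaaTime == 0
}

func main() {
	pending := []byte{'m', 's', 'g', 1, 1, 0, 0, 0, 0}
	signed := []byte{'m', 's', 'g', 1, 1, 0x10, 0, 0, 0}
	fmt.Println(needsVAA(pending), needsVAA(signed), needsVAA([]byte("ms")))
}
```

Running the sketch prints `true false false`: the first buffer passes all three checks, the second has a non-zero `VaaTime`, and the third is rejected by the length guard before any indexing happens.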