| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- package solana
- import (
- "bytes"
- "context"
- "encoding/binary"
- "fmt"
- "github.com/certusone/wormhole/node/pkg/common"
- "github.com/certusone/wormhole/node/pkg/p2p"
- gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
- "github.com/certusone/wormhole/node/pkg/supervisor"
- "github.com/certusone/wormhole/node/pkg/vaa"
- "github.com/dfuse-io/solana-go"
- "github.com/dfuse-io/solana-go/rpc"
- eth_common "github.com/ethereum/go-ethereum/common"
- "github.com/mr-tron/base58"
- "github.com/prometheus/client_golang/prometheus"
- "go.uber.org/zap"
- "math/big"
- "time"
- )
- type SolanaWatcher struct {
- bridge solana.PublicKey
- wsUrl string
- rpcUrl string
- lockEvent chan *common.ChainLock
- }
- var (
- solanaConnectionErrors = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "wormhole_solana_connection_errors_total",
- Help: "Total number of Solana connection errors",
- }, []string{"reason"})
- solanaAccountSkips = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: "wormhole_solana_account_updates_skipped_total",
- Help: "Total number of account updates skipped due to invalid data",
- }, []string{"reason"})
- solanaLockupsConfirmed = prometheus.NewCounter(
- prometheus.CounterOpts{
- Name: "wormhole_solana_lockups_confirmed_total",
- Help: "Total number of verified Solana lockups found",
- })
- currentSolanaHeight = prometheus.NewGauge(
- prometheus.GaugeOpts{
- Name: "wormhole_solana_current_height",
- Help: "Current Solana slot height (at default commitment level, not the level used for lockups)",
- })
- queryLatency = prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Name: "wormhole_solana_query_latency",
- Help: "Latency histogram for Solana RPC calls",
- }, []string{"operation"})
- )
- func init() {
- prometheus.MustRegister(solanaConnectionErrors)
- prometheus.MustRegister(solanaAccountSkips)
- prometheus.MustRegister(solanaLockupsConfirmed)
- prometheus.MustRegister(currentSolanaHeight)
- prometheus.MustRegister(queryLatency)
- }
- func NewSolanaWatcher(wsUrl, rpcUrl string, bridgeAddress solana.PublicKey, lockEvents chan *common.ChainLock) *SolanaWatcher {
- return &SolanaWatcher{bridge: bridgeAddress, wsUrl: wsUrl, rpcUrl: rpcUrl, lockEvent: lockEvents}
- }
- func (s *SolanaWatcher) Run(ctx context.Context) error {
- // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
- bridgeAddr := base58.Encode(s.bridge[:])
- p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
- BridgeAddress: bridgeAddr,
- })
- rpcClient := rpc.NewClient(s.rpcUrl)
- logger := supervisor.Logger(ctx)
- errC := make(chan error)
- go func() {
- timer := time.NewTicker(time.Second * 5)
- defer timer.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-timer.C:
- func() {
- // Get current slot height
- rCtx, cancel := context.WithTimeout(ctx, time.Second*5)
- defer cancel()
- start := time.Now()
- slot, err := rpcClient.GetSlot(rCtx, "")
- queryLatency.WithLabelValues("get_slot").Observe(time.Since(start).Seconds())
- if err != nil {
- solanaConnectionErrors.WithLabelValues("get_slot_error").Inc()
- errC <- err
- return
- }
- currentSolanaHeight.Set(float64(slot))
- p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
- Height: int64(slot),
- BridgeAddress: bridgeAddr,
- })
- logger.Info("current Solana height", zap.Uint64("slot", uint64(slot)))
- // Find TransferOutProposal accounts without a VAA
- rCtx, cancel = context.WithTimeout(ctx, time.Second*5)
- defer cancel()
- start = time.Now()
- accounts, err := rpcClient.GetProgramAccounts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{
- Commitment: rpc.CommitmentMax, // TODO: deprecated, use Finalized
- Filters: []rpc.RPCFilter{
- {
- DataSize: 1184, // Search for TransferOutProposal accounts
- },
- {
- Memcmp: &rpc.RPCFilterMemcmp{
- Offset: 1140, // Offset of VaaTime
- Bytes: solana.Base58{0, 0, 0, 0}, // VAA time is 0 when no VAA is present
- },
- },
- },
- })
- queryLatency.WithLabelValues("get_program_accounts").Observe(time.Since(start).Seconds())
- if err != nil {
- solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc()
- errC <- err
- return
- }
- logger.Debug("fetched transfer proposals without VAA",
- zap.Int("n", len(accounts)),
- zap.Duration("took", time.Since(start)),
- )
- for _, acc := range accounts {
- proposal, err := ParseTransferOutProposal(acc.Account.Data)
- if err != nil {
- solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
- logger.Warn(
- "failed to parse transfer proposal",
- zap.Stringer("account", acc.Pubkey),
- zap.Error(err),
- )
- continue
- }
- // VAA submitted
- if proposal.VaaTime.Unix() != 0 {
- solanaAccountSkips.WithLabelValues("is_submitted_vaa").Inc()
- continue
- }
- var txHash eth_common.Hash
- copy(txHash[:], acc.Pubkey[:])
- lock := &common.ChainLock{
- TxHash: txHash,
- Timestamp: proposal.LockupTime,
- Nonce: proposal.Nonce,
- SourceAddress: proposal.SourceAddress,
- TargetAddress: proposal.ForeignAddress,
- SourceChain: vaa.ChainIDSolana,
- TargetChain: proposal.ToChainID,
- TokenChain: proposal.Asset.Chain,
- TokenAddress: proposal.Asset.Address,
- TokenDecimals: proposal.Asset.Decimals,
- Amount: proposal.Amount,
- }
- solanaLockupsConfirmed.Inc()
- logger.Info("found lockup without VAA", zap.Stringer("lockup_address", acc.Pubkey))
- s.lockEvent <- lock
- }
- }()
- }
- }
- }()
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-errC:
- return err
- }
- }
- type (
- TransferOutProposal struct {
- Amount *big.Int
- ToChainID vaa.ChainID
- SourceAddress vaa.Address
- ForeignAddress vaa.Address
- Asset vaa.AssetMeta
- Nonce uint32
- VAA [1001]byte
- VaaTime time.Time
- LockupTime time.Time
- PokeCounter uint8
- SignatureAccount solana.PublicKey
- }
- )
- func ParseTransferOutProposal(data []byte) (*TransferOutProposal, error) {
- prop := &TransferOutProposal{}
- r := bytes.NewBuffer(data)
- var amountBytes [32]byte
- if n, err := r.Read(amountBytes[:]); err != nil || n != 32 {
- return nil, fmt.Errorf("failed to read amount: %w", err)
- }
- // Reverse (little endian -> big endian)
- for i := 0; i < len(amountBytes)/2; i++ {
- amountBytes[i], amountBytes[len(amountBytes)-i-1] = amountBytes[len(amountBytes)-i-1], amountBytes[i]
- }
- prop.Amount = new(big.Int).SetBytes(amountBytes[:])
- if err := binary.Read(r, binary.LittleEndian, &prop.ToChainID); err != nil {
- return nil, fmt.Errorf("failed to read to chain id: %w", err)
- }
- if n, err := r.Read(prop.SourceAddress[:]); err != nil || n != 32 {
- return nil, fmt.Errorf("failed to read source address: %w", err)
- }
- if n, err := r.Read(prop.ForeignAddress[:]); err != nil || n != 32 {
- return nil, fmt.Errorf("failed to read source address: %w", err)
- }
- assetMeta := vaa.AssetMeta{}
- if n, err := r.Read(assetMeta.Address[:]); err != nil || n != 32 {
- return nil, fmt.Errorf("failed to read asset meta address: %w", err)
- }
- if err := binary.Read(r, binary.LittleEndian, &assetMeta.Chain); err != nil {
- return nil, fmt.Errorf("failed to read asset meta chain: %w", err)
- }
- if err := binary.Read(r, binary.LittleEndian, &assetMeta.Decimals); err != nil {
- return nil, fmt.Errorf("failed to read asset meta decimals: %w", err)
- }
- prop.Asset = assetMeta
- // Skip alignment byte
- r.Next(1)
- if err := binary.Read(r, binary.LittleEndian, &prop.Nonce); err != nil {
- return nil, fmt.Errorf("failed to read nonce: %w", err)
- }
- if n, err := r.Read(prop.VAA[:]); err != nil || n != 1001 {
- return nil, fmt.Errorf("failed to read vaa: %w", err)
- }
- // Skip alignment bytes
- r.Next(3)
- var vaaTime uint32
- if err := binary.Read(r, binary.LittleEndian, &vaaTime); err != nil {
- return nil, fmt.Errorf("failed to read vaa time: %w", err)
- }
- prop.VaaTime = time.Unix(int64(vaaTime), 0)
- var lockupTime uint32
- if err := binary.Read(r, binary.LittleEndian, &lockupTime); err != nil {
- return nil, fmt.Errorf("failed to read lockup time: %w", err)
- }
- prop.LockupTime = time.Unix(int64(lockupTime), 0)
- if err := binary.Read(r, binary.LittleEndian, &prop.PokeCounter); err != nil {
- return nil, fmt.Errorf("failed to read poke counter: %w", err)
- }
- if n, err := r.Read(prop.SignatureAccount[:]); err != nil || n != 32 {
- return nil, fmt.Errorf("failed to read signature account: %w", err)
- }
- return prop, nil
- }
|