// client.go (8.7 KB)

  1. package solana
  import (
  	"bytes"
  	"context"
  	"encoding/binary"
  	"fmt"
  	"io"
  	"math/big"
  	"time"

  	"github.com/certusone/wormhole/node/pkg/common"
  	"github.com/certusone/wormhole/node/pkg/p2p"
  	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  	"github.com/certusone/wormhole/node/pkg/supervisor"
  	"github.com/certusone/wormhole/node/pkg/vaa"
  	eth_common "github.com/ethereum/go-ethereum/common"
  	"github.com/gagliardetto/solana-go"
  	"github.com/gagliardetto/solana-go/rpc"
  	"github.com/mr-tron/base58"
  	"github.com/prometheus/client_golang/prometheus"
  	"go.uber.org/zap"
  )
  21. type SolanaWatcher struct {
  22. bridge solana.PublicKey
  23. wsUrl string
  24. rpcUrl string
  25. lockEvent chan *common.ChainLock
  26. }
  27. var (
  28. solanaConnectionErrors = prometheus.NewCounterVec(
  29. prometheus.CounterOpts{
  30. Name: "wormhole_solana_connection_errors_total",
  31. Help: "Total number of Solana connection errors",
  32. }, []string{"reason"})
  33. solanaAccountSkips = prometheus.NewCounterVec(
  34. prometheus.CounterOpts{
  35. Name: "wormhole_solana_account_updates_skipped_total",
  36. Help: "Total number of account updates skipped due to invalid data",
  37. }, []string{"reason"})
  38. solanaLockupsConfirmed = prometheus.NewCounter(
  39. prometheus.CounterOpts{
  40. Name: "wormhole_solana_lockups_confirmed_total",
  41. Help: "Total number of verified Solana lockups found",
  42. })
  43. currentSolanaHeight = prometheus.NewGauge(
  44. prometheus.GaugeOpts{
  45. Name: "wormhole_solana_current_height",
  46. Help: "Current Solana slot height (at default commitment level, not the level used for lockups)",
  47. })
  48. queryLatency = prometheus.NewHistogramVec(
  49. prometheus.HistogramOpts{
  50. Name: "wormhole_solana_query_latency",
  51. Help: "Latency histogram for Solana RPC calls",
  52. }, []string{"operation"})
  53. )
  54. func init() {
  55. prometheus.MustRegister(solanaConnectionErrors)
  56. prometheus.MustRegister(solanaAccountSkips)
  57. prometheus.MustRegister(solanaLockupsConfirmed)
  58. prometheus.MustRegister(currentSolanaHeight)
  59. prometheus.MustRegister(queryLatency)
  60. }
  61. func NewSolanaWatcher(wsUrl, rpcUrl string, bridgeAddress solana.PublicKey, lockEvents chan *common.ChainLock) *SolanaWatcher {
  62. return &SolanaWatcher{bridge: bridgeAddress, wsUrl: wsUrl, rpcUrl: rpcUrl, lockEvent: lockEvents}
  63. }
  64. func (s *SolanaWatcher) Run(ctx context.Context) error {
  65. // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
  66. bridgeAddr := base58.Encode(s.bridge[:])
  67. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  68. ContractAddress: bridgeAddr,
  69. })
  70. rpcClient := rpc.New(s.rpcUrl)
  71. logger := supervisor.Logger(ctx)
  72. errC := make(chan error)
  73. go func() {
  74. timer := time.NewTicker(time.Second * 5)
  75. defer timer.Stop()
  76. for {
  77. select {
  78. case <-ctx.Done():
  79. return
  80. case <-timer.C:
  81. func() {
  82. // Get current slot height
  83. rCtx, cancel := context.WithTimeout(ctx, time.Second*5)
  84. defer cancel()
  85. start := time.Now()
  86. slot, err := rpcClient.GetSlot(rCtx, "")
  87. queryLatency.WithLabelValues("get_slot").Observe(time.Since(start).Seconds())
  88. if err != nil {
  89. solanaConnectionErrors.WithLabelValues("get_slot_error").Inc()
  90. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  91. errC <- err
  92. return
  93. }
  94. currentSolanaHeight.Set(float64(slot))
  95. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  96. Height: int64(slot),
  97. ContractAddress: bridgeAddr,
  98. })
  99. logger.Info("current Solana height", zap.Uint64("slot", uint64(slot)))
  100. // Find TransferOutProposal accounts without a VAA
  101. rCtx, cancel = context.WithTimeout(ctx, time.Second*5)
  102. defer cancel()
  103. start = time.Now()
  104. accounts, err := rpcClient.GetProgramAccountsWithOpts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{
  105. Commitment: rpc.CommitmentFinalized,
  106. Filters: []rpc.RPCFilter{
  107. {
  108. DataSize: 1184, // Search for TransferOutProposal accounts
  109. },
  110. {
  111. Memcmp: &rpc.RPCFilterMemcmp{
  112. Offset: 1140, // Offset of VaaTime
  113. Bytes: solana.Base58{0, 0, 0, 0}, // VAA time is 0 when no VAA is present
  114. },
  115. },
  116. },
  117. })
  118. queryLatency.WithLabelValues("get_program_accounts").Observe(time.Since(start).Seconds())
  119. if err != nil {
  120. solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc()
  121. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  122. errC <- err
  123. return
  124. }
  125. logger.Debug("fetched transfer proposals without VAA",
  126. zap.Int("n", len(accounts)),
  127. zap.Duration("took", time.Since(start)),
  128. )
  129. for _, acc := range accounts {
  130. proposal, err := ParseTransferOutProposal(acc.Account.Data.GetBinary())
  131. if err != nil {
  132. solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
  133. logger.Warn(
  134. "failed to parse transfer proposal",
  135. zap.Stringer("account", acc.Pubkey),
  136. zap.Error(err),
  137. )
  138. continue
  139. }
  140. // VAA submitted
  141. if proposal.VaaTime.Unix() != 0 {
  142. solanaAccountSkips.WithLabelValues("is_submitted_vaa").Inc()
  143. continue
  144. }
  145. var txHash eth_common.Hash
  146. copy(txHash[:], acc.Pubkey[:])
  147. lock := &common.ChainLock{
  148. TxHash: txHash,
  149. Timestamp: proposal.LockupTime,
  150. Nonce: proposal.Nonce,
  151. SourceAddress: proposal.SourceAddress,
  152. TargetAddress: proposal.ForeignAddress,
  153. SourceChain: vaa.ChainIDSolana,
  154. TargetChain: proposal.ToChainID,
  155. TokenChain: proposal.Asset.Chain,
  156. TokenAddress: proposal.Asset.Address,
  157. TokenDecimals: proposal.Asset.Decimals,
  158. Amount: proposal.Amount,
  159. }
  160. solanaLockupsConfirmed.Inc()
  161. logger.Info("found lockup without VAA", zap.Stringer("lockup_address", acc.Pubkey))
  162. s.lockEvent <- lock
  163. }
  164. }()
  165. }
  166. }
  167. }()
  168. select {
  169. case <-ctx.Done():
  170. return ctx.Err()
  171. case err := <-errC:
  172. return err
  173. }
  174. }
  175. type (
  176. TransferOutProposal struct {
  177. Amount *big.Int
  178. ToChainID vaa.ChainID
  179. SourceAddress vaa.Address
  180. ForeignAddress vaa.Address
  181. Asset vaa.AssetMeta
  182. Nonce uint32
  183. VAA [1001]byte
  184. VaaTime time.Time
  185. LockupTime time.Time
  186. PokeCounter uint8
  187. SignatureAccount solana.PublicKey
  188. }
  189. )
  190. func ParseTransferOutProposal(data []byte) (*TransferOutProposal, error) {
  191. prop := &TransferOutProposal{}
  192. r := bytes.NewBuffer(data)
  193. var amountBytes [32]byte
  194. if n, err := r.Read(amountBytes[:]); err != nil || n != 32 {
  195. return nil, fmt.Errorf("failed to read amount: %w", err)
  196. }
  197. // Reverse (little endian -> big endian)
  198. for i := 0; i < len(amountBytes)/2; i++ {
  199. amountBytes[i], amountBytes[len(amountBytes)-i-1] = amountBytes[len(amountBytes)-i-1], amountBytes[i]
  200. }
  201. prop.Amount = new(big.Int).SetBytes(amountBytes[:])
  202. if err := binary.Read(r, binary.LittleEndian, &prop.ToChainID); err != nil {
  203. return nil, fmt.Errorf("failed to read to chain id: %w", err)
  204. }
  205. if n, err := r.Read(prop.SourceAddress[:]); err != nil || n != 32 {
  206. return nil, fmt.Errorf("failed to read source address: %w", err)
  207. }
  208. if n, err := r.Read(prop.ForeignAddress[:]); err != nil || n != 32 {
  209. return nil, fmt.Errorf("failed to read source address: %w", err)
  210. }
  211. assetMeta := vaa.AssetMeta{}
  212. if n, err := r.Read(assetMeta.Address[:]); err != nil || n != 32 {
  213. return nil, fmt.Errorf("failed to read asset meta address: %w", err)
  214. }
  215. if err := binary.Read(r, binary.LittleEndian, &assetMeta.Chain); err != nil {
  216. return nil, fmt.Errorf("failed to read asset meta chain: %w", err)
  217. }
  218. if err := binary.Read(r, binary.LittleEndian, &assetMeta.Decimals); err != nil {
  219. return nil, fmt.Errorf("failed to read asset meta decimals: %w", err)
  220. }
  221. prop.Asset = assetMeta
  222. // Skip alignment byte
  223. r.Next(1)
  224. if err := binary.Read(r, binary.LittleEndian, &prop.Nonce); err != nil {
  225. return nil, fmt.Errorf("failed to read nonce: %w", err)
  226. }
  227. if n, err := r.Read(prop.VAA[:]); err != nil || n != 1001 {
  228. return nil, fmt.Errorf("failed to read vaa: %w", err)
  229. }
  230. // Skip alignment bytes
  231. r.Next(3)
  232. var vaaTime uint32
  233. if err := binary.Read(r, binary.LittleEndian, &vaaTime); err != nil {
  234. return nil, fmt.Errorf("failed to read vaa time: %w", err)
  235. }
  236. prop.VaaTime = time.Unix(int64(vaaTime), 0)
  237. var lockupTime uint32
  238. if err := binary.Read(r, binary.LittleEndian, &lockupTime); err != nil {
  239. return nil, fmt.Errorf("failed to read lockup time: %w", err)
  240. }
  241. prop.LockupTime = time.Unix(int64(lockupTime), 0)
  242. if err := binary.Read(r, binary.LittleEndian, &prop.PokeCounter); err != nil {
  243. return nil, fmt.Errorf("failed to read poke counter: %w", err)
  244. }
  245. if n, err := r.Read(prop.SignatureAccount[:]); err != nil || n != 32 {
  246. return nil, fmt.Errorf("failed to read signature account: %w", err)
  247. }
  248. return prop, nil
  249. }