client.go 8.6 KB


  1. package solana
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "fmt"
  7. "github.com/certusone/wormhole/node/pkg/common"
  8. "github.com/certusone/wormhole/node/pkg/p2p"
  9. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  10. "github.com/certusone/wormhole/node/pkg/supervisor"
  11. "github.com/certusone/wormhole/node/pkg/vaa"
  12. "github.com/dfuse-io/solana-go"
  13. "github.com/dfuse-io/solana-go/rpc"
  14. eth_common "github.com/ethereum/go-ethereum/common"
  15. "github.com/mr-tron/base58"
  16. "github.com/prometheus/client_golang/prometheus"
  17. "go.uber.org/zap"
  18. "math/big"
  19. "time"
  20. )
  21. type SolanaWatcher struct {
  22. bridge solana.PublicKey
  23. wsUrl string
  24. rpcUrl string
  25. lockEvent chan *common.ChainLock
  26. }
  27. var (
  28. solanaConnectionErrors = prometheus.NewCounterVec(
  29. prometheus.CounterOpts{
  30. Name: "wormhole_solana_connection_errors_total",
  31. Help: "Total number of Solana connection errors",
  32. }, []string{"reason"})
  33. solanaAccountSkips = prometheus.NewCounterVec(
  34. prometheus.CounterOpts{
  35. Name: "wormhole_solana_account_updates_skipped_total",
  36. Help: "Total number of account updates skipped due to invalid data",
  37. }, []string{"reason"})
  38. solanaLockupsConfirmed = prometheus.NewCounter(
  39. prometheus.CounterOpts{
  40. Name: "wormhole_solana_lockups_confirmed_total",
  41. Help: "Total number of verified Solana lockups found",
  42. })
  43. currentSolanaHeight = prometheus.NewGauge(
  44. prometheus.GaugeOpts{
  45. Name: "wormhole_solana_current_height",
  46. Help: "Current Solana slot height (at default commitment level, not the level used for lockups)",
  47. })
  48. queryLatency = prometheus.NewHistogramVec(
  49. prometheus.HistogramOpts{
  50. Name: "wormhole_solana_query_latency",
  51. Help: "Latency histogram for Solana RPC calls",
  52. }, []string{"operation"})
  53. )
  54. func init() {
  55. prometheus.MustRegister(solanaConnectionErrors)
  56. prometheus.MustRegister(solanaAccountSkips)
  57. prometheus.MustRegister(solanaLockupsConfirmed)
  58. prometheus.MustRegister(currentSolanaHeight)
  59. prometheus.MustRegister(queryLatency)
  60. }
  61. func NewSolanaWatcher(wsUrl, rpcUrl string, bridgeAddress solana.PublicKey, lockEvents chan *common.ChainLock) *SolanaWatcher {
  62. return &SolanaWatcher{bridge: bridgeAddress, wsUrl: wsUrl, rpcUrl: rpcUrl, lockEvent: lockEvents}
  63. }
  64. func (s *SolanaWatcher) Run(ctx context.Context) error {
  65. // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
  66. bridgeAddr := base58.Encode(s.bridge[:])
  67. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  68. BridgeAddress: bridgeAddr,
  69. })
  70. rpcClient := rpc.NewClient(s.rpcUrl)
  71. logger := supervisor.Logger(ctx)
  72. errC := make(chan error)
  73. go func() {
  74. timer := time.NewTicker(time.Second * 5)
  75. defer timer.Stop()
  76. for {
  77. select {
  78. case <-ctx.Done():
  79. return
  80. case <-timer.C:
  81. func() {
  82. // Get current slot height
  83. rCtx, cancel := context.WithTimeout(ctx, time.Second*5)
  84. defer cancel()
  85. start := time.Now()
  86. slot, err := rpcClient.GetSlot(rCtx, "")
  87. queryLatency.WithLabelValues("get_slot").Observe(time.Since(start).Seconds())
  88. if err != nil {
  89. solanaConnectionErrors.WithLabelValues("get_slot_error").Inc()
  90. errC <- err
  91. return
  92. }
  93. currentSolanaHeight.Set(float64(slot))
  94. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  95. Height: int64(slot),
  96. BridgeAddress: bridgeAddr,
  97. })
  98. logger.Info("current Solana height", zap.Uint64("slot", uint64(slot)))
  99. // Find TransferOutProposal accounts without a VAA
  100. rCtx, cancel = context.WithTimeout(ctx, time.Second*5)
  101. defer cancel()
  102. start = time.Now()
  103. accounts, err := rpcClient.GetProgramAccounts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{
  104. Commitment: rpc.CommitmentMax, // TODO: deprecated, use Finalized
  105. Filters: []rpc.RPCFilter{
  106. {
  107. DataSize: 1184, // Search for TransferOutProposal accounts
  108. },
  109. {
  110. Memcmp: &rpc.RPCFilterMemcmp{
  111. Offset: 1140, // Offset of VaaTime
  112. Bytes: solana.Base58{0, 0, 0, 0}, // VAA time is 0 when no VAA is present
  113. },
  114. },
  115. },
  116. })
  117. queryLatency.WithLabelValues("get_program_accounts").Observe(time.Since(start).Seconds())
  118. if err != nil {
  119. solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc()
  120. errC <- err
  121. return
  122. }
  123. logger.Debug("fetched transfer proposals without VAA",
  124. zap.Int("n", len(accounts)),
  125. zap.Duration("took", time.Since(start)),
  126. )
  127. for _, acc := range accounts {
  128. proposal, err := ParseTransferOutProposal(acc.Account.Data)
  129. if err != nil {
  130. solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
  131. logger.Warn(
  132. "failed to parse transfer proposal",
  133. zap.Stringer("account", acc.Pubkey),
  134. zap.Error(err),
  135. )
  136. continue
  137. }
  138. // VAA submitted
  139. if proposal.VaaTime.Unix() != 0 {
  140. solanaAccountSkips.WithLabelValues("is_submitted_vaa").Inc()
  141. continue
  142. }
  143. var txHash eth_common.Hash
  144. copy(txHash[:], acc.Pubkey[:])
  145. lock := &common.ChainLock{
  146. TxHash: txHash,
  147. Timestamp: proposal.LockupTime,
  148. Nonce: proposal.Nonce,
  149. SourceAddress: proposal.SourceAddress,
  150. TargetAddress: proposal.ForeignAddress,
  151. SourceChain: vaa.ChainIDSolana,
  152. TargetChain: proposal.ToChainID,
  153. TokenChain: proposal.Asset.Chain,
  154. TokenAddress: proposal.Asset.Address,
  155. TokenDecimals: proposal.Asset.Decimals,
  156. Amount: proposal.Amount,
  157. }
  158. solanaLockupsConfirmed.Inc()
  159. logger.Info("found lockup without VAA", zap.Stringer("lockup_address", acc.Pubkey))
  160. s.lockEvent <- lock
  161. }
  162. }()
  163. }
  164. }
  165. }()
  166. select {
  167. case <-ctx.Done():
  168. return ctx.Err()
  169. case err := <-errC:
  170. return err
  171. }
  172. }
  173. type (
  174. TransferOutProposal struct {
  175. Amount *big.Int
  176. ToChainID vaa.ChainID
  177. SourceAddress vaa.Address
  178. ForeignAddress vaa.Address
  179. Asset vaa.AssetMeta
  180. Nonce uint32
  181. VAA [1001]byte
  182. VaaTime time.Time
  183. LockupTime time.Time
  184. PokeCounter uint8
  185. SignatureAccount solana.PublicKey
  186. }
  187. )
  188. func ParseTransferOutProposal(data []byte) (*TransferOutProposal, error) {
  189. prop := &TransferOutProposal{}
  190. r := bytes.NewBuffer(data)
  191. var amountBytes [32]byte
  192. if n, err := r.Read(amountBytes[:]); err != nil || n != 32 {
  193. return nil, fmt.Errorf("failed to read amount: %w", err)
  194. }
  195. // Reverse (little endian -> big endian)
  196. for i := 0; i < len(amountBytes)/2; i++ {
  197. amountBytes[i], amountBytes[len(amountBytes)-i-1] = amountBytes[len(amountBytes)-i-1], amountBytes[i]
  198. }
  199. prop.Amount = new(big.Int).SetBytes(amountBytes[:])
  200. if err := binary.Read(r, binary.LittleEndian, &prop.ToChainID); err != nil {
  201. return nil, fmt.Errorf("failed to read to chain id: %w", err)
  202. }
  203. if n, err := r.Read(prop.SourceAddress[:]); err != nil || n != 32 {
  204. return nil, fmt.Errorf("failed to read source address: %w", err)
  205. }
  206. if n, err := r.Read(prop.ForeignAddress[:]); err != nil || n != 32 {
  207. return nil, fmt.Errorf("failed to read source address: %w", err)
  208. }
  209. assetMeta := vaa.AssetMeta{}
  210. if n, err := r.Read(assetMeta.Address[:]); err != nil || n != 32 {
  211. return nil, fmt.Errorf("failed to read asset meta address: %w", err)
  212. }
  213. if err := binary.Read(r, binary.LittleEndian, &assetMeta.Chain); err != nil {
  214. return nil, fmt.Errorf("failed to read asset meta chain: %w", err)
  215. }
  216. if err := binary.Read(r, binary.LittleEndian, &assetMeta.Decimals); err != nil {
  217. return nil, fmt.Errorf("failed to read asset meta decimals: %w", err)
  218. }
  219. prop.Asset = assetMeta
  220. // Skip alignment byte
  221. r.Next(1)
  222. if err := binary.Read(r, binary.LittleEndian, &prop.Nonce); err != nil {
  223. return nil, fmt.Errorf("failed to read nonce: %w", err)
  224. }
  225. if n, err := r.Read(prop.VAA[:]); err != nil || n != 1001 {
  226. return nil, fmt.Errorf("failed to read vaa: %w", err)
  227. }
  228. // Skip alignment bytes
  229. r.Next(3)
  230. var vaaTime uint32
  231. if err := binary.Read(r, binary.LittleEndian, &vaaTime); err != nil {
  232. return nil, fmt.Errorf("failed to read vaa time: %w", err)
  233. }
  234. prop.VaaTime = time.Unix(int64(vaaTime), 0)
  235. var lockupTime uint32
  236. if err := binary.Read(r, binary.LittleEndian, &lockupTime); err != nil {
  237. return nil, fmt.Errorf("failed to read lockup time: %w", err)
  238. }
  239. prop.LockupTime = time.Unix(int64(lockupTime), 0)
  240. if err := binary.Read(r, binary.LittleEndian, &prop.PokeCounter); err != nil {
  241. return nil, fmt.Errorf("failed to read poke counter: %w", err)
  242. }
  243. if n, err := r.Read(prop.SignatureAccount[:]); err != nil || n != 32 {
  244. return nil, fmt.Errorf("failed to read signature account: %w", err)
  245. }
  246. return prop, nil
  247. }