client.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package solana
  2. import (
  3. "context"
  4. "github.com/certusone/wormhole/bridge/pkg/common"
  5. "github.com/certusone/wormhole/bridge/pkg/p2p"
  6. gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
  7. "github.com/certusone/wormhole/bridge/pkg/supervisor"
  8. "github.com/certusone/wormhole/bridge/pkg/vaa"
  9. "github.com/dfuse-io/solana-go"
  10. "github.com/dfuse-io/solana-go/rpc"
  11. eth_common "github.com/ethereum/go-ethereum/common"
  12. "github.com/mr-tron/base58"
  13. "github.com/near/borsh-go"
  14. "github.com/prometheus/client_golang/prometheus"
  15. "go.uber.org/zap"
  16. "time"
  17. )
  18. type SolanaWatcher struct {
  19. bridge solana.PublicKey
  20. wsUrl string
  21. rpcUrl string
  22. messageEvent chan *common.MessagePublication
  23. }
  24. var (
  25. solanaConnectionErrors = prometheus.NewCounterVec(
  26. prometheus.CounterOpts{
  27. Name: "wormhole_solana_connection_errors_total",
  28. Help: "Total number of Solana connection errors",
  29. }, []string{"reason"})
  30. solanaAccountSkips = prometheus.NewCounterVec(
  31. prometheus.CounterOpts{
  32. Name: "wormhole_solana_account_updates_skipped_total",
  33. Help: "Total number of account updates skipped due to invalid data",
  34. }, []string{"reason"})
  35. solanaLockupsConfirmed = prometheus.NewCounter(
  36. prometheus.CounterOpts{
  37. Name: "wormhole_solana_lockups_confirmed_total",
  38. Help: "Total number of verified Solana lockups found",
  39. })
  40. currentSolanaHeight = prometheus.NewGauge(
  41. prometheus.GaugeOpts{
  42. Name: "wormhole_solana_current_height",
  43. Help: "Current Solana slot height (at default commitment level, not the level used for lockups)",
  44. })
  45. queryLatency = prometheus.NewHistogramVec(
  46. prometheus.HistogramOpts{
  47. Name: "wormhole_solana_query_latency",
  48. Help: "Latency histogram for Solana RPC calls",
  49. }, []string{"operation"})
  50. )
  51. func init() {
  52. prometheus.MustRegister(solanaConnectionErrors)
  53. prometheus.MustRegister(solanaAccountSkips)
  54. prometheus.MustRegister(solanaLockupsConfirmed)
  55. prometheus.MustRegister(currentSolanaHeight)
  56. prometheus.MustRegister(queryLatency)
  57. }
  58. func NewSolanaWatcher(wsUrl, rpcUrl string, bridgeAddress solana.PublicKey, messageEvents chan *common.MessagePublication) *SolanaWatcher {
  59. return &SolanaWatcher{bridge: bridgeAddress, wsUrl: wsUrl, rpcUrl: rpcUrl, messageEvent: messageEvents}
  60. }
  61. func (s *SolanaWatcher) Run(ctx context.Context) error {
  62. // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
  63. bridgeAddr := base58.Encode(s.bridge[:])
  64. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  65. BridgeAddress: bridgeAddr,
  66. })
  67. rpcClient := rpc.NewClient(s.rpcUrl)
  68. logger := supervisor.Logger(ctx)
  69. errC := make(chan error)
  70. go func() {
  71. timer := time.NewTicker(time.Second * 5)
  72. defer timer.Stop()
  73. for {
  74. select {
  75. case <-ctx.Done():
  76. return
  77. case <-timer.C:
  78. func() {
  79. // Get current slot height
  80. rCtx, cancel := context.WithTimeout(ctx, time.Second*5)
  81. defer cancel()
  82. start := time.Now()
  83. slot, err := rpcClient.GetSlot(rCtx, "")
  84. queryLatency.WithLabelValues("get_slot").Observe(time.Since(start).Seconds())
  85. if err != nil {
  86. solanaConnectionErrors.WithLabelValues("get_slot_error").Inc()
  87. errC <- err
  88. return
  89. }
  90. currentSolanaHeight.Set(float64(slot))
  91. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  92. Height: int64(slot),
  93. BridgeAddress: bridgeAddr,
  94. })
  95. logger.Info("current Solana height", zap.Uint64("slot", uint64(slot)))
  96. // Find MessagePublicationAccount accounts without a VAA
  97. rCtx, cancel = context.WithTimeout(ctx, time.Second*5)
  98. defer cancel()
  99. start = time.Now()
  100. accounts, err := rpcClient.GetProgramAccounts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{
  101. Commitment: rpc.CommitmentMax, // TODO: deprecated, use Finalized
  102. Filters: []rpc.RPCFilter{
  103. {
  104. Memcmp: &rpc.RPCFilterMemcmp{
  105. Offset: 0, // Start of the account
  106. Bytes: solana.Base58{'m', 's', 'g'}, // Prefix of the posted message accounts
  107. },
  108. },
  109. {
  110. Memcmp: &rpc.RPCFilterMemcmp{
  111. Offset: 4, // Start of the Persist flag
  112. Bytes: solana.Base58{0x01}, // Only grab messages that need to be persisted
  113. },
  114. },
  115. {
  116. Memcmp: &rpc.RPCFilterMemcmp{
  117. Offset: 5, // Offset of VaaTime
  118. Bytes: solana.Base58{0, 0, 0, 0}, // This means this VAA hasn't been signed yet
  119. },
  120. },
  121. },
  122. })
  123. queryLatency.WithLabelValues("get_program_accounts").Observe(time.Since(start).Seconds())
  124. if err != nil {
  125. solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc()
  126. errC <- err
  127. return
  128. }
  129. logger.Debug("fetched transfer proposals without VAA",
  130. zap.Int("n", len(accounts)),
  131. zap.Duration("took", time.Since(start)),
  132. )
  133. for _, acc := range accounts {
  134. proposal, err := ParseTransferOutProposal(acc.Account.Data)
  135. if err != nil {
  136. solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
  137. logger.Warn(
  138. "failed to parse transfer proposal",
  139. zap.Stringer("account", acc.Pubkey),
  140. zap.Error(err),
  141. )
  142. continue
  143. }
  144. // VAA submitted
  145. if proposal.VaaTime != 0 {
  146. solanaAccountSkips.WithLabelValues("is_submitted_vaa").Inc()
  147. continue
  148. }
  149. var txHash eth_common.Hash
  150. copy(txHash[:], acc.Pubkey[:])
  151. lock := &common.MessagePublication{
  152. TxHash: txHash,
  153. Timestamp: time.Unix(int64(proposal.SubmissionTime), 0),
  154. Nonce: proposal.Nonce,
  155. Sequence: proposal.Sequence,
  156. EmitterChain: vaa.ChainIDSolana,
  157. EmitterAddress: proposal.EmitterAddress,
  158. Payload: proposal.Payload,
  159. Persist: proposal.Persist == 1,
  160. }
  161. solanaLockupsConfirmed.Inc()
  162. logger.Info("found lockup without VAA", zap.Stringer("lockup_address", acc.Pubkey))
  163. s.messageEvent <- lock
  164. }
  165. }()
  166. }
  167. }
  168. }()
  169. select {
  170. case <-ctx.Done():
  171. return ctx.Err()
  172. case err := <-errC:
  173. return err
  174. }
  175. }
  176. type (
  177. MessagePublicationAccount struct {
  178. VaaVersion uint8
  179. // Borsh does not seem to support booleans, so 0=false / 1=true
  180. Persist uint8
  181. VaaTime uint32
  182. VaaSignatureAccount vaa.Address
  183. SubmissionTime uint32
  184. Nonce uint32
  185. Sequence uint64
  186. EmitterChain uint16
  187. EmitterAddress vaa.Address
  188. Payload []byte
  189. }
  190. )
  191. func ParseTransferOutProposal(data []byte) (*MessagePublicationAccount, error) {
  192. prop := &MessagePublicationAccount{}
  193. // Skip the b"msg" prefix
  194. if err := borsh.Deserialize(prop, data[3:]); err != nil {
  195. return nil, err
  196. }
  197. return prop, nil
  198. }