client.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package solana
import (
	"context"
	"fmt"
	"time"

	"github.com/certusone/wormhole/bridge/pkg/common"
	"github.com/certusone/wormhole/bridge/pkg/p2p"
	gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
	"github.com/certusone/wormhole/bridge/pkg/supervisor"
	"github.com/certusone/wormhole/bridge/pkg/vaa"
	"github.com/dfuse-io/solana-go"
	"github.com/dfuse-io/solana-go/rpc"
	eth_common "github.com/ethereum/go-ethereum/common"
	"github.com/mr-tron/base58"
	"github.com/near/borsh-go"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
)
  18. type SolanaWatcher struct {
  19. bridge solana.PublicKey
  20. wsUrl string
  21. rpcUrl string
  22. messageEvent chan *common.MessagePublication
  23. }
  24. var (
  25. solanaConnectionErrors = prometheus.NewCounterVec(
  26. prometheus.CounterOpts{
  27. Name: "wormhole_solana_connection_errors_total",
  28. Help: "Total number of Solana connection errors",
  29. }, []string{"reason"})
  30. solanaAccountSkips = prometheus.NewCounterVec(
  31. prometheus.CounterOpts{
  32. Name: "wormhole_solana_account_updates_skipped_total",
  33. Help: "Total number of account updates skipped due to invalid data",
  34. }, []string{"reason"})
  35. solanaLockupsConfirmed = prometheus.NewCounter(
  36. prometheus.CounterOpts{
  37. Name: "wormhole_solana_lockups_confirmed_total",
  38. Help: "Total number of verified Solana lockups found",
  39. })
  40. currentSolanaHeight = prometheus.NewGauge(
  41. prometheus.GaugeOpts{
  42. Name: "wormhole_solana_current_height",
  43. Help: "Current Solana slot height (at default commitment level, not the level used for lockups)",
  44. })
  45. queryLatency = prometheus.NewHistogramVec(
  46. prometheus.HistogramOpts{
  47. Name: "wormhole_solana_query_latency",
  48. Help: "Latency histogram for Solana RPC calls",
  49. }, []string{"operation", "commitment"})
  50. )
  51. func init() {
  52. prometheus.MustRegister(solanaConnectionErrors)
  53. prometheus.MustRegister(solanaAccountSkips)
  54. prometheus.MustRegister(solanaLockupsConfirmed)
  55. prometheus.MustRegister(currentSolanaHeight)
  56. prometheus.MustRegister(queryLatency)
  57. }
  58. func NewSolanaWatcher(wsUrl, rpcUrl string, bridgeAddress solana.PublicKey, messageEvents chan *common.MessagePublication) *SolanaWatcher {
  59. return &SolanaWatcher{bridge: bridgeAddress, wsUrl: wsUrl, rpcUrl: rpcUrl, messageEvent: messageEvents}
  60. }
  61. func (s *SolanaWatcher) Run(ctx context.Context) error {
  62. // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
  63. bridgeAddr := base58.Encode(s.bridge[:])
  64. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  65. BridgeAddress: bridgeAddr,
  66. })
  67. rpcClient := rpc.NewClient(s.rpcUrl)
  68. logger := supervisor.Logger(ctx)
  69. errC := make(chan error)
  70. go func() {
  71. timer := time.NewTicker(time.Second * 5)
  72. defer timer.Stop()
  73. for {
  74. select {
  75. case <-ctx.Done():
  76. return
  77. case <-timer.C:
  78. func() {
  79. // Get current slot height
  80. rCtx, cancel := context.WithTimeout(ctx, time.Second*5)
  81. defer cancel()
  82. start := time.Now()
  83. slot, err := rpcClient.GetSlot(rCtx, "")
  84. queryLatency.WithLabelValues("get_slot", "processed").Observe(time.Since(start).Seconds())
  85. if err != nil {
  86. solanaConnectionErrors.WithLabelValues("get_slot_error").Inc()
  87. errC <- err
  88. return
  89. }
  90. currentSolanaHeight.Set(float64(slot))
  91. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  92. Height: int64(slot),
  93. BridgeAddress: bridgeAddr,
  94. })
  95. logger.Info("current Solana height", zap.Uint64("slot", uint64(slot)))
  96. // Find MessagePublicationAccount accounts without a VAA
  97. rCtx, cancel = context.WithTimeout(ctx, time.Second*5)
  98. defer cancel()
  99. start = time.Now()
  100. // Get finalized accounts
  101. fAccounts, err := rpcClient.GetProgramAccounts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{
  102. Commitment: rpc.CommitmentMax, // TODO: deprecated, use Finalized
  103. Filters: []rpc.RPCFilter{
  104. {
  105. Memcmp: &rpc.RPCFilterMemcmp{
  106. Offset: 0, // Start of the account
  107. Bytes: solana.Base58{'m', 's', 'g'}, // Prefix of the posted message accounts
  108. },
  109. },
  110. {
  111. Memcmp: &rpc.RPCFilterMemcmp{
  112. Offset: 4, // Start of the Persist flag
  113. Bytes: solana.Base58{0x01}, // Only grab messages that need to be persisted
  114. },
  115. },
  116. {
  117. Memcmp: &rpc.RPCFilterMemcmp{
  118. Offset: 5, // Start of the ConsistencyLevel value
  119. Bytes: solana.Base58{32}, // Only grab messages that require max confirmations
  120. },
  121. },
  122. {
  123. Memcmp: &rpc.RPCFilterMemcmp{
  124. Offset: 6, // Offset of VaaTime
  125. Bytes: solana.Base58{0, 0, 0, 0}, // This means this VAA hasn't been signed yet
  126. },
  127. },
  128. },
  129. })
  130. queryLatency.WithLabelValues("get_program_accounts", "max").Observe(time.Since(start).Seconds())
  131. if err != nil {
  132. solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc()
  133. errC <- err
  134. return
  135. }
  136. // Get confirmed accounts
  137. cAccounts, err := rpcClient.GetProgramAccounts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{
  138. Commitment: rpc.CommitmentSingle, // TODO: deprecated, use Confirmed
  139. Filters: []rpc.RPCFilter{
  140. {
  141. Memcmp: &rpc.RPCFilterMemcmp{
  142. Offset: 0, // Start of the account
  143. Bytes: solana.Base58{'m', 's', 'g'}, // Prefix of the posted message accounts
  144. },
  145. },
  146. {
  147. Memcmp: &rpc.RPCFilterMemcmp{
  148. Offset: 4, // Start of the Persist flag
  149. Bytes: solana.Base58{0x01}, // Only grab messages that need to be persisted
  150. },
  151. },
  152. {
  153. Memcmp: &rpc.RPCFilterMemcmp{
  154. Offset: 5, // Start of the ConsistencyLevel value
  155. Bytes: solana.Base58{1}, // Only grab messages that require the Confirmed level
  156. },
  157. },
  158. {
  159. Memcmp: &rpc.RPCFilterMemcmp{
  160. Offset: 6, // Offset of VaaTime
  161. Bytes: solana.Base58{0, 0, 0, 0}, // This means this VAA hasn't been signed yet
  162. },
  163. },
  164. },
  165. })
  166. queryLatency.WithLabelValues("get_program_accounts", "single").Observe(time.Since(start).Seconds())
  167. if err != nil {
  168. solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc()
  169. errC <- err
  170. return
  171. }
  172. // Merge accounts
  173. accounts := append(fAccounts, cAccounts...)
  174. logger.Debug("fetched transfer proposals without VAA",
  175. zap.Int("n", len(accounts)),
  176. zap.Duration("took", time.Since(start)),
  177. )
  178. for _, acc := range accounts {
  179. proposal, err := ParseTransferOutProposal(acc.Account.Data)
  180. if err != nil {
  181. solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
  182. logger.Warn(
  183. "failed to parse transfer proposal",
  184. zap.Stringer("account", acc.Pubkey),
  185. zap.Error(err),
  186. )
  187. continue
  188. }
  189. // VAA submitted
  190. if proposal.VaaTime != 0 {
  191. solanaAccountSkips.WithLabelValues("is_submitted_vaa").Inc()
  192. continue
  193. }
  194. var txHash eth_common.Hash
  195. copy(txHash[:], acc.Pubkey[:])
  196. lock := &common.MessagePublication{
  197. TxHash: txHash,
  198. Timestamp: time.Unix(int64(proposal.SubmissionTime), 0),
  199. Nonce: proposal.Nonce,
  200. Sequence: proposal.Sequence,
  201. EmitterChain: vaa.ChainIDSolana,
  202. EmitterAddress: proposal.EmitterAddress,
  203. Payload: proposal.Payload,
  204. Persist: proposal.Persist == 1,
  205. ConsistencyLevel: proposal.ConsistencyLevel,
  206. }
  207. solanaLockupsConfirmed.Inc()
  208. logger.Info("found lockup without VAA", zap.Stringer("lockup_address", acc.Pubkey))
  209. s.messageEvent <- lock
  210. }
  211. }()
  212. }
  213. }
  214. }()
  215. select {
  216. case <-ctx.Done():
  217. return ctx.Err()
  218. case err := <-errC:
  219. return err
  220. }
  221. }
  222. type (
  223. MessagePublicationAccount struct {
  224. VaaVersion uint8
  225. // Borsh does not seem to support booleans, so 0=false / 1=true
  226. Persist uint8
  227. ConsistencyLevel uint8
  228. VaaTime uint32
  229. VaaSignatureAccount vaa.Address
  230. SubmissionTime uint32
  231. Nonce uint32
  232. Sequence uint64
  233. EmitterChain uint16
  234. EmitterAddress vaa.Address
  235. Payload []byte
  236. }
  237. )
  238. func ParseTransferOutProposal(data []byte) (*MessagePublicationAccount, error) {
  239. prop := &MessagePublicationAccount{}
  240. // Skip the b"msg" prefix
  241. if err := borsh.Deserialize(prop, data[3:]); err != nil {
  242. return nil, err
  243. }
  244. return prop, nil
  245. }