client.go 15 KB


  1. package solana
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/certusone/wormhole/node/pkg/common"
  6. "github.com/certusone/wormhole/node/pkg/p2p"
  7. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  8. "github.com/certusone/wormhole/node/pkg/readiness"
  9. "github.com/certusone/wormhole/node/pkg/supervisor"
  10. "github.com/certusone/wormhole/node/pkg/vaa"
  11. eth_common "github.com/ethereum/go-ethereum/common"
  12. "github.com/gagliardetto/solana-go"
  13. "github.com/gagliardetto/solana-go/rpc"
  14. "github.com/mr-tron/base58"
  15. "github.com/near/borsh-go"
  16. "github.com/prometheus/client_golang/prometheus"
  17. "github.com/prometheus/client_golang/prometheus/promauto"
  18. "go.uber.org/zap"
  19. "time"
  20. )
  21. type SolanaWatcher struct {
  22. bridge solana.PublicKey
  23. wsUrl string
  24. rpcUrl string
  25. commitment rpc.CommitmentType
  26. messageEvent chan *common.MessagePublication
  27. logger *zap.Logger
  28. rpcClient *rpc.Client
  29. }
  30. var (
  31. solanaConnectionErrors = promauto.NewCounterVec(
  32. prometheus.CounterOpts{
  33. Name: "wormhole_solana_connection_errors_total",
  34. Help: "Total number of Solana connection errors",
  35. }, []string{"commitment", "reason"})
  36. solanaAccountSkips = promauto.NewCounterVec(
  37. prometheus.CounterOpts{
  38. Name: "wormhole_solana_account_updates_skipped_total",
  39. Help: "Total number of account updates skipped due to invalid data",
  40. }, []string{"reason"})
  41. solanaMessagesConfirmed = promauto.NewCounter(
  42. prometheus.CounterOpts{
  43. Name: "wormhole_solana_observations_confirmed_total",
  44. Help: "Total number of verified Solana observations found",
  45. })
  46. currentSolanaHeight = promauto.NewGaugeVec(
  47. prometheus.GaugeOpts{
  48. Name: "wormhole_solana_current_height",
  49. Help: "Current Solana slot height",
  50. }, []string{"commitment"})
  51. queryLatency = promauto.NewHistogramVec(
  52. prometheus.HistogramOpts{
  53. Name: "wormhole_solana_query_latency",
  54. Help: "Latency histogram for Solana RPC calls",
  55. }, []string{"operation", "commitment"})
  56. )
  57. const rpcTimeout = time.Second * 5
  58. type ConsistencyLevel uint8
  59. // Mappings from consistency levels constants to commitment level.
  60. const (
  61. consistencyLevelConfirmed ConsistencyLevel = 0
  62. consistencyLevelFinalized ConsistencyLevel = 1
  63. )
  64. func (c ConsistencyLevel) Commitment() (rpc.CommitmentType, error) {
  65. switch c {
  66. case consistencyLevelConfirmed:
  67. return rpc.CommitmentConfirmed, nil
  68. case consistencyLevelFinalized:
  69. return rpc.CommitmentFinalized, nil
  70. default:
  71. return "", fmt.Errorf("unsupported consistency level: %d", c)
  72. }
  73. }
  74. const (
  75. postMessageInstructionNumAccounts = 9
  76. postMessageInstructionID = 0x01
  77. )
  78. // PostMessageData represents the user-supplied, untrusted instruction data
  79. // for message publications. We use this to determine consistency level before fetching accounts.
  80. type PostMessageData struct {
  81. Nonce uint32
  82. Payload []byte
  83. ConsistencyLevel ConsistencyLevel
  84. }
  85. func NewSolanaWatcher(
  86. wsUrl, rpcUrl string,
  87. bridgeAddress solana.PublicKey,
  88. messageEvents chan *common.MessagePublication,
  89. commitment rpc.CommitmentType) *SolanaWatcher {
  90. return &SolanaWatcher{
  91. bridge: bridgeAddress,
  92. wsUrl: wsUrl, rpcUrl: rpcUrl,
  93. messageEvent: messageEvents,
  94. commitment: commitment,
  95. rpcClient: rpc.New(rpcUrl),
  96. }
  97. }
  98. func (s *SolanaWatcher) Run(ctx context.Context) error {
  99. // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
  100. bridgeAddr := base58.Encode(s.bridge[:])
  101. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  102. BridgeAddress: bridgeAddr,
  103. })
  104. s.logger = supervisor.Logger(ctx)
  105. errC := make(chan error)
  106. var lastSlot uint64
  107. go func() {
  108. timer := time.NewTicker(time.Second * 1)
  109. defer timer.Stop()
  110. for {
  111. select {
  112. case <-ctx.Done():
  113. return
  114. case <-timer.C:
  115. // Get current slot height
  116. rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
  117. defer cancel()
  118. start := time.Now()
  119. slot, err := s.rpcClient.GetSlot(rCtx, s.commitment)
  120. queryLatency.WithLabelValues("get_slot", string(s.commitment)).Observe(time.Since(start).Seconds())
  121. if err != nil {
  122. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  123. solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_slot_error").Inc()
  124. errC <- err
  125. return
  126. }
  127. if lastSlot == 0 {
  128. lastSlot = slot - 1
  129. }
  130. currentSolanaHeight.WithLabelValues(string(s.commitment)).Set(float64(slot))
  131. readiness.SetReady(common.ReadinessSolanaSyncing)
  132. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  133. Height: int64(slot),
  134. BridgeAddress: bridgeAddr,
  135. })
  136. s.logger.Info("fetched current Solana height",
  137. zap.String("commitment", string(s.commitment)),
  138. zap.Uint64("slot", slot),
  139. zap.Uint64("lastSlot", lastSlot),
  140. zap.Uint64("pendingSlots", slot-lastSlot),
  141. zap.Duration("took", time.Since(start)))
  142. // Determine which slots we're missing
  143. //
  144. // Get list of confirmed blocks since the last request. The result
  145. // won't contain skipped slots.
  146. rangeStart := lastSlot + 1
  147. rangeEnd := slot
  148. rCtx, cancel = context.WithTimeout(ctx, rpcTimeout)
  149. defer cancel()
  150. start = time.Now()
  151. slots, err := s.rpcClient.GetConfirmedBlocks(rCtx, rangeStart, &rangeEnd, s.commitment)
  152. queryLatency.WithLabelValues("get_confirmed_blocks", string(s.commitment)).Observe(time.Since(start).Seconds())
  153. if err != nil {
  154. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  155. solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_blocks_error").Inc()
  156. errC <- err
  157. return
  158. }
  159. s.logger.Info("fetched slots in range",
  160. zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd),
  161. zap.Duration("took", time.Since(start)),
  162. zap.String("commitment", string(s.commitment)))
  163. // Requesting each slot
  164. for _, slot := range slots {
  165. if slot <= lastSlot {
  166. // Skip out-of-range result
  167. // https://github.com/solana-labs/solana/issues/18946
  168. continue
  169. }
  170. go s.fetchBlock(ctx, slot)
  171. }
  172. lastSlot = slot
  173. }
  174. }
  175. }()
  176. select {
  177. case <-ctx.Done():
  178. return ctx.Err()
  179. case err := <-errC:
  180. return err
  181. }
  182. }
  183. func (s *SolanaWatcher) fetchBlock(ctx context.Context, slot uint64) {
  184. s.logger.Debug("requesting block", zap.Uint64("slot", slot), zap.String("commitment", string(s.commitment)))
  185. rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
  186. defer cancel()
  187. start := time.Now()
  188. rewards := false
  189. out, err := s.rpcClient.GetConfirmedBlockWithOpts(rCtx, slot, &rpc.GetConfirmedBlockOpts{
  190. Encoding: "json",
  191. TransactionDetails: "full",
  192. Rewards: &rewards,
  193. Commitment: s.commitment,
  194. })
  195. queryLatency.WithLabelValues("get_confirmed_block", string(s.commitment)).Observe(time.Since(start).Seconds())
  196. if err != nil {
  197. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  198. solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_block_error").Inc()
  199. s.logger.Error("failed to request block", zap.Error(err), zap.Uint64("slot", slot),
  200. zap.String("commitment", string(s.commitment)))
  201. return
  202. }
  203. if out == nil {
  204. solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_block_error").Inc()
  205. s.logger.Error("nil response when requesting block", zap.Error(err), zap.Uint64("slot", slot),
  206. zap.String("commitment", string(s.commitment)))
  207. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  208. return
  209. }
  210. s.logger.Info("fetched block",
  211. zap.Uint64("slot", slot),
  212. zap.Int("num_tx", len(out.Transactions)),
  213. zap.Duration("took", time.Since(start)),
  214. zap.String("commitment", string(s.commitment)))
  215. OUTER:
  216. for _, tx := range out.Transactions {
  217. signature := tx.Transaction.Signatures[0]
  218. var programIndex uint16
  219. for n, key := range tx.Transaction.Message.AccountKeys {
  220. if key.Equals(s.bridge) {
  221. programIndex = uint16(n)
  222. }
  223. }
  224. if programIndex == 0 {
  225. continue
  226. }
  227. s.logger.Info("found Wormhole transaction",
  228. zap.Stringer("signature", signature),
  229. zap.Uint64("slot", slot),
  230. zap.String("commitment", string(s.commitment)))
  231. // Find top-level instructions
  232. for _, inst := range tx.Transaction.Message.Instructions {
  233. found, err := s.processInstruction(ctx, slot, inst, programIndex, tx)
  234. if err != nil {
  235. s.logger.Error("malformed Wormhole instruction",
  236. zap.Error(err),
  237. zap.Stringer("signature", signature),
  238. zap.Uint64("slot", slot),
  239. zap.String("commitment", string(s.commitment)),
  240. zap.Binary("data", inst.Data))
  241. continue OUTER
  242. }
  243. if found {
  244. continue OUTER
  245. }
  246. }
  247. // Call GetConfirmedTransaction to get at innerTransactions
  248. rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
  249. defer cancel()
  250. start := time.Now()
  251. tr, err := s.rpcClient.GetConfirmedTransactionWithOpts(rCtx, signature, &rpc.GetTransactionOpts{
  252. Encoding: "json",
  253. Commitment: s.commitment,
  254. })
  255. queryLatency.WithLabelValues("get_confirmed_transaction", string(s.commitment)).Observe(time.Since(start).Seconds())
  256. if err != nil {
  257. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  258. solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_transaction_error").Inc()
  259. s.logger.Error("failed to request transaction",
  260. zap.Error(err),
  261. zap.Uint64("slot", slot),
  262. zap.String("commitment", string(s.commitment)),
  263. zap.Stringer("signature", signature))
  264. return
  265. }
  266. s.logger.Info("fetched transaction",
  267. zap.Uint64("slot", slot),
  268. zap.String("commitment", string(s.commitment)),
  269. zap.Stringer("signature", signature),
  270. zap.Duration("took", time.Since(start)))
  271. for _, inner := range tr.Meta.InnerInstructions {
  272. for _, inst := range inner.Instructions {
  273. _, err := s.processInstruction(ctx, slot, inst, programIndex, tx)
  274. if err != nil {
  275. s.logger.Error("malformed Wormhole instruction",
  276. zap.Error(err),
  277. zap.Stringer("signature", signature),
  278. zap.Uint64("slot", slot),
  279. zap.String("commitment", string(s.commitment)))
  280. }
  281. }
  282. }
  283. }
  284. }
  285. func (s *SolanaWatcher) processInstruction(ctx context.Context, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx rpc.TransactionWithMeta) (bool, error) {
  286. if inst.ProgramIDIndex != programIndex {
  287. return false, nil
  288. }
  289. if len(inst.Accounts) != postMessageInstructionNumAccounts {
  290. return false, fmt.Errorf("invalid number of accounts: %d instead of %d",
  291. len(inst.Accounts), postMessageInstructionNumAccounts)
  292. }
  293. if inst.Data[0] != postMessageInstructionID {
  294. return false, fmt.Errorf("invalid postMessage instruction ID, got: %d", inst.Data[0])
  295. }
  296. // Decode instruction data (UNTRUSTED)
  297. var data PostMessageData
  298. if err := borsh.Deserialize(&data, inst.Data[1:]); err != nil {
  299. return false, fmt.Errorf("failed to deserialize instruction data: %w", err)
  300. }
  301. s.logger.Info("post message data", zap.Any("deserialized_data", data))
  302. level, err := data.ConsistencyLevel.Commitment()
  303. if err != nil {
  304. return false, fmt.Errorf("failed to determine commitment: %w", err)
  305. }
  306. if level != s.commitment {
  307. return true, nil
  308. }
  309. // The second account in a well-formed Wormhole instruction is the VAA program account.
  310. acc := tx.Transaction.Message.AccountKeys[inst.Accounts[1]]
  311. go s.fetchMessageAccount(ctx, acc, slot)
  312. return true, nil
  313. }
  314. func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, acc solana.PublicKey, slot uint64) {
  315. // Fetching account
  316. rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
  317. defer cancel()
  318. start := time.Now()
  319. info, err := s.rpcClient.GetAccountInfoWithOpts(rCtx, acc, &rpc.GetAccountInfoOpts{
  320. Encoding: solana.EncodingBase64,
  321. Commitment: s.commitment,
  322. })
  323. queryLatency.WithLabelValues("get_account_info", string(s.commitment)).Observe(time.Since(start).Seconds())
  324. if err != nil {
  325. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  326. solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_account_info_error").Inc()
  327. s.logger.Error("failed to request account",
  328. zap.Error(err),
  329. zap.Uint64("slot", slot),
  330. zap.String("commitment", string(s.commitment)),
  331. zap.Stringer("account", acc))
  332. return
  333. }
  334. if !info.Value.Owner.Equals(s.bridge) {
  335. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  336. solanaConnectionErrors.WithLabelValues(string(s.commitment), "account_owner_mismatch").Inc()
  337. s.logger.Error("account has invalid owner",
  338. zap.Uint64("slot", slot),
  339. zap.String("commitment", string(s.commitment)),
  340. zap.Stringer("account", acc),
  341. zap.Stringer("unexpected_owner", info.Value.Owner))
  342. return
  343. }
  344. data := info.Value.Data.GetBinary()
  345. if string(data[:3]) != "msg" {
  346. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  347. solanaConnectionErrors.WithLabelValues(string(s.commitment), "bad_account_data").Inc()
  348. s.logger.Error("account is not a message account",
  349. zap.Uint64("slot", slot),
  350. zap.String("commitment", string(s.commitment)),
  351. zap.Stringer("account", acc))
  352. return
  353. }
  354. s.logger.Info("found valid VAA account",
  355. zap.Uint64("slot", slot),
  356. zap.String("commitment", string(s.commitment)),
  357. zap.Stringer("account", acc),
  358. zap.Binary("data", data))
  359. s.processMessageAccount(data, acc)
  360. }
  361. func (s *SolanaWatcher) processMessageAccount(data []byte, acc solana.PublicKey) {
  362. proposal, err := ParseTransferOutProposal(data)
  363. if err != nil {
  364. solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
  365. s.logger.Error(
  366. "failed to parse transfer proposal",
  367. zap.Stringer("account", acc),
  368. zap.Binary("data", data),
  369. zap.Error(err))
  370. return
  371. }
  372. var txHash eth_common.Hash
  373. copy(txHash[:], acc[:])
  374. observation := &common.MessagePublication{
  375. TxHash: txHash,
  376. Timestamp: time.Unix(int64(proposal.SubmissionTime), 0),
  377. Nonce: proposal.Nonce,
  378. Sequence: proposal.Sequence,
  379. EmitterChain: vaa.ChainIDSolana,
  380. EmitterAddress: proposal.EmitterAddress,
  381. Payload: proposal.Payload,
  382. ConsistencyLevel: proposal.ConsistencyLevel,
  383. }
  384. solanaMessagesConfirmed.Inc()
  385. s.logger.Info("message observed",
  386. zap.Stringer("account", acc),
  387. zap.Time("timestamp", observation.Timestamp),
  388. zap.Uint32("nonce", observation.Nonce),
  389. zap.Uint64("sequence", observation.Sequence),
  390. zap.Stringer("emitter_chain", observation.EmitterChain),
  391. zap.Stringer("emitter_address", observation.EmitterAddress),
  392. zap.Binary("payload", observation.Payload),
  393. zap.Uint8("consistency_level", observation.ConsistencyLevel),
  394. )
  395. s.messageEvent <- observation
  396. }
  397. type (
  398. MessagePublicationAccount struct {
  399. VaaVersion uint8
  400. // Borsh does not seem to support booleans, so 0=false / 1=true
  401. ConsistencyLevel uint8
  402. VaaTime uint32
  403. VaaSignatureAccount vaa.Address
  404. SubmissionTime uint32
  405. Nonce uint32
  406. Sequence uint64
  407. EmitterChain uint16
  408. EmitterAddress vaa.Address
  409. Payload []byte
  410. }
  411. )
  412. func ParseTransferOutProposal(data []byte) (*MessagePublicationAccount, error) {
  413. prop := &MessagePublicationAccount{}
  414. // Skip the b"msg" prefix
  415. if err := borsh.Deserialize(prop, data[3:]); err != nil {
  416. return nil, err
  417. }
  418. return prop, nil
  419. }