client.go 18 KB


  1. package solana
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/certusone/wormhole/node/pkg/common"
  7. "github.com/certusone/wormhole/node/pkg/p2p"
  8. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  9. "github.com/certusone/wormhole/node/pkg/readiness"
  10. "github.com/certusone/wormhole/node/pkg/supervisor"
  11. "github.com/certusone/wormhole/node/pkg/vaa"
  12. eth_common "github.com/ethereum/go-ethereum/common"
  13. "github.com/gagliardetto/solana-go"
  14. "github.com/gagliardetto/solana-go/rpc"
  15. "github.com/gagliardetto/solana-go/rpc/jsonrpc"
  16. "github.com/mr-tron/base58"
  17. "github.com/near/borsh-go"
  18. "github.com/prometheus/client_golang/prometheus"
  19. "github.com/prometheus/client_golang/prometheus/promauto"
  20. "go.uber.org/zap"
  21. "time"
  22. )
// SolanaWatcher bundles the configuration, channels and RPC client used to
// watch a Solana contract for Wormhole message publications.
type SolanaWatcher struct {
	// contract is the program public key that transaction account keys and
	// message account owners are compared against.
	contract solana.PublicKey
	// wsUrl is stored by NewSolanaWatcher; it is not read by the code visible in this file.
	wsUrl string
	// rpcUrl is the endpoint handed to rpc.New when the watcher is constructed.
	rpcUrl string
	// commitment is passed to every RPC request and used as a metrics/log label.
	commitment rpc.CommitmentType
	// messageEvent receives the observations built by processMessageAccount.
	messageEvent chan *common.MessagePublication
	// obsvReqC delivers observation requests that Run services.
	obsvReqC chan *gossipv1.ObservationRequest
	// rpcClient is the client created from rpcUrl.
	rpcClient *rpc.Client
}
// Prometheus metrics exported by the Solana watcher. All of them are
// registered through promauto when the package is initialised.
var (
	// solanaConnectionErrors counts failed or invalid RPC interactions,
	// labelled by commitment level and a short reason string.
	solanaConnectionErrors = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wormhole_solana_connection_errors_total",
			Help: "Total number of Solana connection errors",
		}, []string{"commitment", "reason"})
	// solanaAccountSkips counts message accounts whose data could not be parsed.
	solanaAccountSkips = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wormhole_solana_account_updates_skipped_total",
			Help: "Total number of account updates skipped due to invalid data",
		}, []string{"reason"})
	// solanaMessagesConfirmed counts observations handed to the message channel.
	solanaMessagesConfirmed = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "wormhole_solana_observations_confirmed_total",
			Help: "Total number of verified Solana observations found",
		})
	// currentSolanaHeight holds the most recently fetched slot per commitment level.
	currentSolanaHeight = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "wormhole_solana_current_height",
			Help: "Current Solana slot height",
		}, []string{"commitment"})
	// queryLatency records how long each RPC call took, in seconds,
	// labelled by operation name and commitment level.
	queryLatency = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: "wormhole_solana_query_latency",
			Help: "Latency histogram for Solana RPC calls",
		}, []string{"operation", "commitment"})
)
  59. const rpcTimeout = time.Second * 5
  60. // Maximum retries for Solana fetching
  61. const maxRetries = 10
  62. const retryDelay = 5 * time.Second
  63. type ConsistencyLevel uint8
  64. // Mappings from consistency levels constants to commitment level.
  65. const (
  66. consistencyLevelConfirmed ConsistencyLevel = 0
  67. consistencyLevelFinalized ConsistencyLevel = 1
  68. )
  69. func (c ConsistencyLevel) Commitment() (rpc.CommitmentType, error) {
  70. switch c {
  71. case consistencyLevelConfirmed:
  72. return rpc.CommitmentConfirmed, nil
  73. case consistencyLevelFinalized:
  74. return rpc.CommitmentFinalized, nil
  75. default:
  76. return "", fmt.Errorf("unsupported consistency level: %d", c)
  77. }
  78. }
// Shape of the post-message instruction as checked by processInstruction.
const (
	// postMessageInstructionNumAccounts is the exact number of accounts a
	// post-message instruction must reference; any other count is reported
	// as a malformed instruction.
	postMessageInstructionNumAccounts = 9
	// postMessageInstructionID is the value of the first instruction data
	// byte that identifies a post-message instruction.
	postMessageInstructionID = 0x01
)
// PostMessageData represents the user-supplied, untrusted instruction data
// for message publications. We use this to determine consistency level before fetching accounts.
// It is borsh-decoded from the instruction data that follows the leading
// instruction ID byte.
type PostMessageData struct {
	// Nonce is decoded from the instruction data; the visible code only logs it.
	Nonce uint32
	// Payload is decoded from the instruction data; the visible code only logs it.
	Payload []byte
	// ConsistencyLevel is converted to a commitment level and compared with
	// the watcher's own commitment to decide whether to fetch the message account.
	ConsistencyLevel ConsistencyLevel
}
  90. func NewSolanaWatcher(
  91. wsUrl, rpcUrl string,
  92. contractAddress solana.PublicKey,
  93. messageEvents chan *common.MessagePublication,
  94. obsvReqC chan *gossipv1.ObservationRequest,
  95. commitment rpc.CommitmentType) *SolanaWatcher {
  96. return &SolanaWatcher{
  97. contract: contractAddress,
  98. wsUrl: wsUrl, rpcUrl: rpcUrl,
  99. messageEvent: messageEvents,
  100. obsvReqC: obsvReqC,
  101. commitment: commitment,
  102. rpcClient: rpc.New(rpcUrl),
  103. }
  104. }
  105. func (s *SolanaWatcher) Run(ctx context.Context) error {
  106. // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
  107. contractAddr := base58.Encode(s.contract[:])
  108. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  109. ContractAddress: contractAddr,
  110. })
  111. logger := supervisor.Logger(ctx)
  112. errC := make(chan error)
  113. var lastSlot uint64
  114. go func() {
  115. timer := time.NewTicker(time.Second * 1)
  116. defer timer.Stop()
  117. for {
  118. select {
  119. case <-ctx.Done():
  120. return
  121. case m := <-s.obsvReqC:
  122. if m.ChainId != uint32(vaa.ChainIDSolana) {
  123. panic("unexpected chain id")
  124. }
  125. acc := solana.PublicKeyFromBytes(m.TxHash)
  126. logger.Info("received observation request", zap.String("account", acc.String()))
  127. rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
  128. s.fetchMessageAccount(rCtx, logger, acc, 0)
  129. cancel()
  130. case <-timer.C:
  131. // Get current slot height
  132. rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
  133. start := time.Now()
  134. slot, err := s.rpcClient.GetSlot(rCtx, s.commitment)
  135. cancel()
  136. queryLatency.WithLabelValues("get_slot", string(s.commitment)).Observe(time.Since(start).Seconds())
  137. if err != nil {
  138. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  139. solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_slot_error").Inc()
  140. errC <- err
  141. return
  142. }
  143. if lastSlot == 0 {
  144. lastSlot = slot - 1
  145. }
  146. currentSolanaHeight.WithLabelValues(string(s.commitment)).Set(float64(slot))
  147. readiness.SetReady(common.ReadinessSolanaSyncing)
  148. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
  149. Height: int64(slot),
  150. ContractAddress: contractAddr,
  151. })
  152. logger.Info("fetched current Solana height",
  153. zap.String("commitment", string(s.commitment)),
  154. zap.Uint64("slot", slot),
  155. zap.Uint64("lastSlot", lastSlot),
  156. zap.Uint64("pendingSlots", slot-lastSlot),
  157. zap.Duration("took", time.Since(start)))
  158. rangeStart := lastSlot + 1
  159. rangeEnd := slot
  160. logger.Info("fetching slots in range",
  161. zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd),
  162. zap.Duration("took", time.Since(start)),
  163. zap.String("commitment", string(s.commitment)))
  164. // Requesting each slot
  165. for slot := rangeStart; slot <= rangeEnd; slot++ {
  166. go s.retryFetchBlock(ctx, logger, slot, 0)
  167. }
  168. lastSlot = slot
  169. }
  170. }
  171. }()
  172. select {
  173. case <-ctx.Done():
  174. return ctx.Err()
  175. case err := <-errC:
  176. return err
  177. }
  178. }
  179. func (s *SolanaWatcher) retryFetchBlock(ctx context.Context, logger *zap.Logger, slot uint64, retry uint) {
  180. ok := s.fetchBlock(ctx, logger, slot, 0)
  181. if !ok {
  182. if retry >= maxRetries {
  183. logger.Error("max retries for block",
  184. zap.Uint64("slot", slot),
  185. zap.String("commitment", string(s.commitment)),
  186. zap.Uint("retry", retry))
  187. return
  188. }
  189. time.Sleep(retryDelay)
  190. logger.Info("retrying block",
  191. zap.Uint64("slot", slot),
  192. zap.String("commitment", string(s.commitment)),
  193. zap.Uint("retry", retry))
  194. go s.retryFetchBlock(ctx, logger, slot, retry+1)
  195. }
  196. }
  197. func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot uint64, emptyRetry uint) (ok bool) {
  198. logger.Debug("requesting block",
  199. zap.Uint64("slot", slot),
  200. zap.String("commitment", string(s.commitment)),
  201. zap.Uint("empty_retry", emptyRetry))
  202. rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
  203. defer cancel()
  204. start := time.Now()
  205. rewards := false
  206. out, err := s.rpcClient.GetConfirmedBlockWithOpts(rCtx, slot, &rpc.GetConfirmedBlockOpts{
  207. Encoding: "json",
  208. TransactionDetails: "full",
  209. Rewards: &rewards,
  210. Commitment: s.commitment,
  211. })
  212. queryLatency.WithLabelValues("get_confirmed_block", string(s.commitment)).Observe(time.Since(start).Seconds())
  213. if err != nil {
  214. var rpcErr *jsonrpc.RPCError
  215. if errors.As(err, &rpcErr) && (rpcErr.Code == -32007 /* SLOT_SKIPPED */ || rpcErr.Code == -32004 /* BLOCK_NOT_AVAILABLE */) {
  216. logger.Info("empty slot", zap.Uint64("slot", slot),
  217. zap.Int("code", rpcErr.Code),
  218. zap.String("commitment", string(s.commitment)))
  219. // TODO(leo): clean this up once we know what's happening
  220. // https://github.com/solana-labs/solana/issues/20370
  221. var maxEmptyRetry uint
  222. if s.commitment == rpc.CommitmentFinalized {
  223. maxEmptyRetry = 5
  224. } else {
  225. maxEmptyRetry = 1
  226. }
  227. // Schedule a single retry just in case the Solana node was confused about the block being missing.
  228. if emptyRetry < maxEmptyRetry {
  229. go func() {
  230. time.Sleep(retryDelay)
  231. s.fetchBlock(ctx, logger, slot, emptyRetry+1)
  232. }()
  233. }
  234. return true
  235. } else {
  236. logger.Error("failed to request block", zap.Error(err), zap.Uint64("slot", slot),
  237. zap.String("commitment", string(s.commitment)))
  238. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  239. solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_block_error").Inc()
  240. }
  241. return false
  242. }
  243. if out == nil {
  244. solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_block_error").Inc()
  245. logger.Error("nil response when requesting block", zap.Error(err), zap.Uint64("slot", slot),
  246. zap.String("commitment", string(s.commitment)))
  247. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  248. return false
  249. }
  250. logger.Info("fetched block",
  251. zap.Uint64("slot", slot),
  252. zap.Int("num_tx", len(out.Transactions)),
  253. zap.Duration("took", time.Since(start)),
  254. zap.String("commitment", string(s.commitment)))
  255. OUTER:
  256. for _, tx := range out.Transactions {
  257. signature := tx.Transaction.Signatures[0]
  258. var programIndex uint16
  259. for n, key := range tx.Transaction.Message.AccountKeys {
  260. if key.Equals(s.contract) {
  261. programIndex = uint16(n)
  262. }
  263. }
  264. if programIndex == 0 {
  265. continue
  266. }
  267. if tx.Meta.Err != nil {
  268. logger.Debug("skipping failed Wormhole transaction",
  269. zap.Stringer("signature", signature),
  270. zap.Uint64("slot", slot),
  271. zap.String("commitment", string(s.commitment)))
  272. continue
  273. }
  274. logger.Info("found Wormhole transaction",
  275. zap.Stringer("signature", signature),
  276. zap.Uint64("slot", slot),
  277. zap.String("commitment", string(s.commitment)))
  278. // Find top-level instructions
  279. for i, inst := range tx.Transaction.Message.Instructions {
  280. found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i)
  281. if err != nil {
  282. logger.Error("malformed Wormhole instruction",
  283. zap.Error(err),
  284. zap.Int("idx", i),
  285. zap.Stringer("signature", signature),
  286. zap.Uint64("slot", slot),
  287. zap.String("commitment", string(s.commitment)),
  288. zap.Binary("data", inst.Data))
  289. continue OUTER
  290. }
  291. if found {
  292. continue OUTER
  293. }
  294. }
  295. // Call GetConfirmedTransaction to get at innerTransactions
  296. rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
  297. start := time.Now()
  298. tr, err := s.rpcClient.GetConfirmedTransactionWithOpts(rCtx, signature, &rpc.GetTransactionOpts{
  299. Encoding: "json",
  300. Commitment: s.commitment,
  301. })
  302. cancel()
  303. queryLatency.WithLabelValues("get_confirmed_transaction", string(s.commitment)).Observe(time.Since(start).Seconds())
  304. if err != nil {
  305. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  306. solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_transaction_error").Inc()
  307. logger.Error("failed to request transaction",
  308. zap.Error(err),
  309. zap.Uint64("slot", slot),
  310. zap.String("commitment", string(s.commitment)),
  311. zap.Stringer("signature", signature))
  312. return false
  313. }
  314. logger.Info("fetched transaction",
  315. zap.Uint64("slot", slot),
  316. zap.String("commitment", string(s.commitment)),
  317. zap.Stringer("signature", signature),
  318. zap.Duration("took", time.Since(start)))
  319. for _, inner := range tr.Meta.InnerInstructions {
  320. for i, inst := range inner.Instructions {
  321. _, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i)
  322. if err != nil {
  323. logger.Error("malformed Wormhole instruction",
  324. zap.Error(err),
  325. zap.Int("idx", i),
  326. zap.Stringer("signature", signature),
  327. zap.Uint64("slot", slot),
  328. zap.String("commitment", string(s.commitment)))
  329. }
  330. }
  331. }
  332. }
  333. if emptyRetry > 0 {
  334. logger.Warn("SOLANA BUG: skipped or unavailable block retrieved on retry attempt",
  335. zap.Uint("empty_retry", emptyRetry),
  336. zap.Uint64("slot", slot),
  337. zap.String("commitment", string(s.commitment)))
  338. }
  339. return true
  340. }
  341. func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx rpc.TransactionWithMeta, signature solana.Signature, idx int) (bool, error) {
  342. if inst.ProgramIDIndex != programIndex {
  343. return false, nil
  344. }
  345. if inst.Data[0] != postMessageInstructionID {
  346. return false, nil
  347. }
  348. if len(inst.Accounts) != postMessageInstructionNumAccounts {
  349. return false, fmt.Errorf("invalid number of accounts: %d instead of %d",
  350. len(inst.Accounts), postMessageInstructionNumAccounts)
  351. }
  352. // Decode instruction data (UNTRUSTED)
  353. var data PostMessageData
  354. if err := borsh.Deserialize(&data, inst.Data[1:]); err != nil {
  355. return false, fmt.Errorf("failed to deserialize instruction data: %w", err)
  356. }
  357. logger.Info("post message data", zap.Any("deserialized_data", data),
  358. zap.Stringer("signature", signature), zap.Uint64("slot", slot), zap.Int("idx", idx))
  359. level, err := data.ConsistencyLevel.Commitment()
  360. if err != nil {
  361. return false, fmt.Errorf("failed to determine commitment: %w", err)
  362. }
  363. if level != s.commitment {
  364. return true, nil
  365. }
  366. // The second account in a well-formed Wormhole instruction is the VAA program account.
  367. acc := tx.Transaction.Message.AccountKeys[inst.Accounts[1]]
  368. logger.Info("fetching VAA account", zap.Stringer("acc", acc),
  369. zap.Stringer("signature", signature), zap.Uint64("slot", slot), zap.Int("idx", idx))
  370. go s.retryFetchMessageAccount(ctx, logger, acc, slot, 0)
  371. return true, nil
  372. }
  373. func (s *SolanaWatcher) retryFetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64, retry uint) {
  374. retryable := s.fetchMessageAccount(ctx, logger, acc, slot)
  375. if retryable {
  376. if retry >= maxRetries {
  377. logger.Error("max retries for account",
  378. zap.Uint64("slot", slot),
  379. zap.Stringer("account", acc),
  380. zap.String("commitment", string(s.commitment)),
  381. zap.Uint("retry", retry))
  382. return
  383. }
  384. time.Sleep(retryDelay)
  385. logger.Info("retrying account",
  386. zap.Uint64("slot", slot),
  387. zap.Stringer("account", acc),
  388. zap.String("commitment", string(s.commitment)),
  389. zap.Uint("retry", retry))
  390. go s.retryFetchMessageAccount(ctx, logger, acc, slot, retry+1)
  391. }
  392. }
  393. func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, logger *zap.Logger, acc solana.PublicKey, slot uint64) (retryable bool) {
  394. // Fetching account
  395. rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
  396. defer cancel()
  397. start := time.Now()
  398. info, err := s.rpcClient.GetAccountInfoWithOpts(rCtx, acc, &rpc.GetAccountInfoOpts{
  399. Encoding: solana.EncodingBase64,
  400. Commitment: s.commitment,
  401. })
  402. queryLatency.WithLabelValues("get_account_info", string(s.commitment)).Observe(time.Since(start).Seconds())
  403. if err != nil {
  404. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  405. solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_account_info_error").Inc()
  406. logger.Error("failed to request account",
  407. zap.Error(err),
  408. zap.Uint64("slot", slot),
  409. zap.String("commitment", string(s.commitment)),
  410. zap.Stringer("account", acc))
  411. return true
  412. }
  413. if !info.Value.Owner.Equals(s.contract) {
  414. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  415. solanaConnectionErrors.WithLabelValues(string(s.commitment), "account_owner_mismatch").Inc()
  416. logger.Error("account has invalid owner",
  417. zap.Uint64("slot", slot),
  418. zap.String("commitment", string(s.commitment)),
  419. zap.Stringer("account", acc),
  420. zap.Stringer("unexpected_owner", info.Value.Owner))
  421. return false
  422. }
  423. data := info.Value.Data.GetBinary()
  424. if string(data[:3]) != "msg" {
  425. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
  426. solanaConnectionErrors.WithLabelValues(string(s.commitment), "bad_account_data").Inc()
  427. logger.Error("account is not a message account",
  428. zap.Uint64("slot", slot),
  429. zap.String("commitment", string(s.commitment)),
  430. zap.Stringer("account", acc))
  431. return false
  432. }
  433. logger.Info("found valid VAA account",
  434. zap.Uint64("slot", slot),
  435. zap.String("commitment", string(s.commitment)),
  436. zap.Stringer("account", acc),
  437. zap.Binary("data", data))
  438. s.processMessageAccount(logger, data, acc)
  439. return false
  440. }
  441. func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, acc solana.PublicKey) {
  442. proposal, err := ParseMessagePublicationAccount(data)
  443. if err != nil {
  444. solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
  445. logger.Error(
  446. "failed to parse transfer proposal",
  447. zap.Stringer("account", acc),
  448. zap.Binary("data", data),
  449. zap.Error(err))
  450. return
  451. }
  452. var txHash eth_common.Hash
  453. copy(txHash[:], acc[:])
  454. observation := &common.MessagePublication{
  455. TxHash: txHash,
  456. Timestamp: time.Unix(int64(proposal.SubmissionTime), 0),
  457. Nonce: proposal.Nonce,
  458. Sequence: proposal.Sequence,
  459. EmitterChain: vaa.ChainIDSolana,
  460. EmitterAddress: proposal.EmitterAddress,
  461. Payload: proposal.Payload,
  462. ConsistencyLevel: proposal.ConsistencyLevel,
  463. }
  464. solanaMessagesConfirmed.Inc()
  465. logger.Info("message observed",
  466. zap.Stringer("account", acc),
  467. zap.Time("timestamp", observation.Timestamp),
  468. zap.Uint32("nonce", observation.Nonce),
  469. zap.Uint64("sequence", observation.Sequence),
  470. zap.Stringer("emitter_chain", observation.EmitterChain),
  471. zap.Stringer("emitter_address", observation.EmitterAddress),
  472. zap.Binary("payload", observation.Payload),
  473. zap.Uint8("consistency_level", observation.ConsistencyLevel),
  474. )
  475. s.messageEvent <- observation
  476. }
type (
	// MessagePublicationAccount is the borsh-decoded content of a message
	// account, i.e. the account data that follows the 3-byte "msg" prefix.
	// NOTE(review): field order presumably has to mirror the on-chain
	// layout because borsh decodes positionally - confirm before reordering.
	MessagePublicationAccount struct {
		// VaaVersion is decoded but not read by the code visible in this file.
		VaaVersion uint8
		// Borsh does not seem to support booleans, so 0=false / 1=true
		// NOTE(review): the field below is copied into the observation as a
		// consistency level, not a boolean; the comment above looks stale - confirm.
		ConsistencyLevel uint8
		// VaaTime is decoded but not read by the code visible in this file.
		VaaTime uint32
		// VaaSignatureAccount is decoded but not read by the code visible in this file.
		VaaSignatureAccount vaa.Address
		// SubmissionTime is interpreted as Unix seconds and becomes the observation timestamp.
		SubmissionTime uint32
		// Nonce is copied into the observation.
		Nonce uint32
		// Sequence is copied into the observation.
		Sequence uint64
		// EmitterChain is decoded but not used; observations always carry vaa.ChainIDSolana.
		EmitterChain uint16
		// EmitterAddress is copied into the observation.
		EmitterAddress vaa.Address
		// Payload is copied into the observation.
		Payload []byte
	}
)
  492. func ParseMessagePublicationAccount(data []byte) (*MessagePublicationAccount, error) {
  493. prop := &MessagePublicationAccount{}
  494. // Skip the b"msg" prefix
  495. if err := borsh.Deserialize(prop, data[3:]); err != nil {
  496. return nil, err
  497. }
  498. return prop, nil
  499. }