submitter.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package solana
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "fmt"
  6. "github.com/prometheus/client_golang/prometheus"
  7. "strings"
  8. "time"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/status"
  12. "go.uber.org/zap"
  13. agentv1 "github.com/certusone/wormhole/bridge/pkg/proto/agent/v1"
  14. "github.com/certusone/wormhole/bridge/pkg/readiness"
  15. "github.com/certusone/wormhole/bridge/pkg/common"
  16. "github.com/certusone/wormhole/bridge/pkg/supervisor"
  17. "github.com/certusone/wormhole/bridge/pkg/vaa"
  18. )
  19. var (
  20. solanaVAASubmitted = prometheus.NewCounter(
  21. prometheus.CounterOpts{
  22. Name: "wormhole_solana_vaa_submitted_total",
  23. Help: "Total number of VAAs successfully submitted to the chain",
  24. })
  25. solanaFeePayerBalance = prometheus.NewGauge(
  26. prometheus.GaugeOpts{
  27. Name: "wormhole_solana_fee_account_balance_lamports",
  28. Help: "Current fee payer account balance in lamports",
  29. })
  30. )
  31. func init() {
  32. prometheus.MustRegister(solanaVAASubmitted)
  33. prometheus.MustRegister(solanaFeePayerBalance)
  34. }
  35. type (
  36. SolanaVAASubmitter struct {
  37. url string
  38. messageChan chan *common.MessagePublication
  39. vaaChan chan *vaa.VAA
  40. skipPreflight bool
  41. }
  42. )
  43. func NewSolanaVAASubmitter(url string, lockEvents chan *common.MessagePublication, vaaQueue chan *vaa.VAA, skipPreflight bool) *SolanaVAASubmitter {
  44. return &SolanaVAASubmitter{url: url, messageChan: lockEvents, vaaChan: vaaQueue, skipPreflight: skipPreflight}
  45. }
  46. func (e *SolanaVAASubmitter) Run(ctx context.Context) error {
  47. // We only support UNIX sockets since we rely on Unix filesystem permissions for access control.
  48. path := fmt.Sprintf("unix://%s", e.url)
  49. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  50. defer cancel()
  51. conn, err := grpc.DialContext(timeout, path, grpc.WithBlock(), grpc.WithInsecure())
  52. if err != nil {
  53. return fmt.Errorf("failed to dial agent at %s: %w", path, err)
  54. }
  55. defer conn.Close()
  56. c := agentv1.NewAgentClient(conn)
  57. errC := make(chan error)
  58. logger := supervisor.Logger(ctx)
  59. // Check whether agent is up by doing a GetBalance call.
  60. balance, err := c.GetBalance(timeout, &agentv1.GetBalanceRequest{})
  61. if err != nil {
  62. solanaConnectionErrors.WithLabelValues("get_balance_error").Inc()
  63. return fmt.Errorf("failed to get balance: %v", err)
  64. }
  65. readiness.SetReady(common.ReadinessSolanaSyncing)
  66. logger.Info("account balance", zap.Uint64("lamports", balance.Balance))
  67. // Periodically request the balance for monitoring
  68. btick := time.NewTicker(1 * time.Minute)
  69. go func() {
  70. for {
  71. select {
  72. case <-ctx.Done():
  73. return
  74. case <-btick.C:
  75. timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
  76. balance, err = c.GetBalance(timeout, &agentv1.GetBalanceRequest{})
  77. if err != nil {
  78. solanaConnectionErrors.WithLabelValues("get_balance_error").Inc()
  79. // With PostVAA, it's hard to tell, but if this one fails we know
  80. // that something went wrong and we should restart the service.
  81. errC <- fmt.Errorf("failed to get balance: %v", err)
  82. cancel()
  83. break
  84. }
  85. cancel()
  86. solanaFeePayerBalance.Set(float64(balance.Balance))
  87. case v := <-e.vaaChan:
  88. vaaBytes, err := v.Marshal()
  89. if err != nil {
  90. panic(err)
  91. }
  92. // Calculate digest so we can log it (TODO: refactor to vaa method? we do this in different places)
  93. m, err := v.SigningMsg()
  94. if err != nil {
  95. panic(err)
  96. }
  97. h := hex.EncodeToString(m.Bytes())
  98. timeout, cancel := context.WithTimeout(ctx, 120*time.Second)
  99. res, err := c.SubmitVAA(timeout, &agentv1.SubmitVAARequest{Vaa: vaaBytes, SkipPreflight: e.skipPreflight})
  100. cancel()
  101. if err != nil {
  102. st, ok := status.FromError(err)
  103. if !ok {
  104. panic("err not a status")
  105. }
  106. // For transient errors, we can put the VAA back into the queue such that it can
  107. // be retried after the runnable has been rescheduled.
  108. switch st.Code() {
  109. case
  110. // Our context was cancelled, likely because the watcher stream died.
  111. codes.Canceled,
  112. // The agent encountered a transient error, likely node unavailability.
  113. codes.Unavailable,
  114. codes.Aborted:
  115. solanaConnectionErrors.WithLabelValues("postvaa_transient_error").Inc()
  116. logger.Error("transient error, requeuing VAA", zap.Error(err), zap.String("digest", h))
  117. // Tombstone goroutine
  118. go func(v *vaa.VAA) {
  119. time.Sleep(10 * time.Second)
  120. e.vaaChan <- v
  121. }(v)
  122. case codes.Internal:
  123. // This VAA has already been executed on chain, successfully or not.
  124. // TODO: dissect InstructionError in agent and convert this to the proper gRPC code
  125. if strings.Contains(st.Message(), "custom program error: 0xb") { // AlreadyExists
  126. logger.Info("VAA already submitted on-chain, ignoring", zap.Error(err), zap.String("digest", h))
  127. break
  128. }
  129. fallthrough
  130. default:
  131. solanaConnectionErrors.WithLabelValues("postvaa_internal_error").Inc()
  132. logger.Error("error submitting VAA", zap.Error(err), zap.String("digest", h))
  133. }
  134. break
  135. }
  136. solanaVAASubmitted.Inc()
  137. logger.Info("submitted VAA",
  138. zap.String("tx_sig", res.Signature), zap.String("digest", h))
  139. }
  140. }
  141. }()
  142. select {
  143. case <-ctx.Done():
  144. return ctx.Err()
  145. case err := <-errC:
  146. return err
  147. }
  148. }