submitter.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package solana
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "fmt"
  6. "github.com/prometheus/client_golang/prometheus"
  7. "strings"
  8. "time"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/status"
  12. "go.uber.org/zap"
  13. agentv1 "github.com/certusone/wormhole/node/pkg/proto/agent/v1"
  14. "github.com/certusone/wormhole/node/pkg/readiness"
  15. "github.com/certusone/wormhole/node/pkg/common"
  16. "github.com/certusone/wormhole/node/pkg/supervisor"
  17. "github.com/certusone/wormhole/node/pkg/vaa"
  18. )
  19. var (
  20. solanaVAASubmitted = prometheus.NewCounter(
  21. prometheus.CounterOpts{
  22. Name: "wormhole_solana_vaa_submitted_total",
  23. Help: "Total number of VAAs successfully submitted to the chain",
  24. })
  25. solanaFeePayerBalance = prometheus.NewGauge(
  26. prometheus.GaugeOpts{
  27. Name: "wormhole_solana_fee_account_balance_lamports",
  28. Help: "Current fee payer account balance in lamports",
  29. })
  30. )
  31. func init() {
  32. prometheus.MustRegister(solanaVAASubmitted)
  33. prometheus.MustRegister(solanaFeePayerBalance)
  34. }
  35. type (
  36. SolanaVAASubmitter struct {
  37. url string
  38. vaaChan chan *vaa.VAA
  39. skipPreflight bool
  40. }
  41. )
  42. func NewSolanaVAASubmitter(url string, vaaQueue chan *vaa.VAA, skipPreflight bool) *SolanaVAASubmitter {
  43. return &SolanaVAASubmitter{url: url, vaaChan: vaaQueue, skipPreflight: skipPreflight}
  44. }
  45. func (e *SolanaVAASubmitter) Run(ctx context.Context) error {
  46. // We only support UNIX sockets since we rely on Unix filesystem permissions for access control.
  47. path := fmt.Sprintf("unix://%s", e.url)
  48. timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
  49. defer cancel()
  50. conn, err := grpc.DialContext(timeout, path, grpc.WithBlock(), grpc.WithInsecure())
  51. if err != nil {
  52. return fmt.Errorf("failed to dial agent at %s: %w", path, err)
  53. }
  54. defer conn.Close()
  55. c := agentv1.NewAgentClient(conn)
  56. errC := make(chan error)
  57. logger := supervisor.Logger(ctx)
  58. // Check whether agent is up by doing a GetBalance call.
  59. balance, err := c.GetBalance(timeout, &agentv1.GetBalanceRequest{})
  60. if err != nil {
  61. solanaConnectionErrors.WithLabelValues("get_balance_error").Inc()
  62. return fmt.Errorf("failed to get balance: %v", err)
  63. }
  64. readiness.SetReady(common.ReadinessSolanaSyncing)
  65. logger.Info("account balance", zap.Uint64("lamports", balance.Balance))
  66. // Periodically request the balance for monitoring
  67. btick := time.NewTicker(1 * time.Minute)
  68. go func() {
  69. for {
  70. select {
  71. case <-ctx.Done():
  72. return
  73. case <-btick.C:
  74. timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
  75. balance, err = c.GetBalance(timeout, &agentv1.GetBalanceRequest{})
  76. if err != nil {
  77. solanaConnectionErrors.WithLabelValues("get_balance_error").Inc()
  78. // With PostVAA, it's hard to tell, but if this one fails we know
  79. // that something went wrong and we should restart the service.
  80. errC <- fmt.Errorf("failed to get balance: %v", err)
  81. cancel()
  82. break
  83. }
  84. cancel()
  85. solanaFeePayerBalance.Set(float64(balance.Balance))
  86. case v := <-e.vaaChan:
  87. vaaBytes, err := v.Marshal()
  88. if err != nil {
  89. panic(err)
  90. }
  91. // Calculate digest so we can log it (TODO: refactor to vaa method? we do this in different places)
  92. m, err := v.SigningMsg()
  93. if err != nil {
  94. panic(err)
  95. }
  96. h := hex.EncodeToString(m.Bytes())
  97. timeout, cancel := context.WithTimeout(ctx, 120*time.Second)
  98. res, err := c.SubmitVAA(timeout, &agentv1.SubmitVAARequest{Vaa: vaaBytes, SkipPreflight: e.skipPreflight})
  99. cancel()
  100. if err != nil {
  101. st, ok := status.FromError(err)
  102. if !ok {
  103. panic("err not a status")
  104. }
  105. // For transient errors, we can put the VAA back into the queue such that it can
  106. // be retried after the runnable has been rescheduled.
  107. switch st.Code() {
  108. case
  109. // Our context was cancelled, likely because the watcher stream died.
  110. codes.Canceled,
  111. // The agent encountered a transient error, likely node unavailability.
  112. codes.Unavailable,
  113. codes.Aborted,
  114. codes.DeadlineExceeded:
  115. solanaConnectionErrors.WithLabelValues("postvaa_transient_error").Inc()
  116. logger.Error("transient error, requeuing VAA", zap.Error(err), zap.String("digest", h))
  117. // Tombstone goroutine
  118. go func(v *vaa.VAA) {
  119. time.Sleep(10 * time.Second)
  120. e.vaaChan <- v
  121. }(v)
  122. case codes.Internal:
  123. // This VAA has already been executed on chain, successfully or not.
  124. // TODO: dissect InstructionError in agent and convert this to the proper gRPC code
  125. if strings.Contains(st.Message(), "custom program error: 0xb") { // AlreadyExists
  126. solanaConnectionErrors.WithLabelValues("postvaa_already_exists").Inc()
  127. logger.Info("VAA already submitted on-chain, ignoring", zap.Error(err), zap.String("digest", h))
  128. break
  129. }
  130. if strings.Contains(st.Message(), "tx sending failed: unable to confirm transaction") { // Unavailable
  131. solanaConnectionErrors.WithLabelValues("postvaa_tx_sending_failed").Inc()
  132. logger.Error("tx sending failed, requeuing VAA", zap.Error(err), zap.String("digest", h))
  133. go func(v *vaa.VAA) {
  134. time.Sleep(1 * time.Second)
  135. e.vaaChan <- v
  136. }(v)
  137. break
  138. }
  139. fallthrough
  140. default:
  141. solanaConnectionErrors.WithLabelValues("postvaa_internal_error").Inc()
  142. logger.Error("error submitting VAA", zap.Error(err), zap.String("digest", h))
  143. }
  144. break
  145. }
  146. solanaVAASubmitted.Inc()
  147. logger.Info("submitted VAA",
  148. zap.String("tx_sig", res.Signature), zap.String("digest", h))
  149. }
  150. }
  151. }()
  152. select {
  153. case <-ctx.Done():
  154. return ctx.Err()
  155. case err := <-errC:
  156. return err
  157. }
  158. }