sui.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. package txverifier
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "strconv"
  7. "time"
  8. "github.com/certusone/wormhole/node/pkg/telemetry"
  9. txverifier "github.com/certusone/wormhole/node/pkg/txverifier"
  10. "github.com/certusone/wormhole/node/pkg/version"
  11. ipfslog "github.com/ipfs/go-log/v2"
  12. "github.com/spf13/cobra"
  13. "go.uber.org/zap"
  14. )
  15. const (
  16. INITIAL_EVENT_FETCH_LIMIT = 25
  17. )
  18. // CLI args
  19. var (
  20. suiRPC *string
  21. suiCoreContract *string
  22. suiTokenBridgeEmitter *string
  23. suiTokenBridgeContract *string
  24. suiProcessInitialEvents *bool
  25. )
  26. var TransferVerifierCmdSui = &cobra.Command{
  27. Use: "sui",
  28. Short: "Transfer Verifier for Sui",
  29. Run: runTransferVerifierSui,
  30. }
  31. // CLI parameters
  32. // The MarkFlagRequired calls will cause the script to fail on their own. No need to handle the errors manually.
  33. //
  34. //nolint:errcheck
  35. func init() {
  36. suiRPC = TransferVerifierCmdSui.Flags().String("suiRPC", "", "Sui RPC url")
  37. suiCoreContract = TransferVerifierCmdSui.Flags().String("suiCoreContract", "", "Sui core contract address")
  38. suiTokenBridgeEmitter = TransferVerifierCmdSui.Flags().String("suiTokenBridgeEmitter", "", "Token bridge emitter on Sui")
  39. suiTokenBridgeContract = TransferVerifierCmdSui.Flags().String("suiTokenBridgeContract", "", "Token bridge contract on Sui")
  40. suiProcessInitialEvents = TransferVerifierCmdSui.Flags().Bool("suiProcessInitialEvents", false, "Indicate whether the Sui transfer verifier should process the initial events it fetches")
  41. TransferVerifierCmd.MarkFlagRequired("suiRPC")
  42. TransferVerifierCmd.MarkFlagRequired("suiCoreContract")
  43. TransferVerifierCmd.MarkFlagRequired("suiTokenBridgeEmitter")
  44. TransferVerifierCmd.MarkFlagRequired("suiTokenBridgeContract")
  45. }
  46. func runTransferVerifierSui(cmd *cobra.Command, args []string) {
  47. ctx := context.Background()
  48. // Setup logging
  49. // lvl, err := ipfslog.LevelFromString(*logLevel)
  50. lvl, err := ipfslog.LevelFromString("info")
  51. if err != nil {
  52. fmt.Println("Invalid log level")
  53. os.Exit(1)
  54. }
  55. logger := ipfslog.Logger("wormhole-transfer-verifier-sui").Desugar()
  56. ipfslog.SetAllLoggers(lvl)
  57. // Setup logging to Loki if configured
  58. if *telemetryLokiUrl != "" && *telemetryNodeName != "" {
  59. labels := map[string]string{
  60. // Is this required?
  61. // "network": *p2pNetworkID,
  62. "node_name": *telemetryNodeName,
  63. "version": version.Version(),
  64. }
  65. tm, err := telemetry.NewLokiCloudLogger(
  66. context.Background(),
  67. logger,
  68. *telemetryLokiUrl,
  69. "transfer-verifier-sui",
  70. // Private logs are not used in this code
  71. false,
  72. labels,
  73. )
  74. if err != nil {
  75. logger.Fatal("Failed to initialize telemetry", zap.Error(err))
  76. }
  77. defer tm.Close()
  78. logger = tm.WrapLogger(logger) // Wrap logger with telemetry logger
  79. }
  80. logger.Info("Starting Sui transfer verifier")
  81. logger.Debug("Sui rpc connection", zap.String("url", *suiRPC))
  82. logger.Debug("Sui core contract", zap.String("address", *suiCoreContract))
  83. logger.Debug("Sui token bridge contract", zap.String("address", *suiTokenBridgeContract))
  84. logger.Debug("token bridge event emitter", zap.String("object id", *suiTokenBridgeEmitter))
  85. logger.Debug("process initial events", zap.Bool("processInitialEvents", *suiProcessInitialEvents))
  86. // Verify CLI parameters
  87. if *suiRPC == "" || *suiCoreContract == "" || *suiTokenBridgeEmitter == "" || *suiTokenBridgeContract == "" {
  88. logger.Fatal("One or more CLI parameters are empty",
  89. zap.String("suiRPC", *suiRPC),
  90. zap.String("suiCoreContract", *suiCoreContract),
  91. zap.String("suiTokenBridgeEmitter", *suiTokenBridgeEmitter),
  92. zap.String("suiTokenBridgeContract", *suiTokenBridgeContract))
  93. }
  94. // Create a new SuiTransferVerifier
  95. suiTransferVerifier := txverifier.NewSuiTransferVerifier(*suiCoreContract, *suiTokenBridgeEmitter, *suiTokenBridgeContract)
  96. // Get the event filter
  97. eventFilter := suiTransferVerifier.GetEventFilter()
  98. suiApiConnection := txverifier.NewSuiApiConnection(*suiRPC)
  99. // Initial event fetching
  100. resp, err := suiApiConnection.QueryEvents(eventFilter, "null", INITIAL_EVENT_FETCH_LIMIT, true)
  101. if err != nil {
  102. logger.Fatal("Error in querying initial events", zap.Error(err))
  103. }
  104. initialEvents := resp.Result.Data
  105. // Use the latest timestamp to determine the starting point for live processing
  106. var latestTimestamp int
  107. for _, event := range initialEvents {
  108. if event.Timestamp != nil {
  109. timestampInt, err := strconv.Atoi(*event.Timestamp)
  110. if err != nil {
  111. logger.Error("Error converting timestamp to int", zap.Error(err))
  112. continue
  113. }
  114. if timestampInt > latestTimestamp {
  115. latestTimestamp = timestampInt
  116. }
  117. }
  118. }
  119. logger.Info("Initial events fetched", zap.Int("number of initial events", len(initialEvents)), zap.Int("latestTimestamp", latestTimestamp))
  120. // If specified, process the initial events. This is useful for running a number of digests
  121. // through the verifier before starting live processing.
  122. if *suiProcessInitialEvents {
  123. logger.Info("Processing initial events")
  124. for _, event := range initialEvents {
  125. if event.ID.TxDigest != nil {
  126. _, err = suiTransferVerifier.ProcessDigest(*event.ID.TxDigest, suiApiConnection, logger)
  127. if err != nil {
  128. logger.Error(err.Error())
  129. }
  130. }
  131. }
  132. }
  133. // Ticker for live processing
  134. ticker := time.NewTicker(5 * time.Second)
  135. defer ticker.Stop()
  136. for {
  137. select {
  138. case <-ctx.Done():
  139. logger.Info("Context cancelled")
  140. case <-ticker.C:
  141. // Fetch new events
  142. resp, err := suiApiConnection.QueryEvents(eventFilter, "null", 25, true)
  143. if err != nil {
  144. logger.Error("Error in querying new events", zap.Error(err))
  145. continue
  146. }
  147. newEvents := resp.Result.Data
  148. // List of transaction digests for transactions in which the WormholeMessage
  149. // event was emitted.
  150. var txDigests []string
  151. // Iterate over all events and get the transaction digests for events younger
  152. // than latestTimestamp. Also update latestTimestamp.
  153. for _, event := range newEvents {
  154. if event.Timestamp != nil {
  155. timestampInt, err := strconv.Atoi(*event.Timestamp)
  156. if err != nil {
  157. logger.Error("Error converting timestamp to int", zap.Error(err))
  158. continue
  159. }
  160. if timestampInt > latestTimestamp {
  161. latestTimestamp = timestampInt
  162. if event.ID.TxDigest != nil {
  163. txDigests = append(txDigests, *event.ID.TxDigest)
  164. }
  165. }
  166. }
  167. }
  168. for _, txDigest := range txDigests {
  169. _, err := suiTransferVerifier.ProcessDigest(txDigest, suiApiConnection, logger)
  170. if err != nil {
  171. logger.Error(err.Error())
  172. }
  173. }
  174. logger.Info("New events processed", zap.Int("latestTimestamp", latestTimestamp), zap.Int("txDigestCount", len(txDigests)))
  175. }
  176. }
  177. }