watcher.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package terra
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "fmt"
  6. "github.com/certusone/wormhole/bridge/pkg/p2p"
  7. gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
  8. "io/ioutil"
  9. "net/http"
  10. "time"
  11. "github.com/prometheus/client_golang/prometheus"
  12. eth_common "github.com/ethereum/go-ethereum/common"
  13. "github.com/certusone/wormhole/bridge/pkg/common"
  14. "github.com/certusone/wormhole/bridge/pkg/readiness"
  15. "github.com/certusone/wormhole/bridge/pkg/supervisor"
  16. "github.com/certusone/wormhole/bridge/pkg/vaa"
  17. "github.com/gorilla/websocket"
  18. "github.com/tidwall/gjson"
  19. "go.uber.org/zap"
  20. )
  21. type (
  22. // BridgeWatcher is responsible for looking over Terra blockchain and reporting new transactions to the bridge
  23. BridgeWatcher struct {
  24. urlWS string
  25. urlLCD string
  26. bridge string
  27. msgChan chan *common.MessagePublication
  28. setChan chan *common.GuardianSet
  29. }
  30. )
  31. var (
  32. terraConnectionErrors = prometheus.NewCounterVec(
  33. prometheus.CounterOpts{
  34. Name: "wormhole_terra_connection_errors_total",
  35. Help: "Total number of Terra connection errors",
  36. }, []string{"reason"})
  37. terraLockupsConfirmed = prometheus.NewCounter(
  38. prometheus.CounterOpts{
  39. Name: "wormhole_terra_lockups_confirmed_total",
  40. Help: "Total number of verified terra lockups found",
  41. })
  42. currentTerraHeight = prometheus.NewGauge(
  43. prometheus.GaugeOpts{
  44. Name: "wormhole_terra_current_height",
  45. Help: "Current terra slot height (at default commitment level, not the level used for lockups)",
  46. })
  47. queryLatency = prometheus.NewHistogramVec(
  48. prometheus.HistogramOpts{
  49. Name: "wormhole_terra_query_latency",
  50. Help: "Latency histogram for terra RPC calls",
  51. }, []string{"operation"})
  52. )
  53. func init() {
  54. prometheus.MustRegister(terraConnectionErrors)
  55. prometheus.MustRegister(terraLockupsConfirmed)
  56. prometheus.MustRegister(currentTerraHeight)
  57. prometheus.MustRegister(queryLatency)
  58. }
  // clientRequest is the JSON-RPC request envelope written to the websocket
  // when subscribing to events.
  type clientRequest struct {
  	// JSONRPC is the protocol version string sent in the "jsonrpc" field.
  	JSONRPC string `json:"jsonrpc"`

  	// Method is the name of the method to be invoked.
  	Method string `json:"method"`

  	// Params carries the single string parameter passed to the method.
  	Params [1]string `json:"params"`

  	// ID is the request id, used to match a response with the request that it
  	// is replying to.
  	ID uint64 `json:"id"`
  }
  69. // NewTerraBridgeWatcher creates a new terra bridge watcher
  70. func NewTerraBridgeWatcher(urlWS string, urlLCD string, bridge string, lockEvents chan *common.MessagePublication, setEvents chan *common.GuardianSet) *BridgeWatcher {
  71. return &BridgeWatcher{urlWS: urlWS, urlLCD: urlLCD, bridge: bridge, msgChan: lockEvents, setChan: setEvents}
  72. }
  73. // Run is the main Terra Bridge run cycle
  74. func (e *BridgeWatcher) Run(ctx context.Context) error {
  75. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDTerra, &gossipv1.Heartbeat_Network{
  76. BridgeAddress: e.bridge,
  77. })
  78. errC := make(chan error)
  79. logger := supervisor.Logger(ctx)
  80. logger.Info("connecting to websocket", zap.String("url", e.urlWS))
  81. c, _, err := websocket.DefaultDialer.DialContext(ctx, e.urlWS, nil)
  82. if err != nil {
  83. terraConnectionErrors.WithLabelValues("websocket_dial_error").Inc()
  84. return fmt.Errorf("websocket dial failed: %w", err)
  85. }
  86. defer c.Close()
  87. // Subscribe to smart contract transactions
  88. params := [...]string{fmt.Sprintf("tm.event='Tx' AND execute_contract.contract_address='%s'", e.bridge)}
  89. command := &clientRequest{
  90. JSONRPC: "2.0",
  91. Method: "subscribe",
  92. Params: params,
  93. ID: 1,
  94. }
  95. err = c.WriteJSON(command)
  96. if err != nil {
  97. terraConnectionErrors.WithLabelValues("websocket_subscription_error").Inc()
  98. return fmt.Errorf("websocket subscription failed: %w", err)
  99. }
  100. // Wait for the success response
  101. _, _, err = c.ReadMessage()
  102. if err != nil {
  103. terraConnectionErrors.WithLabelValues("event_subscription_error").Inc()
  104. return fmt.Errorf("event subscription failed: %w", err)
  105. }
  106. logger.Info("subscribed to new transaction events")
  107. readiness.SetReady(common.ReadinessTerraSyncing)
  108. go func() {
  109. t := time.NewTicker(5 * time.Second)
  110. client := &http.Client{
  111. Timeout: time.Second * 5,
  112. }
  113. for {
  114. <-t.C
  115. // Query and report height and set currentTerraHeight
  116. resp, err := client.Get(fmt.Sprintf("%s/blocks/latest", e.urlLCD))
  117. if err != nil {
  118. logger.Error("query latest block response error", zap.Error(err))
  119. continue
  120. }
  121. blocksBody, err := ioutil.ReadAll(resp.Body)
  122. if err != nil {
  123. logger.Error("query guardian set error", zap.Error(err))
  124. errC <- err
  125. resp.Body.Close()
  126. continue
  127. }
  128. resp.Body.Close()
  129. blockJSON := string(blocksBody)
  130. latestBlock := gjson.Get(blockJSON, "block.header.height")
  131. logger.Info("current Terra height", zap.Int64("block", latestBlock.Int()))
  132. currentTerraHeight.Set(float64(latestBlock.Int()))
  133. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDTerra, &gossipv1.Heartbeat_Network{
  134. Height: latestBlock.Int(),
  135. BridgeAddress: e.bridge,
  136. })
  137. }
  138. }()
  139. go func() {
  140. defer close(errC)
  141. for {
  142. _, message, err := c.ReadMessage()
  143. if err != nil {
  144. terraConnectionErrors.WithLabelValues("channel_read_error").Inc()
  145. logger.Error("error reading channel", zap.Error(err))
  146. errC <- err
  147. return
  148. }
  149. // Received a message from the blockchain
  150. json := string(message)
  151. payload := gjson.Get(json, "result.events.from_contract\\.message\\.message.0")
  152. sender := gjson.Get(json, "result.events.from_contract\\.message\\.sender.0")
  153. chainId := gjson.Get(json, "result.events.from_contract\\.message\\.chain_id.0")
  154. nonce := gjson.Get(json, "result.events.from_contract\\.message\\.nonce.0")
  155. sequence := gjson.Get(json, "result.events.from_contract\\.message\\.sequence.0")
  156. blockTime := gjson.Get(json, "result.events.from_contract\\.message\\.block_time.0")
  157. persist := gjson.Get(json, "result.events.from_contract\\.message\\.persist.0")
  158. txHash := gjson.Get(json, "result.events.tx\\.hash.0")
  159. if payload.Exists() && sender.Exists() && chainId.Exists() && nonce.Exists() && sequence.Exists() &&
  160. blockTime.Exists() && txHash.Exists() && persist.Exists() {
  161. logger.Info("new message detected on terra",
  162. zap.String("chainId", chainId.String()),
  163. zap.String("txHash", txHash.String()),
  164. zap.String("sender", sender.String()),
  165. zap.String("nonce", nonce.String()),
  166. zap.String("sequence", sequence.String()),
  167. zap.String("blockTime", blockTime.String()),
  168. zap.String("persist", persist.String()),
  169. )
  170. senderAddress, err := StringToAddress(sender.String())
  171. if err != nil {
  172. logger.Error("cannot decode emitter hex", zap.String("value", sender.String()))
  173. continue
  174. }
  175. txHashValue, err := StringToHash(txHash.String())
  176. if err != nil {
  177. logger.Error("cannot decode tx hash hex", zap.String("value", txHash.String()))
  178. continue
  179. }
  180. payloadValue, err := hex.DecodeString(payload.String())
  181. if err != nil {
  182. logger.Error("cannot decode payload", zap.String("value", payload.String()))
  183. continue
  184. }
  185. messagePublication := &common.MessagePublication{
  186. TxHash: txHashValue,
  187. Timestamp: time.Unix(blockTime.Int(), 0),
  188. Nonce: uint32(nonce.Uint()),
  189. Sequence: sequence.Uint(),
  190. EmitterChain: vaa.ChainIDTerra,
  191. EmitterAddress: senderAddress,
  192. Payload: payloadValue,
  193. Persist: persist.Bool(),
  194. }
  195. e.msgChan <- messagePublication
  196. terraLockupsConfirmed.Inc()
  197. }
  198. client := &http.Client{
  199. Timeout: time.Second * 15,
  200. }
  201. // Query and report guardian set status
  202. requestURL := fmt.Sprintf("%s/wasm/contracts/%s/store?query_msg={\"guardian_set_info\":{}}", e.urlLCD, e.bridge)
  203. req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil)
  204. if err != nil {
  205. terraConnectionErrors.WithLabelValues("guardian_set_req_error").Inc()
  206. logger.Error("query guardian set request error", zap.Error(err))
  207. errC <- err
  208. return
  209. }
  210. msm := time.Now()
  211. resp, err := client.Do(req)
  212. if err != nil {
  213. logger.Error("query guardian set response error", zap.Error(err))
  214. errC <- err
  215. return
  216. }
  217. body, err := ioutil.ReadAll(resp.Body)
  218. queryLatency.WithLabelValues("guardian_set_info").Observe(time.Since(msm).Seconds())
  219. if err != nil {
  220. logger.Error("query guardian set error", zap.Error(err))
  221. errC <- err
  222. resp.Body.Close()
  223. return
  224. }
  225. json = string(body)
  226. guardianSetIndex := gjson.Get(json, "result.guardian_set_index")
  227. addresses := gjson.Get(json, "result.addresses.#.bytes")
  228. logger.Debug("current guardian set on Terra",
  229. zap.Any("guardianSetIndex", guardianSetIndex),
  230. zap.Any("addresses", addresses))
  231. resp.Body.Close()
  232. // We do not send guardian changes to the processor - ETH guardians are the source of truth.
  233. }
  234. }()
  235. select {
  236. case <-ctx.Done():
  237. err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  238. if err != nil {
  239. logger.Error("error on closing socket ", zap.Error(err))
  240. }
  241. return ctx.Err()
  242. case err := <-errC:
  243. return err
  244. }
  245. }
  246. // StringToAddress convert string into address
  247. func StringToAddress(value string) (vaa.Address, error) {
  248. var address vaa.Address
  249. res, err := hex.DecodeString(value)
  250. if err != nil {
  251. return address, err
  252. }
  253. copy(address[:], res)
  254. return address, nil
  255. }
  256. // StringToHash convert string into transaction hash
  257. func StringToHash(value string) (eth_common.Hash, error) {
  258. var hash eth_common.Hash
  259. res, err := hex.DecodeString(value)
  260. if err != nil {
  261. return hash, err
  262. }
  263. copy(hash[:], res)
  264. return hash, nil
  265. }