watcher.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. package terra
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/hex"
  6. "fmt"
  7. "github.com/certusone/wormhole/node/pkg/p2p"
  8. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  9. "github.com/prometheus/client_golang/prometheus/promauto"
  10. "io/ioutil"
  11. "net/http"
  12. "strconv"
  13. "time"
  14. "github.com/prometheus/client_golang/prometheus"
  15. eth_common "github.com/ethereum/go-ethereum/common"
  16. "github.com/certusone/wormhole/node/pkg/common"
  17. "github.com/certusone/wormhole/node/pkg/readiness"
  18. "github.com/certusone/wormhole/node/pkg/supervisor"
  19. "github.com/certusone/wormhole/node/pkg/vaa"
  20. "github.com/gorilla/websocket"
  21. "github.com/tidwall/gjson"
  22. "go.uber.org/zap"
  23. )
  24. type (
  25. // Watcher is responsible for looking over Terra blockchain and reporting new transactions to the contract
  26. Watcher struct {
  27. urlWS string
  28. urlLCD string
  29. contract string
  30. msgChan chan *common.MessagePublication
  31. setChan chan *common.GuardianSet
  32. }
  33. )
  34. var (
  35. terraConnectionErrors = promauto.NewCounterVec(
  36. prometheus.CounterOpts{
  37. Name: "wormhole_terra_connection_errors_total",
  38. Help: "Total number of Terra connection errors",
  39. }, []string{"reason"})
  40. terraMessagesConfirmed = promauto.NewCounter(
  41. prometheus.CounterOpts{
  42. Name: "wormhole_terra_messages_confirmed_total",
  43. Help: "Total number of verified terra messages found",
  44. })
  45. currentTerraHeight = promauto.NewGauge(
  46. prometheus.GaugeOpts{
  47. Name: "wormhole_terra_current_height",
  48. Help: "Current terra slot height (at default commitment level, not the level used for observations)",
  49. })
  50. queryLatency = promauto.NewHistogramVec(
  51. prometheus.HistogramOpts{
  52. Name: "wormhole_terra_query_latency",
  53. Help: "Latency histogram for terra RPC calls",
  54. }, []string{"operation"})
  55. )
  56. type clientRequest struct {
  57. JSONRPC string `json:"jsonrpc"`
  58. // A String containing the name of the method to be invoked.
  59. Method string `json:"method"`
  60. // Object to pass as request parameter to the method.
  61. Params [1]string `json:"params"`
  62. // The request id. This can be of any type. It is used to match the
  63. // response with the request that it is replying to.
  64. ID uint64 `json:"id"`
  65. }
  66. // NewWatcher creates a new Terra contract watcher
  67. func NewWatcher(urlWS string, urlLCD string, contract string, lockEvents chan *common.MessagePublication, setEvents chan *common.GuardianSet) *Watcher {
  68. return &Watcher{urlWS: urlWS, urlLCD: urlLCD, contract: contract, msgChan: lockEvents, setChan: setEvents}
  69. }
  70. func (e *Watcher) Run(ctx context.Context) error {
  71. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDTerra, &gossipv1.Heartbeat_Network{
  72. ContractAddress: e.contract,
  73. })
  74. errC := make(chan error)
  75. logger := supervisor.Logger(ctx)
  76. logger.Info("connecting to websocket", zap.String("url", e.urlWS))
  77. c, _, err := websocket.DefaultDialer.DialContext(ctx, e.urlWS, nil)
  78. if err != nil {
  79. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
  80. terraConnectionErrors.WithLabelValues("websocket_dial_error").Inc()
  81. return fmt.Errorf("websocket dial failed: %w", err)
  82. }
  83. defer c.Close()
  84. // Subscribe to smart contract transactions
  85. params := [...]string{fmt.Sprintf("tm.event='Tx' AND execute_contract.contract_address='%s'", e.contract)}
  86. command := &clientRequest{
  87. JSONRPC: "2.0",
  88. Method: "subscribe",
  89. Params: params,
  90. ID: 1,
  91. }
  92. err = c.WriteJSON(command)
  93. if err != nil {
  94. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
  95. terraConnectionErrors.WithLabelValues("websocket_subscription_error").Inc()
  96. return fmt.Errorf("websocket subscription failed: %w", err)
  97. }
  98. // Wait for the success response
  99. _, _, err = c.ReadMessage()
  100. if err != nil {
  101. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
  102. terraConnectionErrors.WithLabelValues("event_subscription_error").Inc()
  103. return fmt.Errorf("event subscription failed: %w", err)
  104. }
  105. logger.Info("subscribed to new transaction events")
  106. readiness.SetReady(common.ReadinessTerraSyncing)
  107. go func() {
  108. t := time.NewTicker(5 * time.Second)
  109. client := &http.Client{
  110. Timeout: time.Second * 5,
  111. }
  112. for {
  113. <-t.C
  114. // Query and report height and set currentTerraHeight
  115. resp, err := client.Get(fmt.Sprintf("%s/blocks/latest", e.urlLCD))
  116. if err != nil {
  117. logger.Error("query latest block response error", zap.Error(err))
  118. continue
  119. }
  120. blocksBody, err := ioutil.ReadAll(resp.Body)
  121. if err != nil {
  122. logger.Error("query guardian set error", zap.Error(err))
  123. errC <- err
  124. resp.Body.Close()
  125. continue
  126. }
  127. resp.Body.Close()
  128. blockJSON := string(blocksBody)
  129. latestBlock := gjson.Get(blockJSON, "block.header.height")
  130. logger.Info("current Terra height", zap.Int64("block", latestBlock.Int()))
  131. currentTerraHeight.Set(float64(latestBlock.Int()))
  132. p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDTerra, &gossipv1.Heartbeat_Network{
  133. Height: latestBlock.Int(),
  134. ContractAddress: e.contract,
  135. })
  136. }
  137. }()
  138. go func() {
  139. defer close(errC)
  140. for {
  141. _, message, err := c.ReadMessage()
  142. if err != nil {
  143. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
  144. terraConnectionErrors.WithLabelValues("channel_read_error").Inc()
  145. logger.Error("error reading channel", zap.Error(err))
  146. errC <- err
  147. return
  148. }
  149. // Received a message from the blockchain
  150. json := string(message)
  151. txHashRaw := gjson.Get(json, "result.events.tx\\.hash.0")
  152. if !txHashRaw.Exists() {
  153. logger.Warn("terra message does not have tx hash", zap.String("payload", json))
  154. continue
  155. }
  156. txHash := txHashRaw.String()
  157. events := gjson.Get(json, "result.data.value.TxResult.result.events")
  158. if !events.Exists() {
  159. logger.Warn("terra message has no events", zap.String("payload", json))
  160. continue
  161. }
  162. for _, event := range events.Array() {
  163. if !event.IsObject() {
  164. logger.Warn("terra event is invalid", zap.String("tx_hash", txHash), zap.String("event", event.String()))
  165. continue
  166. }
  167. eventType := gjson.Get(event.String(), "type")
  168. if eventType.String() != "wasm" {
  169. continue
  170. }
  171. attributes := gjson.Get(event.String(), "attributes")
  172. if !attributes.Exists() {
  173. logger.Warn("terra message event has no attributes", zap.String("payload", json), zap.String("event", event.String()))
  174. continue
  175. }
  176. mappedAttributes := map[string]string{}
  177. for _, attribute := range attributes.Array() {
  178. if !attribute.IsObject() {
  179. logger.Warn("terra event attribute is invalid", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String()))
  180. continue
  181. }
  182. keyBase := gjson.Get(attribute.String(), "key")
  183. if !keyBase.Exists(){
  184. logger.Warn("terra event attribute does not have key", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String()))
  185. continue
  186. }
  187. valueBase := gjson.Get(attribute.String(), "value")
  188. if !valueBase.Exists(){
  189. logger.Warn("terra event attribute does not have value", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String()))
  190. continue
  191. }
  192. key, err := base64.StdEncoding.DecodeString(keyBase.String())
  193. if err != nil {
  194. logger.Warn("terra event key attribute is invalid", zap.String("tx_hash", txHash), zap.String("key", keyBase.String()))
  195. continue
  196. }
  197. value, err := base64.StdEncoding.DecodeString(valueBase.String())
  198. if err != nil {
  199. logger.Warn("terra event value attribute is invalid", zap.String("tx_hash", txHash), zap.String("key", keyBase.String()), zap.String("value", valueBase.String()))
  200. continue
  201. }
  202. if _, ok := mappedAttributes[string(key)]; ok {
  203. logger.Debug("duplicate key in events", zap.String("tx_hash", txHash), zap.String("key", keyBase.String()), zap.String("value", valueBase.String()))
  204. continue
  205. }
  206. mappedAttributes[string(key)] = string(value)
  207. }
  208. contractAddress, ok := mappedAttributes["contract_address"]
  209. if !ok {
  210. logger.Warn("terra wasm event without contract address field set", zap.String("event", event.String()))
  211. continue
  212. }
  213. // This is not a wormhole message
  214. if contractAddress != e.contract {
  215. continue
  216. }
  217. payload, ok := mappedAttributes["message.message"]
  218. if !ok {
  219. logger.Error("wormhole event does not have a message field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
  220. continue
  221. }
  222. sender, ok := mappedAttributes["message.sender"]
  223. if !ok {
  224. logger.Error("wormhole event does not have a sender field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
  225. continue
  226. }
  227. chainId, ok := mappedAttributes["message.chain_id"]
  228. if !ok {
  229. logger.Error("wormhole event does not have a chain_id field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
  230. continue
  231. }
  232. nonce, ok := mappedAttributes["message.nonce"]
  233. if !ok {
  234. logger.Error("wormhole event does not have a nonce field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
  235. continue
  236. }
  237. sequence, ok := mappedAttributes["message.sequence"]
  238. if !ok {
  239. logger.Error("wormhole event does not have a sequence field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
  240. continue
  241. }
  242. blockTime, ok := mappedAttributes["message.block_time"]
  243. if !ok {
  244. logger.Error("wormhole event does not have a block_time field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
  245. continue
  246. }
  247. logger.Info("new message detected on terra",
  248. zap.String("chainId", chainId),
  249. zap.String("txHash", txHash),
  250. zap.String("sender", sender),
  251. zap.String("nonce", nonce),
  252. zap.String("sequence", sequence),
  253. zap.String("blockTime", blockTime),
  254. )
  255. senderAddress, err := StringToAddress(sender)
  256. if err != nil {
  257. logger.Error("cannot decode emitter hex", zap.String("tx_hash", txHash), zap.String("value", sender))
  258. continue
  259. }
  260. txHashValue, err := StringToHash(txHash)
  261. if err != nil {
  262. logger.Error("cannot decode tx hash hex", zap.String("tx_hash", txHash), zap.String("value", txHash))
  263. continue
  264. }
  265. payloadValue, err := hex.DecodeString(payload)
  266. if err != nil {
  267. logger.Error("cannot decode payload", zap.String("tx_hash", txHash), zap.String("value", payload))
  268. continue
  269. }
  270. blockTimeInt, err := strconv.ParseInt(blockTime, 10, 64)
  271. if err != nil {
  272. logger.Error("blocktime cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", blockTime))
  273. continue
  274. }
  275. nonceInt, err := strconv.ParseUint(nonce, 10, 32)
  276. if err != nil {
  277. logger.Error("nonce cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", blockTime))
  278. continue
  279. }
  280. sequenceInt, err := strconv.ParseUint(sequence, 10, 64)
  281. if err != nil {
  282. logger.Error("sequence cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", blockTime))
  283. continue
  284. }
  285. messagePublication := &common.MessagePublication{
  286. TxHash: txHashValue,
  287. Timestamp: time.Unix(blockTimeInt, 0),
  288. Nonce: uint32(nonceInt),
  289. Sequence: sequenceInt,
  290. EmitterChain: vaa.ChainIDTerra,
  291. EmitterAddress: senderAddress,
  292. Payload: payloadValue,
  293. ConsistencyLevel: 0, // Instant finality
  294. }
  295. e.msgChan <- messagePublication
  296. terraMessagesConfirmed.Inc()
  297. }
  298. client := &http.Client{
  299. Timeout: time.Second * 15,
  300. }
  301. // Query and report guardian set status
  302. requestURL := fmt.Sprintf("%s/wasm/contracts/%s/store?query_msg={\"guardian_set_info\":{}}", e.urlLCD, e.contract)
  303. req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil)
  304. if err != nil {
  305. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
  306. terraConnectionErrors.WithLabelValues("guardian_set_req_error").Inc()
  307. logger.Error("query guardian set request error", zap.Error(err))
  308. errC <- err
  309. return
  310. }
  311. msm := time.Now()
  312. resp, err := client.Do(req)
  313. if err != nil {
  314. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
  315. logger.Error("query guardian set response error", zap.Error(err))
  316. errC <- err
  317. return
  318. }
  319. body, err := ioutil.ReadAll(resp.Body)
  320. queryLatency.WithLabelValues("guardian_set_info").Observe(time.Since(msm).Seconds())
  321. if err != nil {
  322. p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
  323. logger.Error("query guardian set error", zap.Error(err))
  324. errC <- err
  325. resp.Body.Close()
  326. return
  327. }
  328. json = string(body)
  329. guardianSetIndex := gjson.Get(json, "result.guardian_set_index")
  330. addresses := gjson.Get(json, "result.addresses.#.bytes")
  331. logger.Debug("current guardian set on Terra",
  332. zap.Any("guardianSetIndex", guardianSetIndex),
  333. zap.Any("addresses", addresses))
  334. resp.Body.Close()
  335. // We do not send guardian changes to the processor - ETH guardians are the source of truth.
  336. }
  337. }()
  338. select {
  339. case <-ctx.Done():
  340. err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  341. if err != nil {
  342. logger.Error("error on closing socket ", zap.Error(err))
  343. }
  344. return ctx.Err()
  345. case err := <-errC:
  346. return err
  347. }
  348. }
  349. // StringToAddress convert string into address
  350. func StringToAddress(value string) (vaa.Address, error) {
  351. var address vaa.Address
  352. res, err := hex.DecodeString(value)
  353. if err != nil {
  354. return address, err
  355. }
  356. copy(address[:], res)
  357. return address, nil
  358. }
  359. // StringToHash convert string into transaction hash
  360. func StringToHash(value string) (eth_common.Hash, error) {
  361. var hash eth_common.Hash
  362. res, err := hex.DecodeString(value)
  363. if err != nil {
  364. return hash, err
  365. }
  366. copy(hash[:], res)
  367. return hash, nil
  368. }