query_test.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package query_test
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "fmt"
  7. "os"
  8. "strings"
  9. "testing"
  10. "time"
  11. "github.com/stretchr/testify/assert"
  12. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  13. "github.com/certusone/wormhole/node/hack/query/utils"
  14. "github.com/certusone/wormhole/node/pkg/common"
  15. "github.com/certusone/wormhole/node/pkg/p2p"
  16. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  17. "github.com/certusone/wormhole/node/pkg/query"
  18. "github.com/ethereum/go-ethereum/accounts/abi"
  19. ethCommon "github.com/ethereum/go-ethereum/common"
  20. "github.com/ethereum/go-ethereum/common/hexutil"
  21. ethCrypto "github.com/ethereum/go-ethereum/crypto"
  22. pubsub "github.com/libp2p/go-libp2p-pubsub"
  23. "github.com/libp2p/go-libp2p/core/crypto"
  24. "go.uber.org/zap"
  25. "google.golang.org/protobuf/proto"
  26. )
  27. func TestCrossChainQuery(t *testing.T) {
  28. if os.Getenv("INTEGRATION") == "" {
  29. t.Skip("Skipping integration test, set environment variable INTEGRATION")
  30. }
  31. p2pNetworkID := "/wormhole/dev"
  32. var p2pPort uint = 8997
  33. p2pBootstrap := "/dns4/guardian-0.guardian/udp/8996/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"
  34. nodeKeyPath := "../querier.key"
  35. ctx := context.Background()
  36. logger, _ := zap.NewDevelopment()
  37. if bootstrapPeers := os.Getenv("BOOTSTRAP_PEERS"); bootstrapPeers != "" {
  38. logger.Info("Overriding bootstrap peers", zap.String("old", p2pBootstrap), zap.String("new", bootstrapPeers))
  39. p2pBootstrap = bootstrapPeers
  40. }
  41. signingKeyPath := string("../dev.guardian.key")
  42. logger.Info("Loading signing key", zap.String("signingKeyPath", signingKeyPath))
  43. sk, err := common.LoadGuardianKey(signingKeyPath, true)
  44. if err != nil {
  45. logger.Fatal("failed to load guardian key", zap.Error(err))
  46. }
  47. logger.Info("Signing key loaded", zap.String("publicKey", ethCrypto.PubkeyToAddress(sk.PublicKey).Hex()))
  48. // Fetch the current guardian set
  49. idx, sgs, err := utils.FetchCurrentGuardianSet(common.GoTest)
  50. if err != nil {
  51. logger.Fatal("Failed to fetch current guardian set", zap.Error(err))
  52. }
  53. logger.Info("Fetched guardian set", zap.Any("keys", sgs.Keys))
  54. gs := common.GuardianSet{
  55. Keys: sgs.Keys,
  56. Index: idx,
  57. }
  58. // Fetch the latest block number
  59. blockNum, err := utils.FetchLatestBlockNumber(ctx, common.GoTest)
  60. if err != nil {
  61. logger.Fatal("Failed to fetch latest block number", zap.Error(err))
  62. }
  63. // Load p2p private key
  64. var priv crypto.PrivKey
  65. priv, err = common.GetOrCreateNodeKey(logger, nodeKeyPath)
  66. if err != nil {
  67. logger.Fatal("Failed to load node key", zap.Error(err))
  68. }
  69. // Manual p2p setup
  70. components := p2p.DefaultComponents()
  71. components.Port = p2pPort
  72. bootstrapPeers := p2pBootstrap
  73. networkID := p2pNetworkID + "/ccq"
  74. h, err := p2p.NewHost(logger, ctx, networkID, bootstrapPeers, components, priv)
  75. if err != nil {
  76. panic(err)
  77. }
  78. topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
  79. topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")
  80. logger.Info("Subscribing pubsub topic", zap.String("topic_req", topic_req), zap.String("topic_resp", topic_resp))
  81. ps, err := pubsub.NewGossipSub(ctx, h)
  82. if err != nil {
  83. panic(err)
  84. }
  85. th_req, err := ps.Join(topic_req)
  86. if err != nil {
  87. logger.Panic("failed to join request topic", zap.String("topic_req", topic_req), zap.Error(err))
  88. }
  89. th_resp, err := ps.Join(topic_resp)
  90. if err != nil {
  91. logger.Panic("failed to join response topic", zap.String("topic_resp", topic_resp), zap.Error(err))
  92. }
  93. sub, err := th_resp.Subscribe()
  94. if err != nil {
  95. logger.Panic("failed to subscribe to response topic", zap.Error(err))
  96. }
  97. logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
  98. zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
  99. // Wait for peers
  100. for len(th_req.ListPeers()) < 1 {
  101. time.Sleep(time.Millisecond * 100)
  102. }
  103. logger.Info("Detected peers")
  104. wethAbi, err := abi.JSON(strings.NewReader("[{\"constant\":true,\"inputs\":[],\"name\":\"name\",\"outputs\":[{\"name\":\"\",\"type\":\"string\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":true,\"inputs\":[],\"name\":\"totalSupply\",\"outputs\":[{\"name\":\"\",\"type\":\"uint256\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"}]"))
  105. if err != nil {
  106. panic(err)
  107. }
  108. methodName := "name"
  109. data, err := wethAbi.Pack(methodName)
  110. if err != nil {
  111. panic(err)
  112. }
  113. to, _ := hex.DecodeString("DDb64fE46a91D46ee29420539FC25FD07c5FEa3E") // WETH
  114. callData := []*query.EthCallData{
  115. {
  116. To: to,
  117. Data: data,
  118. },
  119. }
  120. callRequest := &query.EthCallQueryRequest{
  121. BlockId: hexutil.EncodeBig(blockNum),
  122. CallData: callData,
  123. }
  124. queryRequest := &query.QueryRequest{
  125. Nonce: 1,
  126. PerChainQueries: []*query.PerChainQueryRequest{
  127. {
  128. ChainId: 2,
  129. Query: callRequest,
  130. },
  131. },
  132. }
  133. queryRequestBytes, err := queryRequest.Marshal()
  134. if err != nil {
  135. panic(err)
  136. }
  137. // Sign the query request using our private key.
  138. digest := query.QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes)
  139. sig, err := ethCrypto.Sign(digest.Bytes(), sk)
  140. if err != nil {
  141. panic(err)
  142. }
  143. signedQueryRequest := &gossipv1.SignedQueryRequest{
  144. QueryRequest: queryRequestBytes,
  145. Signature: sig,
  146. }
  147. msg := gossipv1.GossipMessage{
  148. Message: &gossipv1.GossipMessage_SignedQueryRequest{
  149. SignedQueryRequest: signedQueryRequest,
  150. },
  151. }
  152. b, err := proto.Marshal(&msg)
  153. if err != nil {
  154. panic(err)
  155. }
  156. err = th_req.Publish(ctx, b)
  157. if err != nil {
  158. panic(err)
  159. }
  160. logger.Info("Waiting for message...")
  161. var success bool
  162. signers := map[int]bool{}
  163. // The guardians can retry for up to a minute so we have to wait longer than that.
  164. subCtx, cancel := context.WithTimeout(ctx, 75*time.Second)
  165. defer cancel()
  166. for {
  167. envelope, err := sub.Next(subCtx)
  168. if err != nil {
  169. break
  170. }
  171. var msg gossipv1.GossipMessage
  172. err = proto.Unmarshal(envelope.Data, &msg)
  173. if err != nil {
  174. logger.Fatal("received invalid message",
  175. zap.Binary("data", envelope.Data),
  176. zap.String("from", envelope.GetFrom().String()))
  177. }
  178. switch m := msg.Message.(type) {
  179. case *gossipv1.GossipMessage_SignedQueryResponse:
  180. var response query.QueryResponsePublication
  181. err := response.Unmarshal(m.SignedQueryResponse.QueryResponse)
  182. if err != nil {
  183. logger.Fatal("failed to unmarshal response", zap.Error(err), zap.Any("response", m.SignedQueryResponse))
  184. }
  185. logger.Info("query response received", zap.Any("response", response))
  186. if bytes.Equal(response.Request.QueryRequest, queryRequestBytes) && bytes.Equal(response.Request.Signature, sig) {
  187. digest := query.GetQueryResponseDigestFromBytes(m.SignedQueryResponse.QueryResponse)
  188. signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), m.SignedQueryResponse.Signature)
  189. if err != nil {
  190. logger.Fatal("failed to verify signature on response",
  191. zap.String("digest", digest.Hex()),
  192. zap.String("signature", hex.EncodeToString(m.SignedQueryResponse.Signature)),
  193. zap.Error(err))
  194. }
  195. signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])
  196. if keyIdx, ok := gs.KeyIndex(signerAddress); !ok {
  197. logger.Fatal("received observation by unknown guardian - is our guardian set outdated?",
  198. zap.String("digest", digest.Hex()),
  199. zap.String("address", signerAddress.Hex()),
  200. zap.Uint32("index", gs.Index),
  201. zap.Any("keys", gs.KeysAsHexStrings()),
  202. )
  203. } else {
  204. signers[keyIdx] = true
  205. }
  206. quorum := vaa.CalculateQuorum(len(gs.Keys))
  207. if len(signers) < quorum {
  208. logger.Sugar().Infof("not enough signers, have %d need %d", len(signers), quorum)
  209. continue
  210. }
  211. if len(response.PerChainResponses) != 1 {
  212. logger.Warn("unexpected number of per chain query responses", zap.Int("expectedNum", 1), zap.Int("actualNum", len(response.PerChainResponses)))
  213. break
  214. }
  215. var pcq *query.EthCallQueryResponse
  216. switch ecq := response.PerChainResponses[0].Response.(type) {
  217. case *query.EthCallQueryResponse:
  218. pcq = ecq
  219. default:
  220. panic("unsupported query type")
  221. }
  222. if len(pcq.Results) == 0 {
  223. logger.Warn("response did not contain any results", zap.Error(err))
  224. break
  225. }
  226. for idx, resp := range pcq.Results {
  227. result, err := wethAbi.Methods[methodName].Outputs.Unpack(resp)
  228. if err != nil {
  229. logger.Warn("failed to unpack result", zap.Error(err))
  230. break
  231. }
  232. resultStr := hexutil.Encode(resp)
  233. logger.Info("found matching response", zap.Int("idx", idx), zap.Uint64("number", pcq.BlockNumber), zap.String("hash", pcq.Hash.String()), zap.String("time", pcq.Time.String()), zap.Any("resultDecoded", result), zap.String("resultStr", resultStr))
  234. }
  235. success = true
  236. }
  237. default:
  238. continue
  239. }
  240. if success {
  241. break
  242. }
  243. }
  244. assert.True(t, success)
  245. // Cleanly shutdown
  246. // Without this the same host won't properly discover peers until some timeout
  247. sub.Cancel()
  248. if err := th_req.Close(); err != nil {
  249. logger.Fatal("Error closing the request topic", zap.Error(err))
  250. }
  251. if err := th_resp.Close(); err != nil {
  252. logger.Fatal("Error closing the response topic", zap.Error(err))
  253. }
  254. if err := h.Close(); err != nil {
  255. logger.Error("Error closing the host", zap.Error(err))
  256. }
  257. }
  258. const (
  259. GuardianKeyArmoredBlock = "WORMHOLE GUARDIAN PRIVATE KEY"
  260. )