send_req.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. // This tool can be used to send various queries to the p2p gossip network.
  2. // It is meant for testing purposes only.
  3. package main
  4. import (
  5. "bytes"
  6. "context"
  7. "crypto/ecdsa"
  8. "encoding/hex"
  9. "fmt"
  10. "math/big"
  11. "strings"
  12. "time"
  13. "github.com/certusone/wormhole/node/hack/query/utils"
  14. "github.com/certusone/wormhole/node/pkg/common"
  15. "github.com/certusone/wormhole/node/pkg/p2p"
  16. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  17. "github.com/certusone/wormhole/node/pkg/query"
  18. "github.com/certusone/wormhole/node/pkg/random"
  19. "github.com/ethereum/go-ethereum/accounts/abi"
  20. ethCommon "github.com/ethereum/go-ethereum/common"
  21. "github.com/ethereum/go-ethereum/common/hexutil"
  22. ethCrypto "github.com/ethereum/go-ethereum/crypto"
  23. pubsub "github.com/libp2p/go-libp2p-pubsub"
  24. "github.com/libp2p/go-libp2p/core/crypto"
  25. "go.uber.org/zap"
  26. "google.golang.org/protobuf/proto"
  27. "github.com/gagliardetto/solana-go"
  28. )
  29. // this script has to be run inside kubernetes since it relies on UDP
  30. // https://github.com/kubernetes/kubernetes/issues/47862
  31. // kubectl --namespace=wormhole exec -it spy-0 -- sh -c "cd node/hack/query/ && go run send_req.go"
  32. // one way to iterate inside the container
  33. // kubectl --namespace=wormhole exec -it spy-0 -- bash
  34. // apt update
  35. // apt install nano
  36. // cd node/hack/query
  37. // echo "" > send_req.go
  38. // nano send_req.go
  39. // [paste, ^x, y, enter]
  40. // go run send_req.go
  41. func main() {
  42. //
  43. // BEGIN SETUP
  44. //
  45. p2pNetworkID := "/wormhole/dev"
  46. var p2pPort uint = 8998 // don't collide with spy so we can run from the same container in tilt
  47. p2pBootstrap := "/dns4/guardian-0.guardian/udp/8996/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"
  48. nodeKeyPath := "./querier.key"
  49. ctx := context.Background()
  50. logger, _ := zap.NewDevelopment()
  51. signingKeyPath := string("./dev.guardian.key")
  52. logger.Info("Loading signing key", zap.String("signingKeyPath", signingKeyPath))
  53. sk, err := common.LoadGuardianKey(signingKeyPath, true)
  54. if err != nil {
  55. logger.Fatal("failed to load guardian key", zap.Error(err))
  56. }
  57. logger.Info("Signing key loaded", zap.String("publicKey", ethCrypto.PubkeyToAddress(sk.PublicKey).Hex()))
  58. // Load p2p private key
  59. var priv crypto.PrivKey
  60. priv, err = common.GetOrCreateNodeKey(logger, nodeKeyPath)
  61. if err != nil {
  62. logger.Fatal("Failed to load node key", zap.Error(err))
  63. }
  64. // Manual p2p setup
  65. components := p2p.DefaultComponents()
  66. components.Port = p2pPort
  67. bootstrapPeers := p2pBootstrap
  68. networkID := p2pNetworkID + "/ccq"
  69. h, err := p2p.NewHost(logger, ctx, networkID, bootstrapPeers, components, priv)
  70. if err != nil {
  71. panic(err)
  72. }
  73. topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
  74. topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")
  75. logger.Info("Subscribing pubsub topic", zap.String("topic_req", topic_req), zap.String("topic_resp", topic_resp))
  76. ps, err := pubsub.NewGossipSub(ctx, h)
  77. if err != nil {
  78. panic(err)
  79. }
  80. th_req, err := ps.Join(topic_req)
  81. if err != nil {
  82. logger.Panic("failed to join request topic", zap.String("topic_req", topic_req), zap.Error(err))
  83. }
  84. th_resp, err := ps.Join(topic_resp)
  85. if err != nil {
  86. logger.Panic("failed to join response topic", zap.String("topic_resp", topic_resp), zap.Error(err))
  87. }
  88. sub, err := th_resp.Subscribe()
  89. if err != nil {
  90. logger.Panic("failed to subscribe to response topic", zap.Error(err))
  91. }
  92. logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
  93. zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
  94. // Wait for peers
  95. for len(th_req.ListPeers()) < 1 {
  96. time.Sleep(time.Millisecond * 100)
  97. }
  98. //
  99. // END SETUP
  100. //
  101. //
  102. // Solana Tests
  103. //
  104. {
  105. logger.Info("Running Solana account test")
  106. // Start of query creation...
  107. account1, err := solana.PublicKeyFromBase58("Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o")
  108. if err != nil {
  109. panic("solana account1 is invalid")
  110. }
  111. account2, err := solana.PublicKeyFromBase58("B6RHG3mfcckmrYN1UhmJzyS1XX3fZKbkeUcpJe9Sy3FE")
  112. if err != nil {
  113. panic("solana account2 is invalid")
  114. }
  115. callRequest := &query.SolanaAccountQueryRequest{
  116. Commitment: "finalized",
  117. DataSliceOffset: 0,
  118. DataSliceLength: 100,
  119. Accounts: [][query.SolanaPublicKeyLength]byte{account1, account2},
  120. }
  121. nonce, err := random.Uint32()
  122. if err != nil {
  123. panic(err)
  124. }
  125. queryRequest := &query.QueryRequest{
  126. Nonce: nonce,
  127. PerChainQueries: []*query.PerChainQueryRequest{
  128. {
  129. ChainId: 1,
  130. Query: callRequest,
  131. },
  132. },
  133. }
  134. sendSolanaQueryAndGetRsp(queryRequest, sk, th_req, ctx, logger, sub)
  135. }
  136. {
  137. logger.Info("Running Solana PDA test")
  138. // Start of query creation...
  139. callRequest := &query.SolanaPdaQueryRequest{
  140. Commitment: "finalized",
  141. DataSliceOffset: 0,
  142. DataSliceLength: 100,
  143. PDAs: []query.SolanaPDAEntry{
  144. query.SolanaPDAEntry{
  145. ProgramAddress: ethCommon.HexToHash("0x02c806312cbe5b79ef8aa6c17e3f423d8fdfe1d46909fb1f6cdf65ee8e2e6faa"), // Devnet core bridge
  146. Seeds: [][]byte{
  147. []byte("GuardianSet"),
  148. make([]byte, 4),
  149. },
  150. },
  151. },
  152. }
  153. nonce, err := random.Uint32()
  154. if err != nil {
  155. panic(err)
  156. }
  157. queryRequest := &query.QueryRequest{
  158. Nonce: nonce,
  159. PerChainQueries: []*query.PerChainQueryRequest{
  160. {
  161. ChainId: 1,
  162. Query: callRequest,
  163. },
  164. },
  165. }
  166. sendSolanaQueryAndGetRsp(queryRequest, sk, th_req, ctx, logger, sub)
  167. }
  168. logger.Info("Solana tests complete!")
  169. // return
  170. //
  171. // EVM Tests
  172. //
  173. wethAbi, err := abi.JSON(strings.NewReader("[{\"constant\":true,\"inputs\":[],\"name\":\"name\",\"outputs\":[{\"name\":\"\",\"type\":\"string\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":true,\"inputs\":[],\"name\":\"totalSupply\",\"outputs\":[{\"name\":\"\",\"type\":\"uint256\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"}]"))
  174. if err != nil {
  175. panic(err)
  176. }
  177. methods := []string{"name", "totalSupply"}
  178. callData := []*query.EthCallData{}
  179. to, _ := hex.DecodeString("DDb64fE46a91D46ee29420539FC25FD07c5FEa3E")
  180. for _, method := range methods {
  181. data, err := wethAbi.Pack(method)
  182. if err != nil {
  183. panic(err)
  184. }
  185. callData = append(callData, &query.EthCallData{
  186. To: to,
  187. Data: data,
  188. })
  189. }
  190. // Fetch the latest block number
  191. //url := "https://localhost:8545"
  192. url := "http://eth-devnet:8545"
  193. logger.Info("Querying for latest block height", zap.String("url", url))
  194. blockNum, err := utils.FetchLatestBlockNumberFromUrl(ctx, url)
  195. if err != nil {
  196. logger.Fatal("Failed to fetch latest block number", zap.Error(err))
  197. }
  198. logger.Info("latest block", zap.String("num", blockNum.String()), zap.String("encoded", hexutil.EncodeBig(blockNum)))
  199. // block := "0x28d9630"
  200. // block := "latest"
  201. // block := "0x9999bac44d09a7f69ee7941819b0a19c59ccb1969640cc513be09ef95ed2d8e2"
  202. // Start of query creation...
  203. callRequest := &query.EthCallQueryRequest{
  204. BlockId: hexutil.EncodeBig(blockNum),
  205. CallData: callData,
  206. }
  207. // Send 2 individual requests for the same thing but 5 blocks apart
  208. // First request...
  209. logger.Info("calling sendQueryAndGetRsp for ", zap.String("blockNum", blockNum.String()))
  210. queryRequest := createQueryRequest(callRequest)
  211. sendQueryAndGetRsp(queryRequest, sk, th_req, ctx, logger, sub, wethAbi, methods)
  212. // This is just so that when I look at the output, it is easier for me. (Paul)
  213. logger.Info("sleeping for 5 seconds")
  214. time.Sleep(time.Second * 5)
  215. // Second request...
  216. blockNum = blockNum.Sub(blockNum, big.NewInt(5))
  217. callRequest2 := &query.EthCallQueryRequest{
  218. BlockId: hexutil.EncodeBig(blockNum),
  219. CallData: callData,
  220. }
  221. queryRequest2 := createQueryRequest(callRequest2)
  222. logger.Info("calling sendQueryAndGetRsp for ", zap.String("blockNum", blockNum.String()))
  223. sendQueryAndGetRsp(queryRequest2, sk, th_req, ctx, logger, sub, wethAbi, methods)
  224. // Now, want to send a single query with multiple requests...
  225. logger.Info("Starting multiquery test in 5...")
  226. time.Sleep(time.Second * 5)
  227. multiCallRequest := []*query.EthCallQueryRequest{callRequest, callRequest2}
  228. multQueryRequest := createQueryRequestWithMultipleRequests(multiCallRequest)
  229. sendQueryAndGetRsp(multQueryRequest, sk, th_req, ctx, logger, sub, wethAbi, methods)
  230. // Cleanly shutdown
  231. // Without this the same host won't properly discover peers until some timeout
  232. sub.Cancel()
  233. if err := th_req.Close(); err != nil {
  234. logger.Fatal("Error closing the request topic", zap.Error(err))
  235. }
  236. if err := th_resp.Close(); err != nil {
  237. logger.Fatal("Error closing the response topic", zap.Error(err))
  238. }
  239. if err := h.Close(); err != nil {
  240. logger.Fatal("Error closing the host", zap.Error(err))
  241. }
  242. //
  243. // END SHUTDOWN
  244. //
  245. logger.Info("Success! All tests passed!")
  246. }
  247. const (
  248. GuardianKeyArmoredBlock = "WORMHOLE GUARDIAN PRIVATE KEY"
  249. )
  250. func createQueryRequest(callRequest *query.EthCallQueryRequest) *query.QueryRequest {
  251. nonce, err := random.Uint32()
  252. if err != nil {
  253. panic(err)
  254. }
  255. queryRequest := &query.QueryRequest{
  256. Nonce: nonce,
  257. PerChainQueries: []*query.PerChainQueryRequest{
  258. {
  259. ChainId: 2,
  260. Query: callRequest,
  261. },
  262. },
  263. }
  264. return queryRequest
  265. }
  266. func createQueryRequestWithMultipleRequests(callRequests []*query.EthCallQueryRequest) *query.QueryRequest {
  267. perChainQueries := []*query.PerChainQueryRequest{}
  268. for _, req := range callRequests {
  269. perChainQueries = append(perChainQueries, &query.PerChainQueryRequest{
  270. ChainId: 2,
  271. Query: req,
  272. })
  273. }
  274. nonce, err := random.Uint32()
  275. if err != nil {
  276. panic(err)
  277. }
  278. queryRequest := &query.QueryRequest{
  279. Nonce: nonce,
  280. PerChainQueries: perChainQueries,
  281. }
  282. return queryRequest
  283. }
  284. func sendQueryAndGetRsp(queryRequest *query.QueryRequest, sk *ecdsa.PrivateKey, th *pubsub.Topic, ctx context.Context, logger *zap.Logger, sub *pubsub.Subscription, wethAbi abi.ABI, methods []string) {
  285. queryRequestBytes, err := queryRequest.Marshal()
  286. if err != nil {
  287. panic(err)
  288. }
  289. numQueries := len(queryRequest.PerChainQueries)
  290. // Sign the query request using our private key.
  291. digest := query.QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes)
  292. sig, err := ethCrypto.Sign(digest.Bytes(), sk)
  293. if err != nil {
  294. panic(err)
  295. }
  296. signedQueryRequest := &gossipv1.SignedQueryRequest{
  297. QueryRequest: queryRequestBytes,
  298. Signature: sig,
  299. }
  300. msg := gossipv1.GossipMessage{
  301. Message: &gossipv1.GossipMessage_SignedQueryRequest{
  302. SignedQueryRequest: signedQueryRequest,
  303. },
  304. }
  305. b, err := proto.Marshal(&msg)
  306. if err != nil {
  307. panic(err)
  308. }
  309. err = th.Publish(ctx, b)
  310. if err != nil {
  311. panic(err)
  312. }
  313. logger.Info("Waiting for message...")
  314. // TODO: max wait time
  315. // TODO: accumulate signatures to reach quorum
  316. for {
  317. envelope, err := sub.Next(ctx)
  318. if err != nil {
  319. logger.Panic("failed to receive pubsub message", zap.Error(err))
  320. }
  321. var msg gossipv1.GossipMessage
  322. err = proto.Unmarshal(envelope.Data, &msg)
  323. if err != nil {
  324. logger.Info("received invalid message",
  325. zap.Binary("data", envelope.Data),
  326. zap.String("from", envelope.GetFrom().String()))
  327. continue
  328. }
  329. var isMatchingResponse bool
  330. switch m := msg.Message.(type) {
  331. case *gossipv1.GossipMessage_SignedQueryResponse:
  332. logger.Info("query response received", zap.Any("response", m.SignedQueryResponse),
  333. zap.String("responseBytes", hexutil.Encode(m.SignedQueryResponse.QueryResponse)),
  334. zap.String("sigBytes", hexutil.Encode(m.SignedQueryResponse.Signature)))
  335. var response query.QueryResponsePublication
  336. err := response.Unmarshal(m.SignedQueryResponse.QueryResponse)
  337. if err != nil {
  338. logger.Warn("failed to unmarshal response", zap.Error(err))
  339. break
  340. }
  341. if bytes.Equal(response.Request.QueryRequest, queryRequestBytes) && bytes.Equal(response.Request.Signature, sig) {
  342. // TODO: verify response signature
  343. isMatchingResponse = true
  344. if len(response.PerChainResponses) != numQueries {
  345. logger.Warn("unexpected number of per chain query responses", zap.Int("expectedNum", numQueries), zap.Int("actualNum", len(response.PerChainResponses)))
  346. break
  347. }
  348. // Do double loop over responses
  349. for index := range response.PerChainResponses {
  350. logger.Info("per chain query response index", zap.Int("index", index))
  351. var localCallData []*query.EthCallData
  352. switch ecq := queryRequest.PerChainQueries[index].Query.(type) {
  353. case *query.EthCallQueryRequest:
  354. localCallData = ecq.CallData
  355. default:
  356. panic("unsupported query type")
  357. }
  358. var localResp *query.EthCallQueryResponse
  359. switch ecq := response.PerChainResponses[index].Response.(type) {
  360. case *query.EthCallQueryResponse:
  361. localResp = ecq
  362. default:
  363. panic("unsupported query type")
  364. }
  365. if len(localResp.Results) != len(localCallData) {
  366. logger.Warn("unexpected number of results", zap.Int("expectedNum", len(localCallData)), zap.Int("expectedNum", len(localResp.Results)))
  367. break
  368. }
  369. for idx, resp := range localResp.Results {
  370. result, err := wethAbi.Methods[methods[idx]].Outputs.Unpack(resp)
  371. if err != nil {
  372. logger.Warn("failed to unpack result", zap.Error(err))
  373. break
  374. }
  375. resultStr := hexutil.Encode(resp)
  376. logger.Info("found matching response", zap.Int("idx", idx), zap.Uint64("number", localResp.BlockNumber), zap.String("hash", localResp.Hash.String()), zap.String("time", localResp.Time.String()), zap.String("method", methods[idx]), zap.Any("resultDecoded", result), zap.String("resultStr", resultStr))
  377. }
  378. }
  379. }
  380. default:
  381. continue
  382. }
  383. if isMatchingResponse {
  384. break
  385. }
  386. }
  387. }
  388. func sendSolanaQueryAndGetRsp(queryRequest *query.QueryRequest, sk *ecdsa.PrivateKey, th *pubsub.Topic, ctx context.Context, logger *zap.Logger, sub *pubsub.Subscription) {
  389. queryRequestBytes, err := queryRequest.Marshal()
  390. if err != nil {
  391. panic(err)
  392. }
  393. numQueries := len(queryRequest.PerChainQueries)
  394. // Sign the query request using our private key.
  395. digest := query.QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes)
  396. sig, err := ethCrypto.Sign(digest.Bytes(), sk)
  397. if err != nil {
  398. panic(err)
  399. }
  400. signedQueryRequest := &gossipv1.SignedQueryRequest{
  401. QueryRequest: queryRequestBytes,
  402. Signature: sig,
  403. }
  404. msg := gossipv1.GossipMessage{
  405. Message: &gossipv1.GossipMessage_SignedQueryRequest{
  406. SignedQueryRequest: signedQueryRequest,
  407. },
  408. }
  409. b, err := proto.Marshal(&msg)
  410. if err != nil {
  411. panic(err)
  412. }
  413. err = th.Publish(ctx, b)
  414. if err != nil {
  415. panic(err)
  416. }
  417. logger.Info("Waiting for message...")
  418. // TODO: max wait time
  419. // TODO: accumulate signatures to reach quorum
  420. for {
  421. envelope, err := sub.Next(ctx)
  422. if err != nil {
  423. logger.Panic("failed to receive pubsub message", zap.Error(err))
  424. }
  425. var msg gossipv1.GossipMessage
  426. err = proto.Unmarshal(envelope.Data, &msg)
  427. if err != nil {
  428. logger.Info("received invalid message",
  429. zap.Binary("data", envelope.Data),
  430. zap.String("from", envelope.GetFrom().String()))
  431. continue
  432. }
  433. var isMatchingResponse bool
  434. switch m := msg.Message.(type) {
  435. case *gossipv1.GossipMessage_SignedQueryResponse:
  436. logger.Info("query response received", zap.Any("response", m.SignedQueryResponse),
  437. zap.String("responseBytes", hexutil.Encode(m.SignedQueryResponse.QueryResponse)),
  438. zap.String("sigBytes", hexutil.Encode(m.SignedQueryResponse.Signature)))
  439. isMatchingResponse = true
  440. var response query.QueryResponsePublication
  441. err := response.Unmarshal(m.SignedQueryResponse.QueryResponse)
  442. if err != nil {
  443. logger.Warn("failed to unmarshal response", zap.Error(err))
  444. break
  445. }
  446. if bytes.Equal(response.Request.QueryRequest, queryRequestBytes) && bytes.Equal(response.Request.Signature, sig) {
  447. // TODO: verify response signature
  448. isMatchingResponse = true
  449. if len(response.PerChainResponses) != numQueries {
  450. logger.Warn("unexpected number of per chain query responses", zap.Int("expectedNum", numQueries), zap.Int("actualNum", len(response.PerChainResponses)))
  451. break
  452. }
  453. // Do double loop over responses
  454. for index := range response.PerChainResponses {
  455. switch r := response.PerChainResponses[index].Response.(type) {
  456. case *query.SolanaAccountQueryResponse:
  457. logger.Info("solana account query per chain response", zap.Int("index", index), zap.Any("pcr", r))
  458. case *query.SolanaPdaQueryResponse:
  459. logger.Info("solana pda query per chain response", zap.Int("index", index), zap.Any("pcr", r))
  460. default:
  461. panic(fmt.Sprintf("unsupported query type, should be solana, index: %d", index))
  462. }
  463. }
  464. }
  465. default:
  466. continue
  467. }
  468. if isMatchingResponse {
  469. break
  470. }
  471. }
  472. }