Prechádzať zdrojové kódy

CCQ: Testing (#3424)

* CCQ: Testing

* Use new p2p.NewHost function

* More rework

* More rework
bruce-riley 2 rokov pred
rodič
commit
ea70610e46

+ 21 - 1
Tiltfile

@@ -72,7 +72,7 @@ config.define_bool("wormchain", False, "Enable a wormchain node")
 config.define_bool("ibc_relayer", False, "Enable IBC relayer between cosmos chains")
 config.define_bool("redis", False, "Enable a redis instance")
 config.define_bool("generic_relayer", False, "Enable the generic relayer off-chain component")
-
+config.define_bool("query_server", False, "Enable cross-chain query server")
 
 cfg = config.parse()
 num_guardians = int(cfg.get("num", "1"))
@@ -97,6 +97,7 @@ ibc_relayer = cfg.get("ibc_relayer", ci)
 btc = cfg.get("btc", False)
 redis = cfg.get('redis', ci)
 generic_relayer = cfg.get("generic_relayer", ci)
+query_server = cfg.get("query_server", ci)
 
 if ci:
     guardiand_loglevel = cfg.get("guardiand_loglevel", "warn")
@@ -584,6 +585,12 @@ if ci_tests:
         trigger_mode = trigger_mode,
         resource_deps = [], # uses devnet-consts.json, but wormchain/contracts/tools/test_accountant.sh handles waiting for guardian, not having deps gets the build earlier
     )
+    k8s_resource(
+        "query-ci-tests",
+        labels = ["ci"],
+        trigger_mode = trigger_mode,
+        resource_deps = [], # node/hack/query/test/test_query.sh handles waiting for guardian, not having deps gets the build earlier
+    )
 
 if terra_classic:
     docker_build(
@@ -893,3 +900,16 @@ if aptos:
         labels = ["aptos"],
         trigger_mode = trigger_mode,
     )
+
+if query_server:
+    k8s_yaml_with_ns("devnet/query-server.yaml")
+
+    k8s_resource(
+        "query-server",
+        resource_deps = ["guardian"],
+        port_forwards = [
+            port_forward(6069, name = "REST [:6069]", host = webHost)
+        ],
+        labels = ["query-server"],
+        trigger_mode = trigger_mode
+    )

+ 11 - 1
devnet/node.yaml

@@ -7,6 +7,9 @@ metadata:
     app: guardian
 spec:
   ports:
+    - port: 8996
+      name: ccq-p2p
+      protocol: UDP
     - port: 8999
       name: p2p
       protocol: UDP
@@ -160,7 +163,11 @@ spec:
             - /tmp/data
             - --publicRpcLogDetail
             - "full"
-          # - --chainGovernorEnabled=true
+            # - --chainGovernorEnabled=true
+            - --ccqEnabled=true
+            - --ccqAllowedRequesters=beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe,25021A4FCAf61F2EADC8202D3833Df48B2Fa0D54
+            - --ccqAllowedPeers=12D3KooWSnju8zhywCYVi2JwTqky1sySPnmtYLsHHzc4WerMnDQH
+            # - --logLevel=debug
           securityContext:
             capabilities:
               add:
@@ -171,6 +178,9 @@ spec:
               port: 6060
               path: /readyz
           ports:
+            - containerPort: 8996
+              name: ccq-p2p
+              protocol: UDP
             - containerPort: 8999
               name: p2p
               protocol: UDP

+ 3 - 3
devnet/query-server.yaml

@@ -36,13 +36,13 @@ spec:
             - --env
             - dev
             - --nodeKey
-            - node/cmd/ccq/ccq.p2p.key
+            - node/cmd/ccq/devnet.p2p.key
             - --signerKey
-            - node/cmd/ccq/ccq.signing.key
+            - node/cmd/ccq/devnet.signing.key
             - --listenAddr
             - "[::]:6069"
             - --permFile
-            - "node/cmd/ccq/devnet.config.json"
+            - "node/cmd/ccq/devnet.permissions.json"
             - --ethRPC
             - http://eth-devnet:8545
             - --ethContract

+ 25 - 0
devnet/tests.yaml

@@ -72,3 +72,28 @@ spec:
                 - "/app/accountant/success"
             initialDelaySeconds: 5
             periodSeconds: 5
+---
+kind: Job
+apiVersion: batch/v1
+metadata:
+  name: query-ci-tests
+spec:
+  backoffLimit: 0
+  template:
+    spec:
+      restartPolicy: Never
+      containers:
+        - name: query-ci-tests
+          image: guardiand-image
+          command:
+            - /bin/sh
+            - -c
+            - "cd /app/node/hack/query/test && bash test_query.sh && touch success"
+          readinessProbe:
+            exec:
+              command:
+                - test
+                - -e
+                - "/app/node/hack/query/test/success"
+            initialDelaySeconds: 5
+            periodSeconds: 5

+ 0 - 0
node/cmd/ccq/ccq.p2p.key → node/cmd/ccq/devnet.p2p.key


+ 0 - 0
node/cmd/ccq/devnet.config.json → node/cmd/ccq/devnet.permissions.json


+ 0 - 0
node/cmd/ccq/ccq.signing.key → node/cmd/ccq/devnet.signing.key


+ 8 - 0
node/hack/query/dev.guardian.key

@@ -0,0 +1,8 @@
+-----BEGIN WORMHOLE GUARDIAN PRIVATE KEY-----
+PublicKey: 0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe
+Description: auto-generated deterministic devnet key
+
+CiDPsSMDoZzeWAu03XcWObDSa8aDU2RVcajP9RarLuEToBAB
+=VN/A
+-----END WORMHOLE GUARDIAN PRIVATE KEY-----
+

BIN
node/hack/query/querier.key


+ 357 - 0
node/hack/query/send_req.go

@@ -0,0 +1,357 @@
+// This tool can be used to send various queries to the p2p gossip network.
+// It is meant for testing purposes only.
+
+package main
+
+import (
+	"bytes"
+	"context"
+	"crypto/ecdsa"
+	"encoding/hex"
+	"fmt"
+	"math/big"
+	"strings"
+	"time"
+
+	"github.com/certusone/wormhole/node/hack/query/utils"
+	"github.com/certusone/wormhole/node/pkg/common"
+	"github.com/certusone/wormhole/node/pkg/p2p"
+	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
+	"github.com/certusone/wormhole/node/pkg/query"
+	"github.com/ethereum/go-ethereum/accounts/abi"
+	"github.com/ethereum/go-ethereum/common/hexutil"
+	ethCrypto "github.com/ethereum/go-ethereum/crypto"
+	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	"github.com/libp2p/go-libp2p/core/crypto"
+	"github.com/tendermint/tendermint/libs/rand"
+	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
+)
+
+// this script has to be run inside kubernetes since it relies on UDP
+// https://github.com/kubernetes/kubernetes/issues/47862
+// kubectl --namespace=wormhole exec -it spy-0 -- sh -c "cd node/hack/query/ && go run send_req.go"
+// one way to iterate inside the container
+// kubectl --namespace=wormhole exec -it spy-0 -- bash
+// apt update
+// apt install nano
+// cd node/hack/query
+// echo "" > send_req.go
+// nano send_req.go
+// [paste, ^x, y, enter]
+// go run send_req.go
+
+func main() {
+
+	//
+	// BEGIN SETUP
+	//
+
+	p2pNetworkID := "/wormhole/dev"
+	var p2pPort uint = 8998 // don't collide with spy so we can run from the same container in tilt
+	p2pBootstrap := "/dns4/guardian-0.guardian/udp/8996/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"
+	nodeKeyPath := "./querier.key"
+
+	ctx := context.Background()
+	logger, _ := zap.NewDevelopment()
+
+	signingKeyPath := string("./dev.guardian.key")
+
+	logger.Info("Loading signing key", zap.String("signingKeyPath", signingKeyPath))
+	sk, err := common.LoadGuardianKey(signingKeyPath, true)
+	if err != nil {
+		logger.Fatal("failed to load guardian key", zap.Error(err))
+	}
+	logger.Info("Signing key loaded", zap.String("publicKey", ethCrypto.PubkeyToAddress(sk.PublicKey).Hex()))
+
+	// Load p2p private key
+	var priv crypto.PrivKey
+	priv, err = common.GetOrCreateNodeKey(logger, nodeKeyPath)
+	if err != nil {
+		logger.Fatal("Failed to load node key", zap.Error(err))
+	}
+
+	// Manual p2p setup
+	components := p2p.DefaultComponents()
+	components.Port = p2pPort
+	bootstrapPeers := p2pBootstrap
+	networkID := p2pNetworkID
+
+	h, err := p2p.NewHost(logger, ctx, networkID, bootstrapPeers, components, priv)
+	if err != nil {
+		panic(err)
+	}
+
+	topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
+	topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")
+
+	logger.Info("Subscribing pubsub topic", zap.String("topic_req", topic_req), zap.String("topic_resp", topic_resp))
+	ps, err := pubsub.NewGossipSub(ctx, h)
+	if err != nil {
+		panic(err)
+	}
+
+	th_req, err := ps.Join(topic_req)
+	if err != nil {
+		logger.Panic("failed to join request topic", zap.String("topic_req", topic_req), zap.Error(err))
+	}
+
+	th_resp, err := ps.Join(topic_resp)
+	if err != nil {
+		logger.Panic("failed to join response topic", zap.String("topic_resp", topic_resp), zap.Error(err))
+	}
+
+	sub, err := th_resp.Subscribe()
+	if err != nil {
+		logger.Panic("failed to subscribe to response topic", zap.Error(err))
+	}
+
+	logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
+		zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
+	// Wait for peers
+	for len(th_req.ListPeers()) < 1 {
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	//
+	// END SETUP
+	//
+
+	wethAbi, err := abi.JSON(strings.NewReader("[{\"constant\":true,\"inputs\":[],\"name\":\"name\",\"outputs\":[{\"name\":\"\",\"type\":\"string\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":true,\"inputs\":[],\"name\":\"totalSupply\",\"outputs\":[{\"name\":\"\",\"type\":\"uint256\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"}]"))
+	if err != nil {
+		panic(err)
+	}
+
+	methods := []string{"name", "totalSupply"}
+	callData := []*query.EthCallData{}
+	to, _ := hex.DecodeString("DDb64fE46a91D46ee29420539FC25FD07c5FEa3E")
+
+	for _, method := range methods {
+		data, err := wethAbi.Pack(method)
+		if err != nil {
+			panic(err)
+		}
+
+		callData = append(callData, &query.EthCallData{
+			To:   to,
+			Data: data,
+		})
+	}
+
+	// Fetch the latest block number
+	//url := "https://localhost:8545"
+	url := "http://eth-devnet:8545"
+	logger.Info("Querying for latest block height", zap.String("url", url))
+	blockNum, err := utils.FetchLatestBlockNumberFromUrl(ctx, url)
+	if err != nil {
+		logger.Fatal("Failed to fetch latest block number", zap.Error(err))
+	}
+
+	logger.Info("latest block", zap.String("num", blockNum.String()), zap.String("encoded", hexutil.EncodeBig(blockNum)))
+
+	// block := "0x28d9630"
+	// block := "latest"
+	// block := "0x9999bac44d09a7f69ee7941819b0a19c59ccb1969640cc513be09ef95ed2d8e2"
+
+	// Start of query creation...
+	callRequest := &query.EthCallQueryRequest{
+		BlockId:  hexutil.EncodeBig(blockNum),
+		CallData: callData,
+	}
+
+	// Send 2 individual requests for the same thing but 5 blocks apart
+	// First request...
+	logger.Info("calling sendQueryAndGetRsp for ", zap.String("blockNum", blockNum.String()))
+	queryRequest := createQueryRequest(callRequest)
+	sendQueryAndGetRsp(queryRequest, sk, th_req, ctx, logger, sub, wethAbi, methods)
+
+	// This is just so that when I look at the output, it is easier for me. (Paul)
+	logger.Info("sleeping for 5 seconds")
+	time.Sleep(time.Second * 5)
+
+	// Second request...
+	blockNum = blockNum.Sub(blockNum, big.NewInt(5))
+	callRequest2 := &query.EthCallQueryRequest{
+		BlockId:  hexutil.EncodeBig(blockNum),
+		CallData: callData,
+	}
+	queryRequest2 := createQueryRequest(callRequest2)
+	logger.Info("calling sendQueryAndGetRsp for ", zap.String("blockNum", blockNum.String()))
+	sendQueryAndGetRsp(queryRequest2, sk, th_req, ctx, logger, sub, wethAbi, methods)
+
+	// Now, want to send a single query with multiple requests...
+	logger.Info("Starting multiquery test in 5...")
+	time.Sleep(time.Second * 5)
+	multiCallRequest := []*query.EthCallQueryRequest{callRequest, callRequest2}
+	multQueryRequest := createQueryRequestWithMultipleRequests(multiCallRequest)
+	sendQueryAndGetRsp(multQueryRequest, sk, th_req, ctx, logger, sub, wethAbi, methods)
+
+	// Cleanly shutdown
+	// Without this the same host won't properly discover peers until some timeout
+	sub.Cancel()
+	if err := th_req.Close(); err != nil {
+		logger.Fatal("Error closing the request topic", zap.Error(err))
+	}
+	if err := th_resp.Close(); err != nil {
+		logger.Fatal("Error closing the response topic", zap.Error(err))
+	}
+	if err := h.Close(); err != nil {
+		logger.Fatal("Error closing the host", zap.Error(err))
+	}
+
+	//
+	// END SHUTDOWN
+	//
+
+	logger.Info("Success! All tests passed!")
+}
+
+const (
+	GuardianKeyArmoredBlock = "WORMHOLE GUARDIAN PRIVATE KEY"
+)
+
+func createQueryRequest(callRequest *query.EthCallQueryRequest) *query.QueryRequest {
+	queryRequest := &query.QueryRequest{
+		Nonce: rand.Uint32(),
+		PerChainQueries: []*query.PerChainQueryRequest{
+			{
+				ChainId: 2,
+				Query:   callRequest,
+			},
+		},
+	}
+	return queryRequest
+}
+
+func createQueryRequestWithMultipleRequests(callRequests []*query.EthCallQueryRequest) *query.QueryRequest {
+	perChainQueries := []*query.PerChainQueryRequest{}
+	for _, req := range callRequests {
+		perChainQueries = append(perChainQueries, &query.PerChainQueryRequest{
+			ChainId: 2,
+			Query:   req,
+		})
+	}
+
+	queryRequest := &query.QueryRequest{
+		Nonce:           rand.Uint32(),
+		PerChainQueries: perChainQueries,
+	}
+	return queryRequest
+}
+
+func sendQueryAndGetRsp(queryRequest *query.QueryRequest, sk *ecdsa.PrivateKey, th *pubsub.Topic, ctx context.Context, logger *zap.Logger, sub *pubsub.Subscription, wethAbi abi.ABI, methods []string) {
+	queryRequestBytes, err := queryRequest.Marshal()
+	if err != nil {
+		panic(err)
+	}
+	numQueries := len(queryRequest.PerChainQueries)
+
+	// Sign the query request using our private key.
+	digest := query.QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes)
+	sig, err := ethCrypto.Sign(digest.Bytes(), sk)
+	if err != nil {
+		panic(err)
+	}
+
+	signedQueryRequest := &gossipv1.SignedQueryRequest{
+		QueryRequest: queryRequestBytes,
+		Signature:    sig,
+	}
+
+	msg := gossipv1.GossipMessage{
+		Message: &gossipv1.GossipMessage_SignedQueryRequest{
+			SignedQueryRequest: signedQueryRequest,
+		},
+	}
+
+	b, err := proto.Marshal(&msg)
+	if err != nil {
+		panic(err)
+	}
+
+	err = th.Publish(ctx, b)
+	if err != nil {
+		panic(err)
+	}
+
+	logger.Info("Waiting for message...")
+	// TODO: max wait time
+	// TODO: accumulate signatures to reach quorum
+	for {
+		envelope, err := sub.Next(ctx)
+		if err != nil {
+			logger.Panic("failed to receive pubsub message", zap.Error(err))
+		}
+		var msg gossipv1.GossipMessage
+		err = proto.Unmarshal(envelope.Data, &msg)
+		if err != nil {
+			logger.Info("received invalid message",
+				zap.Binary("data", envelope.Data),
+				zap.String("from", envelope.GetFrom().String()))
+			continue
+		}
+		var isMatchingResponse bool
+		switch m := msg.Message.(type) {
+		case *gossipv1.GossipMessage_SignedQueryResponse:
+			logger.Info("query response received", zap.Any("response", m.SignedQueryResponse),
+				zap.String("responseBytes", hexutil.Encode(m.SignedQueryResponse.QueryResponse)),
+				zap.String("sigBytes", hexutil.Encode(m.SignedQueryResponse.Signature)))
+			var response query.QueryResponsePublication
+			err := response.Unmarshal(m.SignedQueryResponse.QueryResponse)
+			if err != nil {
+				logger.Warn("failed to unmarshal response", zap.Error(err))
+				break
+			}
+			if bytes.Equal(response.Request.QueryRequest, queryRequestBytes) && bytes.Equal(response.Request.Signature, sig) {
+				// TODO: verify response signature
+				isMatchingResponse = true
+
+				if len(response.PerChainResponses) != numQueries {
+					logger.Warn("unexpected number of per chain query responses", zap.Int("expectedNum", numQueries), zap.Int("actualNum", len(response.PerChainResponses)))
+					break
+				}
+				// Do double loop over responses
+				for index := range response.PerChainResponses {
+					logger.Info("per chain query response index", zap.Int("index", index))
+
+					var localCallData []*query.EthCallData
+					switch ecq := queryRequest.PerChainQueries[index].Query.(type) {
+					case *query.EthCallQueryRequest:
+						localCallData = ecq.CallData
+					default:
+						panic("unsupported query type")
+					}
+
+					var localResp *query.EthCallQueryResponse
+					switch ecq := response.PerChainResponses[index].Response.(type) {
+					case *query.EthCallQueryResponse:
+						localResp = ecq
+					default:
+						panic("unsupported query type")
+					}
+
+					if len(localResp.Results) != len(localCallData) {
+						logger.Warn("unexpected number of results", zap.Int("expectedNum", len(localCallData)), zap.Int("expectedNum", len(localResp.Results)))
+						break
+					}
+
+					for idx, resp := range localResp.Results {
+						result, err := wethAbi.Methods[methods[idx]].Outputs.Unpack(resp)
+						if err != nil {
+							logger.Warn("failed to unpack result", zap.Error(err))
+							break
+						}
+
+						resultStr := hexutil.Encode(resp)
+						logger.Info("found matching response", zap.Int("idx", idx), zap.Uint64("number", localResp.BlockNumber), zap.String("hash", localResp.Hash.String()), zap.String("time", localResp.Time.String()), zap.String("method", methods[idx]), zap.Any("resultDecoded", result), zap.String("resultStr", resultStr))
+					}
+				}
+			}
+		default:
+			continue
+		}
+		if isMatchingResponse {
+			break
+		}
+	}
+}

+ 297 - 0
node/hack/query/test/query_test.go

@@ -0,0 +1,297 @@
+package query_test
+
+import (
+	"bytes"
+	"context"
+	"encoding/hex"
+	"fmt"
+	"os"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/wormhole-foundation/wormhole/sdk/vaa"
+
+	"github.com/certusone/wormhole/node/hack/query/utils"
+	"github.com/certusone/wormhole/node/pkg/common"
+	"github.com/certusone/wormhole/node/pkg/p2p"
+	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
+	"github.com/certusone/wormhole/node/pkg/query"
+	"github.com/ethereum/go-ethereum/accounts/abi"
+	ethCommon "github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/common/hexutil"
+	ethCrypto "github.com/ethereum/go-ethereum/crypto"
+	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	"github.com/libp2p/go-libp2p/core/crypto"
+	"go.uber.org/zap"
+	"google.golang.org/protobuf/proto"
+)
+
+func TestCrossChainQuery(t *testing.T) {
+	if os.Getenv("INTEGRATION") == "" {
+		t.Skip("Skipping integration test, set environment variable INTEGRATION")
+	}
+
+	p2pNetworkID := "/wormhole/dev"
+	var p2pPort uint = 8997
+	p2pBootstrap := "/dns4/guardian-0.guardian/udp/8996/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw"
+	nodeKeyPath := "../querier.key"
+
+	ctx := context.Background()
+	logger, _ := zap.NewDevelopment()
+
+	signingKeyPath := string("../dev.guardian.key")
+
+	logger.Info("Loading signing key", zap.String("signingKeyPath", signingKeyPath))
+	sk, err := common.LoadGuardianKey(signingKeyPath, true)
+	if err != nil {
+		logger.Fatal("failed to load guardian key", zap.Error(err))
+	}
+	logger.Info("Signing key loaded", zap.String("publicKey", ethCrypto.PubkeyToAddress(sk.PublicKey).Hex()))
+
+	// Fetch the current guardian set
+	idx, sgs, err := utils.FetchCurrentGuardianSet(common.GoTest)
+	if err != nil {
+		logger.Fatal("Failed to fetch current guardian set", zap.Error(err))
+	}
+	logger.Info("Fetched guardian set", zap.Any("keys", sgs.Keys))
+	gs := common.GuardianSet{
+		Keys:  sgs.Keys,
+		Index: idx,
+	}
+
+	// Fetch the latest block number
+	blockNum, err := utils.FetchLatestBlockNumber(ctx, common.GoTest)
+	if err != nil {
+		logger.Fatal("Failed to fetch latest block number", zap.Error(err))
+	}
+
+	// Load p2p private key
+	var priv crypto.PrivKey
+	priv, err = common.GetOrCreateNodeKey(logger, nodeKeyPath)
+	if err != nil {
+		logger.Fatal("Failed to load node key", zap.Error(err))
+	}
+
+	// Manual p2p setup
+	components := p2p.DefaultComponents()
+	components.Port = p2pPort
+	bootstrapPeers := p2pBootstrap
+	networkID := p2pNetworkID
+
+	h, err := p2p.NewHost(logger, ctx, networkID, bootstrapPeers, components, priv)
+	if err != nil {
+		panic(err)
+	}
+
+	topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
+	topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")
+
+	logger.Info("Subscribing pubsub topic", zap.String("topic_req", topic_req), zap.String("topic_resp", topic_resp))
+	ps, err := pubsub.NewGossipSub(ctx, h)
+	if err != nil {
+		panic(err)
+	}
+
+	th_req, err := ps.Join(topic_req)
+	if err != nil {
+		logger.Panic("failed to join request topic", zap.String("topic_req", topic_req), zap.Error(err))
+	}
+
+	th_resp, err := ps.Join(topic_resp)
+	if err != nil {
+		logger.Panic("failed to join response topic", zap.String("topic_resp", topic_resp), zap.Error(err))
+	}
+
+	sub, err := th_resp.Subscribe()
+	if err != nil {
+		logger.Panic("failed to subscribe to response topic", zap.Error(err))
+	}
+
+	logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
+		zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
+
+	// Wait for peers
+	for len(th_req.ListPeers()) < 1 {
+		time.Sleep(time.Millisecond * 100)
+	}
+
+	logger.Info("Detected peers")
+
+	wethAbi, err := abi.JSON(strings.NewReader("[{\"constant\":true,\"inputs\":[],\"name\":\"name\",\"outputs\":[{\"name\":\"\",\"type\":\"string\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"},{\"constant\":true,\"inputs\":[],\"name\":\"totalSupply\",\"outputs\":[{\"name\":\"\",\"type\":\"uint256\"}],\"payable\":false,\"stateMutability\":\"view\",\"type\":\"function\"}]"))
+	if err != nil {
+		panic(err)
+	}
+
+	methodName := "name"
+	data, err := wethAbi.Pack(methodName)
+	if err != nil {
+		panic(err)
+	}
+	to, _ := hex.DecodeString("DDb64fE46a91D46ee29420539FC25FD07c5FEa3E") // WETH
+
+	callData := []*query.EthCallData{
+		{
+			To:   to,
+			Data: data,
+		},
+	}
+
+	callRequest := &query.EthCallQueryRequest{
+		BlockId:  hexutil.EncodeBig(blockNum),
+		CallData: callData,
+	}
+
+	queryRequest := &query.QueryRequest{
+		Nonce: 1,
+		PerChainQueries: []*query.PerChainQueryRequest{
+			{
+				ChainId: 2,
+				Query:   callRequest,
+			},
+		},
+	}
+
+	queryRequestBytes, err := queryRequest.Marshal()
+	if err != nil {
+		panic(err)
+	}
+
+	// Sign the query request using our private key.
+	digest := query.QueryRequestDigest(common.UnsafeDevNet, queryRequestBytes)
+	sig, err := ethCrypto.Sign(digest.Bytes(), sk)
+	if err != nil {
+		panic(err)
+	}
+
+	signedQueryRequest := &gossipv1.SignedQueryRequest{
+		QueryRequest: queryRequestBytes,
+		Signature:    sig,
+	}
+
+	msg := gossipv1.GossipMessage{
+		Message: &gossipv1.GossipMessage_SignedQueryRequest{
+			SignedQueryRequest: signedQueryRequest,
+		},
+	}
+
+	b, err := proto.Marshal(&msg)
+	if err != nil {
+		panic(err)
+	}
+
+	err = th_req.Publish(ctx, b)
+	if err != nil {
+		panic(err)
+	}
+
+	logger.Info("Waiting for message...")
+	var success bool
+	signers := map[int]bool{}
+	subCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+	for {
+		envelope, err := sub.Next(subCtx)
+		if err != nil {
+			break
+		}
+		var msg gossipv1.GossipMessage
+		err = proto.Unmarshal(envelope.Data, &msg)
+		if err != nil {
+			logger.Fatal("received invalid message",
+				zap.Binary("data", envelope.Data),
+				zap.String("from", envelope.GetFrom().String()))
+		}
+		switch m := msg.Message.(type) {
+		case *gossipv1.GossipMessage_SignedQueryResponse:
+			logger.Info("query response received", zap.Any("response", m.SignedQueryResponse))
+			var response query.QueryResponsePublication
+			err := response.Unmarshal(m.SignedQueryResponse.QueryResponse)
+			if err != nil {
+				logger.Fatal("failed to unmarshal response", zap.Error(err))
+			}
+			if bytes.Equal(response.Request.QueryRequest, queryRequestBytes) && bytes.Equal(response.Request.Signature, sig) {
+				digest := query.GetQueryResponseDigestFromBytes(m.SignedQueryResponse.QueryResponse)
+				signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), m.SignedQueryResponse.Signature)
+				if err != nil {
+					logger.Fatal("failed to verify signature on response",
+						zap.String("digest", digest.Hex()),
+						zap.String("signature", hex.EncodeToString(m.SignedQueryResponse.Signature)),
+						zap.Error(err))
+				}
+				signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])
+				if keyIdx, ok := gs.KeyIndex(signerAddress); !ok {
+					logger.Fatal("received observation by unknown guardian - is our guardian set outdated?",
+						zap.String("digest", digest.Hex()),
+						zap.String("address", signerAddress.Hex()),
+						zap.Uint32("index", gs.Index),
+						zap.Any("keys", gs.KeysAsHexStrings()),
+					)
+				} else {
+					signers[keyIdx] = true
+				}
+				quorum := vaa.CalculateQuorum(len(gs.Keys))
+				if len(signers) < quorum {
+					logger.Sugar().Infof("not enough signers, have %d need %d", len(signers), quorum)
+					continue
+				}
+
+				if len(response.PerChainResponses) != 1 {
+					logger.Warn("unexpected number of per chain query responses", zap.Int("expectedNum", 1), zap.Int("actualNum", len(response.PerChainResponses)))
+					break
+				}
+
+				var pcq *query.EthCallQueryResponse
+				switch ecq := response.PerChainResponses[0].Response.(type) {
+				case *query.EthCallQueryResponse:
+					pcq = ecq
+				default:
+					panic("unsupported query type")
+				}
+
+				if len(pcq.Results) == 0 {
+					logger.Warn("response did not contain any results", zap.Error(err))
+					break
+				}
+
+				for idx, resp := range pcq.Results {
+					result, err := wethAbi.Methods[methodName].Outputs.Unpack(resp)
+					if err != nil {
+						logger.Warn("failed to unpack result", zap.Error(err))
+						break
+					}
+
+					resultStr := hexutil.Encode(resp)
+					logger.Info("found matching response", zap.Int("idx", idx), zap.Uint64("number", pcq.BlockNumber), zap.String("hash", pcq.Hash.String()), zap.String("time", pcq.Time.String()), zap.Any("resultDecoded", result), zap.String("resultStr", resultStr))
+				}
+
+				success = true
+			}
+		default:
+			continue
+		}
+		if success {
+			break
+		}
+	}
+
+	assert.True(t, success)
+
+	// Cleanly shutdown
+	// Without this the same host won't properly discover peers until some timeout
+	sub.Cancel()
+	if err := th_req.Close(); err != nil {
+		logger.Fatal("Error closing the request topic", zap.Error(err))
+	}
+	if err := th_resp.Close(); err != nil {
+		logger.Fatal("Error closing the response topic", zap.Error(err))
+	}
+	if err := h.Close(); err != nil {
+		logger.Error("Error closing the host", zap.Error(err))
+	}
+}
+
+const (
+	GuardianKeyArmoredBlock = "WORMHOLE GUARDIAN PRIVATE KEY"
+)

+ 4 - 0
node/hack/query/test/test_query.sh

@@ -0,0 +1,4 @@
+#!/bin/sh
+set -e
+while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' guardian:6060/readyz)" != "200" ]]; do sleep 5; done
+INTEGRATION=true go test -v .

+ 96 - 0
node/hack/query/utils/fetchCurrentGuardianSet.go

@@ -0,0 +1,96 @@
+package utils
+
+import (
+	"context"
+	"fmt"
+	"math/big"
+	"time"
+
+	"github.com/certusone/wormhole/node/pkg/common"
+	"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi"
+	ethAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi"
+	ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind"
+	eth_common "github.com/ethereum/go-ethereum/common"
+	ethClient "github.com/ethereum/go-ethereum/ethclient"
+	ethRpc "github.com/ethereum/go-ethereum/rpc"
+)
+
+func GetRpcUrl(network common.Environment) string {
+	switch network {
+	case common.MainNet:
+		return "https://rpc.ankr.com/eth"
+	case common.TestNet:
+		return "https://rpc.ankr.com/eth_goerli"
+	case common.UnsafeDevNet:
+		return "http://localhost:8545"
+	case common.GoTest:
+		return "http://eth-devnet:8545"
+	default:
+		return ""
+	}
+}
+
+func FetchLatestBlockNumber(ctx context.Context, network common.Environment) (*big.Int, error) {
+	rawUrl := GetRpcUrl(network)
+	if rawUrl == "" {
+		return nil, fmt.Errorf("unable to get rpc url")
+	}
+	return FetchLatestBlockNumberFromUrl(ctx, rawUrl)
+}
+
+func FetchLatestBlockNumberFromUrl(ctx context.Context, rawUrl string) (*big.Int, error) {
+	rawClient, err := ethRpc.DialContext(ctx, rawUrl)
+	if err != nil {
+		return nil, fmt.Errorf("unable to dial eth context: %w", err)
+	}
+	client := ethClient.NewClient(rawClient)
+	header, err := client.HeaderByNumber(ctx, nil)
+	if err != nil {
+		return nil, fmt.Errorf("unable to fetch latest header: %w", err)
+	}
+	return header.Number, nil
+}
+
+func FetchCurrentGuardianSet(network common.Environment) (uint32, *ethabi.StructsGuardianSet, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+
+	rawUrl := GetRpcUrl(network)
+	if rawUrl == "" {
+		return 0, nil, fmt.Errorf("unable to get rpc url")
+	}
+	var ethContract string
+	switch network {
+	case common.MainNet:
+		ethContract = "0x98f3c9e6E3fAce36bAAd05FE09d375Ef1464288B"
+	case common.TestNet:
+		ethContract = "0x706abc4E45D419950511e474C7B9Ed348A4a716c"
+	case common.UnsafeDevNet:
+	case common.GoTest:
+		ethContract = "0xC89Ce4735882C9F0f0FE26686c53074E09B0D550"
+	default:
+		return 0, nil, fmt.Errorf("unable to fetch guardian set for unknown network %s", network)
+	}
+
+	contract := eth_common.HexToAddress(ethContract)
+	rawClient, err := ethRpc.DialContext(ctx, rawUrl)
+	if err != nil {
+		return 0, nil, fmt.Errorf("failed to connect to ethereum")
+	}
+	client := ethClient.NewClient(rawClient)
+	caller, err := ethAbi.NewAbiCaller(contract, client)
+	if err != nil {
+		return 0, nil, fmt.Errorf("failed to create caller")
+	}
+	currentIndex, err := caller.GetCurrentGuardianSetIndex(&ethBind.CallOpts{Context: ctx})
+	if err != nil {
+		return 0, nil, fmt.Errorf("error requesting current guardian set index: %w", err)
+	}
+
+	gs, err := caller.GetGuardianSet(&ethBind.CallOpts{Context: ctx}, currentIndex)
+	if err != nil {
+		return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err)
+	}
+
+	return currentIndex, &gs, nil
+}