Prechádzať zdrojové kódy

node: add FindMissingMessages admin RPC method

Change-Id: I57c1227c1a591e10f5e77b3553216915df247d65
Leo 4 rokov pred
rodič
commit
c253f769fa

+ 43 - 0
node/cmd/guardiand/adminclient.go

@@ -7,6 +7,7 @@ import (
 	"github.com/spf13/pflag"
 	"io/ioutil"
 	"log"
+	"strconv"
 	"time"
 
 	"github.com/spf13/cobra"
@@ -31,9 +32,11 @@ func init() {
 	}
 
 	AdminClientInjectGuardianSetUpdateCmd.Flags().AddFlagSet(pf)
+	AdminClientFindMissingMessagesCmd.Flags().AddFlagSet(pf)
 	AdminClientListNodes.Flags().AddFlagSet(pf)
 
 	AdminCmd.AddCommand(AdminClientInjectGuardianSetUpdateCmd)
+	AdminCmd.AddCommand(AdminClientFindMissingMessagesCmd)
 	AdminCmd.AddCommand(AdminClientGovernanceVAAVerifyCmd)
 	AdminCmd.AddCommand(AdminClientListNodes)
 }
@@ -50,6 +53,13 @@ var AdminClientInjectGuardianSetUpdateCmd = &cobra.Command{
 	Args:  cobra.ExactArgs(1),
 }
 
+var AdminClientFindMissingMessagesCmd = &cobra.Command{
+	Use:   "find-missing-messages [CHAIN_ID] [EMITTER_ADDRESS_HEX]",
+	Short: "Find sequence number gaps for the given chain ID and emitter address",
+	Run:   runFindMissingMessages,
+	Args:  cobra.ExactArgs(2),
+}
+
 func getAdminClient(ctx context.Context, addr string) (*grpc.ClientConn, error, nodev1.NodePrivilegedServiceClient) {
 	conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix:///%s", addr), grpc.WithInsecure())
 
@@ -101,3 +111,36 @@ func runInjectGovernanceVAA(cmd *cobra.Command, args []string) {
 
 	log.Printf("VAA successfully injected with digest %s", hexutils.BytesToHex(resp.Digest))
 }
+
+func runFindMissingMessages(cmd *cobra.Command, args []string) {
+	chainID, err := strconv.Atoi(args[0])
+	if err != nil {
+		log.Fatalf("invalid chain ID: %v", err)
+	}
+	emitterAddress := args[1]
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	conn, err, c := getAdminClient(ctx, *clientSocketPath)
+	defer conn.Close()
+	if err != nil {
+		log.Fatalf("failed to get admin client: %v", err)
+	}
+
+	msg := nodev1.FindMissingMessagesRequest{
+		EmitterChain:   uint32(chainID),
+		EmitterAddress: emitterAddress,
+	}
+	resp, err := c.FindMissingMessages(ctx, &msg)
+	if err != nil {
+		log.Fatalf("failed to run find FindMissingMessages RPC: %v", err)
+	}
+
+	for _, id := range resp.MissingMessages {
+		fmt.Println(id)
+	}
+
+	log.Printf("processed %s sequences %d to %d (%d gaps)",
+		emitterAddress, resp.FirstSequence, resp.LastSequence, len(resp.MissingMessages))
+}

+ 29 - 0
node/cmd/guardiand/adminserver.go

@@ -29,6 +29,7 @@ import (
 
 type nodePrivilegedService struct {
 	nodev1.UnimplementedNodePrivilegedServiceServer
+	db      *db.Database
 	injectC chan<- *vaa.VAA
 	logger  *zap.Logger
 }
@@ -158,6 +159,33 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
 	return &nodev1.InjectGovernanceVAAResponse{Digest: digest.Bytes()}, nil
 }
 
+func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *nodev1.FindMissingMessagesRequest) (*nodev1.FindMissingMessagesResponse, error) {
+	b, err := hex.DecodeString(req.EmitterAddress)
+	if err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "invalid emitter address encoding: %v", err)
+	}
+	emitterAddress := vaa.Address{}
+	copy(emitterAddress[:], b)
+
+	ids, first, last, err := s.db.FindEmitterSequenceGap(db.VAAID{
+		EmitterChain:   vaa.ChainID(req.EmitterChain),
+		EmitterAddress: emitterAddress,
+	})
+	if err != nil {
+		return nil, status.Errorf(codes.Internal, "database operation failed: %v", err)
+	}
+
+	resp := make([]string, len(ids))
+	for i, v := range ids {
+		resp[i] = fmt.Sprintf("%d/%s/%d", req.EmitterChain, emitterAddress, v)
+	}
+	return &nodev1.FindMissingMessagesResponse{
+		MissingMessages: resp,
+		FirstSequence:   first,
+		LastSequence:    last,
+	}, nil
+}
+
 func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
 	// Delete existing UNIX socket, if present.
 	fi, err := os.Stat(socketPath)
@@ -192,6 +220,7 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<-
 
 	nodeService := &nodePrivilegedService{
 		injectC: injectC,
+		db:      db,
 		logger:  logger.Named("adminservice"),
 	}
 

+ 61 - 0
node/pkg/db/db.go

@@ -33,6 +33,10 @@ func (i *VAAID) Bytes() []byte {
 	return []byte(fmt.Sprintf("signed/%d/%s/%d", i.EmitterChain, i.EmitterAddress, i.Sequence))
 }
 
+func (i *VAAID) EmitterPrefixBytes() []byte {
+	return []byte(fmt.Sprintf("signed/%d/%s", i.EmitterChain, i.EmitterAddress))
+}
+
 func Open(path string) (*Database, error) {
 	db, err := badger.Open(badger.DefaultOptions(path))
 	if err != nil {
@@ -90,3 +94,60 @@ func (d *Database) GetSignedVAABytes(id VAAID) (b []byte, err error) {
 	}
 	return
 }
+
+func (d *Database) FindEmitterSequenceGap(prefix VAAID) (resp []uint64, firstSeq uint64, lastSeq uint64, err error) {
+	resp = make([]uint64, 0)
+	if err = d.db.View(func(txn *badger.Txn) error {
+		it := txn.NewIterator(badger.DefaultIteratorOptions)
+		defer it.Close()
+		prefix := prefix.EmitterPrefixBytes()
+
+		// Find all sequence numbers (the message IDs are ordered lexicographically,
+		// rather than numerically, so we need to sort them in-memory).
+		seqs := make(map[uint64]bool)
+		for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
+			item := it.Item()
+			key := item.Key()
+			err := item.Value(func(val []byte) error {
+				v, err := vaa.Unmarshal(val)
+				if err != nil {
+					return fmt.Errorf("failed to unmarshal VAA for %s: %v", string(key), err)
+				}
+
+				seqs[v.Sequence] = true
+				return nil
+			})
+			if err != nil {
+				return err
+			}
+		}
+
+		// Find min/max (yay lack of Go generics)
+		first := false
+		for k := range seqs {
+			if first {
+				firstSeq = k
+				first = false
+			}
+			if k < firstSeq {
+				firstSeq = k
+			}
+			if k > lastSeq {
+				lastSeq = k
+			}
+		}
+
+		// Figure out gaps.
+		for i := firstSeq; i <= lastSeq; i++ {
+			if !seqs[i] {
+				fmt.Printf("missing: %d\n", i)
+				resp = append(resp, i)
+			}
+		}
+
+		return nil
+	}); err != nil {
+		return
+	}
+	return
+}

+ 23 - 0
proto/node/v1/node.proto

@@ -14,6 +14,13 @@ service NodePrivilegedService {
   // VAA timeout window for it to reach consensus.
   //
   rpc InjectGovernanceVAA (InjectGovernanceVAARequest) returns (InjectGovernanceVAAResponse);
+
+  // FindMissingMessages will detect message sequence gaps in the local VAA store for a
+  // specific emitter chain and address. Start and end slots are the lowest and highest
+  // sequence numbers available in the local store, respectively.
+  //
+  // An error is returned if more than 1000 gaps are found.
+  rpc FindMissingMessages (FindMissingMessagesRequest) returns (FindMissingMessagesResponse);
 }
 
 message InjectGovernanceVAARequest {
@@ -89,3 +96,19 @@ message ContractUpgrade {
   // Address of the new program/contract.
   bytes new_contract = 2;
 }
+
+message FindMissingMessagesRequest {
+  // Emitter chain ID to iterate.
+  uint32 emitter_chain = 1;
+  // Hex-encoded (without leading 0x) emitter address to iterate.
+  string emitter_address = 2;
+}
+
+message FindMissingMessagesResponse {
+  // List of missing sequence numbers.
+  repeated string missing_messages = 1;
+
+  // Range processed
+  uint64 first_sequence = 2;
+  uint64 last_sequence = 3;
+}