Ver Fonte

node: store heartbeats for multiple nodes per guardian

It's safe to break the proto API at this point.

Change-Id: I235100c5fef3abc9259d28f68d9bb7bf2be0ae5e
Leo há 4 anos atrás
pai
commit
c7662d611e

+ 19 - 16
bridge/cmd/guardiand/adminnodes.go

@@ -3,7 +3,6 @@ package guardiand
 import (
 	"context"
 	"fmt"
-	gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
 	publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
 	"github.com/certusone/wormhole/bridge/pkg/vaa"
 	"github.com/spf13/cobra"
@@ -79,37 +78,41 @@ func runListNodes(cmd *cobra.Command, args []string) {
 		log.Fatalf("failed to list nodes: %v", err)
 	}
 
-	nodes := make([]*gossipv1.Heartbeat, len(lastHeartbeats.RawHeartbeats))
-	i := 0
-	for _, v := range lastHeartbeats.RawHeartbeats {
-		nodes[i] = v
-		i += 1
-	}
+	nodes := lastHeartbeats.Entries
+
 	sort.Slice(nodes, func(i, j int) bool {
-		return nodes[i].NodeName < nodes[j].NodeName
+		if nodes[i].RawHeartbeat == nil || nodes[j].RawHeartbeat == nil {
+			return false
+		}
+		return nodes[i].RawHeartbeat.NodeName < nodes[j].RawHeartbeat.NodeName
 	})
 
 	log.Printf("%d nodes in guardian state set", len(nodes))
 
 	w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
 
-	w.Write([]byte("Guardian key\tNode name\tVersion\tLast seen\tUptime\tSolana\tEthereum\tTerra\tBSC\n"))
+	w.Write([]byte("Node key\tGuardian key\tNode name\tVersion\tLast seen\tUptime\tSolana\tEthereum\tTerra\tBSC\n"))
 
 	for _, h := range nodes {
-		last := time.Unix(0, h.Timestamp)
+		if h.RawHeartbeat == nil {
+			continue
+		}
+
+		last := time.Unix(0, h.RawHeartbeat.Timestamp)
 
 		heights := map[vaa.ChainID]int64{}
-		for _, n := range h.Networks {
+		for _, n := range h.RawHeartbeat.Networks {
 			heights[vaa.ChainID(n.Id)] = n.Height
 		}
 
 		fmt.Fprintf(w,
-			"%s\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%d\n",
-			h.GuardianAddr,
-			h.NodeName,
-			h.Version,
+			"%s\t%s\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%d\n",
+			h.P2PNodeAddr,
+			h.RawHeartbeat.GuardianAddr,
+			h.RawHeartbeat.NodeName,
+			h.RawHeartbeat.Version,
 			time.Since(last),
-			h.Counter,
+			h.RawHeartbeat.Counter,
 			heights[vaa.ChainIDSolana],
 			heights[vaa.ChainIDEthereum],
 			heights[vaa.ChainIDTerra],

+ 42 - 13
bridge/pkg/common/guardianset.go

@@ -1,9 +1,10 @@
 package common
 
 import (
+	"fmt"
 	gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
 	"github.com/ethereum/go-ethereum/common"
-	"google.golang.org/protobuf/proto"
+	"github.com/libp2p/go-libp2p-core/peer"
 	"sync"
 )
 
@@ -16,6 +17,13 @@ import (
 // but presumably, chain-specific transaction size limits will apply at some point (untested).
 const MaxGuardianCount = 19
 
+// MaxNodesPerGuardian specifies the maximum amount of nodes per guardian key that we'll accept
+// whenever we maintain any per-guardian, per-node state.
+//
+// There currently isn't any state clean up, so the value is on the high side to prevent
+// accidentally reaching the limit due to operational mistakes.
+const MaxNodesPerGuardian = 15
+
 type GuardianSet struct {
 	// Guardian's public key hashes truncated by the ETH standard hashing mechanism (20 bytes).
 	Keys []common.Address
@@ -49,14 +57,14 @@ type GuardianSetState struct {
 	mu      sync.Mutex
 	current *GuardianSet
 
-	// Last heartbeat message received per guardian. Maintained
+	// Last heartbeat message received per guardian per p2p node. Maintained
 	// across guardian set updates - these values don't change.
-	lastHeartbeat map[common.Address]*gossipv1.Heartbeat
+	lastHeartbeats map[common.Address]map[peer.ID]*gossipv1.Heartbeat
 }
 
 func NewGuardianSetState() *GuardianSetState {
 	return &GuardianSetState{
-		lastHeartbeat: map[common.Address]*gossipv1.Heartbeat{},
+		lastHeartbeats: map[common.Address]map[peer.ID]*gossipv1.Heartbeat{},
 	}
 }
 
@@ -76,29 +84,50 @@ func (st *GuardianSetState) Get() *GuardianSet {
 
 // LastHeartbeat returns the most recent heartbeat message received for
 // a given guardian node, or nil if none have been received.
-func (st *GuardianSetState) LastHeartbeat(addr common.Address) *gossipv1.Heartbeat {
+func (st *GuardianSetState) LastHeartbeat(addr common.Address) map[peer.ID]*gossipv1.Heartbeat {
 	st.mu.Lock()
 	defer st.mu.Unlock()
-	return st.lastHeartbeat[addr]
+	ret := make(map[peer.ID]*gossipv1.Heartbeat)
+	for k, v := range st.lastHeartbeats[addr] {
+		ret[k] = v
+	}
+	return ret
 }
 
-// SetHeartBeat stores a verified heartbeat observed by a given guardian.
-func (st *GuardianSetState) SetHeartBeat(addr common.Address, hb *gossipv1.Heartbeat) {
+// SetHeartbeat stores a verified heartbeat observed by a given guardian.
+func (st *GuardianSetState) SetHeartbeat(addr common.Address, peerId peer.ID, hb *gossipv1.Heartbeat) error {
 	st.mu.Lock()
 	defer st.mu.Unlock()
-	st.lastHeartbeat[addr] = hb
+
+	v, ok := st.lastHeartbeats[addr]
+
+	if !ok {
+		v = make(map[peer.ID]*gossipv1.Heartbeat)
+		st.lastHeartbeats[addr] = v
+	} else {
+		if len(v) >= MaxNodesPerGuardian {
+			// TODO: age out old entries?
+			return fmt.Errorf("too many nodes (%d) for guardian, cannot store entry", len(v))
+		}
+	}
+
+	v[peerId] = hb
+	return nil
 }
 
 // GetAll returns all stored heartbeats.
-func (st *GuardianSetState) GetAll() map[common.Address]*gossipv1.Heartbeat {
+func (st *GuardianSetState) GetAll() map[common.Address]map[peer.ID]*gossipv1.Heartbeat {
 	st.mu.Lock()
 	defer st.mu.Unlock()
 
-	ret := make(map[common.Address]*gossipv1.Heartbeat)
+	ret := make(map[common.Address]map[peer.ID]*gossipv1.Heartbeat)
 
 	// Deep copy
-	for k, v := range st.lastHeartbeat {
-		ret[k] = proto.Clone(v).(*gossipv1.Heartbeat)
+	for addr, v := range st.lastHeartbeats {
+		ret[addr] = make(map[peer.ID]*gossipv1.Heartbeat)
+		for peerId, hb := range v {
+			ret[addr][peerId] = hb
+		}
 	}
 
 	return ret

+ 8 - 4
bridge/pkg/p2p/p2p.go

@@ -203,7 +203,9 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat
 
 					ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey)
 					rawHeartbeatListeners.PublishHeartbeat(heartbeat)
-					gst.SetHeartBeat(ourAddr, heartbeat)
+					if err := gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil {
+						panic(err)
+					}
 
 					b, err := proto.Marshal(heartbeat)
 					if err != nil {
@@ -303,7 +305,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat
 						zap.String("from", envelope.GetFrom().String()))
 					break
 				}
-				if heartbeat, err := processSignedHeartbeat(s, gs, gst, disableHeartbeatVerify); err != nil {
+				if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, gst, disableHeartbeatVerify); err != nil {
 					p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
 					logger.Warn("invalid signed heartbeat received",
 						zap.Error(err),
@@ -332,7 +334,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat
 	}
 }
 
-func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet, gst *bridge_common.GuardianSetState, disableVerify bool) (*gossipv1.Heartbeat, error) {
+func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet, gst *bridge_common.GuardianSetState, disableVerify bool) (*gossipv1.Heartbeat, error) {
 	envelopeAddr := common.BytesToAddress(s.GuardianAddr)
 	idx, ok := gs.KeyIndex(envelopeAddr)
 	var pk common.Address
@@ -363,7 +365,9 @@ func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.Guard
 	}
 
 	// Store verified heartbeat in global guardian set state.
-	gst.SetHeartBeat(signerAddr, &h)
+	if err := gst.SetHeartbeat(signerAddr, from, &h); err != nil {
+		return nil, fmt.Errorf("failed to store in guardian set state: %w", err)
+	}
 
 	return &h, nil
 }

+ 9 - 10
bridge/pkg/publicrpc/publicrpcserver.go

@@ -44,20 +44,19 @@ func (s *PublicrpcServer) GetLastHeartbeats(ctx context.Context, req *publicrpcv
 	}
 
 	resp := &publicrpcv1.GetLastHeartbeatResponse{
-		RawHeartbeats: make(map[string]*gossipv1.Heartbeat),
-	}
-
-	// Request heartbeat for every guardian set entry. This ensures that
-	// offline guardians will be listed with a null heartbeat.
-	for _, addr := range gs.Keys {
-		hb := s.gst.LastHeartbeat(addr)
-		resp.RawHeartbeats[addr.Hex()] = hb
+		Entries: make([]*publicrpcv1.GetLastHeartbeatResponse_Entry, 0),
 	}
 
 	// Fetch all heartbeats (including from nodes not in the guardian set - which
 	// can happen either with --disableHeartbeatVerify or when the guardian set changes)
-	for addr, hb := range s.gst.GetAll() {
-		resp.RawHeartbeats[addr.Hex()] = hb
+	for addr, v := range s.gst.GetAll() {
+		for peerId, hb := range v {
+			resp.Entries = append(resp.Entries, &publicrpcv1.GetLastHeartbeatResponse_Entry{
+				VerifiedGuardianAddr: addr.Hex(),
+				P2PNodeAddr:          peerId.Pretty(),
+				RawHeartbeat:         hb,
+			})
+		}
 	}
 
 	return resp, nil

+ 17 - 3
proto/publicrpc/v1/publicrpc.proto

@@ -19,7 +19,7 @@ enum EmitterChain {
 message MessageID {
   // Emitter chain ID.
   EmitterChain emitter_chain = 1;
-  // Hex-encoded emitter address.
+  // Hex-encoded (without leading 0x) emitter address.
   string emitter_address = 2;
   // Sequence number for (emitter_chain, emitter_address).
   int64 sequence = 3;
@@ -67,6 +67,20 @@ message GetLastHeartbeatRequest {
 }
 
 message GetLastHeartbeatResponse {
-  // Mapping of hex-encoded guardian addresses to raw heartbeat messages.
-  map<string, gossip.v1.Heartbeat> raw_heartbeats = 1;
+  message Entry {
+    // Verified, hex-encoded (with leading 0x) guardian address. This is the guardian address
+    // which signed this heartbeat. The GuardianAddr field inside the heartbeat
+    // is NOT verified - remote nodes can put arbitrary data in it.
+    string verified_guardian_addr = 1;
+
+    // Base58-encoded libp2p node address that sent this heartbeat, used to
+    // distinguish between multiple nodes running for the same guardian.
+    string p2p_node_addr = 2;
+
+    // Raw heartbeat received from the network. Data is only as trusted
+    // as the guardian node that sent it - none of the fields are verified.
+    gossip.v1.Heartbeat raw_heartbeat = 3;
+  }
+
+  repeated Entry entries = 1;
 }