p2p.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. package p2p
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "errors"
  6. "fmt"
  7. bridge_common "github.com/certusone/wormhole/node/pkg/common"
  8. "github.com/certusone/wormhole/node/pkg/vaa"
  9. "github.com/certusone/wormhole/node/pkg/version"
  10. "github.com/ethereum/go-ethereum/common"
  11. ethcrypto "github.com/ethereum/go-ethereum/crypto"
  12. "github.com/prometheus/client_golang/prometheus"
  13. "github.com/prometheus/client_golang/prometheus/promauto"
  14. "strings"
  15. "time"
  16. "github.com/libp2p/go-libp2p-core/peer"
  17. "github.com/multiformats/go-multiaddr"
  18. "github.com/libp2p/go-libp2p"
  19. connmgr "github.com/libp2p/go-libp2p-connmgr"
  20. "github.com/libp2p/go-libp2p-core/crypto"
  21. "github.com/libp2p/go-libp2p-core/host"
  22. "github.com/libp2p/go-libp2p-core/protocol"
  23. "github.com/libp2p/go-libp2p-core/routing"
  24. dht "github.com/libp2p/go-libp2p-kad-dht"
  25. pubsub "github.com/libp2p/go-libp2p-pubsub"
  26. libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
  27. libp2ptls "github.com/libp2p/go-libp2p-tls"
  28. "go.uber.org/zap"
  29. "google.golang.org/protobuf/proto"
  30. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  31. "github.com/certusone/wormhole/node/pkg/supervisor"
  32. )
  33. var (
  34. p2pHeartbeatsSent = promauto.NewCounter(
  35. prometheus.CounterOpts{
  36. Name: "wormhole_p2p_heartbeats_sent_total",
  37. Help: "Total number of p2p heartbeats sent",
  38. })
  39. p2pMessagesSent = promauto.NewCounter(
  40. prometheus.CounterOpts{
  41. Name: "wormhole_p2p_broadcast_messages_sent_total",
  42. Help: "Total number of p2p pubsub broadcast messages sent",
  43. })
  44. p2pMessagesReceived = promauto.NewCounterVec(
  45. prometheus.CounterOpts{
  46. Name: "wormhole_p2p_broadcast_messages_received_total",
  47. Help: "Total number of p2p pubsub broadcast messages received",
  48. }, []string{"type"})
  49. )
  50. var heartbeatMessagePrefix = []byte("heartbeat|")
  51. func heartbeatDigest(b []byte) common.Hash {
  52. return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...))
  53. }
  54. func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *bridge_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
  55. return func(ctx context.Context) (re error) {
  56. logger := supervisor.Logger(ctx)
  57. h, err := libp2p.New(ctx,
  58. // Use the keypair we generated
  59. libp2p.Identity(priv),
  60. // Multiple listen addresses
  61. libp2p.ListenAddrStrings(
  62. // Listen on QUIC only.
  63. // https://github.com/libp2p/go-libp2p/issues/688
  64. fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port),
  65. fmt.Sprintf("/ip6/::/udp/%d/quic", port),
  66. ),
  67. // Enable TLS security as the only security protocol.
  68. libp2p.Security(libp2ptls.ID, libp2ptls.New),
  69. // Enable QUIC transport as the only transport.
  70. libp2p.Transport(libp2pquic.NewTransport),
  71. // Let's prevent our peer from having too many
  72. // connections by attaching a connection manager.
  73. libp2p.ConnectionManager(connmgr.NewConnManager(
  74. 100, // Lowwater
  75. 400, // HighWater,
  76. time.Minute, // GracePeriod
  77. )),
  78. // Let this host use the DHT to find other hosts
  79. libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
  80. // TODO(leo): Persistent data store (i.e. address book)
  81. idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer),
  82. // This intentionally makes us incompatible with the global IPFS DHT
  83. dht.ProtocolPrefix(protocol.ID("/"+networkID)),
  84. )
  85. return idht, err
  86. }),
  87. )
  88. if err != nil {
  89. panic(err)
  90. }
  91. defer func() {
  92. // TODO: libp2p cannot be cleanly restarted (https://github.com/libp2p/go-libp2p/issues/992)
  93. logger.Error("p2p routine has exited, cancelling root context...", zap.Error(re))
  94. rootCtxCancel()
  95. }()
  96. logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", bootstrapPeers))
  97. topic := fmt.Sprintf("%s/%s", networkID, "broadcast")
  98. logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
  99. ps, err := pubsub.NewGossipSub(ctx, h)
  100. if err != nil {
  101. panic(err)
  102. }
  103. th, err := ps.Join(topic)
  104. if err != nil {
  105. return fmt.Errorf("failed to join topic: %w", err)
  106. }
  107. sub, err := th.Subscribe()
  108. if err != nil {
  109. return fmt.Errorf("failed to subscribe topic: %w", err)
  110. }
  111. // Add our own bootstrap nodes
  112. // Count number of successful connection attempts. If we fail to connect to any bootstrap peer, kill
  113. // the service and have supervisor retry it.
  114. successes := 0
  115. // Are we a bootstrap node? If so, it's okay to not have any peers.
  116. bootstrapNode := false
  117. for _, addr := range strings.Split(bootstrapPeers, ",") {
  118. if addr == "" {
  119. continue
  120. }
  121. ma, err := multiaddr.NewMultiaddr(addr)
  122. if err != nil {
  123. logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
  124. continue
  125. }
  126. pi, err := peer.AddrInfoFromP2pAddr(ma)
  127. if err != nil {
  128. logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
  129. continue
  130. }
  131. if pi.ID == h.ID() {
  132. logger.Info("We're a bootstrap node")
  133. bootstrapNode = true
  134. continue
  135. }
  136. if err = h.Connect(ctx, *pi); err != nil {
  137. logger.Error("Failed to connect to bootstrap peer", zap.String("peer", addr), zap.Error(err))
  138. } else {
  139. successes += 1
  140. }
  141. }
  142. // TODO: continually reconnect to bootstrap nodes?
  143. if successes == 0 && !bootstrapNode {
  144. return fmt.Errorf("failed to connect to any bootstrap peer")
  145. } else {
  146. logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
  147. }
  148. logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
  149. zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
  150. bootTime := time.Now()
  151. go func() {
  152. ctr := int64(0)
  153. tick := time.NewTicker(15 * time.Second)
  154. defer tick.Stop()
  155. for {
  156. select {
  157. case <-ctx.Done():
  158. return
  159. case <-tick.C:
  160. DefaultRegistry.mu.Lock()
  161. networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats))
  162. for _, v := range DefaultRegistry.networkStats {
  163. errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id))
  164. v.ErrorCount = errCtr
  165. networks = append(networks, v)
  166. }
  167. heartbeat := &gossipv1.Heartbeat{
  168. NodeName: nodeName,
  169. Counter: ctr,
  170. Timestamp: time.Now().UnixNano(),
  171. Networks: networks,
  172. Version: version.Version(),
  173. GuardianAddr: DefaultRegistry.guardianAddress,
  174. BootTimestamp: bootTime.UnixNano(),
  175. }
  176. ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey)
  177. if err := gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil {
  178. panic(err)
  179. }
  180. collectNodeMetrics(ourAddr, h.ID(), heartbeat)
  181. b, err := proto.Marshal(heartbeat)
  182. if err != nil {
  183. panic(err)
  184. }
  185. DefaultRegistry.mu.Unlock()
  186. // Sign the heartbeat using our node's guardian key.
  187. digest := heartbeatDigest(b)
  188. sig, err := ethcrypto.Sign(digest.Bytes(), gk)
  189. if err != nil {
  190. panic(err)
  191. }
  192. msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedHeartbeat{
  193. SignedHeartbeat: &gossipv1.SignedHeartbeat{
  194. Heartbeat: b,
  195. Signature: sig,
  196. GuardianAddr: ourAddr.Bytes(),
  197. }}}
  198. b, err = proto.Marshal(&msg)
  199. if err != nil {
  200. panic(err)
  201. }
  202. err = th.Publish(ctx, b)
  203. if err != nil {
  204. logger.Warn("failed to publish heartbeat message", zap.Error(err))
  205. }
  206. p2pHeartbeatsSent.Inc()
  207. ctr += 1
  208. }
  209. }
  210. }()
  211. go func() {
  212. for {
  213. select {
  214. case <-ctx.Done():
  215. return
  216. case msg := <-sendC:
  217. err := th.Publish(ctx, msg)
  218. p2pMessagesSent.Inc()
  219. if err != nil {
  220. logger.Error("failed to publish message from queue", zap.Error(err))
  221. }
  222. }
  223. }
  224. }()
  225. for {
  226. envelope, err := sub.Next(ctx)
  227. if err != nil {
  228. return fmt.Errorf("failed to receive pubsub message: %w", err)
  229. }
  230. var msg gossipv1.GossipMessage
  231. err = proto.Unmarshal(envelope.Data, &msg)
  232. if err != nil {
  233. logger.Info("received invalid message",
  234. zap.String("data", string(envelope.Data)),
  235. zap.String("from", envelope.GetFrom().String()))
  236. p2pMessagesReceived.WithLabelValues("invalid").Inc()
  237. continue
  238. }
  239. if envelope.GetFrom() == h.ID() {
  240. logger.Debug("received message from ourselves, ignoring",
  241. zap.Any("payload", msg.Message))
  242. p2pMessagesReceived.WithLabelValues("loopback").Inc()
  243. continue
  244. }
  245. logger.Debug("received message",
  246. zap.Any("payload", msg.Message),
  247. zap.Binary("raw", envelope.Data),
  248. zap.String("from", envelope.GetFrom().String()))
  249. switch m := msg.Message.(type) {
  250. case *gossipv1.GossipMessage_SignedHeartbeat:
  251. s := m.SignedHeartbeat
  252. gs := gst.Get()
  253. if gs == nil {
  254. // No valid guardian set yet - dropping heartbeat
  255. logger.Debug("skipping heartbeat - no guardian set",
  256. zap.Any("value", s),
  257. zap.String("from", envelope.GetFrom().String()))
  258. break
  259. }
  260. if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, gst, disableHeartbeatVerify); err != nil {
  261. p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
  262. logger.Debug("invalid signed heartbeat received",
  263. zap.Error(err),
  264. zap.Any("payload", msg.Message),
  265. zap.Any("value", s),
  266. zap.Binary("raw", envelope.Data),
  267. zap.String("from", envelope.GetFrom().String()))
  268. } else {
  269. p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc()
  270. logger.Debug("valid signed heartbeat received",
  271. zap.Any("value", heartbeat),
  272. zap.String("from", envelope.GetFrom().String()))
  273. }
  274. case *gossipv1.GossipMessage_SignedObservation:
  275. obsvC <- m.SignedObservation
  276. p2pMessagesReceived.WithLabelValues("observation").Inc()
  277. default:
  278. p2pMessagesReceived.WithLabelValues("unknown").Inc()
  279. logger.Warn("received unknown message type (running outdated software?)",
  280. zap.Any("payload", msg.Message),
  281. zap.Binary("raw", envelope.Data),
  282. zap.String("from", envelope.GetFrom().String()))
  283. }
  284. }
  285. }
  286. }
  287. func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet, gst *bridge_common.GuardianSetState, disableVerify bool) (*gossipv1.Heartbeat, error) {
  288. envelopeAddr := common.BytesToAddress(s.GuardianAddr)
  289. idx, ok := gs.KeyIndex(envelopeAddr)
  290. var pk common.Address
  291. if !ok {
  292. if !disableVerify {
  293. return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr)
  294. }
  295. } else {
  296. pk = gs.Keys[idx]
  297. }
  298. digest := heartbeatDigest(s.Heartbeat)
  299. pubKey, err := ethcrypto.Ecrecover(digest.Bytes(), s.Signature)
  300. if err != nil {
  301. return nil, errors.New("failed to recover public key")
  302. }
  303. signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:])
  304. if pk != signerAddr && !disableVerify {
  305. return nil, fmt.Errorf("invalid signer: %v", signerAddr)
  306. }
  307. var h gossipv1.Heartbeat
  308. err = proto.Unmarshal(s.Heartbeat, &h)
  309. if err != nil {
  310. return nil, fmt.Errorf("failed to unmarshal heartbeat: %w", err)
  311. }
  312. // Store verified heartbeat in global guardian set state.
  313. if err := gst.SetHeartbeat(signerAddr, from, &h); err != nil {
  314. return nil, fmt.Errorf("failed to store in guardian set state: %w", err)
  315. }
  316. collectNodeMetrics(signerAddr, from, &h)
  317. return &h, nil
  318. }