p2p.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. package p2p
  2. import (
  3. "context"
  4. "crypto/ecdsa"
  5. "errors"
  6. "fmt"
  7. node_common "github.com/certusone/wormhole/node/pkg/common"
  8. "github.com/certusone/wormhole/node/pkg/vaa"
  9. "github.com/certusone/wormhole/node/pkg/version"
  10. "github.com/ethereum/go-ethereum/common"
  11. ethcrypto "github.com/ethereum/go-ethereum/crypto"
  12. "github.com/prometheus/client_golang/prometheus"
  13. "github.com/prometheus/client_golang/prometheus/promauto"
  14. "strings"
  15. "time"
  16. "github.com/libp2p/go-libp2p-core/peer"
  17. "github.com/multiformats/go-multiaddr"
  18. "github.com/libp2p/go-libp2p"
  19. connmgr "github.com/libp2p/go-libp2p-connmgr"
  20. "github.com/libp2p/go-libp2p-core/crypto"
  21. "github.com/libp2p/go-libp2p-core/host"
  22. "github.com/libp2p/go-libp2p-core/protocol"
  23. "github.com/libp2p/go-libp2p-core/routing"
  24. dht "github.com/libp2p/go-libp2p-kad-dht"
  25. pubsub "github.com/libp2p/go-libp2p-pubsub"
  26. libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
  27. libp2ptls "github.com/libp2p/go-libp2p-tls"
  28. "go.uber.org/zap"
  29. "google.golang.org/protobuf/proto"
  30. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  31. "github.com/certusone/wormhole/node/pkg/supervisor"
  32. )
  33. var (
  34. p2pHeartbeatsSent = promauto.NewCounter(
  35. prometheus.CounterOpts{
  36. Name: "wormhole_p2p_heartbeats_sent_total",
  37. Help: "Total number of p2p heartbeats sent",
  38. })
  39. p2pMessagesSent = promauto.NewCounter(
  40. prometheus.CounterOpts{
  41. Name: "wormhole_p2p_broadcast_messages_sent_total",
  42. Help: "Total number of p2p pubsub broadcast messages sent",
  43. })
  44. p2pMessagesReceived = promauto.NewCounterVec(
  45. prometheus.CounterOpts{
  46. Name: "wormhole_p2p_broadcast_messages_received_total",
  47. Help: "Total number of p2p pubsub broadcast messages received",
  48. }, []string{"type"})
  49. )
  50. var heartbeatMessagePrefix = []byte("heartbeat|")
  51. var signedObservationRequestPrefix = []byte("signed_observation_request|")
  52. func heartbeatDigest(b []byte) common.Hash {
  53. return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...))
  54. }
  55. func signedObservationRequestDigest(b []byte) common.Hash {
  56. return ethcrypto.Keccak256Hash(append(signedObservationRequestPrefix, b...))
  57. }
  58. func Run(obsvC chan *gossipv1.SignedObservation,
  59. sendC chan []byte,
  60. obsvReqC chan *gossipv1.ObservationRequest,
  61. obsvReqSendC chan *gossipv1.ObservationRequest,
  62. priv crypto.PrivKey,
  63. gk *ecdsa.PrivateKey,
  64. gst *node_common.GuardianSetState,
  65. port uint,
  66. networkID string,
  67. bootstrapPeers string,
  68. nodeName string,
  69. rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
  70. return func(ctx context.Context) (re error) {
  71. logger := supervisor.Logger(ctx)
  72. h, err := libp2p.New(ctx,
  73. // Use the keypair we generated
  74. libp2p.Identity(priv),
  75. // Multiple listen addresses
  76. libp2p.ListenAddrStrings(
  77. // Listen on QUIC only.
  78. // https://github.com/libp2p/go-libp2p/issues/688
  79. fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port),
  80. fmt.Sprintf("/ip6/::/udp/%d/quic", port),
  81. ),
  82. // Enable TLS security as the only security protocol.
  83. libp2p.Security(libp2ptls.ID, libp2ptls.New),
  84. // Enable QUIC transport as the only transport.
  85. libp2p.Transport(libp2pquic.NewTransport),
  86. // Let's prevent our peer from having too many
  87. // connections by attaching a connection manager.
  88. libp2p.ConnectionManager(connmgr.NewConnManager(
  89. 100, // Lowwater
  90. 400, // HighWater,
  91. time.Minute, // GracePeriod
  92. )),
  93. // Let this host use the DHT to find other hosts
  94. libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
  95. // TODO(leo): Persistent data store (i.e. address book)
  96. idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer),
  97. // This intentionally makes us incompatible with the global IPFS DHT
  98. dht.ProtocolPrefix(protocol.ID("/"+networkID)),
  99. )
  100. return idht, err
  101. }),
  102. )
  103. if err != nil {
  104. panic(err)
  105. }
  106. defer func() {
  107. // TODO: libp2p cannot be cleanly restarted (https://github.com/libp2p/go-libp2p/issues/992)
  108. logger.Error("p2p routine has exited, cancelling root context...", zap.Error(re))
  109. rootCtxCancel()
  110. }()
  111. logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", bootstrapPeers))
  112. topic := fmt.Sprintf("%s/%s", networkID, "broadcast")
  113. logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
  114. ps, err := pubsub.NewGossipSub(ctx, h)
  115. if err != nil {
  116. panic(err)
  117. }
  118. th, err := ps.Join(topic)
  119. if err != nil {
  120. return fmt.Errorf("failed to join topic: %w", err)
  121. }
  122. sub, err := th.Subscribe()
  123. if err != nil {
  124. return fmt.Errorf("failed to subscribe topic: %w", err)
  125. }
  126. // Add our own bootstrap nodes
  127. // Count number of successful connection attempts. If we fail to connect to any bootstrap peer, kill
  128. // the service and have supervisor retry it.
  129. successes := 0
  130. // Are we a bootstrap node? If so, it's okay to not have any peers.
  131. bootstrapNode := false
  132. for _, addr := range strings.Split(bootstrapPeers, ",") {
  133. if addr == "" {
  134. continue
  135. }
  136. ma, err := multiaddr.NewMultiaddr(addr)
  137. if err != nil {
  138. logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
  139. continue
  140. }
  141. pi, err := peer.AddrInfoFromP2pAddr(ma)
  142. if err != nil {
  143. logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
  144. continue
  145. }
  146. if pi.ID == h.ID() {
  147. logger.Info("We're a bootstrap node")
  148. bootstrapNode = true
  149. continue
  150. }
  151. if err = h.Connect(ctx, *pi); err != nil {
  152. logger.Error("Failed to connect to bootstrap peer", zap.String("peer", addr), zap.Error(err))
  153. } else {
  154. successes += 1
  155. }
  156. }
  157. // TODO: continually reconnect to bootstrap nodes?
  158. if successes == 0 && !bootstrapNode {
  159. return fmt.Errorf("failed to connect to any bootstrap peer")
  160. } else {
  161. logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
  162. }
  163. logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
  164. zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
  165. bootTime := time.Now()
  166. // Periodically run guardian state set cleanup.
  167. go func() {
  168. ticker := time.NewTicker(15 * time.Second)
  169. defer ticker.Stop()
  170. for {
  171. select {
  172. case <-ticker.C:
  173. gst.Cleanup()
  174. case <-ctx.Done():
  175. return
  176. }
  177. }
  178. }()
  179. go func() {
  180. // Disable heartbeat when no node name is provided (spy mode)
  181. if nodeName == "" {
  182. return
  183. }
  184. ctr := int64(0)
  185. tick := time.NewTicker(15 * time.Second)
  186. defer tick.Stop()
  187. for {
  188. select {
  189. case <-ctx.Done():
  190. return
  191. case <-tick.C:
  192. DefaultRegistry.mu.Lock()
  193. networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats))
  194. for _, v := range DefaultRegistry.networkStats {
  195. errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id))
  196. v.ErrorCount = errCtr
  197. networks = append(networks, v)
  198. }
  199. heartbeat := &gossipv1.Heartbeat{
  200. NodeName: nodeName,
  201. Counter: ctr,
  202. Timestamp: time.Now().UnixNano(),
  203. Networks: networks,
  204. Version: version.Version(),
  205. GuardianAddr: DefaultRegistry.guardianAddress,
  206. BootTimestamp: bootTime.UnixNano(),
  207. }
  208. ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey)
  209. if err := gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil {
  210. panic(err)
  211. }
  212. collectNodeMetrics(ourAddr, h.ID(), heartbeat)
  213. b, err := proto.Marshal(heartbeat)
  214. if err != nil {
  215. panic(err)
  216. }
  217. DefaultRegistry.mu.Unlock()
  218. // Sign the heartbeat using our node's guardian key.
  219. digest := heartbeatDigest(b)
  220. sig, err := ethcrypto.Sign(digest.Bytes(), gk)
  221. if err != nil {
  222. panic(err)
  223. }
  224. msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedHeartbeat{
  225. SignedHeartbeat: &gossipv1.SignedHeartbeat{
  226. Heartbeat: b,
  227. Signature: sig,
  228. GuardianAddr: ourAddr.Bytes(),
  229. }}}
  230. b, err = proto.Marshal(&msg)
  231. if err != nil {
  232. panic(err)
  233. }
  234. err = th.Publish(ctx, b)
  235. if err != nil {
  236. logger.Warn("failed to publish heartbeat message", zap.Error(err))
  237. }
  238. p2pHeartbeatsSent.Inc()
  239. ctr += 1
  240. }
  241. }
  242. }()
  243. go func() {
  244. for {
  245. select {
  246. case <-ctx.Done():
  247. return
  248. case msg := <-sendC:
  249. err := th.Publish(ctx, msg)
  250. p2pMessagesSent.Inc()
  251. if err != nil {
  252. logger.Error("failed to publish message from queue", zap.Error(err))
  253. }
  254. case msg := <-obsvReqSendC:
  255. b, err := proto.Marshal(msg)
  256. if err != nil {
  257. panic(err)
  258. }
  259. // Sign the observation request using our node's guardian key.
  260. digest := signedObservationRequestDigest(b)
  261. sig, err := ethcrypto.Sign(digest.Bytes(), gk)
  262. if err != nil {
  263. panic(err)
  264. }
  265. sReq := &gossipv1.SignedObservationRequest{
  266. ObservationRequest: b,
  267. Signature: sig,
  268. GuardianAddr: ethcrypto.PubkeyToAddress(gk.PublicKey).Bytes(),
  269. }
  270. envelope := &gossipv1.GossipMessage{
  271. Message: &gossipv1.GossipMessage_SignedObservationRequest{
  272. SignedObservationRequest: sReq}}
  273. b, err = proto.Marshal(envelope)
  274. if err != nil {
  275. panic(err)
  276. }
  277. // Send to local observation request queue (the loopback message is ignored)
  278. obsvReqC <- msg
  279. err = th.Publish(ctx, b)
  280. p2pMessagesSent.Inc()
  281. if err != nil {
  282. logger.Error("failed to publish observation request", zap.Error(err))
  283. } else {
  284. logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq))
  285. }
  286. }
  287. }
  288. }()
  289. for {
  290. envelope, err := sub.Next(ctx)
  291. if err != nil {
  292. return fmt.Errorf("failed to receive pubsub message: %w", err)
  293. }
  294. var msg gossipv1.GossipMessage
  295. err = proto.Unmarshal(envelope.Data, &msg)
  296. if err != nil {
  297. logger.Info("received invalid message",
  298. zap.Binary("data", envelope.Data),
  299. zap.String("from", envelope.GetFrom().String()))
  300. p2pMessagesReceived.WithLabelValues("invalid").Inc()
  301. continue
  302. }
  303. if envelope.GetFrom() == h.ID() {
  304. logger.Debug("received message from ourselves, ignoring",
  305. zap.Any("payload", msg.Message))
  306. p2pMessagesReceived.WithLabelValues("loopback").Inc()
  307. continue
  308. }
  309. logger.Debug("received message",
  310. zap.Any("payload", msg.Message),
  311. zap.Binary("raw", envelope.Data),
  312. zap.String("from", envelope.GetFrom().String()))
  313. switch m := msg.Message.(type) {
  314. case *gossipv1.GossipMessage_Heartbeat:
  315. logger.Debug("heartbeat received",
  316. zap.Any("value", m.Heartbeat),
  317. zap.String("from", envelope.GetFrom().String()))
  318. p2pMessagesReceived.WithLabelValues("heartbeat").Inc()
  319. case *gossipv1.GossipMessage_SignedHeartbeat:
  320. s := m.SignedHeartbeat
  321. gs := gst.Get()
  322. if gs == nil {
  323. // No valid guardian set yet - dropping heartbeat
  324. logger.Debug("skipping heartbeat - no guardian set",
  325. zap.Any("value", s),
  326. zap.String("from", envelope.GetFrom().String()))
  327. break
  328. }
  329. if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, gst, false); err != nil {
  330. p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
  331. logger.Debug("invalid signed heartbeat received",
  332. zap.Error(err),
  333. zap.Any("payload", msg.Message),
  334. zap.Any("value", s),
  335. zap.Binary("raw", envelope.Data),
  336. zap.String("from", envelope.GetFrom().String()))
  337. } else {
  338. p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc()
  339. logger.Debug("valid signed heartbeat received",
  340. zap.Any("value", heartbeat),
  341. zap.String("from", envelope.GetFrom().String()))
  342. }
  343. case *gossipv1.GossipMessage_SignedObservation:
  344. obsvC <- m.SignedObservation
  345. p2pMessagesReceived.WithLabelValues("observation").Inc()
  346. case *gossipv1.GossipMessage_SignedObservationRequest:
  347. s := m.SignedObservationRequest
  348. gs := gst.Get()
  349. if gs == nil {
  350. logger.Debug("dropping SignedObservationRequest - no guardian set",
  351. zap.Any("value", s),
  352. zap.String("from", envelope.GetFrom().String()))
  353. break
  354. }
  355. r, err := processSignedObservationRequest(s, gs)
  356. if err != nil {
  357. p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc()
  358. logger.Debug("invalid signed observation request received",
  359. zap.Error(err),
  360. zap.Any("payload", msg.Message),
  361. zap.Any("value", s),
  362. zap.Binary("raw", envelope.Data),
  363. zap.String("from", envelope.GetFrom().String()))
  364. } else {
  365. p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc()
  366. logger.Info("valid signed observation request received",
  367. zap.Any("value", r),
  368. zap.String("from", envelope.GetFrom().String()))
  369. obsvReqC <- r
  370. }
  371. default:
  372. p2pMessagesReceived.WithLabelValues("unknown").Inc()
  373. logger.Warn("received unknown message type (running outdated software?)",
  374. zap.Any("payload", msg.Message),
  375. zap.Binary("raw", envelope.Data),
  376. zap.String("from", envelope.GetFrom().String()))
  377. }
  378. }
  379. }
  380. }
  381. func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *node_common.GuardianSet, gst *node_common.GuardianSetState, disableVerify bool) (*gossipv1.Heartbeat, error) {
  382. envelopeAddr := common.BytesToAddress(s.GuardianAddr)
  383. idx, ok := gs.KeyIndex(envelopeAddr)
  384. var pk common.Address
  385. if !ok {
  386. if !disableVerify {
  387. return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr)
  388. }
  389. } else {
  390. pk = gs.Keys[idx]
  391. }
  392. digest := heartbeatDigest(s.Heartbeat)
  393. pubKey, err := ethcrypto.Ecrecover(digest.Bytes(), s.Signature)
  394. if err != nil {
  395. return nil, errors.New("failed to recover public key")
  396. }
  397. signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:])
  398. if pk != signerAddr && !disableVerify {
  399. return nil, fmt.Errorf("invalid signer: %v", signerAddr)
  400. }
  401. var h gossipv1.Heartbeat
  402. err = proto.Unmarshal(s.Heartbeat, &h)
  403. if err != nil {
  404. return nil, fmt.Errorf("failed to unmarshal heartbeat: %w", err)
  405. }
  406. // Store verified heartbeat in global guardian set state.
  407. if err := gst.SetHeartbeat(signerAddr, from, &h); err != nil {
  408. return nil, fmt.Errorf("failed to store in guardian set state: %w", err)
  409. }
  410. collectNodeMetrics(signerAddr, from, &h)
  411. return &h, nil
  412. }
  413. func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *node_common.GuardianSet) (*gossipv1.ObservationRequest, error) {
  414. envelopeAddr := common.BytesToAddress(s.GuardianAddr)
  415. idx, ok := gs.KeyIndex(envelopeAddr)
  416. var pk common.Address
  417. if !ok {
  418. return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr)
  419. } else {
  420. pk = gs.Keys[idx]
  421. }
  422. digest := signedObservationRequestDigest(s.ObservationRequest)
  423. pubKey, err := ethcrypto.Ecrecover(digest.Bytes(), s.Signature)
  424. if err != nil {
  425. return nil, errors.New("failed to recover public key")
  426. }
  427. signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:])
  428. if pk != signerAddr {
  429. return nil, fmt.Errorf("invalid signer: %v", signerAddr)
  430. }
  431. var h gossipv1.ObservationRequest
  432. err = proto.Unmarshal(s.ObservationRequest, &h)
  433. if err != nil {
  434. return nil, fmt.Errorf("failed to unmarshal observation request: %w", err)
  435. }
  436. // TODO: implement per-guardian rate limiting
  437. if h.ChainId != vaa.ChainIDEthereum {
  438. return nil, fmt.Errorf("invalid chain id: %v", h.ChainId)
  439. }
  440. return &h, nil
  441. }