p2p.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package p2p
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/certusone/wormhole/node/pkg/version"
  6. "github.com/prometheus/client_golang/prometheus"
  7. "strings"
  8. "time"
  9. "github.com/libp2p/go-libp2p-core/peer"
  10. "github.com/multiformats/go-multiaddr"
  11. "github.com/libp2p/go-libp2p"
  12. connmgr "github.com/libp2p/go-libp2p-connmgr"
  13. "github.com/libp2p/go-libp2p-core/crypto"
  14. "github.com/libp2p/go-libp2p-core/host"
  15. "github.com/libp2p/go-libp2p-core/protocol"
  16. "github.com/libp2p/go-libp2p-core/routing"
  17. dht "github.com/libp2p/go-libp2p-kad-dht"
  18. pubsub "github.com/libp2p/go-libp2p-pubsub"
  19. libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
  20. libp2ptls "github.com/libp2p/go-libp2p-tls"
  21. "go.uber.org/zap"
  22. "google.golang.org/protobuf/proto"
  23. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  24. "github.com/certusone/wormhole/node/pkg/supervisor"
  25. )
  26. var (
  27. p2pHeartbeatsSent = prometheus.NewCounter(
  28. prometheus.CounterOpts{
  29. Name: "wormhole_p2p_heartbeats_sent_total",
  30. Help: "Total number of p2p heartbeats sent",
  31. })
  32. p2pMessagesSent = prometheus.NewCounter(
  33. prometheus.CounterOpts{
  34. Name: "wormhole_p2p_broadcast_messages_sent_total",
  35. Help: "Total number of p2p pubsub broadcast messages sent",
  36. })
  37. p2pMessagesReceived = prometheus.NewCounterVec(
  38. prometheus.CounterOpts{
  39. Name: "wormhole_p2p_broadcast_messages_received_total",
  40. Help: "Total number of p2p pubsub broadcast messages received",
  41. }, []string{"type"})
  42. )
  43. func init() {
  44. prometheus.MustRegister(p2pHeartbeatsSent)
  45. prometheus.MustRegister(p2pMessagesSent)
  46. prometheus.MustRegister(p2pMessagesReceived)
  47. }
  48. func Run(obsvC chan *gossipv1.SignedObservation,
  49. sendC chan []byte,
  50. priv crypto.PrivKey,
  51. port uint,
  52. networkID string,
  53. bootstrapPeers string,
  54. nodeName string,
  55. rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
  56. return func(ctx context.Context) (re error) {
  57. logger := supervisor.Logger(ctx)
  58. h, err := libp2p.New(ctx,
  59. // Use the keypair we generated
  60. libp2p.Identity(priv),
  61. // Multiple listen addresses
  62. libp2p.ListenAddrStrings(
  63. // Listen on QUIC only.
  64. // https://github.com/libp2p/go-libp2p/issues/688
  65. fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port),
  66. fmt.Sprintf("/ip6/::/udp/%d/quic", port),
  67. ),
  68. // Enable TLS security as the only security protocol.
  69. libp2p.Security(libp2ptls.ID, libp2ptls.New),
  70. // Enable QUIC transport as the only transport.
  71. libp2p.Transport(libp2pquic.NewTransport),
  72. // Let's prevent our peer from having too many
  73. // connections by attaching a connection manager.
  74. libp2p.ConnectionManager(connmgr.NewConnManager(
  75. 100, // Lowwater
  76. 400, // HighWater,
  77. time.Minute, // GracePeriod
  78. )),
  79. // Let this host use the DHT to find other hosts
  80. libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
  81. // TODO(leo): Persistent data store (i.e. address book)
  82. idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer),
  83. // TODO(leo): This intentionally makes us incompatible with the global IPFS DHT
  84. dht.ProtocolPrefix(protocol.ID("/"+networkID)),
  85. )
  86. return idht, err
  87. }),
  88. )
  89. if err != nil {
  90. panic(err)
  91. }
  92. defer func() {
  93. // TODO: libp2p cannot be cleanly restarted (https://github.com/libp2p/go-libp2p/issues/992)
  94. logger.Error("p2p routine has exited, cancelling root context...", zap.Error(re))
  95. rootCtxCancel()
  96. }()
  97. logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", bootstrapPeers))
  98. topic := fmt.Sprintf("%s/%s", networkID, "broadcast")
  99. logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
  100. ps, err := pubsub.NewGossipSub(ctx, h)
  101. if err != nil {
  102. panic(err)
  103. }
  104. th, err := ps.Join(topic)
  105. if err != nil {
  106. return fmt.Errorf("failed to join topic: %w", err)
  107. }
  108. sub, err := th.Subscribe()
  109. if err != nil {
  110. return fmt.Errorf("failed to subscribe topic: %w", err)
  111. }
  112. // Add our own bootstrap nodes
  113. // Count number of successful connection attempts. If we fail to connect to any bootstrap peer, kill
  114. // the service and have supervisor retry it.
  115. successes := 0
  116. // Are we a bootstrap node? If so, it's okay to not have any peers.
  117. bootstrapNode := false
  118. for _, addr := range strings.Split(bootstrapPeers, ",") {
  119. if addr == "" {
  120. continue
  121. }
  122. ma, err := multiaddr.NewMultiaddr(addr)
  123. if err != nil {
  124. logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
  125. continue
  126. }
  127. pi, err := peer.AddrInfoFromP2pAddr(ma)
  128. if err != nil {
  129. logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
  130. continue
  131. }
  132. if pi.ID == h.ID() {
  133. logger.Info("We're a bootstrap node")
  134. bootstrapNode = true
  135. continue
  136. }
  137. if err = h.Connect(ctx, *pi); err != nil {
  138. logger.Error("Failed to connect to bootstrap peer", zap.String("peer", addr), zap.Error(err))
  139. } else {
  140. successes += 1
  141. }
  142. }
  143. // TODO: continually reconnect to bootstrap nodes?
  144. if successes == 0 && !bootstrapNode {
  145. return fmt.Errorf("failed to connect to any bootstrap peer")
  146. } else {
  147. logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
  148. }
  149. logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
  150. zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
  151. go func() {
  152. ctr := int64(0)
  153. tick := time.NewTicker(15 * time.Second)
  154. defer tick.Stop()
  155. for {
  156. select {
  157. case <-ctx.Done():
  158. return
  159. case <-tick.C:
  160. DefaultRegistry.mu.Lock()
  161. networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats))
  162. for _, v := range DefaultRegistry.networkStats {
  163. networks = append(networks, v)
  164. }
  165. msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_Heartbeat{
  166. Heartbeat: &gossipv1.Heartbeat{
  167. NodeName: nodeName,
  168. Counter: ctr,
  169. Timestamp: time.Now().UnixNano(),
  170. Networks: networks,
  171. Version: version.Version(),
  172. GuardianAddr: DefaultRegistry.guardianAddress,
  173. }}}
  174. b, err := proto.Marshal(&msg)
  175. if err != nil {
  176. panic(err)
  177. }
  178. DefaultRegistry.mu.Unlock()
  179. err = th.Publish(ctx, b)
  180. if err != nil {
  181. logger.Warn("failed to publish heartbeat message", zap.Error(err))
  182. }
  183. p2pHeartbeatsSent.Inc()
  184. ctr += 1
  185. }
  186. }
  187. }()
  188. go func() {
  189. for {
  190. select {
  191. case <-ctx.Done():
  192. return
  193. case msg := <-sendC:
  194. err := th.Publish(ctx, msg)
  195. p2pMessagesSent.Inc()
  196. if err != nil {
  197. logger.Error("failed to publish message from queue", zap.Error(err))
  198. }
  199. }
  200. }
  201. }()
  202. for {
  203. envelope, err := sub.Next(ctx)
  204. if err != nil {
  205. return fmt.Errorf("failed to receive pubsub message: %w", err)
  206. }
  207. var msg gossipv1.GossipMessage
  208. err = proto.Unmarshal(envelope.Data, &msg)
  209. if err != nil {
  210. logger.Info("received invalid message",
  211. zap.String("data", string(envelope.Data)),
  212. zap.String("from", envelope.GetFrom().String()))
  213. p2pMessagesReceived.WithLabelValues("invalid").Inc()
  214. continue
  215. }
  216. if envelope.GetFrom() == h.ID() {
  217. logger.Debug("received message from ourselves, ignoring",
  218. zap.Any("payload", msg.Message))
  219. p2pMessagesReceived.WithLabelValues("loopback").Inc()
  220. continue
  221. }
  222. logger.Debug("received message",
  223. zap.Any("payload", msg.Message),
  224. zap.Binary("raw", envelope.Data),
  225. zap.String("from", envelope.GetFrom().String()))
  226. switch m := msg.Message.(type) {
  227. case *gossipv1.GossipMessage_Heartbeat:
  228. logger.Debug("heartbeat received",
  229. zap.Any("value", m.Heartbeat),
  230. zap.String("from", envelope.GetFrom().String()))
  231. p2pMessagesReceived.WithLabelValues("heartbeat").Inc()
  232. case *gossipv1.GossipMessage_SignedObservation:
  233. obsvC <- m.SignedObservation
  234. p2pMessagesReceived.WithLabelValues("observation").Inc()
  235. default:
  236. p2pMessagesReceived.WithLabelValues("unknown").Inc()
  237. logger.Warn("received unknown message type (running outdated software?)",
  238. zap.Any("payload", msg.Message),
  239. zap.Binary("raw", envelope.Data),
  240. zap.String("from", envelope.GetFrom().String()))
  241. }
  242. }
  243. }
  244. }