p2p.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. package ccq
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "net/http"
  9. "time"
  10. "github.com/certusone/wormhole/node/pkg/p2p"
  11. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  12. "github.com/certusone/wormhole/node/pkg/query"
  13. ethCommon "github.com/ethereum/go-ethereum/common"
  14. ethCrypto "github.com/ethereum/go-ethereum/crypto"
  15. pubsub "github.com/libp2p/go-libp2p-pubsub"
  16. "github.com/libp2p/go-libp2p/core/crypto"
  17. "github.com/libp2p/go-libp2p/core/host"
  18. "github.com/libp2p/go-libp2p/core/peer"
  19. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  20. "go.uber.org/zap"
  21. "google.golang.org/protobuf/proto"
  22. )
  23. type GuardianSignature struct {
  24. Index int
  25. Signature string
  26. }
  27. type SignedResponse struct {
  28. Response *query.QueryResponsePublication
  29. Signatures []GuardianSignature
  30. }
  31. type P2PSub struct {
  32. sub *pubsub.Subscription
  33. topic_req *pubsub.Topic
  34. topic_resp *pubsub.Topic
  35. host host.Host
  36. }
  37. func runP2P(
  38. ctx context.Context,
  39. priv crypto.PrivKey,
  40. port uint,
  41. networkID string,
  42. bootstrapPeers string,
  43. ethRpcUrl string,
  44. ethCoreAddr string,
  45. pendingResponses *PendingResponses,
  46. logger *zap.Logger,
  47. monitorPeers bool,
  48. loggingMap *LoggingMap,
  49. gossipAdvertiseAddress string,
  50. protectedPeers []string,
  51. ) (*P2PSub, error) {
  52. // p2p setup
  53. components := p2p.DefaultComponents()
  54. components.Port = port
  55. components.GossipAdvertiseAddress = gossipAdvertiseAddress
  56. h, err := p2p.NewHost(logger, ctx, networkID, bootstrapPeers, components, priv)
  57. if err != nil {
  58. return nil, err
  59. }
  60. if len(protectedPeers) != 0 {
  61. for _, peerId := range protectedPeers {
  62. components.ConnMgr.Protect(peer.ID(peerId), "configured")
  63. }
  64. }
  65. topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
  66. topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")
  67. logger.Info("Subscribing pubsub topic", zap.String("topic_req", topic_req), zap.String("topic_resp", topic_resp))
  68. // Comment from security team in PR #2981: CCQServers should have a parameter of D = 36, Dlo = 19, Dhi = 40, Dout = 18 such that they can reach all Guardians directly.
  69. gossipParams := pubsub.DefaultGossipSubParams()
  70. gossipParams.D = 36
  71. gossipParams.Dlo = 19
  72. gossipParams.Dhi = 40
  73. gossipParams.Dout = 18
  74. ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithGossipSubParams(gossipParams))
  75. if err != nil {
  76. logger.Error("failed to create gossip subscription", zap.Error(err))
  77. return nil, err
  78. }
  79. th_req, err := ps.Join(topic_req)
  80. if err != nil {
  81. logger.Error("failed to join request topic", zap.String("topic_req", topic_req), zap.Error(err))
  82. return nil, err
  83. }
  84. th_resp, err := ps.Join(topic_resp)
  85. if err != nil {
  86. logger.Error("failed to join response topic", zap.String("topic_resp", topic_resp), zap.Error(err))
  87. return nil, err
  88. }
  89. sub, err := th_resp.Subscribe()
  90. if err != nil {
  91. logger.Error("failed to subscribe to response topic", zap.Error(err))
  92. return nil, err
  93. }
  94. logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
  95. zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
  96. bootstrappers, _ := p2p.BootstrapAddrs(logger, bootstrapPeers, h.ID())
  97. successes := p2p.ConnectToPeers(ctx, logger, h, bootstrappers)
  98. logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
  99. // Wait for peers
  100. for len(th_req.ListPeers()) < 1 {
  101. time.Sleep(time.Millisecond * 100)
  102. }
  103. logger.Info("Found peers", zap.Int("numPeers", len(th_req.ListPeers())))
  104. if monitorPeers {
  105. logger.Info("Will monitor for missing peers once per minute.")
  106. go func() {
  107. t := time.NewTicker(time.Minute)
  108. for {
  109. select {
  110. case <-ctx.Done():
  111. logger.Info("Context cancelled, exiting peer monitoring.")
  112. case <-t.C:
  113. peers := th_req.ListPeers()
  114. logger.Info("current peers", zap.Int("numPeers", len(peers)), zap.Any("peers", peers))
  115. peerMap := map[string]struct{}{}
  116. for _, peer := range peers {
  117. peerMap[peer.String()] = struct{}{}
  118. }
  119. for _, p := range bootstrappers {
  120. if _, exists := peerMap[p.ID.String()]; !exists {
  121. logger.Info("attempting to reconnect to peer", zap.String("peer", p.ID.String()))
  122. if err := h.Connect(ctx, p); err != nil {
  123. logger.Error("failed to reconnect to peer", zap.String("peer", p.ID.String()), zap.Error(err))
  124. } else {
  125. logger.Info("Reconnected to peer", zap.String("peer", p.ID.String()))
  126. peerMap[p.ID.String()] = struct{}{}
  127. successfulReconnects.Inc()
  128. }
  129. }
  130. }
  131. }
  132. }
  133. }()
  134. }
  135. // Fetch the initial current guardian set
  136. guardianSet, err := FetchCurrentGuardianSet(ctx, ethRpcUrl, ethCoreAddr)
  137. if err != nil {
  138. logger.Fatal("Failed to fetch current guardian set", zap.Error(err))
  139. }
  140. quorum := vaa.CalculateQuorum(len(guardianSet.Keys))
  141. // Listen to the p2p network for query responses
  142. go func() {
  143. // Maps the request signature to a map of response digests which maps to a list of guardian signatures.
  144. // A request could have responses with different digests, because the guardians could have
  145. // different results returned for the query in the event of a rollback.
  146. responses := make(map[string]map[ethCommon.Hash][]GuardianSignature)
  147. for {
  148. envelope, err := sub.Next(ctx)
  149. if err != nil {
  150. logger.Error("Failed to read next pubsub message", zap.Error(err))
  151. return
  152. }
  153. var msg gossipv1.GossipMessage
  154. err = proto.Unmarshal(envelope.Data, &msg)
  155. if err != nil {
  156. logger.Error("received invalid message", zap.Binary("data", envelope.Data), zap.String("from", envelope.GetFrom().String()))
  157. inboundP2pError.WithLabelValues("failed_to_unmarshal_gossip_msg").Inc()
  158. continue
  159. }
  160. switch m := msg.Message.(type) {
  161. case *gossipv1.GossipMessage_SignedQueryResponse:
  162. logger.Debug("query response received", zap.Any("response", m.SignedQueryResponse))
  163. peerId := envelope.GetFrom().String()
  164. queryResponsesReceived.WithLabelValues(peerId).Inc()
  165. var queryResponse query.QueryResponsePublication
  166. err := queryResponse.Unmarshal(m.SignedQueryResponse.QueryResponse)
  167. if err != nil {
  168. logger.Error("failed to unmarshal response", zap.Error(err))
  169. inboundP2pError.WithLabelValues("failed_to_unmarshal_response").Inc()
  170. continue
  171. }
  172. for _, pcr := range queryResponse.PerChainResponses {
  173. queryResponsesReceivedByChainAndPeerID.WithLabelValues(pcr.ChainId.String(), peerId).Inc()
  174. }
  175. requestSignature := hex.EncodeToString(queryResponse.Request.Signature)
  176. logger.Info("query response received from gossip", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
  177. if loggingMap.ShouldLogResponse(requestSignature) {
  178. var queryRequest query.QueryRequest
  179. if err := queryRequest.Unmarshal(queryResponse.Request.QueryRequest); err == nil {
  180. logger.Info("logging response", zap.String("peerId", peerId), zap.Any("requestId", requestSignature), zap.Any("request", queryRequest), zap.Any("response", queryResponse))
  181. } else {
  182. logger.Error("logging response (failed to unmarshal request)", zap.String("peerId", peerId), zap.Any("requestId", requestSignature), zap.Any("response", queryResponse))
  183. }
  184. }
  185. // Check that we're handling the request for this response
  186. pendingResponse := pendingResponses.Get(requestSignature)
  187. if pendingResponse == nil {
  188. // This will happen for responses that come in after quorum is reached.
  189. logger.Debug("skipping query response for unknown request", zap.String("signature", requestSignature))
  190. continue
  191. }
  192. // Make sure that the request bytes match
  193. if !bytes.Equal(queryResponse.Request.QueryRequest, pendingResponse.req.QueryRequest) ||
  194. !bytes.Equal(queryResponse.Request.Signature, pendingResponse.req.Signature) {
  195. continue
  196. }
  197. digest := query.GetQueryResponseDigestFromBytes(m.SignedQueryResponse.QueryResponse)
  198. signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), m.SignedQueryResponse.Signature)
  199. if err != nil {
  200. logger.Error("failed to verify signature on response",
  201. zap.String("digest", digest.Hex()),
  202. zap.String("signature", hex.EncodeToString(m.SignedQueryResponse.Signature)),
  203. zap.Error(err))
  204. inboundP2pError.WithLabelValues("failed_to_verify_signature").Inc()
  205. continue
  206. }
  207. signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])
  208. keyIdx, hasKeyIdx := guardianSet.KeyIndex(signerAddress)
  209. if hasKeyIdx {
  210. if _, ok := responses[requestSignature]; !ok {
  211. responses[requestSignature] = make(map[ethCommon.Hash][]GuardianSignature)
  212. }
  213. found := false
  214. for _, gs := range responses[requestSignature][digest] {
  215. if gs.Index == keyIdx {
  216. found = true
  217. break
  218. }
  219. }
  220. if found {
  221. // Already handled the response from this guardian
  222. continue
  223. }
  224. responses[requestSignature][digest] = append(responses[requestSignature][digest], GuardianSignature{
  225. Index: keyIdx,
  226. Signature: hex.EncodeToString(m.SignedQueryResponse.Signature),
  227. })
  228. // quorum is reached when a super-majority of guardians have signed a response with the same digest
  229. numSigners := len(responses[requestSignature][digest])
  230. if numSigners >= quorum {
  231. s := &SignedResponse{
  232. Response: &queryResponse,
  233. Signatures: responses[requestSignature][digest],
  234. }
  235. delete(responses, requestSignature)
  236. select {
  237. case pendingResponse.ch <- s:
  238. logger.Info("quorum reached, forwarded query response",
  239. zap.String("peerId", peerId),
  240. zap.String("userId", pendingResponse.userName),
  241. zap.Any("requestId", requestSignature),
  242. zap.Int("numSigners", numSigners),
  243. zap.Int("quorum", quorum),
  244. )
  245. default:
  246. logger.Error("failed to write query response to channel, dropping it", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
  247. // Leave the request in the pending map. It will get cleaned up if it times out.
  248. }
  249. } else {
  250. // Proxy should return early if quorum is no longer possible - i.e maxMatchingResponses + outstandingResponses < quorum
  251. var totalSigners, maxMatchingResponses int
  252. for _, signers := range responses[requestSignature] {
  253. totalSigners += len(signers)
  254. if len(signers) > maxMatchingResponses {
  255. maxMatchingResponses = len(signers)
  256. }
  257. }
  258. outstandingResponses := len(guardianSet.Keys) - totalSigners
  259. pendingResponse.updateStats(maxMatchingResponses, outstandingResponses, quorum)
  260. if maxMatchingResponses+outstandingResponses < quorum {
  261. quorumNotMetByUser.WithLabelValues(pendingResponse.userName).Inc()
  262. failedQueriesByUser.WithLabelValues(pendingResponse.userName).Inc()
  263. delete(responses, requestSignature)
  264. select {
  265. case pendingResponse.errCh <- &ErrorEntry{err: errors.New("quorum not met"), status: http.StatusBadRequest}:
  266. logger.Info("query failed, quorum not met",
  267. zap.String("peerId", peerId),
  268. zap.String("userId", pendingResponse.userName),
  269. zap.Any("requestId", requestSignature),
  270. zap.Int("numSigners", numSigners),
  271. zap.Int("maxMatchingResponses", maxMatchingResponses),
  272. zap.Int("outstandingResponses", outstandingResponses),
  273. zap.Int("quorum", quorum),
  274. )
  275. default:
  276. logger.Error("failed to write query error response to channel, dropping it", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
  277. // Leave the request in the pending map. It will get cleaned up if it times out.
  278. }
  279. } else {
  280. logger.Info("waiting for more query responses",
  281. zap.String("peerId", peerId),
  282. zap.String("userId", pendingResponse.userName),
  283. zap.Any("requestId", requestSignature),
  284. zap.Int("numSigners", numSigners),
  285. zap.Int("maxMatchingResponses", maxMatchingResponses),
  286. zap.Int("outstandingResponses", outstandingResponses),
  287. zap.Int("quorum", quorum),
  288. )
  289. }
  290. }
  291. } else {
  292. logger.Warn("received observation by unknown guardian - is our guardian set outdated?",
  293. zap.String("digest", digest.Hex()), zap.String("address", signerAddress.Hex()),
  294. )
  295. inboundP2pError.WithLabelValues("unknown_guardian").Inc()
  296. }
  297. default:
  298. // Since CCQ gossip is isolated, this really shouldn't happen.
  299. logger.Debug("unexpected gossip message type", zap.Any("msg", m))
  300. inboundP2pError.WithLabelValues("unexpected_gossip_msg_type").Inc()
  301. }
  302. }
  303. }()
  304. return &P2PSub{
  305. sub: sub,
  306. topic_req: th_req,
  307. topic_resp: th_resp,
  308. host: h,
  309. }, nil
  310. }