// p2p.go (12 KB)
  1. package ccq
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "net/http"
  9. "time"
  10. "github.com/certusone/wormhole/node/pkg/p2p"
  11. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  12. "github.com/certusone/wormhole/node/pkg/query"
  13. ethCommon "github.com/ethereum/go-ethereum/common"
  14. ethCrypto "github.com/ethereum/go-ethereum/crypto"
  15. pubsub "github.com/libp2p/go-libp2p-pubsub"
  16. "github.com/libp2p/go-libp2p/core/crypto"
  17. "github.com/libp2p/go-libp2p/core/host"
  18. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  19. "go.uber.org/zap"
  20. "google.golang.org/protobuf/proto"
  21. )
// GuardianSignature records one guardian's signature over a query response.
type GuardianSignature struct {
	// Index is the signer's index in the guardian set (the value returned by
	// guardianSet.KeyIndex for the recovered signer address).
	Index int
	// Signature is the hex-encoded signature taken from the signed query response.
	Signature string
}
// SignedResponse is a query response together with the guardian signatures
// collected for its digest. runP2P sends one on the pending request's channel
// once the number of matching signatures reaches quorum.
type SignedResponse struct {
	// Response is the unmarshaled query response publication.
	Response *query.QueryResponsePublication
	// Signatures holds one entry per guardian that signed this response digest.
	Signatures []GuardianSignature
}
// P2PSub bundles the libp2p/pubsub handles created by runP2P.
type P2PSub struct {
	// sub is the subscription on the response topic.
	sub *pubsub.Subscription
	// topic_req is the joined request topic ("<networkID>/ccq_req").
	topic_req *pubsub.Topic
	// topic_resp is the joined response topic ("<networkID>/ccq_resp").
	topic_resp *pubsub.Topic
	// host is the libp2p host the topics were joined on.
	host host.Host
}
  36. func runP2P(
  37. ctx context.Context,
  38. priv crypto.PrivKey,
  39. port uint,
  40. networkID string,
  41. bootstrapPeers string,
  42. ethRpcUrl string,
  43. ethCoreAddr string,
  44. pendingResponses *PendingResponses,
  45. logger *zap.Logger,
  46. monitorPeers bool,
  47. loggingMap *LoggingMap,
  48. gossipAdvertiseAddress string,
  49. ) (*P2PSub, error) {
  50. // p2p setup
  51. components := p2p.DefaultComponents()
  52. components.Port = port
  53. components.GossipAdvertiseAddress = gossipAdvertiseAddress
  54. h, err := p2p.NewHost(logger, ctx, networkID, bootstrapPeers, components, priv)
  55. if err != nil {
  56. return nil, err
  57. }
  58. topic_req := fmt.Sprintf("%s/%s", networkID, "ccq_req")
  59. topic_resp := fmt.Sprintf("%s/%s", networkID, "ccq_resp")
  60. logger.Info("Subscribing pubsub topic", zap.String("topic_req", topic_req), zap.String("topic_resp", topic_resp))
  61. // Comment from security team in PR #2981: CCQServers should have a parameter of D = 36, Dlo = 19, Dhi = 40, Dout = 18 such that they can reach all Guardians directly.
  62. gossipParams := pubsub.DefaultGossipSubParams()
  63. gossipParams.D = 36
  64. gossipParams.Dlo = 19
  65. gossipParams.Dhi = 40
  66. gossipParams.Dout = 18
  67. ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithGossipSubParams(gossipParams))
  68. if err != nil {
  69. logger.Error("failed to create gossip subscription", zap.Error(err))
  70. return nil, err
  71. }
  72. th_req, err := ps.Join(topic_req)
  73. if err != nil {
  74. logger.Error("failed to join request topic", zap.String("topic_req", topic_req), zap.Error(err))
  75. return nil, err
  76. }
  77. th_resp, err := ps.Join(topic_resp)
  78. if err != nil {
  79. logger.Error("failed to join response topic", zap.String("topic_resp", topic_resp), zap.Error(err))
  80. return nil, err
  81. }
  82. sub, err := th_resp.Subscribe()
  83. if err != nil {
  84. logger.Error("failed to subscribe to response topic", zap.Error(err))
  85. return nil, err
  86. }
  87. logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
  88. zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
  89. bootstrappers, _ := p2p.BootstrapAddrs(logger, bootstrapPeers, h.ID())
  90. successes := p2p.ConnectToPeers(ctx, logger, h, bootstrappers)
  91. logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
  92. // Wait for peers
  93. for len(th_req.ListPeers()) < 1 {
  94. time.Sleep(time.Millisecond * 100)
  95. }
  96. logger.Info("Found peers", zap.Int("numPeers", len(th_req.ListPeers())))
  97. if monitorPeers {
  98. logger.Info("Will monitor for missing peers once per minute.")
  99. go func() {
  100. t := time.NewTicker(time.Minute)
  101. for {
  102. select {
  103. case <-ctx.Done():
  104. logger.Info("Context cancelled, exiting peer monitoring.")
  105. case <-t.C:
  106. peers := th_req.ListPeers()
  107. logger.Info("current peers", zap.Int("numPeers", len(peers)), zap.Any("peers", peers))
  108. peerMap := map[string]struct{}{}
  109. for _, peer := range peers {
  110. peerMap[peer.String()] = struct{}{}
  111. }
  112. for _, p := range bootstrappers {
  113. if _, exists := peerMap[p.ID.String()]; !exists {
  114. logger.Info("attempting to reconnect to peer", zap.String("peer", p.ID.String()))
  115. if err := h.Connect(ctx, p); err != nil {
  116. logger.Error("failed to reconnect to peer", zap.String("peer", p.ID.String()), zap.Error(err))
  117. } else {
  118. logger.Info("Reconnected to peer", zap.String("peer", p.ID.String()))
  119. peerMap[p.ID.String()] = struct{}{}
  120. successfulReconnects.Inc()
  121. }
  122. }
  123. }
  124. }
  125. }
  126. }()
  127. }
  128. // Fetch the initial current guardian set
  129. guardianSet, err := FetchCurrentGuardianSet(ethRpcUrl, ethCoreAddr)
  130. if err != nil {
  131. logger.Fatal("Failed to fetch current guardian set", zap.Error(err))
  132. }
  133. quorum := vaa.CalculateQuorum(len(guardianSet.Keys))
  134. // Listen to the p2p network for query responses
  135. go func() {
  136. // Maps the request signature to a map of response digests which maps to a list of guardian signatures.
  137. // A request could have responses with different digests, because the guardians could have
  138. // different results returned for the query in the event of a rollback.
  139. responses := make(map[string]map[ethCommon.Hash][]GuardianSignature)
  140. for {
  141. envelope, err := sub.Next(ctx)
  142. if err != nil {
  143. logger.Error("Failed to read next pubsub message", zap.Error(err))
  144. return
  145. }
  146. var msg gossipv1.GossipMessage
  147. err = proto.Unmarshal(envelope.Data, &msg)
  148. if err != nil {
  149. logger.Error("received invalid message", zap.Binary("data", envelope.Data), zap.String("from", envelope.GetFrom().String()))
  150. inboundP2pError.WithLabelValues("failed_to_unmarshal_gossip_msg").Inc()
  151. continue
  152. }
  153. switch m := msg.Message.(type) {
  154. case *gossipv1.GossipMessage_SignedQueryResponse:
  155. logger.Debug("query response received", zap.Any("response", m.SignedQueryResponse))
  156. peerId := envelope.GetFrom().String()
  157. queryResponsesReceived.WithLabelValues(peerId).Inc()
  158. var queryResponse query.QueryResponsePublication
  159. err := queryResponse.Unmarshal(m.SignedQueryResponse.QueryResponse)
  160. if err != nil {
  161. logger.Error("failed to unmarshal response", zap.Error(err))
  162. inboundP2pError.WithLabelValues("failed_to_unmarshal_response").Inc()
  163. continue
  164. }
  165. for _, pcr := range queryResponse.PerChainResponses {
  166. queryResponsesReceivedByChainAndPeerID.WithLabelValues(pcr.ChainId.String(), peerId).Inc()
  167. }
  168. requestSignature := hex.EncodeToString(queryResponse.Request.Signature)
  169. logger.Info("query response received from gossip", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
  170. if loggingMap.ShouldLogResponse(requestSignature) {
  171. var queryRequest query.QueryRequest
  172. if err := queryRequest.Unmarshal(queryResponse.Request.QueryRequest); err == nil {
  173. logger.Info("logging response", zap.String("peerId", peerId), zap.Any("requestId", requestSignature), zap.Any("request", queryRequest), zap.Any("response", queryResponse))
  174. } else {
  175. logger.Error("logging response (failed to unmarshal request)", zap.String("peerId", peerId), zap.Any("requestId", requestSignature), zap.Any("response", queryResponse))
  176. }
  177. }
  178. // Check that we're handling the request for this response
  179. pendingResponse := pendingResponses.Get(requestSignature)
  180. if pendingResponse == nil {
  181. // This will happen for responses that come in after quorum is reached.
  182. logger.Debug("skipping query response for unknown request", zap.String("signature", requestSignature))
  183. continue
  184. }
  185. // Make sure that the request bytes match
  186. if !bytes.Equal(queryResponse.Request.QueryRequest, pendingResponse.req.QueryRequest) ||
  187. !bytes.Equal(queryResponse.Request.Signature, pendingResponse.req.Signature) {
  188. continue
  189. }
  190. digest := query.GetQueryResponseDigestFromBytes(m.SignedQueryResponse.QueryResponse)
  191. signerBytes, err := ethCrypto.Ecrecover(digest.Bytes(), m.SignedQueryResponse.Signature)
  192. if err != nil {
  193. logger.Error("failed to verify signature on response",
  194. zap.String("digest", digest.Hex()),
  195. zap.String("signature", hex.EncodeToString(m.SignedQueryResponse.Signature)),
  196. zap.Error(err))
  197. inboundP2pError.WithLabelValues("failed_to_verify_signature").Inc()
  198. continue
  199. }
  200. signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])
  201. keyIdx, hasKeyIdx := guardianSet.KeyIndex(signerAddress)
  202. if hasKeyIdx {
  203. if _, ok := responses[requestSignature]; !ok {
  204. responses[requestSignature] = make(map[ethCommon.Hash][]GuardianSignature)
  205. }
  206. found := false
  207. for _, gs := range responses[requestSignature][digest] {
  208. if gs.Index == keyIdx {
  209. found = true
  210. break
  211. }
  212. }
  213. if found {
  214. // Already handled the response from this guardian
  215. continue
  216. }
  217. responses[requestSignature][digest] = append(responses[requestSignature][digest], GuardianSignature{
  218. Index: keyIdx,
  219. Signature: hex.EncodeToString(m.SignedQueryResponse.Signature),
  220. })
  221. // quorum is reached when a super-majority of guardians have signed a response with the same digest
  222. numSigners := len(responses[requestSignature][digest])
  223. if numSigners >= quorum {
  224. s := &SignedResponse{
  225. Response: &queryResponse,
  226. Signatures: responses[requestSignature][digest],
  227. }
  228. delete(responses, requestSignature)
  229. select {
  230. case pendingResponse.ch <- s:
  231. logger.Info("quorum reached, forwarded query response",
  232. zap.String("peerId", peerId),
  233. zap.String("userId", pendingResponse.userName),
  234. zap.Any("requestId", requestSignature),
  235. zap.Int("numSigners", numSigners),
  236. zap.Int("quorum", quorum),
  237. )
  238. default:
  239. logger.Error("failed to write query response to channel, dropping it", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
  240. // Leave the request in the pending map. It will get cleaned up if it times out.
  241. }
  242. } else {
  243. // Proxy should return early if quorum is no longer possible - i.e maxMatchingResponses + outstandingResponses < quorum
  244. var totalSigners, maxMatchingResponses int
  245. for _, signers := range responses[requestSignature] {
  246. totalSigners += len(signers)
  247. if len(signers) > maxMatchingResponses {
  248. maxMatchingResponses = len(signers)
  249. }
  250. }
  251. outstandingResponses := len(guardianSet.Keys) - totalSigners
  252. if maxMatchingResponses+outstandingResponses < quorum {
  253. quorumNotMetByUser.WithLabelValues(pendingResponse.userName).Inc()
  254. failedQueriesByUser.WithLabelValues(pendingResponse.userName).Inc()
  255. delete(responses, requestSignature)
  256. select {
  257. case pendingResponse.errCh <- &ErrorEntry{err: errors.New("quorum not met"), status: http.StatusBadRequest}:
  258. logger.Info("query failed, quorum not met",
  259. zap.String("peerId", peerId),
  260. zap.String("userId", pendingResponse.userName),
  261. zap.Any("requestId", requestSignature),
  262. zap.Int("numSigners", numSigners),
  263. zap.Int("maxMatchingResponses", maxMatchingResponses),
  264. zap.Int("outstandingResponses", outstandingResponses),
  265. zap.Int("quorum", quorum),
  266. )
  267. default:
  268. logger.Error("failed to write query error response to channel, dropping it", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
  269. // Leave the request in the pending map. It will get cleaned up if it times out.
  270. }
  271. } else {
  272. logger.Info("waiting for more query responses",
  273. zap.String("peerId", peerId),
  274. zap.String("userId", pendingResponse.userName),
  275. zap.Any("requestId", requestSignature),
  276. zap.Int("numSigners", numSigners),
  277. zap.Int("maxMatchingResponses", maxMatchingResponses),
  278. zap.Int("outstandingResponses", outstandingResponses),
  279. zap.Int("quorum", quorum),
  280. )
  281. }
  282. }
  283. } else {
  284. logger.Warn("received observation by unknown guardian - is our guardian set outdated?",
  285. zap.String("digest", digest.Hex()), zap.String("address", signerAddress.Hex()),
  286. )
  287. inboundP2pError.WithLabelValues("unknown_guardian").Inc()
  288. }
  289. default:
  290. // Since CCQ gossip is isolated, this really shouldn't happen.
  291. logger.Debug("unexpected gossip message type", zap.Any("msg", m))
  292. inboundP2pError.WithLabelValues("unexpected_gossip_msg_type").Inc()
  293. }
  294. }
  295. }()
  296. return &P2PSub{
  297. sub: sub,
  298. topic_req: th_req,
  299. topic_resp: th_resp,
  300. host: h,
  301. }, nil
  302. }