adminserver.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. package guardiand
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/hex"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "github.com/certusone/wormhole/node/pkg/db"
  10. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  11. publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
  12. "github.com/certusone/wormhole/node/pkg/publicrpc"
  13. ethcommon "github.com/ethereum/go-ethereum/common"
  14. "go.uber.org/zap"
  15. "google.golang.org/grpc/codes"
  16. "google.golang.org/grpc/status"
  17. "math"
  18. "math/rand"
  19. "net"
  20. "net/http"
  21. "os"
  22. "time"
  23. "github.com/certusone/wormhole/node/pkg/common"
  24. nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
  25. "github.com/certusone/wormhole/node/pkg/supervisor"
  26. "github.com/certusone/wormhole/node/pkg/vaa"
  27. )
  28. type nodePrivilegedService struct {
  29. nodev1.UnimplementedNodePrivilegedServiceServer
  30. db *db.Database
  31. injectC chan<- *vaa.VAA
  32. obsvReqSendC chan *gossipv1.ObservationRequest
  33. logger *zap.Logger
  34. signedInC chan *gossipv1.SignedVAAWithQuorum
  35. }
  36. // adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation.
  37. // Returns an error if the data is invalid.
  38. func adminGuardianSetUpdateToVAA(req *nodev1.GuardianSetUpdate, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  39. if len(req.Guardians) == 0 {
  40. return nil, errors.New("empty guardian set specified")
  41. }
  42. if len(req.Guardians) > common.MaxGuardianCount {
  43. return nil, fmt.Errorf("too many guardians - %d, maximum is %d", len(req.Guardians), common.MaxGuardianCount)
  44. }
  45. addrs := make([]ethcommon.Address, len(req.Guardians))
  46. for i, g := range req.Guardians {
  47. if !ethcommon.IsHexAddress(g.Pubkey) {
  48. return nil, fmt.Errorf("invalid pubkey format at index %d (%s)", i, g.Name)
  49. }
  50. ethAddr := ethcommon.HexToAddress(g.Pubkey)
  51. for j, pk := range addrs {
  52. if pk == ethAddr {
  53. return nil, fmt.Errorf("duplicate pubkey at index %d (duplicate of %d): %s", i, j, g.Name)
  54. }
  55. }
  56. addrs[i] = ethAddr
  57. }
  58. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  59. vaa.BodyGuardianSetUpdate{
  60. Keys: addrs,
  61. NewIndex: guardianSetIndex + 1,
  62. }.Serialize())
  63. return v, nil
  64. }
  65. // adminContractUpgradeToVAA converts a nodev1.ContractUpgrade message to its canonical VAA representation.
  66. // Returns an error if the data is invalid.
  67. func adminContractUpgradeToVAA(req *nodev1.ContractUpgrade, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  68. b, err := hex.DecodeString(req.NewContract)
  69. if err != nil {
  70. return nil, errors.New("invalid new contract address encoding (expected hex)")
  71. }
  72. if len(b) != 32 {
  73. return nil, errors.New("invalid new_contract address")
  74. }
  75. if req.ChainId > math.MaxUint16 {
  76. return nil, errors.New("invalid chain_id")
  77. }
  78. newContractAddress := vaa.Address{}
  79. copy(newContractAddress[:], req.NewContract)
  80. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  81. vaa.BodyContractUpgrade{
  82. ChainID: vaa.ChainID(req.ChainId),
  83. NewContract: newContractAddress,
  84. }.Serialize())
  85. return v, nil
  86. }
  87. // tokenBridgeRegisterChain converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
  88. // Returns an error if the data is invalid.
  89. func tokenBridgeRegisterChain(req *nodev1.BridgeRegisterChain, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  90. if req.ChainId > math.MaxUint16 {
  91. return nil, errors.New("invalid chain_id")
  92. }
  93. b, err := hex.DecodeString(req.EmitterAddress)
  94. if err != nil {
  95. return nil, errors.New("invalid emitter address encoding (expected hex)")
  96. }
  97. if len(b) != 32 {
  98. return nil, errors.New("invalid emitter address (expected 32 bytes)")
  99. }
  100. emitterAddress := vaa.Address{}
  101. copy(emitterAddress[:], b)
  102. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  103. vaa.BodyTokenBridgeRegisterChain{
  104. Module: req.Module,
  105. ChainID: vaa.ChainID(req.ChainId),
  106. EmitterAddress: emitterAddress,
  107. }.Serialize())
  108. return v, nil
  109. }
  110. // tokenBridgeUpgradeContract converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
  111. // Returns an error if the data is invalid.
  112. func tokenBridgeUpgradeContract(req *nodev1.BridgeUpgradeContract, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  113. if req.TargetChainId > math.MaxUint16 {
  114. return nil, errors.New("invalid target_chain_id")
  115. }
  116. b, err := hex.DecodeString(req.NewContract)
  117. if err != nil {
  118. return nil, errors.New("invalid new contract address (expected hex)")
  119. }
  120. if len(b) != 32 {
  121. return nil, errors.New("invalid new contract address (expected 32 bytes)")
  122. }
  123. newContract := vaa.Address{}
  124. copy(newContract[:], b)
  125. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  126. vaa.BodyTokenBridgeUpgradeContract{
  127. Module: req.Module,
  128. TargetChainID: vaa.ChainID(req.TargetChainId),
  129. NewContract: newContract,
  130. }.Serialize())
  131. return v, nil
  132. }
  133. func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *nodev1.InjectGovernanceVAARequest) (*nodev1.InjectGovernanceVAAResponse, error) {
  134. s.logger.Info("governance VAA injected via admin socket", zap.String("request", req.String()))
  135. var (
  136. v *vaa.VAA
  137. err error
  138. )
  139. digests := make([][]byte, len(req.Messages))
  140. for i, message := range req.Messages {
  141. switch payload := message.Payload.(type) {
  142. case *nodev1.GovernanceMessage_GuardianSet:
  143. v, err = adminGuardianSetUpdateToVAA(payload.GuardianSet, req.CurrentSetIndex, message.Nonce, message.Sequence)
  144. case *nodev1.GovernanceMessage_ContractUpgrade:
  145. v, err = adminContractUpgradeToVAA(payload.ContractUpgrade, req.CurrentSetIndex, message.Nonce, message.Sequence)
  146. case *nodev1.GovernanceMessage_BridgeRegisterChain:
  147. v, err = tokenBridgeRegisterChain(payload.BridgeRegisterChain, req.CurrentSetIndex, message.Nonce, message.Sequence)
  148. case *nodev1.GovernanceMessage_BridgeContractUpgrade:
  149. v, err = tokenBridgeUpgradeContract(payload.BridgeContractUpgrade, req.CurrentSetIndex, message.Nonce, message.Sequence)
  150. default:
  151. panic(fmt.Sprintf("unsupported VAA type: %T", payload))
  152. }
  153. if err != nil {
  154. return nil, status.Error(codes.InvalidArgument, err.Error())
  155. }
  156. // Generate digest of the unsigned VAA.
  157. digest := v.SigningMsg()
  158. if err != nil {
  159. panic(err)
  160. }
  161. s.logger.Info("governance VAA constructed",
  162. zap.Any("vaa", v),
  163. zap.String("digest", digest.String()),
  164. )
  165. s.injectC <- v
  166. digests[i] = digest.Bytes()
  167. }
  168. return &nodev1.InjectGovernanceVAAResponse{Digests: digests}, nil
  169. }
  170. // fetchMissing attempts to backfill a gap by fetching and storing missing signed VAAs from the network.
  171. // Returns true if the gap was filled, false otherwise.
  172. func (s *nodePrivilegedService) fetchMissing(
  173. ctx context.Context,
  174. nodes []string,
  175. c *http.Client,
  176. chain vaa.ChainID,
  177. addr string,
  178. seq uint64) (bool, error) {
  179. // shuffle the list of public RPC endpoints
  180. rand.Shuffle(len(nodes), func(i, j int) {
  181. nodes[i], nodes[j] = nodes[j], nodes[i]
  182. })
  183. ctx, cancel := context.WithTimeout(ctx, time.Second)
  184. defer cancel()
  185. for _, node := range nodes {
  186. req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf(
  187. "%s/v1/signed_vaa/%d/%s/%d", node, chain, addr, seq), nil)
  188. if err != nil {
  189. return false, fmt.Errorf("failed to create request: %w", err)
  190. }
  191. resp, err := c.Do(req)
  192. if err != nil {
  193. s.logger.Warn("failed to fetch missing VAA",
  194. zap.String("node", node),
  195. zap.String("chain", chain.String()),
  196. zap.String("address", addr),
  197. zap.Uint64("sequence", seq),
  198. zap.Error(err),
  199. )
  200. continue
  201. }
  202. switch resp.StatusCode {
  203. case http.StatusNotFound:
  204. resp.Body.Close()
  205. continue
  206. case http.StatusOK:
  207. type getVaaResp struct {
  208. VaaBytes string `json:"vaaBytes"`
  209. }
  210. var respBody getVaaResp
  211. if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil {
  212. resp.Body.Close()
  213. s.logger.Warn("failed to decode VAA response",
  214. zap.String("node", node),
  215. zap.String("chain", chain.String()),
  216. zap.String("address", addr),
  217. zap.Uint64("sequence", seq),
  218. zap.Error(err),
  219. )
  220. continue
  221. }
  222. // base64 decode the VAA bytes
  223. vaaBytes, err := base64.StdEncoding.DecodeString(respBody.VaaBytes)
  224. if err != nil {
  225. resp.Body.Close()
  226. s.logger.Warn("failed to decode VAA body",
  227. zap.String("node", node),
  228. zap.String("chain", chain.String()),
  229. zap.String("address", addr),
  230. zap.Uint64("sequence", seq),
  231. zap.Error(err),
  232. )
  233. continue
  234. }
  235. s.logger.Info("backfilled VAA",
  236. zap.Uint16("chain", uint16(chain)),
  237. zap.String("address", addr),
  238. zap.Uint64("sequence", seq),
  239. zap.Int("numBytes", len(vaaBytes)),
  240. )
  241. // Inject into the gossip signed VAA receive path.
  242. // This has the same effect as if the VAA was received from the network
  243. // (verifying signature, publishing to BigTable, storing in local DB...).
  244. s.signedInC <- &gossipv1.SignedVAAWithQuorum{
  245. Vaa: vaaBytes,
  246. }
  247. resp.Body.Close()
  248. return true, nil
  249. default:
  250. resp.Body.Close()
  251. return false, fmt.Errorf("unexpected response status: %d", resp.StatusCode)
  252. }
  253. }
  254. return false, nil
  255. }
  256. func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *nodev1.FindMissingMessagesRequest) (*nodev1.FindMissingMessagesResponse, error) {
  257. b, err := hex.DecodeString(req.EmitterAddress)
  258. if err != nil {
  259. return nil, status.Errorf(codes.InvalidArgument, "invalid emitter address encoding: %v", err)
  260. }
  261. emitterAddress := vaa.Address{}
  262. copy(emitterAddress[:], b)
  263. ids, first, last, err := s.db.FindEmitterSequenceGap(db.VAAID{
  264. EmitterChain: vaa.ChainID(req.EmitterChain),
  265. EmitterAddress: emitterAddress,
  266. })
  267. if err != nil {
  268. return nil, status.Errorf(codes.Internal, "database operation failed: %v", err)
  269. }
  270. if req.RpcBackfill {
  271. c := &http.Client{}
  272. unfilled := make([]uint64, 0, len(ids))
  273. for _, id := range ids {
  274. if ok, err := s.fetchMissing(ctx, req.BackfillNodes, c, vaa.ChainID(req.EmitterChain), emitterAddress.String(), id); err != nil {
  275. return nil, status.Errorf(codes.Internal, "failed to backfill VAA: %v", err)
  276. } else if ok {
  277. continue
  278. }
  279. unfilled = append(unfilled, id)
  280. }
  281. ids = unfilled
  282. }
  283. resp := make([]string, len(ids))
  284. for i, v := range ids {
  285. resp[i] = fmt.Sprintf("%d/%s/%d", req.EmitterChain, emitterAddress, v)
  286. }
  287. return &nodev1.FindMissingMessagesResponse{
  288. MissingMessages: resp,
  289. FirstSequence: first,
  290. LastSequence: last,
  291. }, nil
  292. }
  293. func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, signedInC chan *gossipv1.SignedVAAWithQuorum, obsvReqSendC chan *gossipv1.ObservationRequest, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
  294. // Delete existing UNIX socket, if present.
  295. fi, err := os.Stat(socketPath)
  296. if err == nil {
  297. fmode := fi.Mode()
  298. if fmode&os.ModeType == os.ModeSocket {
  299. err = os.Remove(socketPath)
  300. if err != nil {
  301. return nil, fmt.Errorf("failed to remove existing socket at %s: %w", socketPath, err)
  302. }
  303. } else {
  304. return nil, fmt.Errorf("%s is not a UNIX socket", socketPath)
  305. }
  306. }
  307. // Create a new UNIX socket and listen to it.
  308. // The socket is created with the default umask. We set a restrictive umask in setRestrictiveUmask
  309. // to ensure that any files we create are only readable by the user - this is much harder to mess up.
  310. // The umask avoids a race condition between file creation and chmod.
  311. laddr, err := net.ResolveUnixAddr("unix", socketPath)
  312. if err != nil {
  313. return nil, fmt.Errorf("invalid listen address: %v", err)
  314. }
  315. l, err := net.ListenUnix("unix", laddr)
  316. if err != nil {
  317. return nil, fmt.Errorf("failed to listen on %s: %w", socketPath, err)
  318. }
  319. logger.Info("admin server listening on", zap.String("path", socketPath))
  320. nodeService := &nodePrivilegedService{
  321. injectC: injectC,
  322. obsvReqSendC: obsvReqSendC,
  323. db: db,
  324. logger: logger.Named("adminservice"),
  325. signedInC: signedInC,
  326. }
  327. publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst)
  328. grpcServer := common.NewInstrumentedGRPCServer(logger)
  329. nodev1.RegisterNodePrivilegedServiceServer(grpcServer, nodeService)
  330. publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService)
  331. return supervisor.GRPCServer(grpcServer, l, false), nil
  332. }
  333. func (s *nodePrivilegedService) SendObservationRequest(ctx context.Context, req *nodev1.SendObservationRequestRequest) (*nodev1.SendObservationRequestResponse, error) {
  334. s.obsvReqSendC <- req.ObservationRequest
  335. s.logger.Info("sent observation request", zap.Any("request", req.ObservationRequest))
  336. return &nodev1.SendObservationRequestResponse{}, nil
  337. }