adminserver.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. package guardiand
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "errors"
  6. "fmt"
  7. "github.com/certusone/wormhole/node/pkg/db"
  8. publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
  9. "github.com/certusone/wormhole/node/pkg/publicrpc"
  10. ethcommon "github.com/ethereum/go-ethereum/common"
  11. "go.uber.org/zap"
  12. "google.golang.org/grpc/codes"
  13. "google.golang.org/grpc/status"
  14. "math"
  15. "math/rand"
  16. "net"
  17. "net/http"
  18. "os"
  19. "time"
  20. "github.com/certusone/wormhole/node/pkg/common"
  21. nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
  22. "github.com/certusone/wormhole/node/pkg/supervisor"
  23. "github.com/certusone/wormhole/node/pkg/vaa"
  24. )
  25. type nodePrivilegedService struct {
  26. nodev1.UnimplementedNodePrivilegedServiceServer
  27. db *db.Database
  28. injectC chan<- *vaa.VAA
  29. logger *zap.Logger
  30. }
  31. // adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation.
  32. // Returns an error if the data is invalid.
  33. func adminGuardianSetUpdateToVAA(req *nodev1.GuardianSetUpdate, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  34. if len(req.Guardians) == 0 {
  35. return nil, errors.New("empty guardian set specified")
  36. }
  37. if len(req.Guardians) > common.MaxGuardianCount {
  38. return nil, fmt.Errorf("too many guardians - %d, maximum is %d", len(req.Guardians), common.MaxGuardianCount)
  39. }
  40. addrs := make([]ethcommon.Address, len(req.Guardians))
  41. for i, g := range req.Guardians {
  42. if !ethcommon.IsHexAddress(g.Pubkey) {
  43. return nil, fmt.Errorf("invalid pubkey format at index %d (%s)", i, g.Name)
  44. }
  45. ethAddr := ethcommon.HexToAddress(g.Pubkey)
  46. for j, pk := range addrs {
  47. if pk == ethAddr {
  48. return nil, fmt.Errorf("duplicate pubkey at index %d (duplicate of %d): %s", i, j, g.Name)
  49. }
  50. }
  51. addrs[i] = ethAddr
  52. }
  53. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  54. vaa.BodyGuardianSetUpdate{
  55. Keys: addrs,
  56. NewIndex: guardianSetIndex + 1,
  57. }.Serialize())
  58. return v, nil
  59. }
  60. // adminContractUpgradeToVAA converts a nodev1.ContractUpgrade message to its canonical VAA representation.
  61. // Returns an error if the data is invalid.
  62. func adminContractUpgradeToVAA(req *nodev1.ContractUpgrade, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  63. b, err := hex.DecodeString(req.NewContract)
  64. if err != nil {
  65. return nil, errors.New("invalid new contract address encoding (expected hex)")
  66. }
  67. if len(b) != 32 {
  68. return nil, errors.New("invalid new_contract address")
  69. }
  70. if req.ChainId > math.MaxUint16 {
  71. return nil, errors.New("invalid chain_id")
  72. }
  73. newContractAddress := vaa.Address{}
  74. copy(newContractAddress[:], req.NewContract)
  75. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  76. vaa.BodyContractUpgrade{
  77. ChainID: vaa.ChainID(req.ChainId),
  78. NewContract: newContractAddress,
  79. }.Serialize())
  80. return v, nil
  81. }
  82. // tokenBridgeRegisterChain converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
  83. // Returns an error if the data is invalid.
  84. func tokenBridgeRegisterChain(req *nodev1.BridgeRegisterChain, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  85. if req.ChainId > math.MaxUint16 {
  86. return nil, errors.New("invalid chain_id")
  87. }
  88. b, err := hex.DecodeString(req.EmitterAddress)
  89. if err != nil {
  90. return nil, errors.New("invalid emitter address encoding (expected hex)")
  91. }
  92. if len(b) != 32 {
  93. return nil, errors.New("invalid emitter address (expected 32 bytes)")
  94. }
  95. emitterAddress := vaa.Address{}
  96. copy(emitterAddress[:], b)
  97. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  98. vaa.BodyTokenBridgeRegisterChain{
  99. Module: req.Module,
  100. ChainID: vaa.ChainID(req.ChainId),
  101. EmitterAddress: emitterAddress,
  102. }.Serialize())
  103. return v, nil
  104. }
  105. // tokenBridgeUpgradeContract converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
  106. // Returns an error if the data is invalid.
  107. func tokenBridgeUpgradeContract(req *nodev1.BridgeUpgradeContract, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  108. if req.TargetChainId > math.MaxUint16 {
  109. return nil, errors.New("invalid target_chain_id")
  110. }
  111. b, err := hex.DecodeString(req.NewContract)
  112. if err != nil {
  113. return nil, errors.New("invalid new contract address (expected hex)")
  114. }
  115. if len(b) != 32 {
  116. return nil, errors.New("invalid new contract address (expected 32 bytes)")
  117. }
  118. newContract := vaa.Address{}
  119. copy(newContract[:], b)
  120. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  121. vaa.BodyTokenBridgeUpgradeContract{
  122. Module: req.Module,
  123. TargetChainID: vaa.ChainID(req.TargetChainId),
  124. NewContract: newContract,
  125. }.Serialize())
  126. return v, nil
  127. }
  128. func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *nodev1.InjectGovernanceVAARequest) (*nodev1.InjectGovernanceVAAResponse, error) {
  129. s.logger.Info("governance VAA injected via admin socket", zap.String("request", req.String()))
  130. var (
  131. v *vaa.VAA
  132. err error
  133. )
  134. digests := make([][]byte, len(req.Messages))
  135. for i, message := range req.Messages {
  136. switch payload := message.Payload.(type) {
  137. case *nodev1.GovernanceMessage_GuardianSet:
  138. v, err = adminGuardianSetUpdateToVAA(payload.GuardianSet, req.CurrentSetIndex, message.Nonce, message.Sequence)
  139. case *nodev1.GovernanceMessage_ContractUpgrade:
  140. v, err = adminContractUpgradeToVAA(payload.ContractUpgrade, req.CurrentSetIndex, message.Nonce, message.Sequence)
  141. case *nodev1.GovernanceMessage_BridgeRegisterChain:
  142. v, err = tokenBridgeRegisterChain(payload.BridgeRegisterChain, req.CurrentSetIndex, message.Nonce, message.Sequence)
  143. case *nodev1.GovernanceMessage_BridgeContractUpgrade:
  144. v, err = tokenBridgeUpgradeContract(payload.BridgeContractUpgrade, req.CurrentSetIndex, message.Nonce, message.Sequence)
  145. default:
  146. panic(fmt.Sprintf("unsupported VAA type: %T", payload))
  147. }
  148. if err != nil {
  149. return nil, status.Error(codes.InvalidArgument, err.Error())
  150. }
  151. // Generate digest of the unsigned VAA.
  152. digest := v.SigningMsg()
  153. if err != nil {
  154. panic(err)
  155. }
  156. s.logger.Info("governance VAA constructed",
  157. zap.Any("vaa", v),
  158. zap.String("digest", digest.String()),
  159. )
  160. s.injectC <- v
  161. digests[i] = digest.Bytes()
  162. }
  163. return &nodev1.InjectGovernanceVAAResponse{Digests: digests}, nil
  164. }
  165. // fetchMissing attempts to backfill a gap by fetching missing signed VAAs from the network.
  166. // Returns true if the gap was filled, false otherwise.
  167. func (s *nodePrivilegedService) fetchMissing(
  168. ctx context.Context,
  169. nodes []string,
  170. c *http.Client,
  171. chain vaa.ChainID,
  172. addr string,
  173. seq uint64) (bool, error) {
  174. // shuffle the list of public RPC endpoints
  175. rand.Shuffle(len(nodes), func(i, j int) {
  176. nodes[i], nodes[j] = nodes[j], nodes[i]
  177. })
  178. ctx, cancel := context.WithTimeout(ctx, time.Second)
  179. defer cancel()
  180. for _, node := range nodes {
  181. req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf(
  182. "%s/v1/signed_vaa/%d/%s/%d", node, chain, addr, seq), nil)
  183. if err != nil {
  184. return false, fmt.Errorf("failed to create request: %w", err)
  185. }
  186. resp, err := c.Do(req)
  187. if err != nil {
  188. s.logger.Warn("failed to fetch missing VAA",
  189. zap.String("node", node),
  190. zap.String("chain", chain.String()),
  191. zap.String("address", addr),
  192. zap.Uint64("sequence", seq),
  193. zap.Error(err),
  194. )
  195. continue
  196. }
  197. switch resp.StatusCode {
  198. case http.StatusNotFound:
  199. resp.Body.Close()
  200. continue
  201. case http.StatusOK:
  202. s.logger.Info("backfilled VAA",
  203. zap.Uint16("chain", uint16(chain)),
  204. zap.String("address", addr),
  205. zap.Uint64("sequence", seq),
  206. )
  207. resp.Body.Close()
  208. return true, nil
  209. default:
  210. resp.Body.Close()
  211. return false, fmt.Errorf("unexpected response status: %d", resp.StatusCode)
  212. }
  213. }
  214. return false, nil
  215. }
  216. func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *nodev1.FindMissingMessagesRequest) (*nodev1.FindMissingMessagesResponse, error) {
  217. b, err := hex.DecodeString(req.EmitterAddress)
  218. if err != nil {
  219. return nil, status.Errorf(codes.InvalidArgument, "invalid emitter address encoding: %v", err)
  220. }
  221. emitterAddress := vaa.Address{}
  222. copy(emitterAddress[:], b)
  223. ids, first, last, err := s.db.FindEmitterSequenceGap(db.VAAID{
  224. EmitterChain: vaa.ChainID(req.EmitterChain),
  225. EmitterAddress: emitterAddress,
  226. })
  227. if err != nil {
  228. return nil, status.Errorf(codes.Internal, "database operation failed: %v", err)
  229. }
  230. if req.RpcBackfill {
  231. c := &http.Client{}
  232. unfilled := make([]uint64, 0, len(ids))
  233. for _, id := range ids {
  234. if ok, err := s.fetchMissing(ctx, req.BackfillNodes, c, vaa.ChainID(req.EmitterChain), emitterAddress.String(), id); err != nil {
  235. return nil, status.Errorf(codes.Internal, "failed to backfill VAA: %v", err)
  236. } else if ok {
  237. continue
  238. }
  239. unfilled = append(unfilled, id)
  240. }
  241. ids = unfilled
  242. }
  243. resp := make([]string, len(ids))
  244. for i, v := range ids {
  245. resp[i] = fmt.Sprintf("%d/%s/%d", req.EmitterChain, emitterAddress, v)
  246. }
  247. return &nodev1.FindMissingMessagesResponse{
  248. MissingMessages: resp,
  249. FirstSequence: first,
  250. LastSequence: last,
  251. }, nil
  252. }
  253. func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
  254. // Delete existing UNIX socket, if present.
  255. fi, err := os.Stat(socketPath)
  256. if err == nil {
  257. fmode := fi.Mode()
  258. if fmode&os.ModeType == os.ModeSocket {
  259. err = os.Remove(socketPath)
  260. if err != nil {
  261. return nil, fmt.Errorf("failed to remove existing socket at %s: %w", socketPath, err)
  262. }
  263. } else {
  264. return nil, fmt.Errorf("%s is not a UNIX socket", socketPath)
  265. }
  266. }
  267. // Create a new UNIX socket and listen to it.
  268. // The socket is created with the default umask. We set a restrictive umask in setRestrictiveUmask
  269. // to ensure that any files we create are only readable by the user - this is much harder to mess up.
  270. // The umask avoids a race condition between file creation and chmod.
  271. laddr, err := net.ResolveUnixAddr("unix", socketPath)
  272. if err != nil {
  273. return nil, fmt.Errorf("invalid listen address: %v", err)
  274. }
  275. l, err := net.ListenUnix("unix", laddr)
  276. if err != nil {
  277. return nil, fmt.Errorf("failed to listen on %s: %w", socketPath, err)
  278. }
  279. logger.Info("admin server listening on", zap.String("path", socketPath))
  280. nodeService := &nodePrivilegedService{
  281. injectC: injectC,
  282. db: db,
  283. logger: logger.Named("adminservice"),
  284. }
  285. publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst)
  286. grpcServer := common.NewInstrumentedGRPCServer(logger)
  287. nodev1.RegisterNodePrivilegedServiceServer(grpcServer, nodeService)
  288. publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService)
  289. return supervisor.GRPCServer(grpcServer, l, false), nil
  290. }