adminserver.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. package guardiand
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "errors"
  6. "fmt"
  7. "github.com/certusone/wormhole/node/pkg/db"
  8. publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
  9. "github.com/certusone/wormhole/node/pkg/publicrpc"
  10. ethcommon "github.com/ethereum/go-ethereum/common"
  11. grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
  12. grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
  13. grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
  14. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  15. "go.uber.org/zap"
  16. "google.golang.org/grpc"
  17. "google.golang.org/grpc/codes"
  18. "google.golang.org/grpc/status"
  19. "math"
  20. "net"
  21. "os"
  22. "github.com/certusone/wormhole/node/pkg/common"
  23. nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
  24. "github.com/certusone/wormhole/node/pkg/supervisor"
  25. "github.com/certusone/wormhole/node/pkg/vaa"
  26. )
  27. type nodePrivilegedService struct {
  28. nodev1.UnimplementedNodePrivilegedServiceServer
  29. db *db.Database
  30. injectC chan<- *vaa.VAA
  31. logger *zap.Logger
  32. }
  33. // adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation.
  34. // Returns an error if the data is invalid.
  35. func adminGuardianSetUpdateToVAA(req *nodev1.GuardianSetUpdate, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  36. if len(req.Guardians) == 0 {
  37. return nil, errors.New("empty guardian set specified")
  38. }
  39. if len(req.Guardians) > common.MaxGuardianCount {
  40. return nil, fmt.Errorf("too many guardians - %d, maximum is %d", len(req.Guardians), common.MaxGuardianCount)
  41. }
  42. addrs := make([]ethcommon.Address, len(req.Guardians))
  43. for i, g := range req.Guardians {
  44. if !ethcommon.IsHexAddress(g.Pubkey) {
  45. return nil, fmt.Errorf("invalid pubkey format at index %d (%s)", i, g.Name)
  46. }
  47. ethAddr := ethcommon.HexToAddress(g.Pubkey)
  48. for j, pk := range addrs {
  49. if pk == ethAddr {
  50. return nil, fmt.Errorf("duplicate pubkey at index %d (duplicate of %d): %s", i, j, g.Name)
  51. }
  52. }
  53. addrs[i] = ethAddr
  54. }
  55. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  56. vaa.BodyGuardianSetUpdate{
  57. Keys: addrs,
  58. NewIndex: guardianSetIndex + 1,
  59. }.Serialize())
  60. return v, nil
  61. }
  62. // adminContractUpgradeToVAA converts a nodev1.ContractUpgrade message to its canonical VAA representation.
  63. // Returns an error if the data is invalid.
  64. func adminContractUpgradeToVAA(req *nodev1.ContractUpgrade, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  65. b, err := hex.DecodeString(req.NewContract)
  66. if err != nil {
  67. return nil, errors.New("invalid new contract address encoding (expected hex)")
  68. }
  69. if len(b) != 32 {
  70. return nil, errors.New("invalid new_contract address")
  71. }
  72. if req.ChainId > math.MaxUint16 {
  73. return nil, errors.New("invalid chain_id")
  74. }
  75. newContractAddress := vaa.Address{}
  76. copy(newContractAddress[:], req.NewContract)
  77. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  78. vaa.BodyContractUpgrade{
  79. ChainID: vaa.ChainID(req.ChainId),
  80. NewContract: newContractAddress,
  81. }.Serialize())
  82. return v, nil
  83. }
  84. // tokenBridgeRegisterChain converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
  85. // Returns an error if the data is invalid.
  86. func tokenBridgeRegisterChain(req *nodev1.BridgeRegisterChain, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  87. if req.ChainId > math.MaxUint16 {
  88. return nil, errors.New("invalid chain_id")
  89. }
  90. b, err := hex.DecodeString(req.EmitterAddress)
  91. if err != nil {
  92. return nil, errors.New("invalid emitter address encoding (expected hex)")
  93. }
  94. if len(b) != 32 {
  95. return nil, errors.New("invalid emitter address (expected 32 bytes)")
  96. }
  97. emitterAddress := vaa.Address{}
  98. copy(emitterAddress[:], b)
  99. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  100. vaa.BodyTokenBridgeRegisterChain{
  101. Module: req.Module,
  102. ChainID: vaa.ChainID(req.ChainId),
  103. EmitterAddress: emitterAddress,
  104. }.Serialize())
  105. return v, nil
  106. }
  107. // tokenBridgeUpgradeContract converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
  108. // Returns an error if the data is invalid.
  109. func tokenBridgeUpgradeContract(req *nodev1.BridgeUpgradeContract, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  110. if req.TargetChainId > math.MaxUint16 {
  111. return nil, errors.New("invalid target_chain_id")
  112. }
  113. b, err := hex.DecodeString(req.NewContract)
  114. if err != nil {
  115. return nil, errors.New("invalid new contract address (expected hex)")
  116. }
  117. if len(b) != 32 {
  118. return nil, errors.New("invalid new contract address (expected 32 bytes)")
  119. }
  120. newContract := vaa.Address{}
  121. copy(newContract[:], b)
  122. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  123. vaa.BodyTokenBridgeUpgradeContract{
  124. Module: req.Module,
  125. TargetChainID: vaa.ChainID(req.TargetChainId),
  126. NewContract: newContract,
  127. }.Serialize())
  128. return v, nil
  129. }
  130. func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *nodev1.InjectGovernanceVAARequest) (*nodev1.InjectGovernanceVAAResponse, error) {
  131. s.logger.Info("governance VAA injected via admin socket", zap.String("request", req.String()))
  132. var (
  133. v *vaa.VAA
  134. err error
  135. )
  136. switch payload := req.Payload.(type) {
  137. case *nodev1.InjectGovernanceVAARequest_GuardianSet:
  138. v, err = adminGuardianSetUpdateToVAA(payload.GuardianSet, req.CurrentSetIndex, req.Nonce, req.Sequence)
  139. case *nodev1.InjectGovernanceVAARequest_ContractUpgrade:
  140. v, err = adminContractUpgradeToVAA(payload.ContractUpgrade, req.CurrentSetIndex, req.Nonce, req.Sequence)
  141. case *nodev1.InjectGovernanceVAARequest_BridgeRegisterChain:
  142. v, err = tokenBridgeRegisterChain(payload.BridgeRegisterChain, req.CurrentSetIndex, req.Nonce, req.Sequence)
  143. case *nodev1.InjectGovernanceVAARequest_BridgeContractUpgrade:
  144. v, err = tokenBridgeUpgradeContract(payload.BridgeContractUpgrade, req.CurrentSetIndex, req.Nonce, req.Sequence)
  145. default:
  146. panic(fmt.Sprintf("unsupported VAA type: %T", payload))
  147. }
  148. if err != nil {
  149. return nil, status.Error(codes.InvalidArgument, err.Error())
  150. }
  151. // Generate digest of the unsigned VAA.
  152. digest, err := v.SigningMsg()
  153. if err != nil {
  154. panic(err)
  155. }
  156. s.logger.Info("governance VAA constructed",
  157. zap.Any("vaa", v),
  158. zap.String("digest", digest.String()),
  159. )
  160. s.injectC <- v
  161. return &nodev1.InjectGovernanceVAAResponse{Digest: digest.Bytes()}, nil
  162. }
  163. func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *nodev1.FindMissingMessagesRequest) (*nodev1.FindMissingMessagesResponse, error) {
  164. b, err := hex.DecodeString(req.EmitterAddress)
  165. if err != nil {
  166. return nil, status.Errorf(codes.InvalidArgument, "invalid emitter address encoding: %v", err)
  167. }
  168. emitterAddress := vaa.Address{}
  169. copy(emitterAddress[:], b)
  170. ids, first, last, err := s.db.FindEmitterSequenceGap(db.VAAID{
  171. EmitterChain: vaa.ChainID(req.EmitterChain),
  172. EmitterAddress: emitterAddress,
  173. })
  174. if err != nil {
  175. return nil, status.Errorf(codes.Internal, "database operation failed: %v", err)
  176. }
  177. resp := make([]string, len(ids))
  178. for i, v := range ids {
  179. resp[i] = fmt.Sprintf("%d/%s/%d", req.EmitterChain, emitterAddress, v)
  180. }
  181. return &nodev1.FindMissingMessagesResponse{
  182. MissingMessages: resp,
  183. FirstSequence: first,
  184. LastSequence: last,
  185. }, nil
  186. }
  187. func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
  188. // Delete existing UNIX socket, if present.
  189. fi, err := os.Stat(socketPath)
  190. if err == nil {
  191. fmode := fi.Mode()
  192. if fmode&os.ModeType == os.ModeSocket {
  193. err = os.Remove(socketPath)
  194. if err != nil {
  195. return nil, fmt.Errorf("failed to remove existing socket at %s: %w", socketPath, err)
  196. }
  197. } else {
  198. return nil, fmt.Errorf("%s is not a UNIX socket", socketPath)
  199. }
  200. }
  201. // Create a new UNIX socket and listen to it.
  202. // The socket is created with the default umask. We set a restrictive umask in setRestrictiveUmask
  203. // to ensure that any files we create are only readable by the user - this is much harder to mess up.
  204. // The umask avoids a race condition between file creation and chmod.
  205. laddr, err := net.ResolveUnixAddr("unix", socketPath)
  206. if err != nil {
  207. return nil, fmt.Errorf("invalid listen address: %v", err)
  208. }
  209. l, err := net.ListenUnix("unix", laddr)
  210. if err != nil {
  211. return nil, fmt.Errorf("failed to listen on %s: %w", socketPath, err)
  212. }
  213. logger.Info("admin server listening on", zap.String("path", socketPath))
  214. nodeService := &nodePrivilegedService{
  215. injectC: injectC,
  216. db: db,
  217. logger: logger.Named("adminservice"),
  218. }
  219. publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst)
  220. grpcServer := newGRPCServer(logger)
  221. nodev1.RegisterNodePrivilegedServiceServer(grpcServer, nodeService)
  222. publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService)
  223. return supervisor.GRPCServer(grpcServer, l, false), nil
  224. }
  225. func newGRPCServer(logger *zap.Logger) *grpc.Server {
  226. server := grpc.NewServer(
  227. grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
  228. grpc_ctxtags.StreamServerInterceptor(),
  229. grpc_prometheus.StreamServerInterceptor,
  230. grpc_zap.StreamServerInterceptor(logger),
  231. )),
  232. grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
  233. grpc_ctxtags.UnaryServerInterceptor(),
  234. grpc_prometheus.UnaryServerInterceptor,
  235. grpc_zap.UnaryServerInterceptor(logger),
  236. )),
  237. )
  238. grpc_prometheus.EnableHandlingTimeHistogram()
  239. grpc_prometheus.Register(server)
  240. return server
  241. }