adminserver.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package guardiand
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "errors"
  6. "fmt"
  7. "github.com/certusone/wormhole/node/pkg/db"
  8. publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
  9. "github.com/certusone/wormhole/node/pkg/publicrpc"
  10. ethcommon "github.com/ethereum/go-ethereum/common"
  11. grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
  12. grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
  13. grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
  14. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  15. "go.uber.org/zap"
  16. "google.golang.org/grpc"
  17. "google.golang.org/grpc/codes"
  18. "google.golang.org/grpc/status"
  19. "math"
  20. "net"
  21. "os"
  22. "github.com/certusone/wormhole/node/pkg/common"
  23. nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
  24. "github.com/certusone/wormhole/node/pkg/supervisor"
  25. "github.com/certusone/wormhole/node/pkg/vaa"
  26. )
  27. type nodePrivilegedService struct {
  28. nodev1.UnimplementedNodePrivilegedServiceServer
  29. db *db.Database
  30. injectC chan<- *vaa.VAA
  31. logger *zap.Logger
  32. }
  33. // adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation.
  34. // Returns an error if the data is invalid.
  35. func adminGuardianSetUpdateToVAA(req *nodev1.GuardianSetUpdate, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  36. if len(req.Guardians) == 0 {
  37. return nil, errors.New("empty guardian set specified")
  38. }
  39. if len(req.Guardians) > common.MaxGuardianCount {
  40. return nil, fmt.Errorf("too many guardians - %d, maximum is %d", len(req.Guardians), common.MaxGuardianCount)
  41. }
  42. addrs := make([]ethcommon.Address, len(req.Guardians))
  43. for i, g := range req.Guardians {
  44. if !ethcommon.IsHexAddress(g.Pubkey) {
  45. return nil, fmt.Errorf("invalid pubkey format at index %d (%s)", i, g.Name)
  46. }
  47. ethAddr := ethcommon.HexToAddress(g.Pubkey)
  48. for j, pk := range addrs {
  49. if pk == ethAddr {
  50. return nil, fmt.Errorf("duplicate pubkey at index %d (duplicate of %d): %s", i, j, g.Name)
  51. }
  52. }
  53. addrs[i] = ethAddr
  54. }
  55. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  56. vaa.BodyGuardianSetUpdate{
  57. Keys: addrs,
  58. NewIndex: guardianSetIndex + 1,
  59. }.Serialize())
  60. return v, nil
  61. }
  62. // adminContractUpgradeToVAA converts a nodev1.ContractUpgrade message to its canonical VAA representation.
  63. // Returns an error if the data is invalid.
  64. func adminContractUpgradeToVAA(req *nodev1.ContractUpgrade, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  65. if len(req.NewContract) != 32 {
  66. return nil, errors.New("invalid new_contract address")
  67. }
  68. if req.ChainId > math.MaxUint8 {
  69. return nil, errors.New("invalid chain_id")
  70. }
  71. newContractAddress := vaa.Address{}
  72. copy(newContractAddress[:], req.NewContract)
  73. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  74. vaa.BodyContractUpgrade{
  75. ChainID: vaa.ChainID(req.ChainId),
  76. NewContract: newContractAddress,
  77. }.Serialize())
  78. return v, nil
  79. }
  80. // tokenBridgeRegisterChain converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
  81. // Returns an error if the data is invalid.
  82. func tokenBridgeRegisterChain(req *nodev1.BridgeRegisterChain, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  83. if req.ChainId > math.MaxUint8 {
  84. return nil, errors.New("invalid chain_id")
  85. }
  86. b, err := hex.DecodeString(req.EmitterAddress)
  87. if err != nil {
  88. return nil, errors.New("invalid emitter address encoding (expected hex)")
  89. }
  90. if len(b) != 32 {
  91. return nil, errors.New("invalid emitter address (expected 32 bytes)")
  92. }
  93. emitterAddress := vaa.Address{}
  94. copy(emitterAddress[:], b)
  95. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  96. vaa.BodyTokenBridgeRegisterChain{
  97. Module: req.Module,
  98. ChainID: vaa.ChainID(req.ChainId),
  99. EmitterAddress: emitterAddress,
  100. }.Serialize())
  101. return v, nil
  102. }
  103. // tokenBridgeUpgradeContract converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
  104. // Returns an error if the data is invalid.
  105. func tokenBridgeUpgradeContract(req *nodev1.BridgeUpgradeContract, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  106. if req.TargetChainId > math.MaxUint16 {
  107. return nil, errors.New("invalid target_chain_id")
  108. }
  109. b, err := hex.DecodeString(req.NewContract)
  110. if err != nil {
  111. return nil, errors.New("invalid new contract address (expected hex)")
  112. }
  113. if len(b) != 32 {
  114. return nil, errors.New("invalid new contract address (expected 32 bytes)")
  115. }
  116. newContract := vaa.Address{}
  117. copy(newContract[:], b)
  118. v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
  119. vaa.BodyTokenBridgeUpgradeContract{
  120. Module: req.Module,
  121. TargetChainID: vaa.ChainID(req.TargetChainId),
  122. NewContract: newContract,
  123. }.Serialize())
  124. return v, nil
  125. }
  126. func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *nodev1.InjectGovernanceVAARequest) (*nodev1.InjectGovernanceVAAResponse, error) {
  127. s.logger.Info("governance VAA injected via admin socket", zap.String("request", req.String()))
  128. var (
  129. v *vaa.VAA
  130. err error
  131. )
  132. switch payload := req.Payload.(type) {
  133. case *nodev1.InjectGovernanceVAARequest_GuardianSet:
  134. v, err = adminGuardianSetUpdateToVAA(payload.GuardianSet, req.CurrentSetIndex, req.Nonce, req.Sequence)
  135. case *nodev1.InjectGovernanceVAARequest_ContractUpgrade:
  136. v, err = adminContractUpgradeToVAA(payload.ContractUpgrade, req.CurrentSetIndex, req.Nonce, req.Sequence)
  137. case *nodev1.InjectGovernanceVAARequest_BridgeRegisterChain:
  138. v, err = tokenBridgeRegisterChain(payload.BridgeRegisterChain, req.CurrentSetIndex, req.Nonce, req.Sequence)
  139. case *nodev1.InjectGovernanceVAARequest_BridgeContractUpgrade:
  140. v, err = tokenBridgeUpgradeContract(payload.BridgeContractUpgrade, req.CurrentSetIndex, req.Nonce, req.Sequence)
  141. default:
  142. panic(fmt.Sprintf("unsupported VAA type: %T", payload))
  143. }
  144. if err != nil {
  145. return nil, status.Error(codes.InvalidArgument, err.Error())
  146. }
  147. // Generate digest of the unsigned VAA.
  148. digest, err := v.SigningMsg()
  149. if err != nil {
  150. panic(err)
  151. }
  152. s.logger.Info("governance VAA constructed",
  153. zap.Any("vaa", v),
  154. zap.String("digest", digest.String()),
  155. )
  156. s.injectC <- v
  157. return &nodev1.InjectGovernanceVAAResponse{Digest: digest.Bytes()}, nil
  158. }
  159. func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *nodev1.FindMissingMessagesRequest) (*nodev1.FindMissingMessagesResponse, error) {
  160. b, err := hex.DecodeString(req.EmitterAddress)
  161. if err != nil {
  162. return nil, status.Errorf(codes.InvalidArgument, "invalid emitter address encoding: %v", err)
  163. }
  164. emitterAddress := vaa.Address{}
  165. copy(emitterAddress[:], b)
  166. ids, first, last, err := s.db.FindEmitterSequenceGap(db.VAAID{
  167. EmitterChain: vaa.ChainID(req.EmitterChain),
  168. EmitterAddress: emitterAddress,
  169. })
  170. if err != nil {
  171. return nil, status.Errorf(codes.Internal, "database operation failed: %v", err)
  172. }
  173. resp := make([]string, len(ids))
  174. for i, v := range ids {
  175. resp[i] = fmt.Sprintf("%d/%s/%d", req.EmitterChain, emitterAddress, v)
  176. }
  177. return &nodev1.FindMissingMessagesResponse{
  178. MissingMessages: resp,
  179. FirstSequence: first,
  180. LastSequence: last,
  181. }, nil
  182. }
  183. func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
  184. // Delete existing UNIX socket, if present.
  185. fi, err := os.Stat(socketPath)
  186. if err == nil {
  187. fmode := fi.Mode()
  188. if fmode&os.ModeType == os.ModeSocket {
  189. err = os.Remove(socketPath)
  190. if err != nil {
  191. return nil, fmt.Errorf("failed to remove existing socket at %s: %w", socketPath, err)
  192. }
  193. } else {
  194. return nil, fmt.Errorf("%s is not a UNIX socket", socketPath)
  195. }
  196. }
  197. // Create a new UNIX socket and listen to it.
  198. // The socket is created with the default umask. We set a restrictive umask in setRestrictiveUmask
  199. // to ensure that any files we create are only readable by the user - this is much harder to mess up.
  200. // The umask avoids a race condition between file creation and chmod.
  201. laddr, err := net.ResolveUnixAddr("unix", socketPath)
  202. if err != nil {
  203. return nil, fmt.Errorf("invalid listen address: %v", err)
  204. }
  205. l, err := net.ListenUnix("unix", laddr)
  206. if err != nil {
  207. return nil, fmt.Errorf("failed to listen on %s: %w", socketPath, err)
  208. }
  209. logger.Info("admin server listening on", zap.String("path", socketPath))
  210. nodeService := &nodePrivilegedService{
  211. injectC: injectC,
  212. db: db,
  213. logger: logger.Named("adminservice"),
  214. }
  215. publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst)
  216. grpcServer := newGRPCServer(logger)
  217. nodev1.RegisterNodePrivilegedServiceServer(grpcServer, nodeService)
  218. publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService)
  219. return supervisor.GRPCServer(grpcServer, l, false), nil
  220. }
  221. func newGRPCServer(logger *zap.Logger) *grpc.Server {
  222. server := grpc.NewServer(
  223. grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
  224. grpc_ctxtags.StreamServerInterceptor(),
  225. grpc_prometheus.StreamServerInterceptor,
  226. grpc_zap.StreamServerInterceptor(logger),
  227. )),
  228. grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
  229. grpc_ctxtags.UnaryServerInterceptor(),
  230. grpc_prometheus.UnaryServerInterceptor,
  231. grpc_zap.UnaryServerInterceptor(logger),
  232. )),
  233. )
  234. grpc_prometheus.EnableHandlingTimeHistogram()
  235. grpc_prometheus.Register(server)
  236. return server
  237. }