adminserver.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package guardiand
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/certusone/wormhole/node/pkg/db"
  7. publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
  8. "github.com/certusone/wormhole/node/pkg/publicrpc"
  9. grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
  10. grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
  11. grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
  12. grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  13. "math"
  14. "net"
  15. "os"
  16. "time"
  17. ethcommon "github.com/ethereum/go-ethereum/common"
  18. "go.uber.org/zap"
  19. "google.golang.org/grpc"
  20. "google.golang.org/grpc/codes"
  21. "google.golang.org/grpc/status"
  22. "github.com/certusone/wormhole/node/pkg/common"
  23. nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
  24. "github.com/certusone/wormhole/node/pkg/supervisor"
  25. "github.com/certusone/wormhole/node/pkg/vaa"
  26. )
  27. type nodePrivilegedService struct {
  28. nodev1.UnimplementedNodePrivilegedServiceServer
  29. injectC chan<- *vaa.VAA
  30. logger *zap.Logger
  31. }
  32. // adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation.
  33. // Returns an error if the data is invalid.
  34. func adminGuardianSetUpdateToVAA(req *nodev1.GuardianSetUpdate, guardianSetIndex uint32, timestamp uint32) (*vaa.VAA, error) {
  35. if len(req.Guardians) == 0 {
  36. return nil, errors.New("empty guardian set specified")
  37. }
  38. if len(req.Guardians) > common.MaxGuardianCount {
  39. return nil, fmt.Errorf("too many guardians - %d, maximum is %d", len(req.Guardians), common.MaxGuardianCount)
  40. }
  41. addrs := make([]ethcommon.Address, len(req.Guardians))
  42. for i, g := range req.Guardians {
  43. if !ethcommon.IsHexAddress(g.Pubkey) {
  44. return nil, fmt.Errorf("invalid pubkey format at index %d (%s)", i, g.Name)
  45. }
  46. ethAddr := ethcommon.HexToAddress(g.Pubkey)
  47. for j, pk := range addrs {
  48. if pk == ethAddr {
  49. return nil, fmt.Errorf("duplicate pubkey at index %d (duplicate of %d): %s", i, j, g.Name)
  50. }
  51. }
  52. addrs[i] = ethAddr
  53. }
  54. v := &vaa.VAA{
  55. Version: vaa.SupportedVAAVersion,
  56. GuardianSetIndex: guardianSetIndex,
  57. Timestamp: time.Unix(int64(timestamp), 0),
  58. Payload: vaa.BodyGuardianSetUpdate{
  59. Keys: addrs,
  60. NewIndex: guardianSetIndex + 1,
  61. }.Serialize(),
  62. }
  63. return v, nil
  64. }
  65. // adminContractUpgradeToVAA converts a nodev1.ContractUpgrade message to its canonical VAA representation.
  66. // Returns an error if the data is invalid.
  67. func adminContractUpgradeToVAA(req *nodev1.ContractUpgrade, guardianSetIndex uint32, timestamp uint32) (*vaa.VAA, error) {
  68. if len(req.NewContract) != 32 {
  69. return nil, errors.New("invalid new_contract address")
  70. }
  71. if req.ChainId > math.MaxUint8 {
  72. return nil, errors.New("invalid chain_id")
  73. }
  74. newContractAddress := vaa.Address{}
  75. copy(newContractAddress[:], req.NewContract)
  76. v := &vaa.VAA{
  77. Version: vaa.SupportedVAAVersion,
  78. GuardianSetIndex: guardianSetIndex,
  79. Timestamp: time.Unix(int64(timestamp), 0),
  80. Payload: vaa.BodyContractUpgrade{
  81. ChainID: vaa.ChainID(req.ChainId),
  82. NewContract: newContractAddress,
  83. }.Serialize(),
  84. }
  85. return v, nil
  86. }
  87. func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *nodev1.InjectGovernanceVAARequest) (*nodev1.InjectGovernanceVAAResponse, error) {
  88. s.logger.Info("governance VAA injected via admin socket", zap.String("request", req.String()))
  89. var (
  90. v *vaa.VAA
  91. err error
  92. )
  93. switch payload := req.Payload.(type) {
  94. case *nodev1.InjectGovernanceVAARequest_GuardianSet:
  95. v, err = adminGuardianSetUpdateToVAA(payload.GuardianSet, req.CurrentSetIndex, req.Timestamp)
  96. case *nodev1.InjectGovernanceVAARequest_ContractUpgrade:
  97. v, err = adminContractUpgradeToVAA(payload.ContractUpgrade, req.CurrentSetIndex, req.Timestamp)
  98. default:
  99. panic(fmt.Sprintf("unsupported VAA type: %T", payload))
  100. }
  101. if err != nil {
  102. return nil, status.Error(codes.InvalidArgument, err.Error())
  103. }
  104. // Generate digest of the unsigned VAA.
  105. digest, err := v.SigningMsg()
  106. if err != nil {
  107. panic(err)
  108. }
  109. s.logger.Info("governance VAA constructed",
  110. zap.Any("vaa", v),
  111. zap.String("digest", digest.String()),
  112. )
  113. s.injectC <- v
  114. return &nodev1.InjectGovernanceVAAResponse{Digest: digest.Bytes()}, nil
  115. }
  116. func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
  117. // Delete existing UNIX socket, if present.
  118. fi, err := os.Stat(socketPath)
  119. if err == nil {
  120. fmode := fi.Mode()
  121. if fmode&os.ModeType == os.ModeSocket {
  122. err = os.Remove(socketPath)
  123. if err != nil {
  124. return nil, fmt.Errorf("failed to remove existing socket at %s: %w", socketPath, err)
  125. }
  126. } else {
  127. return nil, fmt.Errorf("%s is not a UNIX socket", socketPath)
  128. }
  129. }
  130. // Create a new UNIX socket and listen to it.
  131. // The socket is created with the default umask. We set a restrictive umask in setRestrictiveUmask
  132. // to ensure that any files we create are only readable by the user - this is much harder to mess up.
  133. // The umask avoids a race condition between file creation and chmod.
  134. laddr, err := net.ResolveUnixAddr("unix", socketPath)
  135. l, err := net.ListenUnix("unix", laddr)
  136. if err != nil {
  137. return nil, fmt.Errorf("failed to listen on %s: %w", socketPath, err)
  138. }
  139. logger.Info("admin server listening on", zap.String("path", socketPath))
  140. nodeService := &nodePrivilegedService{
  141. injectC: injectC,
  142. logger: logger.Named("adminservice"),
  143. }
  144. publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst)
  145. grpcServer := newGRPCServer(logger)
  146. nodev1.RegisterNodePrivilegedServiceServer(grpcServer, nodeService)
  147. publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService)
  148. return supervisor.GRPCServer(grpcServer, l, false), nil
  149. }
  150. func newGRPCServer(logger *zap.Logger) *grpc.Server {
  151. server := grpc.NewServer(
  152. grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
  153. grpc_ctxtags.StreamServerInterceptor(),
  154. grpc_prometheus.StreamServerInterceptor,
  155. grpc_zap.StreamServerInterceptor(logger),
  156. )),
  157. grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
  158. grpc_ctxtags.UnaryServerInterceptor(),
  159. grpc_prometheus.UnaryServerInterceptor,
  160. grpc_zap.UnaryServerInterceptor(logger),
  161. )),
  162. )
  163. grpc_prometheus.EnableHandlingTimeHistogram()
  164. grpc_prometheus.Register(server)
  165. return server
  166. }