adminserver.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package guardiand
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  7. publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
  8. "github.com/certusone/wormhole/node/pkg/publicrpc"
  9. "math"
  10. "net"
  11. "os"
  12. "time"
  13. ethcommon "github.com/ethereum/go-ethereum/common"
  14. "go.uber.org/zap"
  15. "google.golang.org/grpc/codes"
  16. "google.golang.org/grpc/status"
  17. "github.com/certusone/wormhole/node/pkg/common"
  18. nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
  19. "github.com/certusone/wormhole/node/pkg/supervisor"
  20. "github.com/certusone/wormhole/node/pkg/vaa"
  21. )
  22. type nodePrivilegedService struct {
  23. nodev1.UnimplementedNodePrivilegedServiceServer
  24. injectC chan<- *vaa.VAA
  25. obsvReqSendC chan *gossipv1.ObservationRequest
  26. logger *zap.Logger
  27. }
  28. // adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation.
  29. // Returns an error if the data is invalid.
  30. func adminGuardianSetUpdateToVAA(req *nodev1.GuardianSetUpdate, guardianSetIndex uint32, timestamp uint32) (*vaa.VAA, error) {
  31. if len(req.Guardians) == 0 {
  32. return nil, errors.New("empty guardian set specified")
  33. }
  34. if len(req.Guardians) > common.MaxGuardianCount {
  35. return nil, fmt.Errorf("too many guardians - %d, maximum is %d", len(req.Guardians), common.MaxGuardianCount)
  36. }
  37. addrs := make([]ethcommon.Address, len(req.Guardians))
  38. for i, g := range req.Guardians {
  39. if !ethcommon.IsHexAddress(g.Pubkey) {
  40. return nil, fmt.Errorf("invalid pubkey format at index %d (%s)", i, g.Name)
  41. }
  42. ethAddr := ethcommon.HexToAddress(g.Pubkey)
  43. for j, pk := range addrs {
  44. if pk == ethAddr {
  45. return nil, fmt.Errorf("duplicate pubkey at index %d (duplicate of %d): %s", i, j, g.Name)
  46. }
  47. }
  48. addrs[i] = ethAddr
  49. }
  50. v := &vaa.VAA{
  51. Version: vaa.SupportedVAAVersion,
  52. GuardianSetIndex: guardianSetIndex,
  53. Timestamp: time.Unix(int64(timestamp), 0),
  54. Payload: &vaa.BodyGuardianSetUpdate{
  55. Keys: addrs,
  56. NewIndex: guardianSetIndex + 1,
  57. },
  58. }
  59. return v, nil
  60. }
  61. // adminContractUpgradeToVAA converts a nodev1.ContractUpgrade message to its canonical VAA representation.
  62. // Returns an error if the data is invalid.
  63. func adminContractUpgradeToVAA(req *nodev1.ContractUpgrade, guardianSetIndex uint32, timestamp uint32) (*vaa.VAA, error) {
  64. if len(req.NewContract) != 32 {
  65. return nil, errors.New("invalid new_contract address")
  66. }
  67. if req.ChainId > math.MaxUint8 {
  68. return nil, errors.New("invalid chain_id")
  69. }
  70. newContractAddress := vaa.Address{}
  71. copy(newContractAddress[:], req.NewContract)
  72. v := &vaa.VAA{
  73. Version: vaa.SupportedVAAVersion,
  74. GuardianSetIndex: guardianSetIndex,
  75. Timestamp: time.Unix(int64(timestamp), 0),
  76. Payload: &vaa.BodyContractUpgrade{
  77. ChainID: uint8(req.ChainId),
  78. NewContract: newContractAddress,
  79. },
  80. }
  81. return v, nil
  82. }
  83. func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *nodev1.InjectGovernanceVAARequest) (*nodev1.InjectGovernanceVAAResponse, error) {
  84. s.logger.Info("governance VAA injected via admin socket", zap.String("request", req.String()))
  85. var (
  86. v *vaa.VAA
  87. err error
  88. )
  89. digests := make([][]byte, len(req.Messages))
  90. for i, message := range req.Messages {
  91. switch payload := message.Payload.(type) {
  92. case *nodev1.GovernanceMessage_GuardianSet:
  93. v, err = adminGuardianSetUpdateToVAA(payload.GuardianSet, req.CurrentSetIndex, message.Timestamp)
  94. case *nodev1.GovernanceMessage_ContractUpgrade:
  95. v, err = adminContractUpgradeToVAA(payload.ContractUpgrade, req.CurrentSetIndex, message.Timestamp)
  96. default:
  97. panic(fmt.Sprintf("unsupported VAA type: %T", payload))
  98. }
  99. if err != nil {
  100. return nil, status.Error(codes.InvalidArgument, err.Error())
  101. }
  102. // Generate digest of the unsigned VAA.
  103. digest, err := v.SigningMsg()
  104. if err != nil {
  105. panic(err)
  106. }
  107. s.logger.Info("governance VAA constructed",
  108. zap.Any("vaa", v),
  109. zap.String("digest", digest.String()),
  110. )
  111. s.injectC <- v
  112. digests[i] = digest.Bytes()
  113. }
  114. return &nodev1.InjectGovernanceVAAResponse{Digests: digests}, nil
  115. }
  116. func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, obsvReqSendC chan *gossipv1.ObservationRequest, gst *common.GuardianSetState) (supervisor.Runnable, error) {
  117. // Delete existing UNIX socket, if present.
  118. fi, err := os.Stat(socketPath)
  119. if err == nil {
  120. fmode := fi.Mode()
  121. if fmode&os.ModeType == os.ModeSocket {
  122. err = os.Remove(socketPath)
  123. if err != nil {
  124. return nil, fmt.Errorf("failed to remove existing socket at %s: %w", socketPath, err)
  125. }
  126. } else {
  127. return nil, fmt.Errorf("%s is not a UNIX socket", socketPath)
  128. }
  129. }
  130. // Create a new UNIX socket and listen to it.
  131. // The socket is created with the default umask. We set a restrictive umask in setRestrictiveUmask
  132. // to ensure that any files we create are only readable by the user - this is much harder to mess up.
  133. // The umask avoids a race condition between file creation and chmod.
  134. laddr, err := net.ResolveUnixAddr("unix", socketPath)
  135. if err != nil {
  136. return nil, fmt.Errorf("invalid listen address: %w", err)
  137. }
  138. l, err := net.ListenUnix("unix", laddr)
  139. if err != nil {
  140. return nil, fmt.Errorf("failed to listen on %s: %w", socketPath, err)
  141. }
  142. logger.Info("admin server listening on", zap.String("path", socketPath))
  143. nodeService := &nodePrivilegedService{
  144. injectC: injectC,
  145. obsvReqSendC: obsvReqSendC,
  146. logger: logger.Named("adminservice"),
  147. }
  148. publicrpcService := publicrpc.NewPublicrpcServer(logger, gst)
  149. grpcServer := common.NewInstrumentedGRPCServer(logger)
  150. nodev1.RegisterNodePrivilegedServiceServer(grpcServer, nodeService)
  151. publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService)
  152. return supervisor.GRPCServer(grpcServer, l, false), nil
  153. }
  154. func (s *nodePrivilegedService) SendObservationRequest(ctx context.Context, req *nodev1.SendObservationRequestRequest) (*nodev1.SendObservationRequestResponse, error) {
  155. s.obsvReqSendC <- req.ObservationRequest
  156. s.logger.Info("sent observation request", zap.Any("request", req.ObservationRequest))
  157. return &nodev1.SendObservationRequestResponse{}, nil
  158. }