| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- package guardiand
- import (
- "context"
- "encoding/hex"
- "errors"
- "fmt"
- "github.com/certusone/wormhole/node/pkg/db"
- publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
- "github.com/certusone/wormhole/node/pkg/publicrpc"
- ethcommon "github.com/ethereum/go-ethereum/common"
- "go.uber.org/zap"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "math"
- "math/rand"
- "net"
- "net/http"
- "os"
- "time"
- "github.com/certusone/wormhole/node/pkg/common"
- nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
- "github.com/certusone/wormhole/node/pkg/supervisor"
- "github.com/certusone/wormhole/node/pkg/vaa"
- )
- type nodePrivilegedService struct {
- nodev1.UnimplementedNodePrivilegedServiceServer
- db *db.Database
- injectC chan<- *vaa.VAA
- logger *zap.Logger
- }
- // adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation.
- // Returns an error if the data is invalid.
- func adminGuardianSetUpdateToVAA(req *nodev1.GuardianSetUpdate, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
- if len(req.Guardians) == 0 {
- return nil, errors.New("empty guardian set specified")
- }
- if len(req.Guardians) > common.MaxGuardianCount {
- return nil, fmt.Errorf("too many guardians - %d, maximum is %d", len(req.Guardians), common.MaxGuardianCount)
- }
- addrs := make([]ethcommon.Address, len(req.Guardians))
- for i, g := range req.Guardians {
- if !ethcommon.IsHexAddress(g.Pubkey) {
- return nil, fmt.Errorf("invalid pubkey format at index %d (%s)", i, g.Name)
- }
- ethAddr := ethcommon.HexToAddress(g.Pubkey)
- for j, pk := range addrs {
- if pk == ethAddr {
- return nil, fmt.Errorf("duplicate pubkey at index %d (duplicate of %d): %s", i, j, g.Name)
- }
- }
- addrs[i] = ethAddr
- }
- v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
- vaa.BodyGuardianSetUpdate{
- Keys: addrs,
- NewIndex: guardianSetIndex + 1,
- }.Serialize())
- return v, nil
- }
- // adminContractUpgradeToVAA converts a nodev1.ContractUpgrade message to its canonical VAA representation.
- // Returns an error if the data is invalid.
- func adminContractUpgradeToVAA(req *nodev1.ContractUpgrade, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
- b, err := hex.DecodeString(req.NewContract)
- if err != nil {
- return nil, errors.New("invalid new contract address encoding (expected hex)")
- }
- if len(b) != 32 {
- return nil, errors.New("invalid new_contract address")
- }
- if req.ChainId > math.MaxUint16 {
- return nil, errors.New("invalid chain_id")
- }
- newContractAddress := vaa.Address{}
- copy(newContractAddress[:], req.NewContract)
- v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
- vaa.BodyContractUpgrade{
- ChainID: vaa.ChainID(req.ChainId),
- NewContract: newContractAddress,
- }.Serialize())
- return v, nil
- }
- // tokenBridgeRegisterChain converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
- // Returns an error if the data is invalid.
- func tokenBridgeRegisterChain(req *nodev1.BridgeRegisterChain, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
- if req.ChainId > math.MaxUint16 {
- return nil, errors.New("invalid chain_id")
- }
- b, err := hex.DecodeString(req.EmitterAddress)
- if err != nil {
- return nil, errors.New("invalid emitter address encoding (expected hex)")
- }
- if len(b) != 32 {
- return nil, errors.New("invalid emitter address (expected 32 bytes)")
- }
- emitterAddress := vaa.Address{}
- copy(emitterAddress[:], b)
- v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
- vaa.BodyTokenBridgeRegisterChain{
- Module: req.Module,
- ChainID: vaa.ChainID(req.ChainId),
- EmitterAddress: emitterAddress,
- }.Serialize())
- return v, nil
- }
- // tokenBridgeUpgradeContract converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
- // Returns an error if the data is invalid.
- func tokenBridgeUpgradeContract(req *nodev1.BridgeUpgradeContract, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
- if req.TargetChainId > math.MaxUint16 {
- return nil, errors.New("invalid target_chain_id")
- }
- b, err := hex.DecodeString(req.NewContract)
- if err != nil {
- return nil, errors.New("invalid new contract address (expected hex)")
- }
- if len(b) != 32 {
- return nil, errors.New("invalid new contract address (expected 32 bytes)")
- }
- newContract := vaa.Address{}
- copy(newContract[:], b)
- v := vaa.CreateGovernanceVAA(nonce, sequence, guardianSetIndex,
- vaa.BodyTokenBridgeUpgradeContract{
- Module: req.Module,
- TargetChainID: vaa.ChainID(req.TargetChainId),
- NewContract: newContract,
- }.Serialize())
- return v, nil
- }
- func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *nodev1.InjectGovernanceVAARequest) (*nodev1.InjectGovernanceVAAResponse, error) {
- s.logger.Info("governance VAA injected via admin socket", zap.String("request", req.String()))
- var (
- v *vaa.VAA
- err error
- )
- digests := make([][]byte, len(req.Messages))
- for i, message := range req.Messages {
- switch payload := message.Payload.(type) {
- case *nodev1.GovernanceMessage_GuardianSet:
- v, err = adminGuardianSetUpdateToVAA(payload.GuardianSet, req.CurrentSetIndex, message.Nonce, message.Sequence)
- case *nodev1.GovernanceMessage_ContractUpgrade:
- v, err = adminContractUpgradeToVAA(payload.ContractUpgrade, req.CurrentSetIndex, message.Nonce, message.Sequence)
- case *nodev1.GovernanceMessage_BridgeRegisterChain:
- v, err = tokenBridgeRegisterChain(payload.BridgeRegisterChain, req.CurrentSetIndex, message.Nonce, message.Sequence)
- case *nodev1.GovernanceMessage_BridgeContractUpgrade:
- v, err = tokenBridgeUpgradeContract(payload.BridgeContractUpgrade, req.CurrentSetIndex, message.Nonce, message.Sequence)
- default:
- panic(fmt.Sprintf("unsupported VAA type: %T", payload))
- }
- if err != nil {
- return nil, status.Error(codes.InvalidArgument, err.Error())
- }
- // Generate digest of the unsigned VAA.
- digest := v.SigningMsg()
- if err != nil {
- panic(err)
- }
- s.logger.Info("governance VAA constructed",
- zap.Any("vaa", v),
- zap.String("digest", digest.String()),
- )
- s.injectC <- v
- digests[i] = digest.Bytes()
- }
- return &nodev1.InjectGovernanceVAAResponse{Digests: digests}, nil
- }
- // fetchMissing attempts to backfill a gap by fetching missing signed VAAs from the network.
- // Returns true if the gap was filled, false otherwise.
- func (s *nodePrivilegedService) fetchMissing(
- ctx context.Context,
- nodes []string,
- c *http.Client,
- chain vaa.ChainID,
- addr string,
- seq uint64) (bool, error) {
- // shuffle the list of public RPC endpoints
- rand.Shuffle(len(nodes), func(i, j int) {
- nodes[i], nodes[j] = nodes[j], nodes[i]
- })
- ctx, cancel := context.WithTimeout(ctx, time.Second)
- defer cancel()
- for _, node := range nodes {
- req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf(
- "%s/v1/signed_vaa/%d/%s/%d", node, chain, addr, seq), nil)
- if err != nil {
- return false, fmt.Errorf("failed to create request: %w", err)
- }
- resp, err := c.Do(req)
- if err != nil {
- s.logger.Warn("failed to fetch missing VAA",
- zap.String("node", node),
- zap.String("chain", chain.String()),
- zap.String("address", addr),
- zap.Uint64("sequence", seq),
- zap.Error(err),
- )
- continue
- }
- switch resp.StatusCode {
- case http.StatusNotFound:
- resp.Body.Close()
- continue
- case http.StatusOK:
- s.logger.Info("backfilled VAA",
- zap.Uint16("chain", uint16(chain)),
- zap.String("address", addr),
- zap.Uint64("sequence", seq),
- )
- resp.Body.Close()
- return true, nil
- default:
- resp.Body.Close()
- return false, fmt.Errorf("unexpected response status: %d", resp.StatusCode)
- }
- }
- return false, nil
- }
- func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *nodev1.FindMissingMessagesRequest) (*nodev1.FindMissingMessagesResponse, error) {
- b, err := hex.DecodeString(req.EmitterAddress)
- if err != nil {
- return nil, status.Errorf(codes.InvalidArgument, "invalid emitter address encoding: %v", err)
- }
- emitterAddress := vaa.Address{}
- copy(emitterAddress[:], b)
- ids, first, last, err := s.db.FindEmitterSequenceGap(db.VAAID{
- EmitterChain: vaa.ChainID(req.EmitterChain),
- EmitterAddress: emitterAddress,
- })
- if err != nil {
- return nil, status.Errorf(codes.Internal, "database operation failed: %v", err)
- }
- if req.RpcBackfill {
- c := &http.Client{}
- unfilled := make([]uint64, 0, len(ids))
- for _, id := range ids {
- if ok, err := s.fetchMissing(ctx, req.BackfillNodes, c, vaa.ChainID(req.EmitterChain), emitterAddress.String(), id); err != nil {
- return nil, status.Errorf(codes.Internal, "failed to backfill VAA: %v", err)
- } else if ok {
- continue
- }
- unfilled = append(unfilled, id)
- }
- ids = unfilled
- }
- resp := make([]string, len(ids))
- for i, v := range ids {
- resp[i] = fmt.Sprintf("%d/%s/%d", req.EmitterChain, emitterAddress, v)
- }
- return &nodev1.FindMissingMessagesResponse{
- MissingMessages: resp,
- FirstSequence: first,
- LastSequence: last,
- }, nil
- }
- func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
- // Delete existing UNIX socket, if present.
- fi, err := os.Stat(socketPath)
- if err == nil {
- fmode := fi.Mode()
- if fmode&os.ModeType == os.ModeSocket {
- err = os.Remove(socketPath)
- if err != nil {
- return nil, fmt.Errorf("failed to remove existing socket at %s: %w", socketPath, err)
- }
- } else {
- return nil, fmt.Errorf("%s is not a UNIX socket", socketPath)
- }
- }
- // Create a new UNIX socket and listen to it.
- // The socket is created with the default umask. We set a restrictive umask in setRestrictiveUmask
- // to ensure that any files we create are only readable by the user - this is much harder to mess up.
- // The umask avoids a race condition between file creation and chmod.
- laddr, err := net.ResolveUnixAddr("unix", socketPath)
- if err != nil {
- return nil, fmt.Errorf("invalid listen address: %v", err)
- }
- l, err := net.ListenUnix("unix", laddr)
- if err != nil {
- return nil, fmt.Errorf("failed to listen on %s: %w", socketPath, err)
- }
- logger.Info("admin server listening on", zap.String("path", socketPath))
- nodeService := &nodePrivilegedService{
- injectC: injectC,
- db: db,
- logger: logger.Named("adminservice"),
- }
- publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst)
- grpcServer := common.NewInstrumentedGRPCServer(logger)
- nodev1.RegisterNodePrivilegedServiceServer(grpcServer, nodeService)
- publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService)
- return supervisor.GRPCServer(grpcServer, l, false), nil
- }
|