adminserver.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805
  1. package guardiand
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/ecdsa"
  6. "encoding/base64"
  7. "encoding/hex"
  8. "encoding/json"
  9. "errors"
  10. "fmt"
  11. "math"
  12. "math/big"
  13. "math/rand"
  14. "net"
  15. "net/http"
  16. "os"
  17. "sync"
  18. "time"
  19. "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
  20. ethcrypto "github.com/ethereum/go-ethereum/crypto"
  21. "github.com/holiman/uint256"
  22. "golang.org/x/exp/slices"
  23. "github.com/certusone/wormhole/node/pkg/db"
  24. "github.com/certusone/wormhole/node/pkg/governor"
  25. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  26. publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
  27. "github.com/certusone/wormhole/node/pkg/publicrpc"
  28. ethcommon "github.com/ethereum/go-ethereum/common"
  29. "go.uber.org/zap"
  30. "google.golang.org/grpc/codes"
  31. "google.golang.org/grpc/status"
  32. "github.com/certusone/wormhole/node/pkg/common"
  33. nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
  34. "github.com/certusone/wormhole/node/pkg/supervisor"
  35. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  36. )
  37. type nodePrivilegedService struct {
  38. nodev1.UnimplementedNodePrivilegedServiceServer
  39. db *db.Database
  40. injectC chan<- *vaa.VAA
  41. obsvReqSendC chan<- *gossipv1.ObservationRequest
  42. logger *zap.Logger
  43. signedInC chan<- *gossipv1.SignedVAAWithQuorum
  44. governor *governor.ChainGovernor
  45. evmConnector connectors.Connector
  46. gsCache sync.Map
  47. gk *ecdsa.PrivateKey
  48. guardianAddress ethcommon.Address
  49. }
  50. // adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation.
  51. // Returns an error if the data is invalid.
  52. func adminGuardianSetUpdateToVAA(req *nodev1.GuardianSetUpdate, timestamp time.Time, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  53. if len(req.Guardians) == 0 {
  54. return nil, errors.New("empty guardian set specified")
  55. }
  56. if len(req.Guardians) > common.MaxGuardianCount {
  57. return nil, fmt.Errorf("too many guardians - %d, maximum is %d", len(req.Guardians), common.MaxGuardianCount)
  58. }
  59. addrs := make([]ethcommon.Address, len(req.Guardians))
  60. for i, g := range req.Guardians {
  61. if !ethcommon.IsHexAddress(g.Pubkey) {
  62. return nil, fmt.Errorf("invalid pubkey format at index %d (%s)", i, g.Name)
  63. }
  64. ethAddr := ethcommon.HexToAddress(g.Pubkey)
  65. for j, pk := range addrs {
  66. if pk == ethAddr {
  67. return nil, fmt.Errorf("duplicate pubkey at index %d (duplicate of %d): %s", i, j, g.Name)
  68. }
  69. }
  70. addrs[i] = ethAddr
  71. }
  72. v := vaa.CreateGovernanceVAA(timestamp, nonce, sequence, guardianSetIndex,
  73. vaa.BodyGuardianSetUpdate{
  74. Keys: addrs,
  75. NewIndex: guardianSetIndex + 1,
  76. }.Serialize())
  77. return v, nil
  78. }
  79. // adminContractUpgradeToVAA converts a nodev1.ContractUpgrade message to its canonical VAA representation.
  80. // Returns an error if the data is invalid.
  81. func adminContractUpgradeToVAA(req *nodev1.ContractUpgrade, timestamp time.Time, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  82. b, err := hex.DecodeString(req.NewContract)
  83. if err != nil {
  84. return nil, errors.New("invalid new contract address encoding (expected hex)")
  85. }
  86. if len(b) != 32 {
  87. return nil, errors.New("invalid new_contract address")
  88. }
  89. if req.ChainId > math.MaxUint16 {
  90. return nil, errors.New("invalid chain_id")
  91. }
  92. newContractAddress := vaa.Address{}
  93. copy(newContractAddress[:], b)
  94. v := vaa.CreateGovernanceVAA(timestamp, nonce, sequence, guardianSetIndex,
  95. vaa.BodyContractUpgrade{
  96. ChainID: vaa.ChainID(req.ChainId),
  97. NewContract: newContractAddress,
  98. }.Serialize())
  99. return v, nil
  100. }
  101. // tokenBridgeRegisterChain converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
  102. // Returns an error if the data is invalid.
  103. func tokenBridgeRegisterChain(req *nodev1.BridgeRegisterChain, timestamp time.Time, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  104. if req.ChainId > math.MaxUint16 {
  105. return nil, errors.New("invalid chain_id")
  106. }
  107. b, err := hex.DecodeString(req.EmitterAddress)
  108. if err != nil {
  109. return nil, errors.New("invalid emitter address encoding (expected hex)")
  110. }
  111. if len(b) != 32 {
  112. return nil, errors.New("invalid emitter address (expected 32 bytes)")
  113. }
  114. emitterAddress := vaa.Address{}
  115. copy(emitterAddress[:], b)
  116. v := vaa.CreateGovernanceVAA(timestamp, nonce, sequence, guardianSetIndex,
  117. vaa.BodyTokenBridgeRegisterChain{
  118. Module: req.Module,
  119. ChainID: vaa.ChainID(req.ChainId),
  120. EmitterAddress: emitterAddress,
  121. }.Serialize())
  122. return v, nil
  123. }
  124. // tokenBridgeUpgradeContract converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
  125. // Returns an error if the data is invalid.
  126. func tokenBridgeModifyBalance(req *nodev1.BridgeModifyBalance, timestamp time.Time, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  127. if req.TargetChainId > math.MaxUint16 {
  128. return nil, errors.New("invalid target_chain_id")
  129. }
  130. if req.ChainId > math.MaxUint16 {
  131. return nil, errors.New("invalid chain_id")
  132. }
  133. if req.TokenChain > math.MaxUint16 {
  134. return nil, errors.New("invalid token_chain")
  135. }
  136. b, err := hex.DecodeString(req.TokenAddress)
  137. if err != nil {
  138. return nil, errors.New("invalid token address (expected hex)")
  139. }
  140. if len(b) != 32 {
  141. return nil, errors.New("invalid new token address (expected 32 bytes)")
  142. }
  143. if len(req.Reason) > 32 {
  144. return nil, errors.New("the reason should not be larger than 32 bytes")
  145. }
  146. amount_big := big.NewInt(0)
  147. amount_big, ok := amount_big.SetString(req.Amount, 10)
  148. if !ok {
  149. return nil, errors.New("invalid amount")
  150. }
  151. // uint256 has Bytes32 method for easier serialization
  152. amount, ok := uint256.FromBig(amount_big)
  153. if !ok {
  154. return nil, errors.New("invalid amount")
  155. }
  156. tokenAdress := vaa.Address{}
  157. copy(tokenAdress[:], b)
  158. v := vaa.CreateGovernanceVAA(timestamp, nonce, sequence, guardianSetIndex,
  159. vaa.BodyTokenBridgeModifyBalance{
  160. Module: req.Module,
  161. TargetChainID: vaa.ChainID(req.TargetChainId),
  162. Sequence: req.Sequence,
  163. ChainId: vaa.ChainID(req.ChainId),
  164. TokenChain: vaa.ChainID(req.TokenChain),
  165. TokenAddress: tokenAdress,
  166. Kind: uint8(req.Kind),
  167. Amount: amount,
  168. Reason: req.Reason,
  169. }.Serialize())
  170. return v, nil
  171. }
  172. // tokenBridgeUpgradeContract converts a nodev1.TokenBridgeRegisterChain message to its canonical VAA representation.
  173. // Returns an error if the data is invalid.
  174. func tokenBridgeUpgradeContract(req *nodev1.BridgeUpgradeContract, timestamp time.Time, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  175. if req.TargetChainId > math.MaxUint16 {
  176. return nil, errors.New("invalid target_chain_id")
  177. }
  178. b, err := hex.DecodeString(req.NewContract)
  179. if err != nil {
  180. return nil, errors.New("invalid new contract address (expected hex)")
  181. }
  182. if len(b) != 32 {
  183. return nil, errors.New("invalid new contract address (expected 32 bytes)")
  184. }
  185. newContract := vaa.Address{}
  186. copy(newContract[:], b)
  187. v := vaa.CreateGovernanceVAA(timestamp, nonce, sequence, guardianSetIndex,
  188. vaa.BodyTokenBridgeUpgradeContract{
  189. Module: req.Module,
  190. TargetChainID: vaa.ChainID(req.TargetChainId),
  191. NewContract: newContract,
  192. }.Serialize())
  193. return v, nil
  194. }
  195. // wormchainStoreCode converts a nodev1.WormchainStoreCode to its canonical VAA representation
  196. // Returns an error if the data is invalid
  197. func wormchainStoreCode(req *nodev1.WormchainStoreCode, timestamp time.Time, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  198. // validate the length of the hex passed in
  199. b, err := hex.DecodeString(req.WasmHash)
  200. if err != nil {
  201. return nil, fmt.Errorf("invalid cosmwasm bytecode hash (expected hex): %w", err)
  202. }
  203. if len(b) != 32 {
  204. return nil, fmt.Errorf("invalid cosmwasm bytecode hash (expected 32 bytes but received %d bytes)", len(b))
  205. }
  206. wasmHash := [32]byte{}
  207. copy(wasmHash[:], b)
  208. v := vaa.CreateGovernanceVAA(timestamp, nonce, sequence, guardianSetIndex,
  209. vaa.BodyWormchainStoreCode{
  210. WasmHash: wasmHash,
  211. }.Serialize())
  212. return v, nil
  213. }
  214. // wormchainInstantiateContract converts a nodev1.WormchainInstantiateContract to its canonical VAA representation
  215. // Returns an error if the data is invalid
  216. func wormchainInstantiateContract(req *nodev1.WormchainInstantiateContract, timestamp time.Time, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  217. instantiationParams_hash := vaa.CreateInstatiateCosmwasmContractHash(req.CodeId, req.Label, []byte(req.InstantiationMsg))
  218. v := vaa.CreateGovernanceVAA(timestamp, nonce, sequence, guardianSetIndex,
  219. vaa.BodyWormchainInstantiateContract{
  220. InstantiationParamsHash: instantiationParams_hash,
  221. }.Serialize())
  222. return v, nil
  223. }
  224. // wormchainMigrateContract converts a nodev1.WormchainMigrateContract to its canonical VAA representation
  225. func wormchainMigrateContract(req *nodev1.WormchainMigrateContract, timestamp time.Time, guardianSetIndex uint32, nonce uint32, sequence uint64) (*vaa.VAA, error) {
  226. instantiationParams_hash := vaa.CreateMigrateCosmwasmContractHash(req.CodeId, req.Contract, []byte(req.InstantiationMsg))
  227. v := vaa.CreateGovernanceVAA(timestamp, nonce, sequence, guardianSetIndex,
  228. vaa.BodyWormchainMigrateContract{
  229. MigrationParamsHash: instantiationParams_hash,
  230. }.Serialize())
  231. return v, nil
  232. }
  233. func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *nodev1.InjectGovernanceVAARequest) (*nodev1.InjectGovernanceVAAResponse, error) {
  234. s.logger.Info("governance VAA injected via admin socket", zap.String("request", req.String()))
  235. var (
  236. v *vaa.VAA
  237. err error
  238. )
  239. timestamp := time.Unix(int64(req.Timestamp), 0)
  240. digests := make([][]byte, len(req.Messages))
  241. for i, message := range req.Messages {
  242. switch payload := message.Payload.(type) {
  243. case *nodev1.GovernanceMessage_GuardianSet:
  244. v, err = adminGuardianSetUpdateToVAA(payload.GuardianSet, timestamp, req.CurrentSetIndex, message.Nonce, message.Sequence)
  245. case *nodev1.GovernanceMessage_ContractUpgrade:
  246. v, err = adminContractUpgradeToVAA(payload.ContractUpgrade, timestamp, req.CurrentSetIndex, message.Nonce, message.Sequence)
  247. case *nodev1.GovernanceMessage_BridgeRegisterChain:
  248. v, err = tokenBridgeRegisterChain(payload.BridgeRegisterChain, timestamp, req.CurrentSetIndex, message.Nonce, message.Sequence)
  249. case *nodev1.GovernanceMessage_BridgeContractUpgrade:
  250. v, err = tokenBridgeUpgradeContract(payload.BridgeContractUpgrade, timestamp, req.CurrentSetIndex, message.Nonce, message.Sequence)
  251. case *nodev1.GovernanceMessage_BridgeModifyBalance:
  252. v, err = tokenBridgeModifyBalance(payload.BridgeModifyBalance, timestamp, req.CurrentSetIndex, message.Nonce, message.Sequence)
  253. case *nodev1.GovernanceMessage_WormchainStoreCode:
  254. v, err = wormchainStoreCode(payload.WormchainStoreCode, timestamp, req.CurrentSetIndex, message.Nonce, message.Sequence)
  255. case *nodev1.GovernanceMessage_WormchainInstantiateContract:
  256. v, err = wormchainInstantiateContract(payload.WormchainInstantiateContract, timestamp, req.CurrentSetIndex, message.Nonce, message.Sequence)
  257. case *nodev1.GovernanceMessage_WormchainMigrateContract:
  258. v, err = wormchainMigrateContract(payload.WormchainMigrateContract, timestamp, req.CurrentSetIndex, message.Nonce, message.Sequence)
  259. default:
  260. panic(fmt.Sprintf("unsupported VAA type: %T", payload))
  261. }
  262. if err != nil {
  263. return nil, status.Error(codes.InvalidArgument, err.Error())
  264. }
  265. // Generate digest of the unsigned VAA.
  266. digest := v.SigningMsg()
  267. s.logger.Info("governance VAA constructed",
  268. zap.Any("vaa", v),
  269. zap.String("digest", digest.String()),
  270. )
  271. s.injectC <- v
  272. digests[i] = digest.Bytes()
  273. }
  274. return &nodev1.InjectGovernanceVAAResponse{Digests: digests}, nil
  275. }
  276. // fetchMissing attempts to backfill a gap by fetching and storing missing signed VAAs from the network.
  277. // Returns true if the gap was filled, false otherwise.
  278. func (s *nodePrivilegedService) fetchMissing(
  279. ctx context.Context,
  280. nodes []string,
  281. c *http.Client,
  282. chain vaa.ChainID,
  283. addr string,
  284. seq uint64) (bool, error) {
  285. // shuffle the list of public RPC endpoints
  286. rand.Shuffle(len(nodes), func(i, j int) {
  287. nodes[i], nodes[j] = nodes[j], nodes[i]
  288. })
  289. ctx, cancel := context.WithTimeout(ctx, time.Second)
  290. defer cancel()
  291. for _, node := range nodes {
  292. req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf(
  293. "%s/v1/signed_vaa/%d/%s/%d", node, chain, addr, seq), nil)
  294. if err != nil {
  295. return false, fmt.Errorf("failed to create request: %w", err)
  296. }
  297. resp, err := c.Do(req)
  298. if err != nil {
  299. s.logger.Warn("failed to fetch missing VAA",
  300. zap.String("node", node),
  301. zap.String("chain", chain.String()),
  302. zap.String("address", addr),
  303. zap.Uint64("sequence", seq),
  304. zap.Error(err),
  305. )
  306. continue
  307. }
  308. switch resp.StatusCode {
  309. case http.StatusNotFound:
  310. resp.Body.Close()
  311. continue
  312. case http.StatusOK:
  313. type getVaaResp struct {
  314. VaaBytes string `json:"vaaBytes"`
  315. }
  316. var respBody getVaaResp
  317. if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil {
  318. resp.Body.Close()
  319. s.logger.Warn("failed to decode VAA response",
  320. zap.String("node", node),
  321. zap.String("chain", chain.String()),
  322. zap.String("address", addr),
  323. zap.Uint64("sequence", seq),
  324. zap.Error(err),
  325. )
  326. continue
  327. }
  328. // base64 decode the VAA bytes
  329. vaaBytes, err := base64.StdEncoding.DecodeString(respBody.VaaBytes)
  330. if err != nil {
  331. resp.Body.Close()
  332. s.logger.Warn("failed to decode VAA body",
  333. zap.String("node", node),
  334. zap.String("chain", chain.String()),
  335. zap.String("address", addr),
  336. zap.Uint64("sequence", seq),
  337. zap.Error(err),
  338. )
  339. continue
  340. }
  341. s.logger.Info("backfilled VAA",
  342. zap.Uint16("chain", uint16(chain)),
  343. zap.String("address", addr),
  344. zap.Uint64("sequence", seq),
  345. zap.Int("numBytes", len(vaaBytes)),
  346. )
  347. // Inject into the gossip signed VAA receive path.
  348. // This has the same effect as if the VAA was received from the network
  349. // (verifying signature, publishing to BigTable, storing in local DB...).
  350. s.signedInC <- &gossipv1.SignedVAAWithQuorum{
  351. Vaa: vaaBytes,
  352. }
  353. resp.Body.Close()
  354. return true, nil
  355. default:
  356. resp.Body.Close()
  357. return false, fmt.Errorf("unexpected response status: %d", resp.StatusCode)
  358. }
  359. }
  360. return false, nil
  361. }
  362. func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *nodev1.FindMissingMessagesRequest) (*nodev1.FindMissingMessagesResponse, error) {
  363. b, err := hex.DecodeString(req.EmitterAddress)
  364. if err != nil {
  365. return nil, status.Errorf(codes.InvalidArgument, "invalid emitter address encoding: %v", err)
  366. }
  367. emitterAddress := vaa.Address{}
  368. copy(emitterAddress[:], b)
  369. ids, first, last, err := s.db.FindEmitterSequenceGap(db.VAAID{
  370. EmitterChain: vaa.ChainID(req.EmitterChain),
  371. EmitterAddress: emitterAddress,
  372. })
  373. if err != nil {
  374. return nil, status.Errorf(codes.Internal, "database operation failed: %v", err)
  375. }
  376. if req.RpcBackfill {
  377. c := &http.Client{}
  378. unfilled := make([]uint64, 0, len(ids))
  379. for _, id := range ids {
  380. if ok, err := s.fetchMissing(ctx, req.BackfillNodes, c, vaa.ChainID(req.EmitterChain), emitterAddress.String(), id); err != nil {
  381. return nil, status.Errorf(codes.Internal, "failed to backfill VAA: %v", err)
  382. } else if ok {
  383. continue
  384. }
  385. unfilled = append(unfilled, id)
  386. }
  387. ids = unfilled
  388. }
  389. resp := make([]string, len(ids))
  390. for i, v := range ids {
  391. resp[i] = fmt.Sprintf("%d/%s/%d", req.EmitterChain, emitterAddress, v)
  392. }
  393. return &nodev1.FindMissingMessagesResponse{
  394. MissingMessages: resp,
  395. FirstSequence: first,
  396. LastSequence: last,
  397. }, nil
  398. }
  399. func adminServiceRunnable(
  400. logger *zap.Logger,
  401. socketPath string,
  402. injectC chan<- *vaa.VAA,
  403. signedInC chan<- *gossipv1.SignedVAAWithQuorum,
  404. obsvReqSendC chan<- *gossipv1.ObservationRequest,
  405. db *db.Database,
  406. gst *common.GuardianSetState,
  407. gov *governor.ChainGovernor,
  408. gk *ecdsa.PrivateKey,
  409. ethRpc *string,
  410. ethContract *string,
  411. ) (supervisor.Runnable, error) {
  412. // Delete existing UNIX socket, if present.
  413. fi, err := os.Stat(socketPath)
  414. if err == nil {
  415. fmode := fi.Mode()
  416. if fmode&os.ModeType == os.ModeSocket {
  417. err = os.Remove(socketPath)
  418. if err != nil {
  419. return nil, fmt.Errorf("failed to remove existing socket at %s: %w", socketPath, err)
  420. }
  421. } else {
  422. return nil, fmt.Errorf("%s is not a UNIX socket", socketPath)
  423. }
  424. }
  425. // Create a new UNIX socket and listen to it.
  426. // The socket is created with the default umask. We set a restrictive umask in setRestrictiveUmask
  427. // to ensure that any files we create are only readable by the user - this is much harder to mess up.
  428. // The umask avoids a race condition between file creation and chmod.
  429. laddr, err := net.ResolveUnixAddr("unix", socketPath)
  430. if err != nil {
  431. return nil, fmt.Errorf("invalid listen address: %v", err)
  432. }
  433. l, err := net.ListenUnix("unix", laddr)
  434. if err != nil {
  435. return nil, fmt.Errorf("failed to listen on %s: %w", socketPath, err)
  436. }
  437. logger.Info("admin server listening on", zap.String("path", socketPath))
  438. ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
  439. defer cancel()
  440. var evmConnector connectors.Connector
  441. if ethRPC != nil && ethContract != nil {
  442. contract := ethcommon.HexToAddress(*ethContract)
  443. evmConnector, err = connectors.NewEthereumConnector(ctx, "eth", *ethRpc, contract, logger)
  444. if err != nil {
  445. return nil, fmt.Errorf("failed to connecto to ethereum")
  446. }
  447. }
  448. nodeService := &nodePrivilegedService{
  449. db: db,
  450. injectC: injectC,
  451. obsvReqSendC: obsvReqSendC,
  452. logger: logger.Named("adminservice"),
  453. signedInC: signedInC,
  454. governor: gov,
  455. gk: gk,
  456. guardianAddress: ethcrypto.PubkeyToAddress(gk.PublicKey),
  457. evmConnector: evmConnector,
  458. }
  459. publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst, gov)
  460. grpcServer := common.NewInstrumentedGRPCServer(logger)
  461. nodev1.RegisterNodePrivilegedServiceServer(grpcServer, nodeService)
  462. publicrpcv1.RegisterPublicRPCServiceServer(grpcServer, publicrpcService)
  463. return supervisor.GRPCServer(grpcServer, l, false), nil
  464. }
  465. func (s *nodePrivilegedService) SendObservationRequest(ctx context.Context, req *nodev1.SendObservationRequestRequest) (*nodev1.SendObservationRequestResponse, error) {
  466. if err := common.PostObservationRequest(s.obsvReqSendC, req.ObservationRequest); err != nil {
  467. return nil, err
  468. }
  469. s.logger.Info("sent observation request", zap.Any("request", req.ObservationRequest))
  470. return &nodev1.SendObservationRequestResponse{}, nil
  471. }
  472. func (s *nodePrivilegedService) ChainGovernorStatus(ctx context.Context, req *nodev1.ChainGovernorStatusRequest) (*nodev1.ChainGovernorStatusResponse, error) {
  473. if s.governor == nil {
  474. return nil, fmt.Errorf("chain governor is not enabled")
  475. }
  476. return &nodev1.ChainGovernorStatusResponse{
  477. Response: s.governor.Status(),
  478. }, nil
  479. }
  480. func (s *nodePrivilegedService) ChainGovernorReload(ctx context.Context, req *nodev1.ChainGovernorReloadRequest) (*nodev1.ChainGovernorReloadResponse, error) {
  481. if s.governor == nil {
  482. return nil, fmt.Errorf("chain governor is not enabled")
  483. }
  484. resp, err := s.governor.Reload()
  485. if err != nil {
  486. return nil, err
  487. }
  488. return &nodev1.ChainGovernorReloadResponse{
  489. Response: resp,
  490. }, nil
  491. }
  492. func (s *nodePrivilegedService) ChainGovernorDropPendingVAA(ctx context.Context, req *nodev1.ChainGovernorDropPendingVAARequest) (*nodev1.ChainGovernorDropPendingVAAResponse, error) {
  493. if s.governor == nil {
  494. return nil, fmt.Errorf("chain governor is not enabled")
  495. }
  496. if len(req.VaaId) == 0 {
  497. return nil, fmt.Errorf("the VAA id must be specified as \"chainId/emitterAddress/seqNum\"")
  498. }
  499. resp, err := s.governor.DropPendingVAA(req.VaaId)
  500. if err != nil {
  501. return nil, err
  502. }
  503. return &nodev1.ChainGovernorDropPendingVAAResponse{
  504. Response: resp,
  505. }, nil
  506. }
  507. func (s *nodePrivilegedService) ChainGovernorReleasePendingVAA(ctx context.Context, req *nodev1.ChainGovernorReleasePendingVAARequest) (*nodev1.ChainGovernorReleasePendingVAAResponse, error) {
  508. if s.governor == nil {
  509. return nil, fmt.Errorf("chain governor is not enabled")
  510. }
  511. if len(req.VaaId) == 0 {
  512. return nil, fmt.Errorf("the VAA id must be specified as \"chainId/emitterAddress/seqNum\"")
  513. }
  514. resp, err := s.governor.ReleasePendingVAA(req.VaaId)
  515. if err != nil {
  516. return nil, err
  517. }
  518. return &nodev1.ChainGovernorReleasePendingVAAResponse{
  519. Response: resp,
  520. }, nil
  521. }
  522. func (s *nodePrivilegedService) ChainGovernorResetReleaseTimer(ctx context.Context, req *nodev1.ChainGovernorResetReleaseTimerRequest) (*nodev1.ChainGovernorResetReleaseTimerResponse, error) {
  523. if s.governor == nil {
  524. return nil, fmt.Errorf("chain governor is not enabled")
  525. }
  526. if len(req.VaaId) == 0 {
  527. return nil, fmt.Errorf("the VAA id must be specified as \"chainId/emitterAddress/seqNum\"")
  528. }
  529. resp, err := s.governor.ResetReleaseTimer(req.VaaId)
  530. if err != nil {
  531. return nil, err
  532. }
  533. return &nodev1.ChainGovernorResetReleaseTimerResponse{
  534. Response: resp,
  535. }, nil
  536. }
  537. func (s *nodePrivilegedService) PurgePythNetVaas(ctx context.Context, req *nodev1.PurgePythNetVaasRequest) (*nodev1.PurgePythNetVaasResponse, error) {
  538. prefix := db.VAAID{EmitterChain: vaa.ChainIDPythNet}
  539. oldestTime := time.Now().Add(-time.Hour * 24 * time.Duration(req.DaysOld))
  540. resp, err := s.db.PurgeVaas(prefix, oldestTime, req.LogOnly)
  541. if err != nil {
  542. return nil, err
  543. }
  544. return &nodev1.PurgePythNetVaasResponse{
  545. Response: resp,
  546. }, nil
  547. }
  548. func (s *nodePrivilegedService) SignExistingVAA(ctx context.Context, req *nodev1.SignExistingVAARequest) (*nodev1.SignExistingVAAResponse, error) {
  549. v, err := vaa.Unmarshal(req.Vaa)
  550. if err != nil {
  551. return nil, fmt.Errorf("failed to unmarshal VAA: %w", err)
  552. }
  553. if req.NewGuardianSetIndex <= v.GuardianSetIndex {
  554. return nil, errors.New("new guardian set index must be higher than provided VAA")
  555. }
  556. if s.evmConnector == nil {
  557. return nil, errors.New("the node needs to have an Ethereum connection configured to sign existing VAAs")
  558. }
  559. var gs *common.GuardianSet
  560. if cachedGs, exists := s.gsCache.Load(v.GuardianSetIndex); exists {
  561. gs = cachedGs.(*common.GuardianSet)
  562. } else {
  563. evmGs, err := s.evmConnector.GetGuardianSet(ctx, v.GuardianSetIndex)
  564. if err != nil {
  565. return nil, fmt.Errorf("failed to load guardian set [%d]: %w", v.GuardianSetIndex, err)
  566. }
  567. gs = &common.GuardianSet{
  568. Keys: evmGs.Keys,
  569. Index: v.GuardianSetIndex,
  570. }
  571. s.gsCache.Store(v.GuardianSetIndex, gs)
  572. }
  573. if slices.Index(gs.Keys, s.guardianAddress) != -1 {
  574. return nil, fmt.Errorf("local guardian is already on the old set")
  575. }
  576. // Verify VAA
  577. err = v.Verify(gs.Keys)
  578. if err != nil {
  579. return nil, fmt.Errorf("failed to verify existing VAA: %w", err)
  580. }
  581. if len(req.NewGuardianAddrs) > 255 {
  582. return nil, errors.New("new guardian set has too many guardians")
  583. }
  584. newGS := make([]ethcommon.Address, len(req.NewGuardianAddrs))
  585. for i, guardianString := range req.NewGuardianAddrs {
  586. guardianAddress := ethcommon.HexToAddress(guardianString)
  587. newGS[i] = guardianAddress
  588. }
  589. // Make sure there are no duplicates. Compact needs to take a sorted slice to remove all duplicates.
  590. newGSSorted := slices.Clone(newGS)
  591. slices.SortFunc(newGSSorted, func(a, b ethcommon.Address) bool {
  592. return bytes.Compare(a[:], b[:]) < 0
  593. })
  594. newGsLen := len(newGSSorted)
  595. if len(slices.Compact(newGSSorted)) != newGsLen {
  596. return nil, fmt.Errorf("duplicate guardians in the guardian set")
  597. }
  598. localGuardianIndex := slices.Index(newGS, s.guardianAddress)
  599. if localGuardianIndex == -1 {
  600. return nil, fmt.Errorf("local guardian is not a member of the new guardian set")
  601. }
  602. newVAA := &vaa.VAA{
  603. Version: v.Version,
  604. // Set the new guardian set index
  605. GuardianSetIndex: req.NewGuardianSetIndex,
  606. // Signatures will be repopulated
  607. Signatures: nil,
  608. Timestamp: v.Timestamp,
  609. Nonce: v.Nonce,
  610. Sequence: v.Sequence,
  611. ConsistencyLevel: v.ConsistencyLevel,
  612. EmitterChain: v.EmitterChain,
  613. EmitterAddress: v.EmitterAddress,
  614. Payload: v.Payload,
  615. }
  616. // Copy original VAA signatures
  617. for _, sig := range v.Signatures {
  618. signerAddress := gs.Keys[sig.Index]
  619. newIndex := slices.Index(newGS, signerAddress)
  620. // Guardian is not part of the new set
  621. if newIndex == -1 {
  622. continue
  623. }
  624. newVAA.Signatures = append(newVAA.Signatures, &vaa.Signature{
  625. Index: uint8(newIndex),
  626. Signature: sig.Signature,
  627. })
  628. }
  629. // Add our own signature only if the new guardian set would reach quorum
  630. if vaa.CalculateQuorum(len(newGS)) > len(newVAA.Signatures)+1 {
  631. return nil, errors.New("cannot reach quorum on new guardian set with the local signature")
  632. }
  633. // Add local signature
  634. newVAA.AddSignature(s.gk, uint8(localGuardianIndex))
  635. // Sort VAA signatures by guardian ID
  636. slices.SortFunc(newVAA.Signatures, func(a, b *vaa.Signature) bool {
  637. return a.Index < b.Index
  638. })
  639. newVAABytes, err := newVAA.Marshal()
  640. if err != nil {
  641. return nil, fmt.Errorf("failed to marshal new VAA: %w", err)
  642. }
  643. return &nodev1.SignExistingVAAResponse{Vaa: newVAABytes}, nil
  644. }
  645. func (s *nodePrivilegedService) DumpRPCs(ctx context.Context, req *nodev1.DumpRPCsRequest) (*nodev1.DumpRPCsResponse, error) {
  646. rpcMap := make(map[string]string)
  647. rpcMap["acalaRPC"] = *acalaRPC
  648. rpcMap["algorandIndexerRPC"] = *algorandIndexerRPC
  649. rpcMap["algorandAlgodRPC"] = *algorandAlgodRPC
  650. rpcMap["aptosRPC"] = *aptosRPC
  651. rpcMap["arbitrumRPC"] = *arbitrumRPC
  652. rpcMap["auroraRPC"] = *auroraRPC
  653. rpcMap["avalancheRPC"] = *avalancheRPC
  654. rpcMap["bscRPC"] = *bscRPC
  655. rpcMap["celoRPC"] = *celoRPC
  656. rpcMap["ethRPC"] = *ethRPC
  657. rpcMap["fantomRPC"] = *fantomRPC
  658. rpcMap["karuraRPC"] = *karuraRPC
  659. rpcMap["klaytnRPC"] = *klaytnRPC
  660. rpcMap["moonbeamRPC"] = *moonbeamRPC
  661. rpcMap["nearRPC"] = *nearRPC
  662. rpcMap["neonRPC"] = *neonRPC
  663. rpcMap["oasisRPC"] = *oasisRPC
  664. rpcMap["polygonRPC"] = *polygonRPC
  665. rpcMap["pythnetRPC"] = *pythnetRPC
  666. rpcMap["pythnetWS"] = *pythnetWS
  667. rpcMap["solanaRPC"] = *solanaRPC
  668. rpcMap["terraWS"] = *terraWS
  669. rpcMap["terraLCD"] = *terraLCD
  670. rpcMap["terra2WS"] = *terra2WS
  671. rpcMap["terra2LCD"] = *terra2LCD
  672. rpcMap["wormchainWS"] = *wormchainWS
  673. rpcMap["wormchainLCD"] = *wormchainLCD
  674. rpcMap["xplaWS"] = *xplaWS
  675. rpcMap["xplaLCD"] = *xplaLCD
  676. return &nodev1.DumpRPCsResponse{
  677. Response: rpcMap,
  678. }, nil
  679. }