Parcourir la source

node: add GetSignedVAA endpoint

Works:

$ curl 'http://localhost:7071/v1/signed_vaa/1/1268b2bf4a[...]/0'
{"vaaBytes":"AQAAAAABACbK50nrmgWPtTmRlYf/[...]"}

Bug: certusone/wormhole#282
Change-Id: I09eade00c4649c550f06a2efe350d6d9ff9da3ae
Leo il y a 4 ans
Parent
commit
723cf5fe95

+ 3 - 2
bridge/cmd/guardiand/adminserver.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/certusone/wormhole/bridge/pkg/db"
 	publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
 	"github.com/certusone/wormhole/bridge/pkg/publicrpc"
 	"math"
@@ -131,7 +132,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
 	return &nodev1.InjectGovernanceVAAResponse{Digest: digest.Bytes()}, nil
 }
 
-func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns) (supervisor.Runnable, error) {
+func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns, db *db.Database) (supervisor.Runnable, error) {
 	// Delete existing UNIX socket, if present.
 	fi, err := os.Stat(socketPath)
 	if err == nil {
@@ -165,7 +166,7 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<-
 		logger:  logger.Named("adminservice"),
 	}
 
-	publicrpcService := publicrpc.NewPublicrpcServer(logger, hl)
+	publicrpcService := publicrpc.NewPublicrpcServer(logger, hl, db)
 
 	grpcServer := grpc.NewServer()
 	nodev1.RegisterNodePrivilegedServer(grpcServer, nodeService)

+ 2 - 2
bridge/cmd/guardiand/bridge.go

@@ -385,13 +385,13 @@ func runBridge(cmd *cobra.Command, args []string) {
 
 	// subscriber channel multiplexing for public gPRC streams
 	rawHeartbeatListeners := publicrpc.HeartbeatStreamMultiplexer(logger)
-	publicrpcService, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners)
+	publicrpcService, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners, db)
 	if err != nil {
 		log.Fatal("failed to create publicrpc service socket", zap.Error(err))
 	}
 
 	// local admin service socket
-	adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners)
+	adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners, db)
 	if err != nil {
 		logger.Fatal("failed to create admin service socket", zap.Error(err))
 	}

+ 3 - 2
bridge/cmd/guardiand/publicrpc.go

@@ -2,6 +2,7 @@ package guardiand
 
 import (
 	"fmt"
+	"github.com/certusone/wormhole/bridge/pkg/db"
 	publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
 	"github.com/certusone/wormhole/bridge/pkg/publicrpc"
 	"github.com/certusone/wormhole/bridge/pkg/supervisor"
@@ -10,7 +11,7 @@ import (
 	"net"
 )
 
-func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns) (supervisor.Runnable, error) {
+func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns, db *db.Database) (supervisor.Runnable, error) {
 	l, err := net.Listen("tcp", listenAddr)
 	if err != nil {
 		return nil, fmt.Errorf("failed to listen: %w", err)
@@ -18,7 +19,7 @@ func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicr
 
 	logger.Info("publicrpc server listening", zap.String("addr", l.Addr().String()))
 
-	rpcServer := publicrpc.NewPublicrpcServer(logger, hl)
+	rpcServer := publicrpc.NewPublicrpcServer(logger, hl, db)
 	grpcServer := grpc.NewServer()
 	publicrpcv1.RegisterPublicrpcServer(grpcServer, rpcServer)
 

+ 8 - 1
bridge/pkg/db/db.go

@@ -54,6 +54,8 @@ func (d *Database) StoreSignedVAA(v *vaa.VAA) error {
 
 	b, _ := v.Marshal()
 
+	// TODO: panic if same VAA is stored with different value
+
 	err := d.db.Update(func(txn *badger.Txn) error {
 		if err := txn.Set(vaaIDFromVAA(v).Bytes(), b); err != nil {
 			return err
@@ -74,11 +76,16 @@ func (d *Database) GetSignedVAABytes(id VAAID) (b []byte, err error) {
 		if err != nil {
 			return err
 		}
-		if _, err := item.ValueCopy(b); err != nil {
+		if val, err := item.ValueCopy(nil); err != nil {
 			return err
+		} else {
+			b = val
 		}
 		return nil
 	}); err != nil {
+		if err == badger.ErrKeyNotFound {
+			return nil, ErrVAANotFound
+		}
 		return nil, err
 	}
 	return

+ 2 - 1
bridge/pkg/processor/observation.go

@@ -230,7 +230,8 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
 			p.logger.Info("signed VAA with quorum",
 				zap.String("digest", hash),
 				zap.Any("vaa", signed),
-				zap.String("bytes", hex.EncodeToString(vaaBytes)))
+				zap.String("bytes", hex.EncodeToString(vaaBytes)),
+				zap.String("message_id", signed.MessageID()))
 
 			if err := p.db.StoreSignedVAA(signed); err != nil {
 				p.logger.Error("failed to store signed VAA", zap.Error(err))

+ 41 - 1
bridge/pkg/publicrpc/publicrpcserver.go

@@ -1,9 +1,16 @@
 package publicrpc
 
 import (
+	"context"
+	"encoding/hex"
+	"fmt"
+	"github.com/certusone/wormhole/bridge/pkg/db"
 	gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
 	publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
+	"github.com/certusone/wormhole/bridge/pkg/vaa"
 	"go.uber.org/zap"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 // PublicrpcServer implements the publicrpc gRPC service.
@@ -11,12 +18,14 @@ type PublicrpcServer struct {
 	publicrpcv1.UnimplementedPublicrpcServer
 	rawHeartbeatListeners *RawHeartbeatConns
 	logger                *zap.Logger
+	db                    *db.Database
 }
 
-func NewPublicrpcServer(logger *zap.Logger, rawHeartbeatListeners *RawHeartbeatConns) *PublicrpcServer {
+func NewPublicrpcServer(logger *zap.Logger, rawHeartbeatListeners *RawHeartbeatConns, db *db.Database) *PublicrpcServer {
 	return &PublicrpcServer{
 		rawHeartbeatListeners: rawHeartbeatListeners,
 		logger:                logger.Named("publicrpcserver"),
+		db:                    db,
 	}
 }
 
@@ -40,3 +49,34 @@ func (s *PublicrpcServer) GetRawHeartbeats(req *publicrpcv1.GetRawHeartbeatsRequ
 		}
 	}
 }
+
+func (s *PublicrpcServer) GetSignedVAA(ctx context.Context, req *publicrpcv1.GetSignedVAARequest) (*publicrpcv1.GetSignedVAAResponse, error) {
+	address, err := hex.DecodeString(req.MessageId.EmitterAddress)
+	if err != nil {
+		return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode address: %v", err))
+	}
+	if len(address) != 32 {
+		return nil, status.Error(codes.InvalidArgument, "address must be 32 bytes")
+	}
+
+	addr := vaa.Address{}
+	copy(addr[:], address)
+
+	b, err := s.db.GetSignedVAABytes(db.VAAID{
+		EmitterChain:   vaa.ChainID(req.MessageId.EmitterChain.Number()),
+		EmitterAddress: addr,
+		Sequence:       uint64(req.MessageId.Sequence),
+	})
+
+	if err != nil {
+		if err == db.ErrVAANotFound {
+			return nil, status.Error(codes.NotFound, err.Error())
+		}
+		s.logger.Error("failed to fetch VAA", zap.Error(err), zap.Any("request", req))
+		return nil, status.Error(codes.Internal, "internal server error")
+	}
+
+	return &publicrpcv1.GetSignedVAAResponse{
+		VaaBytes: b,
+	}, nil
+}

+ 5 - 0
bridge/pkg/vaa/structs.go

@@ -241,6 +241,11 @@ func (v *VAA) Marshal() ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
+// MessageID returns a human-readable emitter_chain/emitter_address/sequence tuple.
+func (v *VAA) MessageID() string {
+	return fmt.Sprintf("%d/%s/%d", v.EmitterChain, v.EmitterAddress, v.Sequence)
+}
+
 func (v *VAA) serializeBody() ([]byte, error) {
 	buf := new(bytes.Buffer)
 	MustWrite(buf, binary.BigEndian, uint32(v.Timestamp.Unix()))

+ 32 - 0
proto/publicrpc/v1/publicrpc.proto

@@ -7,6 +7,24 @@ option go_package = "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1
 import "gossip/v1/gossip.proto";
 import "google/api/annotations.proto";
 
+enum EmitterChain {
+  CHAIN_ID_UNKNOWN = 0;
+  CHAIN_ID_SOLANA = 1;
+  CHAIN_ID_ETHEREUM = 2;
+  CHAIN_ID_TERRA = 3;
+  CHAIN_ID_BSC = 4;
+}
+
+// MessageID is a VAA's globally unique identifier (see data availability design document).
+message MessageID {
+  // Emitter chain ID.
+  EmitterChain emitter_chain = 1;
+  // Hex-encoded emitter address.
+  string emitter_address = 2;
+  // Sequence number for (emitter_chain, emitter_address).
+  int64 sequence = 3;
+}
+
 // Publicrpc service exposes endpoints to be consumed externally; GUIs, historical record keeping, etc.
 service Publicrpc {
   // GetRawHeartbeats rpc endpoint returns a stream of the p2p heartbeat messages received.
@@ -17,7 +35,21 @@ service Publicrpc {
       get: "/v1/heartbeats:stream_raw"
     };
   };
+
+  rpc GetSignedVAA (GetSignedVAARequest) returns (GetSignedVAAResponse) {
+    option (google.api.http) = {
+      get: "/v1/signed_vaa/{message_id.emitter_chain}/{message_id.emitter_address}/{message_id.sequence}"
+    };
+  }
 }
 
 message GetRawHeartbeatsRequest {
 }
+
+message GetSignedVAARequest {
+  MessageID message_id = 1;
+}
+
+message GetSignedVAAResponse {
+  bytes vaa_bytes = 1;
+}