Pārlūkot izejas kodu

node: add near support (#1397)

* node: add near support

* Tweaks suggested in zoom review

Co-authored-by: Bruce Riley <briley@jumptrading.com>
jumpsiegel 3 gadi atpakaļ
vecāks
revīzija
3f965da33b

+ 34 - 1
node/cmd/guardiand/node.go

@@ -45,6 +45,7 @@ import (
 	cosmwasm "github.com/certusone/wormhole/node/pkg/terra"
 
 	"github.com/certusone/wormhole/node/pkg/algorand"
+	"github.com/certusone/wormhole/node/pkg/near"
 
 	ipfslog "github.com/ipfs/go-log/v2"
 )
@@ -125,6 +126,9 @@ var (
 	algorandAlgodToken   *string
 	algorandAppID        *uint64
 
+	nearRPC      *string
+	nearContract *string
+
 	solanaWsRPC *string
 	solanaRPC   *string
 
@@ -239,6 +243,9 @@ func init() {
 	algorandAlgodToken = NodeCmd.Flags().String("algorandAlgodToken", "", "Algorand Algod access token")
 	algorandAppID = NodeCmd.Flags().Uint64("algorandAppID", 0, "Algorand app id")
 
+	nearRPC = NodeCmd.Flags().String("nearRPC", "", "near RPC URL")
+	nearContract = NodeCmd.Flags().String("nearContract", "", "near contract")
+
 	solanaWsRPC = NodeCmd.Flags().String("solanaWS", "", "Solana Websocket URL (required")
 	solanaRPC = NodeCmd.Flags().String("solanaRPC", "", "Solana RPC URL (required")
 
@@ -378,6 +385,9 @@ func runNode(cmd *cobra.Command, args []string) {
 	if *algorandIndexerRPC != "" {
 		readiness.RegisterComponent(common.ReadinessAlgorandSyncing)
 	}
+	if *nearRPC != "" {
+		readiness.RegisterComponent(common.ReadinessNearSyncing)
+	}
 	readiness.RegisterComponent(common.ReadinessBSCSyncing)
 	readiness.RegisterComponent(common.ReadinessPolygonSyncing)
 	readiness.RegisterComponent(common.ReadinessAvalancheSyncing)
@@ -522,6 +532,15 @@ func runNode(cmd *cobra.Command, args []string) {
 	if *celoContract == "" && !*unsafeDevMode {
 		logger.Fatal("Please specify --celoContract")
 	}
+	if *testnetMode || *unsafeDevMode {
+		if *nearRPC != "" {
+			if *nearContract == "" {
+				logger.Fatal("If --nearRPC is specified, then --nearContract must be specified")
+			}
+		} else if *nearContract != "" {
+			logger.Fatal("If --nearContract is specified, then --nearRPC must be specified")
+		}
+	}
 	if *testnetMode {
 		if *ethRopstenRPC == "" {
 			logger.Fatal("Please specify --ethRopstenRPC")
@@ -578,6 +597,12 @@ func runNode(cmd *cobra.Command, args []string) {
 		if *injectiveContract != "" && !*unsafeDevMode {
 			logger.Fatal("Please do not specify --injectiveContract")
 		}
+		if *nearRPC != "" && !*unsafeDevMode {
+			logger.Fatal("Please do not specify --nearRPC")
+		}
+		if *nearContract != "" && !*unsafeDevMode {
+			logger.Fatal("Please do not specify --nearContract")
+		}
 	}
 	if *nodeName == "" {
 		logger.Fatal("Please specify --nodeName")
@@ -643,7 +668,6 @@ func runNode(cmd *cobra.Command, args []string) {
 				logger.Fatal("Please specify --pythnetUrl")
 			}
 		}
-
 	}
 
 	if *bigTablePersistenceEnabled {
@@ -795,6 +819,9 @@ func runNode(cmd *cobra.Command, args []string) {
 	if *testnetMode || *unsafeDevMode {
 		chainObsvReqC[vaa.ChainIDAlgorand] = make(chan *gossipv1.ObservationRequest)
 	}
+	if *nearRPC != "" {
+		chainObsvReqC[vaa.ChainIDNear] = make(chan *gossipv1.ObservationRequest)
+	}
 	chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest)
 	chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest)
 	chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest)
@@ -1033,6 +1060,12 @@ func runNode(cmd *cobra.Command, args []string) {
 				return err
 			}
 		}
+		if *nearRPC != "" {
+			if err := supervisor.Run(ctx, "nearwatch",
+				near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear]).Run); err != nil {
+				return err
+			}
+		}
 
 		if *solanaWsRPC != "" {
 			if err := supervisor.Run(ctx, "solwatch-confirmed",

+ 1 - 0
node/pkg/common/readiness.go

@@ -7,6 +7,7 @@ const (
 	ReadinessSolanaSyncing     readiness.Component = "solanaSyncing"
 	ReadinessTerraSyncing      readiness.Component = "terraSyncing"
 	ReadinessAlgorandSyncing   readiness.Component = "algorandSyncing"
+	ReadinessNearSyncing       readiness.Component = "nearSyncing"
 	ReadinessBSCSyncing        readiness.Component = "bscSyncing"
 	ReadinessPolygonSyncing    readiness.Component = "polygonSyncing"
 	ReadinessEthRopstenSyncing readiness.Component = "ethRopstenSyncing"

+ 381 - 0
node/pkg/near/watcher.go

@@ -0,0 +1,381 @@
+package near
+
+import (
+	"bytes"
+	"context"
+	"encoding/hex"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/certusone/wormhole/node/pkg/common"
+	"github.com/certusone/wormhole/node/pkg/p2p"
+	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
+	"github.com/certusone/wormhole/node/pkg/readiness"
+	"github.com/certusone/wormhole/node/pkg/supervisor"
+	"github.com/certusone/wormhole/node/pkg/vaa"
+	eth_common "github.com/ethereum/go-ethereum/common"
+	"github.com/mr-tron/base58"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/tidwall/gjson"
+	"go.uber.org/zap"
+)
+
+type (
+	// Watcher is responsible for looking over Near blockchain and reporting new transactions to the wormhole contract
+	Watcher struct {
+		nearRPC          string
+		wormholeContract string
+
+		msgChan  chan *common.MessagePublication
+		obsvReqC chan *gossipv1.ObservationRequest
+
+		next_round uint64
+	}
+)
+
+var (
+	nearMessagesConfirmed = promauto.NewCounter(
+		prometheus.CounterOpts{
+			Name: "wormhole_near_observations_confirmed_total",
+			Help: "Total number of verified Near observations found",
+		})
+	currentNearHeight = promauto.NewGauge(
+		prometheus.GaugeOpts{
+			Name: "wormhole_near_current_height",
+			Help: "Current Near block height",
+		})
+)
+
+// NewWatcher creates a new Near appid watcher
+func NewWatcher(
+	nearRPC string,
+	wormholeContract string,
+	lockEvents chan *common.MessagePublication,
+	obsvReqC chan *gossipv1.ObservationRequest,
+) *Watcher {
+	return &Watcher{
+		nearRPC:          nearRPC,
+		wormholeContract: wormholeContract,
+		msgChan:          lockEvents,
+		obsvReqC:         obsvReqC,
+		next_round:       0,
+	}
+}
+
+func (e *Watcher) getBlock(block uint64) ([]byte, error) {
+	s := fmt.Sprintf(`{"id": "dontcare", "jsonrpc": "2.0", "method": "block", "params": {"block_id": %d}}`, block)
+	resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s)))
+
+	if err != nil {
+		return nil, err
+	}
+
+	defer resp.Body.Close()
+	return ioutil.ReadAll(resp.Body)
+}
+
+func (e *Watcher) getFinalBlock() ([]byte, error) {
+	s := `{"id": "dontcare", "jsonrpc": "2.0", "method": "block", "params": {"finality": "final"}}`
+	resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s)))
+
+	if err != nil {
+		return nil, err
+	}
+
+	defer resp.Body.Close()
+	return ioutil.ReadAll(resp.Body)
+}
+
+func (e *Watcher) getChunk(chunk string) ([]byte, error) {
+	s := fmt.Sprintf(`{"id": "dontcare", "jsonrpc": "2.0", "method": "chunk", "params": {"chunk_id": "%s"}}`, chunk)
+
+	resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s)))
+
+	if err != nil {
+		return nil, err
+	}
+
+	defer resp.Body.Close()
+	return ioutil.ReadAll(resp.Body)
+}
+
+func (e *Watcher) getTxStatus(logger *zap.Logger, tx string, src string) ([]byte, error) {
+	s := fmt.Sprintf(`{"id": "dontcare", "jsonrpc": "2.0", "method": "EXPERIMENTAL_tx_status", "params": ["%s", "%s"]}`, tx, src)
+
+	resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s)))
+
+	if err != nil {
+		return nil, err
+	}
+
+	defer resp.Body.Close()
+	return ioutil.ReadAll(resp.Body)
+}
+
+func (e *Watcher) inspectStatus(logger *zap.Logger, hash string, receiver_id string, ts uint64) error {
+	t, err := e.getTxStatus(logger, hash, receiver_id)
+
+	if err != nil {
+		return err
+	}
+
+	outcomes := gjson.ParseBytes(t).Get("result.receipts_outcome")
+
+	if !outcomes.Exists() {
+		return nil
+	}
+
+	for _, o := range outcomes.Array() {
+		outcome := o.Get("outcome")
+		if !outcome.Exists() {
+			continue
+		}
+
+		executor_id := outcome.Get("executor_id")
+		if !executor_id.Exists() {
+			continue
+		}
+
+		if executor_id.String() == e.wormholeContract {
+			l := outcome.Get("logs")
+			if !l.Exists() {
+				continue
+			}
+			for _, log := range l.Array() {
+				event := log.String()
+				if !strings.HasPrefix(event, "EVENT_JSON:") {
+					continue
+				}
+				logger.Info("event", zap.String("event", event[11:]))
+
+				event_json := gjson.ParseBytes([]byte(event[11:]))
+
+				standard := event_json.Get("standard")
+				if !standard.Exists() || standard.String() != "wormhole" {
+					continue
+				}
+				event_type := event_json.Get("event")
+				if !event_type.Exists() || event_type.String() != "publish" {
+					continue
+				}
+
+				em := event_json.Get("emitter")
+				if !em.Exists() {
+					continue
+				}
+
+				emitter, err := hex.DecodeString(em.String())
+				if err != nil {
+					return err
+				}
+
+				var a vaa.Address
+				copy(a[:], emitter)
+
+				id, err := base58.Decode(hash)
+				if err != nil {
+					return err
+				}
+
+				var txHash = eth_common.BytesToHash(id) // 32 bytes = d3b136a6a182a40554b2fafbc8d12a7a22737c10c81e33b33d1dcb74c532708b
+
+				v := event_json.Get("data")
+				if !v.Exists() {
+					logger.Info("data")
+					return nil
+				}
+
+				pl, err := hex.DecodeString(v.String())
+				if err != nil {
+					return err
+				}
+
+				observation := &common.MessagePublication{
+					TxHash:           txHash,
+					Timestamp:        time.Unix(int64(ts), 0),
+					Nonce:            uint32(event_json.Get("nonce").Uint()), // uint32
+					Sequence:         event_json.Get("seq").Uint(),
+					EmitterChain:     vaa.ChainIDNear,
+					EmitterAddress:   a,
+					Payload:          pl,
+					ConsistencyLevel: 0,
+				}
+
+				nearMessagesConfirmed.Inc()
+
+				logger.Info("message observed",
+					zap.Uint64("ts", ts),
+					zap.Time("timestamp", observation.Timestamp),
+					zap.Uint32("nonce", observation.Nonce),
+					zap.Uint64("sequence", observation.Sequence),
+					zap.Stringer("emitter_chain", observation.EmitterChain),
+					zap.Stringer("emitter_address", observation.EmitterAddress),
+					zap.Binary("payload", observation.Payload),
+					zap.Uint8("consistency_level", observation.ConsistencyLevel),
+				)
+
+				e.msgChan <- observation
+			}
+		}
+	}
+
+	return nil
+}
+
+func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Result) error {
+	logger.Info("inspectBody", zap.Uint64("block", block))
+
+	result := body.Get("result.chunks.#.chunk_hash")
+	if !result.Exists() {
+		return nil
+	}
+
+	v := body.Get("result.header.timestamp")
+	if !v.Exists() {
+		return nil
+	}
+	ts := uint64(v.Uint()) / 1000000000
+
+	for _, name := range result.Array() {
+		chunk, err := e.getChunk(name.String())
+		if err != nil {
+			return err
+		}
+
+		txns := gjson.ParseBytes(chunk).Get("result.transactions")
+		if !txns.Exists() {
+			continue
+		}
+		for _, r := range txns.Array() {
+			hash := r.Get("hash")
+			receiver_id := r.Get("receiver_id")
+			if !hash.Exists() || !receiver_id.Exists() {
+				continue
+			}
+
+			err = e.inspectStatus(logger, hash.String(), receiver_id.String(), ts)
+			if err != nil {
+				return err
+			}
+		}
+
+	}
+	return nil
+}
+
+func (e *Watcher) Run(ctx context.Context) error {
+	p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDNear, &gossipv1.Heartbeat_Network{
+		ContractAddress: e.wormholeContract,
+	})
+
+	logger := supervisor.Logger(ctx)
+	errC := make(chan error)
+
+	logger.Info("Near watcher connecting to RPC node ", zap.String("url", e.nearRPC))
+
+	go func() {
+		if e.next_round == 0 {
+			finalBody, err := e.getFinalBlock()
+			if err != nil {
+				logger.Error("StatusAfterBlock", zap.Error(err))
+				p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
+				errC <- err
+				return
+			}
+			e.next_round = gjson.ParseBytes(finalBody).Get("result.chunks.0.height_created").Uint()
+		}
+
+		timer := time.NewTicker(time.Second * 1)
+		defer timer.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case r := <-e.obsvReqC:
+				if vaa.ChainID(r.ChainId) != vaa.ChainIDNear {
+					panic("invalid chain ID")
+				}
+
+				txHash := base58.Encode(r.TxHash)
+
+				logger.Info("Received obsv request", zap.String("tx_hash", txHash))
+
+				err := e.inspectStatus(logger, txHash, e.wormholeContract, 0)
+				if err != nil {
+					logger.Error(fmt.Sprintf("near obsvReqC: %s", err.Error()))
+				}
+
+			case <-timer.C:
+				finalBody, err := e.getFinalBlock()
+				if err != nil {
+					logger.Error(fmt.Sprintf("nearClient.Status: %s", err.Error()))
+
+					p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
+					errC <- err
+					return
+				}
+				parsedFinalBody := gjson.ParseBytes(finalBody)
+				lastBlock := parsedFinalBody.Get("result.chunks.0.height_created").Uint()
+
+				logger.Info("lastBlock", zap.Uint64("lastBlock", lastBlock), zap.Uint64("next_round", e.next_round))
+
+				if lastBlock < e.next_round {
+					logger.Error("Went backwards... ")
+					e.next_round = lastBlock
+				}
+
+				for ; e.next_round <= lastBlock; e.next_round = e.next_round + 1 {
+					if e.next_round == lastBlock {
+						err := e.inspectBody(logger, e.next_round, parsedFinalBody)
+						if err != nil {
+							logger.Error(fmt.Sprintf("inspectBody: %s", err.Error()))
+
+							p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
+							errC <- err
+							return
+
+						}
+					} else {
+						b, err := e.getBlock(e.next_round)
+						if err != nil {
+							logger.Error(fmt.Sprintf("nearClient.Status: %s", err.Error()))
+
+							p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
+							errC <- err
+							return
+
+						}
+						err = e.inspectBody(logger, e.next_round, gjson.ParseBytes(b))
+						if err != nil {
+							logger.Error(fmt.Sprintf("inspectBody: %s", err.Error()))
+
+							p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1)
+							errC <- err
+							return
+
+						}
+					}
+				}
+
+				currentNearHeight.Set(float64(e.next_round - 1))
+				p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDNear, &gossipv1.Heartbeat_Network{
+					Height:          int64(e.next_round - 1),
+					ContractAddress: e.wormholeContract,
+				})
+				readiness.SetReady(common.ReadinessNearSyncing)
+			}
+		}
+	}()
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case err := <-errC:
+		return err
+	}
+}

+ 6 - 0
node/pkg/vaa/structs.go

@@ -114,6 +114,8 @@ func (c ChainID) String() string {
 		return "fantom"
 	case ChainIDAlgorand:
 		return "algorand"
+	case ChainIDNear:
+		return "near"
 	case ChainIDEthereumRopsten:
 		return "ethereum-ropsten"
 	case ChainIDKarura:
@@ -163,6 +165,8 @@ func ChainIDFromString(s string) (ChainID, error) {
 		return ChainIDFantom, nil
 	case "algorand":
 		return ChainIDAlgorand, nil
+	case "near":
+		return ChainIDNear, nil
 	case "ethereum-ropsten":
 		return ChainIDEthereumRopsten, nil
 	case "karura":
@@ -218,6 +222,8 @@ const (
 	ChainIDKlaytn ChainID = 13
 	// ChainIDCelo is the ChainID of Celo
 	ChainIDCelo ChainID = 14
+	// ChainIDNear is the ChainID of Near
+	ChainIDNear ChainID = 15
 	// ChainIDMoonbeam is the ChainID of Moonbeam
 	ChainIDMoonbeam ChainID = 16
 	// ChainIDNeon is the ChainID of Neon

+ 3 - 0
node/pkg/vaa/structs_test.go

@@ -32,6 +32,7 @@ func TestChainIDFromString(t *testing.T) {
 		{input: "avalanche", output: ChainIDAvalanche},
 		{input: "oasis", output: ChainIDOasis},
 		{input: "algorand", output: ChainIDAlgorand},
+		{input: "near", output: ChainIDNear},
 		{input: "aurora", output: ChainIDAurora},
 		{input: "fantom", output: ChainIDFantom},
 		{input: "karura", output: ChainIDKarura},
@@ -52,6 +53,7 @@ func TestChainIDFromString(t *testing.T) {
 		{input: "Avalanche", output: ChainIDAvalanche},
 		{input: "Oasis", output: ChainIDOasis},
 		{input: "Algorand", output: ChainIDAlgorand},
+		{input: "Near", output: ChainIDNear},
 		{input: "Aurora", output: ChainIDAurora},
 		{input: "Fantom", output: ChainIDFantom},
 		{input: "Karura", output: ChainIDKarura},
@@ -148,6 +150,7 @@ func TestChainId_String(t *testing.T) {
 		{input: 12, output: "acala"},
 		{input: 13, output: "klaytn"},
 		{input: 14, output: "celo"},
+		{input: 15, output: "near"},
 		{input: 16, output: "moonbeam"},
 		{input: 17, output: "neon"},
 		{input: 18, output: "terra2"},