Browse Source

node/cosmwasm: restructure a little
node/cosmwasm: convert from gorilla to nhooyr

Josh Siegel 2 years ago
parent
commit
3de4f875dc
2 changed files with 85 additions and 75 deletions
  1. 4 4
      node/cmd/guardiand/node.go
  2. 81 71
      node/pkg/watchers/cosmwasm/watcher.go

+ 4 - 4
node/cmd/guardiand/node.go

@@ -1097,7 +1097,7 @@ func runNode(cmd *cobra.Command, args []string) {
 			readiness.RegisterComponent(common.ReadinessTerraSyncing)
 			readiness.RegisterComponent(common.ReadinessTerraSyncing)
 			chainObsvReqC[vaa.ChainIDTerra] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
 			chainObsvReqC[vaa.ChainIDTerra] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
 			if err := supervisor.Run(ctx, "terrawatch",
 			if err := supervisor.Run(ctx, "terrawatch",
-				cosmwasm.NewWatcher(*terraWS, *terraLCD, *terraContract, lockC, chainObsvReqC[vaa.ChainIDTerra], common.ReadinessTerraSyncing, vaa.ChainIDTerra).Run); err != nil {
+				common.WrapWithScissors(cosmwasm.NewWatcher(*terraWS, *terraLCD, *terraContract, lockC, chainObsvReqC[vaa.ChainIDTerra], common.ReadinessTerraSyncing, vaa.ChainIDTerra).Run, "terrawatch")); err != nil {
 				return err
 				return err
 			}
 			}
 		}
 		}
@@ -1107,7 +1107,7 @@ func runNode(cmd *cobra.Command, args []string) {
 			readiness.RegisterComponent(common.ReadinessTerra2Syncing)
 			readiness.RegisterComponent(common.ReadinessTerra2Syncing)
 			chainObsvReqC[vaa.ChainIDTerra2] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
 			chainObsvReqC[vaa.ChainIDTerra2] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
 			if err := supervisor.Run(ctx, "terra2watch",
 			if err := supervisor.Run(ctx, "terra2watch",
-				cosmwasm.NewWatcher(*terra2WS, *terra2LCD, *terra2Contract, lockC, chainObsvReqC[vaa.ChainIDTerra2], common.ReadinessTerra2Syncing, vaa.ChainIDTerra2).Run); err != nil {
+				common.WrapWithScissors(cosmwasm.NewWatcher(*terra2WS, *terra2LCD, *terra2Contract, lockC, chainObsvReqC[vaa.ChainIDTerra2], common.ReadinessTerra2Syncing, vaa.ChainIDTerra2).Run, "terra2watch")); err != nil {
 				return err
 				return err
 			}
 			}
 		}
 		}
@@ -1117,7 +1117,7 @@ func runNode(cmd *cobra.Command, args []string) {
 			readiness.RegisterComponent(common.ReadinessXplaSyncing)
 			readiness.RegisterComponent(common.ReadinessXplaSyncing)
 			chainObsvReqC[vaa.ChainIDXpla] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
 			chainObsvReqC[vaa.ChainIDXpla] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
 			if err := supervisor.Run(ctx, "xplawatch",
 			if err := supervisor.Run(ctx, "xplawatch",
-				cosmwasm.NewWatcher(*xplaWS, *xplaLCD, *xplaContract, lockC, chainObsvReqC[vaa.ChainIDXpla], common.ReadinessXplaSyncing, vaa.ChainIDXpla).Run); err != nil {
+				common.WrapWithScissors(cosmwasm.NewWatcher(*xplaWS, *xplaLCD, *xplaContract, lockC, chainObsvReqC[vaa.ChainIDXpla], common.ReadinessXplaSyncing, vaa.ChainIDXpla).Run, "xplawatch")); err != nil {
 				return err
 				return err
 			}
 			}
 		}
 		}
@@ -1205,7 +1205,7 @@ func runNode(cmd *cobra.Command, args []string) {
 			readiness.RegisterComponent(common.ReadinessInjectiveSyncing)
 			readiness.RegisterComponent(common.ReadinessInjectiveSyncing)
 			chainObsvReqC[vaa.ChainIDInjective] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
 			chainObsvReqC[vaa.ChainIDInjective] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
 			if err := supervisor.Run(ctx, "injectivewatch",
 			if err := supervisor.Run(ctx, "injectivewatch",
-				cosmwasm.NewWatcher(*injectiveWS, *injectiveLCD, *injectiveContract, lockC, chainObsvReqC[vaa.ChainIDInjective], common.ReadinessInjectiveSyncing, vaa.ChainIDInjective).Run); err != nil {
+				common.WrapWithScissors(cosmwasm.NewWatcher(*injectiveWS, *injectiveLCD, *injectiveContract, lockC, chainObsvReqC[vaa.ChainIDInjective], common.ReadinessInjectiveSyncing, vaa.ChainIDInjective).Run, "injectivewatch")); err != nil {
 				return err
 				return err
 			}
 			}
 		}
 		}

+ 81 - 71
node/pkg/watchers/cosmwasm/watcher.go

@@ -21,10 +21,12 @@ import (
 	"github.com/certusone/wormhole/node/pkg/common"
 	"github.com/certusone/wormhole/node/pkg/common"
 	"github.com/certusone/wormhole/node/pkg/readiness"
 	"github.com/certusone/wormhole/node/pkg/readiness"
 	"github.com/certusone/wormhole/node/pkg/supervisor"
 	"github.com/certusone/wormhole/node/pkg/supervisor"
-	"github.com/gorilla/websocket"
+
 	"github.com/tidwall/gjson"
 	"github.com/tidwall/gjson"
 	"github.com/wormhole-foundation/wormhole/sdk/vaa"
 	"github.com/wormhole-foundation/wormhole/sdk/vaa"
 	"go.uber.org/zap"
 	"go.uber.org/zap"
+	"nhooyr.io/websocket"
+	"nhooyr.io/websocket/wsjson"
 )
 )
 
 
 type (
 type (
@@ -141,13 +143,18 @@ func (e *Watcher) Run(ctx context.Context) error {
 
 
 	logger.Info("connecting to websocket", zap.String("network", networkName), zap.String("url", e.urlWS))
 	logger.Info("connecting to websocket", zap.String("network", networkName), zap.String("url", e.urlWS))
 
 
-	c, _, err := websocket.DefaultDialer.DialContext(ctx, e.urlWS, nil)
+	c, _, err := websocket.Dial(ctx, e.urlWS, nil)
 	if err != nil {
 	if err != nil {
 		p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
 		p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
 		connectionErrors.WithLabelValues(networkName, "websocket_dial_error").Inc()
 		connectionErrors.WithLabelValues(networkName, "websocket_dial_error").Inc()
 		return fmt.Errorf("websocket dial failed: %w", err)
 		return fmt.Errorf("websocket dial failed: %w", err)
 	}
 	}
-	defer c.Close()
+	defer c.Close(websocket.StatusNormalClosure, "")
+
+	// During testing, I got a message larger then the default
+	// 32768.  Increasing this limit effects an internal buffer that is used
+	// to as part of the zero alloc/copy design.
+	c.SetReadLimit(524288)
 
 
 	// Subscribe to smart contract transactions
 	// Subscribe to smart contract transactions
 	params := [...]string{fmt.Sprintf("tm.event='Tx' AND %s='%s'", e.contractAddressFilterKey, e.contract)}
 	params := [...]string{fmt.Sprintf("tm.event='Tx' AND %s='%s'", e.contractAddressFilterKey, e.contract)}
@@ -157,7 +164,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 		Params:  params,
 		Params:  params,
 		ID:      1,
 		ID:      1,
 	}
 	}
-	err = c.WriteJSON(command)
+	err = wsjson.Write(ctx, c, command)
 	if err != nil {
 	if err != nil {
 		p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
 		p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
 		connectionErrors.WithLabelValues(networkName, "websocket_subscription_error").Inc()
 		connectionErrors.WithLabelValues(networkName, "websocket_subscription_error").Inc()
@@ -165,7 +172,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 	}
 	}
 
 
 	// Wait for the success response
 	// Wait for the success response
-	_, _, err = c.ReadMessage()
+	_, _, err = c.Read(ctx)
 	if err != nil {
 	if err != nil {
 		p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
 		p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
 		connectionErrors.WithLabelValues(networkName, "event_subscription_error").Inc()
 		connectionErrors.WithLabelValues(networkName, "event_subscription_error").Inc()
@@ -175,49 +182,53 @@ func (e *Watcher) Run(ctx context.Context) error {
 
 
 	readiness.SetReady(e.readiness)
 	readiness.SetReady(e.readiness)
 
 
-	go func() {
+	common.RunWithScissors(ctx, errC, "cosmwasm_block_height", func(ctx context.Context) error {
 		t := time.NewTicker(5 * time.Second)
 		t := time.NewTicker(5 * time.Second)
 		client := &http.Client{
 		client := &http.Client{
 			Timeout: time.Second * 5,
 			Timeout: time.Second * 5,
 		}
 		}
 
 
 		for {
 		for {
-			<-t.C
-			msm := time.Now()
-			// Query and report height and set currentSlotHeight
-			resp, err := client.Get(fmt.Sprintf("%s/%s", e.urlLCD, e.latestBlockURL))
-			if err != nil {
-				logger.Error("query latest block response error", zap.String("network", networkName), zap.Error(err))
-				continue
-			}
-			blocksBody, err := io.ReadAll(resp.Body)
-			if err != nil {
-				logger.Error("query latest block response read error", zap.String("network", networkName), zap.Error(err))
-				errC <- err
+			select {
+			case <-ctx.Done():
+				return nil
+			case <-t.C:
+				msm := time.Now()
+				// Query and report height and set currentSlotHeight
+				resp, err := client.Get(fmt.Sprintf("%s/%s", e.urlLCD, e.latestBlockURL))
+				if err != nil {
+					logger.Error("query latest block response error", zap.String("network", networkName), zap.Error(err))
+					continue
+				}
+				blocksBody, err := io.ReadAll(resp.Body)
+				if err != nil {
+					logger.Error("query latest block response read error", zap.String("network", networkName), zap.Error(err))
+					errC <- err
+					resp.Body.Close()
+					continue
+				}
 				resp.Body.Close()
 				resp.Body.Close()
-				continue
+
+				// Update the prom metrics with how long the http request took to the rpc
+				queryLatency.WithLabelValues(networkName, "block_latest").Observe(time.Since(msm).Seconds())
+
+				blockJSON := string(blocksBody)
+				latestBlock := gjson.Get(blockJSON, "block.header.height")
+				logger.Info("current height", zap.String("network", networkName), zap.Int64("block", latestBlock.Int()))
+				currentSlotHeight.WithLabelValues(networkName).Set(float64(latestBlock.Int()))
+				p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
+					Height:          latestBlock.Int(),
+					ContractAddress: e.contract,
+				})
 			}
 			}
-			resp.Body.Close()
-
-			// Update the prom metrics with how long the http request took to the rpc
-			queryLatency.WithLabelValues(networkName, "block_latest").Observe(time.Since(msm).Seconds())
-
-			blockJSON := string(blocksBody)
-			latestBlock := gjson.Get(blockJSON, "block.header.height")
-			logger.Info("current height", zap.String("network", networkName), zap.Int64("block", latestBlock.Int()))
-			currentSlotHeight.WithLabelValues(networkName).Set(float64(latestBlock.Int()))
-			p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
-				Height:          latestBlock.Int(),
-				ContractAddress: e.contract,
-			})
 		}
 		}
-	}()
+	})
 
 
-	go func() {
+	common.RunWithScissors(ctx, errC, "cosmwasm_objs_req", func(ctx context.Context) error {
 		for {
 		for {
 			select {
 			select {
 			case <-ctx.Done():
 			case <-ctx.Done():
-				return
+				return nil
 			case r := <-e.obsvReqC:
 			case r := <-e.obsvReqC:
 				if vaa.ChainID(r.ChainId) != e.chainID {
 				if vaa.ChainID(r.ChainId) != e.chainID {
 					panic("invalid chain ID")
 					panic("invalid chain ID")
@@ -267,53 +278,52 @@ func (e *Watcher) Run(ctx context.Context) error {
 				}
 				}
 			}
 			}
 		}
 		}
-	}()
-
-	go func() {
-		defer close(errC)
+	})
 
 
+	common.RunWithScissors(ctx, errC, "cosmwasm_data_pump", func(ctx context.Context) error {
 		for {
 		for {
-			_, message, err := c.ReadMessage()
-			if err != nil {
-				p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
-				connectionErrors.WithLabelValues(networkName, "channel_read_error").Inc()
-				logger.Error("error reading channel", zap.String("network", networkName), zap.Error(err))
-				errC <- err
-				return
-			}
+			select {
+			case <-ctx.Done():
+				return nil
+			default:
+				_, message, err := c.Read(ctx)
+				if err != nil {
+					p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
+					connectionErrors.WithLabelValues(networkName, "channel_read_error").Inc()
+					logger.Error("error reading channel", zap.String("network", networkName), zap.Error(err))
+					errC <- err
+					return nil
+				}
 
 
-			// Received a message from the blockchain
-			json := string(message)
+				// Received a message from the blockchain
+				json := string(message)
 
 
-			txHashRaw := gjson.Get(json, "result.events.tx\\.hash.0")
-			if !txHashRaw.Exists() {
-				logger.Warn("message does not have tx hash", zap.String("network", networkName), zap.String("payload", json))
-				continue
-			}
-			txHash := txHashRaw.String()
+				txHashRaw := gjson.Get(json, "result.events.tx\\.hash.0")
+				if !txHashRaw.Exists() {
+					logger.Warn("message does not have tx hash", zap.String("network", networkName), zap.String("payload", json))
+					continue
+				}
+				txHash := txHashRaw.String()
 
 
-			events := gjson.Get(json, "result.data.value.TxResult.result.events")
-			if !events.Exists() {
-				logger.Warn("message has no events", zap.String("network", networkName), zap.String("payload", json))
-				continue
-			}
+				events := gjson.Get(json, "result.data.value.TxResult.result.events")
+				if !events.Exists() {
+					logger.Warn("message has no events", zap.String("network", networkName), zap.String("payload", json))
+					continue
+				}
 
 
-			msgs := EventsToMessagePublications(e.contract, txHash, events.Array(), logger, e.chainID, e.contractAddressLogKey)
-			for _, msg := range msgs {
-				e.msgChan <- msg
-				messagesConfirmed.WithLabelValues(networkName).Inc()
-			}
+				msgs := EventsToMessagePublications(e.contract, txHash, events.Array(), logger, e.chainID, e.contractAddressLogKey)
+				for _, msg := range msgs {
+					e.msgChan <- msg
+					messagesConfirmed.WithLabelValues(networkName).Inc()
+				}
 
 
-			// We do not send guardian changes to the processor - ETH guardians are the source of truth.
+				// We do not send guardian changes to the processor - ETH guardians are the source of truth.
+			}
 		}
 		}
-	}()
+	})
 
 
 	select {
 	select {
 	case <-ctx.Done():
 	case <-ctx.Done():
-		err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
-		if err != nil {
-			logger.Error("error on closing socket ", zap.String("network", networkName), zap.Error(err))
-		}
 		return ctx.Err()
 		return ctx.Err()
 	case err := <-errC:
 	case err := <-errC:
 		return err
 		return err