|
|
@@ -62,6 +62,9 @@ type (
|
|
|
|
|
|
// b64Encoded indicates if transactions are base 64 encoded.
|
|
|
b64Encoded bool
|
|
|
+
|
|
|
+ // Human readable chain name
|
|
|
+ networkName string
|
|
|
}
|
|
|
)
|
|
|
|
|
|
@@ -122,6 +125,9 @@ func NewWatcher(
|
|
|
// Terra2 no longer base64 encodes parameters.
|
|
|
b64Encoded := env == common.UnsafeDevNet || (chainID != vaa.ChainIDInjective && chainID != vaa.ChainIDTerra2 && chainID != vaa.ChainIDTerra)
|
|
|
|
|
|
+ // Human readable network name
|
|
|
+ networkName := vaa.ChainID(chainID).String()
|
|
|
+
|
|
|
return &Watcher{
|
|
|
urlWS: urlWS,
|
|
|
urlLCD: urlLCD,
|
|
|
@@ -134,12 +140,11 @@ func NewWatcher(
|
|
|
contractAddressLogKey: contractAddressLogKey,
|
|
|
latestBlockURL: latestBlockURL,
|
|
|
b64Encoded: b64Encoded,
|
|
|
+ networkName: networkName,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (e *Watcher) Run(ctx context.Context) error {
|
|
|
- networkName := e.chainID.String()
|
|
|
-
|
|
|
p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
|
|
|
ContractAddress: e.contract,
|
|
|
})
|
|
|
@@ -155,17 +160,19 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
zap.String("chainID", e.chainID.String()),
|
|
|
)
|
|
|
|
|
|
- logger.Info("connecting to websocket", zap.String("network", networkName), zap.String("url", e.urlWS))
|
|
|
+ logger.Info("connecting to websocket", zap.String("network", e.networkName), zap.String("url", e.urlWS))
|
|
|
|
|
|
//nolint:bodyclose // The close is down below. The linter misses it.
|
|
|
c, _, err := websocket.Dial(ctx, e.urlWS, nil)
|
|
|
if err != nil {
|
|
|
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
|
|
|
- connectionErrors.WithLabelValues(networkName, "websocket_dial_error").Inc()
|
|
|
+ connectionErrors.WithLabelValues(e.networkName, "websocket_dial_error").Inc()
|
|
|
return fmt.Errorf("websocket dial failed: %w", err)
|
|
|
}
|
|
|
defer c.Close(websocket.StatusNormalClosure, "")
|
|
|
|
|
|
+ e.logVersion(ctx, logger, c)
|
|
|
+
|
|
|
c.SetReadLimit(ReadLimitSize)
|
|
|
|
|
|
// Subscribe to smart contract transactions
|
|
|
@@ -179,7 +186,7 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
err = wsjson.Write(ctx, c, command)
|
|
|
if err != nil {
|
|
|
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
|
|
|
- connectionErrors.WithLabelValues(networkName, "websocket_subscription_error").Inc()
|
|
|
+ connectionErrors.WithLabelValues(e.networkName, "websocket_subscription_error").Inc()
|
|
|
return fmt.Errorf("websocket subscription failed: %w", err)
|
|
|
}
|
|
|
|
|
|
@@ -187,10 +194,10 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
_, _, err = c.Read(ctx)
|
|
|
if err != nil {
|
|
|
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
|
|
|
- connectionErrors.WithLabelValues(networkName, "event_subscription_error").Inc()
|
|
|
+ connectionErrors.WithLabelValues(e.networkName, "event_subscription_error").Inc()
|
|
|
return fmt.Errorf("event subscription failed: %w", err)
|
|
|
}
|
|
|
- logger.Info("subscribed to new transaction events", zap.String("network", networkName))
|
|
|
+ logger.Info("subscribed to new transaction events", zap.String("network", e.networkName))
|
|
|
|
|
|
readiness.SetReady(e.readinessSync)
|
|
|
|
|
|
@@ -209,12 +216,12 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
// Query and report height and set currentSlotHeight
|
|
|
resp, err := client.Get(fmt.Sprintf("%s/%s", e.urlLCD, e.latestBlockURL)) //nolint:noctx // TODO FIXME we should propagate context with Deadline here.
|
|
|
if err != nil {
|
|
|
- logger.Error("query latest block response error", zap.String("network", networkName), zap.Error(err))
|
|
|
+ logger.Error("query latest block response error", zap.String("network", e.networkName), zap.Error(err))
|
|
|
continue
|
|
|
}
|
|
|
blocksBody, err := common.SafeRead(resp.Body)
|
|
|
if err != nil {
|
|
|
- logger.Error("query latest block response read error", zap.String("network", networkName), zap.Error(err))
|
|
|
+ logger.Error("query latest block response read error", zap.String("network", e.networkName), zap.Error(err))
|
|
|
errC <- err //nolint:channelcheck // The watcher will exit anyway
|
|
|
resp.Body.Close()
|
|
|
continue
|
|
|
@@ -222,12 +229,12 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
resp.Body.Close()
|
|
|
|
|
|
// Update the prom metrics with how long the http request took to the rpc
|
|
|
- queryLatency.WithLabelValues(networkName, "block_latest").Observe(time.Since(msm).Seconds())
|
|
|
+ queryLatency.WithLabelValues(e.networkName, "block_latest").Observe(time.Since(msm).Seconds())
|
|
|
|
|
|
blockJSON := string(blocksBody)
|
|
|
latestBlock := gjson.Get(blockJSON, "block.header.height")
|
|
|
- logger.Debug("current height", zap.String("network", networkName), zap.Int64("block", latestBlock.Int()))
|
|
|
- currentSlotHeight.WithLabelValues(networkName).Set(float64(latestBlock.Int()))
|
|
|
+ logger.Debug("current height", zap.String("network", e.networkName), zap.Int64("block", latestBlock.Int()))
|
|
|
+ currentSlotHeight.WithLabelValues(e.networkName).Set(float64(latestBlock.Int()))
|
|
|
p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
|
|
|
Height: latestBlock.Int(),
|
|
|
ContractAddress: e.contract,
|
|
|
@@ -253,7 +260,7 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
|
|
|
tx := hex.EncodeToString(r.TxHash)
|
|
|
|
|
|
- logger.Info("received observation request", zap.String("network", networkName), zap.String("tx_hash", tx))
|
|
|
+ logger.Info("received observation request", zap.String("network", e.networkName), zap.String("tx_hash", tx))
|
|
|
|
|
|
client := &http.Client{
|
|
|
Timeout: time.Second * 5,
|
|
|
@@ -262,12 +269,12 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
// Query for tx by hash
|
|
|
resp, err := client.Get(fmt.Sprintf("%s/cosmos/tx/v1beta1/txs/%s", e.urlLCD, tx)) //nolint:noctx // TODO FIXME we should propagate context with Deadline here.
|
|
|
if err != nil {
|
|
|
- logger.Error("query tx response error", zap.String("network", networkName), zap.Error(err))
|
|
|
+ logger.Error("query tx response error", zap.String("network", e.networkName), zap.Error(err))
|
|
|
continue
|
|
|
}
|
|
|
txBody, err := common.SafeRead(resp.Body)
|
|
|
if err != nil {
|
|
|
- logger.Error("query tx response read error", zap.String("network", networkName), zap.Error(err))
|
|
|
+ logger.Error("query tx response read error", zap.String("network", e.networkName), zap.Error(err))
|
|
|
resp.Body.Close()
|
|
|
continue
|
|
|
}
|
|
|
@@ -277,14 +284,14 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
|
|
|
txHashRaw := gjson.Get(txJSON, "tx_response.txhash")
|
|
|
if !txHashRaw.Exists() {
|
|
|
- logger.Error("tx does not have tx hash", zap.String("network", networkName), zap.String("payload", txJSON))
|
|
|
+ logger.Error("tx does not have tx hash", zap.String("network", e.networkName), zap.String("payload", txJSON))
|
|
|
continue
|
|
|
}
|
|
|
txHash := txHashRaw.String()
|
|
|
|
|
|
events := gjson.Get(txJSON, "tx_response.events")
|
|
|
if !events.Exists() {
|
|
|
- logger.Error("tx has no events", zap.String("network", networkName), zap.String("payload", txJSON))
|
|
|
+ logger.Error("tx has no events", zap.String("network", e.networkName), zap.String("payload", txJSON))
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -293,12 +300,12 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
// Terra Classic upgraded WASM versions starting at block 13215800. If this transaction is from before that, we need to use the old contract address format.
|
|
|
blockHeightStr := gjson.Get(txJSON, "tx_response.height")
|
|
|
if !blockHeightStr.Exists() {
|
|
|
- logger.Error("failed to look up block height on old reobserved tx", zap.String("network", networkName), zap.String("txHash", txHash), zap.String("payload", txJSON))
|
|
|
+ logger.Error("failed to look up block height on old reobserved tx", zap.String("network", e.networkName), zap.String("txHash", txHash), zap.String("payload", txJSON))
|
|
|
continue
|
|
|
}
|
|
|
blockHeight := blockHeightStr.Int()
|
|
|
if blockHeight < 13215800 {
|
|
|
- logger.Info("doing look up of old tx", zap.String("network", networkName), zap.String("txHash", txHash), zap.Int64("blockHeight", blockHeight))
|
|
|
+ logger.Info("doing look up of old tx", zap.String("network", e.networkName), zap.String("txHash", txHash), zap.Int64("blockHeight", blockHeight))
|
|
|
contractAddressLogKey = "contract_address"
|
|
|
}
|
|
|
}
|
|
|
@@ -307,8 +314,8 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
for _, msg := range msgs {
|
|
|
msg.IsReobservation = true
|
|
|
e.msgC <- msg //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
|
|
|
- messagesConfirmed.WithLabelValues(networkName).Inc()
|
|
|
- watchers.ReobservationsByChain.WithLabelValues(networkName, "std").Inc()
|
|
|
+ messagesConfirmed.WithLabelValues(e.networkName).Inc()
|
|
|
+ watchers.ReobservationsByChain.WithLabelValues(e.networkName, "std").Inc()
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -323,8 +330,8 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
_, message, err := c.Read(ctx)
|
|
|
if err != nil {
|
|
|
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
|
|
|
- connectionErrors.WithLabelValues(networkName, "channel_read_error").Inc()
|
|
|
- logger.Error("error reading channel", zap.String("network", networkName), zap.Error(err))
|
|
|
+ connectionErrors.WithLabelValues(e.networkName, "channel_read_error").Inc()
|
|
|
+ logger.Error("error reading channel", zap.String("network", e.networkName), zap.Error(err))
|
|
|
errC <- err //nolint:channelcheck // The watcher will exit anyway
|
|
|
return nil
|
|
|
}
|
|
|
@@ -334,21 +341,21 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
|
|
|
txHashRaw := gjson.Get(json, "result.events.tx\\.hash.0")
|
|
|
if !txHashRaw.Exists() {
|
|
|
- logger.Warn("message does not have tx hash", zap.String("network", networkName), zap.String("payload", json))
|
|
|
+ logger.Warn("message does not have tx hash", zap.String("network", e.networkName), zap.String("payload", json))
|
|
|
continue
|
|
|
}
|
|
|
txHash := txHashRaw.String()
|
|
|
|
|
|
events := gjson.Get(json, "result.data.value.TxResult.result.events")
|
|
|
if !events.Exists() {
|
|
|
- logger.Warn("message has no events", zap.String("network", networkName), zap.String("payload", json))
|
|
|
+ logger.Warn("message has no events", zap.String("network", e.networkName), zap.String("payload", json))
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
msgs := EventsToMessagePublications(e.contract, txHash, events.Array(), logger, e.chainID, e.contractAddressLogKey, e.b64Encoded)
|
|
|
for _, msg := range msgs {
|
|
|
e.msgC <- msg //nolint:channelcheck // The channel to the processor is buffered and shared across chains, if it backs up we should stop processing new observations
|
|
|
- messagesConfirmed.WithLabelValues(networkName).Inc()
|
|
|
+ messagesConfirmed.WithLabelValues(e.networkName).Inc()
|
|
|
}
|
|
|
|
|
|
// We do not send guardian changes to the processor - ETH guardians are the source of truth.
|
|
|
@@ -364,6 +371,60 @@ func (e *Watcher) Run(ctx context.Context) error {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// logVersion uses the abci_info rpc to log node version information.
|
|
|
+func (e *Watcher) logVersion(ctx context.Context, logger *zap.Logger, c *websocket.Conn) {
|
|
|
+ // NOTE: This function is ugly because this watcher doesn't use a
|
|
|
+ // client library. It can be rewritten in a followup change.
|
|
|
+ //
|
|
|
+ // Get information about the application (the /status endpoint returns the
|
|
|
+ // version of the tendermint or cometbft library, no the actual application
|
|
|
+ // version.
|
|
|
+ //
|
|
|
+ // From:
|
|
|
+ // https://docs.cometbft.com/v0.34/rpc/#/ABCI/abci_info
|
|
|
+ // https://docs.tendermint.com/v0.34/rpc/#/ABCI/abci_info
|
|
|
+ command := map[string]interface{}{
|
|
|
+ "jsonrpc": "2.0",
|
|
|
+ "method": "abci_info",
|
|
|
+ "params": []interface{}{},
|
|
|
+ "id": 1,
|
|
|
+ }
|
|
|
+
|
|
|
+ err := wsjson.Write(ctx, c, command)
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("problem retrieving node version when building request",
|
|
|
+ zap.String("network", e.networkName),
|
|
|
+ zap.Error(err),
|
|
|
+ )
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // Wait for the success response
|
|
|
+ _, data, err := c.Read(ctx)
|
|
|
+ if err != nil {
|
|
|
+ logger.Error("problem retrieving node version",
|
|
|
+ zap.String("network", e.networkName),
|
|
|
+ zap.Error(err),
|
|
|
+ )
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ version := gjson.GetBytes(data, "result.response.version").String()
|
|
|
+
|
|
|
+ if version == "" {
|
|
|
+ logger.Error("problem retrieving node version due to an empty response version ",
|
|
|
+ zap.String("network", e.networkName),
|
|
|
+ zap.String("response", string(data)),
|
|
|
+ )
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.Info("node version",
|
|
|
+ zap.String("network", e.networkName),
|
|
|
+ zap.String("version", version),
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
func EventsToMessagePublications(contract string, txHash string, events []gjson.Result, logger *zap.Logger, chainID vaa.ChainID, contractAddressKey string, b64Encoded bool) []*common.MessagePublication {
|
|
|
networkName := chainID.String()
|
|
|
msgs := make([]*common.MessagePublication, 0, len(events))
|