소스 검색

aptos: update watcher to read emitter as decimal

Csongor Kiss 3 년 전
부모
커밋
c6630b69fb
1개의 변경된 파일38개의 추가작업 그리고 20개의 파일을 삭제
  1. 38 20
      node/pkg/aptos/watcher.go

+ 38 - 20
node/pkg/aptos/watcher.go

@@ -34,7 +34,7 @@ type (
 		msgChan  chan *common.MessagePublication
 		obsvReqC chan *gossipv1.ObservationRequest
 
-		next_sequence uint64
+		next_sequence uint64 // aptos native sequence number for wormhole contract
 	}
 )
 
@@ -83,24 +83,37 @@ func (e *Watcher) retrievePayload(s string) ([]byte, error) {
 	return body, err
 }
 
-func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, seq uint64) {
+func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, native_seq uint64) {
 	em := data.Get("sender")
 	if !em.Exists() {
 		logger.Info("sender")
 		return
 	}
 
-	emitter, err := hex.DecodeString(em.String()[2:])
-	if err != nil {
-		logger.Info("sender decode")
+	// We read the emitter address as a 64 bit unsigned integer.
+	// .Uint() will happily return 0 if the input string is not a valid u64.
+	// We absolutely want to make sure that we only get a 0 when the input
+	// string was "0", and not swallow an error silently.
+	// The emitter address in the contract is represented as a u128, so there's
+	// a chance of overflow (though it will take a while to get there, as the
+	// emitter addresses are handed out incrementally -- TODO: we might want to
+	// consider changing that to u64 instead, which will require a testnet
+	// redeploy at a different address).
+	em_string := em.String()
+	em_uint := em.Uint()
+	if (em_string != "0" && em_uint == 0) {
+		logger.Error("Invalid emitter", zap.String("emitter string", em_string))
 		return
 	}
 
+	emitter := make([]byte, 8)
+	binary.BigEndian.PutUint64(emitter, em.Uint())
+
 	var a vaa.Address
-	copy(a[:], emitter)
+	copy(a[24:], emitter)
 
 	id := make([]byte, 8)
-	binary.BigEndian.PutUint64(id, seq)
+	binary.BigEndian.PutUint64(id, native_seq)
 
 	var txHash = eth_common.BytesToHash(id) // 32 bytes = d3b136a6a182a40554b2fafbc8d12a7a22737c10c81e33b33d1dcb74c532708b
 
@@ -193,11 +206,11 @@ func (e *Watcher) Run(ctx context.Context) error {
 					panic("invalid chain ID")
 				}
 
-				seq := binary.BigEndian.Uint64(r.TxHash)
+				native_seq := binary.BigEndian.Uint64(r.TxHash)
 
-				logger.Info("Received obsv request", zap.Uint64("tx_hash", seq))
+				logger.Info("Received obsv request", zap.Uint64("tx_hash", native_seq))
 
-				s := fmt.Sprintf(`%s?start=%d&limit=1`, e.aptosQuery, seq)
+				s := fmt.Sprintf(`%s?start=%d&limit=1`, e.aptosQuery, native_seq)
 
 				body, err := e.retrievePayload(s)
 				if err != nil {
@@ -222,8 +235,8 @@ func (e *Watcher) Run(ctx context.Context) error {
 						break
 					}
 
-					if newSeq.Uint() != seq {
-						logger.Error("newSeq != seq")
+					if newSeq.Uint() != native_seq {
+						logger.Error("newSeq != native_seq")
 						break
 
 					}
@@ -232,7 +245,7 @@ func (e *Watcher) Run(ctx context.Context) error {
 					if !data.Exists() {
 						break
 					}
-					e.observeData(logger, data, seq)
+					e.observeData(logger, data, native_seq)
 				}
 
 			case <-timer.C:
@@ -251,6 +264,11 @@ func (e *Watcher) Run(ctx context.Context) error {
 					break
 				}
 
+				// data doesn't exist yet. skip, and try again later
+				if string(body) == "" {
+					continue
+				}
+
 				if !gjson.Valid(string(body)) {
 					logger.Error("InvalidJson: " + string(body))
 					p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
@@ -261,22 +279,22 @@ func (e *Watcher) Run(ctx context.Context) error {
 				outcomes := gjson.ParseBytes(body)
 
 				for _, chunk := range outcomes.Array() {
-					seq := chunk.Get("sequence_number")
-					if !seq.Exists() {
+					native_seq := chunk.Get("sequence_number")
+					if !native_seq.Exists() {
 						continue
 					}
 					if e.next_sequence == 0 {
-						e.next_sequence = seq.Uint() + 1
+						e.next_sequence = native_seq.Uint() + 1
 						break
 					} else {
-						e.next_sequence = seq.Uint() + 1
+						e.next_sequence = native_seq.Uint() + 1
 					}
 
 					data := chunk.Get("data")
 					if !data.Exists() {
 						continue
 					}
-					e.observeData(logger, data, seq.Uint())
+					e.observeData(logger, data, native_seq.Uint())
 				}
 
 				health, err := e.retrievePayload(e.aptosHealth)
@@ -288,9 +306,9 @@ func (e *Watcher) Run(ctx context.Context) error {
 				}
 
 				if !gjson.Valid(string(health)) {
-					logger.Error("InvalidJson: " + string(health))
+					logger.Error("Invalid JSON in health response: " + string(health))
 					p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
-					break
+					continue
 
 				}