Эх сурвалжийг харах

Stacks Watcher update (#4591)

* fix: add reobserve call and counter

* fix: add empty block response for not found blocks

* chore: fix return values
jannik-stacks 21 цаг өмнө
parent
commit
8514fa9e16

+ 8 - 0
node/pkg/watchers/stacks/fetch.go

@@ -144,6 +144,14 @@ func (w *Watcher) fetchTenureBlocksByBurnHeight(ctx context.Context, height uint
 	}
 	defer resp.Body.Close()
 
+	if resp.StatusCode == http.StatusNotFound {
+		// Burn block was skipped, return empty tenure blocks
+		return &StacksV3TenureBlocksResponse{
+			BurnBlockHeight: height,
+			StacksBlocks:    []StacksV3TenureBlock{},
+		}, nil
+	}
+
 	if resp.StatusCode != http.StatusOK {
 		return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
 	}

+ 47 - 28
node/pkg/watchers/stacks/watcher.go

@@ -118,9 +118,31 @@ func (w *Watcher) Run(ctx context.Context) error {
 			case <-ctx.Done():
 				return nil
 			case req := <-w.obsvReqC:
-				logger.Info("received observation request",
-					zap.String("tx_hash", hex.EncodeToString(req.TxHash)))
-				continue
+
+				if req.ChainId != uint32(vaa.ChainIDStacks) {
+					logger.Error("Unexpected chain ID",
+						zap.Uint32("chain_id", req.ChainId))
+					continue
+				}
+
+				logger.Info("Received Stacks observation request",
+					zap.String("tx_hash", hex.EncodeToString(req.TxHash)),
+					zap.Int64("timestamp", req.Timestamp))
+
+				numObservations, err := w.Reobserve(ctx, vaa.ChainIDStacks, req.TxHash, "")
+				if err != nil {
+					logger.Error("Failed to process observation request",
+						zap.String("tx_hash", hex.EncodeToString(req.TxHash)),
+						zap.Uint32("num_observations", numObservations),
+						zap.Error(err),
+					)
+					continue
+				}
+
+				logger.Info("Reobserved transactions",
+					zap.String("tx_hash", hex.EncodeToString(req.TxHash)),
+					zap.Uint32("num_observations", numObservations),
+				)
 			}
 		}
 	})
@@ -131,7 +153,7 @@ func (w *Watcher) Run(ctx context.Context) error {
 	// Wait for error or context cancellation
 	select {
 	case <-ctx.Done():
-		logger.Info("context cancelled, stopping Stacks watcher")
+		logger.Info("Context cancelled, stopping Stacks watcher")
 		return nil
 	case err := <-errC:
 		return err
@@ -153,16 +175,15 @@ func (w *Watcher) Reobserve(ctx context.Context, chainID vaa.ChainID, txID []byt
 		zap.String("custom_endpoint", customEndpoint))
 
 	// Process the transaction
-	err := w.reobserveStacksTransactionByTxId(ctx, txIdString, logger)
+	count, err := w.reobserveStacksTransactionByTxId(ctx, txIdString, logger)
 	if err != nil {
 		logger.Error("Failed to reobserve transaction",
-			zap.String("tx_hash", txIdString),
+			zap.String("tx_id", txIdString),
 			zap.Error(err))
 		return 0, err
 	}
 
-	// Return 1 to indicate we processed the request
-	return 1, nil
+	return count, nil
 }
 
 /// RUN
@@ -319,7 +340,7 @@ func (w *Watcher) processStacksBlock(ctx context.Context, blockHash string, logg
 	}
 
 	for _, tx := range replay.Transactions {
-		if err := w.processStacksTransaction(ctx, &tx, replay, false, logger); err != nil {
+		if _, err := w.processStacksTransaction(ctx, &tx, replay, false, logger); err != nil {
 			logger.Error("Failed to process transaction",
 				zap.String("tx_id", tx.TxId),
 				zap.Error(err))
@@ -331,32 +352,32 @@ func (w *Watcher) processStacksBlock(ctx context.Context, blockHash string, logg
 }
 
 // Processes a single transaction from a Stacks block
-func (w *Watcher) processStacksTransaction(_ context.Context, tx *StacksV3TenureBlockTransaction, replay *StacksV3TenureBlockReplayResponse, isReobservation bool, logger *zap.Logger) error {
+func (w *Watcher) processStacksTransaction(_ context.Context, tx *StacksV3TenureBlockTransaction, replay *StacksV3TenureBlockReplayResponse, isReobservation bool, logger *zap.Logger) (uint32, error) {
 	logger.Info("Processing Stacks transaction", zap.String("tx_id", tx.TxId))
 
 	// non-okay response
 	if !strings.HasPrefix(tx.ResultHex, "0x07") { // (ok) is 0x07...
-		return fmt.Errorf("transaction %s failed due to response hex: %s", tx.TxId, tx.ResultHex)
+		return 0, fmt.Errorf("transaction %s failed due to response hex: %s", tx.TxId, tx.ResultHex)
 	}
 
 	// abort_by_response
 	if !isTransactionResultCommitted(tx.Result) {
-		return fmt.Errorf("transaction %s failed due to response: %v", tx.TxId, tx.Result)
+		return 0, fmt.Errorf("transaction %s failed due to response: %v", tx.TxId, tx.Result)
 	}
 
 	// abort_by_post_condition
 	if tx.PostConditionAborted {
-		return fmt.Errorf("transaction %s failed due to post-condition aborted", tx.TxId)
+		return 0, fmt.Errorf("transaction %s failed due to post-condition aborted", tx.TxId)
 	}
 
 	// other runtime error
 	if tx.VmError != nil {
-		return fmt.Errorf("transaction %s failed due to runtime error: %s", tx.TxId, *tx.VmError)
+		return 0, fmt.Errorf("transaction %s failed due to runtime error: %s", tx.TxId, *tx.VmError)
 	}
 
 	// success
 
-	wormholeEvents := 0
+	wormholeEvents := uint32(0)
 	for _, event := range tx.Events {
 		// Skip events that don't match our criteria
 		if !event.Committed ||
@@ -410,28 +431,28 @@ func (w *Watcher) processStacksTransaction(_ context.Context, tx *StacksV3Tenure
 
 	logger.Info("Finished processing transaction events",
 		zap.String("tx_id", tx.TxId),
-		zap.Int("wormhole_events_processed", wormholeEvents))
+		zap.Uint32("wormhole_events_processed", wormholeEvents))
 
-	return nil
+	return wormholeEvents, nil
 }
 
 // Processes a single transaction by its txid (used for reobservations)
-func (w *Watcher) reobserveStacksTransactionByTxId(ctx context.Context, txId string, logger *zap.Logger) error {
+func (w *Watcher) reobserveStacksTransactionByTxId(ctx context.Context, txId string, logger *zap.Logger) (uint32, error) {
 	logger.Info("Processing transaction by txid", zap.String("tx_id", txId))
 
 	transaction, err := w.fetchStacksTransactionByTxId(ctx, txId)
 	if err != nil {
-		return fmt.Errorf("failed to fetch transaction: %w", err)
+		return 0, fmt.Errorf("failed to fetch transaction: %w", err)
 	}
 
 	replay, err := w.fetchStacksBlockReplay(ctx, transaction.IndexBlockHash)
 	if err != nil {
-		return fmt.Errorf("failed to fetch block replay: %w", err)
+		return 0, fmt.Errorf("failed to fetch block replay: %w", err)
 	}
 
 	stableBitcoinBlockHeight := w.stableBitcoinHeight.Load()
 	if replay.BlockHeight > stableBitcoinBlockHeight {
-		return fmt.Errorf("block replay height %d is greater than stable Bitcoin (burn) block height %d", replay.BlockHeight, stableBitcoinBlockHeight)
+		return 0, fmt.Errorf("block replay height %d is greater than stable Bitcoin (burn) block height %d", replay.BlockHeight, stableBitcoinBlockHeight)
 	}
 
 	var tx *StacksV3TenureBlockTransaction
@@ -443,18 +464,16 @@ func (w *Watcher) reobserveStacksTransactionByTxId(ctx context.Context, txId str
 	}
 
 	if tx == nil {
-		return fmt.Errorf("transaction %s not found in block replay", txId)
+		return 0, fmt.Errorf("transaction %s not found in block replay", txId)
 	}
 
 	// Process the transaction using the same processing function used in polling
-	if err := w.processStacksTransaction(ctx, tx, replay, true, logger); err != nil {
-		return fmt.Errorf("failed to process transaction: %w", err)
+	count, err := w.processStacksTransaction(ctx, tx, replay, true, logger)
+	if err != nil {
+		return 0, fmt.Errorf("failed to process transaction: %w", err)
 	}
 
-	logger.Info("Successfully processed transaction for reobservation",
-		zap.String("tx_id", txId))
-
-	return nil
+	return count, nil
 }
 
 // Processes a core contract event tuple and extracts message fields