소스 검색

Node/CCQ: Solana min context slot fix (#3751)

* Node/CCQ: Solana min context slot fix

* Node/CCQ: retry not updating timeout
bruce-riley 1 년 전
부모
커밋
3f074f30ee
4개의 변경된 파일55개의 추가작업 그리고 37개의 파일을 삭제
  1. 1 1
      node/pkg/query/query.go
  2. 2 0
      node/pkg/query/request.go
  3. 37 21
      node/pkg/watchers/solana/ccq.go
  4. 15 15
      node/pkg/watchers/solana/ccq_test.go

+ 1 - 1
node/pkg/query/query.go

@@ -392,7 +392,7 @@ func handleQueryRequestsImpl(
 									zap.Stringer("lastUpdateTime", pcq.lastUpdateTime),
 									zap.Stringer("lastUpdateTime", pcq.lastUpdateTime),
 									zap.String("chainID", pq.queries[requestIdx].req.Request.ChainId.String()),
 									zap.String("chainID", pq.queries[requestIdx].req.Request.ChainId.String()),
 								)
 								)
-								pcq.ccqForwardToWatcher(qLogger, pq.receiveTime)
+								pcq.ccqForwardToWatcher(qLogger, now)
 							}
 							}
 						}
 						}
 					}
 					}

+ 2 - 0
node/pkg/query/request.go

@@ -125,6 +125,8 @@ const SolanaAccountQueryRequestType ChainSpecificQueryType = 4
 type SolanaAccountQueryRequest struct {
 type SolanaAccountQueryRequest struct {
 	// Commitment identifies the commitment level to be used in the queried. Currently it may only "finalized".
 	// Commitment identifies the commitment level to be used in the queried. Currently it may only "finalized".
 	// Before we can support "confirmed", we need a way to read the account data and the block information atomically.
 	// Before we can support "confirmed", we need a way to read the account data and the block information atomically.
+	// We would also need to deal with the fact that queries are only handled in the finalized watcher and it does not
+	// have access to the latest confirmed slot needed for MinContextSlot retries.
 	Commitment string
 	Commitment string
 
 
 	// The minimum slot that the request can be evaluated at. Zero means unused.
 	// The minimum slot that the request can be evaluated at. Zero means unused.

+ 37 - 21
node/pkg/watchers/solana/ccq.go

@@ -5,7 +5,6 @@ import (
 	"encoding/hex"
 	"encoding/hex"
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
-	"fmt"
 	"strconv"
 	"strconv"
 	"time"
 	"time"
 
 
@@ -227,24 +226,27 @@ func (w *SolanaWatcher) ccqCheckForMinSlotContext(
 		return false
 		return false
 	}
 	}
 
 
-	isMinContext, currentSlot, err := ccqIsMinContextSlotError(err)
-	if err != nil {
-		w.ccqLogger.Error("failed to parse for min context slot error", zap.Error(err))
+	isMinContext, currentSlotFromError := ccqIsMinContextSlotError(err)
+	if !isMinContext {
 		return false
 		return false
 	}
 	}
 
 
-	if !isMinContext {
-		return false
+	var currentSlot uint64
+	if currentSlotFromError != 0 {
+		currentSlot = currentSlotFromError
+	} else {
+		currentSlot = w.GetLatestFinalizedBlockNumber()
 	}
 	}
 
 
 	// Estimate how far in the future the requested slot is, using our estimated slot time.
 	// Estimate how far in the future the requested slot is, using our estimated slot time.
 	futureSlotEstimate := time.Duration(req.MinContextSlot-currentSlot) * CCQ_ESTIMATED_SLOT_TIME
 	futureSlotEstimate := time.Duration(req.MinContextSlot-currentSlot) * CCQ_ESTIMATED_SLOT_TIME
 
 
-	// If the requested slot is more than ten seconds in the future, use the regular retry mechanism.
-	if futureSlotEstimate > query.RetryInterval {
+	// If the requested slot is definitively more than the retry interval, use the regular retry mechanism.
+	if futureSlotEstimate > query.RetryInterval*2 {
 		w.ccqLogger.Info("minimum context slot is too far in the future, requesting slow retry",
 		w.ccqLogger.Info("minimum context slot is too far in the future, requesting slow retry",
 			zap.String("requestId", requestId),
 			zap.String("requestId", requestId),
 			zap.Uint64("currentSlot", currentSlot),
 			zap.Uint64("currentSlot", currentSlot),
+			zap.Uint64("currentSlotFromError", currentSlotFromError),
 			zap.Uint64("minContextSlot", req.MinContextSlot),
 			zap.Uint64("minContextSlot", req.MinContextSlot),
 			zap.Stringer("futureSlotEstimate", futureSlotEstimate),
 			zap.Stringer("futureSlotEstimate", futureSlotEstimate),
 		)
 		)
@@ -252,16 +254,26 @@ func (w *SolanaWatcher) ccqCheckForMinSlotContext(
 	}
 	}
 
 
 	// Kick off the retry after a short delay.
 	// Kick off the retry after a short delay.
-	go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, giveUpTime, log)
+	go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, currentSlotFromError, giveUpTime, log)
 	return true
 	return true
 }
 }
 
 
 // ccqSleepAndRetryAccountQuery does a short sleep and then initiates a retry.
 // ccqSleepAndRetryAccountQuery does a short sleep and then initiates a retry.
-func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, requestId string, currentSlot uint64, giveUpTime time.Time, log bool) {
+func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(
+	ctx context.Context,
+	queryRequest *query.PerChainQueryInternal,
+	req *query.SolanaAccountQueryRequest,
+	requestId string,
+	currentSlot uint64,
+	currentSlotFromError uint64,
+	giveUpTime time.Time,
+	log bool,
+) {
 	if log {
 	if log {
 		w.ccqLogger.Info("minimum context slot has not been reached, will retry shortly",
 		w.ccqLogger.Info("minimum context slot has not been reached, will retry shortly",
 			zap.String("requestId", requestId),
 			zap.String("requestId", requestId),
 			zap.Uint64("currentSlot", currentSlot),
 			zap.Uint64("currentSlot", currentSlot),
+			zap.Uint64("currentSlotFromError", currentSlotFromError),
 			zap.Uint64("minContextSlot", req.MinContextSlot),
 			zap.Uint64("minContextSlot", req.MinContextSlot),
 			zap.Stringer("retryInterval", CCQ_FAST_RETRY_INTERVAL),
 			zap.Stringer("retryInterval", CCQ_FAST_RETRY_INTERVAL),
 		)
 		)
@@ -277,42 +289,46 @@ func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(ctx context.Context, queryR
 }
 }
 
 
 // ccqIsMinContextSlotError parses an error to see if it is "Minimum context slot has not been reached". If it is, it returns the slot number
 // ccqIsMinContextSlotError parses an error to see if it is "Minimum context slot has not been reached". If it is, it returns the slot number
-func ccqIsMinContextSlotError(err error) (bool, uint64, error) {
+func ccqIsMinContextSlotError(err error) (bool, uint64) {
 	/*
 	/*
-	  A MinContextSlot error looks like this (and contains the context slot):
-	  "(*jsonrpc.RPCError)(0xc00b3881b0)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (map[string]interface {}) (len=1) {\n  (string) (len=11) \"contextSlot\": (json.Number) (len=4) \"3630\"\n }\n})\n"
+		  A MinContextSlot error looks like this (and contains the context slot):
+		  "(*jsonrpc.RPCError)(0xc00b3881b0)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (map[string]interface {}) (len=1) {\n  (string) (len=11) \"contextSlot\": (json.Number) (len=4) \"3630\"\n }\n})\n"
+
+			Except some endpoints return something like this instead:
+			"(*jsonrpc.RPCError)(0xc03c0bcd20)({\n Code: (int) -32016,\n Message: (string) (len=41) \"Minimum context slot has not been reached\",\n Data: (interface {}) <nil>\n})\n"
 	*/
 	*/
 	var rpcErr *jsonrpc.RPCError
 	var rpcErr *jsonrpc.RPCError
 	if !errors.As(err, &rpcErr) {
 	if !errors.As(err, &rpcErr) {
-		return false, 0, nil // Some other kind of error. That's okay.
+		return false, 0 // Some other kind of error.
 	}
 	}
 
 
 	if rpcErr.Code != -32016 { // Minimum context slot has not been reached
 	if rpcErr.Code != -32016 { // Minimum context slot has not been reached
-		return false, 0, nil // Some other kind of RPC error. That's okay.
+		return false, 0 // Some other kind of RPC error.
 	}
 	}
 
 
-	// From here on down, any error is bad because the MinContextSlot error is not in the expected format.
+	// We know it is a MinContextSlot error. If it contains the current slot number, extract and return that.
+	// Since some Solana endpoints do not return that, we can't treat it as an error if it is missing.
 	m, ok := rpcErr.Data.(map[string]interface{})
 	m, ok := rpcErr.Data.(map[string]interface{})
 	if !ok {
 	if !ok {
-		return false, 0, fmt.Errorf("failed to extract data from min context slot error")
+		return true, 0
 	}
 	}
 
 
 	contextSlot, ok := m["contextSlot"]
 	contextSlot, ok := m["contextSlot"]
 	if !ok {
 	if !ok {
-		return false, 0, fmt.Errorf(`min context slot error does not contain "contextSlot"`)
+		return true, 0
 	}
 	}
 
 
 	currentSlotAsJson, ok := contextSlot.(json.Number)
 	currentSlotAsJson, ok := contextSlot.(json.Number)
 	if !ok {
 	if !ok {
-		return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not json.Number`)
+		return true, 0
 	}
 	}
 
 
 	currentSlot, typeErr := strconv.ParseUint(currentSlotAsJson.String(), 10, 64)
 	currentSlot, typeErr := strconv.ParseUint(currentSlotAsJson.String(), 10, 64)
 	if typeErr != nil {
 	if typeErr != nil {
-		return false, 0, fmt.Errorf(`min context slot error "contextSlot" is not uint64: %w`, err)
+		return true, 0
 	}
 	}
 
 
-	return true, currentSlot, nil
+	return true, currentSlot
 }
 }
 
 
 type M map[string]interface{}
 type M map[string]interface{}

+ 15 - 15
node/pkg/watchers/solana/ccq_test.go

@@ -3,7 +3,6 @@ package solana
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
-	"strings"
 	"testing"
 	"testing"
 
 
 	"github.com/certusone/wormhole/node/pkg/query"
 	"github.com/certusone/wormhole/node/pkg/query"
@@ -26,16 +25,14 @@ func TestCcqIsMinContextSlotErrorSuccess(t *testing.T) {
 		},
 		},
 	}
 	}
 
 
-	isMinContext, currentSlot, err := ccqIsMinContextSlotError(error(myErr))
-	require.NoError(t, err)
+	isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr))
 	require.True(t, isMinContext)
 	require.True(t, isMinContext)
 	assert.Equal(t, uint64(13526), currentSlot)
 	assert.Equal(t, uint64(13526), currentSlot)
 }
 }
 
 
 func TestCcqIsMinContextSlotErrorSomeOtherError(t *testing.T) {
 func TestCcqIsMinContextSlotErrorSomeOtherError(t *testing.T) {
 	myErr := fmt.Errorf("Some other error")
 	myErr := fmt.Errorf("Some other error")
-	isMinContext, _, err := ccqIsMinContextSlotError(error(myErr))
-	require.NoError(t, err)
+	isMinContext, _ := ccqIsMinContextSlotError(error(myErr))
 	require.False(t, isMinContext)
 	require.False(t, isMinContext)
 }
 }
 
 
@@ -48,8 +45,7 @@ func TestCcqIsMinContextSlotErrorSomeOtherRPCError(t *testing.T) {
 		},
 		},
 	}
 	}
 
 
-	isMinContext, _, err := ccqIsMinContextSlotError(error(myErr))
-	require.NoError(t, err)
+	isMinContext, _ := ccqIsMinContextSlotError(error(myErr))
 	require.False(t, isMinContext)
 	require.False(t, isMinContext)
 }
 }
 
 
@@ -59,8 +55,9 @@ func TestCcqIsMinContextSlotErrorNoData(t *testing.T) {
 		Message: "Minimum context slot has not been reached",
 		Message: "Minimum context slot has not been reached",
 	}
 	}
 
 
-	_, _, err := ccqIsMinContextSlotError(error(myErr))
-	assert.EqualError(t, err, `failed to extract data from min context slot error`)
+	isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr))
+	require.True(t, isMinContext)
+	assert.Equal(t, uint64(0), currentSlot)
 }
 }
 
 
 func TestCcqIsMinContextSlotErrorContextSlotMissing(t *testing.T) {
 func TestCcqIsMinContextSlotErrorContextSlotMissing(t *testing.T) {
@@ -72,8 +69,9 @@ func TestCcqIsMinContextSlotErrorContextSlotMissing(t *testing.T) {
 		},
 		},
 	}
 	}
 
 
-	_, _, err := ccqIsMinContextSlotError(error(myErr))
-	assert.EqualError(t, err, `min context slot error does not contain "contextSlot"`)
+	isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr))
+	require.True(t, isMinContext)
+	assert.Equal(t, uint64(0), currentSlot)
 }
 }
 
 
 func TestCcqIsMinContextSlotErrorContextSlotIsNotAJsonNumber(t *testing.T) {
 func TestCcqIsMinContextSlotErrorContextSlotIsNotAJsonNumber(t *testing.T) {
@@ -85,8 +83,9 @@ func TestCcqIsMinContextSlotErrorContextSlotIsNotAJsonNumber(t *testing.T) {
 		},
 		},
 	}
 	}
 
 
-	_, _, err := ccqIsMinContextSlotError(error(myErr))
-	assert.EqualError(t, err, `min context slot error "contextSlot" is not json.Number`)
+	isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr))
+	require.True(t, isMinContext)
+	assert.Equal(t, uint64(0), currentSlot)
 }
 }
 
 
 func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) {
 func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) {
@@ -98,6 +97,7 @@ func TestCcqIsMinContextSlotErrorContextSlotIsNotUint64(t *testing.T) {
 		},
 		},
 	}
 	}
 
 
-	_, _, err := ccqIsMinContextSlotError(error(myErr))
-	assert.True(t, strings.Contains(err.Error(), `min context slot error "contextSlot" is not uint64`))
+	isMinContext, currentSlot := ccqIsMinContextSlotError(error(myErr))
+	require.True(t, isMinContext)
+	assert.Equal(t, uint64(0), currentSlot)
 }
 }