|
|
@@ -5,6 +5,7 @@ import (
|
|
|
"encoding/hex"
|
|
|
"encoding/json"
|
|
|
"errors"
|
|
|
+ "fmt"
|
|
|
"strconv"
|
|
|
"time"
|
|
|
|
|
|
@@ -28,9 +29,8 @@ const (
|
|
|
CCQ_FAST_RETRY_INTERVAL = 200 * time.Millisecond
|
|
|
)
|
|
|
|
|
|
-// ccqSendQueryResponse sends a response back to the query handler. In the case of an error, the response parameter may be nil.
|
|
|
-func (w *SolanaWatcher) ccqSendQueryResponse(req *query.PerChainQueryInternal, status query.QueryStatus, response query.ChainSpecificResponse) {
|
|
|
- queryResponse := query.CreatePerChainQueryResponseInternal(req.RequestID, req.RequestIdx, req.Request.ChainId, status, response)
|
|
|
+// ccqSendQueryResponse sends a response back to the query handler.
|
|
|
+func (w *SolanaWatcher) ccqSendQueryResponse(queryResponse *query.PerChainQueryResponseInternal) {
|
|
|
select {
|
|
|
case w.queryResponseC <- queryResponse:
|
|
|
w.ccqLogger.Debug("published query response to handler")
|
|
|
@@ -39,9 +39,14 @@ func (w *SolanaWatcher) ccqSendQueryResponse(req *query.PerChainQueryInternal, s
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// ccqSendErrorResponse creates an error query response and sends it back to the query handler. It sets the response field to nil.
|
|
|
+func (w *SolanaWatcher) ccqSendErrorResponse(req *query.PerChainQueryInternal, status query.QueryStatus) {
|
|
|
+ queryResponse := query.CreatePerChainQueryResponseInternal(req.RequestID, req.RequestIdx, req.Request.ChainId, status, nil)
|
|
|
+ w.ccqSendQueryResponse(queryResponse)
|
|
|
+}
|
|
|
+
|
|
|
// ccqHandleQuery is the top-level query handler. It breaks out the requests based on the type and calls the appropriate handler.
|
|
|
func (w *SolanaWatcher) ccqHandleQuery(ctx context.Context, queryRequest *query.PerChainQueryInternal) {
|
|
|
-
|
|
|
// This can't happen unless there is a programming error - the caller
|
|
|
// is expected to send us only requests for our chainID.
|
|
|
if queryRequest.Request.ChainId != w.chainID {
|
|
|
@@ -50,33 +55,40 @@ func (w *SolanaWatcher) ccqHandleQuery(ctx context.Context, queryRequest *query.
|
|
|
|
|
|
start := time.Now()
|
|
|
|
|
|
+ giveUpTime := start.Add(query.RetryInterval).Add(-CCQ_RETRY_SLOP)
|
|
|
switch req := queryRequest.Request.Query.(type) {
|
|
|
case *query.SolanaAccountQueryRequest:
|
|
|
- giveUpTime := start.Add(query.RetryInterval).Add(-CCQ_RETRY_SLOP)
|
|
|
- w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, false)
|
|
|
+ w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime)
|
|
|
+ case *query.SolanaPdaQueryRequest:
|
|
|
+ w.ccqHandleSolanaPdaQueryRequest(ctx, queryRequest, req, giveUpTime)
|
|
|
default:
|
|
|
w.ccqLogger.Warn("received unsupported request type",
|
|
|
zap.Uint8("payload", uint8(queryRequest.Request.Query.Type())),
|
|
|
)
|
|
|
- w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
|
|
|
+ w.ccqSendErrorResponse(queryRequest, query.QueryFatalError)
|
|
|
}
|
|
|
|
|
|
query.TotalWatcherTime.WithLabelValues(w.chainID.String()).Observe(float64(time.Since(start).Milliseconds()))
|
|
|
}
|
|
|
|
|
|
-// ccqHandleSolanaAccountQueryRequest is the query handler for a sol_account request.
|
|
|
-func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, giveUpTime time.Time, isRetry bool) {
|
|
|
- requestId := "sol_account:" + queryRequest.ID()
|
|
|
- if !isRetry {
|
|
|
- w.ccqLogger.Info("received a sol_account query",
|
|
|
- zap.Uint64("minContextSlot", req.MinContextSlot),
|
|
|
- zap.Uint64("dataSliceOffset", req.DataSliceOffset),
|
|
|
- zap.Uint64("dataSliceLength", req.DataSliceLength),
|
|
|
- zap.Int("numAccounts", len(req.Accounts)),
|
|
|
- zap.String("requestId", requestId),
|
|
|
- )
|
|
|
- }
|
|
|
+// ccqCustomPublisher is an interface used by ccqBaseHandleSolanaAccountQueryRequest to specify how to publish the response from a query.
|
|
|
+type ccqCustomPublisher interface {
|
|
|
+ // publish should take a sol_account query response and publish it as the appropriate response type.
|
|
|
+ publish(*query.PerChainQueryResponseInternal, *query.SolanaAccountQueryResponse)
|
|
|
+}
|
|
|
|
|
|
+// ccqBaseHandleSolanaAccountQueryRequest is the base Solana Account query handler. It does the actual account queries, and if necessary does fast retries
|
|
|
+// until the minimum context slot is reached. It does not publish the response, but instead invokes the query specific publisher that is passed in.
|
|
|
+func (w *SolanaWatcher) ccqBaseHandleSolanaAccountQueryRequest(
|
|
|
+ ctx context.Context,
|
|
|
+ queryRequest *query.PerChainQueryInternal,
|
|
|
+ req *query.SolanaAccountQueryRequest,
|
|
|
+ giveUpTime time.Time,
|
|
|
+ tag string,
|
|
|
+ requestId string,
|
|
|
+ isRetry bool,
|
|
|
+ publisher ccqCustomPublisher,
|
|
|
+) {
|
|
|
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
|
|
|
defer cancel()
|
|
|
|
|
|
@@ -106,18 +118,18 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context,
|
|
|
// Read the accounts.
|
|
|
info, err := w.getMultipleAccountsWithOpts(rCtx, accounts, ¶ms)
|
|
|
if err != nil {
|
|
|
- if w.ccqCheckForMinSlotContext(ctx, queryRequest, req, requestId, err, giveUpTime, !isRetry) {
|
|
|
+ if w.ccqCheckForMinSlotContext(ctx, queryRequest, req, requestId, err, giveUpTime, !isRetry, tag, publisher) {
|
|
|
// Return without posting a response because a go routine was created to handle it.
|
|
|
return
|
|
|
}
|
|
|
- w.ccqLogger.Error("read failed for sol_account query request",
|
|
|
+ w.ccqLogger.Error(fmt.Sprintf("read failed for %s query request", tag),
|
|
|
zap.String("requestId", requestId),
|
|
|
zap.Any("accounts", accounts),
|
|
|
zap.Any("params", params),
|
|
|
zap.Error(err),
|
|
|
)
|
|
|
|
|
|
- w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
|
|
|
+ w.ccqSendErrorResponse(queryRequest, query.QueryRetryNeeded)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -130,36 +142,36 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context,
|
|
|
MaxSupportedTransactionVersion: &maxSupportedTransactionVersion,
|
|
|
})
|
|
|
if err != nil {
|
|
|
- w.ccqLogger.Error("failed to read block time for sol_account query request",
|
|
|
+ w.ccqLogger.Error(fmt.Sprintf("failed to read block time for %s query request", tag),
|
|
|
zap.String("requestId", requestId),
|
|
|
zap.Uint64("slotNumber", info.Context.Slot),
|
|
|
zap.Error(err),
|
|
|
)
|
|
|
|
|
|
- w.ccqSendQueryResponse(queryRequest, query.QueryRetryNeeded, nil)
|
|
|
+ w.ccqSendErrorResponse(queryRequest, query.QueryRetryNeeded)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
if info == nil {
|
|
|
- w.ccqLogger.Error("read for sol_account query request returned nil info", zap.String("requestId", requestId))
|
|
|
- w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
|
|
|
+ w.ccqLogger.Error(fmt.Sprintf("read for %s query request returned nil info", tag), zap.String("requestId", requestId))
|
|
|
+ w.ccqSendErrorResponse(queryRequest, query.QueryFatalError)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
if info.Value == nil {
|
|
|
- w.ccqLogger.Error("read for sol_account query request returned nil value", zap.String("requestId", requestId))
|
|
|
- w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
|
|
|
+ w.ccqLogger.Error(fmt.Sprintf("read for %s query request returned nil value", tag), zap.String("requestId", requestId))
|
|
|
+ w.ccqSendErrorResponse(queryRequest, query.QueryFatalError)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
if len(info.Value) != len(req.Accounts) {
|
|
|
- w.ccqLogger.Error("read for sol_account query request returned unexpected number of results",
|
|
|
+ w.ccqLogger.Error(fmt.Sprintf("read for %s query request returned unexpected number of results", tag),
|
|
|
zap.String("requestId", requestId),
|
|
|
zap.Int("numAccounts", len(req.Accounts)),
|
|
|
zap.Int("numValues", len(info.Value)),
|
|
|
)
|
|
|
|
|
|
- w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
|
|
|
+ w.ccqSendErrorResponse(queryRequest, query.QueryFatalError)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -167,13 +179,13 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context,
|
|
|
results := make([]query.SolanaAccountResult, 0, len(req.Accounts))
|
|
|
for idx, val := range info.Value {
|
|
|
if val == nil { // This can happen for an invalid account.
|
|
|
- w.ccqLogger.Error("read of account for sol_account query request failed, val is nil", zap.String("requestId", requestId), zap.Any("account", req.Accounts[idx]))
|
|
|
- w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
|
|
|
+ w.ccqLogger.Error(fmt.Sprintf("read of account for %s query request failed, val is nil", tag), zap.String("requestId", requestId), zap.Any("account", req.Accounts[idx]))
|
|
|
+ w.ccqSendErrorResponse(queryRequest, query.QueryFatalError)
|
|
|
return
|
|
|
}
|
|
|
if val.Data == nil {
|
|
|
- w.ccqLogger.Error("read of account for sol_account query request failed, data is nil", zap.String("requestId", requestId), zap.Any("account", req.Accounts[idx]))
|
|
|
- w.ccqSendQueryResponse(queryRequest, query.QueryFatalError, nil)
|
|
|
+ w.ccqLogger.Error(fmt.Sprintf("read of account for %s query request failed, data is nil", tag), zap.String("requestId", requestId), zap.Any("account", req.Accounts[idx]))
|
|
|
+ w.ccqSendErrorResponse(queryRequest, query.QueryFatalError)
|
|
|
return
|
|
|
}
|
|
|
results = append(results, query.SolanaAccountResult{
|
|
|
@@ -193,7 +205,7 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context,
|
|
|
Results: results,
|
|
|
}
|
|
|
|
|
|
- w.ccqLogger.Info("account read for sol_account_query succeeded",
|
|
|
+ w.ccqLogger.Info(fmt.Sprintf("account read for %s query succeeded", tag),
|
|
|
zap.String("requestId", requestId),
|
|
|
zap.Uint64("slotNumber", info.Context.Slot),
|
|
|
zap.Uint64("blockTime", uint64(*block.BlockTime)),
|
|
|
@@ -201,7 +213,8 @@ func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context,
|
|
|
zap.Uint64("blockHeight", *block.BlockHeight),
|
|
|
)
|
|
|
|
|
|
- w.ccqSendQueryResponse(queryRequest, query.QuerySuccess, resp)
|
|
|
+ // Publish the response using the custom publisher.
|
|
|
+ publisher.publish(query.CreatePerChainQueryResponseInternal(queryRequest.RequestID, queryRequest.RequestIdx, queryRequest.Request.ChainId, query.QuerySuccess, resp), resp)
|
|
|
}
|
|
|
|
|
|
// ccqCheckForMinSlotContext checks to see if the returned error was due to the min context slot not being reached.
|
|
|
@@ -216,6 +229,8 @@ func (w *SolanaWatcher) ccqCheckForMinSlotContext(
|
|
|
err error,
|
|
|
giveUpTime time.Time,
|
|
|
log bool,
|
|
|
+ tag string,
|
|
|
+ publisher ccqCustomPublisher,
|
|
|
) bool {
|
|
|
if req.MinContextSlot == 0 {
|
|
|
return false
|
|
|
@@ -254,7 +269,7 @@ func (w *SolanaWatcher) ccqCheckForMinSlotContext(
|
|
|
}
|
|
|
|
|
|
// Kick off the retry after a short delay.
|
|
|
- go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, currentSlotFromError, giveUpTime, log)
|
|
|
+ go w.ccqSleepAndRetryAccountQuery(ctx, queryRequest, req, requestId, currentSlot, currentSlotFromError, giveUpTime, log, tag, publisher)
|
|
|
return true
|
|
|
}
|
|
|
|
|
|
@@ -268,6 +283,8 @@ func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(
|
|
|
currentSlotFromError uint64,
|
|
|
giveUpTime time.Time,
|
|
|
log bool,
|
|
|
+ tag string,
|
|
|
+ publisher ccqCustomPublisher,
|
|
|
) {
|
|
|
if log {
|
|
|
w.ccqLogger.Info("minimum context slot has not been reached, will retry shortly",
|
|
|
@@ -285,7 +302,7 @@ func (w *SolanaWatcher) ccqSleepAndRetryAccountQuery(
|
|
|
w.ccqLogger.Info("initiating fast retry", zap.String("requestId", requestId))
|
|
|
}
|
|
|
|
|
|
- w.ccqHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, true)
|
|
|
+ w.ccqBaseHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, tag, requestId, true, publisher)
|
|
|
}
|
|
|
|
|
|
// ccqIsMinContextSlotError parses an error to see if it is "Minimum context slot has not been reached". If it is, it returns the slot number
|
|
|
@@ -331,6 +348,143 @@ func ccqIsMinContextSlotError(err error) (bool, uint64) {
|
|
|
return true, currentSlot
|
|
|
}
|
|
|
|
|
|
+// ccqHandleSolanaAccountQueryRequest is the query handler for a sol_account request.
|
|
|
+func (w *SolanaWatcher) ccqHandleSolanaAccountQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaAccountQueryRequest, giveUpTime time.Time) {
|
|
|
+ requestId := "sol_account" + ":" + queryRequest.ID()
|
|
|
+ w.ccqLogger.Info("received a sol_account query",
|
|
|
+ zap.Uint64("minContextSlot", req.MinContextSlot),
|
|
|
+ zap.Uint64("dataSliceOffset", req.DataSliceOffset),
|
|
|
+ zap.Uint64("dataSliceLength", req.DataSliceLength),
|
|
|
+ zap.Int("numAccounts", len(req.Accounts)),
|
|
|
+ zap.String("requestId", requestId),
|
|
|
+ )
|
|
|
+
|
|
|
+ publisher := ccqSolanaAccountPublisher{w}
|
|
|
+ w.ccqBaseHandleSolanaAccountQueryRequest(ctx, queryRequest, req, giveUpTime, "sol_account", requestId, false, publisher)
|
|
|
+}
|
|
|
+
|
|
|
+// ccqSolanaAccountPublisher is the publisher for the sol_account query. All it has to do is forward the response passed in to the watcher, as is.
|
|
|
+type ccqSolanaAccountPublisher struct {
|
|
|
+ w *SolanaWatcher
|
|
|
+}
|
|
|
+
|
|
|
+func (impl ccqSolanaAccountPublisher) publish(resp *query.PerChainQueryResponseInternal, _ *query.SolanaAccountQueryResponse) {
|
|
|
+ impl.w.ccqSendQueryResponse(resp)
|
|
|
+}
|
|
|
+
|
|
|
+// ccqHandleSolanaPdaQueryRequest is the query handler for a sol_pda request.
|
|
|
+func (w *SolanaWatcher) ccqHandleSolanaPdaQueryRequest(ctx context.Context, queryRequest *query.PerChainQueryInternal, req *query.SolanaPdaQueryRequest, giveUpTime time.Time) {
|
|
|
+ requestId := "sol_pda:" + queryRequest.ID()
|
|
|
+ w.ccqLogger.Info("received a sol_pda query",
|
|
|
+ zap.Uint64("minContextSlot", req.MinContextSlot),
|
|
|
+ zap.Uint64("dataSliceOffset", req.DataSliceOffset),
|
|
|
+ zap.Uint64("dataSliceLength", req.DataSliceLength),
|
|
|
+ zap.Int("numPdas", len(req.PDAs)),
|
|
|
+ zap.String("requestId", requestId),
|
|
|
+ )
|
|
|
+
|
|
|
+ // Derive the list of accounts from the PDAs and save those along with the bumps.
|
|
|
+ accounts := make([][query.SolanaPublicKeyLength]byte, 0, len(req.PDAs))
|
|
|
+ bumps := make([]uint8, 0, len(req.PDAs))
|
|
|
+ for _, pda := range req.PDAs {
|
|
|
+ account, bump, err := solana.FindProgramAddress(pda.Seeds, pda.ProgramAddress)
|
|
|
+ if err != nil {
|
|
|
+ w.ccqLogger.Error("failed to derive account from pda for sol_pda query",
|
|
|
+ zap.String("requestId", requestId),
|
|
|
+ zap.String("programAddress", hex.EncodeToString(pda.ProgramAddress[:])),
|
|
|
+ zap.Any("seeds", pda.Seeds),
|
|
|
+ zap.Error(err),
|
|
|
+ )
|
|
|
+
|
|
|
+ w.ccqSendErrorResponse(queryRequest, query.QueryFatalError)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ accounts = append(accounts, account)
|
|
|
+ bumps = append(bumps, bump)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Build a standard sol_account query using the derived accounts.
|
|
|
+ acctReq := &query.SolanaAccountQueryRequest{
|
|
|
+ Commitment: req.Commitment,
|
|
|
+ MinContextSlot: req.MinContextSlot,
|
|
|
+ DataSliceOffset: req.DataSliceOffset,
|
|
|
+ DataSliceLength: req.DataSliceLength,
|
|
|
+ Accounts: accounts,
|
|
|
+ }
|
|
|
+
|
|
|
+ publisher := ccqPdaPublisher{
|
|
|
+ w: w,
|
|
|
+ queryRequest: queryRequest,
|
|
|
+ requestId: requestId,
|
|
|
+ accounts: accounts,
|
|
|
+ bumps: bumps,
|
|
|
+ }
|
|
|
+
|
|
|
+ // Execute the standard sol_account query passing in the publisher to publish a sol_pda response.
|
|
|
+ w.ccqBaseHandleSolanaAccountQueryRequest(ctx, queryRequest, acctReq, giveUpTime, "sol_pda", requestId, false, publisher)
|
|
|
+}
|
|
|
+
|
|
|
+// ccqPdaPublisher is a custom publisher that publishes a sol_pda response.
|
|
|
+type ccqPdaPublisher struct {
|
|
|
+ w *SolanaWatcher
|
|
|
+ queryRequest *query.PerChainQueryInternal
|
|
|
+ requestId string
|
|
|
+ accounts [][query.SolanaPublicKeyLength]byte
|
|
|
+ bumps []uint8
|
|
|
+}
|
|
|
+
|
|
|
+func (pub ccqPdaPublisher) publish(pcrResp *query.PerChainQueryResponseInternal, acctResp *query.SolanaAccountQueryResponse) {
|
|
|
+ if pcrResp == nil {
|
|
|
+ pub.w.ccqLogger.Error("sol_pda query failed, pcrResp is nil", zap.String("requestId", pub.requestId))
|
|
|
+ pub.w.ccqSendErrorResponse(pub.queryRequest, query.QueryFatalError)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if pcrResp.Status != query.QuerySuccess {
|
|
|
+ // publish() should only get called in success cases.
|
|
|
+ pub.w.ccqLogger.Error("received an unexpected query response for sol_pda query", zap.String("requestId", pub.requestId), zap.Any("pcrResp", pcrResp))
|
|
|
+ pub.w.ccqSendErrorResponse(pub.queryRequest, query.QueryFatalError)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if acctResp == nil {
|
|
|
+ pub.w.ccqLogger.Error("sol_pda query failed, acctResp is nil", zap.String("requestId", pub.requestId))
|
|
|
+ pub.w.ccqSendErrorResponse(pub.queryRequest, query.QueryFatalError)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(acctResp.Results) != len(pub.accounts) {
|
|
|
+ pub.w.ccqLogger.Error("sol_pda query failed, unexpected number of results", zap.String("requestId", pub.requestId), zap.Int("numResults", len(acctResp.Results)), zap.Int("expectedResults", len(pub.accounts)))
|
|
|
+ pub.w.ccqSendErrorResponse(pub.queryRequest, query.QueryFatalError)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ // Build the PDA response from the base response.
|
|
|
+ results := make([]query.SolanaPdaResult, 0, len(pub.accounts))
|
|
|
+ for idx, acctResult := range acctResp.Results {
|
|
|
+ results = append(results, query.SolanaPdaResult{
|
|
|
+ Account: pub.accounts[idx],
|
|
|
+ Bump: pub.bumps[idx],
|
|
|
+ Lamports: acctResult.Lamports,
|
|
|
+ RentEpoch: acctResult.RentEpoch,
|
|
|
+ Executable: acctResult.Executable,
|
|
|
+ Owner: acctResult.Owner,
|
|
|
+ Data: acctResult.Data,
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ resp := &query.SolanaPdaQueryResponse{
|
|
|
+ SlotNumber: acctResp.SlotNumber,
|
|
|
+ BlockTime: acctResp.BlockTime,
|
|
|
+ BlockHash: acctResp.BlockHash,
|
|
|
+ Results: results,
|
|
|
+ }
|
|
|
+
|
|
|
+ // Finally, publish the result.
|
|
|
+ pub.w.ccqSendQueryResponse(query.CreatePerChainQueryResponseInternal(pub.queryRequest.RequestID, pub.queryRequest.RequestIdx, pub.queryRequest.Request.ChainId, query.QuerySuccess, resp))
|
|
|
+}
|
|
|
+
|
|
|
type M map[string]interface{}
|
|
|
|
|
|
// getMultipleAccountsWithOpts is a work-around for the fact that the library call doesn't honor MinContextSlot.
|