소스 검색

Node/Acct: code review rework

Change-Id: Ib45aec98f855f3e4cff4dabf2d5597ce19b060c9
Bruce Riley 2 년 전
부모
커밋
90545f7c2a
변경된 파일 6개, 추가 208줄, 삭제 131줄
  1. 61 21
      node/pkg/accountant/accountant.go
  2. 98 85
      node/pkg/accountant/audit.go
  3. 30 17
      node/pkg/accountant/query_test.go
  4. 11 2
      node/pkg/accountant/submit_obs.go
  5. 0 3
      node/pkg/processor/processor.go
  6. 8 3
      node/pkg/wormconn/query.go

+ 61 - 21
node/pkg/accountant/accountant.go

@@ -47,14 +47,22 @@ type (
 
 	// pendingEntry is the payload for each pending transfer
 	pendingEntry struct {
-		msg     *common.MessagePublication
-		msgId   string
-		digest  string
-		updTime time.Time
-
-		// submitPending indicates if the observation is either in the channel waiting to be submitted or in an outstanding transaction.
-		// The audit should not resubmit anything where submitPending is set to true.
-		submitPending bool
+		msg    *common.MessagePublication
+		msgId  string
+		digest string
+
+		// stateLock is used to protect the contents of the state struct.
+		stateLock sync.Mutex
+
+		// The state struct contains anything that can be modified. It is protected by the state lock.
+		state struct {
+			// updTime is the time that the state struct was last updated.
+			updTime time.Time
+
+			// submitPending indicates if the observation is either in the channel waiting to be submitted or in an outstanding transaction.
+			// The audit should not resubmit anything where submitPending is set to true.
+			submitPending bool
+		}
 	}
 )
 
@@ -76,7 +84,6 @@ type Accountant struct {
 	pendingTransfersLock sync.Mutex
 	pendingTransfers     map[string]*pendingEntry // Key is the message ID (emitterChain/emitterAddr/seqNo)
 	subChan              chan *common.MessagePublication
-	lastAuditTime        time.Time
 	env                  int
 }
 
@@ -162,6 +169,10 @@ func (acct *Accountant) Start(ctx context.Context) error {
 		if err := supervisor.Run(ctx, "acctwatcher", common.WrapWithScissors(acct.watcher, "acctwatcher")); err != nil {
 			return fmt.Errorf("failed to start watcher: %w", err)
 		}
+
+		if err := supervisor.Run(ctx, "acctaudit", common.WrapWithScissors(acct.audit, "acctaudit")); err != nil {
+			return fmt.Errorf("failed to start audit worker: %w", err)
+		}
 	}
 
 	return nil
@@ -224,7 +235,7 @@ func (acct *Accountant) SubmitObservation(msg *common.MessagePublication) (bool,
 	}
 
 	// Add it to the pending map and the database.
-	pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest, updTime: time.Now()}
+	pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest}
 	if err := acct.addPendingTransferAlreadyLocked(pe); err != nil {
 		acct.logger.Error("acct: failed to persist pending transfer, blocking publishing", zap.String("msgID", msgId), zap.Error(err))
 		return false, err
@@ -233,7 +244,7 @@ func (acct *Accountant) SubmitObservation(msg *common.MessagePublication) (bool,
 	// This transaction may take a while. Pass it off to the worker so we don't block the processor.
 	if acct.env != GoTestMode {
 		acct.logger.Info("acct: submitting transfer to accountant for approval", zap.String("msgID", msgId), zap.Bool("canPublish", !acct.enforceFlag))
-		acct.submitObservation(pe)
+		_ = acct.submitObservation(pe)
 	}
 
 	// If we are not enforcing accountant, the event can be published. Otherwise we have to wait to hear back from the contract.
@@ -253,6 +264,7 @@ func (acct *Accountant) publishTransferAlreadyLocked(pe *pendingEntry) {
 // addPendingTransferAlreadyLocked adds a pending transfer to both the map and the database. It assumes the caller holds the lock.
 func (acct *Accountant) addPendingTransferAlreadyLocked(pe *pendingEntry) error {
 	acct.logger.Debug("acct: addPendingTransferAlreadyLocked", zap.String("msgId", pe.msgId))
+	pe.state.updTime = time.Now()
 	if err := acct.db.AcctStorePendingTransfer(pe.msg); err != nil {
 		return err
 	}
@@ -273,8 +285,8 @@ func (acct *Accountant) deletePendingTransfer(msgId string) {
 func (acct *Accountant) deletePendingTransferAlreadyLocked(msgId string) {
 	acct.logger.Debug("acct: deletePendingTransfer", zap.String("msgId", msgId))
 	if _, exists := acct.pendingTransfers[msgId]; exists {
-		transfersOutstanding.Set(float64(len(acct.pendingTransfers)))
 		delete(acct.pendingTransfers, msgId)
+		transfersOutstanding.Set(float64(len(acct.pendingTransfers)))
 	}
 	if err := acct.db.AcctDeletePendingTransfer(msgId); err != nil {
 		acct.logger.Error("acct: failed to delete pending transfer from the db", zap.String("msgId", msgId), zap.Error(err))
@@ -294,7 +306,8 @@ func (acct *Accountant) loadPendingTransfers() error {
 		acct.logger.Info("acct: reloaded pending transfer", zap.String("msgID", msgId))
 
 		digest := msg.CreateDigest()
-		pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest} // Leave the updTime unset so we will query this on the first audit interval.
+		pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest}
+		pe.state.updTime = time.Now()
 		acct.pendingTransfers[msgId] = pe
 	}
 
@@ -308,28 +321,55 @@ func (acct *Accountant) loadPendingTransfers() error {
 	return nil
 }
 
-// submitObservation sends an observation request to the worker so it can be submited to the contract.
-// If writing to the channel would block, this function resets the timestamp on the entry so it will be
-// retried next audit interval. This method assumes the caller holds the lock.
-func (acct *Accountant) submitObservation(pe *pendingEntry) {
-	pe.submitPending = true
+// submitObservation sends an observation request to the worker so it can be submitted to the contract. If the transfer is already
+// marked as "submit pending", this function returns false without doing anything. Otherwise it returns true. The return value can
+// be used to avoid unnecessary error logging. If writing to the channel would block, this function logs an error, clears the
+// "submit pending" flag and still returns true, assuming the pending transfer will be handled on the next audit interval.
+// This function grabs the state lock.
+func (acct *Accountant) submitObservation(pe *pendingEntry) bool {
+	pe.stateLock.Lock()
+	defer pe.stateLock.Unlock()
+
+	if pe.state.submitPending {
+		return false
+	}
+
+	pe.state.submitPending = true
+	pe.state.updTime = time.Now()
+
 	select {
 	case acct.subChan <- pe.msg:
 		acct.logger.Debug("acct: submitted observation to channel", zap.String("msgId", pe.msgId))
 	default:
 		acct.logger.Error("acct: unable to submit observation because the channel is full, will try next interval", zap.String("msgId", pe.msgId))
-		pe.submitPending = false
-		pe.updTime = time.Time{}
+		pe.state.submitPending = false
 	}
+
+	return true
 }
 
 // clearSubmitPendingFlags is called after a batch is finished being submitted (success or fail). It clears the submit pending flag for everything in the batch.
+// It grabs the pending transfer and state locks.
 func (acct *Accountant) clearSubmitPendingFlags(msgs []*common.MessagePublication) {
 	acct.pendingTransfersLock.Lock()
 	defer acct.pendingTransfersLock.Unlock()
 	for _, msg := range msgs {
 		if pe, exists := acct.pendingTransfers[msg.MessageIDString()]; exists {
-			pe.submitPending = false
+			pe.setSubmitPending(false)
 		}
 	}
 }
+
+// setSubmitPending sets the submit pending flag on the pending transfer object to the specified value. It grabs the state lock.
+func (pe *pendingEntry) setSubmitPending(val bool) {
+	pe.stateLock.Lock()
+	defer pe.stateLock.Unlock()
+	pe.state.submitPending = val
+	pe.state.updTime = time.Now()
+}
+
+// updTime returns the last update time from the pending transfer object. It grabs the state lock.
+func (pe *pendingEntry) updTime() time.Time {
+	pe.stateLock.Lock()
+	defer pe.stateLock.Unlock()
+	return pe.state.updTime
+}

+ 98 - 85
node/pkg/accountant/audit.go

@@ -1,5 +1,5 @@
-// This code audits the set of pending transfers against the state reported by the smart contract. It is called from the processor every minute,
-// but the audit is performed less frequently. The audit occurs in two phases that operate off of a temporary map of all pending transfers known to this guardian.
+// This code audits the set of pending transfers against the state reported by the smart contract. It has a runnable that is started when the accountant initializes.
+// It uses a ticker to periodically run the audit. The audit occurs in two phases that operate off of a temporary map of all pending transfers known to this guardian.
 //
 // The first phase involves querying the smart contract for any observations that it thinks are missing for this guardian. The audit processes everything in the
 // returned results and does one of the following:
@@ -7,9 +7,9 @@
 // - If the observation is not in the temporary map, we request a reobservation from the local watcher.
 //
 // The second phase consists of requesting the status from the contract for everything that is still in the temporary map. For each returned item, we do the following:
-// - If the contract indicates that the transfer has been committed, we validate the digest, then publish it and delete it from the map.
+// - If the contract indicates that the transfer has been committed, we validate the digest, then publish the transfer and delete it from the map.
 // - If the contract indicates that the transfer is pending, we continue to wait for it to be committed.
-// - If the contract indicates any other status (most likely meaning it does not know about it), we resubmit an observation to the contract
+// - If the contract indicates any other status (most likely meaning it does not know about it), we resubmit an observation to the contract.
 //
 // Note that any time we are considering resubmitting an observation to the contract, we first check the "submit pending" flag. If that is set, we do not
 // submit the observation to the contract, but continue to wait for it to work its way through the queue.
@@ -17,6 +17,7 @@
 package accountant
 
 import (
+	"context"
 	"encoding/hex"
 	"encoding/json"
 	"fmt"
@@ -27,14 +28,12 @@ import (
 
 	cosmossdk "github.com/cosmos/cosmos-sdk/types"
 
-	ethCommon "github.com/ethereum/go-ethereum/common"
-
 	"go.uber.org/zap"
 )
 
 const (
-	// auditInterval indicates how often the audit runs (given that it is invoked by the processor once per minute)
-	auditInterval = 5 * time.Minute
+	// auditInterval indicates how often the audit runs.
+	auditInterval = 1 * time.Minute // TODO: Don't commit this! Temporary value for testing; the interval was previously 5 minutes.
 
 	// maxSubmitPendingTime indicates how long a transfer can be in the submit pending state before the audit starts complaining about it.
 	maxSubmitPendingTime = 30 * time.Minute
@@ -43,34 +42,39 @@ const (
 type (
 	// MissingObservationsResponse is the result returned from the "missing_observations" query.
 	MissingObservationsResponse struct {
-		Missing []MissingObservation
+		Missing []MissingObservation `json:"missing"`
 	}
 
+	// MissingObservation is what is returned for a single missing observation.
 	MissingObservation struct {
 		ChainId uint16 `json:"chain_id"`
 		TxHash  []byte `json:"tx_hash"`
 	}
 
-	// BatchTransferStatusResponse is the result returned by the "batch_transfer_status" query.
+	// BatchTransferStatusResponse contains the details returned by the "batch_transfer_status" query.
 	BatchTransferStatusResponse struct {
 		Details []TransferDetails `json:"details"`
 	}
 
+	// TransferDetails contains the details returned for a single transfer.
 	TransferDetails struct {
-		Key    TransferKey
-		Status TransferStatus
+		Key    TransferKey     `json:"key"`
+		Status *TransferStatus `json:"status"`
 	}
 
+	// TransferStatus contains the status returned for a transfer.
 	TransferStatus struct {
 		Committed *TransferStatusCommitted `json:"committed"`
 		Pending   *TransferStatusPending   `json:"pending"`
 	}
 
+	// TransferStatusCommitted contains the data returned for a committed transfer.
 	TransferStatusCommitted struct {
 		Data   TransferData `json:"data"`
 		Digest []byte       `json:"digest"`
 	}
 
+	// TransferData contains the detailed data returned for a committed transfer.
 	TransferData struct {
 		Amount         *cosmossdk.Int `json:"amount"`
 		TokenChain     uint16         `json:"token_chain"`
@@ -78,10 +82,16 @@ type (
 		RecipientChain uint16         `json:"recipient_chain"`
 	}
 
+	// TransferStatusPending contains the data returned for a pending transfer.
 	TransferStatusPending struct {
+		// TODO: Fill this in once we get a sample.
 	}
 )
 
+func (mo MissingObservation) String() string {
+	return fmt.Sprintf("%d-%s", mo.ChainId, hex.EncodeToString(mo.TxHash))
+}
+
 // makeAuditKey creates an audit map key from a missing observation.
 func (mo *MissingObservation) makeAuditKey() string {
 	return fmt.Sprintf("%d-%s", mo.ChainId, hex.EncodeToString(mo.TxHash[:]))
@@ -92,31 +102,52 @@ func (pe *pendingEntry) makeAuditKey() string {
 	return fmt.Sprintf("%d-%s", pe.msg.EmitterChain, pe.msg.TxHash.String())
 }
 
-// AuditPendingTransfers is the entry point for the audit of the pending transfer map. It determines if it has been long enough since the last audit.
-// If so, it creates a temporary map of all pending transfers and invokes the main audit function as a go routine.
-func (acct *Accountant) AuditPendingTransfers() {
+// audit is the runnable that executes the audit each interval.
+func (acct *Accountant) audit(ctx context.Context) error {
+	ticker := time.NewTicker(auditInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-ticker.C:
+			acct.runAudit()
+		}
+	}
+}
+
+// runAudit is the entry point for the audit of the pending transfer map. It creates a temporary map of all pending transfers and invokes the main audit function.
+func (acct *Accountant) runAudit() {
+	tmpMap := acct.createAuditMap()
+	acct.logger.Debug("acctaudit: in AuditPendingTransfers: starting audit", zap.Int("numPending", len(tmpMap)))
+	acct.performAudit(tmpMap)
+	acct.logger.Debug("acctaudit: leaving AuditPendingTransfers")
+}
+
+// createAuditMap creates a temporary map of all pending transfers. It grabs the pending transfer lock.
+func (acct *Accountant) createAuditMap() map[string]*pendingEntry {
 	acct.pendingTransfersLock.Lock()
 	defer acct.pendingTransfersLock.Unlock()
 
-	if time.Since(acct.lastAuditTime) < auditInterval {
-		acct.logger.Debug("acctaudit: in AuditPendingTransfers, not time to run yet", zap.Stringer("lastAuditTime", acct.lastAuditTime))
-		return
-	}
-
 	tmpMap := make(map[string]*pendingEntry)
 	for _, pe := range acct.pendingTransfers {
-		if (pe.submitPending) && (time.Since(pe.updTime) > maxSubmitPendingTime) {
+		if pe.hasBeenPendingForTooLong() {
 			auditErrors.Inc()
-			acct.logger.Error("acctaudit: transfer has been in the submit pending state for too long", zap.Stringer("lastUpdateTime", pe.updTime))
+			acct.logger.Error("acctaudit: transfer has been in the submit pending state for too long", zap.Stringer("lastUpdateTime", pe.updTime()))
 		}
-		acct.logger.Debug("acctaudit: will audit pending transfer", zap.String("msgId", pe.msgId), zap.Stringer("lastUpdateTime", pe.updTime))
+		acct.logger.Debug("acctaudit: will audit pending transfer", zap.String("msgId", pe.msgId), zap.Stringer("lastUpdateTime", pe.updTime()))
 		tmpMap[pe.makeAuditKey()] = pe
 	}
 
-	acct.logger.Debug("acctaudit: in AuditPendingTransfers: starting audit", zap.Int("numPending", len(tmpMap)))
-	acct.lastAuditTime = time.Now()
-	go acct.performAudit(tmpMap)
-	acct.logger.Debug("acctaudit: leaving AuditPendingTransfers")
+	return tmpMap
+}
+
+// hasBeenPendingForTooLong determines if a transfer has been in the "submit pending" state for too long.
+func (pe *pendingEntry) hasBeenPendingForTooLong() bool {
+	pe.stateLock.Lock()
+	defer pe.stateLock.Unlock()
+	return pe.state.submitPending && time.Since(pe.state.updTime) > maxSubmitPendingTime
 }
 
 // performAudit audits the temporary map against the smart contract. It is meant to be run in a go routine. It takes a temporary map of all pending transfers
@@ -132,23 +163,20 @@ func (acct *Accountant) performAudit(tmpMap map[string]*pendingEntry) {
 		return
 	}
 
-	if len(missingObservations) != 0 {
-		for _, mo := range missingObservations {
-			key := mo.makeAuditKey()
-			pe, exists := tmpMap[key]
-			if exists {
-				if !pe.submitPending {
-					auditErrors.Inc()
-					acct.logger.Error("acctaudit: contract reported pending observation as missing, resubmitting it", zap.String("msgID", pe.msgId))
-					acct.submitObservation(pe)
-				} else {
-					acct.logger.Info("acctaudit: contract reported pending observation as missing but it is queued up to be submitted, skipping it", zap.String("msgID", pe.msgId))
-				}
-
-				delete(tmpMap, key)
+	for _, mo := range missingObservations {
+		key := mo.makeAuditKey()
+		pe, exists := tmpMap[key]
+		if exists {
+			if acct.submitObservation(pe) {
+				auditErrors.Inc()
+				acct.logger.Error("acctaudit: contract reported pending observation as missing, resubmitting it", zap.String("msgID", pe.msgId))
 			} else {
-				acct.handleMissingObservation(mo)
+				acct.logger.Info("acctaudit: contract reported pending observation as missing but it is queued up to be submitted, skipping it", zap.String("msgID", pe.msgId))
 			}
+
+			delete(tmpMap, key)
+		} else {
+			acct.handleMissingObservation(mo)
 		}
 	}
 
@@ -170,21 +198,20 @@ func (acct *Accountant) performAudit(tmpMap map[string]*pendingEntry) {
 		}
 
 		for _, pe := range pendingTransfers {
-			item, exists := transferDetails[pe.msgId]
+			status, exists := transferDetails[pe.msgId]
 			if !exists {
-				if !pe.submitPending {
+				if acct.submitObservation(pe) {
 					auditErrors.Inc()
 					acct.logger.Error("acctaudit: query did not return status for transfer, this should not happen, resubmitting it", zap.String("msgId", pe.msgId))
-					acct.submitObservation(pe)
 				} else {
-					acct.logger.Debug("acctaudit: query did not return status for transfer we have not submitted yet, ignoring it", zap.String("msgId", pe.msgId))
+					acct.logger.Info("acctaudit: query did not return status for transfer we have not submitted yet, ignoring it", zap.String("msgId", pe.msgId))
 				}
 
 				continue
 			}
 
-			if item.Status.Committed != nil {
-				digest := hex.EncodeToString(item.Status.Committed.Digest)
+			if status.Committed != nil {
+				digest := hex.EncodeToString(status.Committed.Digest)
 				if pe.digest == digest {
 					acct.logger.Info("acctaudit: audit determined that transfer has been committed, publishing it", zap.String("msgId", pe.msgId))
 					acct.handleCommittedTransfer(pe.msgId)
@@ -193,12 +220,14 @@ func (acct *Accountant) performAudit(tmpMap map[string]*pendingEntry) {
 					acct.logger.Error("acctaudit: audit detected a digest mismatch, dropping transfer", zap.String("msgId", pe.msgId), zap.String("ourDigest", pe.digest), zap.String("reportedDigest", digest))
 					acct.deletePendingTransfer(pe.msgId)
 				}
-			} else if item.Status.Pending != nil {
+			} else if status.Pending != nil {
 				acct.logger.Debug("acctaudit: contract says transfer is still pending", zap.String("msgId", pe.msgId))
-			} else if !pe.submitPending {
-				auditErrors.Inc()
-				acct.logger.Error("acctaudit: contract does not know about pending transfer, resubmitting it", zap.String("msgId", pe.msgId))
-				acct.submitObservation(pe)
+			} else {
+				// This is the case when the contract does not know about a transfer. Resubmit it.
+				if acct.submitObservation(pe) {
+					auditErrors.Inc()
+					acct.logger.Error("acctaudit: contract does not know about pending transfer, resubmitting it", zap.String("msgId", pe.msgId))
+				}
 			}
 		}
 	}
@@ -206,33 +235,17 @@ func (acct *Accountant) performAudit(tmpMap map[string]*pendingEntry) {
 	acct.logger.Debug("acctaudit: exiting performAudit")
 }
 
-// handleMissingObservation submits a reobservation request if appropriate.
+// handleMissingObservation submits a local reobservation request. It relies on the reobservation code to throttle requests.
 func (acct *Accountant) handleMissingObservation(mo MissingObservation) {
-	// It's possible we received this transfer after we built the temporary map. If so, we don't want to do a reobservation.
-	if acct.transferNowExists(mo) {
-		acct.logger.Debug("acctaudit: contract reported unknown observation as missing but it is now in our pending map, ignoring it", zap.Uint16("chainId", mo.ChainId), zap.String("txHash", hex.EncodeToString(mo.TxHash)))
-		return
-	}
-
-	acct.logger.Debug("acctaudit: contract reported unknown observation as missing, requesting reobservation", zap.Uint16("chainId", mo.ChainId), zap.String("txHash", hex.EncodeToString(mo.TxHash)))
+	acct.logger.Info("acctaudit: contract reported unknown observation as missing, requesting local reobservation", zap.Stringer("moKey", mo))
 	msg := &gossipv1.ObservationRequest{ChainId: uint32(mo.ChainId), TxHash: mo.TxHash}
-	acct.obsvReqWriteC <- msg
-}
 
-// transferNowExists checks to see if a missed observation exists in the pending transfer map. It grabs the lock.
-func (acct *Accountant) transferNowExists(mo MissingObservation) bool {
-	acct.pendingTransfersLock.Lock()
-	defer acct.pendingTransfersLock.Unlock()
-
-	chanId := vaa.ChainID(mo.ChainId)
-	txHash := ethCommon.BytesToHash(mo.TxHash)
-	for _, pe := range acct.pendingTransfers {
-		if (pe.msg.EmitterChain == chanId) && (pe.msg.TxHash == txHash) {
-			return true
-		}
+	select {
+	case acct.obsvReqWriteC <- msg:
+		acct.logger.Debug("acct: submitted local reobservation", zap.Stringer("moKey", mo))
+	default:
+		acct.logger.Error("acct: unable to submit local reobservation because the channel is full, will try next interval", zap.Stringer("moKey", mo))
 	}
-
-	return false
 }
 
 // queryMissingObservations queries the contract for the set of observations it thinks are missing for this guardian.
@@ -249,22 +262,22 @@ func (acct *Accountant) queryMissingObservations() ([]MissingObservation, error)
 
 	query := fmt.Sprintf(`{"missing_observations":{"guardian_set": %d, "index": %d}}`, gs.Index, guardianIndex)
 	acct.logger.Debug("acctaudit: submitting missing_observations query", zap.String("query", query))
-	resp, err := acct.wormchainConn.SubmitQuery(acct.ctx, acct.contract, []byte(query))
+	respBytes, err := acct.wormchainConn.SubmitQuery(acct.ctx, acct.contract, []byte(query))
 	if err != nil {
 		return nil, fmt.Errorf("missing_observations query failed: %w", err)
 	}
 
 	var ret MissingObservationsResponse
-	if err := json.Unmarshal(resp.Data, &ret); err != nil {
+	if err := json.Unmarshal(respBytes, &ret); err != nil {
 		return nil, fmt.Errorf("failed to parse missing_observations response: %w", err)
 	}
 
-	acct.logger.Debug("acctaudit: missing_observations query response", zap.Int("numEntries", len(ret.Missing)), zap.String("result", string(resp.Data)))
+	acct.logger.Debug("acctaudit: missing_observations query response", zap.Int("numEntries", len(ret.Missing)), zap.String("result", string(respBytes)))
 	return ret.Missing, nil
 }
 
 // queryBatchTransferStatus queries the status of the specified transfers and returns a map keyed by transfer key (as a string) to the status.
-func (acct *Accountant) queryBatchTransferStatus(keys []TransferKey) (map[string]TransferDetails, error) {
+func (acct *Accountant) queryBatchTransferStatus(keys []TransferKey) (map[string]TransferStatus, error) {
 	bytes, err := json.Marshal(keys)
 	if err != nil {
 		return nil, fmt.Errorf("failed to marshal keys: %w", err)
@@ -272,21 +285,21 @@ func (acct *Accountant) queryBatchTransferStatus(keys []TransferKey) (map[string
 
 	query := fmt.Sprintf(`{"batch_transfer_status":%s}`, string(bytes))
 	acct.logger.Debug("acctaudit: submitting batch_transfer_status query", zap.String("query", query))
-	resp, err := acct.wormchainConn.SubmitQuery(acct.ctx, acct.contract, []byte(query))
+	respBytes, err := acct.wormchainConn.SubmitQuery(acct.ctx, acct.contract, []byte(query))
 	if err != nil {
 		return nil, fmt.Errorf("batch_transfer_status query failed: %w", err)
 	}
 
 	var response BatchTransferStatusResponse
-	if err := json.Unmarshal(resp.Data, &response); err != nil {
+	if err := json.Unmarshal(respBytes, &response); err != nil {
 		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
 	}
 
-	ret := make(map[string]TransferDetails)
+	ret := make(map[string]TransferStatus)
 	for _, item := range response.Details {
-		ret[item.Key.String()] = item
+		ret[item.Key.String()] = *item.Status
 	}
 
-	acct.logger.Debug("acctaudit: batch_transfer_status query response", zap.Int("numEntries", len(ret)), zap.String("result", string(resp.Data)))
+	acct.logger.Debug("acctaudit: batch_transfer_status query response", zap.Int("numEntries", len(ret)), zap.String("result", string(respBytes)))
 	return ret, nil
 }

+ 30 - 17
node/pkg/accountant/query_test.go

@@ -18,12 +18,12 @@ func TestParseMissingObservationsResponse(t *testing.T) {
 	//TODO: Write this test once we get a sample response.
 }
 
-func TestParseBatchTransferStatusResponse(t *testing.T) {
-	responsesJson := []byte("{\"details\":[{\"key\":{\"emitter_chain\":2,\"emitter_address\":\"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16\",\"sequence\":1674568234},\"status\":{\"committed\":{\"data\":{\"amount\":\"1000000000000000000\",\"token_chain\":2,\"token_address\":\"0000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a\",\"recipient_chain\":4},\"digest\":\"1nbbff/7/ai9GJUs4h2JymFuO4+XcasC6t05glXc99M=\"}}},{\"key\":{\"emitter_chain\":2,\"emitter_address\":\"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16\",\"sequence\":1674484597},\"status\":null}]}")
+func TestParseBatchTransferStatusCommittedResponse(t *testing.T) {
+	responsesJson := []byte("{\"details\":[{\"key\":{\"emitter_chain\":2,\"emitter_address\":\"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16\",\"sequence\":1674568234},\"status\":{\"committed\":{\"data\":{\"amount\":\"1000000000000000000\",\"token_chain\":2,\"token_address\":\"0000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a\",\"recipient_chain\":4},\"digest\":\"1nbbff/7/ai9GJUs4h2JymFuO4+XcasC6t05glXc99M=\"}}}]}")
 	var response BatchTransferStatusResponse
 	err := json.Unmarshal(responsesJson, &response)
 	require.NoError(t, err)
-	require.Equal(t, 2, len(response.Details))
+	require.Equal(t, 1, len(response.Details))
 
 	expectedEmitterAddress, err := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16")
 	require.NoError(t, err)
@@ -31,44 +31,57 @@ func TestParseBatchTransferStatusResponse(t *testing.T) {
 	expectedTokenAddress, err := vaa.StringToAddress("0000000000000000000000002d8be6bf0baa74e0a907016679cae9190e80dd0a")
 	require.NoError(t, err)
 
-	expectedAmount0 := cosmossdk.NewInt(1000000000000000000)
+	expectedAmount := cosmossdk.NewInt(1000000000000000000)
 
-	expectedDigest0, err := hex.DecodeString("d676db7dfffbfda8bd18952ce21d89ca616e3b8f9771ab02eadd398255dcf7d3")
+	expectedDigest, err := hex.DecodeString("d676db7dfffbfda8bd18952ce21d89ca616e3b8f9771ab02eadd398255dcf7d3")
 	require.NoError(t, err)
 
-	expectedResult0 := TransferDetails{
+	expectedResult := TransferDetails{
 		Key: TransferKey{
 			EmitterChain:   uint16(vaa.ChainIDEthereum),
 			EmitterAddress: expectedEmitterAddress,
 			Sequence:       1674568234,
 		},
-		Status: TransferStatus{
+		Status: &TransferStatus{
 			Committed: &TransferStatusCommitted{
 				Data: TransferData{
-					Amount:         &expectedAmount0,
+					Amount:         &expectedAmount,
 					TokenChain:     uint16(vaa.ChainIDEthereum),
 					TokenAddress:   expectedTokenAddress,
 					RecipientChain: uint16(vaa.ChainIDBSC),
 				},
-				Digest: expectedDigest0,
+				Digest: expectedDigest,
 			},
 		},
 	}
 
-	expectedResult1 := TransferDetails{
+	// Use DeepEqual() because the response contains pointers.
+	assert.True(t, reflect.DeepEqual(expectedResult, response.Details[0]))
+}
+
+func TestParseBatchTransferStatusNotFoundResponse(t *testing.T) {
+	responsesJson := []byte("{\"details\":[{\"key\":{\"emitter_chain\":2,\"emitter_address\":\"0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16\",\"sequence\":1674484597},\"status\":null}]}")
+	var response BatchTransferStatusResponse
+	err := json.Unmarshal(responsesJson, &response)
+	require.NoError(t, err)
+	require.Equal(t, 1, len(response.Details))
+
+	expectedEmitterAddress, err := vaa.StringToAddress("0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16")
+	require.NoError(t, err)
+
+	expectedResult := TransferDetails{
 		Key: TransferKey{
 			EmitterChain:   uint16(vaa.ChainIDEthereum),
 			EmitterAddress: expectedEmitterAddress,
 			Sequence:       1674484597,
 		},
-		Status: TransferStatus{},
+		Status: nil,
 	}
 
-	require.NotNil(t, response.Details[0].Status.Committed)
-	require.Nil(t, response.Details[0].Status.Pending)
-	assert.True(t, reflect.DeepEqual(expectedResult0, response.Details[0]))
-
 	// Use DeepEqual() because the response contains pointers.
-	assert.True(t, reflect.DeepEqual(expectedResult0, response.Details[0]))
-	assert.True(t, reflect.DeepEqual(expectedResult1, response.Details[1]))
+	assert.True(t, reflect.DeepEqual(expectedResult, response.Details[0]))
+}
+
+func TestParseBatchTransferStatusPendingResponse(t *testing.T) {
+	//TODO: Write this test once we get a sample response.
 }

+ 11 - 2
node/pkg/accountant/submit_obs.go

@@ -52,7 +52,9 @@ func (acct *Accountant) handleBatch(ctx context.Context) error {
 		return fmt.Errorf("failed to read messages from `acct.subChan`: %w", err)
 	}
 
-	msgs = acct.removeCompleted(msgs)
+	if len(msgs) != 0 {
+		msgs = acct.removeCompleted(msgs)
+	}
 
 	if len(msgs) == 0 {
 		return nil
@@ -91,6 +93,9 @@ func readFromChannel[T any](ctx context.Context, ch <-chan T, count int) ([]T, e
 // removeCompleted drops any messages that are no longer in the pending transfer map. This is to handle the case where the contract reports
 // that a transfer is committed while it is in the channel. There is no point in submitting the observation once the transfer is committed.
 func (acct *Accountant) removeCompleted(msgs []*common.MessagePublication) []*common.MessagePublication {
+	acct.pendingTransfersLock.Lock()
+	defer acct.pendingTransfersLock.Unlock()
+
 	out := make([]*common.MessagePublication, 0, len(msgs))
 	for _, msg := range msgs {
 		if _, exists := acct.pendingTransfers[msg.MessageIDString()]; exists {
@@ -188,7 +193,6 @@ func (sb SignatureBytes) MarshalJSON() ([]byte, error) {
 // It should be called from a go routine because it can block.
 func (acct *Accountant) submitObservationsToContract(msgs []*common.MessagePublication, gsIndex uint32, guardianIndex uint32) {
 	txResp, err := SubmitObservationsToContract(acct.ctx, acct.logger, acct.gk, gsIndex, guardianIndex, acct.wormchainConn, acct.contract, msgs)
-	acct.clearSubmitPendingFlags(msgs)
 	if err != nil {
 		// This means the whole batch failed. They will all get retried the next audit cycle.
 		acct.logger.Error("acct: failed to submit any observations in batch", zap.Int("numMsgs", len(msgs)), zap.Error(err))
@@ -197,6 +201,7 @@ func (acct *Accountant) submitObservationsToContract(msgs []*common.MessagePubli
 		}
 
 		submitFailures.Add(float64(len(msgs)))
+		acct.clearSubmitPendingFlags(msgs)
 		return
 	}
 
@@ -209,6 +214,7 @@ func (acct *Accountant) submitObservationsToContract(msgs []*common.MessagePubli
 		}
 
 		submitFailures.Add(float64(len(msgs)))
+		acct.clearSubmitPendingFlags(msgs)
 		return
 	}
 
@@ -220,6 +226,7 @@ func (acct *Accountant) submitObservationsToContract(msgs []*common.MessagePubli
 		}
 
 		submitFailures.Add(float64(len(msgs)))
+		acct.clearSubmitPendingFlags(msgs)
 		return
 	}
 
@@ -248,6 +255,8 @@ func (acct *Accountant) submitObservationsToContract(msgs []*common.MessagePubli
 			submitFailures.Inc()
 		}
 	}
+
+	acct.clearSubmitPendingFlags(msgs)
 }
 
 // handleCommittedTransfer updates the pending map and publishes a committed transfer. It grabs the lock.

+ 0 - 3
node/pkg/processor/processor.go

@@ -260,9 +260,6 @@ func (p *Processor) Run(ctx context.Context) error {
 					}
 				}
 			}
-			if p.acct != nil {
-				p.acct.AuditPendingTransfers()
-			}
 			if (p.governor != nil) || (p.acct != nil) {
 				govTimer = time.NewTimer(time.Minute)
 			}

+ 8 - 3
node/pkg/wormconn/query.go

@@ -8,12 +8,17 @@ import (
 )
 
 // SubmitQuery submits a query to a smart contract and returns the result.
-func (c *ClientConn) SubmitQuery(ctx context.Context, contractAddress string, query []byte) (*wasmdtypes.QuerySmartContractStateResponse, error) {
+func (c *ClientConn) SubmitQuery(ctx context.Context, contractAddress string, query []byte) ([]byte, error) {
 	req := wasmdtypes.QuerySmartContractStateRequest{Address: contractAddress, QueryData: query}
 	qc := wasmdtypes.NewQueryClient(c.c)
 	if qc == nil {
-		return nil, fmt.Errorf("failed to create query client")
+		return []byte{}, fmt.Errorf("failed to create query client")
 	}
 
-	return qc.SmartContractState(ctx, &req)
+	resp, err := qc.SmartContractState(ctx, &req)
+	if err != nil {
+		return []byte{}, err
+	}
+
+	return resp.Data, nil
 }