accountant.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  // Package accountant manages the interface to the accountant smart contract on wormchain. It is passed all VAAs before
  // they are signed and published. It determines whether a VAA is for a token bridge transfer and, if it is, submits an observation
  // request to the accountant contract. When that happens, the VAA is queued until the accountant contract responds indicating
  // that the VAA has been approved. Once the VAA is approved, this package forwards it back to the processor loop to be signed
  // and published.
  6. package accountant
  7. import (
  8. "context"
  9. "fmt"
  10. "sync"
  11. "time"
  12. "github.com/certusone/wormhole/node/pkg/common"
  13. guardianDB "github.com/certusone/wormhole/node/pkg/db"
  14. "github.com/certusone/wormhole/node/pkg/guardiansigner"
  15. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  16. "github.com/certusone/wormhole/node/pkg/supervisor"
  17. sdktypes "github.com/cosmos/cosmos-sdk/types"
  18. sdktx "github.com/cosmos/cosmos-sdk/types/tx"
  19. "github.com/wormhole-foundation/wormhole/sdk"
  20. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  21. ethCommon "github.com/ethereum/go-ethereum/common"
  22. ethCrypto "github.com/ethereum/go-ethereum/crypto"
  23. "go.uber.org/zap"
  24. )
  25. // MsgChannelCapacity specifies the capacity of the message channel used to publish messages released from the accountant.
  26. // This channel should not back up, but if it does, the accountant will start dropping messages, which would require reobservations.
  27. const MsgChannelCapacity = 5 * batchSize
  28. type (
  29. AccountantWormchainConn interface {
  30. Close()
  31. SenderAddress() string
  32. SubmitQuery(ctx context.Context, contractAddress string, query []byte) ([]byte, error)
  33. SignAndBroadcastTx(ctx context.Context, msg sdktypes.Msg) (*sdktx.BroadcastTxResponse, error)
  34. BroadcastTxResponseToString(txResp *sdktx.BroadcastTxResponse) string
  35. }
  36. // emitterKey is the key to a map of emitters to be monitored
  37. emitterKey struct {
  38. emitterChainId vaa.ChainID
  39. emitterAddr vaa.Address
  40. }
  41. // validEmitters is a set of supported emitter chain / address pairs. The payload is the enforcement flag.
  42. validEmitters map[emitterKey]bool
  43. // pendingEntry is the payload for each pending transfer
  44. pendingEntry struct {
  45. msg *common.MessagePublication
  46. msgId string
  47. digest string
  48. isNTT bool
  49. enforceFlag bool
  50. // stateLock is used to protect the contents of the state struct.
  51. stateLock sync.Mutex
  52. // The state struct contains anything that can be modified. It is protected by the state lock.
  53. state struct {
  54. // updTime is the time that the state struct was last updated.
  55. updTime time.Time
  56. // submitPending indicates if the observation is either in the channel waiting to be submitted or in an outstanding transaction.
  57. // The audit should not resubmit anything where submitPending is set to true.
  58. submitPending bool
  59. }
  60. }
  61. )
  62. // Accountant is the object that manages the interface to the wormchain accountant smart contract.
  63. type Accountant struct {
  64. ctx context.Context
  65. logger *zap.Logger
  66. db guardianDB.AccountantDB
  67. obsvReqWriteC chan<- *gossipv1.ObservationRequest
  68. contract string
  69. wsUrl string
  70. wormchainConn AccountantWormchainConn
  71. enforceFlag bool
  72. guardianSigner guardiansigner.GuardianSigner
  73. gst *common.GuardianSetState
  74. guardianAddr ethCommon.Address
  75. msgChan chan<- *common.MessagePublication
  76. tokenBridges validEmitters
  77. pendingTransfersLock sync.Mutex
  78. pendingTransfers map[string]*pendingEntry // Key is the message ID (emitterChain/emitterAddr/seqNo)
  79. subChan chan *common.MessagePublication
  80. env common.Environment
  81. nttContract string
  82. nttWormchainConn AccountantWormchainConn
  83. nttDirectEmitters validEmitters
  84. nttArEmitters validEmitters
  85. nttSubChan chan *common.MessagePublication
  86. }
  // subChanSize is the capacity of each submit channel. It is sized generously because a large
  // number of re-submission requests can arrive on startup.
  const subChanSize = 500
  89. // baseEnabled returns true if the base accountant is enabled, false if not.
  90. func (acct *Accountant) baseEnabled() bool {
  91. return acct.contract != ""
  92. }
  93. // NewAccountant creates a new instance of the Accountant object.
  94. func NewAccountant(
  95. ctx context.Context,
  96. logger *zap.Logger,
  97. db guardianDB.AccountantDB,
  98. obsvReqWriteC chan<- *gossipv1.ObservationRequest,
  99. contract string, // the address of the smart contract on wormchain
  100. wsUrl string, // the URL of the wormchain websocket interface
  101. wormchainConn AccountantWormchainConn, // used for communicating with the smart contract
  102. enforceFlag bool, // whether or not accountant should be enforced
  103. nttContract string, // the address of the NTT smart contract on wormchain
  104. nttWormchainConn AccountantWormchainConn, // used for communicating with the NTT smart contract
  105. guardianSigner guardiansigner.GuardianSigner, // the guardian signer used for signing observation requests
  106. gst *common.GuardianSetState, // used to get the current guardian set index when sending observation requests
  107. msgChan chan<- *common.MessagePublication, // the channel where transfers received by the accountant runnable should be published
  108. env common.Environment, // Controls the set of token bridges to be monitored
  109. ) *Accountant {
  110. return &Accountant{
  111. ctx: ctx,
  112. logger: logger.With(zap.String("component", "gacct")),
  113. db: db,
  114. obsvReqWriteC: obsvReqWriteC,
  115. contract: contract,
  116. wsUrl: wsUrl,
  117. wormchainConn: wormchainConn,
  118. enforceFlag: enforceFlag,
  119. guardianSigner: guardianSigner,
  120. gst: gst,
  121. guardianAddr: ethCrypto.PubkeyToAddress(guardianSigner.PublicKey(ctx)),
  122. msgChan: msgChan,
  123. tokenBridges: make(validEmitters),
  124. pendingTransfers: make(map[string]*pendingEntry),
  125. subChan: make(chan *common.MessagePublication, subChanSize),
  126. env: env,
  127. nttContract: nttContract,
  128. nttWormchainConn: nttWormchainConn,
  129. nttDirectEmitters: make(validEmitters),
  130. nttArEmitters: make(validEmitters),
  131. nttSubChan: make(chan *common.MessagePublication, subChanSize),
  132. }
  133. }
  134. // Start initializes the accountant and starts the worker and watcher runnables.
  135. func (acct *Accountant) Start(ctx context.Context) error {
  136. acct.logger.Debug("entering Start", zap.Bool("enforceFlag", acct.enforceFlag), zap.Bool("baseEnabled", acct.baseEnabled()), zap.Bool("nttEnabled", acct.nttEnabled()))
  137. acct.pendingTransfersLock.Lock()
  138. defer acct.pendingTransfersLock.Unlock()
  139. if !acct.baseEnabled() && !acct.nttEnabled() {
  140. return fmt.Errorf("start should not be called when neither base nor NTT accountant are enabled")
  141. }
  142. if acct.baseEnabled() {
  143. emitterMap := sdk.KnownTokenbridgeEmitters
  144. if acct.env == common.TestNet {
  145. emitterMap = sdk.KnownTestnetTokenbridgeEmitters
  146. } else if acct.env == common.UnsafeDevNet || acct.env == common.GoTest || acct.env == common.AccountantMock {
  147. emitterMap = sdk.KnownDevnetTokenbridgeEmitters
  148. }
  149. // Build the map of token bridges to be monitored.
  150. for chainId, emitterAddrBytes := range emitterMap {
  151. emitterAddr, err := vaa.BytesToAddress(emitterAddrBytes)
  152. if err != nil {
  153. return fmt.Errorf("failed to convert emitter address for chain: %v", chainId)
  154. }
  155. tbk := emitterKey{emitterChainId: chainId, emitterAddr: emitterAddr}
  156. _, exists := acct.tokenBridges[tbk]
  157. if exists {
  158. return fmt.Errorf("detected duplicate token bridge for chain: %v", chainId)
  159. }
  160. acct.tokenBridges[tbk] = acct.enforceFlag
  161. acct.logger.Info("will monitor token bridge:", zap.Stringer("emitterChainId", tbk.emitterChainId), zap.Stringer("emitterAddr", tbk.emitterAddr))
  162. }
  163. }
  164. // The NTT data structures should be set up before we reload from the db.
  165. if acct.nttEnabled() {
  166. if err := acct.nttStart(ctx); err != nil {
  167. return fmt.Errorf("failed to start ntt accountant: %w", err)
  168. }
  169. }
  170. // Load any existing pending transfers from the db.
  171. if err := acct.loadPendingTransfers(); err != nil {
  172. return fmt.Errorf("failed to load pending transfers from the db: %w", err)
  173. }
  174. // Start the watcher to listen to transfer events from the smart contract.
  175. if acct.baseEnabled() {
  176. if acct.env == common.AccountantMock {
  177. // We're not in a runnable context, so we can't use supervisor.
  178. go func() {
  179. _ = acct.baseWorker(ctx)
  180. }()
  181. } else if acct.env != common.GoTest {
  182. if err := supervisor.Run(ctx, "acctworker", common.WrapWithScissors(acct.baseWorker, "acctworker")); err != nil {
  183. return fmt.Errorf("failed to start submit observation worker: %w", err)
  184. }
  185. if err := supervisor.Run(ctx, "acctwatcher", common.WrapWithScissors(acct.baseWatcher, "acctwatcher")); err != nil {
  186. return fmt.Errorf("failed to start watcher: %w", err)
  187. }
  188. if err := supervisor.Run(ctx, "acctaudit", common.WrapWithScissors(acct.audit, "acctaudit")); err != nil {
  189. return fmt.Errorf("failed to start audit worker: %w", err)
  190. }
  191. }
  192. }
  193. return nil
  194. }
  195. func (acct *Accountant) Close() {
  196. if acct.wormchainConn != nil {
  197. acct.wormchainConn.Close()
  198. acct.wormchainConn = nil
  199. }
  200. if acct.nttWormchainConn != nil {
  201. acct.nttWormchainConn.Close()
  202. acct.nttWormchainConn = nil
  203. }
  204. }
  205. func (acct *Accountant) FeatureString() string {
  206. var ret string
  207. if !acct.enforceFlag {
  208. ret = "acct-logonly"
  209. } else {
  210. ret = "acct"
  211. }
  212. if acct.nttEnabled() {
  213. if ret != "" {
  214. ret += ":"
  215. }
  216. ret += "ntt-acct"
  217. }
  218. return ret
  219. }
  220. // IsMessageCoveredByAccountant returns `true` if a message should be processed by the Global Accountant, `false` if not.
  221. func (acct *Accountant) IsMessageCoveredByAccountant(msg *common.MessagePublication) bool {
  222. ret, _, _ := acct.isMessageCoveredByAccountant(msg)
  223. return ret
  224. }
  225. // isMessageCoveredByAccountant returns true if a message should be processed by the Global Accountant, false if not.
  226. // It also returns whether or not it is a Native Token Transfer and whether or not accounting is being enforced for this emitter.
  227. func (acct *Accountant) isMessageCoveredByAccountant(msg *common.MessagePublication) (bool, bool, bool) {
  228. isTBT, enforceFlag := acct.isTokenBridgeTransfer(msg)
  229. if isTBT {
  230. return true, false, enforceFlag
  231. }
  232. isNTT, enforceFlag := nttIsMsgDirectNTT(msg, acct.nttDirectEmitters)
  233. if isNTT {
  234. return true, true, enforceFlag
  235. }
  236. isNTT, enforceFlag = nttIsMsgArNTT(msg, acct.nttArEmitters, acct.nttDirectEmitters)
  237. if isNTT {
  238. return true, true, enforceFlag
  239. }
  240. return false, false, false
  241. }
  242. // isTokenBridgeTransfer returns true if a message is a token bridge transfer and whether or not accounting is being enforced for this emitter.
  243. func (acct *Accountant) isTokenBridgeTransfer(msg *common.MessagePublication) (bool, bool) {
  244. msgId := msg.MessageIDString()
  245. // We only care about token bridges.
  246. enforceFlag, exists := acct.tokenBridges[emitterKey{emitterChainId: msg.EmitterChain, emitterAddr: msg.EmitterAddress}]
  247. if !exists {
  248. return false, false
  249. }
  250. // We only care about transfers.
  251. if !vaa.IsTransfer(msg.Payload) {
  252. acct.logger.Info("ignoring vaa because it is not a transfer", zap.String("msgID", msgId))
  253. return false, false
  254. }
  255. return true, enforceFlag
  256. }
  257. // SubmitObservation will submit token bridge transfers to the accountant smart contract. This is called from the processor
  258. // loop when a local observation is received from a watcher. It returns true if the observation can be published immediately,
  259. // false if not (because it has been submitted to the accountant).
  260. func (acct *Accountant) SubmitObservation(msg *common.MessagePublication) (bool, error) {
  261. msgId := msg.MessageIDString()
  262. acct.logger.Debug("in SubmitObservation", zap.String("msgID", msgId))
  263. coveredByAcct, isNTT, enforceFlag := acct.isMessageCoveredByAccountant(msg)
  264. if !coveredByAcct {
  265. return true, nil
  266. }
  267. digest := msg.CreateDigest()
  268. acct.pendingTransfersLock.Lock()
  269. defer acct.pendingTransfersLock.Unlock()
  270. // If this is already pending, don't send it again.
  271. if oldEntry, exists := acct.pendingTransfers[msgId]; exists {
  272. if oldEntry.digest != digest {
  273. digestMismatches.Inc()
  274. acct.logger.Error("digest in pending transfer has changed, dropping it",
  275. zap.String("msgID", msgId),
  276. zap.String("oldDigest", oldEntry.digest),
  277. zap.String("newDigest", digest),
  278. zap.Bool("enforcing", enforceFlag),
  279. )
  280. } else {
  281. acct.logger.Info("blocking transfer because it is already outstanding", zap.String("msgID", msgId), zap.Bool("enforcing", enforceFlag))
  282. }
  283. return !enforceFlag, nil
  284. }
  285. // Add it to the pending map and the database.
  286. pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest, isNTT: isNTT, enforceFlag: enforceFlag}
  287. if err := acct.addPendingTransferAlreadyLocked(pe); err != nil {
  288. acct.logger.Error("failed to persist pending transfer, blocking publishing", zap.String("msgID", msgId), zap.Error(err))
  289. return false, err
  290. }
  291. // This transaction may take a while. Pass it off to the worker so we don't block the processor.
  292. if acct.env != common.GoTest {
  293. tag := "accountant"
  294. if isNTT {
  295. tag = "ntt-accountant"
  296. }
  297. acct.logger.Info(fmt.Sprintf("submitting transfer to %s for approval", tag), zap.String("msgID", msgId), zap.Bool("canPublish", !enforceFlag))
  298. _ = acct.submitObservation(pe)
  299. }
  300. // If we are not enforcing accountant, the event can be published. Otherwise we have to wait to hear back from the contract.
  301. return !enforceFlag, nil
  302. }
  303. // publishTransferAlreadyLocked publishes a pending transfer to the accountant channel and deletes it from the pending map. It assumes the caller holds the lock.
  304. func (acct *Accountant) publishTransferAlreadyLocked(pe *pendingEntry) {
  305. if pe.enforceFlag {
  306. select {
  307. case acct.msgChan <- pe.msg:
  308. acct.logger.Debug("published transfer to channel", zap.String("msgId", pe.msgId))
  309. default:
  310. acct.logger.Error("unable to publish transfer because the channel is full", zap.String("msgId", pe.msgId))
  311. }
  312. }
  313. acct.deletePendingTransferAlreadyLocked(pe.msgId)
  314. }
  315. // addPendingTransferAlreadyLocked adds a pending transfer to both the map and the database. It assumes the caller holds the lock.
  316. func (acct *Accountant) addPendingTransferAlreadyLocked(pe *pendingEntry) error {
  317. pe.setUpdTime()
  318. if err := acct.db.AcctStorePendingTransfer(pe.msg); err != nil {
  319. return err
  320. }
  321. acct.pendingTransfers[pe.msgId] = pe
  322. transfersOutstanding.Set(float64(len(acct.pendingTransfers)))
  323. return nil
  324. }
  325. // deletePendingTransfer deletes the transfer from both the map and the database. It accquires the lock.
  326. func (acct *Accountant) deletePendingTransfer(msgId string) {
  327. acct.pendingTransfersLock.Lock()
  328. defer acct.pendingTransfersLock.Unlock()
  329. acct.deletePendingTransferAlreadyLocked(msgId)
  330. }
  331. // deletePendingTransferAlreadyLocked deletes the transfer from both the map and the database. It assumes the caller holds the lock.
  332. func (acct *Accountant) deletePendingTransferAlreadyLocked(msgId string) {
  333. acct.logger.Debug("deletePendingTransfer", zap.String("msgId", msgId))
  334. if _, exists := acct.pendingTransfers[msgId]; exists {
  335. delete(acct.pendingTransfers, msgId)
  336. transfersOutstanding.Set(float64(len(acct.pendingTransfers)))
  337. }
  338. if err := acct.db.AcctDeletePendingTransfer(msgId); err != nil {
  339. acct.logger.Error("failed to delete pending transfer from the db", zap.String("msgId", msgId), zap.Error(err))
  340. // Ignore this error and keep going.
  341. }
  342. }
  343. // loadPendingTransfers loads any pending transfers that are present in the database. This method assumes the caller holds the lock.
  344. func (acct *Accountant) loadPendingTransfers() error {
  345. pendingTransfers, err := acct.db.AcctGetData(acct.logger)
  346. if err != nil {
  347. return err
  348. }
  349. for _, msg := range pendingTransfers {
  350. msgId := msg.MessageIDString()
  351. coveredByAcct, isNTT, enforceFlag := acct.isMessageCoveredByAccountant(msg)
  352. if !coveredByAcct {
  353. acct.logger.Error("dropping reloaded pending transfer because it is not covered by the accountant", zap.String("msgID", msgId))
  354. if err := acct.db.AcctDeletePendingTransfer(msgId); err != nil {
  355. acct.logger.Error("failed to delete pending transfer from the db", zap.String("msgId", msgId), zap.Error(err))
  356. // Ignore this error and keep going.
  357. }
  358. continue
  359. }
  360. acct.logger.Info("reloaded pending transfer", zap.String("msgID", msgId))
  361. digest := msg.CreateDigest()
  362. pe := &pendingEntry{msg: msg, msgId: msgId, digest: digest, isNTT: isNTT, enforceFlag: enforceFlag}
  363. pe.setUpdTime()
  364. acct.pendingTransfers[msgId] = pe
  365. }
  366. transfersOutstanding.Set(float64(len(acct.pendingTransfers)))
  367. if len(acct.pendingTransfers) != 0 {
  368. acct.logger.Info("reloaded pending transfers", zap.Int("total", len(acct.pendingTransfers)))
  369. } else {
  370. acct.logger.Info("no pending transfers to be reloaded")
  371. }
  372. return nil
  373. }
  374. // submitObservation sends an observation request to the worker so it can be submitted to the contract. If the transfer is already
  375. // marked as "submit pending", this function returns false without doing anything. Otherwise it returns true. The return value can
  376. // be used to avoid unnecessary error logging. If writing to the channel would block, this function returns without doing anything,
  377. // assuming the pending transfer will be handled on the next audit interval. This function grabs the state lock.
  378. func (acct *Accountant) submitObservation(pe *pendingEntry) bool {
  379. pe.stateLock.Lock()
  380. defer pe.stateLock.Unlock()
  381. if pe.state.submitPending {
  382. return false
  383. }
  384. pe.state.submitPending = true
  385. pe.state.updTime = time.Now()
  386. if pe.isNTT {
  387. acct.submitToChannel(pe, acct.nttSubChan, "ntt-accountant")
  388. } else {
  389. acct.submitToChannel(pe, acct.subChan, "accountant")
  390. }
  391. return true
  392. }
  393. // submitToChannel submits an observation to the specified channel. If the submission fails because the channel is full,
  394. // it marks the transfer as pending so it will be resubmitted by the audit.
  395. func (acct *Accountant) submitToChannel(pe *pendingEntry, subChan chan *common.MessagePublication, tag string) {
  396. select {
  397. case subChan <- pe.msg:
  398. acct.logger.Debug(fmt.Sprintf("submitted observation to channel for %s", tag), zap.String("msgId", pe.msgId))
  399. default:
  400. acct.logger.Error(fmt.Sprintf("unable to submit observation to %s because the channel is full, will try next interval", tag), zap.String("msgId", pe.msgId))
  401. pe.state.submitPending = false
  402. }
  403. }
  404. // clearSubmitPendingFlags is called after a batch is finished being submitted (success or fail). It clears the submit pending flag for everything in the batch.
  405. // It grabs the pending transfer and state locks.
  406. func (acct *Accountant) clearSubmitPendingFlags(msgs []*common.MessagePublication) {
  407. acct.pendingTransfersLock.Lock()
  408. defer acct.pendingTransfersLock.Unlock()
  409. for _, msg := range msgs {
  410. if pe, exists := acct.pendingTransfers[msg.MessageIDString()]; exists {
  411. pe.setSubmitPending(false)
  412. }
  413. }
  414. }
  415. // setSubmitPending sets the submit pending flag on the pending transfer object to the specified value. It grabs the state lock.
  416. func (pe *pendingEntry) setSubmitPending(val bool) {
  417. pe.stateLock.Lock()
  418. defer pe.stateLock.Unlock()
  419. pe.state.submitPending = val
  420. pe.state.updTime = time.Now()
  421. }
  422. // submitPending returns the "submit pending" flag from the pending transfer object. It grabs the state lock.
  423. func (pe *pendingEntry) submitPending() bool {
  424. pe.stateLock.Lock()
  425. defer pe.stateLock.Unlock()
  426. return pe.state.submitPending
  427. }
  428. // setUpdTime sets the last update time on the pending transfer object to the current time. It grabs the state lock.
  429. func (pe *pendingEntry) setUpdTime() {
  430. pe.stateLock.Lock()
  431. defer pe.stateLock.Unlock()
  432. pe.state.updTime = time.Now()
  433. }
  434. // updTime returns the last update time from the pending transfer object. It grabs the state lock.
  435. func (pe *pendingEntry) updTime() time.Time {
  436. pe.stateLock.Lock()
  437. defer pe.stateLock.Unlock()
  438. return pe.state.updTime
  439. }