governor_test.go 33 KB


  1. package db
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "math"
  7. "os"
  8. "sort"
  9. "testing"
  10. "time"
  11. "github.com/certusone/wormhole/node/pkg/common"
  12. "github.com/dgraph-io/badger/v3"
  13. eth_common "github.com/ethereum/go-ethereum/common"
  14. "github.com/stretchr/testify/assert"
  15. "github.com/stretchr/testify/require"
  16. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  17. "go.uber.org/zap"
  18. )
  19. func (d *Database) rowExistsInDB(key []byte) error {
  20. return d.db.View(func(txn *badger.Txn) error {
  21. _, err := txn.Get(key)
  22. return err
  23. })
  24. }
  25. func TestSerializeAndDeserializeOfTransfer(t *testing.T) {
  26. tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
  27. require.NoError(t, err)
  28. ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  29. require.NoError(t, err)
  30. bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
  31. require.NoError(t, err)
  32. xfer1 := &Transfer{
  33. Timestamp: time.Unix(int64(1654516425), 0),
  34. Value: 125000,
  35. OriginChain: vaa.ChainIDEthereum,
  36. OriginAddress: tokenAddr,
  37. EmitterChain: vaa.ChainIDEthereum,
  38. EmitterAddress: ethereumTokenBridgeAddr,
  39. TargetChain: vaa.ChainIDBSC,
  40. TargetAddress: bscTokenBridgeAddr,
  41. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
  42. Hash: "Hash1",
  43. }
  44. xfer1Bytes, err := xfer1.Marshal()
  45. require.NoError(t, err)
  46. xfer2, err := UnmarshalTransfer(xfer1Bytes)
  47. require.NoError(t, err)
  48. assert.Equal(t, xfer1, xfer2)
  49. expectedTransferKey := "GOV:XFER3:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
  50. assert.Equal(t, expectedTransferKey, string(TransferMsgID(xfer2)))
  51. }
  52. func TestPendingMsgID(t *testing.T) {
  53. ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  54. require.NoError(t, err)
  55. msg1 := &common.MessagePublication{
  56. TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
  57. Timestamp: time.Unix(int64(1654516425), 0),
  58. Nonce: 123456,
  59. Sequence: 789101112131415,
  60. EmitterChain: vaa.ChainIDEthereum,
  61. EmitterAddress: ethereumTokenBridgeAddr,
  62. Payload: []byte{},
  63. ConsistencyLevel: 16,
  64. }
  65. assert.Equal(t, []byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), PendingMsgID(msg1))
  66. }
  67. func TestTransferMsgID(t *testing.T) {
  68. tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
  69. require.NoError(t, err)
  70. ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  71. require.NoError(t, err)
  72. bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
  73. require.NoError(t, err)
  74. xfer := &Transfer{
  75. Timestamp: time.Unix(int64(1654516425), 0),
  76. Value: 125000,
  77. OriginChain: vaa.ChainIDEthereum,
  78. OriginAddress: tokenAddr,
  79. EmitterChain: vaa.ChainIDEthereum,
  80. EmitterAddress: ethereumTokenBridgeAddr,
  81. TargetChain: vaa.ChainIDBSC,
  82. TargetAddress: bscTokenBridgeAddr,
  83. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
  84. Hash: "Hash1",
  85. }
  86. assert.Equal(t, []byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"), TransferMsgID(xfer))
  87. }
  88. func TestIsTransfer(t *testing.T) {
  89. assert.Equal(t, true, IsTransfer([]byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
  90. assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:")))
  91. assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:1")))
  92. assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:1/1/1")))
  93. assert.Equal(t, false, IsTransfer([]byte("GOV:XFER3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
  94. assert.Equal(t, true, IsTransfer([]byte("GOV:XFER3:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
  95. assert.Equal(t, false, IsTransfer([]byte("GOV:PENDING:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
  96. assert.Equal(t, false, IsTransfer([]byte{0x01, 0x02, 0x03, 0x04}))
  97. assert.Equal(t, false, IsTransfer([]byte{}))
  98. assert.Equal(t, true, isOldTransfer([]byte("GOV:XFER2:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
  99. assert.Equal(t, false, isOldTransfer([]byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
  100. }
  101. func TestIsPendingMsg(t *testing.T) {
  102. assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
  103. assert.Equal(t, false, IsPendingMsg([]byte("GOV:XFER3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
  104. assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:")))
  105. assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1")))
  106. assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1/1/1")))
  107. assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING4:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/")))
  108. assert.Equal(t, true, IsPendingMsg([]byte("GOV:PENDING4:"+"1/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/0")))
  109. assert.Equal(t, false, IsPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
  110. assert.Equal(t, false, IsPendingMsg([]byte{0x01, 0x02, 0x03, 0x04}))
  111. assert.Equal(t, false, IsPendingMsg([]byte{}))
  112. assert.Equal(t, true, isOldPendingMsg([]byte("GOV:PENDING3:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
  113. assert.Equal(t, false, isOldPendingMsg([]byte("GOV:PENDING4:"+"2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415")))
  114. }
  115. func TestGetChainGovernorData(t *testing.T) {
  116. dbPath := t.TempDir()
  117. db := OpenDb(zap.NewNop(), &dbPath)
  118. defer db.Close()
  119. logger := zap.NewNop()
  120. transfers, pending, err2 := db.GetChainGovernorData(logger)
  121. assert.Equal(t, []*Transfer(nil), transfers)
  122. assert.Equal(t, []*PendingTransfer(nil), pending)
  123. require.NoError(t, err2)
  124. }
  125. func TestStoreTransfer(t *testing.T) {
  126. dbPath := t.TempDir()
  127. db := OpenDb(zap.NewNop(), &dbPath)
  128. defer db.Close()
  129. tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
  130. require.NoError(t, err)
  131. ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  132. require.NoError(t, err)
  133. bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
  134. require.NoError(t, err)
  135. xfer1 := &Transfer{
  136. Timestamp: time.Unix(int64(1654516425), 0),
  137. Value: 125000,
  138. OriginChain: vaa.ChainIDEthereum,
  139. OriginAddress: tokenAddr,
  140. EmitterChain: vaa.ChainIDEthereum,
  141. EmitterAddress: ethereumTokenBridgeAddr,
  142. TargetChain: vaa.ChainIDBSC,
  143. TargetAddress: bscTokenBridgeAddr,
  144. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
  145. Hash: "Hash1",
  146. }
  147. err2 := db.StoreTransfer(xfer1)
  148. require.NoError(t, err2)
  149. }
  150. func TestDeleteTransfer(t *testing.T) {
  151. dbPath := t.TempDir()
  152. db := OpenDb(zap.NewNop(), &dbPath)
  153. defer db.Close()
  154. tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
  155. require.NoError(t, err)
  156. ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  157. require.NoError(t, err)
  158. bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
  159. require.NoError(t, err)
  160. xfer1 := &Transfer{
  161. Timestamp: time.Unix(int64(1654516425), 0),
  162. Value: 125000,
  163. OriginChain: vaa.ChainIDEthereum,
  164. OriginAddress: tokenAddr,
  165. EmitterChain: vaa.ChainIDEthereum,
  166. EmitterAddress: ethereumTokenBridgeAddr,
  167. TargetChain: vaa.ChainIDBSC,
  168. TargetAddress: bscTokenBridgeAddr,
  169. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
  170. Hash: "Hash1",
  171. }
  172. err2 := db.StoreTransfer(xfer1)
  173. require.NoError(t, err2)
  174. // Make sure the xfer exists in the db.
  175. assert.NoError(t, db.rowExistsInDB(TransferMsgID(xfer1)))
  176. err3 := db.DeleteTransfer(xfer1)
  177. require.NoError(t, err3)
  178. // Make sure the xfer is no longer in the db.
  179. assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(TransferMsgID(xfer1)))
  180. }
  181. func TestStorePendingMsg(t *testing.T) {
  182. dbPath := t.TempDir()
  183. db := OpenDb(zap.NewNop(), &dbPath)
  184. defer db.Close()
  185. tokenBridgeAddr, err2 := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  186. assert.NoError(t, err2)
  187. msg := &common.MessagePublication{
  188. TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
  189. Timestamp: time.Unix(int64(1654516425), 0),
  190. Nonce: 123456,
  191. Sequence: 789101112131415,
  192. EmitterChain: vaa.ChainIDEthereum,
  193. EmitterAddress: tokenBridgeAddr,
  194. Payload: []byte{},
  195. ConsistencyLevel: 16,
  196. }
  197. pending := &PendingTransfer{ReleaseTime: msg.Timestamp.Add(time.Hour * 72), Msg: *msg}
  198. err3 := db.StorePendingMsg(pending)
  199. require.NoError(t, err3)
  200. }
  201. func TestDeletePendingMsg(t *testing.T) {
  202. dbPath := t.TempDir()
  203. db := OpenDb(zap.NewNop(), &dbPath)
  204. defer db.Close()
  205. tokenBridgeAddr, err2 := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  206. assert.NoError(t, err2)
  207. msg := &common.MessagePublication{
  208. TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
  209. Timestamp: time.Unix(int64(1654516425), 0),
  210. Nonce: 123456,
  211. Sequence: 789101112131415,
  212. EmitterChain: vaa.ChainIDEthereum,
  213. EmitterAddress: tokenBridgeAddr,
  214. Payload: []byte{},
  215. ConsistencyLevel: 16,
  216. }
  217. pending := &PendingTransfer{ReleaseTime: msg.Timestamp.Add(time.Hour * 72), Msg: *msg}
  218. err3 := db.StorePendingMsg(pending)
  219. require.NoError(t, err3)
  220. // Make sure the pending transfer exists in the db.
  221. assert.NoError(t, db.rowExistsInDB(PendingMsgID(msg)))
  222. err4 := db.DeletePendingMsg(pending)
  223. assert.Nil(t, err4)
  224. // Make sure the pending transfer is no longer in the db.
  225. assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(PendingMsgID(msg)))
  226. }
  227. func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) {
  228. tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  229. require.NoError(t, err)
  230. msg := common.MessagePublication{
  231. TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
  232. Timestamp: time.Unix(int64(1654516425), 0),
  233. Nonce: 123456,
  234. Sequence: 789101112131415,
  235. EmitterChain: vaa.ChainIDEthereum,
  236. EmitterAddress: tokenBridgeAddr,
  237. Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
  238. ConsistencyLevel: 16,
  239. IsReobservation: true,
  240. }
  241. pending1 := &PendingTransfer{
  242. ReleaseTime: time.Unix(int64(1654516425+72*60*60), 0),
  243. Msg: msg,
  244. }
  245. pending1Bytes, err := pending1.Marshal()
  246. require.NoError(t, err)
  247. pending2, err := UnmarshalPendingTransfer(pending1Bytes, false)
  248. require.NoError(t, err)
  249. assert.Equal(t, pending1, pending2)
  250. expectedPendingKey := "GOV:PENDING4:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
  251. assert.Equal(t, expectedPendingKey, string(PendingMsgID(&pending2.Msg)))
  252. }
  253. func TestStoreAndReloadTransfers(t *testing.T) {
  254. dbPath := t.TempDir()
  255. db := OpenDb(zap.NewNop(), &dbPath)
  256. defer db.Close()
  257. defer os.Remove(dbPath)
  258. ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  259. require.NoError(t, err)
  260. bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
  261. require.NoError(t, err)
  262. tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
  263. require.NoError(t, err)
  264. xfer1 := &Transfer{
  265. Timestamp: time.Unix(int64(1654516425), 0),
  266. Value: 125000,
  267. OriginChain: vaa.ChainIDEthereum,
  268. OriginAddress: tokenAddr,
  269. EmitterChain: vaa.ChainIDEthereum,
  270. EmitterAddress: ethereumTokenBridgeAddr,
  271. TargetChain: vaa.ChainIDBSC,
  272. TargetAddress: bscTokenBridgeAddr,
  273. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
  274. Hash: "Hash1",
  275. }
  276. err = db.StoreTransfer(xfer1)
  277. assert.Nil(t, err)
  278. xfer2 := &Transfer{
  279. Timestamp: time.Unix(int64(1654516430), 0),
  280. Value: 125000,
  281. OriginChain: vaa.ChainIDEthereum,
  282. OriginAddress: tokenAddr,
  283. EmitterChain: vaa.ChainIDEthereum,
  284. EmitterAddress: ethereumTokenBridgeAddr,
  285. TargetChain: vaa.ChainIDBSC,
  286. TargetAddress: bscTokenBridgeAddr,
  287. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416",
  288. Hash: "Hash2",
  289. }
  290. err = db.StoreTransfer(xfer2)
  291. assert.Nil(t, err)
  292. pending1 := &PendingTransfer{
  293. ReleaseTime: time.Unix(int64(1654516435+72*60*60), 0),
  294. Msg: common.MessagePublication{
  295. TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
  296. Timestamp: time.Unix(int64(1654516435), 0),
  297. Nonce: 123456,
  298. Sequence: 789101112131417,
  299. EmitterChain: vaa.ChainIDEthereum,
  300. EmitterAddress: ethereumTokenBridgeAddr,
  301. Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
  302. ConsistencyLevel: 16,
  303. },
  304. }
  305. err = db.StorePendingMsg(pending1)
  306. assert.Nil(t, err)
  307. pending2 := &PendingTransfer{
  308. ReleaseTime: time.Unix(int64(1654516440+72*60*60), 0),
  309. Msg: common.MessagePublication{
  310. TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
  311. Timestamp: time.Unix(int64(1654516440), 0),
  312. Nonce: 123456,
  313. Sequence: 789101112131418,
  314. EmitterChain: vaa.ChainIDEthereum,
  315. EmitterAddress: ethereumTokenBridgeAddr,
  316. Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
  317. ConsistencyLevel: 16,
  318. },
  319. }
  320. err = db.StorePendingMsg(pending2)
  321. assert.Nil(t, err)
  322. logger := zap.NewNop()
  323. xfers, pending, err := db.GetChainGovernorData(logger)
  324. assert.Nil(t, err)
  325. assert.Equal(t, 2, len(xfers))
  326. assert.Equal(t, 2, len(pending))
  327. assert.Equal(t, xfer1, xfers[0])
  328. assert.Equal(t, xfer2, xfers[1])
  329. assert.Equal(t, pending1, pending[0])
  330. assert.Equal(t, pending2, pending[1])
  331. }
  332. func TestMarshalUnmarshalNoMsgIdOrHash(t *testing.T) {
  333. tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
  334. require.NoError(t, err)
  335. ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  336. require.NoError(t, err)
  337. bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
  338. require.NoError(t, err)
  339. xfer1 := &Transfer{
  340. Timestamp: time.Unix(int64(1654516425), 0),
  341. Value: 125000,
  342. OriginChain: vaa.ChainIDEthereum,
  343. OriginAddress: tokenAddr,
  344. EmitterChain: vaa.ChainIDEthereum,
  345. EmitterAddress: ethereumTokenBridgeAddr,
  346. TargetChain: vaa.ChainIDBSC,
  347. TargetAddress: bscTokenBridgeAddr,
  348. // Don't set MsgID or Hash, should handle empty slices.
  349. }
  350. xfer1Bytes, err := xfer1.Marshal()
  351. require.NoError(t, err)
  352. xfer2, err := UnmarshalTransfer(xfer1Bytes)
  353. require.NoError(t, err)
  354. require.Equal(t, xfer1, xfer2)
  355. }
  356. // Note that Transfer.Marshal can't fail, so there are no negative tests for that.
  357. func TestUnmarshalTransferFailures(t *testing.T) {
  358. tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
  359. require.NoError(t, err)
  360. ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  361. require.NoError(t, err)
  362. bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
  363. require.NoError(t, err)
  364. xfer1 := &Transfer{
  365. Timestamp: time.Unix(int64(1654516425), 0),
  366. Value: 125000,
  367. OriginChain: vaa.ChainIDEthereum,
  368. OriginAddress: tokenAddr,
  369. EmitterChain: vaa.ChainIDEthereum,
  370. EmitterAddress: ethereumTokenBridgeAddr,
  371. TargetChain: vaa.ChainIDBSC,
  372. TargetAddress: bscTokenBridgeAddr,
  373. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
  374. Hash: "Hash1",
  375. }
  376. xfer1Bytes, err := xfer1.Marshal()
  377. require.NoError(t, err)
  378. // First make sure regular unmarshal works.
  379. xfer2, err := UnmarshalTransfer(xfer1Bytes)
  380. require.NoError(t, err)
  381. require.Equal(t, xfer1, xfer2)
  382. // Truncate the timestamp.
  383. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4-1])
  384. assert.ErrorContains(t, err, "failed to read timestamp: ")
  385. // Truncate the value.
  386. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4+8-1])
  387. assert.ErrorContains(t, err, "failed to read value: ")
  388. // Truncate the origin chain.
  389. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4+8+2-1])
  390. assert.ErrorContains(t, err, "failed to read origin chain id: ")
  391. // Truncate the origin address.
  392. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4+8+2+32-1])
  393. assert.ErrorContains(t, err, "failed to read origin address")
  394. // Truncate the emitter chain.
  395. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4+8+2+32+2-1])
  396. assert.ErrorContains(t, err, "failed to read emitter chain id: ")
  397. // Truncate the emitter address.
  398. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4+8+2+32+2+32-1])
  399. assert.ErrorContains(t, err, "failed to read emitter address")
  400. // Truncate the message ID length.
  401. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4+8+2+32+2+32+2-1])
  402. assert.ErrorContains(t, err, "failed to read msgID length: ")
  403. // Truncate the message ID data.
  404. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4+8+2+32+2+32+2+3])
  405. assert.ErrorContains(t, err, "failed to read msg id")
  406. // Truncate the hash length.
  407. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4+8+2+32+2+32+2+82+2-1])
  408. assert.ErrorContains(t, err, "failed to read hash length: ")
  409. // Truncate the hash data.
  410. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4+8+2+32+2+32+2+82+2+3])
  411. assert.ErrorContains(t, err, "failed to read hash")
  412. // Truncate the target chain.
  413. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4+8+2+32+2+32+2+82+2+5+2-1])
  414. assert.ErrorContains(t, err, "failed to read target chain id: ")
  415. // Truncate the target address.
  416. _, err = UnmarshalTransfer(xfer1Bytes[0 : 4+8+2+32+2+32+2+82+2+5+2+32-1])
  417. assert.ErrorContains(t, err, "failed to read target address")
  418. }
  419. // Note that PendingTransfer.Marshal can't fail, so there are no negative tests for that.
  420. func TestUnmarshalPendingTransferFailures(t *testing.T) {
  421. tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  422. require.NoError(t, err)
  423. msg := common.MessagePublication{
  424. TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
  425. Timestamp: time.Unix(int64(1654516425), 0),
  426. Nonce: 123456,
  427. Sequence: 789101112131415,
  428. EmitterChain: vaa.ChainIDEthereum,
  429. EmitterAddress: tokenBridgeAddr,
  430. Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
  431. ConsistencyLevel: 16,
  432. IsReobservation: true,
  433. }
  434. pending1 := &PendingTransfer{
  435. ReleaseTime: time.Unix(int64(1654516425+72*60*60), 0),
  436. Msg: msg,
  437. }
  438. pending1Bytes, err := pending1.Marshal()
  439. require.NoError(t, err)
  440. // First make sure regular unmarshal works.
  441. pending2, err := UnmarshalPendingTransfer(pending1Bytes, false)
  442. require.NoError(t, err)
  443. assert.Equal(t, pending1, pending2)
  444. // Truncate the release time.
  445. _, err = UnmarshalPendingTransfer(pending1Bytes[0:4-1], false)
  446. assert.ErrorContains(t, err, "failed to read pending transfer release time: ")
  447. // The remainder is the marshaled message publication as a single buffer.
  448. // Truncate the entire serialized message.
  449. _, err = UnmarshalPendingTransfer(pending1Bytes[0:4], false)
  450. assert.ErrorContains(t, err, "failed to read pending transfer msg")
  451. // Truncate some of the serialized message.
  452. _, err = UnmarshalPendingTransfer(pending1Bytes[0:len(pending1Bytes)-10], false)
  453. assert.ErrorContains(t, err, "failed to unmarshal pending transfer msg")
  454. }
  455. func (d *Database) storeOldPendingMsg(t *testing.T, p *PendingTransfer) {
  456. buf := new(bytes.Buffer)
  457. vaa.MustWrite(buf, binary.BigEndian, uint32(p.ReleaseTime.Unix())) // #nosec G115 -- This conversion is safe until year 2106
  458. b := marshalOldMessagePublication(&p.Msg)
  459. vaa.MustWrite(buf, binary.BigEndian, b)
  460. err := d.db.Update(func(txn *badger.Txn) error {
  461. if err := txn.Set(oldPendingMsgID(&p.Msg), buf.Bytes()); err != nil {
  462. return err
  463. }
  464. return nil
  465. })
  466. require.NoError(t, err)
  467. }
  468. func marshalOldMessagePublication(msg *common.MessagePublication) []byte {
  469. buf := new(bytes.Buffer)
  470. buf.Write(msg.TxID[:])
  471. vaa.MustWrite(buf, binary.BigEndian, uint32(msg.Timestamp.Unix())) // #nosec G115 -- This conversion is safe until year 2106
  472. vaa.MustWrite(buf, binary.BigEndian, msg.Nonce)
  473. vaa.MustWrite(buf, binary.BigEndian, msg.Sequence)
  474. vaa.MustWrite(buf, binary.BigEndian, msg.ConsistencyLevel)
  475. vaa.MustWrite(buf, binary.BigEndian, msg.EmitterChain)
  476. buf.Write(msg.EmitterAddress[:])
  477. vaa.MustWrite(buf, binary.BigEndian, msg.IsReobservation)
  478. buf.Write(msg.Payload)
  479. return buf.Bytes()
  480. }
  481. func TestLoadingOldPendingTransfers(t *testing.T) {
  482. dbPath := t.TempDir()
  483. db := OpenDb(zap.NewNop(), &dbPath)
  484. defer db.Close()
  485. defer os.Remove(dbPath)
  486. ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  487. require.NoError(t, err)
  488. bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
  489. require.NoError(t, err)
  490. tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
  491. require.NoError(t, err)
  492. oldXfer1 := &Transfer{
  493. Timestamp: time.Unix(int64(1654516425), 0),
  494. Value: 125000,
  495. OriginChain: vaa.ChainIDEthereum,
  496. OriginAddress: tokenAddr,
  497. EmitterChain: vaa.ChainIDEthereum,
  498. EmitterAddress: ethereumTokenBridgeAddr,
  499. // Don't set TargetChain or TargetAddress.
  500. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
  501. Hash: "Hash1",
  502. }
  503. err = db.storeOldTransfer(oldXfer1)
  504. require.NoError(t, err)
  505. newXfer1 := &Transfer{
  506. Timestamp: time.Unix(int64(1654516426), 0),
  507. Value: 125000,
  508. OriginChain: vaa.ChainIDEthereum,
  509. OriginAddress: tokenAddr,
  510. EmitterChain: vaa.ChainIDEthereum,
  511. EmitterAddress: ethereumTokenBridgeAddr,
  512. TargetChain: vaa.ChainIDBSC,
  513. TargetAddress: bscTokenBridgeAddr,
  514. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416",
  515. Hash: "Hash1",
  516. }
  517. err = db.StoreTransfer(newXfer1)
  518. require.NoError(t, err)
  519. oldXfer2 := &Transfer{
  520. Timestamp: time.Unix(int64(1654516427), 0),
  521. Value: 125000,
  522. OriginChain: vaa.ChainIDEthereum,
  523. OriginAddress: tokenAddr,
  524. EmitterChain: vaa.ChainIDEthereum,
  525. EmitterAddress: ethereumTokenBridgeAddr,
  526. // Don't set TargetChain or TargetAddress.
  527. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131417",
  528. Hash: "Hash2",
  529. }
  530. err = db.storeOldTransfer(oldXfer2)
  531. require.NoError(t, err)
  532. newXfer2 := &Transfer{
  533. Timestamp: time.Unix(int64(1654516428), 0),
  534. Value: 125000,
  535. OriginChain: vaa.ChainIDEthereum,
  536. OriginAddress: tokenAddr,
  537. EmitterChain: vaa.ChainIDEthereum,
  538. EmitterAddress: ethereumTokenBridgeAddr,
  539. TargetChain: vaa.ChainIDBSC,
  540. TargetAddress: bscTokenBridgeAddr,
  541. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131418",
  542. Hash: "Hash2",
  543. }
  544. err = db.StoreTransfer(newXfer2)
  545. require.NoError(t, err)
  546. // Write the first pending event in the old format.
  547. now := time.Unix(time.Now().Unix(), 0)
  548. pending1 := &PendingTransfer{
  549. ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default.,
  550. Msg: common.MessagePublication{
  551. TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
  552. Timestamp: now,
  553. Nonce: 123456,
  554. Sequence: 789101112131417,
  555. EmitterChain: vaa.ChainIDEthereum,
  556. EmitterAddress: ethereumTokenBridgeAddr,
  557. Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
  558. ConsistencyLevel: 16,
  559. // IsReobservation will not be serialized. It should be set to false on reload.
  560. },
  561. }
  562. db.storeOldPendingMsg(t, pending1)
  563. require.NoError(t, err)
  564. // Write the second one in the new format.
  565. now = now.Add(time.Second * 5)
  566. pending2 := &PendingTransfer{
  567. ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default.
  568. Msg: common.MessagePublication{
  569. TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
  570. Timestamp: now,
  571. Nonce: 123456,
  572. Sequence: 789101112131418,
  573. EmitterChain: vaa.ChainIDEthereum,
  574. EmitterAddress: ethereumTokenBridgeAddr,
  575. Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
  576. ConsistencyLevel: 16,
  577. IsReobservation: true,
  578. },
  579. }
  580. err = db.StorePendingMsg(pending2)
  581. require.NoError(t, err)
  582. // Write the third pending event in the old format.
  583. now = now.Add(time.Second * 5)
  584. pending3 := &PendingTransfer{
  585. ReleaseTime: now.Add(time.Hour * 71), // Setting it to 71 hours so we can confirm it didn't get set to the default.,
  586. Msg: common.MessagePublication{
  587. TxID: eth_common.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4064").Bytes(),
  588. Timestamp: now,
  589. Nonce: 123456,
  590. Sequence: 789101112131419,
  591. EmitterChain: vaa.ChainIDEthereum,
  592. EmitterAddress: ethereumTokenBridgeAddr,
  593. Payload: []byte{4, 0, 0, 0, 0, 0, 0, 0, 0, 0},
  594. ConsistencyLevel: 16,
  595. // IsReobservation will not be serialized. It should be set to false on reload.
  596. },
  597. }
  598. db.storeOldPendingMsg(t, pending3)
  599. require.NoError(t, err)
  600. logger, zapObserver := setupLogsCapture(t)
  601. xfers, pendings, err := db.GetChainGovernorDataForTime(logger, now)
  602. require.NoError(t, err)
  603. require.Equal(t, 4, len(xfers))
  604. require.Equal(t, 3, len(pendings))
  605. // Verify that we converted the two old pending transfers and the two old completed transfers.
  606. loggedEntries := zapObserver.FilterMessage("updating format of database entry for pending vaa").All()
  607. require.Equal(t, 2, len(loggedEntries))
  608. loggedEntries = zapObserver.FilterMessage("updating format of database entry for completed transfer").All()
  609. require.Equal(t, 2, len(loggedEntries))
  610. sort.SliceStable(xfers, func(i, j int) bool {
  611. return xfers[i].Timestamp.Before(xfers[j].Timestamp)
  612. })
  613. assert.Equal(t, oldXfer1, xfers[0])
  614. assert.Equal(t, newXfer1, xfers[1])
  615. assert.Equal(t, oldXfer2, xfers[2])
  616. assert.Equal(t, newXfer2, xfers[3])
  617. // Updated old pending events get placed at the end, so we need to sort into timestamp order.
  618. sort.SliceStable(pendings, func(i, j int) bool {
  619. return pendings[i].Msg.Timestamp.Before(pendings[j].Msg.Timestamp)
  620. })
  621. assert.Equal(t, pending1.Msg, pendings[0].Msg)
  622. assert.Equal(t, pending2.Msg, pendings[1].Msg)
  623. assert.Equal(t, pending3.Msg, pendings[2].Msg)
  624. // Make sure we can reload the updated pendings.
  625. logger, zapObserver = setupLogsCapture(t)
  626. xfers2, pendings2, err := db.GetChainGovernorDataForTime(logger, now)
  627. require.NoError(t, err)
  628. require.Equal(t, 4, len(xfers2))
  629. require.Equal(t, 3, len(pendings2))
  630. // This time we shouldn't have updated anything.
  631. loggedEntries = zapObserver.FilterMessage("updating format of database entry for pending vaa").All()
  632. require.Equal(t, 0, len(loggedEntries))
  633. loggedEntries = zapObserver.FilterMessage("updating format of database entry for completed transfer").All()
  634. require.Equal(t, 0, len(loggedEntries))
  635. sort.SliceStable(xfers2, func(i, j int) bool {
  636. return xfers2[i].Timestamp.Before(xfers2[j].Timestamp)
  637. })
  638. assert.Equal(t, oldXfer1, xfers2[0])
  639. assert.Equal(t, newXfer1, xfers2[1])
  640. assert.Equal(t, oldXfer2, xfers2[2])
  641. assert.Equal(t, newXfer2, xfers2[3])
  642. assert.Equal(t, pending1.Msg, pendings2[0].Msg)
  643. assert.Equal(t, pending2.Msg, pendings2[1].Msg)
  644. }
  645. func marshalOldTransfer(xfer *Transfer) ([]byte, error) {
  646. buf := new(bytes.Buffer)
  647. vaa.MustWrite(buf, binary.BigEndian, uint32(xfer.Timestamp.Unix())) // #nosec G115 -- This conversion is safe until year 2106
  648. vaa.MustWrite(buf, binary.BigEndian, xfer.Value)
  649. vaa.MustWrite(buf, binary.BigEndian, xfer.OriginChain)
  650. buf.Write(xfer.OriginAddress[:])
  651. vaa.MustWrite(buf, binary.BigEndian, xfer.EmitterChain)
  652. buf.Write(xfer.EmitterAddress[:])
  653. if len(xfer.MsgID) > math.MaxUint16 {
  654. return nil, fmt.Errorf("failed to marshal MsgID, length too long: %d", len(xfer.MsgID))
  655. }
  656. vaa.MustWrite(buf, binary.BigEndian, uint16(len(xfer.MsgID))) // #nosec G115 -- This conversion is checked above
  657. if len(xfer.MsgID) > 0 {
  658. buf.Write([]byte(xfer.MsgID))
  659. }
  660. if len(xfer.Hash) > math.MaxUint16 {
  661. return nil, fmt.Errorf("failed to marshal Hash, length too long: %d", len(xfer.Hash))
  662. }
  663. vaa.MustWrite(buf, binary.BigEndian, uint16(len(xfer.Hash))) // #nosec G115 -- This conversion is checked above
  664. if len(xfer.Hash) > 0 {
  665. buf.Write([]byte(xfer.Hash))
  666. }
  667. return buf.Bytes(), nil
  668. }
  669. func (d *Database) storeOldTransfer(xfer *Transfer) error {
  670. key := []byte(fmt.Sprintf("%v%v", oldTransfer, xfer.MsgID))
  671. b, err := marshalOldTransfer(xfer)
  672. if err != nil {
  673. return err
  674. }
  675. return d.db.Update(func(txn *badger.Txn) error {
  676. if err := txn.Set(key, b); err != nil {
  677. return err
  678. }
  679. return nil
  680. })
  681. }
  682. func TestDeserializeOfOldTransfer(t *testing.T) {
  683. tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
  684. require.NoError(t, err)
  685. ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  686. require.NoError(t, err)
  687. xfer1 := &Transfer{
  688. Timestamp: time.Unix(int64(1654516425), 0),
  689. Value: 125000,
  690. OriginChain: vaa.ChainIDEthereum,
  691. OriginAddress: tokenAddr,
  692. EmitterChain: vaa.ChainIDEthereum,
  693. EmitterAddress: ethereumTokenBridgeAddr,
  694. // Don't set TargetChain or TargetAddress.
  695. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
  696. Hash: "Hash1",
  697. }
  698. xfer1Bytes, err := marshalOldTransfer(xfer1)
  699. require.NoError(t, err)
  700. xfer2, err := unmarshalOldTransfer(xfer1Bytes)
  701. require.NoError(t, err)
  702. assert.Equal(t, xfer1, xfer2)
  703. expectedTransferKey := "GOV:XFER3:2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415"
  704. assert.Equal(t, expectedTransferKey, string(TransferMsgID(xfer2)))
  705. }
  706. func TestOldTransfersUpdatedWhenReloading(t *testing.T) {
  707. dbPath := t.TempDir()
  708. db := OpenDb(zap.NewNop(), &dbPath)
  709. defer db.Close()
  710. defer os.Remove(dbPath)
  711. ethereumTokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16")
  712. require.NoError(t, err)
  713. bscTokenBridgeAddr, err := vaa.StringToAddress("0x26b4afb60d6c903165150c6f0aa14f8016be4aec")
  714. require.NoError(t, err)
  715. tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8")
  716. require.NoError(t, err)
  717. // Write the first transfer in the old format.
  718. xfer1 := &Transfer{
  719. Timestamp: time.Unix(int64(1654516425), 0),
  720. Value: 125000,
  721. OriginChain: vaa.ChainIDEthereum,
  722. OriginAddress: tokenAddr,
  723. EmitterChain: vaa.ChainIDEthereum,
  724. EmitterAddress: ethereumTokenBridgeAddr,
  725. // Don't set TargetChain or TargetAddress.
  726. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131415",
  727. // Do not set the Hash.
  728. }
  729. err = db.storeOldTransfer(xfer1)
  730. require.NoError(t, err)
  731. // Write the second one in the new format.
  732. xfer2 := &Transfer{
  733. Timestamp: time.Unix(int64(1654516430), 0),
  734. Value: 125000,
  735. OriginChain: vaa.ChainIDEthereum,
  736. OriginAddress: tokenAddr,
  737. EmitterChain: vaa.ChainIDEthereum,
  738. EmitterAddress: ethereumTokenBridgeAddr,
  739. TargetChain: vaa.ChainIDBSC,
  740. TargetAddress: bscTokenBridgeAddr,
  741. MsgID: "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/789101112131416",
  742. Hash: "Hash2",
  743. }
  744. err = db.StoreTransfer(xfer2)
  745. require.NoError(t, err)
  746. now := time.Unix(time.Now().Unix(), 0)
  747. logger := zap.NewNop()
  748. xfers, pendings, err := db.GetChainGovernorDataForTime(logger, now)
  749. require.NoError(t, err)
  750. require.Equal(t, 2, len(xfers))
  751. require.Equal(t, 0, len(pendings))
  752. // Updated old pending events get placed at the end, so we need to sort into timestamp order.
  753. sort.SliceStable(xfers, func(i, j int) bool {
  754. return xfers[i].Timestamp.Before(xfers[j].Timestamp)
  755. })
  756. assert.Equal(t, xfer1, xfers[0])
  757. assert.Equal(t, xfer2, xfers[1])
  758. // Make sure the old transfer got dropped from the database and rewritten in the new format.
  759. assert.ErrorIs(t, badger.ErrKeyNotFound, db.rowExistsInDB(oldTransferMsgID(xfer1)))
  760. assert.NoError(t, db.rowExistsInDB(TransferMsgID(xfer1)))
  761. // And make sure the other transfer is still there.
  762. assert.NoError(t, db.rowExistsInDB(TransferMsgID(xfer2)))
  763. // Make sure we can still read the database after the conversion.
  764. xfers, pendings, err = db.GetChainGovernorDataForTime(logger, now)
  765. require.NoError(t, err)
  766. require.Equal(t, 2, len(xfers))
  767. require.Equal(t, 0, len(pendings))
  768. // Updated old pending events get placed at the end, so we need to sort into timestamp order.
  769. sort.SliceStable(xfers, func(i, j int) bool {
  770. return xfers[i].Timestamp.Before(xfers[j].Timestamp)
  771. })
  772. assert.Equal(t, xfer1, xfers[0])
  773. assert.Equal(t, xfer2, xfers[1])
  774. }