db.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package db
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/certusone/wormhole/node/pkg/vaa"
  6. "github.com/dgraph-io/badger/v3"
  7. )
  8. type Database struct {
  9. db *badger.DB
  10. }
  11. type VAAID struct {
  12. EmitterChain vaa.ChainID
  13. EmitterAddress vaa.Address
  14. Sequence uint64
  15. }
  16. func VaaIDFromVAA(v *vaa.VAA) *VAAID {
  17. return &VAAID{
  18. EmitterChain: v.EmitterChain,
  19. EmitterAddress: v.EmitterAddress,
  20. Sequence: v.Sequence,
  21. }
  22. }
  23. var (
  24. ErrVAANotFound = errors.New("requested VAA not found in store")
  25. )
  26. func (i *VAAID) Bytes() []byte {
  27. return []byte(fmt.Sprintf("signed/%d/%s/%d", i.EmitterChain, i.EmitterAddress, i.Sequence))
  28. }
  29. func (i *VAAID) EmitterPrefixBytes() []byte {
  30. return []byte(fmt.Sprintf("signed/%d/%s", i.EmitterChain, i.EmitterAddress))
  31. }
  32. func Open(path string) (*Database, error) {
  33. db, err := badger.Open(badger.DefaultOptions(path))
  34. if err != nil {
  35. return nil, fmt.Errorf("failed to open database: %w", err)
  36. }
  37. return &Database{
  38. db: db,
  39. }, nil
  40. }
  41. func (d *Database) Close() error {
  42. return d.db.Close()
  43. }
  44. func (d *Database) StoreSignedVAA(v *vaa.VAA) error {
  45. if len(v.Signatures) == 0 {
  46. panic("StoreSignedVAA called for unsigned VAA")
  47. }
  48. b, _ := v.Marshal()
  49. // We allow overriding of existing VAAs, since there are multiple ways to
  50. // acquire signed VAA bytes. For instance, the node may have a signed VAA
  51. // via gossip before it reaches quorum on its own. The new entry may have
  52. // a different set of signatures, but the same VAA.
  53. //
  54. // TODO: panic on non-identical signing digest?
  55. err := d.db.Update(func(txn *badger.Txn) error {
  56. if err := txn.Set(VaaIDFromVAA(v).Bytes(), b); err != nil {
  57. return err
  58. }
  59. return nil
  60. })
  61. if err != nil {
  62. return fmt.Errorf("failed to commit tx: %w", err)
  63. }
  64. return nil
  65. }
  66. func (d *Database) GetSignedVAABytes(id VAAID) (b []byte, err error) {
  67. if err := d.db.View(func(txn *badger.Txn) error {
  68. item, err := txn.Get(id.Bytes())
  69. if err != nil {
  70. return err
  71. }
  72. if val, err := item.ValueCopy(nil); err != nil {
  73. return err
  74. } else {
  75. b = val
  76. }
  77. return nil
  78. }); err != nil {
  79. if err == badger.ErrKeyNotFound {
  80. return nil, ErrVAANotFound
  81. }
  82. return nil, err
  83. }
  84. return
  85. }
  86. func (d *Database) FindEmitterSequenceGap(prefix VAAID) (resp []uint64, firstSeq uint64, lastSeq uint64, err error) {
  87. resp = make([]uint64, 0)
  88. if err = d.db.View(func(txn *badger.Txn) error {
  89. it := txn.NewIterator(badger.DefaultIteratorOptions)
  90. defer it.Close()
  91. prefix := prefix.EmitterPrefixBytes()
  92. // Find all sequence numbers (the message IDs are ordered lexicographically,
  93. // rather than numerically, so we need to sort them in-memory).
  94. seqs := make(map[uint64]bool)
  95. for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
  96. item := it.Item()
  97. key := item.Key()
  98. err := item.Value(func(val []byte) error {
  99. v, err := vaa.Unmarshal(val)
  100. if err != nil {
  101. return fmt.Errorf("failed to unmarshal VAA for %s: %v", string(key), err)
  102. }
  103. seqs[v.Sequence] = true
  104. return nil
  105. })
  106. if err != nil {
  107. return err
  108. }
  109. }
  110. // Find min/max (yay lack of Go generics)
  111. first := false
  112. for k := range seqs {
  113. if first {
  114. firstSeq = k
  115. first = false
  116. }
  117. if k < firstSeq {
  118. firstSeq = k
  119. }
  120. if k > lastSeq {
  121. lastSeq = k
  122. }
  123. }
  124. // Figure out gaps.
  125. for i := firstSeq; i <= lastSeq; i++ {
  126. if !seqs[i] {
  127. fmt.Printf("missing: %d\n", i)
  128. resp = append(resp, i)
  129. }
  130. }
  131. return nil
  132. }); err != nil {
  133. return
  134. }
  135. return
  136. }