db.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package db
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "github.com/dgraph-io/badger/v3"
  8. "github.com/prometheus/client_golang/prometheus"
  9. "github.com/prometheus/client_golang/prometheus/promauto"
  10. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  11. )
  12. var storedVaaTotal = promauto.NewCounter(
  13. prometheus.CounterOpts{
  14. Name: "wormhole_db_total_vaas",
  15. Help: "Total number of VAAs added to database",
  16. })
  17. type Database struct {
  18. db *badger.DB
  19. }
  20. type VAAID struct {
  21. EmitterChain vaa.ChainID
  22. EmitterAddress vaa.Address
  23. Sequence uint64
  24. }
  25. // VaaIDFromString parses a <chain>/<address>/<sequence> string into a VAAID.
  26. func VaaIDFromString(s string) (*VAAID, error) {
  27. parts := strings.Split(s, "/")
  28. if len(parts) != 3 {
  29. return nil, errors.New("invalid message id")
  30. }
  31. emitterChain, err := strconv.ParseUint(parts[0], 10, 16)
  32. if err != nil {
  33. return nil, fmt.Errorf("invalid emitter chain: %s", err)
  34. }
  35. emitterAddress, err := vaa.StringToAddress(parts[1])
  36. if err != nil {
  37. return nil, fmt.Errorf("invalid emitter address: %s", err)
  38. }
  39. sequence, err := strconv.ParseUint(parts[2], 10, 64)
  40. if err != nil {
  41. return nil, fmt.Errorf("invalid sequence: %s", err)
  42. }
  43. msgId := &VAAID{
  44. EmitterChain: vaa.ChainID(emitterChain),
  45. EmitterAddress: emitterAddress,
  46. Sequence: sequence,
  47. }
  48. return msgId, nil
  49. }
  50. func VaaIDFromVAA(v *vaa.VAA) *VAAID {
  51. return &VAAID{
  52. EmitterChain: v.EmitterChain,
  53. EmitterAddress: v.EmitterAddress,
  54. Sequence: v.Sequence,
  55. }
  56. }
  57. var (
  58. ErrVAANotFound = errors.New("requested VAA not found in store")
  59. nullAddr = vaa.Address{}
  60. )
  61. func (i *VAAID) Bytes() []byte {
  62. return []byte(fmt.Sprintf("signed/%d/%s/%d", i.EmitterChain, i.EmitterAddress, i.Sequence))
  63. }
  64. func (i *VAAID) EmitterPrefixBytes() []byte {
  65. if i.EmitterAddress == nullAddr {
  66. return []byte(fmt.Sprintf("signed/%d", i.EmitterChain))
  67. }
  68. return []byte(fmt.Sprintf("signed/%d/%s", i.EmitterChain, i.EmitterAddress))
  69. }
  70. func (d *Database) Close() error {
  71. return d.db.Close()
  72. }
  73. func (d *Database) StoreSignedVAA(v *vaa.VAA) error {
  74. if len(v.Signatures) == 0 {
  75. panic("StoreSignedVAA called for unsigned VAA")
  76. }
  77. b, _ := v.Marshal()
  78. // We allow overriding of existing VAAs, since there are multiple ways to
  79. // acquire signed VAA bytes. For instance, the node may have a signed VAA
  80. // via gossip before it reaches quorum on its own. The new entry may have
  81. // a different set of signatures, but the same VAA.
  82. //
  83. // TODO: panic on non-identical signing digest?
  84. err := d.db.Update(func(txn *badger.Txn) error {
  85. if err := txn.Set(VaaIDFromVAA(v).Bytes(), b); err != nil {
  86. return err
  87. }
  88. return nil
  89. })
  90. if err != nil {
  91. return fmt.Errorf("failed to commit tx: %w", err)
  92. }
  93. storedVaaTotal.Inc()
  94. return nil
  95. }
  96. // StoreSignedVAABatch writes multiple VAAs to the database using the BadgerDB batch API.
  97. // Note that the API takes care of splitting up the slice into the maximum allowed count
  98. // and size so we don't need to worry about that.
  99. func (d *Database) StoreSignedVAABatch(vaaBatch []*vaa.VAA) error {
  100. batchTx := d.db.NewWriteBatch()
  101. defer batchTx.Cancel()
  102. for _, v := range vaaBatch {
  103. if len(v.Signatures) == 0 {
  104. panic("StoreSignedVAABatch called for unsigned VAA")
  105. }
  106. b, err := v.Marshal()
  107. if err != nil {
  108. panic("StoreSignedVAABatch failed to marshal VAA")
  109. }
  110. err = batchTx.Set(VaaIDFromVAA(v).Bytes(), b)
  111. if err != nil {
  112. return err
  113. }
  114. }
  115. // Wait for the batch to finish.
  116. err := batchTx.Flush()
  117. storedVaaTotal.Add(float64(len(vaaBatch)))
  118. return err
  119. }
  120. func (d *Database) HasVAA(id VAAID) (bool, error) {
  121. err := d.db.View(func(txn *badger.Txn) error {
  122. _, err := txn.Get(id.Bytes())
  123. return err
  124. })
  125. if err == nil {
  126. return true, nil
  127. }
  128. if errors.Is(err, badger.ErrKeyNotFound) {
  129. return false, nil
  130. }
  131. return false, err
  132. }
  133. func (d *Database) GetSignedVAABytes(id VAAID) (b []byte, err error) {
  134. if err := d.db.View(func(txn *badger.Txn) error {
  135. item, err := txn.Get(id.Bytes())
  136. if err != nil {
  137. return err
  138. }
  139. if val, err := item.ValueCopy(nil); err != nil {
  140. return err
  141. } else {
  142. b = val
  143. }
  144. return nil
  145. }); err != nil {
  146. if errors.Is(err, badger.ErrKeyNotFound) {
  147. return nil, ErrVAANotFound
  148. }
  149. return nil, err
  150. }
  151. return
  152. }
  153. func (d *Database) FindEmitterSequenceGap(prefix VAAID) (resp []uint64, firstSeq uint64, lastSeq uint64, err error) {
  154. resp = make([]uint64, 0)
  155. if err = d.db.View(func(txn *badger.Txn) error {
  156. it := txn.NewIterator(badger.DefaultIteratorOptions)
  157. defer it.Close()
  158. prefix := prefix.EmitterPrefixBytes()
  159. // Find all sequence numbers (the message IDs are ordered lexicographically,
  160. // rather than numerically, so we need to sort them in-memory).
  161. seqs := make(map[uint64]bool)
  162. for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
  163. item := it.Item()
  164. key := item.Key()
  165. err := item.Value(func(val []byte) error {
  166. v, err := vaa.Unmarshal(val)
  167. if err != nil {
  168. return fmt.Errorf("failed to unmarshal VAA for %s: %v", string(key), err)
  169. }
  170. seqs[v.Sequence] = true
  171. return nil
  172. })
  173. if err != nil {
  174. return err
  175. }
  176. }
  177. // Find min/max (yay lack of Go generics)
  178. first := false
  179. for k := range seqs {
  180. if first {
  181. firstSeq = k
  182. first = false
  183. }
  184. if k < firstSeq {
  185. firstSeq = k
  186. }
  187. if k > lastSeq {
  188. lastSeq = k
  189. }
  190. }
  191. // Figure out gaps.
  192. for i := firstSeq; i <= lastSeq; i++ {
  193. if !seqs[i] {
  194. resp = append(resp, i)
  195. }
  196. }
  197. return nil
  198. }); err != nil {
  199. return
  200. }
  201. return
  202. }
  203. // Conn returns a pointer to the underlying database connection.
  204. func (d *Database) Conn() *badger.DB {
  205. return d.db
  206. }