repair.go 8.3 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "flag"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/certusone/wormhole/node/pkg/db"
  13. gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
  14. nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
  15. "github.com/gagliardetto/solana-go"
  16. "github.com/gagliardetto/solana-go/rpc"
  17. "github.com/wormhole-foundation/wormhole/sdk"
  18. "github.com/wormhole-foundation/wormhole/sdk/vaa"
  19. "golang.org/x/time/rate"
  20. "google.golang.org/grpc"
  21. "google.golang.org/grpc/credentials/insecure"
  22. )
  // solanaRPC is the address of the Solana RPC node used to look up
  // signatures and transactions.
  var solanaRPC = flag.String("solanaRPC", "http://localhost:8899", "Solana RPC address")

  // adminRPC is the address passed to getAdminClient, which dials it as a
  // "unix:///" gRPC target.
  var adminRPC = flag.String("adminRPC", "/run/guardiand/admin.socket", "Admin RPC address")

  // solanaAddr is the base58 address of the Solana program whose instructions
  // process() searches for.
  var solanaAddr = flag.String("solanaProgram", "worm2ZoG2kUd4vFXhvjh93UUH596ayRfgQ2MgjNMTth", "Solana program address")

  // postMessageInstructionID is the value the first data byte of an
  // instruction must have for process() to accept it.
  const postMessageInstructionID = 0x01
  31. func getAdminClient(ctx context.Context, addr string) (*grpc.ClientConn, nodev1.NodePrivilegedServiceClient, error) {
  32. conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix:///%s", addr), grpc.WithTransportCredentials(insecure.NewCredentials()))
  33. if err != nil {
  34. log.Fatalf("failed to connect to %s: %v", addr, err)
  35. }
  36. c := nodev1.NewNodePrivilegedServiceClient(conn)
  37. return conn, c, err
  38. }
  39. func main() {
  40. flag.Parse()
  41. ctx := context.Background()
  42. sr := rpc.New(*solanaRPC)
  43. conn, admin, err := getAdminClient(ctx, *adminRPC)
  44. if err != nil {
  45. conn.Close()
  46. log.Fatalf("failed to get admin client: %v", err)
  47. }
  48. defer conn.Close()
  49. for _, emitter := range sdk.KnownEmitters {
  50. if emitter.ChainID != vaa.ChainIDSolana {
  51. continue
  52. }
  53. log.Printf("Requesting missing messages for %s", emitter)
  54. msg := nodev1.FindMissingMessagesRequest{
  55. EmitterChain: uint32(vaa.ChainIDSolana),
  56. EmitterAddress: emitter.Emitter,
  57. RpcBackfill: true,
  58. BackfillNodes: sdk.PublicRPCEndpoints,
  59. }
  60. resp, err := admin.FindMissingMessages(ctx, &msg)
  61. if err != nil {
  62. log.Fatalf("failed to run find FindMissingMessages RPC: %v", err)
  63. }
  64. msgs := make([]*db.VAAID, len(resp.MissingMessages))
  65. for i, id := range resp.MissingMessages {
  66. fmt.Println(id)
  67. vId, err := db.VaaIDFromString(id)
  68. if err != nil {
  69. log.Fatalf("failed to parse VAAID: %v", err)
  70. }
  71. msgs[i] = vId
  72. }
  73. if len(msgs) == 0 {
  74. log.Printf("No missing messages found for %s", emitter)
  75. continue
  76. }
  77. lowest := msgs[0].Sequence
  78. highest := msgs[len(msgs)-1].Sequence
  79. log.Printf("Found %d missing messages for %s: %d - %d", len(msgs), emitter, lowest, highest)
  80. limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 10)
  81. var before solana.Signature
  82. decoded, err := hex.DecodeString(emitter.Emitter)
  83. if err != nil {
  84. log.Fatalf("Failed to decode emitter address: %v", err)
  85. }
  86. addr := solana.PublicKeyFromBytes(decoded)
  87. hc := http.Client{Timeout: 10 * time.Second}
  88. log.Printf("Starting repair for %s (%s)", emitter, addr)
  89. for {
  90. err := limiter.Wait(ctx)
  91. if err != nil {
  92. log.Fatal(err)
  93. }
  94. limit := 100
  95. txs, err := sr.GetSignaturesForAddressWithOpts(ctx, addr, &rpc.GetSignaturesForAddressOpts{
  96. Limit: &limit,
  97. Before: before,
  98. })
  99. if err != nil {
  100. log.Fatalf("GetConfirmedSignaturesForAddress2 %s: %v", emitter, err)
  101. }
  102. if len(txs) == 0 {
  103. log.Printf("fetched all txes for %s (%s)", emitter, addr)
  104. break
  105. }
  106. var lastSeq, firstSeq uint64
  107. var last solana.Signature
  108. for i := 0; lastSeq == 0; i-- {
  109. log.Printf("lastSeq offset: %d", i)
  110. last = txs[len(txs)-1+i].Signature
  111. _, lastSeq, err = fetchTxSeq(ctx, sr, last)
  112. if err != nil {
  113. log.Fatalf("fetch last tx seq: %v", err)
  114. }
  115. }
  116. for i := 0; firstSeq == 0; i++ {
  117. log.Printf("firstSeq offset: %d", i)
  118. _, firstSeq, err = fetchTxSeq(ctx, sr, txs[i].Signature)
  119. if err != nil {
  120. log.Fatalf("fetch first tx seq: %v", err)
  121. }
  122. }
  123. log.Printf("fetched %d transactions, from %s (%d) to %s (%d)",
  124. len(txs), txs[0].Signature, firstSeq, last, lastSeq)
  125. if highest < lastSeq {
  126. log.Printf("skipping (%d < %d)", highest, lastSeq)
  127. goto skip
  128. }
  129. if lowest > firstSeq {
  130. log.Printf("done (%d < %d)", lowest, lastSeq)
  131. break
  132. }
  133. for _, p := range msgs {
  134. if p.Sequence > lastSeq && p.Sequence < firstSeq {
  135. offset := firstSeq - p.Sequence - 10
  136. log.Printf("repairing: %d (offset %d)", p.Sequence, offset)
  137. var tx *rpc.GetTransactionResult
  138. var nseq uint64
  139. var err error
  140. for {
  141. if offset >= uint64(len(txs)) {
  142. log.Fatalf("out of range at offset %d", offset)
  143. }
  144. tx, nseq, err = fetchTxSeq(ctx, sr, txs[offset].Signature)
  145. if err != nil {
  146. log.Fatalf("failed to fetch %s at offset %d: %v", txs[offset].Signature, offset, err)
  147. }
  148. if tx == nil {
  149. offset += 1
  150. log.Printf("not a Wormhole tx, offset +1")
  151. time.Sleep(1 * time.Second)
  152. continue
  153. }
  154. if nseq != p.Sequence {
  155. offset += 1
  156. log.Printf("%d != %d, delta +%d, offset +%d", nseq, p.Sequence, nseq-p.Sequence, offset)
  157. time.Sleep(1 * time.Second)
  158. continue
  159. } else {
  160. break
  161. }
  162. }
  163. acc, err := process(tx)
  164. if err != nil {
  165. log.Fatalf("process: %v", err)
  166. }
  167. log.Printf("found account %v (%s)", acc, hex.EncodeToString(acc[:]))
  168. _, err = admin.SendObservationRequest(ctx, &nodev1.SendObservationRequestRequest{
  169. ObservationRequest: &gossipv1.ObservationRequest{
  170. ChainId: uint32(vaa.ChainIDSolana),
  171. TxHash: acc[:],
  172. Timestamp: time.Now().UnixNano(),
  173. }})
  174. if err != nil {
  175. log.Fatalf("SendObservationRequest: %v", err)
  176. }
  177. for {
  178. log.Printf("verifying %d", p.Sequence)
  179. req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf(
  180. "%s/v1/signed_vaa/%d/%s/%d",
  181. sdk.PublicRPCEndpoints[0],
  182. vaa.ChainIDSolana,
  183. hex.EncodeToString(addr[:]),
  184. p.Sequence), nil)
  185. if err != nil {
  186. panic(err)
  187. }
  188. resp, err := hc.Do(req)
  189. if err != nil {
  190. log.Fatalf("verify: %v", err)
  191. }
  192. defer resp.Body.Close()
  193. if resp.StatusCode != http.StatusOK {
  194. log.Printf("status %d, retrying", resp.StatusCode)
  195. time.Sleep(5 * time.Second)
  196. continue
  197. } else {
  198. log.Printf("success %d", p.Sequence)
  199. break
  200. }
  201. }
  202. }
  203. }
  204. skip:
  205. before = last
  206. }
  207. }
  208. }
  209. func fetchTxSeq(ctx context.Context, c *rpc.Client, sig solana.Signature) (*rpc.GetTransactionResult, uint64, error) {
  210. maxSupportedTransactionVersion := uint64(0)
  211. params := rpc.GetTransactionOpts{
  212. Encoding: solana.EncodingBase64,
  213. Commitment: rpc.CommitmentConfirmed,
  214. MaxSupportedTransactionVersion: &maxSupportedTransactionVersion,
  215. }
  216. out, err := c.GetTransaction(ctx, sig, &params)
  217. if err != nil {
  218. return nil, 0, fmt.Errorf("GetConfirmedTransaction: %v", err)
  219. }
  220. for _, msg := range out.Meta.LogMessages {
  221. if strings.HasPrefix(msg, "Program log: Sequence:") {
  222. seq := msg[23:]
  223. seqInt, err := strconv.Atoi(seq)
  224. if err != nil {
  225. log.Printf("failed to parse seq %s: %v", seq, err)
  226. continue
  227. }
  228. return out, uint64(seqInt), nil // #nosec G115 -- The sequence number cannot exceed a uint64
  229. }
  230. }
  231. return nil, 0, nil
  232. }
  233. func process(out *rpc.GetTransactionResult) (*solana.PublicKey, error) {
  234. program, err := solana.PublicKeyFromBase58(*solanaAddr)
  235. if err != nil {
  236. log.Fatalf("Invalid program address: %v", err)
  237. return nil, err
  238. }
  239. tx, err := out.Transaction.GetTransaction()
  240. if err != nil {
  241. log.Fatalf("Failed to unmarshal transaction: %v", err)
  242. return nil, err
  243. }
  244. signature := tx.Signatures[0]
  245. var programIndex uint16
  246. for n, key := range tx.Message.AccountKeys {
  247. if key.Equals(program) {
  248. programIndex = uint16(n) // #nosec G115 -- The solana runtime can only support 64 accounts per transaction max
  249. }
  250. }
  251. if programIndex == 0 {
  252. return nil, nil
  253. }
  254. log.Printf("found Wormhole tx in %s", signature)
  255. txs := make([]solana.CompiledInstruction, 0, len(tx.Message.Instructions))
  256. txs = append(txs, tx.Message.Instructions...)
  257. for _, inner := range out.Meta.InnerInstructions {
  258. txs = append(txs, inner.Instructions...)
  259. }
  260. for _, inst := range txs {
  261. if inst.ProgramIDIndex != programIndex {
  262. continue
  263. }
  264. if inst.Data[0] != postMessageInstructionID {
  265. continue
  266. }
  267. acc := tx.Message.AccountKeys[inst.Accounts[1]]
  268. return &acc, nil
  269. }
  270. return nil, nil
  271. }