process-vaa.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. package p
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "fmt"
  8. "log"
  9. "cloud.google.com/go/bigtable"
  10. "cloud.google.com/go/pubsub"
  11. "github.com/certusone/wormhole/node/pkg/vaa"
  12. "github.com/holiman/uint256"
  13. )
  14. type PubSubMessage struct {
  15. Data []byte `json:"data"`
  16. }
  17. // The keys are emitterAddress hex values, so that we can quickly check a message against the index to see if it
  18. // meets the criteria for saving payload info: if it is a token transfer, or an NFT transfer.
  19. var NFTEmitters = map[string]string{
  20. // mainnet
  21. "0def15a24423e1edd1a5ab16f557b9060303ddbab8c803d2ee48f4b78a1cfd6b": "WnFt12ZrnzZrFZkt2xsNsaNWoQribnuQ5B5FrDbwDhD", // solana
  22. "0000000000000000000000006ffd7ede62328b3af38fcd61461bbfc52f5651fe": "0x6FFd7EdE62328b3Af38FCD61461Bbfc52F5651fE", // ethereum
  23. "0000000000000000000000005a58505a96d1dbf8df91cb21b54419fc36e93fde": "0x5a58505a96D1dbf8dF91cB21B54419FC36e93fdE", // bsc
  24. "00000000000000000000000090bbd86a6fe93d3bc3ed6335935447e75fab7fcf": "0x90bbd86a6fe93d3bc3ed6335935447e75fab7fcf", // polygon
  25. // devnet
  26. "96ee982293251b48729804c8e8b24b553eb6b887867024948d2236fd37a577ab": "NFTWqJR8YnRVqPDvTJrYuLrQDitTG5AScqbeghi4zSA", // solana
  27. "00000000000000000000000026b4afb60d6c903165150c6f0aa14f8016be4aec": "0x26b4afb60d6c903165150c6f0aa14f8016be4aec", // ethereum
  28. }
  29. var TokenTransferEmitters = map[string]string{
  30. // mainnet
  31. "ec7372995d5cc8732397fb0ad35c0121e0eaa90d26f828a534cab54391b3a4f5": "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb", // solana
  32. "0000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585": "0x3ee18B2214AFF97000D974cf647E7C347E8fa585", // ethereum
  33. "0000000000000000000000007cf7b764e38a0a5e967972c1df77d432510564e2": "terra10nmmwe8r3g99a9newtqa7a75xfgs2e8z87r2sf", // terra
  34. "000000000000000000000000b6f6d86a8f9879a9c87f643768d9efc38c1da6e7": "0xB6F6D86a8f9879A9c87f643768d9efc38c1Da6E7", // bsc
  35. "0000000000000000000000005a58505a96d1dbf8df91cb21b54419fc36e93fde": "0x5a58505a96d1dbf8df91cb21b54419fc36e93fde", // polygon
  36. // devnet
  37. "c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f": "B6RHG3mfcckmrYN1UhmJzyS1XX3fZKbkeUcpJe9Sy3FE", // solana
  38. "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": "0x0290fb167208af455bb137780163b7b7a9a10c16", // EVM chains
  39. "000000000000000000000000784999135aaa8a3ca5914468852fdddbddd8789d": "terra10pyejy66429refv3g35g2t7am0was7ya7kz2a4", // terra
  40. }
  41. // this address is an emitter for BSC and Polygon.
  42. var sharedEmitterAddress = "0000000000000000000000005a58505a96d1dbf8df91cb21b54419fc36e93fde"
  43. type (
  44. TokenTransfer struct {
  45. PayloadId uint8
  46. Amount uint256.Int
  47. OriginAddress [32]byte
  48. OriginChain uint16
  49. TargetAddress [32]byte
  50. TargetChain uint16
  51. }
  52. NFTTransfer struct {
  53. PayloadId uint8
  54. OriginAddress [32]byte
  55. OriginChain uint16
  56. Symbol [32]byte
  57. Name [32]byte
  58. TokenId uint256.Int
  59. URI []byte
  60. TargetAddress [32]byte
  61. TargetChain uint16
  62. }
  63. AssetMeta struct {
  64. PayloadId uint8
  65. TokenAddress [32]byte
  66. TokenChain uint16
  67. Decimals uint8
  68. Symbol [32]byte
  69. Name [32]byte
  70. }
  71. )
  72. func DecodeTokenTransfer(data []byte) (*TokenTransfer, error) {
  73. tt := &TokenTransfer{}
  74. tt.PayloadId = data[0]
  75. reader := bytes.NewReader(data[1:])
  76. if err := binary.Read(reader, binary.BigEndian, &tt.Amount); err != nil {
  77. return nil, fmt.Errorf("failed to read Amount: %w", err)
  78. }
  79. if err := binary.Read(reader, binary.BigEndian, &tt.OriginAddress); err != nil {
  80. return nil, fmt.Errorf("failed to read OriginAddress: %w", err)
  81. }
  82. if err := binary.Read(reader, binary.BigEndian, &tt.OriginChain); err != nil {
  83. return nil, fmt.Errorf("failed to read OriginChain: %w", err)
  84. }
  85. if err := binary.Read(reader, binary.BigEndian, &tt.TargetAddress); err != nil {
  86. return nil, fmt.Errorf("failed to read TargetAddress: %w", err)
  87. }
  88. if err := binary.Read(reader, binary.BigEndian, &tt.TargetChain); err != nil {
  89. return nil, fmt.Errorf("failed to read TargetChain: %w", err)
  90. }
  91. return tt, nil
  92. }
  93. func DecodeNFTTransfer(data []byte) (*NFTTransfer, error) {
  94. nt := &NFTTransfer{}
  95. nt.PayloadId = data[0]
  96. reader := bytes.NewReader(data[1:])
  97. if err := binary.Read(reader, binary.BigEndian, &nt.OriginAddress); err != nil {
  98. return nil, fmt.Errorf("failed to read OriginAddress: %w", err)
  99. }
  100. if err := binary.Read(reader, binary.BigEndian, &nt.OriginChain); err != nil {
  101. return nil, fmt.Errorf("failed to read OriginChain: %w", err)
  102. }
  103. if err := binary.Read(reader, binary.BigEndian, &nt.Symbol); err != nil {
  104. return nil, fmt.Errorf("failed to read Symbol: %w", err)
  105. }
  106. if err := binary.Read(reader, binary.BigEndian, &nt.Name); err != nil {
  107. return nil, fmt.Errorf("failed to read Name: %w", err)
  108. }
  109. if err := binary.Read(reader, binary.BigEndian, &nt.TokenId); err != nil {
  110. return nil, fmt.Errorf("failed to read TokenId: %w", err)
  111. }
  112. // uri len
  113. uriLen, er := reader.ReadByte()
  114. if er != nil {
  115. return nil, fmt.Errorf("failed to read URI length")
  116. }
  117. // uri
  118. uri := make([]byte, int(uriLen))
  119. n, err := reader.Read(uri)
  120. if err != nil || n == 0 {
  121. return nil, fmt.Errorf("failed to read uri [%d]: %w", n, err)
  122. }
  123. nt.URI = uri[:n]
  124. if err := binary.Read(reader, binary.BigEndian, &nt.TargetAddress); err != nil {
  125. return nil, fmt.Errorf("failed to read : %w", err)
  126. }
  127. if err := binary.Read(reader, binary.BigEndian, &nt.TargetChain); err != nil {
  128. return nil, fmt.Errorf("failed to read : %w", err)
  129. }
  130. return nt, nil
  131. }
  132. func DecodeAssetMeta(data []byte) (*AssetMeta, error) {
  133. am := &AssetMeta{}
  134. am.PayloadId = data[0]
  135. reader := bytes.NewReader(data[1:])
  136. tokenAddress := [32]byte{}
  137. if n, err := reader.Read(tokenAddress[:]); err != nil || n != 32 {
  138. return nil, fmt.Errorf("failed to read TokenAddress [%d]: %w", n, err)
  139. }
  140. am.TokenAddress = tokenAddress
  141. if err := binary.Read(reader, binary.BigEndian, &am.TokenChain); err != nil {
  142. return nil, fmt.Errorf("failed to read TokenChain: %w", err)
  143. }
  144. if err := binary.Read(reader, binary.BigEndian, &am.Decimals); err != nil {
  145. return nil, fmt.Errorf("failed to read Decimals: %w", err)
  146. }
  147. if err := binary.Read(reader, binary.BigEndian, &am.Symbol); err != nil {
  148. return nil, fmt.Errorf("failed to read Symbol: %w", err)
  149. }
  150. if err := binary.Read(reader, binary.BigEndian, &am.Name); err != nil {
  151. return nil, fmt.Errorf("failed to read Name: %w", err)
  152. }
  153. return am, nil
  154. }
  155. // TEMP: until this https://forge.certus.one/c/wormhole/+/1850 lands
  156. func makeRowKey(emitterChain vaa.ChainID, emitterAddress vaa.Address, sequence uint64) string {
  157. // left-pad the sequence with zeros to 16 characters, because bigtable keys are stored lexicographically
  158. return fmt.Sprintf("%d:%s:%016d", emitterChain, emitterAddress, sequence)
  159. }
  160. func writePayloadToBigTable(ctx context.Context, rowKey string, colFam string, mutation *bigtable.Mutation) error {
  161. filter := bigtable.ChainFilters(
  162. bigtable.FamilyFilter(colFam),
  163. bigtable.ColumnFilter("PayloadId"))
  164. conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
  165. err := tbl.Apply(ctx, rowKey, conditionalMutation)
  166. if err != nil {
  167. log.Printf("Failed to write payload for %v to BigTable. err: %v", rowKey, err)
  168. return err
  169. }
  170. return nil
  171. }
  172. func TrimUnicodeFromByteArray(b []byte) []byte {
  173. // Escaped Unicode that has been observed in payload's token names and symbol:
  174. null := "\u0000"
  175. start := "\u0002"
  176. ack := "\u0006"
  177. tab := "\u0009"
  178. control := "\u0012"
  179. return bytes.Trim(b, null+start+ack+tab+control)
  180. }
  181. // ProcessVAA is triggered by a PubSub message, emitted after row is saved to BigTable by guardiand
  182. func ProcessVAA(ctx context.Context, m PubSubMessage) error {
  183. data := string(m.Data)
  184. if data == "" {
  185. return fmt.Errorf("no data to process in message")
  186. }
  187. signedVaa, err := vaa.Unmarshal(m.Data)
  188. if err != nil {
  189. log.Println("failed Unmarshaling VAA")
  190. return err
  191. }
  192. // create the bigtable identifier from the VAA data
  193. rowKey := makeRowKey(signedVaa.EmitterChain, signedVaa.EmitterAddress, signedVaa.Sequence)
  194. emitterHex := signedVaa.EmitterAddress.String()
  195. payloadId := int(signedVaa.Payload[0])
  196. // BSC and Polygon have the same contract address: "0x5a58505a96d1dbf8df91cb21b54419fc36e93fde".
  197. // The BSC contract is the NFT emitter address.
  198. // The Polygon contract is the token transfer emitter address.
  199. // Due to that, ensure that the block below only runs for token transfers by checking for chain == 4 and emitter addaress.
  200. if _, ok := TokenTransferEmitters[emitterHex]; ok && !(signedVaa.EmitterChain == 4 && signedVaa.EmitterAddress.String() == sharedEmitterAddress) {
  201. // figure out if it's a transfer or asset metadata
  202. if payloadId == 1 {
  203. // token transfer
  204. payload, decodeErr := DecodeTokenTransfer(signedVaa.Payload)
  205. if decodeErr != nil {
  206. log.Println("failed decoding payload for row ", rowKey)
  207. return decodeErr
  208. }
  209. // save payload to bigtable, then publish a new PubSub message for further processing
  210. colFam := columnFamilies[2]
  211. mutation := bigtable.NewMutation()
  212. ts := bigtable.Now()
  213. mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId)))
  214. // TODO: find a better way of representing amount as a string
  215. amount := []byte(fmt.Sprint(payload.Amount[3]))
  216. if payload.Amount[2] != 0 {
  217. log.Printf("payload.Amount is larger than uint64 for row %v", rowKey)
  218. amount = payload.Amount.Bytes()
  219. }
  220. mutation.Set(colFam, "Amount", ts, amount)
  221. mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:])))
  222. mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain)))
  223. mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:])))
  224. mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain)))
  225. writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
  226. if writeErr != nil {
  227. return writeErr
  228. }
  229. // now that the payload is saved to BigTable,
  230. // pass along the message to the topic that will calculate TokenTransferDetails
  231. pubSubTokenTransferDetailsTopic.Publish(ctx, &pubsub.Message{Data: m.Data})
  232. } else if payloadId == 2 {
  233. // asset meta
  234. payload, decodeErr := DecodeAssetMeta(signedVaa.Payload)
  235. if decodeErr != nil {
  236. log.Println("failed decoding payload for row ", rowKey)
  237. return decodeErr
  238. }
  239. // save payload to bigtable
  240. colFam := columnFamilies[3]
  241. mutation := bigtable.NewMutation()
  242. ts := bigtable.Now()
  243. mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId)))
  244. mutation.Set(colFam, "TokenAddress", ts, []byte(hex.EncodeToString(payload.TokenAddress[:])))
  245. mutation.Set(colFam, "TokenChain", ts, []byte(fmt.Sprint(payload.TokenChain)))
  246. mutation.Set(colFam, "Decimals", ts, []byte(fmt.Sprint(payload.Decimals)))
  247. mutation.Set(colFam, "Name", ts, TrimUnicodeFromByteArray(payload.Name[:]))
  248. mutation.Set(colFam, "Symbol", ts, TrimUnicodeFromByteArray(payload.Symbol[:]))
  249. writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
  250. if writeErr != nil {
  251. log.Println("wrote TokenTransferPayload to bigtable!", rowKey)
  252. }
  253. return writeErr
  254. } else {
  255. // unknown payload type
  256. log.Println("encountered unknown payload type for row ", rowKey)
  257. return nil
  258. }
  259. } else if _, ok := NFTEmitters[emitterHex]; ok {
  260. if payloadId == 1 {
  261. // NFT transfer
  262. payload, decodeErr := DecodeNFTTransfer(signedVaa.Payload)
  263. if decodeErr != nil {
  264. log.Println("failed decoding payload for row ", rowKey)
  265. return decodeErr
  266. }
  267. // save payload to bigtable
  268. colFam := columnFamilies[4]
  269. mutation := bigtable.NewMutation()
  270. ts := bigtable.Now()
  271. mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId)))
  272. mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:])))
  273. mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain)))
  274. mutation.Set(colFam, "Symbol", ts, TrimUnicodeFromByteArray(payload.Symbol[:]))
  275. mutation.Set(colFam, "Name", ts, TrimUnicodeFromByteArray(payload.Name[:]))
  276. mutation.Set(colFam, "TokenId", ts, payload.TokenId.Bytes())
  277. mutation.Set(colFam, "URI", ts, TrimUnicodeFromByteArray(payload.URI))
  278. mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:])))
  279. mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain)))
  280. writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
  281. if writeErr == nil {
  282. log.Println("wrote NFTTransferPayload to bigtable!", rowKey)
  283. }
  284. return writeErr
  285. } else {
  286. // unknown payload type
  287. log.Println("encountered unknown payload type for row ", rowKey)
  288. return nil
  289. }
  290. }
  291. // this is not a payload we are ready to decode & save. return success
  292. return nil
  293. }