process-transfer.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. package p
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "encoding/hex"
  6. "fmt"
  7. "log"
  8. "math"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/certusone/wormhole/node/pkg/vaa"
  13. "github.com/cosmos/cosmos-sdk/types/bech32"
  14. "cloud.google.com/go/bigtable"
  15. "github.com/gagliardetto/solana-go"
  16. )
  17. // terra native tokens do not have a bech32 address like cw20s do, handle them manually.
  18. var tokenAddressExceptions = map[string]string{
  19. "0100000000000000000000000000000000000000000000000000000075757364": "ust",
  20. "010000000000000000000000000000000000000000000000000000756c756e61": "uluna",
  21. }
  22. func fetchTokenPrice(chain vaa.ChainID, symbol, address string, timestamp time.Time) (float64, string, string) {
  23. // try coingecko, return if good
  24. // if coingecko does not work, try chain-specific options
  25. // initialize strings that will be returned if we find a symbol/name
  26. // when looking up this token by contract address
  27. foundSymbol := ""
  28. foundName := ""
  29. if symbol == "" && chain == vaa.ChainIDSolana {
  30. // try to lookup the symbol in solana token list, from the address
  31. if token, ok := solanaTokens[address]; ok {
  32. symbol = token.Symbol
  33. foundSymbol = token.Symbol
  34. foundName = token.Name
  35. }
  36. }
  37. coinGeckoId := ""
  38. if _, ok := coinGeckoCoins[strings.ToLower(symbol)]; ok {
  39. tokens := coinGeckoCoins[strings.ToLower(symbol)]
  40. coinGeckoId = tokens[0].Id
  41. }
  42. if coinGeckoId != "" {
  43. price, _ := fetchCoinGeckoPrice(coinGeckoId, timestamp)
  44. if price != 0 {
  45. return price, foundSymbol, foundName
  46. }
  47. }
  48. return float64(0), foundSymbol, foundName
  49. }
  50. // returns a pair of dates before and after the input time.
  51. // useful for creating a time rage for querying historical price APIs.
  52. func rangeFromTime(t time.Time, hours int) (start time.Time, end time.Time) {
  53. duration := time.Duration(hours) * time.Hour
  54. return t.Add(-duration), t.Add(duration)
  55. }
  56. func transformHexAddressToNative(chain vaa.ChainID, address string) string {
  57. switch chain {
  58. case vaa.ChainIDSolana:
  59. addr, err := hex.DecodeString(address)
  60. if err != nil {
  61. panic(fmt.Errorf("failed to decode solana string: %v", err))
  62. }
  63. if len(addr) != 32 {
  64. panic(fmt.Errorf("address must be 32 bytes. address: %v", address))
  65. }
  66. solPk := solana.PublicKeyFromBytes(addr[:])
  67. return solPk.String()
  68. case vaa.ChainIDEthereum,
  69. vaa.ChainIDBSC,
  70. vaa.ChainIDPolygon:
  71. addr := fmt.Sprintf("0x%v", address[(len(address)-40):])
  72. return addr
  73. case vaa.ChainIDTerra:
  74. // handle terra native assets manually
  75. if val, ok := tokenAddressExceptions[address]; ok {
  76. return val
  77. }
  78. trimmed := address[(len(address) - 40):]
  79. data, decodeErr := hex.DecodeString(trimmed)
  80. if decodeErr != nil {
  81. fmt.Printf("failed to decode unpadded string: %v\n", decodeErr)
  82. }
  83. encodedAddr, convertErr := bech32.ConvertAndEncode("terra", data)
  84. if convertErr != nil {
  85. fmt.Println("convert error from cosmos bech32. err", convertErr)
  86. }
  87. return encodedAddr
  88. default:
  89. panic(fmt.Errorf("cannot process address for unknown chain: %v", chain))
  90. }
  91. }
  92. // ProcessTransfer is triggered by a PubSub message, once a TokenTransferPayload is written to a row.
  93. func ProcessTransfer(ctx context.Context, m PubSubMessage) error {
  94. data := string(m.Data)
  95. if data == "" {
  96. return fmt.Errorf("no data to process in message")
  97. }
  98. signedVaa, err := vaa.Unmarshal(m.Data)
  99. if err != nil {
  100. log.Println("failed Unmarshaling VAA")
  101. return err
  102. }
  103. // create the bigtable identifier from the VAA data
  104. rowKey := makeRowKey(signedVaa.EmitterChain, signedVaa.EmitterAddress, signedVaa.Sequence)
  105. row, err := tbl.ReadRow(ctx, rowKey)
  106. if err != nil {
  107. log.Fatalf("Could not read row with key %s: %v", rowKey, err)
  108. }
  109. // get the payload data for this transfer
  110. var tokenAddress string
  111. var tokenChain vaa.ChainID
  112. var amount string
  113. for _, item := range row[columnFamilies[2]] {
  114. switch item.Column {
  115. case "TokenTransferPayload:OriginAddress":
  116. tokenAddress = string(item.Value)
  117. case "TokenTransferPayload:OriginChain":
  118. chainInt, _ := strconv.ParseUint(string(item.Value), 10, 32)
  119. chainID := vaa.ChainID(chainInt)
  120. tokenChain = chainID
  121. case "TokenTransferPayload:Amount":
  122. amount = string(item.Value)
  123. }
  124. }
  125. // lookup the asset meta for this transfer.
  126. // find an AssetMeta message that matches the OriginChain & TokenAddress of the transfer
  127. var result bigtable.Row
  128. chainIDPrefix := fmt.Sprintf("%d", tokenChain) // create a string containing the tokenChain chainID, ie "2"
  129. queryErr := tbl.ReadRows(ctx, bigtable.PrefixRange(chainIDPrefix), func(row bigtable.Row) bool {
  130. result = row
  131. return true
  132. }, bigtable.RowFilter(
  133. bigtable.ChainFilters(
  134. bigtable.FamilyFilter(columnFamilies[3]),
  135. bigtable.ColumnFilter("TokenAddress"),
  136. bigtable.ValueFilter(tokenAddress),
  137. )))
  138. if queryErr != nil {
  139. log.Fatalf("failed to read rows: %v", queryErr)
  140. }
  141. if result == nil {
  142. log.Printf("did not find AssetMeta row for tokenAddress: %v. Transfer rowKey: %v\n", tokenAddress, rowKey)
  143. return fmt.Errorf("did not find AssetMeta row for tokenAddress %v", tokenAddress)
  144. }
  145. // now get the entire row
  146. assetMetaRow, assetMetaErr := tbl.ReadRow(ctx, result.Key(), bigtable.RowFilter(bigtable.LatestNFilter(1)))
  147. if assetMetaErr != nil {
  148. log.Fatalf("Could not read row with key %s: %v", rowKey, assetMetaErr)
  149. }
  150. if _, ok := assetMetaRow[columnFamilies[3]]; !ok {
  151. log.Println("did not find AssetMeta matching TokenAddress", tokenAddress)
  152. return fmt.Errorf("did not find AssetMeta matching TokenAddress %v", tokenAddress)
  153. }
  154. // get AssetMeta values
  155. var decimals int
  156. var symbol string
  157. var name string
  158. for _, item := range assetMetaRow[columnFamilies[3]] {
  159. switch item.Column {
  160. case "AssetMetaPayload:Decimals":
  161. decimalStr := string(item.Value)
  162. dec, err := strconv.Atoi(decimalStr)
  163. if err != nil {
  164. log.Fatalf("failed parsing decimals of row %v", assetMetaRow.Key())
  165. }
  166. decimals = dec
  167. case "AssetMetaPayload:Symbol":
  168. symbol = string(item.Value)
  169. case "AssetMetaPayload:Name":
  170. name = string(item.Value)
  171. }
  172. }
  173. // transfers created by the bridge UI will have at most 8 decimals.
  174. if decimals > 8 {
  175. decimals = 8
  176. }
  177. // ensure amount string is long enough
  178. if len(amount) < decimals {
  179. amount = fmt.Sprintf("%0*v", decimals, amount)
  180. }
  181. intAmount := amount[:len(amount)-decimals]
  182. decAmount := amount[len(amount)-decimals:]
  183. calculatedAmount := intAmount + "." + decAmount
  184. nativeTokenAddress := transformHexAddressToNative(tokenChain, tokenAddress)
  185. timestamp := signedVaa.Timestamp.UTC()
  186. price, foundSymbol, foundName := fetchTokenPrice(tokenChain, symbol, nativeTokenAddress, timestamp)
  187. if price == 0 {
  188. // no price found, don't save
  189. log.Printf("no price for symbol: %v, name: %v, address: %v, at: %v. rowKey: %v\n", symbol, name, nativeTokenAddress, timestamp.String(), rowKey)
  190. return nil
  191. }
  192. // update symbol and name if they are missing
  193. if symbol == "" {
  194. symbol = foundSymbol
  195. }
  196. if name == "" {
  197. name = foundName
  198. }
  199. // convert the amount string so it can be used for math
  200. amountFloat, convErr := strconv.ParseFloat(calculatedAmount, 64)
  201. if convErr != nil {
  202. log.Fatalf("failed parsing calculatedAmount '%v' to float64. err %v", calculatedAmount, convErr)
  203. }
  204. notional := amountFloat * price
  205. notionalStr := fmt.Sprintf("%f", notional)
  206. // write to BigTable
  207. colFam := columnFamilies[5]
  208. mutation := bigtable.NewMutation()
  209. ts := bigtable.Now()
  210. mutation.Set(colFam, "Amount", ts, []byte(calculatedAmount))
  211. mutation.Set(colFam, "Decimals", ts, []byte(fmt.Sprint(decimals)))
  212. var notionalbuf [8]byte
  213. binary.BigEndian.PutUint64(notionalbuf[:], math.Float64bits(notional))
  214. mutation.Set(colFam, "NotionalUSD", ts, notionalbuf[:])
  215. mutation.Set(colFam, "NotionalUSDStr", ts, []byte(notionalStr))
  216. var priceBuf [8]byte
  217. binary.BigEndian.PutUint64(priceBuf[:], math.Float64bits(price))
  218. mutation.Set(colFam, "TokenPriceUSD", ts, priceBuf[:])
  219. mutation.Set(colFam, "TokenPriceUSDStr", ts, []byte(fmt.Sprintf("%f", price)))
  220. mutation.Set(colFam, "TransferTimestamp", ts, []byte(timestamp.String()))
  221. mutation.Set(colFam, "OriginSymbol", ts, []byte(symbol))
  222. mutation.Set(colFam, "OriginName", ts, []byte(name))
  223. mutation.Set(colFam, "OriginTokenAddress", ts, []byte(nativeTokenAddress))
  224. // TODO - find the symbol & name of the asset on the target chain?
  225. // mutation.Set(colFam, "TargetSymbol", ts, []byte())
  226. // mutation.Set(colFam, "TargetName", ts, []byte())
  227. // conditional mutation - don't write if row already has an Amount value.
  228. filter := bigtable.ChainFilters(
  229. bigtable.FamilyFilter(colFam),
  230. bigtable.ColumnFilter("Amount"))
  231. conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
  232. writeErr := tbl.Apply(ctx, rowKey, conditionalMutation)
  233. if writeErr != nil {
  234. log.Printf("Failed to write TokenTransferDetails for %v to BigTable. err: %v\n", rowKey, writeErr)
  235. return writeErr
  236. }
  237. log.Println("done writing TokenTransferDetails to bigtable", rowKey)
  238. // success
  239. return nil
  240. }