attest.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. # python3 -m pip install pycryptodomex uvarint pyteal web3 coincurve
  2. import sys
  3. sys.path.append("..")
  4. from admin import PortalCore, Account
  5. from gentest import GenTest
  6. from base64 import b64decode
  7. from typing import List, Tuple, Dict, Any, Optional, Union
  8. import base64
  9. import random
  10. import time
  11. import hashlib
  12. import uuid
  13. import json
  14. from algosdk.v2client.algod import AlgodClient
  15. from algosdk.kmd import KMDClient
  16. from algosdk import account, mnemonic
  17. from algosdk.encoding import decode_address, encode_address
  18. from algosdk.future import transaction
  19. from pyteal import compileTeal, Mode, Expr
  20. from pyteal import *
  21. from algosdk.logic import get_application_address
  22. from vaa_verify import get_vaa_verify
  23. from algosdk.future.transaction import LogicSig
  24. from test_contract import get_test_app
  25. from algosdk.v2client import indexer
  26. import pprint
  27. class AlgoTest(PortalCore):
  28. def __init__(self) -> None:
  29. super().__init__()
  30. def getBalances(self, client: AlgodClient, account: str) -> Dict[int, int]:
  31. balances: Dict[int, int] = dict()
  32. accountInfo = client.account_info(account)
  33. # set key 0 to Algo balance
  34. balances[0] = accountInfo["amount"]
  35. assets: List[Dict[str, Any]] = accountInfo.get("assets", [])
  36. for assetHolding in assets:
  37. assetID = assetHolding["asset-id"]
  38. amount = assetHolding["amount"]
  39. balances[assetID] = amount
  40. return balances
  41. def createTestApp(
  42. self,
  43. client: AlgodClient,
  44. sender: Account,
  45. ) -> int:
  46. approval, clear = get_test_app(client)
  47. globalSchema = transaction.StateSchema(num_uints=4, num_byte_slices=30)
  48. localSchema = transaction.StateSchema(num_uints=0, num_byte_slices=16)
  49. app_args = []
  50. txn = transaction.ApplicationCreateTxn(
  51. sender=sender.getAddress(),
  52. on_complete=transaction.OnComplete.NoOpOC,
  53. approval_program=b64decode(approval["result"]),
  54. clear_program=b64decode(clear["result"]),
  55. global_schema=globalSchema,
  56. local_schema=localSchema,
  57. app_args=app_args,
  58. sp=client.suggested_params(),
  59. )
  60. signedTxn = txn.sign(sender.getPrivateKey())
  61. client.send_transaction(signedTxn)
  62. response = self.waitForTransaction(client, signedTxn.get_txid())
  63. assert response.applicationIndex is not None and response.applicationIndex > 0
  64. txn = transaction.PaymentTxn(sender = sender.getAddress(), sp = client.suggested_params(),
  65. receiver = get_application_address(response.applicationIndex), amt = 300000)
  66. signedTxn = txn.sign(sender.getPrivateKey())
  67. client.send_transaction(signedTxn)
  68. return response.applicationIndex
  69. def parseSeqFromLog(self, txn):
  70. try:
  71. return int.from_bytes(b64decode(txn.innerTxns[-1]["logs"][0]), "big")
  72. except Exception as err:
  73. pprint.pprint(txn.__dict__)
  74. raise
  75. def getVAA(self, client, sender, sid, app):
  76. if sid == None:
  77. raise Exception("getVAA called with a sid of None")
  78. saddr = get_application_address(app)
  79. # SOOO, we send a nop txn through to push the block forward
  80. # one
  81. # This is ONLY needed on a local net... the indexer will sit
  82. # on the last block for 30 to 60 seconds... we don't want this
  83. # log in prod since it is wasteful of gas
  84. if (self.INDEXER_ROUND > 512 and not self.args.testnet): # until they fix it
  85. print("indexer is broken in local net... stop/clean/restart the sandbox")
  86. sys.exit(0)
  87. txns = []
  88. txns.append(
  89. transaction.ApplicationCallTxn(
  90. sender=sender.getAddress(),
  91. index=self.tokenid,
  92. on_complete=transaction.OnComplete.NoOpOC,
  93. app_args=[b"nop"],
  94. sp=client.suggested_params(),
  95. )
  96. )
  97. self.sendTxn(client, sender, txns, False)
  98. if self.myindexer == None:
  99. print("indexer address: " + self.INDEXER_ADDRESS)
  100. self.myindexer = indexer.IndexerClient(indexer_token=self.INDEXER_TOKEN, indexer_address=self.INDEXER_ADDRESS)
  101. while True:
  102. nexttoken = ""
  103. while True:
  104. response = self.myindexer.search_transactions( min_round=self.INDEXER_ROUND, note_prefix=self.NOTE_PREFIX, next_page=nexttoken)
  105. # pprint.pprint(response)
  106. for x in response["transactions"]:
  107. # pprint.pprint(x)
  108. for y in x["inner-txns"]:
  109. if "application-transaction" not in y:
  110. continue
  111. if y["application-transaction"]["application-id"] != self.coreid:
  112. continue
  113. if len(y["logs"]) == 0:
  114. continue
  115. args = y["application-transaction"]["application-args"]
  116. if len(args) < 2:
  117. continue
  118. if base64.b64decode(args[0]) != b'publishMessage':
  119. continue
  120. seq = int.from_bytes(base64.b64decode(y["logs"][0]), "big")
  121. if seq != sid:
  122. continue
  123. if y["sender"] != saddr:
  124. continue;
  125. emitter = decode_address(y["sender"])
  126. payload = base64.b64decode(args[1])
  127. # pprint.pprint([seq, y["sender"], payload.hex()])
  128. # sys.exit(0)
  129. return self.gt.genVaa(emitter, seq, payload)
  130. if 'next-token' in response:
  131. nexttoken = response['next-token']
  132. else:
  133. self.INDEXER_ROUND = response['current-round'] + 1
  134. break
  135. time.sleep(1)
  136. def publishMessage(self, client, sender, vaa, appid):
  137. aa = decode_address(get_application_address(appid)).hex()
  138. emitter_addr = self.optin(client, sender, self.coreid, 0, aa)
  139. txns = []
  140. sp = client.suggested_params()
  141. a = transaction.ApplicationCallTxn(
  142. sender=sender.getAddress(),
  143. index=appid,
  144. on_complete=transaction.OnComplete.NoOpOC,
  145. app_args=[b"test1", vaa, self.coreid],
  146. foreign_apps = [self.coreid],
  147. accounts=[emitter_addr],
  148. sp=sp
  149. )
  150. a.fee = a.fee * 2
  151. txns.append(a)
  152. resp = self.sendTxn(client, sender, txns, True)
  153. self.INDEXER_ROUND = resp.confirmedRound
  154. return self.parseSeqFromLog(resp)
  155. def createTestAsset(self, client, sender):
  156. txns = []
  157. a = transaction.PaymentTxn(
  158. sender = sender.getAddress(),
  159. sp = client.suggested_params(),
  160. receiver = get_application_address(self.testid),
  161. amt = 300000
  162. )
  163. txns.append(a)
  164. sp = client.suggested_params()
  165. a = transaction.ApplicationCallTxn(
  166. sender=sender.getAddress(),
  167. index=self.testid,
  168. on_complete=transaction.OnComplete.NoOpOC,
  169. app_args=[b"setup"],
  170. sp=sp
  171. )
  172. a.fee = a.fee * 2
  173. txns.append(a)
  174. transaction.assign_group_id(txns)
  175. grp = []
  176. pk = sender.getPrivateKey()
  177. for t in txns:
  178. grp.append(t.sign(pk))
  179. client.send_transactions(grp)
  180. resp = self.waitForTransaction(client, grp[-1].get_txid())
  181. aid = int.from_bytes(resp.__dict__["logs"][0], "big")
  182. print("Opting " + sender.getAddress() + " into " + str(aid))
  183. self.asset_optin(client, sender, aid, sender.getAddress())
  184. txns = []
  185. a = transaction.ApplicationCallTxn(
  186. sender=sender.getAddress(),
  187. index=self.testid,
  188. on_complete=transaction.OnComplete.NoOpOC,
  189. app_args=[b"mint"],
  190. foreign_assets = [aid],
  191. sp=sp
  192. )
  193. a.fee = a.fee * 2
  194. txns.append(a)
  195. resp = self.sendTxn(client, sender, txns, True)
  196. # self.INDEXER_ROUND = resp.confirmedRound
  197. return aid
  198. def getCreator(self, client, sender, asset_id):
  199. return client.asset_info(asset_id)["params"]["creator"]
  200. def testAttest(self, client, sender, asset_id):
  201. taddr = get_application_address(self.tokenid)
  202. aa = decode_address(taddr).hex()
  203. emitter_addr = self.optin(client, sender, self.coreid, 0, aa)
  204. creator = self.getCreator(client, sender, asset_id)
  205. c = client.account_info(creator)
  206. wormhole = c.get("auth-addr") == taddr
  207. if not wormhole:
  208. creator = self.optin(client, sender, self.tokenid, asset_id, b"native".hex())
  209. txns = []
  210. sp = client.suggested_params()
  211. txns.append(transaction.ApplicationCallTxn(
  212. sender=sender.getAddress(),
  213. index=self.tokenid,
  214. on_complete=transaction.OnComplete.NoOpOC,
  215. app_args=[b"nop"],
  216. sp=sp
  217. ))
  218. mfee = self.getMessageFee()
  219. if (mfee > 0):
  220. txns.append(transaction.PaymentTxn(sender = sender.getAddress(), sp = sp, receiver = get_application_address(self.tokenid), amt = mfee))
  221. a = transaction.ApplicationCallTxn(
  222. sender=sender.getAddress(),
  223. index=self.tokenid,
  224. on_complete=transaction.OnComplete.NoOpOC,
  225. app_args=[b"attestToken", asset_id],
  226. foreign_apps = [self.coreid],
  227. foreign_assets = [asset_id],
  228. accounts=[emitter_addr, creator, c["address"], get_application_address(self.coreid)],
  229. sp=sp
  230. )
  231. if (mfee > 0):
  232. a.fee = a.fee * 3
  233. else:
  234. a.fee = a.fee * 2
  235. txns.append(a)
  236. resp = self.sendTxn(client, sender, txns, True)
  237. # Point us at the correct round
  238. self.INDEXER_ROUND = resp.confirmedRound
  239. # print(encode_address(resp.__dict__["logs"][0]))
  240. # print(encode_address(resp.__dict__["logs"][1]))
  241. # pprint.pprint(resp.__dict__)
  242. return self.parseSeqFromLog(resp)
  243. def transferAsset(self, client, sender, asset_id, quantity, receiver, chain, fee, payload = None):
  244. # pprint.pprint(["transferAsset", asset_id, quantity, receiver, chain, fee])
  245. taddr = get_application_address(self.tokenid)
  246. aa = decode_address(taddr).hex()
  247. emitter_addr = self.optin(client, sender, self.coreid, 0, aa)
  248. # asset_id 0 is ALGO
  249. if asset_id == 0:
  250. wormhole = False
  251. else:
  252. creator = self.getCreator(client, sender, asset_id)
  253. c = client.account_info(creator)
  254. wormhole = c.get("auth-addr") == taddr
  255. txns = []
  256. mfee = self.getMessageFee()
  257. if (mfee > 0):
  258. txns.append(transaction.PaymentTxn(sender = sender.getAddress(), sp = sp, receiver = get_application_address(self.tokenid), amt = mfee))
  259. if not wormhole:
  260. creator = self.optin(client, sender, self.tokenid, asset_id, b"native".hex())
  261. print("non wormhole account " + creator)
  262. sp = client.suggested_params()
  263. if (asset_id != 0) and (not self.asset_optin_check(client, sender, asset_id, creator)):
  264. print("Looks like we need to optin")
  265. txns.append(
  266. transaction.PaymentTxn(
  267. sender=sender.getAddress(),
  268. receiver=creator,
  269. amt=100000,
  270. sp=sp
  271. )
  272. )
  273. # The tokenid app needs to do the optin since it has signature authority
  274. a = transaction.ApplicationCallTxn(
  275. sender=sender.getAddress(),
  276. index=self.tokenid,
  277. on_complete=transaction.OnComplete.NoOpOC,
  278. app_args=[b"optin", asset_id],
  279. foreign_assets = [asset_id],
  280. accounts=[creator],
  281. sp=sp
  282. )
  283. a.fee = a.fee * 2
  284. txns.append(a)
  285. self.sendTxn(client, sender, txns, False)
  286. txns = []
  287. txns.insert(0,
  288. transaction.ApplicationCallTxn(
  289. sender=sender.getAddress(),
  290. index=self.tokenid,
  291. on_complete=transaction.OnComplete.NoOpOC,
  292. app_args=[b"nop"],
  293. sp=client.suggested_params(),
  294. )
  295. )
  296. if asset_id == 0:
  297. print("asset_id == 0")
  298. txns.append(transaction.PaymentTxn(
  299. sender=sender.getAddress(),
  300. receiver=creator,
  301. amt=quantity,
  302. sp=sp,
  303. ))
  304. accounts=[emitter_addr, creator, creator]
  305. else:
  306. print("asset_id != 0")
  307. txns.append(
  308. transaction.AssetTransferTxn(
  309. sender = sender.getAddress(),
  310. sp = sp,
  311. receiver = creator,
  312. amt = quantity,
  313. index = asset_id
  314. ))
  315. accounts=[emitter_addr, creator, c["address"]]
  316. args = [b"sendTransfer", asset_id, quantity, decode_address(receiver), chain, fee]
  317. if None != payload:
  318. args.append(payload)
  319. #pprint.pprint(args)
  320. # print(self.tokenid)
  321. a = transaction.ApplicationCallTxn(
  322. sender=sender.getAddress(),
  323. index=self.tokenid,
  324. on_complete=transaction.OnComplete.NoOpOC,
  325. app_args=args,
  326. foreign_apps = [self.coreid],
  327. foreign_assets = [asset_id],
  328. accounts=accounts,
  329. sp=sp
  330. )
  331. a.fee = a.fee * 2
  332. txns.append(a)
  333. resp = self.sendTxn(client, sender, txns, True)
  334. self.INDEXER_ROUND = resp.confirmedRound
  335. # pprint.pprint([self.coreid, self.tokenid, resp.__dict__,
  336. # int.from_bytes(resp.__dict__["logs"][1], "big"),
  337. # int.from_bytes(resp.__dict__["logs"][2], "big"),
  338. # int.from_bytes(resp.__dict__["logs"][3], "big"),
  339. # int.from_bytes(resp.__dict__["logs"][4], "big"),
  340. # int.from_bytes(resp.__dict__["logs"][5], "big")
  341. # ])
  342. # print(encode_address(resp.__dict__["logs"][0]))
  343. # print(encode_address(resp.__dict__["logs"][1]))
  344. return self.parseSeqFromLog(resp)
  345. def asset_optin_check(self, client, sender, asset, receiver):
  346. if receiver not in self.asset_cache:
  347. self.asset_cache[receiver] = {}
  348. if asset in self.asset_cache[receiver]:
  349. return True
  350. ai = client.account_info(receiver)
  351. if "assets" in ai:
  352. for x in ai["assets"]:
  353. if x["asset-id"] == asset:
  354. self.asset_cache[receiver][asset] = True
  355. return True
  356. return False
  357. def asset_optin(self, client, sender, asset, receiver):
  358. if self.asset_optin_check(client, sender, asset, receiver):
  359. return
  360. pprint.pprint(["asset_optin", asset, receiver])
  361. sp = client.suggested_params()
  362. optin_txn = transaction.AssetTransferTxn(
  363. sender = sender.getAddress(),
  364. sp = sp,
  365. receiver = receiver,
  366. amt = 0,
  367. index = asset
  368. )
  369. transaction.assign_group_id([optin_txn])
  370. signed_optin = optin_txn.sign(sender.getPrivateKey())
  371. client.send_transactions([signed_optin])
  372. resp = self.waitForTransaction(client, signed_optin.get_txid())
  373. assert self.asset_optin_check(client, sender, asset, receiver), "The optin failed"
  374. print("woah! optin succeeded")
  375. def simple_test(self):
  376. # q = bytes.fromhex(gt.genAssetMeta(gt.guardianPrivKeys, 1, 1, 1, bytes.fromhex("4523c3F29447d1f32AEa95BEBD00383c4640F1b4"), 1, 8, b"USDC", b"CircleCoin"))
  377. # pprint.pprint(self.parseVAA(q))
  378. # sys.exit(0)
  379. # vaa = self.parseVAA(bytes.fromhex("01000000010100e1232697de3681d67ca0c46fbbc9ea5d282c473daae8fda2b23145e7b7167f9a35888acf80ed9d091af3069108c25324a22d8665241db884dda53ca53a8212d100625436600000000100020000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585000000000000000120010000000000000000000000000000000000000000000000000000000005f5e1000000000000000000000000000000000000000000000000004523c3F29447d1f32AEa95BEBD00383c4640F1b400020000000000000000000000000000000000000000000000000000aabbcc00080000000000000000000000000000000000000000000000000000000000000000"))
  380. # pprint.pprint(vaa)
  381. # sys.exit(0)
  382. gt = GenTest(True)
  383. self.gt = gt
  384. self.setup_args()
  385. if self.args.testnet:
  386. self.testnet()
  387. client = self.client = self.getAlgodClient()
  388. self.genTeal()
  389. self.vaa_verify = self.client.compile(get_vaa_verify())
  390. self.vaa_verify["lsig"] = LogicSig(base64.b64decode(self.vaa_verify["result"]))
  391. vaaLogs = []
  392. args = self.args
  393. if self.args.mnemonic:
  394. self.foundation = Account.FromMnemonic(self.args.mnemonic)
  395. if self.foundation == None:
  396. print("Generating the foundation account...")
  397. self.foundation = self.getTemporaryAccount(self.client)
  398. if self.foundation == None:
  399. print("We dont have a account? ")
  400. sys.exit(0)
  401. foundation = self.foundation
  402. seq = int(time.time())
  403. self.coreid = 1004
  404. self.tokenid = 1006
  405. player = self.getTemporaryAccount(client)
  406. print("token bridge " + str(self.tokenid) + " address " + get_application_address(self.tokenid))
  407. player2 = self.getTemporaryAccount(client)
  408. player3 = self.getTemporaryAccount(client)
  409. # This creates a asset by using another app... you can also just creat the asset from the client sdk like we do in the typescript test
  410. self.testid = self.createTestApp(client, player2)
  411. print("Lets create a brand new non-wormhole asset and try to attest and send it out")
  412. self.testasset = self.createTestAsset(client, player2)
  413. print("test asset id: " + str(self.testasset))
  414. print("Lets try to create an attest for a non-wormhole thing with a huge number of decimals")
  415. # paul - attestFromAlgorand
  416. sid = self.testAttest(client, player2, self.testasset)
  417. print("... track down the generated VAA")
  418. vaa = self.getVAA(client, player, sid, self.tokenid)
  419. v = self.parseVAA(bytes.fromhex(vaa))
  420. print("We got a " + v["Meta"])
  421. if __name__ == "__main__":
  422. core = AlgoTest()
  423. core.simple_test()