setup.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. from time import time, sleep
  2. from typing import List, Tuple, Dict, Any, Optional, Union
  3. from base64 import b64decode
  4. import base64
  5. import random
  6. import hashlib
  7. from algosdk.v2client.algod import AlgodClient
  8. from algosdk.kmd import KMDClient
  9. from algosdk import account, mnemonic
  10. from algosdk.future import transaction
  11. from algosdk.encoding import decode_address
  12. from pyteal import compileTeal, Mode, Expr
  13. from pyteal import *
  14. from algosdk.logic import get_application_address
  15. import pprint
  16. # Q5XDfcbiqiBwfMlY3gO1Mb0vyNCO+szD3v9azhrG16iO5Z5aTduNzeut/FLG0NOG0+txrBGN6lhi5iwytgkyKg==
  17. # position atom discover cluster fiction amused toe siren slam author surround spread garage craft isolate whisper kangaroo kitchen lend toss culture various effort absent kidney
  18. class Account:
  19. """Represents a private key and address for an Algorand account"""
  20. def __init__(self, privateKey: str) -> None:
  21. self.sk = privateKey
  22. self.addr = account.address_from_private_key(privateKey)
  23. # print (privateKey + " -> " + self.getMnemonic())
  24. def getAddress(self) -> str:
  25. return self.addr
  26. def getPrivateKey(self) -> str:
  27. return self.sk
  28. def getMnemonic(self) -> str:
  29. return mnemonic.from_private_key(self.sk)
  30. @classmethod
  31. def FromMnemonic(cls, m: str) -> "Account":
  32. return cls(mnemonic.to_private_key(m))
  33. class PendingTxnResponse:
  34. def __init__(self, response: Dict[str, Any]) -> None:
  35. self.poolError: str = response["pool-error"]
  36. self.txn: Dict[str, Any] = response["txn"]
  37. self.applicationIndex: Optional[int] = response.get("application-index")
  38. self.assetIndex: Optional[int] = response.get("asset-index")
  39. self.closeRewards: Optional[int] = response.get("close-rewards")
  40. self.closingAmount: Optional[int] = response.get("closing-amount")
  41. self.confirmedRound: Optional[int] = response.get("confirmed-round")
  42. self.globalStateDelta: Optional[Any] = response.get("global-state-delta")
  43. self.localStateDelta: Optional[Any] = response.get("local-state-delta")
  44. self.receiverRewards: Optional[int] = response.get("receiver-rewards")
  45. self.senderRewards: Optional[int] = response.get("sender-rewards")
  46. self.innerTxns: List[Any] = response.get("inner-txns", [])
  47. self.logs: List[bytes] = [b64decode(l) for l in response.get("logs", [])]
  48. class Setup:
  49. def __init__(self) -> None:
  50. self.ALGOD_ADDRESS = "http://localhost:4001"
  51. self.ALGOD_TOKEN = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
  52. self.FUNDING_AMOUNT = 100_000_000
  53. self.KMD_ADDRESS = "http://localhost:4002"
  54. self.KMD_TOKEN = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
  55. self.KMD_WALLET_NAME = "unencrypted-default-wallet"
  56. self.KMD_WALLET_PASSWORD = ""
  57. self.TARGET_ACCOUNT = "position atom discover cluster fiction amused toe siren slam author surround spread garage craft isolate whisper kangaroo kitchen lend toss culture various effort absent kidney"
  58. self.kmdAccounts : Optional[List[Account]] = None
  59. self.accountList : List[Account] = []
  60. self.APPROVAL_PROGRAM = b""
  61. self.CLEAR_STATE_PROGRAM = b""
  62. def waitForTransaction(
  63. self, client: AlgodClient, txID: str, timeout: int = 10
  64. ) -> PendingTxnResponse:
  65. lastStatus = client.status()
  66. lastRound = lastStatus["last-round"]
  67. startRound = lastRound
  68. while lastRound < startRound + timeout:
  69. pending_txn = client.pending_transaction_info(txID)
  70. if pending_txn.get("confirmed-round", 0) > 0:
  71. return PendingTxnResponse(pending_txn)
  72. if pending_txn["pool-error"]:
  73. raise Exception("Pool error: {}".format(pending_txn["pool-error"]))
  74. lastStatus = client.status_after_block(lastRound + 1)
  75. lastRound += 1
  76. raise Exception(
  77. "Transaction {} not confirmed after {} rounds".format(txID, timeout)
  78. )
  79. def getKmdClient(self) -> KMDClient:
  80. return KMDClient(self.KMD_TOKEN, self.KMD_ADDRESS)
  81. def getGenesisAccounts(self) -> List[Account]:
  82. if self.kmdAccounts is None:
  83. kmd = self.getKmdClient()
  84. wallets = kmd.list_wallets()
  85. walletID = None
  86. for wallet in wallets:
  87. if wallet["name"] == self.KMD_WALLET_NAME:
  88. walletID = wallet["id"]
  89. break
  90. if walletID is None:
  91. raise Exception("Wallet not found: {}".format(self.KMD_WALLET_NAME))
  92. walletHandle = kmd.init_wallet_handle(walletID, self.KMD_WALLET_PASSWORD)
  93. try:
  94. addresses = kmd.list_keys(walletHandle)
  95. privateKeys = [
  96. kmd.export_key(walletHandle, self.KMD_WALLET_PASSWORD, addr)
  97. for addr in addresses
  98. ]
  99. self.kmdAccounts = [Account(sk) for sk in privateKeys]
  100. finally:
  101. kmd.release_wallet_handle(walletHandle)
  102. return self.kmdAccounts
  103. def getTargetAccount(self) -> Account:
  104. return Account.FromMnemonic(self.TARGET_ACCOUNT)
  105. def fundTargetAccount(self, client: AlgodClient, target: Account):
  106. print("fundTargetAccount")
  107. genesisAccounts = self.getGenesisAccounts()
  108. suggestedParams = client.suggested_params()
  109. for fundingAccount in genesisAccounts:
  110. txn = transaction.PaymentTxn(
  111. sender=fundingAccount.getAddress(),
  112. receiver=target.getAddress(),
  113. amt=self.FUNDING_AMOUNT,
  114. sp=suggestedParams,
  115. )
  116. pprint.pprint(txn)
  117. print("signing txn")
  118. stxn = txn.sign(fundingAccount.getPrivateKey())
  119. print("sending txn")
  120. client.send_transaction(stxn)
  121. print("waiting for txn")
  122. self.waitForTransaction(client, stxn.get_txid())
  123. def getAlgodClient(self) -> AlgodClient:
  124. return AlgodClient(self.ALGOD_TOKEN, self.ALGOD_ADDRESS)
  125. def getBalances(self, client: AlgodClient, account: str) -> Dict[int, int]:
  126. balances: Dict[int, int] = dict()
  127. accountInfo = client.account_info(account)
  128. # set key 0 to Algo balance
  129. balances[0] = accountInfo["amount"]
  130. assets: List[Dict[str, Any]] = accountInfo.get("assets", [])
  131. for assetHolding in assets:
  132. assetID = assetHolding["asset-id"]
  133. amount = assetHolding["amount"]
  134. balances[assetID] = amount
  135. return balances
  136. def setup(self):
  137. self.client = self.getAlgodClient()
  138. self.target = self.getTargetAccount()
  139. b = self.getBalances(self.client, self.target.getAddress())
  140. if (b[0] < 100000000):
  141. print("Account needs money... funding it")
  142. self.fundTargetAccount(self.client, self.target)
  143. print(self.getBalances(self.client, self.target.getAddress()))
  144. def deploy(self):
  145. vaa_processor_approval = self.client.compile(open("vaa-processor-approval.teal", "r").read())
  146. vaa_processor_clear = self.client.compile(open("vaa-processor-clear.teal", "r").read())
  147. vaa_verify = self.client.compile(open("vaa-verify.teal", "r").read())
  148. verify_hash = vaa_verify['hash']
  149. print("verify_hash " + verify_hash + " " + str(len(decode_address(verify_hash))))
  150. globalSchema = transaction.StateSchema(num_uints=4, num_byte_slices=20)
  151. localSchema = transaction.StateSchema(num_uints=0, num_byte_slices=0)
  152. app_args = [ "beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe", 0, 0 ]
  153. txn = transaction.ApplicationCreateTxn(
  154. sender=self.target.getAddress(),
  155. on_complete=transaction.OnComplete.NoOpOC,
  156. approval_program=b64decode(vaa_processor_approval["result"]),
  157. clear_program=b64decode(vaa_processor_clear["result"]),
  158. global_schema=globalSchema,
  159. local_schema=localSchema,
  160. app_args=app_args,
  161. sp=self.client.suggested_params(),
  162. )
  163. signedTxn = txn.sign(self.target.getPrivateKey())
  164. self.client.send_transaction(signedTxn)
  165. response = self.waitForTransaction(self.client, signedTxn.get_txid())
  166. assert response.applicationIndex is not None and response.applicationIndex > 0
  167. print("app_id: ", response.applicationIndex)
  168. appAddr = get_application_address(response.applicationIndex)
  169. suggestedParams = self.client.suggested_params()
  170. appCallTxn = transaction.ApplicationCallTxn(
  171. sender=self.target.getAddress(),
  172. index=response.applicationIndex,
  173. on_complete=transaction.OnComplete.NoOpOC,
  174. app_args=[b"setvphash", decode_address(verify_hash)],
  175. sp=suggestedParams,
  176. )
  177. signedAppCallTxn = appCallTxn.sign(self.target.getPrivateKey())
  178. self.client.send_transactions([signedAppCallTxn])
  179. response = self.waitForTransaction(self.client, appCallTxn.get_txid())
  180. print("set the vp hash to the stateless contract")
  181. appCallTxn = transaction.PaymentTxn(
  182. sender=self.target.getAddress(),
  183. receiver=verify_hash,
  184. amt=500000,
  185. sp=suggestedParams,
  186. )
  187. signedAppCallTxn = appCallTxn.sign(self.target.getPrivateKey())
  188. self.client.send_transactions([signedAppCallTxn])
  189. response = self.waitForTransaction(self.client, appCallTxn.get_txid())
  190. print("funded the stateless contract")
# Script entry: build the helper, fund the target account if its balance is
# low, then deploy the contracts.
# NOTE(review): these statements run at import time because there is no
# `if __name__ == "__main__":` guard -- confirm nothing imports this module.
s = Setup()
s.setup()
s.deploy()