| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- from time import time, sleep
- from typing import List, Tuple, Dict, Any, Optional, Union
- from base64 import b64decode
- import base64
- import random
- import hashlib
- from algosdk.v2client.algod import AlgodClient
- from algosdk.kmd import KMDClient
- from algosdk import account, mnemonic
- from algosdk.future import transaction
- from algosdk.encoding import decode_address
- from pyteal import compileTeal, Mode, Expr
- from pyteal import *
- from algosdk.logic import get_application_address
- import pprint
- # Q5XDfcbiqiBwfMlY3gO1Mb0vyNCO+szD3v9azhrG16iO5Z5aTduNzeut/FLG0NOG0+txrBGN6lhi5iwytgkyKg==
- # position atom discover cluster fiction amused toe siren slam author surround spread garage craft isolate whisper kangaroo kitchen lend toss culture various effort absent kidney
- class Account:
- """Represents a private key and address for an Algorand account"""
- def __init__(self, privateKey: str) -> None:
- self.sk = privateKey
- self.addr = account.address_from_private_key(privateKey)
- # print (privateKey + " -> " + self.getMnemonic())
- def getAddress(self) -> str:
- return self.addr
- def getPrivateKey(self) -> str:
- return self.sk
- def getMnemonic(self) -> str:
- return mnemonic.from_private_key(self.sk)
- @classmethod
- def FromMnemonic(cls, m: str) -> "Account":
- return cls(mnemonic.to_private_key(m))
- class PendingTxnResponse:
- def __init__(self, response: Dict[str, Any]) -> None:
- self.poolError: str = response["pool-error"]
- self.txn: Dict[str, Any] = response["txn"]
- self.applicationIndex: Optional[int] = response.get("application-index")
- self.assetIndex: Optional[int] = response.get("asset-index")
- self.closeRewards: Optional[int] = response.get("close-rewards")
- self.closingAmount: Optional[int] = response.get("closing-amount")
- self.confirmedRound: Optional[int] = response.get("confirmed-round")
- self.globalStateDelta: Optional[Any] = response.get("global-state-delta")
- self.localStateDelta: Optional[Any] = response.get("local-state-delta")
- self.receiverRewards: Optional[int] = response.get("receiver-rewards")
- self.senderRewards: Optional[int] = response.get("sender-rewards")
- self.innerTxns: List[Any] = response.get("inner-txns", [])
- self.logs: List[bytes] = [b64decode(l) for l in response.get("logs", [])]
- class Setup:
- def __init__(self) -> None:
- self.ALGOD_ADDRESS = "http://localhost:4001"
- self.ALGOD_TOKEN = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
- self.FUNDING_AMOUNT = 100_000_000
- self.KMD_ADDRESS = "http://localhost:4002"
- self.KMD_TOKEN = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
- self.KMD_WALLET_NAME = "unencrypted-default-wallet"
- self.KMD_WALLET_PASSWORD = ""
- self.TARGET_ACCOUNT = "position atom discover cluster fiction amused toe siren slam author surround spread garage craft isolate whisper kangaroo kitchen lend toss culture various effort absent kidney"
- self.kmdAccounts : Optional[List[Account]] = None
- self.accountList : List[Account] = []
- self.APPROVAL_PROGRAM = b""
- self.CLEAR_STATE_PROGRAM = b""
- def waitForTransaction(
- self, client: AlgodClient, txID: str, timeout: int = 10
- ) -> PendingTxnResponse:
- lastStatus = client.status()
- lastRound = lastStatus["last-round"]
- startRound = lastRound
-
- while lastRound < startRound + timeout:
- pending_txn = client.pending_transaction_info(txID)
-
- if pending_txn.get("confirmed-round", 0) > 0:
- return PendingTxnResponse(pending_txn)
-
- if pending_txn["pool-error"]:
- raise Exception("Pool error: {}".format(pending_txn["pool-error"]))
-
- lastStatus = client.status_after_block(lastRound + 1)
-
- lastRound += 1
-
- raise Exception(
- "Transaction {} not confirmed after {} rounds".format(txID, timeout)
- )
- def getKmdClient(self) -> KMDClient:
- return KMDClient(self.KMD_TOKEN, self.KMD_ADDRESS)
-
- def getGenesisAccounts(self) -> List[Account]:
- if self.kmdAccounts is None:
- kmd = self.getKmdClient()
-
- wallets = kmd.list_wallets()
- walletID = None
- for wallet in wallets:
- if wallet["name"] == self.KMD_WALLET_NAME:
- walletID = wallet["id"]
- break
-
- if walletID is None:
- raise Exception("Wallet not found: {}".format(self.KMD_WALLET_NAME))
-
- walletHandle = kmd.init_wallet_handle(walletID, self.KMD_WALLET_PASSWORD)
-
- try:
- addresses = kmd.list_keys(walletHandle)
- privateKeys = [
- kmd.export_key(walletHandle, self.KMD_WALLET_PASSWORD, addr)
- for addr in addresses
- ]
- self.kmdAccounts = [Account(sk) for sk in privateKeys]
- finally:
- kmd.release_wallet_handle(walletHandle)
-
- return self.kmdAccounts
-
- def getTargetAccount(self) -> Account:
- return Account.FromMnemonic(self.TARGET_ACCOUNT)
- def fundTargetAccount(self, client: AlgodClient, target: Account):
- print("fundTargetAccount")
- genesisAccounts = self.getGenesisAccounts()
- suggestedParams = client.suggested_params()
-
- for fundingAccount in genesisAccounts:
- txn = transaction.PaymentTxn(
- sender=fundingAccount.getAddress(),
- receiver=target.getAddress(),
- amt=self.FUNDING_AMOUNT,
- sp=suggestedParams,
- )
- pprint.pprint(txn)
- print("signing txn")
- stxn = txn.sign(fundingAccount.getPrivateKey())
- print("sending txn")
- client.send_transaction(stxn)
- print("waiting for txn")
- self.waitForTransaction(client, stxn.get_txid())
- def getAlgodClient(self) -> AlgodClient:
- return AlgodClient(self.ALGOD_TOKEN, self.ALGOD_ADDRESS)
- def getBalances(self, client: AlgodClient, account: str) -> Dict[int, int]:
- balances: Dict[int, int] = dict()
-
- accountInfo = client.account_info(account)
-
- # set key 0 to Algo balance
- balances[0] = accountInfo["amount"]
-
- assets: List[Dict[str, Any]] = accountInfo.get("assets", [])
- for assetHolding in assets:
- assetID = assetHolding["asset-id"]
- amount = assetHolding["amount"]
- balances[assetID] = amount
-
- return balances
- def setup(self):
- self.client = self.getAlgodClient()
- self.target = self.getTargetAccount()
-
- b = self.getBalances(self.client, self.target.getAddress())
- if (b[0] < 100000000):
- print("Account needs money... funding it")
- self.fundTargetAccount(self.client, self.target)
- print(self.getBalances(self.client, self.target.getAddress()))
- def deploy(self):
- vaa_processor_approval = self.client.compile(open("vaa-processor-approval.teal", "r").read())
- vaa_processor_clear = self.client.compile(open("vaa-processor-clear.teal", "r").read())
- vaa_verify = self.client.compile(open("vaa-verify.teal", "r").read())
- verify_hash = vaa_verify['hash']
- print("verify_hash " + verify_hash + " " + str(len(decode_address(verify_hash))))
- globalSchema = transaction.StateSchema(num_uints=4, num_byte_slices=20)
- localSchema = transaction.StateSchema(num_uints=0, num_byte_slices=0)
-
- app_args = [ "beFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe", 0, 0 ]
-
- txn = transaction.ApplicationCreateTxn(
- sender=self.target.getAddress(),
- on_complete=transaction.OnComplete.NoOpOC,
- approval_program=b64decode(vaa_processor_approval["result"]),
- clear_program=b64decode(vaa_processor_clear["result"]),
- global_schema=globalSchema,
- local_schema=localSchema,
- app_args=app_args,
- sp=self.client.suggested_params(),
- )
-
- signedTxn = txn.sign(self.target.getPrivateKey())
- self.client.send_transaction(signedTxn)
- response = self.waitForTransaction(self.client, signedTxn.get_txid())
- assert response.applicationIndex is not None and response.applicationIndex > 0
- print("app_id: ", response.applicationIndex)
- appAddr = get_application_address(response.applicationIndex)
- suggestedParams = self.client.suggested_params()
- appCallTxn = transaction.ApplicationCallTxn(
- sender=self.target.getAddress(),
- index=response.applicationIndex,
- on_complete=transaction.OnComplete.NoOpOC,
- app_args=[b"setvphash", decode_address(verify_hash)],
- sp=suggestedParams,
- )
- signedAppCallTxn = appCallTxn.sign(self.target.getPrivateKey())
- self.client.send_transactions([signedAppCallTxn])
- response = self.waitForTransaction(self.client, appCallTxn.get_txid())
- print("set the vp hash to the stateless contract")
- appCallTxn = transaction.PaymentTxn(
- sender=self.target.getAddress(),
- receiver=verify_hash,
- amt=500000,
- sp=suggestedParams,
- )
- signedAppCallTxn = appCallTxn.sign(self.target.getPrivateKey())
- self.client.send_transactions([signedAppCallTxn])
- response = self.waitForTransaction(self.client, appCallTxn.get_txid())
- print("funded the stateless contract")
-
- s = Setup()
- s.setup()
- s.deploy()
|