Python Examples¶
Complete, runnable Python scripts demonstrating common Dilithia SDK workflows. Each scenario is self-contained and covers a realistic use case from start to finish.
Prerequisites¶
Requires Python 3.11 or later.
Scenario 1: Balance Monitor Bot¶
A bot that connects to a Dilithia node, recovers its wallet from a saved mnemonic, checks its balance, and if the balance exceeds a threshold, sends tokens to a destination address via a contract call. Covers: client setup, wallet recovery, balance query, contract call construction, signing, submission, and receipt polling.
Sync version¶
import os
import sys
from dilithia_sdk import (
DilithiaClient,
Balance,
Receipt,
DilithiaError,
load_native_crypto_adapter,
)
RPC_URL: str = "https://rpc.dilithia.network/rpc"
TOKEN_CONTRACT: str = "dil1_token_main"
DESTINATION: str = "dil1_recipient_address"
THRESHOLD: int = 500_000
SEND_AMOUNT: int = 100_000
def main() -> None:
# 1. Initialize client with context manager
with DilithiaClient(RPC_URL, timeout=15.0) as client:
# 2. Recover wallet (crypto adapter loaded separately)
crypto = load_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Native crypto adapter unavailable")
mnemonic: str | None = os.environ.get("BOT_MNEMONIC")
if not mnemonic:
print("Set BOT_MNEMONIC env var", file=sys.stderr)
sys.exit(1)
account = crypto.recover_hd_wallet(mnemonic)
print(f"Bot address: {account.address}")
# 3. Check current balance — returns typed Balance with .balance as TokenAmount
balance_result: Balance = client.balance(account.address)
raw_balance: int = balance_result.balance.to_raw()
print(f"Current balance: {balance_result.balance.formatted()}")
if raw_balance < THRESHOLD:
print(f"Balance {raw_balance} below threshold {THRESHOLD}. Nothing to do.")
return
# 4. Build a contract call to transfer tokens
call: dict = client.build_contract_call(
TOKEN_CONTRACT,
"transfer",
{"to": DESTINATION, "amount": SEND_AMOUNT},
)
# 5. Simulate to verify it would succeed
sim_result: dict = client.simulate(call)
print("Simulation result:", sim_result)
# 6. Sign and submit
submitted: dict = client.send_signed_call(call, crypto.signer_for(account))
tx_hash: str = submitted["tx_hash"]
print(f"Transaction submitted: {tx_hash}")
# 7. Poll for receipt — returns typed Receipt
try:
receipt: Receipt = client.wait_for_receipt(
tx_hash, max_attempts=20, delay_seconds=2.0
)
print(f"Confirmed in block {receipt.block_height}, status: {receipt.status}")
except DilithiaError as exc:
print(f"Receipt polling failed: {exc}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
Async version¶
import asyncio
import os
import sys
from dilithia_sdk import (
AsyncDilithiaClient,
Balance,
Receipt,
DilithiaError,
load_async_native_crypto_adapter,
)
RPC_URL: str = "https://rpc.dilithia.network/rpc"
TOKEN_CONTRACT: str = "dil1_token_main"
DESTINATION: str = "dil1_recipient_address"
THRESHOLD: int = 500_000
SEND_AMOUNT: int = 100_000
async def main() -> None:
# 1. Initialize async client with context manager
async with AsyncDilithiaClient(RPC_URL, timeout=15.0) as client:
# 2. Load async crypto adapter and recover wallet
crypto = load_async_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Native crypto adapter unavailable")
mnemonic: str | None = os.environ.get("BOT_MNEMONIC")
if not mnemonic:
print("Set BOT_MNEMONIC env var", file=sys.stderr)
sys.exit(1)
account = await crypto.recover_hd_wallet(mnemonic)
print(f"Bot address: {account.address}")
# 3. Check current balance — returns typed Balance
balance_result: Balance = await client.balance(account.address)
raw_balance: int = balance_result.balance.to_raw()
print(f"Current balance: {balance_result.balance.formatted()}")
if raw_balance < THRESHOLD:
print(f"Balance {raw_balance} below threshold {THRESHOLD}. Nothing to do.")
return
# 4. Build and simulate a contract call
call: dict = client.build_contract_call(
TOKEN_CONTRACT,
"transfer",
{"to": DESTINATION, "amount": SEND_AMOUNT},
)
sim_result: dict = await client.simulate(call)
print("Simulation result:", sim_result)
# 5. Sign and submit
submitted: dict = await client.send_signed_call(
call, crypto.signer_for(account)
)
tx_hash: str = submitted["tx_hash"]
print(f"Transaction submitted: {tx_hash}")
# 6. Poll for receipt — returns typed Receipt
try:
receipt: Receipt = await client.wait_for_receipt(
tx_hash, max_attempts=20, delay_seconds=2.0
)
print(f"Confirmed in block {receipt.block_height}, status: {receipt.status}")
except DilithiaError as exc:
print(f"Receipt polling failed: {exc}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())
Scenario 2: Multi-Account Treasury Manager¶
A service that manages multiple HD wallet accounts derived from a single mnemonic. It derives accounts 0 through 4, checks each balance, and consolidates all funds into account 0. Covers: HD derivation loop, multiple balance queries, and batch transaction construction.
import os
import sys
from dilithia_sdk import (
DilithiaAccount,
DilithiaClient,
Balance,
Receipt,
DilithiaError,
RpcError,
load_native_crypto_adapter,
)
RPC_URL: str = "https://rpc.dilithia.network/rpc"
TOKEN_CONTRACT: str = "dil1_token_main"
NUM_ACCOUNTS: int = 5
def main() -> None:
with DilithiaClient(RPC_URL) as client:
crypto = load_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Native crypto adapter unavailable")
mnemonic: str = os.environ.get("TREASURY_MNEMONIC", "")
if not mnemonic:
print("Set TREASURY_MNEMONIC env var", file=sys.stderr)
sys.exit(1)
# 1. Derive all accounts from the same mnemonic
accounts: list[DilithiaAccount] = []
for i in range(NUM_ACCOUNTS):
acct: DilithiaAccount = crypto.recover_hd_wallet_account(mnemonic, i)
accounts.append(acct)
treasury_address: str = accounts[0].address
print(f"Treasury address (account 0): {treasury_address}")
# 2. Check balances — balance() returns typed Balance
balances: list[int] = []
for acct in accounts:
result: Balance = client.balance(acct.address)
bal: int = result.balance.to_raw()
balances.append(bal)
print(f" Account {acct.account_index}: {acct.address} -> {result.balance.formatted()}")
# 3. Consolidate: transfer from accounts 1-4 to account 0
for i in range(1, NUM_ACCOUNTS):
if balances[i] <= 0:
print(f" Account {i}: zero balance, skipping.")
continue
call: dict = client.build_contract_call(
TOKEN_CONTRACT,
"transfer",
{"to": treasury_address, "amount": balances[i]},
)
print(f" Consolidating {balances[i]} from account {i}...")
try:
submitted: dict = client.send_signed_call(
call, crypto.signer_for(accounts[i])
)
receipt: Receipt = client.wait_for_receipt(submitted["tx_hash"])
print(f" Done. Block: {receipt.block_height}, status: {receipt.status}")
except RpcError as exc:
print(f" RPC error from account {i}: {exc}", file=sys.stderr)
except DilithiaError as exc:
print(f" Transfer from account {i} failed: {exc}", file=sys.stderr)
# 4. Final balance check
final: Balance = client.balance(treasury_address)
print(f"\nTreasury final balance: {final.balance.formatted()}")
if __name__ == "__main__":
main()
Scenario 3: Signature Verification Service¶
An API endpoint that receives a signed message and verifies the signature against the claimed public key and address. Covers: address validation, public key validation, signature verification, and structured error handling.
from typing import Any
from dilithia_sdk import DilithiaError, load_native_crypto_adapter
def verify_signed_message(
public_key: str,
address: str,
message: str,
signature: str,
) -> dict[str, Any]:
"""Verify a signed message against a claimed public key and address.
Returns a dict with ``"valid": True`` on success, or
``"valid": False`` and an ``"error"`` description on failure.
"""
crypto = load_native_crypto_adapter()
if crypto is None:
return {"valid": False, "error": "Crypto adapter unavailable"}
# 1. Validate the public key format
try:
crypto.validate_public_key(public_key)
except DilithiaError:
return {"valid": False, "error": "Invalid public key format"}
# 2. Validate the signature format
try:
crypto.validate_signature(signature)
except DilithiaError:
return {"valid": False, "error": "Invalid signature format"}
# 3. Validate the claimed address format
try:
crypto.validate_address(address)
except DilithiaError:
return {"valid": False, "error": "Invalid address format"}
# 4. Verify that the public key maps to the claimed address
derived_address: str = crypto.address_from_public_key(public_key)
if derived_address != address:
return {"valid": False, "error": "Address does not match public key"}
# 5. Verify the cryptographic signature
is_valid: bool = crypto.verify_message(public_key, message, signature)
if not is_valid:
return {"valid": False, "error": "Signature verification failed"}
return {"valid": True}
if __name__ == "__main__":
result: dict[str, Any] = verify_signed_message(
public_key="abcd1234...",
address="dil1_abc123",
message="Login nonce: 98765",
signature="deadbeef...",
)
if result["valid"]:
print("Signature is valid. User authenticated.")
else:
print(f"Verification failed: {result['error']}")
Scenario 4: Wallet Backup and Recovery¶
Create a new wallet, save the encrypted wallet file to disk, then recover it later from the saved file. Covers the full wallet lifecycle: generate mnemonic, create encrypted wallet file, serialize, write to disk, read from disk, deserialize, and recover.
import json
import os
from pathlib import Path
from dilithia_sdk import DilithiaAccount, DilithiaError, load_native_crypto_adapter
WALLET_PATH: Path = Path("./my-wallet.json")
PASSWORD: str = "my-secure-passphrase"
def main() -> None:
crypto = load_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Native crypto adapter unavailable")
if not WALLET_PATH.exists():
# ---- CREATE NEW WALLET ----
print("No wallet found. Creating a new one...")
# 1. Generate a fresh mnemonic
mnemonic: str = crypto.generate_mnemonic()
print("SAVE THIS MNEMONIC SECURELY:")
print(mnemonic)
print()
# 2. Create an encrypted wallet file from the mnemonic
account: DilithiaAccount = crypto.create_hd_wallet_file_from_mnemonic(
mnemonic, PASSWORD
)
print(f"Address: {account.address}")
print(f"Public key: {account.public_key}")
# 3. Serialize and save to disk
if account.wallet_file is None:
raise DilithiaError("Wallet file not generated")
WALLET_PATH.write_text(
json.dumps(account.wallet_file, indent=2), encoding="utf-8"
)
print(f"Wallet saved to {WALLET_PATH}")
else:
# ---- RECOVER EXISTING WALLET ----
print("Wallet file found. Recovering...")
# 4. Read the wallet file from disk
wallet_file: dict = json.loads(WALLET_PATH.read_text(encoding="utf-8"))
# 5. Recover using mnemonic + password
mnemonic_env: str | None = os.environ.get("WALLET_MNEMONIC")
if not mnemonic_env:
raise DilithiaError("Set WALLET_MNEMONIC env var to recover")
account = crypto.recover_wallet_file(wallet_file, mnemonic_env, PASSWORD)
print(f"Recovered address: {account.address}")
print(f"Public key: {account.public_key}")
print("Wallet recovered successfully. Ready to sign transactions.")
if __name__ == "__main__":
main()
Scenario 5: Gas-Sponsored Meta-Transaction¶
Submit a transaction where the gas fee is paid by a sponsor contract instead of the sender. This is useful for onboarding new users who have no tokens to pay for gas. Covers: gas sponsor connector setup, checking remaining quota, building a paymaster-attached call, signing, and submission.
Sync version¶
import os
import sys
from dilithia_sdk import (
DilithiaClient,
DilithiaGasSponsorConnector,
Receipt,
DilithiaError,
RpcError,
load_native_crypto_adapter,
)
RPC_URL: str = "https://rpc.dilithia.network/rpc"
SPONSOR_CONTRACT: str = "dil1_gas_sponsor_v1"
PAYMASTER_ADDRESS: str = "dil1_paymaster_addr"
TARGET_CONTRACT: str = "dil1_nft_mint"
def main() -> None:
with DilithiaClient(RPC_URL) as client:
crypto = load_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Crypto adapter unavailable")
# 1. Recover the user's wallet (new user with zero balance)
mnemonic: str = os.environ.get("USER_MNEMONIC", "")
if not mnemonic:
print("Set USER_MNEMONIC env var", file=sys.stderr)
sys.exit(1)
account = crypto.recover_hd_wallet(mnemonic)
print(f"User address: {account.address}")
# 2. Set up the gas sponsor connector
sponsor = DilithiaGasSponsorConnector(
client, SPONSOR_CONTRACT, PAYMASTER_ADDRESS
)
# 3. Check if the sponsor will accept this call
accept_query: dict = sponsor.build_accept_query(
account.address, TARGET_CONTRACT, "mint"
)
accept_result: dict = client.query_contract(
SPONSOR_CONTRACT, "accept", accept_query["args"]
)
print("Sponsor accepts:", accept_result)
# 4. Check remaining gas quota for this user
quota_query: dict = sponsor.build_remaining_quota_query(account.address)
quota_result: dict = client.query_contract(
SPONSOR_CONTRACT, "remaining_quota", quota_query["args"]
)
print("Remaining quota:", quota_result)
# 5. Build the actual contract call
call: dict = client.build_contract_call(
TARGET_CONTRACT,
"mint",
{"token_id": "nft_001", "metadata": "ipfs://QmSomeHash"},
)
# 6. Apply the paymaster (sponsor pays the gas)
sponsored_call: dict = sponsor.apply_paymaster(call)
print("Sponsored call:", sponsored_call)
# 7. Sign and submit the sponsored call
try:
submitted: dict = sponsor.send_sponsored_call(
call, crypto.signer_for(account)
)
tx_hash: str = submitted["tx_hash"]
print(f"Sponsored tx submitted: {tx_hash}")
# 8. Wait for confirmation — returns typed Receipt
receipt: Receipt = client.wait_for_receipt(tx_hash)
print(f"Confirmed in block {receipt.block_height}, status: {receipt.status}")
except RpcError as exc:
print(f"RPC error: {exc}", file=sys.stderr)
sys.exit(1)
except DilithiaError as exc:
print(f"Sponsored transaction failed: {exc}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
Async version¶
import asyncio
import os
import sys
from dilithia_sdk import (
AsyncDilithiaClient,
DilithiaGasSponsorConnector,
Receipt,
DilithiaError,
RpcError,
load_async_native_crypto_adapter,
)
RPC_URL: str = "https://rpc.dilithia.network/rpc"
SPONSOR_CONTRACT: str = "dil1_gas_sponsor_v1"
PAYMASTER_ADDRESS: str = "dil1_paymaster_addr"
TARGET_CONTRACT: str = "dil1_nft_mint"
async def main() -> None:
async with AsyncDilithiaClient(RPC_URL) as client:
crypto = load_async_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Crypto adapter unavailable")
mnemonic: str = os.environ.get("USER_MNEMONIC", "")
if not mnemonic:
print("Set USER_MNEMONIC env var", file=sys.stderr)
sys.exit(1)
account = await crypto.recover_hd_wallet(mnemonic)
print(f"User address: {account.address}")
# Set up the gas sponsor connector (works with both sync and async clients)
sponsor = DilithiaGasSponsorConnector(
client, SPONSOR_CONTRACT, PAYMASTER_ADDRESS
)
# Check sponsor acceptance and quota concurrently
accept_query: dict = sponsor.build_accept_query(
account.address, TARGET_CONTRACT, "mint"
)
quota_query: dict = sponsor.build_remaining_quota_query(account.address)
accept_result, quota_result = await asyncio.gather(
client.query_contract(
SPONSOR_CONTRACT, "accept", accept_query["args"]
),
client.query_contract(
SPONSOR_CONTRACT, "remaining_quota", quota_query["args"]
),
)
print("Sponsor accepts:", accept_result)
print("Remaining quota:", quota_result)
# Build, sponsor, sign, and submit
call: dict = client.build_contract_call(
TARGET_CONTRACT,
"mint",
{"token_id": "nft_001", "metadata": "ipfs://QmSomeHash"},
)
try:
submitted: dict = sponsor.send_sponsored_call(
call, crypto.signer_for(account)
)
tx_hash: str = submitted["tx_hash"]
print(f"Sponsored tx submitted: {tx_hash}")
receipt: Receipt = await client.wait_for_receipt(tx_hash)
print(f"Confirmed in block {receipt.block_height}, status: {receipt.status}")
except RpcError as exc:
print(f"RPC error: {exc}", file=sys.stderr)
sys.exit(1)
except DilithiaError as exc:
print(f"Sponsored transaction failed: {exc}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())
Scenario 6: Cross-Chain Message Sender¶
Send a message to another Dilithia chain via the messaging connector. This is useful for bridging data or triggering actions on a remote chain. Covers: messaging connector setup, building outbound messages, querying the outbox, signing, and submission.
import os
import sys
import time
from dilithia_sdk import (
DilithiaClient,
DilithiaMessagingConnector,
Receipt,
DilithiaError,
HttpError,
load_native_crypto_adapter,
)
RPC_URL: str = "https://rpc.dilithia.network/rpc"
MESSAGING_CONTRACT: str = "dil1_bridge_v1"
PAYMASTER: str = "dil1_bridge_paymaster"
DEST_CHAIN: str = "dilithia-testnet-2"
def main() -> None:
with DilithiaClient(RPC_URL) as client:
crypto = load_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Crypto adapter unavailable")
mnemonic: str = os.environ.get("BRIDGE_MNEMONIC", "")
if not mnemonic:
print("Set BRIDGE_MNEMONIC env var", file=sys.stderr)
sys.exit(1)
account = crypto.recover_hd_wallet(mnemonic)
print(f"Sender address: {account.address}")
# 1. Set up the messaging connector
messaging = DilithiaMessagingConnector(
client, MESSAGING_CONTRACT, PAYMASTER
)
# 2. Check current outbox state
outbox: dict = messaging.query_outbox()
print("Current outbox:", outbox)
# 3. Build the cross-chain message
payload: dict = {
"action": "lock_tokens",
"sender": account.address,
"amount": 50_000,
"recipient": "dil1_remote_recipient",
"timestamp": int(time.time() * 1000),
}
message_call: dict = messaging.build_send_message_call(DEST_CHAIN, payload)
print("Message call:", message_call)
# 4. Simulate the message send
sim_result: dict = client.simulate(message_call)
print("Simulation:", sim_result)
# 5. Sign and send the message
try:
submitted: dict = messaging.send_message(
DEST_CHAIN, payload, crypto.signer_for(account)
)
tx_hash: str = submitted["tx_hash"]
print(f"Message tx submitted: {tx_hash}")
# 6. Wait for confirmation — returns typed Receipt
receipt: Receipt = client.wait_for_receipt(tx_hash)
print(f"Message confirmed in block {receipt.block_height}, status: {receipt.status}")
# 7. Verify message appears in outbox
updated_outbox: dict = messaging.query_outbox()
print("Updated outbox:", updated_outbox)
except HttpError as exc:
print(f"HTTP error: {exc}", file=sys.stderr)
sys.exit(1)
except DilithiaError as exc:
print(f"Cross-chain message failed: {exc}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
Scenario 7: Contract Deployment¶
Deploy a WASM smart contract to the Dilithia chain. Reads the WASM binary, hashes the bytecode, builds and signs a canonical deploy payload, assembles the full deploy body, sends the deploy request, and waits for confirmation.
Sync version¶
import json
import os
import sys
from dilithia_sdk import (
DilithiaClient,
Nonce,
Receipt,
DilithiaError,
RpcError,
load_native_crypto_adapter,
read_wasm_file_hex,
)
RPC_URL: str = "https://rpc.dilithia.network/rpc"
CONTRACT_NAME: str = "my_contract"
WASM_PATH: str = "./my_contract.wasm"
CHAIN_ID: str = "dilithia-mainnet"
def main() -> None:
# 1. Initialize client and crypto adapter
with DilithiaClient(RPC_URL, timeout=30.0) as client:
crypto = load_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Native crypto adapter unavailable")
# 2. Recover wallet from mnemonic
mnemonic: str | None = os.environ.get("DEPLOYER_MNEMONIC")
if not mnemonic:
print("Set DEPLOYER_MNEMONIC env var", file=sys.stderr)
sys.exit(1)
account = crypto.recover_hd_wallet(mnemonic)
print(f"Deployer address: {account.address}")
# 3. Read the WASM file as hex
bytecode_hex: str = read_wasm_file_hex(WASM_PATH)
print(f"Bytecode size: {len(bytecode_hex) // 2} bytes")
# 4. Get the current nonce — returns typed Nonce with .next_nonce
nonce_result: Nonce = client.nonce(account.address)
nonce: int = nonce_result.next_nonce
print(f"Current nonce: {nonce}")
# 5. Hash the bytecode hex for the canonical payload
bytecode_hash: str = crypto.hash_hex(bytecode_hex)
print(f"Bytecode hash: {bytecode_hash}")
# 6. Build the canonical deploy payload (keys sorted for deterministic signing)
canonical: dict = client.build_deploy_canonical_payload(
account.address, CONTRACT_NAME, bytecode_hash, nonce, CHAIN_ID
)
print("Canonical payload:", canonical)
# 7. Sign the canonical payload
canonical_json: str = json.dumps(canonical, sort_keys=True)
sig = crypto.sign_message(account.secret_key, canonical_json)
print(f"Signed with algorithm: {sig.algorithm}")
# 8. Assemble the full deploy body
body: dict = client.deploy_contract_body(
name=CONTRACT_NAME,
bytecode=bytecode_hex,
from_addr=account.address,
alg=sig.algorithm,
pk=account.public_key,
sig=sig.signature,
nonce=nonce,
chain_id=CHAIN_ID,
version=1,
)
# 9. Send the deploy request
result: dict = client.deploy_contract(body)
tx_hash: str = result["tx_hash"]
print(f"Deploy tx submitted: {tx_hash}")
# 10. Wait for the receipt — returns typed Receipt
try:
receipt: Receipt = client.wait_for_receipt(
tx_hash, max_attempts=30, delay_seconds=3.0
)
print(f"Contract deployed in block {receipt.block_height}, status: {receipt.status}")
except RpcError as exc:
print(f"RPC error: {exc}", file=sys.stderr)
sys.exit(1)
except DilithiaError as exc:
print(f"Receipt polling failed: {exc}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
Async version¶
import asyncio
import json
import os
import sys
from dilithia_sdk import (
AsyncDilithiaClient,
Nonce,
Receipt,
DilithiaError,
RpcError,
TimeoutError,
load_async_native_crypto_adapter,
read_wasm_file_hex,
)
RPC_URL: str = "https://rpc.dilithia.network/rpc"
CONTRACT_NAME: str = "my_contract"
WASM_PATH: str = "./my_contract.wasm"
CHAIN_ID: str = "dilithia-mainnet"
async def main() -> None:
# 1. Initialize async client and crypto adapter with context manager
async with AsyncDilithiaClient(RPC_URL, timeout=30.0) as client:
crypto = load_async_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Native crypto adapter unavailable")
# 2. Recover wallet from mnemonic
mnemonic: str | None = os.environ.get("DEPLOYER_MNEMONIC")
if not mnemonic:
print("Set DEPLOYER_MNEMONIC env var", file=sys.stderr)
sys.exit(1)
account = await crypto.recover_hd_wallet(mnemonic)
print(f"Deployer address: {account.address}")
# 3. Read the WASM file as hex (sync I/O is fine for a single file)
bytecode_hex: str = read_wasm_file_hex(WASM_PATH)
print(f"Bytecode size: {len(bytecode_hex) // 2} bytes")
# 4. Get the current nonce — returns typed Nonce with .next_nonce
nonce_result: Nonce = await client.nonce(account.address)
nonce: int = nonce_result.next_nonce
print(f"Current nonce: {nonce}")
# 5. Hash the bytecode hex for the canonical payload
bytecode_hash: str = await crypto.hash_hex(bytecode_hex)
print(f"Bytecode hash: {bytecode_hash}")
# 6. Build the canonical deploy payload
canonical: dict = client.build_deploy_canonical_payload(
account.address, CONTRACT_NAME, bytecode_hash, nonce, CHAIN_ID
)
# 7. Sign the canonical payload
canonical_json: str = json.dumps(canonical, sort_keys=True)
sig = await crypto.sign_message(account.secret_key, canonical_json)
# 8. Assemble the full deploy body
body: dict = client.deploy_contract_body(
name=CONTRACT_NAME,
bytecode=bytecode_hex,
from_addr=account.address,
alg=sig.algorithm,
pk=account.public_key,
sig=sig.signature,
nonce=nonce,
chain_id=CHAIN_ID,
version=1,
)
# 9. Send the deploy request
try:
result: dict = await client.deploy_contract(body)
tx_hash: str = result["tx_hash"]
print(f"Deploy tx submitted: {tx_hash}")
# 10. Wait for the receipt — returns typed Receipt
receipt: Receipt = await client.wait_for_receipt(
tx_hash, max_attempts=30, delay_seconds=3.0
)
print(f"Contract deployed in block {receipt.block_height}, status: {receipt.status}")
except TimeoutError as exc:
print(f"Deployment timed out: {exc}", file=sys.stderr)
sys.exit(1)
except RpcError as exc:
print(f"RPC error: {exc}", file=sys.stderr)
sys.exit(1)
except DilithiaError as exc:
print(f"Deployment failed: {exc}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())
Scenario 8: Shielded Pool Deposit & Withdraw¶
Deposit tokens into a shielded pool using a ZK commitment, then withdraw them later by proving knowledge of the secret and spending the nullifier. Covers: ZK adapter setup, commitment computation, shielded deposit, commitment root verification, nullifier computation, shielded withdraw, and receipt polling.
import os
import sys
from dilithia_sdk import (
DilithiaClient,
Receipt,
DilithiaError,
load_native_crypto_adapter,
)
from dilithia_sdk.zk import load_native_zk_adapter
RPC_URL: str = "https://rpc.dilithia.network/rpc"
SHIELDED_POOL: str = "dil1_shielded_pool_v1"
DEPOSIT_AMOUNT: int = 100_000
def main() -> None:
with DilithiaClient(RPC_URL) as client:
crypto = load_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Native crypto adapter unavailable")
zk = load_native_zk_adapter()
if zk is None:
raise DilithiaError("Native ZK adapter unavailable")
# 1. Recover wallet
mnemonic: str = os.environ.get("SHIELDED_MNEMONIC", "")
if not mnemonic:
print("Set SHIELDED_MNEMONIC env var", file=sys.stderr)
sys.exit(1)
account = crypto.recover_hd_wallet(mnemonic)
print(f"Address: {account.address}")
# 2. Generate a secret and nonce for the commitment
secret_hex: str = crypto.hash_hex("my_deposit_secret_entropy")
nonce_hex: str = crypto.hash_hex("my_deposit_nonce_entropy")
# 3. Compute the Poseidon commitment: H(value, secret, nonce)
commitment: str = zk.compute_commitment(DEPOSIT_AMOUNT, secret_hex, nonce_hex)
print(f"Commitment: {commitment}")
# 4. Deposit into the shielded pool
try:
deposit_result: dict = client.shielded_deposit(
SHIELDED_POOL,
DEPOSIT_AMOUNT,
commitment,
crypto.signer_for(account),
)
deposit_tx: str = deposit_result["tx_hash"]
print(f"Deposit tx submitted: {deposit_tx}")
receipt: Receipt = client.wait_for_receipt(
deposit_tx, max_attempts=20, delay_seconds=2.0
)
print(f"Deposit confirmed in block {receipt.block_height}, status: {receipt.status}")
except DilithiaError as exc:
print(f"Deposit failed: {exc}", file=sys.stderr)
sys.exit(1)
# 5. Verify the commitment is included in the Merkle root
root: str = client.get_commitment_root(SHIELDED_POOL)
print(f"Current commitment root: {root}")
# 6. Compute the nullifier to spend this commitment
nullifier: str = zk.compute_nullifier(secret_hex, nonce_hex)
print(f"Nullifier: {nullifier}")
# 7. Check that the nullifier has not been spent yet
already_spent: bool = client.is_nullifier_spent(SHIELDED_POOL, nullifier)
if already_spent:
print("Nullifier already spent. Cannot withdraw.", file=sys.stderr)
sys.exit(1)
print("Nullifier is unspent. Proceeding with withdraw.")
# 8. Generate a preimage proof for the withdrawal
proof = zk.generate_preimage_proof([DEPOSIT_AMOUNT, secret_hex, nonce_hex])
print(f"Proof generated: {len(proof.proof_bytes)} bytes")
# 9. Withdraw from the shielded pool
try:
withdraw_result: dict = client.shielded_withdraw(
SHIELDED_POOL,
DEPOSIT_AMOUNT,
nullifier,
proof.proof_bytes,
account.address,
crypto.signer_for(account),
)
withdraw_tx: str = withdraw_result["tx_hash"]
print(f"Withdraw tx submitted: {withdraw_tx}")
receipt = client.wait_for_receipt(
withdraw_tx, max_attempts=20, delay_seconds=2.0
)
print(f"Withdraw confirmed in block {receipt.block_height}, status: {receipt.status}")
except DilithiaError as exc:
print(f"Withdraw failed: {exc}", file=sys.stderr)
sys.exit(1)
# 10. Confirm the nullifier is now spent
spent_after: bool = client.is_nullifier_spent(SHIELDED_POOL, nullifier)
print(f"Nullifier spent after withdraw: {spent_after}")
if __name__ == "__main__":
main()
Scenario 9: ZK Proof Generation & Verification¶
Standalone zero-knowledge proof operations without any chain interaction. Demonstrates Poseidon hashing, commitment computation, nullifier derivation, preimage proof generation and verification, and range proof generation and verification. Covers: ZK adapter setup, Poseidon hash, commitment and nullifier helpers, preimage proofs, and range proofs.
from dilithia_sdk import DilithiaError, load_native_crypto_adapter
from dilithia_sdk.zk import load_native_zk_adapter
def main() -> None:
crypto = load_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Native crypto adapter unavailable")
zk = load_native_zk_adapter()
if zk is None:
raise DilithiaError("Native ZK adapter unavailable")
# ---- Poseidon Hashing ----
# 1. Hash a list of field elements using the Poseidon hash function
hash_result: str = zk.poseidon_hash([42, 1337, 99])
print(f"Poseidon hash of [42, 1337, 99]: {hash_result}")
# 2. Hash a single value
single_hash: str = zk.poseidon_hash([12345])
print(f"Poseidon hash of [12345]: {single_hash}")
# ---- Commitment & Nullifier ----
# 3. Generate deterministic secret and nonce via the crypto adapter
secret_hex: str = crypto.hash_hex("user_secret_entropy_value")
nonce_hex: str = crypto.hash_hex("user_nonce_entropy_value")
deposit_value: int = 250_000
print(f"\nSecret: {secret_hex}")
print(f"Nonce: {nonce_hex}")
# 4. Compute the commitment: H(value, secret, nonce)
commitment: str = zk.compute_commitment(deposit_value, secret_hex, nonce_hex)
print(f"Commitment: {commitment}")
# 5. Compute the nullifier: H(secret, nonce)
nullifier: str = zk.compute_nullifier(secret_hex, nonce_hex)
print(f"Nullifier: {nullifier}")
# ---- Preimage Proof Generation & Verification ----
# 6. Generate a preimage proof proving knowledge of the inputs that hash to a
# known output, without revealing those inputs
print("\n--- Preimage Proof ---")
preimage_proof = zk.generate_preimage_proof([deposit_value, secret_hex, nonce_hex])
print(f"Proof size: {len(preimage_proof.proof_bytes)} bytes")
print(f"Verification key size: {len(preimage_proof.verification_key)} bytes")
print(f"Public inputs: {preimage_proof.public_inputs}")
# 7. Verify the preimage proof
preimage_valid: bool = zk.verify_preimage_proof(
preimage_proof.proof_bytes,
preimage_proof.verification_key,
preimage_proof.public_inputs,
)
print(f"Preimage proof valid: {preimage_valid}")
if not preimage_valid:
raise DilithiaError("Preimage proof verification failed unexpectedly")
# ---- Range Proof Generation & Verification ----
# 8. Generate a range proof proving that a value lies within [min, max]
# without revealing the exact value
print("\n--- Range Proof ---")
secret_value: int = 500
range_min: int = 100
range_max: int = 1000
range_proof = zk.generate_range_proof(secret_value, range_min, range_max)
print(f"Proof size: {len(range_proof.proof_bytes)} bytes")
print(f"Verification key size: {len(range_proof.verification_key)} bytes")
print(f"Public inputs: {range_proof.public_inputs}")
# 9. Verify the range proof
range_valid: bool = zk.verify_range_proof(
range_proof.proof_bytes,
range_proof.verification_key,
range_proof.public_inputs,
)
print(f"Range proof valid: {range_valid}")
if not range_valid:
raise DilithiaError("Range proof verification failed unexpectedly")
# 10. Demonstrate that a value outside the range would fail (conceptual)
print("\n--- Summary ---")
print(f"Poseidon hash: {hash_result}")
print(f"Commitment: {commitment}")
print(f"Nullifier: {nullifier}")
print(f"Preimage proof OK: {preimage_valid}")
print(f"Range proof OK: {range_valid}")
print("All ZK operations completed successfully.")
if __name__ == "__main__":
main()
Scenario 10: Name Service & Identity Profile¶
A utility that registers a .dili name, configures profile records, resolves names, and demonstrates the full name service lifecycle. Covers: getRegistrationCost, isNameAvailable, registerName, setNameTarget, setNameRecord, getNameRecords, lookupName, resolveName, reverseResolveName, getNamesByOwner, renewName, transferName, releaseName.
import os
import sys
from dilithia_sdk import (
DilithiaClient,
Receipt,
DilithiaError,
load_native_crypto_adapter,
)
RPC_URL: str = "https://rpc.dilithia.network/rpc"
NAME: str = "alice"
TRANSFER_TO: str = "dil1_bob_address"
def main() -> None:
with DilithiaClient(RPC_URL) as client:
crypto = load_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Native crypto adapter unavailable")
# 1–2. Recover wallet from mnemonic
mnemonic = os.environ.get("WALLET_MNEMONIC")
if not mnemonic:
print("Set WALLET_MNEMONIC env var", file=sys.stderr)
sys.exit(1)
account = crypto.recover_hd_wallet(mnemonic)
print(f"Address: {account.address}")
# 3. Query registration cost for the name
cost = client.get_registration_cost(NAME)
print(f'Registration cost for "{NAME}": {cost.formatted()}')
# 4. Check if name is available
available: bool = client.is_name_available(NAME)
print(f'Name "{NAME}" available: {available}')
if not available:
raise DilithiaError(f'Name "{NAME}" is already taken')
# 5. Register name
reg_result = client.register_name(NAME, account.address, account.secret_key)
reg_receipt: Receipt = client.wait_for_receipt(reg_result.tx_hash, retries=20, delay_ms=2000)
print(f"Name registered in block {reg_receipt.block_height}")
# 6. Set target address
target_result = client.set_name_target(NAME, account.address, account.secret_key)
client.wait_for_receipt(target_result.tx_hash, retries=20, delay_ms=2000)
print(f"Target address set to {account.address}")
# 7. Set profile records
records = [
("display_name", "Alice"),
("avatar", "https://example.com/alice.png"),
("bio", "Builder on Dilithia"),
("email", "alice@example.com"),
("website", "https://alice.dev"),
]
for key, value in records:
res = client.set_name_record(NAME, key, value, account.secret_key)
client.wait_for_receipt(res.tx_hash, retries=20, delay_ms=2000)
print(f' Set record "{key}" = "{value}"')
# 8. Get all records
all_records = client.get_name_records(NAME)
print(f"All records: {all_records}")
# 9. Resolve name to address
resolved = client.resolve_name(NAME)
print(f'resolve_name("{NAME}") -> {resolved}')
# 10. Reverse resolve address to name
reverse_name = client.reverse_resolve_name(account.address)
print(f'reverse_resolve_name("{account.address}") -> {reverse_name}')
# 11. List all names by owner
owned = client.get_names_by_owner(account.address)
print(f"Names owned by {account.address}: {owned}")
# 12. Renew name
renew_result = client.renew_name(NAME, account.secret_key)
renew_receipt: Receipt = client.wait_for_receipt(renew_result.tx_hash, retries=20, delay_ms=2000)
print(f"Name renewed in block {renew_receipt.block_height}")
# 13. Transfer name to another address
transfer_result = client.transfer_name(NAME, TRANSFER_TO, account.secret_key)
transfer_receipt: Receipt = client.wait_for_receipt(transfer_result.tx_hash, retries=20, delay_ms=2000)
print(f"Name transferred to {TRANSFER_TO} in block {transfer_receipt.block_height}")
if __name__ == "__main__":
main()
Scenario 11: Credential Issuance & Verification¶
An issuer creates a KYC credential schema, issues a credential to a holder, and a verifier checks it with selective disclosure. Covers: registerSchema, issueCredential, getSchema, getCredential, listCredentialsByHolder, listCredentialsByIssuer, verifyCredential, revokeCredential.
import os
import sys
import time
from dilithia_sdk import (
DilithiaClient,
Receipt,
DilithiaError,
load_native_crypto_adapter,
)
RPC_URL: str = "https://rpc.dilithia.network/rpc"
HOLDER_ADDRESS: str = "dil1_holder_address"
def main() -> None:
with DilithiaClient(RPC_URL) as client:
crypto = load_native_crypto_adapter()
if crypto is None:
raise DilithiaError("Native crypto adapter unavailable")
# 1–2. Recover issuer wallet
mnemonic = os.environ.get("ISSUER_MNEMONIC")
if not mnemonic:
print("Set ISSUER_MNEMONIC env var", file=sys.stderr)
sys.exit(1)
issuer = crypto.recover_hd_wallet(mnemonic)
print(f"Issuer address: {issuer.address}")
# 3. Register a KYC schema
schema = {
"name": "KYC_Basic_v1",
"attributes": [
{"name": "full_name", "type": "string"},
{"name": "country", "type": "string"},
{"name": "age", "type": "u64"},
{"name": "verified", "type": "bool"},
],
}
schema_result = client.register_schema(schema, issuer.secret_key)
schema_receipt: Receipt = client.wait_for_receipt(schema_result.tx_hash, retries=20, delay_ms=2000)
schema_hash: str = schema_receipt.logs[0]["schema_hash"]
print(f"Schema registered: {schema_hash}")
# 4. Issue credential to holder with commitment hash
commitment_hash: str = crypto.hash_hex(f"{HOLDER_ADDRESS}:KYC_Basic_v1:{int(time.time())}")
issue_result = client.issue_credential(
schema_hash=schema_hash,
holder=HOLDER_ADDRESS,
commitment_hash=commitment_hash,
attributes={"full_name": "Alice Smith", "country": "CH", "age": 30, "verified": True},
signer=issuer.secret_key,
)
issue_receipt: Receipt = client.wait_for_receipt(issue_result.tx_hash, retries=20, delay_ms=2000)
print(f"Credential issued in block {issue_receipt.block_height}")
# 5. Get schema by hash
fetched_schema = client.get_schema(schema_hash)
print(f"Schema: {fetched_schema}")
# 6. Get credential by commitment
credential = client.get_credential(commitment_hash)
print(f"Credential: {credential}")
# 7. List credentials by holder
holder_creds = client.list_credentials_by_holder(HOLDER_ADDRESS)
print(f"Holder has {len(holder_creds)} credential(s)")
# 8. List credentials by issuer
issuer_creds = client.list_credentials_by_issuer(issuer.address)
print(f"Issuer has {len(issuer_creds)} credential(s)")
# 9. Verify selective disclosure — prove age > 18 without revealing exact age
proof = crypto.generate_selective_disclosure_proof(
commitment_hash, ["age"], {"age": {"operator": "gt", "threshold": 18}},
)
verified: bool = client.verify_credential(commitment_hash, proof)
print(f"Selective disclosure (age > 18) verified: {verified}")
# 10. Revoke the credential
revoke_result = client.revoke_credential(commitment_hash, issuer.secret_key)
revoke_receipt: Receipt = client.wait_for_receipt(revoke_result.tx_hash, retries=20, delay_ms=2000)
print(f"Credential revoked in block {revoke_receipt.block_height}")
# 11. Verify revocation by fetching credential again
revoked_cred = client.get_credential(commitment_hash)
print(f"Credential status after revocation: {revoked_cred.status}")
if __name__ == "__main__":
main()