from collections.abc import Mapping
from dataclasses import dataclass
from hashlib import sha256
from types import SimpleNamespace
from typing import Any, Optional, Union, TypedDict
from unittest.mock import MagicMock, patch
from async_substrate_interface import SubstrateInterface
from bittensor_wallet import Wallet
import bittensor.core.subtensor as subtensor_module
from bittensor.core.chain_data import (
NeuronInfo,
NeuronInfoLite,
PrometheusInfo,
AxonInfo,
)
from bittensor.core.errors import ChainQueryError
from bittensor.core.subtensor import Subtensor
from bittensor.core.types import AxonServeCallParams, PrometheusServeCallParams
from bittensor.utils import RAOPERTAO, u16_normalized_float
from bittensor.utils.balance import Balance
# Mock Testing Constant
__GLOBAL_MOCK_STATE__ = {}
BlockNumber = int
[docs]
class InfoDict(Mapping):
[docs]
@classmethod
def default(cls):
raise NotImplementedError
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
return setattr(self, key, value)
def __iter__(self):
return iter(self.__dict__)
def __len__(self):
return len(self.__dict__)
[docs]
@dataclass
class AxonInfoDict(InfoDict):
block: int
version: int
ip: int # integer representation of ip address
port: int
ip_type: int
protocol: int
placeholder1: int # placeholder for future use
placeholder2: int
[docs]
@classmethod
def default(cls):
return cls(
block=0,
version=0,
ip=0,
port=0,
ip_type=0,
protocol=0,
placeholder1=0,
placeholder2=0,
)
[docs]
@dataclass
class PrometheusInfoDict(InfoDict):
block: int
version: int
ip: int # integer representation of ip address
port: int
ip_type: int
[docs]
@classmethod
def default(cls):
return cls(block=0, version=0, ip=0, port=0, ip_type=0)
[docs]
@dataclass
class MockSubtensorValue:
value: Optional[Any]
[docs]
class MockMapResult:
records: Optional[list[tuple[MockSubtensorValue, MockSubtensorValue]]]
def __init__(
self,
records: Optional[
list[tuple[Union[Any, MockSubtensorValue], Union[Any, MockSubtensorValue]]]
] = None,
):
_records = [
(
(
MockSubtensorValue(value=record[0]),
MockSubtensorValue(value=record[1]),
)
# Make sure record is a tuple of MockSubtensorValue (dict with value attr)
if not (
isinstance(record, tuple)
and all(
isinstance(item, dict) and hasattr(item, "value")
for item in record
)
)
else record
)
for record in records
]
self.records = _records
def __iter__(self):
return iter(self.records)
[docs]
class MockSystemState(TypedDict):
Account: dict[str, dict[int, int]] # address -> block -> balance
[docs]
class MockSubtensorState(TypedDict):
Rho: dict[int, dict[BlockNumber, int]] # netuid -> block -> rho
Kappa: dict[int, dict[BlockNumber, int]] # netuid -> block -> kappa
Difficulty: dict[int, dict[BlockNumber, int]] # netuid -> block -> difficulty
ImmunityPeriod: dict[
int, dict[BlockNumber, int]
] # netuid -> block -> immunity_period
ValidatorBatchSize: dict[
int, dict[BlockNumber, int]
] # netuid -> block -> validator_batch_size
Active: dict[int, dict[BlockNumber, bool]] # (netuid, uid), block -> active
Stake: dict[str, dict[str, dict[int, int]]] # (hotkey, coldkey) -> block -> stake
Delegates: dict[str, dict[int, float]] # address -> block -> delegate_take
NetworksAdded: dict[int, dict[BlockNumber, bool]] # netuid -> block -> added
[docs]
class MockChainState(TypedDict):
System: MockSystemState
SubtensorModule: MockSubtensorState
[docs]
class ReusableCoroutine:
def __init__(self, coroutine):
self.coroutine = coroutine
def __await__(self):
return self.reset().__await__()
[docs]
def reset(self):
return self.coroutine()
async def _async_block():
return 1
[docs]
class MockSubtensor(Subtensor):
"""
A Mock Subtensor class for running tests.
This should mock only methods that make queries to the chain.
e.g. We mock `Subtensor.query_subtensor` instead of all query methods.
This class will also store a local (mock) state of the chain.
"""
chain_state: MockChainState
block_number: int
[docs]
@classmethod
def reset(cls) -> None:
__GLOBAL_MOCK_STATE__.clear()
_ = cls()
[docs]
def setup(self) -> None:
if not hasattr(self, "chain_state") or getattr(self, "chain_state") is None:
self.chain_state = {
"System": {"Account": {}},
"Balances": {"ExistentialDeposit": {0: 500}},
"SubtensorModule": {
"NetworksAdded": {},
"Rho": {},
"Kappa": {},
"Difficulty": {},
"ImmunityPeriod": {},
"ValidatorBatchSize": {},
"ValidatorSequenceLength": {},
"ValidatorEpochsPerReset": {},
"ValidatorEpochLength": {},
"MaxAllowedValidators": {},
"MinAllowedWeights": {},
"MaxWeightLimit": {},
"SynergyScalingLawPower": {},
"ScalingLawPower": {},
"SubnetworkN": {},
"MaxAllowedUids": {},
"NetworkModality": {},
"BlocksSinceLastStep": {},
"Tempo": {},
"NetworkConnect": {},
"EmissionValues": {},
"Burn": {},
"Active": {},
"Uids": {},
"Keys": {},
"Owner": {},
"IsNetworkMember": {},
"LastUpdate": {},
"Rank": {},
"Emission": {},
"Incentive": {},
"Consensus": {},
"Trust": {},
"ValidatorTrust": {},
"Dividends": {},
"PruningScores": {},
"ValidatorPermit": {},
"Weights": {},
"Bonds": {},
"Stake": {},
"TotalStake": {0: 0},
"TotalIssuance": {0: 0},
"TotalHotkeyStake": {},
"TotalColdkeyStake": {},
"TxRateLimit": {0: 0}, # No limit
"Delegates": {},
"Axons": {},
"Prometheus": {},
"SubnetOwner": {},
"Commits": {},
"AdjustmentAlpha": {},
"BondsMovingAverage": {},
},
}
self.block_number = 0
self.network = "mock"
self.chain_endpoint = "ws://mock_endpoint.bt"
self.substrate = MagicMock(autospec=SubstrateInterface)
def __init__(self, *args, **kwargs) -> None:
mock_substrate_interface = MagicMock(autospec=SubstrateInterface)
with patch.object(
subtensor_module,
"SubstrateInterface",
return_value=mock_substrate_interface,
):
super().__init__()
self.__dict__ = __GLOBAL_MOCK_STATE__
if not hasattr(self, "chain_state") or getattr(self, "chain_state") is None:
self.setup()
[docs]
def get_block_hash(self, block: Optional[int] = None) -> str:
return "0x" + sha256(str(block).encode()).hexdigest()[:64]
[docs]
def create_subnet(self, netuid: int) -> None:
subtensor_state = self.chain_state["SubtensorModule"]
if netuid not in subtensor_state["NetworksAdded"]:
# Per Subnet
subtensor_state["Rho"][netuid] = {}
subtensor_state["Rho"][netuid][0] = 10
subtensor_state["Kappa"][netuid] = {}
subtensor_state["Kappa"][netuid][0] = 32_767
subtensor_state["Difficulty"][netuid] = {}
subtensor_state["Difficulty"][netuid][0] = 10_000_000
subtensor_state["ImmunityPeriod"][netuid] = {}
subtensor_state["ImmunityPeriod"][netuid][0] = 4096
subtensor_state["ValidatorBatchSize"][netuid] = {}
subtensor_state["ValidatorBatchSize"][netuid][0] = 32
subtensor_state["ValidatorSequenceLength"][netuid] = {}
subtensor_state["ValidatorSequenceLength"][netuid][0] = 256
subtensor_state["ValidatorEpochsPerReset"][netuid] = {}
subtensor_state["ValidatorEpochsPerReset"][netuid][0] = 60
subtensor_state["ValidatorEpochLength"][netuid] = {}
subtensor_state["ValidatorEpochLength"][netuid][0] = 100
subtensor_state["MaxAllowedValidators"][netuid] = {}
subtensor_state["MaxAllowedValidators"][netuid][0] = 128
subtensor_state["MinAllowedWeights"][netuid] = {}
subtensor_state["MinAllowedWeights"][netuid][0] = 1024
subtensor_state["MaxWeightLimit"][netuid] = {}
subtensor_state["MaxWeightLimit"][netuid][0] = 1_000
subtensor_state["SynergyScalingLawPower"][netuid] = {}
subtensor_state["SynergyScalingLawPower"][netuid][0] = 50
subtensor_state["ScalingLawPower"][netuid] = {}
subtensor_state["ScalingLawPower"][netuid][0] = 50
subtensor_state["SubnetworkN"][netuid] = {}
subtensor_state["SubnetworkN"][netuid][0] = 0
subtensor_state["MaxAllowedUids"][netuid] = {}
subtensor_state["MaxAllowedUids"][netuid][0] = 4096
subtensor_state["NetworkModality"][netuid] = {}
subtensor_state["NetworkModality"][netuid][0] = 0
subtensor_state["BlocksSinceLastStep"][netuid] = {}
subtensor_state["BlocksSinceLastStep"][netuid][0] = 0
subtensor_state["Tempo"][netuid] = {}
subtensor_state["Tempo"][netuid][0] = 99
# subtensor_state['NetworkConnect'][netuid] = {}
# subtensor_state['NetworkConnect'][netuid][0] = {}
subtensor_state["EmissionValues"][netuid] = {}
subtensor_state["EmissionValues"][netuid][0] = 0
subtensor_state["Burn"][netuid] = {}
subtensor_state["Burn"][netuid][0] = 0
subtensor_state["Commits"][netuid] = {}
# Per-UID/Hotkey
subtensor_state["Uids"][netuid] = {}
subtensor_state["Keys"][netuid] = {}
subtensor_state["Owner"][netuid] = {}
subtensor_state["LastUpdate"][netuid] = {}
subtensor_state["Active"][netuid] = {}
subtensor_state["Rank"][netuid] = {}
subtensor_state["Emission"][netuid] = {}
subtensor_state["Incentive"][netuid] = {}
subtensor_state["Consensus"][netuid] = {}
subtensor_state["Trust"][netuid] = {}
subtensor_state["ValidatorTrust"][netuid] = {}
subtensor_state["Dividends"][netuid] = {}
subtensor_state["PruningScores"][netuid] = {}
subtensor_state["PruningScores"][netuid][0] = {}
subtensor_state["ValidatorPermit"][netuid] = {}
subtensor_state["Weights"][netuid] = {}
subtensor_state["Bonds"][netuid] = {}
subtensor_state["Axons"][netuid] = {}
subtensor_state["Prometheus"][netuid] = {}
subtensor_state["NetworksAdded"][netuid] = {}
subtensor_state["NetworksAdded"][netuid][0] = True
subtensor_state["AdjustmentAlpha"][netuid] = {}
subtensor_state["AdjustmentAlpha"][netuid][0] = 1000
subtensor_state["BondsMovingAverage"][netuid] = {}
subtensor_state["BondsMovingAverage"][netuid][0] = 1000
else:
raise Exception("Subnet already exists")
[docs]
def set_difficulty(self, netuid: int, difficulty: int) -> None:
subtensor_state = self.chain_state["SubtensorModule"]
if netuid not in subtensor_state["NetworksAdded"]:
raise Exception("Subnet does not exist")
subtensor_state["Difficulty"][netuid][self.block_number] = difficulty
@staticmethod
def _convert_to_balance(balance: Union["Balance", float, int]) -> "Balance":
if isinstance(balance, float):
balance = Balance.from_tao(balance)
if isinstance(balance, int):
balance = Balance.from_rao(balance)
return balance
[docs]
def force_set_balance(
self, ss58_address: str, balance: Union["Balance", float, int] = Balance(0)
) -> tuple[bool, Optional[str]]:
"""
Returns:
tuple[bool, Optional[str]]: (success, err_msg)
"""
balance = self._convert_to_balance(balance)
if ss58_address not in self.chain_state["System"]["Account"]:
self.chain_state["System"]["Account"][ss58_address] = {
"data": {"free": {0: 0}}
}
old_balance = self.get_balance(ss58_address, self.block_number)
diff = balance.rao - old_balance.rao
# Update total issuance
self.chain_state["SubtensorModule"]["TotalIssuance"][self.block_number] = (
self._get_most_recent_storage(
self.chain_state["SubtensorModule"]["TotalIssuance"]
)
+ diff
)
self.chain_state["System"]["Account"][ss58_address] = {
"data": {"free": {self.block_number: balance.rao}}
}
return True, None
# Alias for force_set_balance
sudo_force_set_balance = force_set_balance
[docs]
def do_block_step(self) -> None:
self.block_number += 1
# Doesn't do epoch
subtensor_state = self.chain_state["SubtensorModule"]
for subnet in subtensor_state["NetworksAdded"]:
subtensor_state["BlocksSinceLastStep"][subnet][self.block_number] = (
self._get_most_recent_storage(
subtensor_state["BlocksSinceLastStep"][subnet]
)
+ 1
)
def _handle_type_default(self, name: str, params: list[object]) -> object:
defaults_mapping = {
"TotalStake": 0,
"TotalHotkeyStake": 0,
"TotalColdkeyStake": 0,
"Stake": 0,
}
return defaults_mapping.get(name, None)
[docs]
def commit(self, wallet: "Wallet", netuid: int, data: str) -> None:
uid = self.get_uid_for_hotkey_on_subnet(
hotkey_ss58=wallet.hotkey.ss58_address,
netuid=netuid,
)
if uid is None:
raise Exception("Neuron not found")
subtensor_state = self.chain_state["SubtensorModule"]
subtensor_state["Commits"][netuid].setdefault(self.block_number, {})[uid] = data
[docs]
def get_commitment(self, netuid: int, uid: int, block: Optional[int] = None) -> str:
if block and self.block_number < block:
raise Exception("Cannot query block in the future")
block = block or self.block_number
subtensor_state = self.chain_state["SubtensorModule"]
return subtensor_state["Commits"][netuid][block][uid]
[docs]
def query_subtensor(
self,
name: str,
block: Optional[int] = None,
params: Optional[list[object]] = None,
) -> MockSubtensorValue:
if params is None:
params = []
if block:
if self.block_number < block:
raise Exception("Cannot query block in the future")
else:
block = self.block_number
state = self.chain_state["SubtensorModule"][name]
if state is not None:
# Use prefix
if len(params) > 0:
while state is not None and len(params) > 0:
state = state.get(params.pop(0), None)
if state is None:
return SimpleNamespace(
value=self._handle_type_default(name, params)
)
# Use block
state_at_block = state.get(block, None)
while state_at_block is None and block > 0:
block -= 1
state_at_block = state.get(block, None)
if state_at_block is not None:
return SimpleNamespace(value=state_at_block)
return SimpleNamespace(value=self._handle_type_default(name, params))
else:
return SimpleNamespace(value=self._handle_type_default(name, params))
[docs]
def query_map_subtensor(
self,
name: str,
block: Optional[int] = None,
params: Optional[list[object]] = None,
) -> Optional[MockMapResult]:
"""
Note: Double map requires one param
"""
if params is None:
params = []
if block:
if self.block_number < block:
raise Exception("Cannot query block in the future")
else:
block = self.block_number
state = self.chain_state["SubtensorModule"][name]
if state is not None:
# Use prefix
if len(params) > 0:
while state is not None and len(params) > 0:
state = state.get(params.pop(0), None)
if state is None:
return MockMapResult([])
# Check if single map or double map
if len(state.keys()) == 0:
return MockMapResult([])
inner = list(state.values())[0]
# Should have at least one key
if len(inner.keys()) == 0:
raise Exception("Invalid state")
# Check if double map
if isinstance(list(inner.values())[0], dict):
# is double map
raise ChainQueryError("Double map requires one param")
# Iterate over each key and add value to list, max at block
records = []
for key in state:
result = self._get_most_recent_storage(state[key], block)
if result is None:
continue # Skip if no result for this key at `block` or earlier
records.append((key, result))
return MockMapResult(records)
else:
return MockMapResult([])
[docs]
def query_constant(
self, module_name: str, constant_name: str, block: Optional[int] = None
) -> Optional[object]:
if block:
if self.block_number < block:
raise Exception("Cannot query block in the future")
else:
block = self.block_number
state: Optional[dict] = self.chain_state.get(module_name, None)
if state is not None:
if constant_name in state:
state = state[constant_name]
else:
return None
# Use block
state_at_block = self._get_most_recent_storage(state, block)
if state_at_block is not None:
return SimpleNamespace(value=state_at_block)
return state_at_block["data"]["free"] # Can be None
else:
return None
[docs]
def get_current_block(self) -> int:
return self.block_number
# ==== Balance RPC methods ====
[docs]
def get_balance(self, address: str, block: int = None) -> "Balance":
if block:
if self.block_number < block:
raise Exception("Cannot query block in the future")
else:
block = self.block_number
state = self.chain_state["System"]["Account"]
if state is not None:
if address in state:
state = state[address]
else:
return Balance(0)
# Use block
balance_state = state["data"]["free"]
state_at_block = self._get_most_recent_storage(
balance_state, block
) # Can be None
if state_at_block is not None:
bal_as_int = state_at_block
return Balance.from_rao(bal_as_int)
else:
return Balance(0)
else:
return Balance(0)
# ==== Neuron RPC methods ====
[docs]
def neuron_for_uid(
self, uid: int, netuid: int, block: Optional[int] = None
) -> Optional[NeuronInfo]:
if uid is None:
return NeuronInfo.get_null_neuron()
if block:
if self.block_number < block:
raise Exception("Cannot query block in the future")
else:
block = self.block_number
if netuid not in self.chain_state["SubtensorModule"]["NetworksAdded"]:
return None
neuron_info = self._neuron_subnet_exists(uid, netuid, block)
if neuron_info is None:
return None
else:
return neuron_info
[docs]
def neurons(self, netuid: int, block: Optional[int] = None) -> list[NeuronInfo]:
if netuid not in self.chain_state["SubtensorModule"]["NetworksAdded"]:
raise Exception("Subnet does not exist")
neurons = []
subnet_n = self._get_most_recent_storage(
self.chain_state["SubtensorModule"]["SubnetworkN"][netuid], block
)
for uid in range(subnet_n):
neuron_info = self.neuron_for_uid(uid, netuid, block)
if neuron_info is not None:
neurons.append(neuron_info)
return neurons
@staticmethod
def _get_most_recent_storage(
storage: dict[BlockNumber, Any], block_number: Optional[int] = None
) -> Any:
if block_number is None:
items = list(storage.items())
items.sort(key=lambda x: x[0], reverse=True)
if len(items) == 0:
return None
return items[0][1]
else:
while block_number >= 0:
if block_number in storage:
return storage[block_number]
block_number -= 1
return None
def _get_axon_info(
self, netuid: int, hotkey: str, block: Optional[int] = None
) -> AxonInfoDict:
# Axons [netuid][hotkey][block_number]
subtensor_state = self.chain_state["SubtensorModule"]
if netuid not in subtensor_state["Axons"]:
return AxonInfoDict.default()
if hotkey not in subtensor_state["Axons"][netuid]:
return AxonInfoDict.default()
result = self._get_most_recent_storage(
subtensor_state["Axons"][netuid][hotkey], block
)
if not result:
return AxonInfoDict.default()
return result
def _get_prometheus_info(
self, netuid: int, hotkey: str, block: Optional[int] = None
) -> PrometheusInfoDict:
subtensor_state = self.chain_state["SubtensorModule"]
if netuid not in subtensor_state["Prometheus"]:
return PrometheusInfoDict.default()
if hotkey not in subtensor_state["Prometheus"][netuid]:
return PrometheusInfoDict.default()
result = self._get_most_recent_storage(
subtensor_state["Prometheus"][netuid][hotkey], block
)
if not result:
return PrometheusInfoDict.default()
return result
def _neuron_subnet_exists(
self, uid: int, netuid: int, block: Optional[int] = None
) -> Optional[NeuronInfo]:
subtensor_state = self.chain_state["SubtensorModule"]
if netuid not in subtensor_state["NetworksAdded"]:
return None
if self._get_most_recent_storage(subtensor_state["SubnetworkN"][netuid]) <= uid:
return None
hotkey = self._get_most_recent_storage(subtensor_state["Keys"][netuid][uid])
if hotkey is None:
return None
axon_info_ = self._get_axon_info(netuid, hotkey, block)
prometheus_info = self._get_prometheus_info(netuid, hotkey, block)
coldkey = self._get_most_recent_storage(subtensor_state["Owner"][hotkey], block)
active = self._get_most_recent_storage(
subtensor_state["Active"][netuid][uid], block
)
rank = self._get_most_recent_storage(
subtensor_state["Rank"][netuid][uid], block
)
emission = self._get_most_recent_storage(
subtensor_state["Emission"][netuid][uid], block
)
incentive = self._get_most_recent_storage(
subtensor_state["Incentive"][netuid][uid], block
)
consensus = self._get_most_recent_storage(
subtensor_state["Consensus"][netuid][uid], block
)
trust = self._get_most_recent_storage(
subtensor_state["Trust"][netuid][uid], block
)
validator_trust = self._get_most_recent_storage(
subtensor_state["ValidatorTrust"][netuid][uid], block
)
dividends = self._get_most_recent_storage(
subtensor_state["Dividends"][netuid][uid], block
)
pruning_score = self._get_most_recent_storage(
subtensor_state["PruningScores"][netuid][uid], block
)
last_update = self._get_most_recent_storage(
subtensor_state["LastUpdate"][netuid][uid], block
)
validator_permit = self._get_most_recent_storage(
subtensor_state["ValidatorPermit"][netuid][uid], block
)
weights = self._get_most_recent_storage(
subtensor_state["Weights"][netuid][uid], block
)
bonds = self._get_most_recent_storage(
subtensor_state["Bonds"][netuid][uid], block
)
stake_dict = {
coldkey: Balance.from_rao(
self._get_most_recent_storage(
subtensor_state["Stake"][hotkey][coldkey], block
)
)
for coldkey in subtensor_state["Stake"][hotkey]
}
stake = sum(stake_dict.values())
weights = [[int(weight[0]), int(weight[1])] for weight in weights]
bonds = [[int(bond[0]), int(bond[1])] for bond in bonds]
rank = u16_normalized_float(rank)
emission = emission / RAOPERTAO
incentive = u16_normalized_float(incentive)
consensus = u16_normalized_float(consensus)
trust = u16_normalized_float(trust)
validator_trust = u16_normalized_float(validator_trust)
dividends = u16_normalized_float(dividends)
prometheus_info = PrometheusInfo.from_dict(prometheus_info)
axon_info_ = AxonInfo.from_neuron_info(
{"hotkey": hotkey, "coldkey": coldkey, "axon_info": axon_info_}
)
neuron_info = NeuronInfo(
hotkey=hotkey,
coldkey=coldkey,
uid=uid,
netuid=netuid,
active=active,
rank=rank,
emission=emission,
incentive=incentive,
consensus=consensus,
trust=trust,
validator_trust=validator_trust,
dividends=dividends,
pruning_score=pruning_score,
last_update=last_update,
validator_permit=validator_permit,
stake=stake,
stake_dict=stake_dict,
total_stake=stake,
prometheus_info=prometheus_info,
axon_info=axon_info_,
weights=weights,
bonds=bonds,
is_null=False,
)
return neuron_info
[docs]
def neurons_lite(
self, netuid: int, block: Optional[int] = None
) -> list[NeuronInfoLite]:
if netuid not in self.chain_state["SubtensorModule"]["NetworksAdded"]:
raise Exception("Subnet does not exist")
neurons = []
subnet_n = self._get_most_recent_storage(
self.chain_state["SubtensorModule"]["SubnetworkN"][netuid]
)
for uid in range(subnet_n):
neuron_info = self.neuron_for_uid_lite(uid, netuid, block)
if neuron_info is not None:
neurons.append(neuron_info)
return neurons
[docs]
def neuron_for_uid_lite(
self, uid: int, netuid: int, block: Optional[int] = None
) -> Optional[NeuronInfoLite]:
if uid is None:
return NeuronInfoLite.get_null_neuron()
if block:
if self.block_number < block:
raise Exception("Cannot query block in the future")
else:
block = self.block_number
if netuid not in self.chain_state["SubtensorModule"]["NetworksAdded"]:
return None
neuron_info = self._neuron_subnet_exists(uid, netuid, block)
if neuron_info is None:
# TODO Why does this return None here but a null neuron earlier?
return None
else:
return NeuronInfoLite(
hotkey=neuron_info.hotkey,
coldkey=neuron_info.coldkey,
uid=neuron_info.uid,
netuid=neuron_info.netuid,
active=neuron_info.active,
stake=neuron_info.stake,
stake_dict=neuron_info.stake_dict,
total_stake=neuron_info.total_stake,
rank=neuron_info.rank,
emission=neuron_info.emission,
incentive=neuron_info.incentive,
consensus=neuron_info.consensus,
trust=neuron_info.trust,
validator_trust=neuron_info.validator_trust,
dividends=neuron_info.dividends,
last_update=neuron_info.last_update,
validator_permit=neuron_info.validator_permit,
prometheus_info=neuron_info.prometheus_info,
axon_info=neuron_info.axon_info,
pruning_score=neuron_info.pruning_score,
is_null=neuron_info.is_null,
)
[docs]
def get_transfer_fee(
self, wallet: "Wallet", dest: str, value: Union["Balance", float, int]
) -> "Balance":
return Balance(700)
[docs]
def do_transfer(
self,
wallet: "Wallet",
dest: str,
transfer_balance: "Balance",
wait_for_inclusion: bool = True,
wait_for_finalization: bool = False,
) -> tuple[bool, Optional[str], Optional[str]]:
bal = self.get_balance(wallet.coldkeypub.ss58_address)
dest_bal = self.get_balance(dest)
transfer_fee = self.get_transfer_fee(wallet, dest, transfer_balance)
existential_deposit = self.get_existential_deposit()
if bal < transfer_balance + existential_deposit + transfer_fee:
raise Exception("Insufficient balance")
# Remove from the free balance
self.chain_state["System"]["Account"][wallet.coldkeypub.ss58_address]["data"][
"free"
][self.block_number] = (bal - transfer_balance - transfer_fee).rao
# Add to the free balance
if dest not in self.chain_state["System"]["Account"]:
self.chain_state["System"]["Account"][dest] = {"data": {"free": {}}}
self.chain_state["System"]["Account"][dest]["data"]["free"][
self.block_number
] = (dest_bal + transfer_balance).rao
return True, None, None
[docs]
@staticmethod
def min_required_stake():
"""
As the minimum required stake may change, this method allows us to dynamically
update the amount in the mock without updating the tests
"""
# valid minimum threshold as of 2024/05/01
return 100_000_000 # RAO
[docs]
def do_serve_prometheus(
self,
wallet: "Wallet",
call_params: "PrometheusServeCallParams",
wait_for_inclusion: bool = False,
wait_for_finalization: bool = True,
) -> tuple[bool, Optional[str]]:
return True, None
[docs]
def do_set_weights(
self,
wallet: "Wallet",
netuid: int,
uids: int,
vals: list[int],
version_key: int,
wait_for_inclusion: bool = False,
wait_for_finalization: bool = True,
) -> tuple[bool, Optional[str]]:
return True, None
[docs]
def do_serve_axon(
self,
wallet: "Wallet",
call_params: "AxonServeCallParams",
wait_for_inclusion: bool = False,
wait_for_finalization: bool = True,
) -> tuple[bool, Optional[str]]:
return True, None