diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..e765405 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,3 @@ +[mypy] +ignore_missing_imports = True +show_error_codes = True diff --git a/pool/difficulty_adjustment.py b/pool/difficulty_adjustment.py index 57bb37d..c713377 100644 --- a/pool/difficulty_adjustment.py +++ b/pool/difficulty_adjustment.py @@ -1,4 +1,4 @@ -from typing import Tuple, List +from typing import List, Tuple from chia.util.ints import uint64 @@ -27,12 +27,12 @@ def get_new_difficulty( # Lower the difficulty if we are really slow since our last partial last_timestamp = recent_partials[0][0] if current_time - last_timestamp > 3 * 3600: - return max(min_difficulty, current_difficulty // 5) + return uint64(max(min_difficulty, current_difficulty // 5)) if current_time - last_timestamp > 3600: - return max(min_difficulty, uint64(int(current_difficulty // 1.5))) + return uint64(max(min_difficulty, uint64(int(current_difficulty // 1.5)))) - time_taken = uint64(recent_partials[0][0] - recent_partials[-1][0]) + time_taken = (recent_partials[0][0] - recent_partials[-1][0]) * 1.0 # If we don't have enough partials at this difficulty and time between last and # 1st partials is below target time, don't update yet @@ -45,4 +45,4 @@ def get_new_difficulty( # Finally, this is the standard case of normal farming and slow (or no) growth, adjust to the new difficulty new_difficulty = uint64(int(current_difficulty * time_target / time_taken)) - return max(min_difficulty, new_difficulty) + return uint64(max(min_difficulty, new_difficulty)) diff --git a/pool/pool.py b/pool/pool.py index 3bb0ead..34fcaee 100644 --- a/pool/pool.py +++ b/pool/pool.py @@ -6,51 +6,50 @@ import traceback from asyncio import Task from math import floor -from typing import Dict, Optional, Set, List, Tuple, Callable +from typing import Any, Callable, Dict, List, Optional, Set, Tuple from blspy import AugSchemeMPL, G1Element from chia.consensus.block_rewards import calculate_pool_reward -from chia.pools.pool_wallet_info import PoolState, PoolSingletonState +from chia.consensus.constants import ConsensusConstants +from chia.consensus.pot_iterations import calculate_iterations_quality +from chia.full_node.signage_point import SignagePoint +from chia.pools.pool_puzzles import ( + get_delayed_puz_info_from_launcher_spend, + get_most_recent_singleton_coin_from_coin_spend, + launcher_id_to_p2_puzzle_hash, +) +from chia.pools.pool_wallet_info import PoolSingletonState, PoolState from chia.protocols.pool_protocol import ( + POOL_PROTOCOL_VERSION, PoolErrorCode, - PostPartialRequest, - PostPartialResponse, PostFarmerRequest, PostFarmerResponse, + PostPartialRequest, + PostPartialResponse, PutFarmerRequest, - PutFarmerResponse, - POOL_PROTOCOL_VERSION, ) +from chia.rpc.full_node_rpc_client import FullNodeRpcClient from chia.rpc.wallet_rpc_client import WalletRpcClient from chia.types.blockchain_format.coin import Coin from chia.types.blockchain_format.proof_of_space import verify_and_get_quality_string +from chia.types.blockchain_format.sized_bytes import bytes32 from chia.types.coin_record import CoinRecord from chia.types.coin_spend import CoinSpend +from chia.types.end_of_slot_bundle import EndOfSubSlotBundle from chia.util.bech32m import decode_puzzle_hash -from chia.consensus.constants import ConsensusConstants -from chia.util.ints import uint8, uint16, uint32, uint64 from chia.util.byte_types import hexstr_to_bytes +from chia.util.chia_logging import initialize_logging from chia.util.default_root import DEFAULT_ROOT_PATH -from chia.rpc.full_node_rpc_client import FullNodeRpcClient -from chia.full_node.signage_point import SignagePoint -from chia.types.end_of_slot_bundle import EndOfSubSlotBundle -from chia.types.blockchain_format.sized_bytes import bytes32 -from chia.consensus.pot_iterations import calculate_iterations_quality +from chia.util.ints import uint8, uint16, uint32, uint64 from chia.util.lru_cache import LRUCache -from chia.util.chia_logging import initialize_logging from chia.wallet.transaction_record import TransactionRecord -from chia.pools.pool_puzzles import ( - get_most_recent_singleton_coin_from_coin_spend, - get_delayed_puz_info_from_launcher_spend, - launcher_id_to_p2_puzzle_hash, -) from .difficulty_adjustment import get_new_difficulty -from .singleton import create_absorb_transaction, get_singleton_state, get_coin_spend, get_farmed_height +from .record import FarmerRecord +from .singleton import create_absorb_transaction, get_coin_spend, get_singleton_state from .store.abstract import AbstractPoolStore from .store.sqlite_store import SqlitePoolStore -from .record import FarmerRecord -from .util import error_dict, RequestMetadata +from .util import RequestMetadata, error_dict class Pool: @@ -78,13 +77,14 @@ def __init__( self.config = config self.constants = constants + self.store: AbstractPoolStore if pool_config.get("store") == "MariadbPoolStore": from .store.mariadb_store import MariadbPoolStore - self.store: AbstractPoolStore = pool_store or MariadbPoolStore() + self.store = pool_store or MariadbPoolStore() else: - self.store: AbstractPoolStore = pool_store or SqlitePoolStore() + self.store = pool_store or SqlitePoolStore() self.pool_fee = pool_config["pool_fee"] @@ -160,7 +160,7 @@ def __init__( self.pending_payments: Optional[asyncio.Queue] = None # Keeps track of the latest state of our node - self.blockchain_state = {"peak": None} + self.blockchain_state: Dict[str, Any] = {"peak": None} # Whether or not the wallet is synced (required to make payments) self.wallet_synced = False @@ -247,12 +247,13 @@ async def get_peak_loop(self): self.log.error(f"Unexpected error in get_peak_loop: {e}") await asyncio.sleep(30) - async def collect_pool_rewards_loop(self): + async def collect_pool_rewards_loop(self) -> None: """ Iterates through the blockchain, looking for pool rewards, and claims them, creating a transaction to the pool's puzzle_hash. """ + assert self.node_rpc_client while True: try: if not self.blockchain_state["sync"]["synced"]: @@ -357,7 +358,7 @@ async def collect_pool_rewards_loop(self): self.log.error(f"Unexpected error in collect_pool_rewards_loop: {e} {error_stack}") await asyncio.sleep(self.collect_pool_rewards_interval) - async def create_payment_loop(self): + async def create_payment_loop(self) -> None: """ Calculates the points of each farmer, and splits the total funds received into coins for each farmer. Saves the transactions that we should make, to `amount_to_distribute`. @@ -369,6 +370,7 @@ async def create_payment_loop(self): await asyncio.sleep(60) continue + assert self.pending_payments if self.pending_payments.qsize() != 0: self.log.warning(f"Pending payments ({self.pending_payments.qsize()}), waiting") await asyncio.sleep(60) @@ -376,6 +378,7 @@ async def create_payment_loop(self): self.log.info("Starting to create payment") + assert self.node_rpc_client coin_records: List[CoinRecord] = await self.node_rpc_client.get_coin_records_by_puzzle_hash( self.default_target_puzzle_hash, include_spent_coins=False, @@ -441,16 +444,18 @@ async def create_payment_loop(self): self.log.error(f"Unexpected error in create_payments_loop: {e} {error_stack}") await asyncio.sleep(self.payment_interval) - async def submit_payment_loop(self): + async def submit_payment_loop(self) -> None: while True: try: peak_height = self.blockchain_state["peak"].height + assert self.wallet_rpc_client await self.wallet_rpc_client.log_in(fingerprint=self.wallet_fingerprint) if not self.blockchain_state["sync"]["synced"] or not self.wallet_synced: self.log.warning("Waiting for wallet sync") await asyncio.sleep(60) continue + assert self.pending_payments payment_targets = await self.pending_payments.get() assert len(payment_targets) > 0 @@ -461,12 +466,14 @@ async def submit_payment_loop(self): # blockchain_fee = 0.00001 * (10 ** 12) * len(payment_targets) blockchain_fee: uint64 = uint64(0) try: + assert self.wallet_rpc_client transaction: TransactionRecord = await self.wallet_rpc_client.send_transaction_multi( self.wallet_id, payment_targets, fee=blockchain_fee ) except ValueError as e: self.log.error(f"Error making payment: {e}") await asyncio.sleep(10) + assert self.pending_payments await self.pending_payments.put(payment_targets) continue @@ -476,6 +483,7 @@ async def submit_payment_loop(self): not transaction.confirmed or not (peak_height - transaction.confirmed_at_height) > self.confirmation_security_threshold ): + assert self.wallet_rpc_client transaction = await self.wallet_rpc_client.get_transaction(self.wallet_id, transaction.name) peak_height = self.blockchain_state["peak"].height self.log.info( @@ -498,7 +506,7 @@ async def submit_payment_loop(self): self.log.error(f"Unexpected error in submit_payment_loop: {e}") await asyncio.sleep(60) - async def confirm_partials_loop(self): + async def confirm_partials_loop(self) -> None: """ Pulls things from the queue of partials one at a time, and adjusts balances. """ @@ -507,6 +515,7 @@ async def confirm_partials_loop(self): try: # The points are based on the difficulty at the time of partial submission, not at the time of # confirmation + assert self.pending_point_partials partial, time_received, points_received = await self.pending_point_partials.get() # Wait a few minutes to check if partial is still valid in the blockchain (no reorgs) @@ -521,6 +530,9 @@ async def confirm_partials_loop(self): self.log.error(f"Unexpected error: {e}") async def check_and_confirm_partial(self, partial: PostPartialRequest, points_received: uint64) -> None: + if self.node_rpc_client is None: + self.log.error("Logic Error: check_and_confirm_partial called with self.node_rpc_client=None") + return try: # TODO(pool): these lookups to the full node are not efficient and can be cached, especially for # scaling to many users @@ -554,11 +566,13 @@ async def check_and_confirm_partial(self, partial: PostPartialRequest, points_re _, _, is_member = singleton_state_tuple if not is_member: - self.log.info(f"Singleton is not assigned to this pool") + self.log.info(f"Singleton {partial.payload.launcher_id} is not assigned to this pool") return async with self.store.lock: farmer_record: Optional[FarmerRecord] = await self.store.get_farmer_record(partial.payload.launcher_id) + if farmer_record is None: + raise RuntimeError(f"get_farmer_record failed for {partial.payload.launcher_id}") assert ( partial.payload.proof_of_space.pool_contract_puzzle_hash == farmer_record.p2_singleton_puzzle_hash @@ -576,8 +590,8 @@ async def check_and_confirm_partial(self, partial: PostPartialRequest, points_re async def validate_payout_instructions(self, payout_instructions: str) -> Optional[str]: """ - Returns the puzzle hash as a hex string from the payout instructions (puzzle hash hex or bech32m address) if it's encoded - correctly, otherwise returns None. + Returns the puzzle hash as a hex string from the payout instructions (puzzle hash hex or bech32m address) + if it's encoded correctly, otherwise returns None. """ try: if len(decode_puzzle_hash(payout_instructions)) == 32: @@ -611,7 +625,10 @@ async def add_farmer(self, request: PostFarmerRequest, metadata: RequestMetadata last_spend, last_state, is_member = singleton_state_tuple if is_member is None: - return error_dict(PoolErrorCode.INVALID_SINGLETON, f"Singleton is not assigned to this pool") + return error_dict( + PoolErrorCode.INVALID_SINGLETON, + f"Singleton {request.payload.launcher_id} is not assigned to this pool", + ) if ( request.payload.suggested_difficulty is None @@ -625,22 +642,25 @@ async def add_farmer(self, request: PostFarmerRequest, metadata: RequestMetadata if puzzle_hash is None: return error_dict( PoolErrorCode.INVALID_PAYOUT_INSTRUCTIONS, - f"Payout instructions must be an xch address or puzzle hash for this pool.", + f"Payout instructions must be an xch address or puzzle hash for this pool. We got: " + f"[{request.payload.payout_instructions}]", ) if not AugSchemeMPL.verify(last_state.owner_pubkey, request.payload.get_hash(), request.signature): - return error_dict(PoolErrorCode.INVALID_SIGNATURE, f"Invalid signature") + return error_dict(PoolErrorCode.INVALID_SIGNATURE, f"Invalid signature: {request.signature}") + assert self.node_rpc_client launcher_coin: Optional[CoinRecord] = await self.node_rpc_client.get_coin_record_by_name( request.payload.launcher_id ) assert launcher_coin is not None and launcher_coin.spent - + assert self.node_rpc_client launcher_solution: Optional[CoinSpend] = await get_coin_spend(self.node_rpc_client, launcher_coin) + assert launcher_solution delay_time, delay_puzzle_hash = get_delayed_puz_info_from_launcher_spend(launcher_solution) if delay_time < 3600: - return error_dict(PoolErrorCode.DELAY_TIME_TOO_SHORT, f"Delay time too short, must be at least 1 hour") + return error_dict(PoolErrorCode.DELAY_TIME_TOO_SHORT, "Delay time too short, must be at least 1 hour") p2_singleton_puzzle_hash = launcher_id_to_p2_puzzle_hash( request.payload.launcher_id, delay_time, delay_puzzle_hash @@ -669,7 +689,7 @@ async def update_farmer(self, request: PutFarmerRequest, metadata: RequestMetada # First check if this launcher_id is currently blocked for farmer updates, if so there is no reason to validate # all the stuff below if launcher_id in self.farmer_update_blocked: - return error_dict(PoolErrorCode.REQUEST_FAILED, f"Cannot update farmer yet.") + return error_dict(PoolErrorCode.REQUEST_FAILED, "Cannot update farmer yet.") farmer_record: Optional[FarmerRecord] = await self.store.get_farmer_record(launcher_id) if farmer_record is None: return error_dict(PoolErrorCode.FARMER_NOT_KNOWN, f"Farmer with launcher_id {launcher_id} not known.") @@ -683,10 +703,12 @@ async def update_farmer(self, request: PutFarmerRequest, metadata: RequestMetada last_spend, last_state, is_member = singleton_state_tuple if is_member is None: - return error_dict(PoolErrorCode.INVALID_SINGLETON, f"Singleton is not assigned to this pool") + return error_dict( + PoolErrorCode.INVALID_SINGLETON, f"Singleton {request.payload.launcher_id} is not assigned to this pool" + ) if not AugSchemeMPL.verify(last_state.owner_pubkey, request.payload.get_hash(), request.signature): - return error_dict(PoolErrorCode.INVALID_SIGNATURE, f"Invalid signature") + return error_dict(PoolErrorCode.INVALID_SIGNATURE, f"Invalid signature {request.signature}") updated_record: FarmerRecord = dataclasses.replace(farmer_record) response_dict: Dict[str, bool] = {} @@ -745,9 +767,10 @@ async def get_and_validate_singleton_state( """ singleton_task: Optional[Task] = self.follow_singleton_tasks.get(launcher_id, None) remove_after = False - farmer_rec = None + farmer_rec: Optional[FarmerRecord] = None + assert self.node_rpc_client if singleton_task is None or singleton_task.done(): - farmer_rec: Optional[FarmerRecord] = await self.store.get_farmer_record(launcher_id) + farmer_rec = await self.store.get_farmer_record(launcher_id) singleton_task = asyncio.create_task( get_singleton_state( self.node_rpc_client, @@ -818,6 +841,7 @@ async def process_partial( partial: PostPartialRequest, farmer_record: FarmerRecord, time_received_partial: uint64, + peak_height: uint32, ) -> Dict: # Validate signatures message: bytes32 = partial.payload.get_hash() @@ -868,12 +892,20 @@ async def get_signage_point_or_eos(): # Validate the proof if signage_point is not None: + assert signage_point.cc_vdf challenge_hash: bytes32 = signage_point.cc_vdf.challenge else: + assert end_of_sub_slot + assert end_of_sub_slot.challenge_chain challenge_hash = end_of_sub_slot.challenge_chain.get_hash() + # Note the use of peak_height + 1. We Are evaluating the suitability for the next block quality_string: Optional[bytes32] = verify_and_get_quality_string( - partial.payload.proof_of_space, self.constants, challenge_hash, partial.payload.sp_hash + partial.payload.proof_of_space, + self.constants, + challenge_hash, + partial.payload.sp_hash, + height=uint32(peak_height + 1), ) if quality_string is None: return error_dict(PoolErrorCode.INVALID_PROOF, f"Invalid proof of space {partial.payload.sp_hash}") @@ -893,13 +925,14 @@ async def get_signage_point_or_eos(): f"Proof of space has required iters {required_iters}, too high for difficulty " f"{current_difficulty}", ) + assert self.pending_point_partials await self.pending_point_partials.put((partial, time_received_partial, current_difficulty)) async with self.store.lock: # Obtains the new record in case we just updated difficulty - farmer_record: Optional[FarmerRecord] = await self.store.get_farmer_record(partial.payload.launcher_id) - if farmer_record is not None: - current_difficulty = farmer_record.difficulty + farmer_record_from_store = await self.store.get_farmer_record(partial.payload.launcher_id) + if farmer_record_from_store is not None: + current_difficulty = farmer_record_from_store.difficulty # Decide whether to update the difficulty recent_partials = await self.store.get_recent_partials( partial.payload.launcher_id, self.number_of_partials_target diff --git a/pool/pool_server.py b/pool/pool_server.py index e50869c..923c630 100644 --- a/pool/pool_server.py +++ b/pool/pool_server.py @@ -4,37 +4,37 @@ import ssl import time import traceback -from typing import Dict, Callable, Optional +from typing import Callable, Dict, Optional, Union import aiohttp import yaml -from blspy import AugSchemeMPL, G2Element from aiohttp import web +from blspy import AugSchemeMPL, G2Element +from chia.consensus.constants import ConsensusConstants +from chia.consensus.default_constants import DEFAULT_CONSTANTS from chia.protocols.pool_protocol import ( - PoolErrorCode, + POOL_PROTOCOL_VERSION, + AuthenticationPayload, GetFarmerResponse, GetPoolInfoResponse, - PostPartialRequest, + PoolErrorCode, PostFarmerRequest, + PostPartialRequest, PutFarmerRequest, validate_authentication_token, - POOL_PROTOCOL_VERSION, - AuthenticationPayload, ) from chia.types.blockchain_format.sized_bytes import bytes32 from chia.util.byte_types import hexstr_to_bytes +from chia.util.config import load_config +from chia.util.default_root import DEFAULT_ROOT_PATH from chia.util.hash import std_hash -from chia.consensus.default_constants import DEFAULT_CONSTANTS -from chia.consensus.constants import ConsensusConstants +from chia.util.ints import uint8, uint32, uint64 from chia.util.json_util import obj_to_response -from chia.util.ints import uint8, uint64, uint32 -from chia.util.default_root import DEFAULT_ROOT_PATH -from chia.util.config import load_config -from .record import FarmerRecord from .pool import Pool +from .record import FarmerRecord from .store.abstract import AbstractPoolStore -from .util import error_response, RequestMetadata +from .util import RequestMetadata, error_response def allow_cors(response: web.Response) -> web.Response: @@ -116,7 +116,7 @@ async def get_pool_info(self, _) -> web.Response: async def get_farmer(self, request_obj) -> web.Response: # TODO(pool): add rate limiting - launcher_id: bytes32 = hexstr_to_bytes(request_obj.rel_url.query["launcher_id"]) + launcher_id: bytes32 = bytes32(hexstr_to_bytes(request_obj.rel_url.query["launcher_id"])) authentication_token = uint64(request_obj.rel_url.query["authentication_token"]) authentication_token_error: Optional[web.Response] = check_authentication_token( @@ -228,7 +228,11 @@ async def post_partial(self, request_obj) -> web.Response: f"Farmer with launcher_id {partial.payload.launcher_id.hex()} not known.", ) - post_partial_response = await self.pool.process_partial(partial, farmer_record, uint64(int(start_time))) + peak_height = self.pool.blockchain_state["peak"].height + # Note the use of peak_height + 1. We Are evaluating the suitability for the next block + post_partial_response = await self.pool.process_partial( + partial, farmer_record, uint64(int(start_time)), peak_height + 1 + ) self.pool.log.info( f"post_partial response {post_partial_response}, time: {time.time() - start_time} " @@ -238,7 +242,7 @@ async def post_partial(self, request_obj) -> web.Response: async def get_login(self, request_obj) -> web.Response: # TODO(pool): add rate limiting - launcher_id: bytes32 = hexstr_to_bytes(request_obj.rel_url.query["launcher_id"]) + launcher_id: bytes32 = bytes32(hexstr_to_bytes(request_obj.rel_url.query["launcher_id"])) authentication_token: uint64 = uint64(request_obj.rel_url.query["authentication_token"]) authentication_token_error = check_authentication_token( launcher_id, authentication_token, self.pool.authentication_token_timeout @@ -267,9 +271,9 @@ async def get_login(self, request_obj) -> web.Response: return await self.login_response(launcher_id) - async def login_response(self, launcher_id): + async def login_response(self, launcher_id) -> web.Response: record: Optional[FarmerRecord] = await self.pool.store.get_farmer_record(launcher_id) - response = {} + response: Dict[str, Union[FarmerRecord, list[tuple[uint64, uint64]]]] = {} if record is not None: response["farmer_record"] = record recent_partials = await self.pool.store.get_recent_partials(launcher_id, 20) diff --git a/pool/record.py b/pool/record.py index 98009b8..18d6581 100644 --- a/pool/record.py +++ b/pool/record.py @@ -5,7 +5,7 @@ from chia.types.blockchain_format.sized_bytes import bytes32 from chia.types.coin_spend import CoinSpend from chia.util.ints import uint64 -from chia.util.streamable import streamable, Streamable +from chia.util.streamable import Streamable, streamable @streamable diff --git a/pool/singleton.py b/pool/singleton.py index 8951dd2..b5cb40a 100644 --- a/pool/singleton.py +++ b/pool/singleton.py @@ -1,15 +1,15 @@ -from typing import List, Optional, Tuple import logging +from typing import List, Optional, Tuple from blspy import G2Element from chia.consensus.coinbase import pool_parent_id from chia.pools.pool_puzzles import ( create_absorb_spend, - solution_to_pool_state, - get_most_recent_singleton_coin_from_coin_spend, - pool_state_to_inner_puzzle, create_full_puzzle, get_delayed_puz_info_from_launcher_spend, + get_most_recent_singleton_coin_from_coin_spend, + pool_state_to_inner_puzzle, + solution_to_pool_state, ) from chia.pools.pool_wallet import PoolSingletonState from chia.pools.pool_wallet_info import PoolState @@ -25,7 +25,6 @@ from chia.util.ints import uint32, uint64 from chia.wallet.transaction_record import TransactionRecord - from .record import FarmerRecord log = logging @@ -59,6 +58,7 @@ async def get_singleton_state( confirmation_security_threshold: int, genesis_challenge: bytes32, ) -> Optional[Tuple[CoinSpend, PoolState, PoolState]]: + last_spend: Optional[CoinSpend] try: if farmer_record is None: launcher_coin: Optional[CoinRecord] = await node_rpc_client.get_coin_record_by_name(launcher_id) @@ -69,10 +69,16 @@ async def get_singleton_state( log.warning(f"Genesis coin {launcher_id} not spent") return None - last_spend: Optional[CoinSpend] = await get_coin_spend(node_rpc_client, launcher_coin) + last_spend = await get_coin_spend(node_rpc_client, launcher_coin) + if last_spend is None: + raise RuntimeError( + f"Failed to get_coin_spend from {node_rpc_client.hostname}:{node_rpc_client.port}" + f" for singleton {launcher_coin}" + ) delay_time, delay_puzzle_hash = get_delayed_puz_info_from_launcher_spend(last_spend) saved_state = solution_to_pool_state(last_spend) - assert last_spend is not None and saved_state is not None + if saved_state is None: + raise RuntimeError(f"solution_to_pool_state failed to get state for spend {last_spend}") else: last_spend = farmer_record.singleton_tip saved_state = farmer_record.singleton_tip_state @@ -108,7 +114,7 @@ async def get_singleton_state( return None break - last_spend: Optional[CoinSpend] = await get_coin_spend(node_rpc_client, next_coin_record) + last_spend = await get_coin_spend(node_rpc_client, next_coin_record) assert last_spend is not None pool_state: Optional[PoolState] = solution_to_pool_state(last_spend) @@ -145,7 +151,7 @@ async def create_absorb_transaction( peak_height: uint32, reward_coin_records: List[CoinRecord], genesis_challenge: bytes32, - fee_amount: Optional[uint64] = None, + fee_amount: uint64 = uint64(0), wallet_rpc_client: Optional[WalletRpcClient] = None, fee_target_puzzle_hash: Optional[bytes32] = None, ) -> Optional[SpendBundle]: @@ -193,6 +199,7 @@ async def create_absorb_transaction( if len(coin_announcements) > 0: # address can be anything + assert wallet_rpc_client signed_transaction: TransactionRecord = await wallet_rpc_client.create_signed_transaction( additions=[{"amount": uint64(1), "puzzle_hash": fee_target_puzzle_hash}], fee=uint64(fee_amount * len(coin_announcements)), diff --git a/pool/store/abstract.py b/pool/store/abstract.py index 35eabe3..79c1436 100644 --- a/pool/store/abstract.py +++ b/pool/store/abstract.py @@ -1,6 +1,6 @@ -from abc import ABC, abstractmethod import asyncio -from typing import Optional, Set, List, Tuple +from abc import ABC, abstractmethod +from typing import List, Optional, Set, Tuple from chia.pools.pool_wallet_info import PoolState from chia.types.blockchain_format.sized_bytes import bytes32 diff --git a/pool/store/mariadb_store.py b/pool/store/mariadb_store.py index 5e3ef43..2553706 100644 --- a/pool/store/mariadb_store.py +++ b/pool/store/mariadb_store.py @@ -1,19 +1,19 @@ -import os -import yaml import logging -from typing import Optional, Set, List, Tuple, Dict +import os +from typing import Dict, List, Optional, Set, Tuple import aiomysql -import pymysql +import pymysql # type: ignore +import yaml from blspy import G1Element from chia.pools.pool_wallet_info import PoolState from chia.types.blockchain_format.sized_bytes import bytes32 -from chia.types.coin_solution import CoinSpend +from chia.types.coin_spend import CoinSpend from chia.util.ints import uint64 -from .abstract import AbstractPoolStore from ..record import FarmerRecord from ..util import RequestMetadata +from .abstract import AbstractPoolStore pymysql.converters.encoders[uint64] = pymysql.converters.escape_int pymysql.converters.conversions = pymysql.converters.encoders.copy() @@ -25,14 +25,14 @@ class MariadbPoolStore(AbstractPoolStore): Pool store based on MariaDB. """ - async def connect(self): + async def connect(self) -> None: try: # initialize logging self.log = logging # load config with open(os.getcwd() + "/config.yaml") as f: config: Dict = yaml.safe_load(f) - self.pool = await aiomysql.create_pool( + self.pool_db = await aiomysql.create_pool( minsize=1, maxsize=12, host=config["db_host"], @@ -44,7 +44,7 @@ async def connect(self): except pymysql.err.OperationalError as e: self.log.error("Error In Database Config. Check your config file! %s", e) raise ConnectionError("Unable to Connect to SQL Database.") - self.connection = await self.pool.acquire() + self.connection = await self.pool_db.acquire() self.cursor = await self.connection.cursor() await self.cursor.execute( ( @@ -73,15 +73,15 @@ async def connect(self): await self.cursor.execute("CREATE INDEX IF NOT EXISTS timestamp_index on partial(timestamp)") await self.cursor.execute("CREATE INDEX IF NOT EXISTS launcher_id_index on partial(launcher_id)") await self.connection.commit() - self.pool.release(self.connection) + self.pool_db.release(self.connection) @staticmethod def _row_to_farmer_record(row) -> FarmerRecord: return FarmerRecord( - bytes.fromhex(row[0]), - bytes.fromhex(row[1]), + bytes32(bytes.fromhex(row[0])), + bytes32(bytes.fromhex(row[1])), row[2], - bytes.fromhex(row[3]), + bytes32(bytes.fromhex(row[3])), G1Element.from_bytes(bytes.fromhex(row[4])), CoinSpend.from_bytes(row[5]), PoolState.from_bytes(row[6]), @@ -92,13 +92,13 @@ def _row_to_farmer_record(row) -> FarmerRecord: ) async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: RequestMetadata): - with await self.pool as connection: + with await self.pool_db as connection: cursor = await connection.cursor() await cursor.execute( - f"INSERT INTO farmer VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) " - f"ON DUPLICATE KEY UPDATE p2_singleton_puzzle_hash=%s, delay_time=%s, delay_puzzle_hash=%s," - f"authentication_public_key=%s, singleton_tip=%s, singleton_tip_state=%s, payout_instructions=%s, " - f"is_pool_member=%s", + "INSERT INTO farmer VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) " + "ON DUPLICATE KEY UPDATE p2_singleton_puzzle_hash=%s, delay_time=%s, delay_puzzle_hash=%s," + "authentication_public_key=%s, singleton_tip=%s, singleton_tip_state=%s, payout_instructions=%s, " + "is_pool_member=%s", ( farmer_record.launcher_id.hex(), farmer_record.p2_singleton_puzzle_hash.hex(), @@ -125,10 +125,10 @@ async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: Request async def get_farmer_record(self, launcher_id: bytes32) -> Optional[FarmerRecord]: # TODO(pool): use cache - with await self.pool as connection: + with await self.pool_db as connection: cursor = await connection.cursor() await cursor.execute( - f"SELECT * FROM farmer WHERE launcher_id=%s", + "SELECT * FROM farmer WHERE launcher_id=%s", (launcher_id.hex(),), ) row = await cursor.fetchone() @@ -137,11 +137,11 @@ async def get_farmer_record(self, launcher_id: bytes32) -> Optional[FarmerRecord return self._row_to_farmer_record(row) async def update_difficulty(self, launcher_id: bytes32, difficulty: uint64): - with await self.pool as connection: - connection = await self.pool.acquire() + with await self.pool_db as connection: + connection = await self.pool_db.acquire() cursor = await connection.cursor() await cursor.execute( - f"UPDATE farmer SET difficulty=%s WHERE launcher_id=%s", (difficulty, launcher_id.hex()) + "UPDATE farmer SET difficulty=%s WHERE launcher_id=%s", (difficulty, launcher_id.hex()) ) await connection.commit() @@ -153,16 +153,16 @@ async def update_singleton( is_pool_member: bool, ): entry = (bytes(singleton_tip), bytes(singleton_tip_state), int(is_pool_member), launcher_id.hex()) - with await self.pool as connection: + with await self.pool_db as connection: cursor = await connection.cursor() await cursor.execute( - f"UPDATE farmer SET singleton_tip=%s, singleton_tip_state=%s, is_pool_member=%s WHERE launcher_id=%s", + "UPDATE farmer SET singleton_tip=%s, singleton_tip_state=%s, is_pool_member=%s WHERE launcher_id=%s", entry, ) await connection.commit() async def get_pay_to_singleton_phs(self) -> Set[bytes32]: - with await self.pool as connection: + with await self.pool_db as connection: cursor = await connection.cursor() await cursor.execute("SELECT p2_singleton_puzzle_hash from farmer") rows = await cursor.fetchall() @@ -177,7 +177,7 @@ async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes if len(puzzle_hashes) == 0: return [] puzzle_hashes_db = tuple([ph.hex() for ph in list(puzzle_hashes)]) - with await self.pool as connection: + with await self.pool_db as connection: cursor = await connection.cursor() await cursor.execute( f'SELECT * from farmer WHERE p2_singleton_puzzle_hash in ({"%s," * (len(puzzle_hashes_db) - 1)}%s) ', @@ -188,9 +188,9 @@ async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes return [self._row_to_farmer_record(row) for row in rows] async def get_farmer_points_and_payout_instructions(self) -> List[Tuple[uint64, bytes]]: - with await self.pool as connection: + with await self.pool_db as connection: cursor = await connection.cursor() - await cursor.execute(f"SELECT points, payout_instructions FROM farmer") + await cursor.execute("SELECT points, payout_instructions FROM farmer") rows = await cursor.fetchall() await cursor.close() accumulated: Dict[bytes32, uint64] = {} @@ -198,40 +198,40 @@ async def get_farmer_points_and_payout_instructions(self) -> List[Tuple[uint64, points: uint64 = uint64(row[0]) ph: bytes32 = bytes32(bytes.fromhex(row[1])) if ph in accumulated: - accumulated[ph] += points + accumulated[ph] = uint64(accumulated[ph] + points) else: accumulated[ph] = points - ret: List[Tuple[uint64, bytes32]] = [] + ret: List[Tuple[uint64, bytes]] = [] for ph, total_points in accumulated.items(): ret.append((total_points, ph)) return ret async def clear_farmer_points(self) -> None: - with await self.pool as connection: + with await self.pool_db as connection: cursor = await connection.cursor() - await cursor.execute(f"UPDATE farmer SET points=0") + await cursor.execute("UPDATE farmer SET points=0") await cursor.close() await connection.commit() async def add_partial(self, launcher_id: bytes32, timestamp: uint64, difficulty: uint64): - with await self.pool as connection: + with await self.pool_db as connection: cursor = await connection.cursor() await cursor.execute( "INSERT INTO partial VALUES(%s, %s, %s)", (launcher_id.hex(), timestamp, difficulty), ) await connection.commit() - with await self.pool as connection: + with await self.pool_db as connection: cursor = await connection.cursor() await cursor.execute( - f"UPDATE farmer SET points=points+%s WHERE launcher_id=%s", (difficulty, launcher_id.hex()) + "UPDATE farmer SET points=points+%s WHERE launcher_id=%s", (difficulty, launcher_id.hex()) ) await connection.commit() await cursor.close() async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tuple[uint64, uint64]]: - with await self.pool as connection: + with await self.pool_db as connection: cursor = await connection.cursor() await cursor.execute( "SELECT timestamp, difficulty from partial WHERE launcher_id=%s ORDER BY timestamp DESC LIMIT %s", diff --git a/pool/store/sqlite_store.py b/pool/store/sqlite_store.py index ac35876..dedb27c 100644 --- a/pool/store/sqlite_store.py +++ b/pool/store/sqlite_store.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from pathlib import Path -from typing import Optional, Set, List, Tuple, Dict +from typing import Dict, List, Optional, Set, Tuple import aiosqlite from blspy import G1Element @@ -8,9 +10,9 @@ from chia.types.coin_spend import CoinSpend from chia.util.ints import uint64 -from .abstract import AbstractPoolStore from ..record import FarmerRecord from ..util import RequestMetadata +from .abstract import AbstractPoolStore class SqlitePoolStore(AbstractPoolStore): @@ -57,10 +59,10 @@ async def connect(self): @staticmethod def _row_to_farmer_record(row) -> FarmerRecord: return FarmerRecord( - bytes.fromhex(row[0]), - bytes.fromhex(row[1]), + bytes32(bytes.fromhex(row[0])), + bytes32(bytes.fromhex(row[1])), row[2], - bytes.fromhex(row[3]), + bytes32(bytes.fromhex(row[3])), G1Element.from_bytes(bytes.fromhex(row[4])), CoinSpend.from_bytes(row[5]), PoolState.from_bytes(row[6]), @@ -70,8 +72,9 @@ def _row_to_farmer_record(row) -> FarmerRecord: True if row[10] == 1 else False, ) - async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: RequestMetadata): + async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: RequestMetadata) -> None: # Find the launcher_id exists. + assert self.connection cursor = await self.connection.execute( "SELECT * from farmer where launcher_id=?", (farmer_record.launcher_id.hex(),), @@ -80,7 +83,7 @@ async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: Request # Insert for None if row is None: cursor = await self.connection.execute( - f"INSERT INTO farmer VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "INSERT INTO farmer VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ( farmer_record.launcher_id.hex(), farmer_record.p2_singleton_puzzle_hash.hex(), @@ -98,16 +101,16 @@ async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: Request # update for Exist else: cursor = await self.connection.execute( - f"UPDATE farmer SET " - f"p2_singleton_puzzle_hash=?, " - f"delay_time=?, " - f"delay_puzzle_hash=?, " - f"authentication_public_key=?, " - f"singleton_tip=?, " - f"singleton_tip_state=?, " - f"payout_instructions=?, " - f"is_pool_member=? " - f"WHERE launcher_id=?", + "UPDATE farmer SET " + "p2_singleton_puzzle_hash=?, " + "delay_time=?, " + "delay_puzzle_hash=?, " + "authentication_public_key=?, " + "singleton_tip=?, " + "singleton_tip_state=?, " + "payout_instructions=?, " + "is_pool_member=? " + "WHERE launcher_id=?", ( farmer_record.p2_singleton_puzzle_hash.hex(), farmer_record.delay_time, @@ -125,6 +128,7 @@ async def add_farmer_record(self, farmer_record: FarmerRecord, metadata: Request async def get_farmer_record(self, launcher_id: bytes32) -> Optional[FarmerRecord]: # TODO(pool): use cache + assert self.connection cursor = await self.connection.execute( "SELECT * from farmer where launcher_id=?", (launcher_id.hex(),), @@ -136,8 +140,9 @@ async def get_farmer_record(self, launcher_id: bytes32) -> Optional[FarmerRecord return self._row_to_farmer_record(row) async def update_difficulty(self, launcher_id: bytes32, difficulty: uint64): + assert self.connection cursor = await self.connection.execute( - f"UPDATE farmer SET difficulty=? WHERE launcher_id=?", (difficulty, launcher_id.hex()) + "UPDATE farmer SET difficulty=? WHERE launcher_id=?", (difficulty, launcher_id.hex()) ) await cursor.close() await self.connection.commit() @@ -149,15 +154,17 @@ async def update_singleton( singleton_tip_state: PoolState, is_pool_member: bool, ): + assert self.connection entry = (bytes(singleton_tip), bytes(singleton_tip_state), int(is_pool_member), launcher_id.hex()) cursor = await self.connection.execute( - f"UPDATE farmer SET singleton_tip=?, singleton_tip_state=?, is_pool_member=? WHERE launcher_id=?", + "UPDATE farmer SET singleton_tip=?, singleton_tip_state=?, is_pool_member=? WHERE launcher_id=?", entry, ) await cursor.close() await self.connection.commit() async def get_pay_to_singleton_phs(self) -> Set[bytes32]: + assert self.connection cursor = await self.connection.execute("SELECT p2_singleton_puzzle_hash from farmer") rows = await cursor.fetchall() await cursor.close() @@ -170,6 +177,7 @@ async def get_pay_to_singleton_phs(self) -> Set[bytes32]: async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes32]) -> List[FarmerRecord]: if len(puzzle_hashes) == 0: return [] + assert self.connection puzzle_hashes_db = tuple([ph.hex() for ph in list(puzzle_hashes)]) cursor = await self.connection.execute( f'SELECT * from farmer WHERE p2_singleton_puzzle_hash in ({"?," * (len(puzzle_hashes_db) - 1)}?) ', @@ -180,7 +188,8 @@ async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes return [self._row_to_farmer_record(row) for row in rows] async def get_farmer_points_and_payout_instructions(self) -> List[Tuple[uint64, bytes]]: - cursor = await self.connection.execute(f"SELECT points, payout_instructions from farmer") + assert self.connection + cursor = await self.connection.execute("SELECT points, payout_instructions from farmer") rows = await cursor.fetchall() await cursor.close() @@ -189,33 +198,36 @@ async def get_farmer_points_and_payout_instructions(self) -> List[Tuple[uint64, points: uint64 = uint64(row[0]) ph: bytes32 = bytes32(bytes.fromhex(row[1])) if ph in accumulated: - accumulated[ph] += points + accumulated[ph] = uint64(accumulated[ph] + points) else: accumulated[ph] = points - ret: List[Tuple[uint64, bytes32]] = [] + ret: List[Tuple[uint64, bytes]] = [] for ph, total_points in accumulated.items(): ret.append((total_points, ph)) return ret async def clear_farmer_points(self) -> None: - cursor = await self.connection.execute(f"UPDATE farmer set points=0") + assert self.connection + cursor = await self.connection.execute("UPDATE farmer set points=0") await cursor.close() await self.connection.commit() async def add_partial(self, launcher_id: bytes32, timestamp: uint64, difficulty: uint64): + assert self.connection cursor = await self.connection.execute( "INSERT into partial VALUES(?, ?, ?)", (launcher_id.hex(), timestamp, difficulty), ) await cursor.close() cursor = await self.connection.execute( - f"UPDATE farmer SET points=points+? WHERE launcher_id=?", (difficulty, launcher_id.hex()) + "UPDATE farmer SET points=points+? WHERE launcher_id=?", (difficulty, launcher_id.hex()) ) await cursor.close() await self.connection.commit() async def get_recent_partials(self, launcher_id: bytes32, count: int) -> List[Tuple[uint64, uint64]]: + assert self.connection cursor = await self.connection.execute( "SELECT timestamp, difficulty from partial WHERE launcher_id=? ORDER BY timestamp DESC LIMIT ?", (launcher_id.hex(), count), diff --git a/pool/util.py b/pool/util.py index c229abd..9569b37 100644 --- a/pool/util.py +++ b/pool/util.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from typing import Dict, Mapping -from chia.protocols.pool_protocol import PoolErrorCode, ErrorResponse +from chia.protocols.pool_protocol import ErrorResponse, PoolErrorCode from chia.util.ints import uint16 from chia.util.json_util import obj_to_response @@ -27,7 +27,7 @@ class RequestMetadata: headers: Mapping[str, str] # header names are all lower case cookies: Dict[str, str] query: Dict[str, str] # query params passed in the url. These are not used by chia clients at the moment, but - # allow for a lot of adjustments and thanks to including them now they can be used without introducing breaking changes + # they allow users of this code to extend the protocol without introducing breaking changes remote: str # address of the client making the request def __post_init__(self): diff --git a/requirements.txt b/requirements.txt index 7997105..e7eb08d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,8 @@ -git+git://github.com/Chia-Network/chia-blockchain.git@main#egg=chia-blockchain -blspy~=1.0.2 -setuptools~=56.1.0 -aiosqlite~=0.17.0 -aiohttp~=3.7.4 -pytest==6.2.4 +chia-blockchain==2.0.0 +blspy==2.0.2 +setuptools~=67.6.1 +aiosqlite==0.19.0 +aiohttp==3.8.4 +pytest==7.4.0 +PyYAML==6.0.1 +PyMySQL==1.1.0 diff --git a/setup.py b/setup.py index d35c105..e03006c 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,5 @@ import os +import sys import setuptools from setuptools import setup @@ -12,7 +13,24 @@ def read(fname): return open(os.path.join(os.path.dirname(__file__), fname)).read() -setup( +dependencies = [ + "chia-blockchain==2.0.0", + "blspy==2.0.2", + "setuptools~=56.1.0", + "aiosqlite==0.19.0", + "aiohttp==3.8.4", + "pytest==7.4.0", + "PyMySQL==1.1.0", +] + +dev_dependencies = [ + "types-aiofiles==23.1.0.5", + "types-pyyaml==6.0.12.11", + "types-setuptools==68.0.0.3", + "types-PyMySQL==1.1.0.1", +] + +kwargs = dict( name="chia-pool-reference", version="1.2", author="Mariano Sorgente", @@ -20,7 +38,10 @@ def read(fname): description=("A reference pool for the Chia blockchain."), license="Apache", packages=setuptools.find_packages(), - install_requires=["wheel", "chia-blockchain", "Flask"], + install_requires=dependencies, + extras_require=dict( + dev=dev_dependencies, + ), long_description=read("README.md"), classifiers=[ "Development Status :: 3 - Alpha", @@ -28,3 +49,9 @@ def read(fname): "License :: OSI Approved :: BSD License", ], ) + +if "setup_file" in sys.modules: + # include dev deps in regular deps when run in snyk + dependencies.extend(dev_dependencies) + +setup(**kwargs) # type: ignore