Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 1.0.36 #141

Merged
merged 20 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
c4d27fa
Many mypy update
emlowe Nov 3, 2023
5713576
isort, black, flake8, mypy updates
emlowe Nov 3, 2023
7e8ab5a
Merge pull request #132 from Chia-Network/EL.mypy-fixes
TheLastCicada Nov 6, 2023
aeff2b8
fix some odd incompatibilities
emlowe Nov 6, 2023
f383205
feat: optimized sub-optimal scan_token_activity cron
MichaelTaylor3D Nov 7, 2023
fff5a6a
Merge pull request #133 from Chia-Network/EL.remove-annotations
TheLastCicada Nov 7, 2023
b5e3b35
Merge remote-tracking branch 'origin/develop' into fix/scan_token_act…
MichaelTaylor3D Nov 7, 2023
0c125b3
fix org loop
MichaelTaylor3D Nov 8, 2023
2cd1fc6
fix org loop
MichaelTaylor3D Nov 8, 2023
e5f276d
style: linting update
emlowe Nov 8, 2023
e69d540
fix: activities scan
MichaelTaylor3D Nov 8, 2023
98f0666
fix: activities scan
MichaelTaylor3D Nov 8, 2023
81e5e0b
fix: issues with reading from DBs due to bad context manager code
emlowe Nov 8, 2023
1c96ac1
Merge pull request #138 from Chia-Network/EL.fix-db-session
MichaelTaylor3D Nov 9, 2023
69ed88d
Merge remote-tracking branch 'origin/develop' into fix/scan_token_act…
MichaelTaylor3D Nov 9, 2023
aceb549
fix: issues with token pydantic classes and forward refs
emlowe Nov 13, 2023
6f48cd8
Merge pull request #139 from Chia-Network/EL.fix-token-schema-issues
TheLastCicada Nov 13, 2023
0992ca4
Merge remote-tracking branch 'origin/develop' into fix/scan_token_act…
MichaelTaylor3D Nov 13, 2023
0bb7599
Merge pull request #134 from Chia-Network/fix/scan_token_activity
TheLastCicada Nov 13, 2023
f540df3
chore: bump version
TheLastCicada Nov 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .isort.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[settings]
line_length = 120
profile=black
skip_gitignore=true
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Climate Token Driver Suite

![Minimum Chia Version](https://raw.githubusercontent.com/Chia-Network/core-registry-api/main/minimumChiaVersion.svg)
![Tested Up to Chia Version](https://raw.githubusercontent.com/Chia-Network/core-registry-api/main/testedChiaVersion.svg)

This application can run in 4 modes, each providing a separate application with a distinct use case:
Expand Down
22 changes: 17 additions & 5 deletions app/api/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
from __future__ import annotations

import enum
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from pathlib import Path
from typing import Iterator
from typing import AsyncGenerator, AsyncIterator

from chia.rpc.full_node_rpc_client import FullNodeRpcClient
from chia.rpc.rpc_client import RpcClient
from chia.rpc.wallet_rpc_client import WalletRpcClient
from chia.util.config import load_config
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.ints import uint16
from sqlalchemy.orm import Session

from app.config import settings
from app.db.session import get_session_local_cls
from app.logger import logger


def get_db_session_context() -> AbstractAsyncContextManager[Session]:
return asynccontextmanager(get_db_session)()


async def get_db_session() -> Session:
SessionLocal = await get_session_local_cls()

Expand All @@ -37,17 +45,20 @@ async def _get_rpc_client(
self_hostname: str,
rpc_port: int,
root_path: Path = DEFAULT_ROOT_PATH,
) -> Iterator[RpcClient]:
) -> AsyncGenerator[RpcClient, None]:
rpc_client_cls = {
NodeType.FULL_NODE: FullNodeRpcClient,
NodeType.WALLET: WalletRpcClient,
}.get(node_type)

if rpc_client_cls is None:
raise ValueError(f"Invalid node_type: {node_type}")

config = load_config(root_path, "config.yaml")

client = await rpc_client_cls.create(
self_hostname=self_hostname,
port=rpc_port,
port=uint16(rpc_port),
root_path=root_path,
net_config=config,
)
Expand All @@ -61,7 +72,7 @@ async def _get_rpc_client(
await client.await_closed()


async def get_wallet_rpc_client() -> Iterator[WalletRpcClient]:
async def get_wallet_rpc_client() -> AsyncIterator[WalletRpcClient]:
async for _ in _get_rpc_client(
node_type=NodeType.WALLET,
self_hostname=settings.CHIA_HOSTNAME,
Expand All @@ -71,7 +82,8 @@ async def get_wallet_rpc_client() -> Iterator[WalletRpcClient]:
yield _


async def get_full_node_rpc_client() -> Iterator[FullNodeRpcClient]:
@asynccontextmanager
async def get_full_node_rpc_client() -> AsyncIterator[FullNodeRpcClient]:
async for _ in _get_rpc_client(
node_type=NodeType.FULL_NODE,
self_hostname=settings.CHIA_HOSTNAME,
Expand Down
2 changes: 2 additions & 0 deletions app/api/v1/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from __future__ import annotations

from app.api.v1.core import router
19 changes: 7 additions & 12 deletions app/api/v1/activities.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Dict, List, Optional
from pydantic import ValidationError
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends
from fastapi.encoders import jsonable_encoder
from pydantic import ValidationError
from sqlalchemy.orm import Session

from app import crud, models, schemas
Expand All @@ -11,7 +12,7 @@
from app.errors import ErrorCode
from app.logger import logger
from app.utils import disallow
import pprint

router = APIRouter()


Expand All @@ -26,15 +27,15 @@ async def get_activity(
limit: int = 10,
sort: str = "desc",
db: Session = Depends(deps.get_db_session),
):
) -> schemas.ActivitiesResponse:
"""Get activity.

This endpoint is to be called by the explorer.
"""

db_crud = crud.DBCrud(db=db)

activity_filters = {"or": [], "and": []}
activity_filters: Dict[str, Any] = {"or": [], "and": []}
cw_filters = {}
match search_by:
case schemas.ActivitySearchBy.ONCHAIN_METADATA:
Expand Down Expand Up @@ -91,17 +92,11 @@ async def get_activity(
limit=limit,
)
if len(activities) == 0:
logger.warning(
f"No data to get from activities. filters:{activity_filters} page:{page} limit:{limit}"
)
logger.warning(f"No data to get from activities. filters:{activity_filters} page:{page} limit:{limit}")
return schemas.ActivitiesResponse()

pp = pprint.PrettyPrinter(indent=4)

pp.pprint(f"Got {len(activities)} activities from activities table.")
activities_with_cw: List[schemas.ActivityWithCW] = []
for activity in activities:
pp.pprint(f"Checking activity: {activity}")
unit = units.get(activity.asset_id)
if unit is None:
continue
Expand Down
6 changes: 5 additions & 1 deletion app/api/v1/core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from __future__ import annotations

from typing import Dict

from fastapi import APIRouter

from app.api.v1 import activities, cron, keys, tokens, transactions
Expand All @@ -9,7 +13,7 @@


@router.get("/info")
async def get_info():
async def get_info() -> Dict[str, str]:
return {
"blockchain_name": "Chia Network",
"blockchain_name_short": "chia",
Expand Down
108 changes: 71 additions & 37 deletions app/api/v1/cron.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations

import asyncio
from typing import Any, Dict, List, Optional
import json
from typing import List

from blspy import G1Element
from chia.consensus.block_record import BlockRecord
Expand All @@ -18,7 +21,7 @@
from app.errors import ErrorCode
from app.logger import logger
from app.models import State
from app.utils import as_async_contextmanager, disallow
from app.utils import disallow

router = APIRouter()
errorcode = ErrorCode()
Expand All @@ -27,7 +30,7 @@

@router.on_event("startup")
@disallow([ExecutionMode.REGISTRY, ExecutionMode.CLIENT])
async def init_db():
async def init_db() -> None:
Engine = await get_engine_cls()

if not database_exists(Engine.url):
Expand All @@ -38,7 +41,7 @@ async def init_db():

Base.metadata.create_all(Engine)

async with as_async_contextmanager(deps.get_db_session) as db:
async with deps.get_db_session_context() as db:
state = State(id=1, current_height=settings.BLOCK_START, peak_height=None)
db_state = [jsonable_encoder(state)]

Expand Down Expand Up @@ -70,34 +73,67 @@ async def _scan_token_activity(

logger.info(f"Scanning blocks {start_height} - {end_height} for activity")

climate_units: Dict[
str, Any
] = climate_warehouse.combine_climate_units_and_metadata(search={})
for unit in climate_units:
token: Optional[Dict] = unit.get("token")

# is None or empty
if not token:
logger.warning(f"Can not get token in climate warehouse unit. unit:{unit}")
continue
# Check if SCAN_ALL_ORGANIZATIONS is defined and True, otherwise treat as False
scan_all = getattr(settings, "SCAN_ALL_ORGANIZATIONS", False)

public_key = G1Element.from_bytes(hexstr_to_bytes(token["public_key"]))
all_organizations = climate_warehouse.get_climate_organizations()
if not scan_all:
# Convert to a list of organizations where `isHome` is True
climate_organizations = [org for org in all_organizations.values() if org.get("isHome", False)]
else:
# Convert to a list of all organizations
climate_organizations = list(all_organizations.values())

activities: List[schemas.Activity] = await blockchain.get_activities(
org_uid=token["org_uid"],
warehouse_project_id=token["warehouse_project_id"],
vintage_year=token["vintage_year"],
sequence_num=token["sequence_num"],
public_key=public_key,
start_height=start_height,
end_height=end_height,
peak_height=state.peak_height,
)
for org in climate_organizations:
org_uid = org["orgUid"]
org_name = org["name"]

if len(activities) == 0:
org_metadata = climate_warehouse.get_climate_organizations_metadata(org_uid)
if not org_metadata:
logger.warning(f"Cannot get metadata in CADT organization: {org_name}")
continue

db_crud.batch_insert_ignore_activity(activities)
for key, value_str in org_metadata.items():
try:
tokenization_dict = json.loads(value_str)
required_fields = [
"org_uid",
"warehouse_project_id",
"vintage_year",
"sequence_num",
"public_key",
"index",
]
optional_fields = ["permissionless_retirement", "detokenization"]

if not all(field in tokenization_dict for field in required_fields) or not any(
field in tokenization_dict for field in optional_fields
):
# not a tokenization record
continue

public_key = G1Element.from_bytes(hexstr_to_bytes(tokenization_dict["public_key"]))
activities: List[schemas.Activity] = await blockchain.get_activities(
org_uid=tokenization_dict["org_uid"],
warehouse_project_id=tokenization_dict["warehouse_project_id"],
vintage_year=tokenization_dict["vintage_year"],
sequence_num=tokenization_dict["sequence_num"],
public_key=public_key,
start_height=state.current_height,
end_height=end_height,
peak_height=state.peak_height,
)

if len(activities) == 0:
continue

db_crud.batch_insert_ignore_activity(activities)
logger.info(f"Activities for {org_name} and asset id: {key} added to the database.")

except json.JSONDecodeError as e:
logger.error(f"Failed to parse JSON for key {key} in organization {org_name}: {str(e)}")
except Exception as e:
logger.error(f"An error occurred for organization {org_name} under key {key}: {str(e)}")

db_crud.update_block_state(current_height=target_start_height)
return True
Expand All @@ -112,13 +148,11 @@ async def scan_token_activity() -> None:

async with (
lock,
as_async_contextmanager(deps.get_db_session) as db,
as_async_contextmanager(deps.get_full_node_rpc_client) as full_node_client,
deps.get_db_session_context() as db,
deps.get_full_node_rpc_client() as full_node_client,
):
db_crud = crud.DBCrud(db=db)
climate_warehouse = crud.ClimateWareHouseCrud(
url=settings.CADT_API_SERVER_HOST, api_key=settings.CADT_API_KEY
)
climate_warehouse = crud.ClimateWareHouseCrud(url=settings.CADT_API_SERVER_HOST, api_key=settings.CADT_API_KEY)
blockchain = crud.BlockChainCrud(full_node_client=full_node_client)

try:
Expand All @@ -145,9 +179,9 @@ async def scan_token_activity() -> None:
async def _scan_blockchain_state(
db_crud: crud.DBCrud,
full_node_client: FullNodeRpcClient,
):
state: Dict = await full_node_client.get_blockchain_state()
peak: Dict = state.get("peak")
) -> None:
state = await full_node_client.get_blockchain_state()
peak = state.get("peak")

if peak is None:
logger.warning("Full node is not synced")
Expand All @@ -162,8 +196,8 @@ async def _scan_blockchain_state(
@disallow([ExecutionMode.REGISTRY, ExecutionMode.CLIENT])
async def scan_blockchain_state() -> None:
async with (
as_async_contextmanager(deps.get_db_session) as db,
as_async_contextmanager(deps.get_full_node_rpc_client) as full_node_client,
deps.get_db_session_context() as db,
deps.get_full_node_rpc_client() as full_node_client,
):
db_crud = crud.DBCrud(db=db)

Expand Down
17 changes: 6 additions & 11 deletions app/api/v1/keys.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Optional
from typing import Optional

from blspy import G1Element, PrivateKey
from chia.consensus.coinbase import create_puzzlehash_for_pk
Expand All @@ -7,10 +7,7 @@
from chia.util.bech32m import decode_puzzle_hash, encode_puzzle_hash
from chia.util.byte_types import hexstr_to_bytes
from chia.util.ints import uint32
from chia.wallet.derive_keys import (
master_sk_to_wallet_sk,
master_sk_to_wallet_sk_unhardened,
)
from chia.wallet.derive_keys import master_sk_to_wallet_sk, master_sk_to_wallet_sk_unhardened
from fastapi import APIRouter, Depends

from app import schemas
Expand All @@ -31,20 +28,18 @@ async def get_key(
derivation_index: int = 0,
prefix: str = "bls1238",
wallet_rpc_client: WalletRpcClient = Depends(deps.get_wallet_rpc_client),
):
) -> schemas.Key:
fingerprint: int = await wallet_rpc_client.get_logged_in_fingerprint()

result: Dict = await wallet_rpc_client.get_private_key(fingerprint)
result = await wallet_rpc_client.get_private_key(fingerprint)

secret_key = PrivateKey.from_bytes(hexstr_to_bytes(result["sk"]))

wallet_secret_key: PrivateKey
if hardened:
wallet_secret_key = master_sk_to_wallet_sk(secret_key, uint32(derivation_index))
else:
wallet_secret_key = master_sk_to_wallet_sk_unhardened(
secret_key, uint32(derivation_index)
)
wallet_secret_key = master_sk_to_wallet_sk_unhardened(secret_key, uint32(derivation_index))

wallet_public_key: G1Element = wallet_secret_key.get_g1()
puzzle_hash: bytes32 = create_puzzlehash_for_pk(wallet_public_key)
Expand All @@ -62,7 +57,7 @@ async def get_key(
)
async def parse_key(
address: str,
):
) -> Optional[schemas.Key]:
try:
puzzle_hash: bytes = decode_puzzle_hash(address)
except ValueError:
Expand Down
Loading