Skip to content

Commit

Permalink
Add mypy checks
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre0512 committed Apr 15, 2023
1 parent b6ca12e commit f54b7b2
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 63 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/python-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
python -m pip install flake8 pylint black
python -m pip install flake8 pylint black mypy
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
- name: Type check with mypy
run: |
# stop the build if there are Python syntax errors or undefined names
mypy pyhon/
# - name: Analysing the code with pylint
# run: |
# pylint --max-line-length 88 $(git ls-files '*.py')
Expand Down
26 changes: 16 additions & 10 deletions pyhon/appliance.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
import importlib
from contextlib import suppress
from typing import Optional, Dict
from typing import Optional, Dict, Any
from typing import TYPE_CHECKING

from pyhon import helper
from pyhon.commands import HonCommand
from pyhon.parameter import HonParameterFixed

if TYPE_CHECKING:
from pyhon import HonAPI


class HonAppliance:
def __init__(self, api, info: Dict, zone: int = 0) -> None:
def __init__(
self, api: Optional["HonAPI"], info: Dict[str, Any], zone: int = 0
) -> None:
if attributes := info.get("attributes"):
info["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
self._info = info
self._api = api
self._appliance_model = {}
self._info: Dict = info
self._api: Optional[HonAPI] = api
self._appliance_model: Dict = {}

self._commands = {}
self._statistics = {}
self._attributes = {}
self._commands: Dict = {}
self._statistics: Dict = {}
self._attributes: Dict = {}
self._zone = zone

try:
Expand Down Expand Up @@ -58,11 +64,11 @@ def _check_name_zone(self, name: str, frontend: bool = True) -> str:

@property
def appliance_model_id(self) -> str:
return self._info.get("applianceModelId")
return self._info.get("applianceModelId", "")

@property
def appliance_type(self) -> str:
return self._info.get("applianceTypeName")
return self._info.get("applianceTypeName", "")

@property
def mac_address(self) -> str:
Expand Down
64 changes: 32 additions & 32 deletions pyhon/connection/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import re
import secrets
import urllib
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
from pprint import pformat
from typing import Dict, Optional
from typing import Dict, Optional, List
from urllib import parse
from urllib.parse import quote

Expand Down Expand Up @@ -82,14 +83,15 @@ async def _error_logger(self, response: ClientResponse, fail: bool = True) -> No
if fail:
raise exceptions.HonAuthenticationError("Can't login")

def _generate_nonce(self) -> str:
@staticmethod
def _generate_nonce() -> str:
nonce = secrets.token_hex(16)
return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"

async def _load_login(self):
async def _load_login(self) -> bool:
login_url = await self._introduce()
login_url = await self._handle_redirects(login_url)
await self._login_url(login_url)
return await self._login_url(login_url)

async def _introduce(self) -> str:
redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done")
Expand All @@ -101,8 +103,8 @@ async def _introduce(self) -> str:
"scope": "api openid refresh_token web",
"nonce": self._generate_nonce(),
}
params = "&".join([f"{k}={v}" for k, v in params.items()])
url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
params_encode = "&".join([f"{k}={v}" for k, v in params.items()])
url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params_encode}"
async with self._request.get(url) as response:
text = await response.text()
self._expires = datetime.utcnow()
Expand All @@ -115,7 +117,7 @@ async def _introduce(self) -> str:

async def _manual_redirect(self, url: str) -> str:
async with self._request.get(url, allow_redirects=False) as response:
if not (new_location := response.headers.get("Location")):
if not (new_location := response.headers.get("Location", "")):
await self._error_logger(response)
return new_location

Expand All @@ -138,11 +140,11 @@ async def _login_url(self, login_url: str) -> bool:
)
return True
await self._error_logger(response)
return False

async def _login(self):
start_url = parse.unquote(self._login_data.url.split("startURL=")[-1]).split(
"%3D"
)[0]
async def _login(self) -> str:
start_url = self._login_data.url.rsplit("startURL=", maxsplit=1)[-1]
start_url = parse.unquote(start_url).split("%3D")[0]
action = {
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
Expand Down Expand Up @@ -175,50 +177,48 @@ async def _login(self):
params=params,
) as response:
if response.status == 200:
try:
data = await response.json()
return data["events"][0]["attributes"]["values"]["url"]
except json.JSONDecodeError:
pass
except KeyError:
_LOGGER.error(
"Can't get login url - %s", pformat(await response.json())
)
with suppress(json.JSONDecodeError, KeyError):
result = await response.json()
return result["events"][0]["attributes"]["values"]["url"]
await self._error_logger(response)
return ""

def _parse_token_data(self, text):
def _parse_token_data(self, text: str) -> None:
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]

async def _get_token(self, url):
async def _get_token(self, url: str) -> bool:
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text())
if not url:
url_search = re.findall(
"href\\s*=\\s*[\"'](.+?)[\"']", await response.text()
)
if not url_search:
await self._error_logger(response)
return False
if "ProgressiveLogin" in url[0]:
async with self._request.get(url[0]) as response:
if "ProgressiveLogin" in url_search[0]:
async with self._request.get(url_search[0]) as response:
if response.status != 200:
await self._error_logger(response)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
url_search = re.findall(
"href\\s*=\\s*[\"'](.*?)[\"']", await response.text()
)
url = "/".join(const.AUTH_API.split("/")[:-1]) + url_search[0]
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
self._parse_token_data(await response.text())
return True

async def _api_auth(self):
async def _api_auth(self) -> bool:
post_headers = {"id-token": self._id_token}
data = self._device.get()
async with self._request.post(
Expand All @@ -232,7 +232,7 @@ async def _api_auth(self):
self._cognito_token = json_data["cognitoUser"]["Token"]
return True

async def authenticate(self):
async def authenticate(self) -> None:
self.clear()
try:
if not await self._load_login():
Expand All @@ -246,7 +246,7 @@ async def authenticate(self):
except exceptions.HonNoAuthenticationNeeded:
return

async def refresh(self):
async def refresh(self) -> bool:
params = {
"client_id": const.CLIENT_ID,
"refresh_token": self._refresh_token,
Expand All @@ -264,7 +264,7 @@ async def refresh(self):
self._access_token = data["access_token"]
return await self._api_auth()

def clear(self):
def clear(self) -> None:
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._request.called_urls = []
self._cognito_token = ""
Expand Down
10 changes: 5 additions & 5 deletions pyhon/connection/handler/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, Dict
from typing import Optional, Callable, Dict, Any

import aiohttp
from typing_extensions import Self
Expand Down Expand Up @@ -37,18 +37,18 @@ def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs):
raise NotImplementedError

@asynccontextmanager
async def get(self, *args, **kwargs) -> AsyncIterator[Callable]:
async def get(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: Callable
response: aiohttp.ClientResponse
async with self._intercept(self._session.get, *args, **kwargs) as response:
yield response

@asynccontextmanager
async def post(self, *args, **kwargs) -> AsyncIterator[Callable]:
async def post(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: Callable
response: aiohttp.ClientResponse
async with self._intercept(self._session.post, *args, **kwargs) as response:
yield response

Expand Down
24 changes: 12 additions & 12 deletions pyhon/connection/handler/hon.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pyhon.connection.auth import HonAuth
from pyhon.connection.device import HonDevice
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.exceptions import HonAuthenticationError
from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException

_LOGGER = logging.getLogger(__name__)

Expand All @@ -30,7 +30,9 @@ def __init__(
self._auth: Optional[HonAuth] = None

@property
def auth(self) -> Optional[HonAuth]:
def auth(self) -> HonAuth:
if self._auth is None:
raise NoAuthenticationException()
return self._auth

@property
Expand All @@ -39,16 +41,14 @@ def device(self) -> HonDevice:

async def create(self) -> Self:
await super().create()
self._auth: HonAuth = HonAuth(
self._session, self._email, self._password, self._device
)
self._auth = HonAuth(self._session, self._email, self._password, self._device)
return self

async def _check_headers(self, headers: Dict) -> Dict:
if not (self._auth.cognito_token and self._auth.id_token):
await self._auth.authenticate()
headers["cognito-token"] = self._auth.cognito_token
headers["id-token"] = self._auth.id_token
if not (self.auth.cognito_token and self.auth.id_token):
await self.auth.authenticate()
headers["cognito-token"] = self.auth.cognito_token
headers["id-token"] = self.auth.id_token
return self._HEADERS | headers

@asynccontextmanager
Expand All @@ -58,16 +58,16 @@ async def _intercept(
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response:
if (
self._auth.token_expires_soon or response.status in [401, 403]
self.auth.token_expires_soon or response.status in [401, 403]
) and loop == 0:
_LOGGER.info("Try refreshing token...")
await self._auth.refresh()
await self.auth.refresh()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif (
self._auth.token_is_expired or response.status in [401, 403]
self.auth.token_is_expired or response.status in [401, 403]
) and loop == 1:
_LOGGER.warning(
"%s - Error %s - %s",
Expand Down
6 changes: 3 additions & 3 deletions pyhon/hon.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import List, Optional, Dict
from typing import List, Optional, Dict, Any
from typing_extensions import Self

from aiohttp import ClientSession
Expand Down Expand Up @@ -39,8 +39,8 @@ async def create(self) -> Self:
def appliances(self) -> List[HonAppliance]:
return self._appliances

async def _create_appliance(self, appliance: Dict, zone=0) -> None:
appliance = HonAppliance(self._api, appliance, zone=zone)
async def _create_appliance(self, appliance_data: Dict[str, Any], zone=0) -> None:
appliance = HonAppliance(self._api, appliance_data, zone=zone)
if appliance.mac_address is None:
return
await asyncio.gather(
Expand Down

0 comments on commit f54b7b2

Please sign in to comment.