Skip to content

Commit

Permalink
Use dataclass for login data
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre0512 committed Apr 15, 2023
1 parent 4a0ee85 commit b6ca12e
Showing 1 changed file with 42 additions and 29 deletions.
71 changes: 42 additions & 29 deletions pyhon/connection/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import re
import secrets
import urllib
from dataclasses import dataclass
from datetime import datetime, timedelta
from pprint import pformat
from typing import Dict, Optional
from urllib import parse
from urllib.parse import quote

Expand All @@ -17,15 +19,25 @@
_LOGGER = logging.getLogger(__name__)


@dataclass
class HonLoginData:
url: str = ""
email: str = ""
password: str = ""
fw_uid: str = ""
loaded: Optional[Dict] = None


class HonAuth:
_TOKEN_EXPIRES_AFTER_HOURS = 8
_TOKEN_EXPIRE_WARNING_HOURS = 7

def __init__(self, session, email, password, device) -> None:
self._session = session
self._request = HonAuthConnectionHandler(session)
self._email = email
self._password = password
self._login_data = HonLoginData()
self._login_data.email = email
self._login_data.password = password
self._access_token = ""
self._refresh_token = ""
self._cognito_token = ""
Expand Down Expand Up @@ -77,7 +89,7 @@ def _generate_nonce(self) -> str:
async def _load_login(self):
login_url = await self._introduce()
login_url = await self._handle_redirects(login_url)
return await self._login_url(login_url)
await self._login_url(login_url)

async def _introduce(self) -> str:
redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done")
Expand Down Expand Up @@ -112,46 +124,47 @@ async def _handle_redirects(self, login_url) -> str:
redirect2 = await self._manual_redirect(redirect1)
return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"

async def _login_url(self, login_url: str) -> str:
async def _login_url(self, login_url: str) -> bool:
headers = {"user-agent": const.USER_AGENT}
url = URL(login_url, encoded=True)
async with self._request.get(url, headers=headers) as response:
text = await response.text()
if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text):
fw_uid, loaded_str = context[0]
loaded = json.loads(loaded_str)
result = login_url.replace("/".join(const.AUTH_API.split("/")[:-1]), "")
return fw_uid, loaded, result
self._login_data.fw_uid = fw_uid
self._login_data.loaded = json.loads(loaded_str)
self._login_data.url = login_url.replace(
"/".join(const.AUTH_API.split("/")[:-1]), ""
)
return True
await self._error_logger(response)

async def _login(self, fw_uid, loaded, login_url):
data = {
"message": {
"actions": [
{
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": quote(self._email),
"password": quote(self._password),
"startUrl": parse.unquote(
login_url.split("startURL=")[-1]
).split("%3D")[0],
},
}
]
async def _login(self):
start_url = parse.unquote(self._login_data.url.split("startURL=")[-1]).split(
"%3D"
)[0]
action = {
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": quote(self._login_data.email),
"password": quote(self._login_data.password),
"startUrl": start_url,
},
}
data = {
"message": {"actions": [action]},
"aura.context": {
"mode": "PROD",
"fwuid": fw_uid,
"fwuid": self._login_data.fw_uid,
"app": "siteforce:loginApp2",
"loaded": loaded,
"loaded": self._login_data.loaded,
"dn": [],
"globals": {},
"uad": False,
},
"aura.pageURI": login_url,
"aura.pageURI": self._login_data.url,
"aura.token": None,
}
params = {"r": 3, "other.LightningLoginCustom.login": 1}
Expand Down Expand Up @@ -222,9 +235,9 @@ async def _api_auth(self):
async def authenticate(self):
self.clear()
try:
if not (login_site := await self._load_login()):
if not await self._load_login():
raise exceptions.HonAuthenticationError("Can't open login page")
if not (url := await self._login(*login_site)):
if not (url := await self._login()):
raise exceptions.HonAuthenticationError("Can't login")
if not await self._get_token(url):
raise exceptions.HonAuthenticationError("Can't get token")
Expand Down

0 comments on commit b6ca12e

Please sign in to comment.