From b6ca12ebff90e6e2698d130d1dd353ca3278506d Mon Sep 17 00:00:00 2001 From: Andre Basche Date: Sat, 15 Apr 2023 14:37:27 +0200 Subject: [PATCH] Use dataclass for login data --- pyhon/connection/auth.py | 71 ++++++++++++++++++++++++---------------- 1 file changed, 42 insertions(+), 29 deletions(-) diff --git a/pyhon/connection/auth.py b/pyhon/connection/auth.py index 715d7ef..9c5802a 100644 --- a/pyhon/connection/auth.py +++ b/pyhon/connection/auth.py @@ -3,8 +3,10 @@ import logging import re import secrets import urllib +from dataclasses import dataclass from datetime import datetime, timedelta from pprint import pformat +from typing import Dict, Optional from urllib import parse from urllib.parse import quote @@ -17,6 +19,15 @@ from pyhon.connection.handler.auth import HonAuthConnectionHandler _LOGGER = logging.getLogger(__name__) +@dataclass +class HonLoginData: + url: str = "" + email: str = "" + password: str = "" + fw_uid: str = "" + loaded: Optional[Dict] = None + + class HonAuth: _TOKEN_EXPIRES_AFTER_HOURS = 8 _TOKEN_EXPIRE_WARNING_HOURS = 7 @@ -24,8 +35,9 @@ class HonAuth: def __init__(self, session, email, password, device) -> None: self._session = session self._request = HonAuthConnectionHandler(session) - self._email = email - self._password = password + self._login_data = HonLoginData() + self._login_data.email = email + self._login_data.password = password self._access_token = "" self._refresh_token = "" self._cognito_token = "" @@ -77,7 +89,7 @@ class HonAuth: async def _load_login(self): login_url = await self._introduce() login_url = await self._handle_redirects(login_url) - return await self._login_url(login_url) + await self._login_url(login_url) async def _introduce(self) -> str: redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done") @@ -112,46 +124,47 @@ class HonAuth: redirect2 = await self._manual_redirect(redirect1) return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn" - async def _login_url(self, login_url: str) -> str: + async def _login_url(self, login_url: str) -> bool: headers = {"user-agent": const.USER_AGENT} url = URL(login_url, encoded=True) async with self._request.get(url, headers=headers) as response: text = await response.text() if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text): fw_uid, loaded_str = context[0] - loaded = json.loads(loaded_str) - result = login_url.replace("/".join(const.AUTH_API.split("/")[:-1]), "") - return fw_uid, loaded, result + self._login_data.fw_uid = fw_uid + self._login_data.loaded = json.loads(loaded_str) + self._login_data.url = login_url.replace( + "/".join(const.AUTH_API.split("/")[:-1]), "" + ) + return True await self._error_logger(response) - async def _login(self, fw_uid, loaded, login_url): - data = { - "message": { - "actions": [ - { - "id": "79;a", - "descriptor": "apex://LightningLoginCustomController/ACTION$login", - "callingDescriptor": "markup://c:loginForm", - "params": { - "username": quote(self._email), - "password": quote(self._password), - "startUrl": parse.unquote( - login_url.split("startURL=")[-1] - ).split("%3D")[0], - }, - } - ] + async def _login(self): + start_url = parse.unquote(self._login_data.url.split("startURL=")[-1]).split( + "%3D" + )[0] + action = { + "id": "79;a", + "descriptor": "apex://LightningLoginCustomController/ACTION$login", + "callingDescriptor": "markup://c:loginForm", + "params": { + "username": quote(self._login_data.email), + "password": quote(self._login_data.password), + "startUrl": start_url, }, + } + data = { + "message": {"actions": [action]}, "aura.context": { "mode": "PROD", - "fwuid": fw_uid, + "fwuid": self._login_data.fw_uid, "app": "siteforce:loginApp2", - "loaded": loaded, + "loaded": self._login_data.loaded, "dn": [], "globals": {}, "uad": False, }, - "aura.pageURI": login_url, + "aura.pageURI": self._login_data.url, "aura.token": None, } params = {"r": 3, "other.LightningLoginCustom.login": 1} @@ -222,9 +235,9 @@ class HonAuth: async def authenticate(self): self.clear() try: - if not (login_site := await self._load_login()): + if not await self._load_login(): raise exceptions.HonAuthenticationError("Can't open login page") - if not (url := await self._login(*login_site)): + if not (url := await self._login()): raise exceptions.HonAuthenticationError("Can't login") if not await self._get_token(url): raise exceptions.HonAuthenticationError("Can't get token")