From 409aa11fcbf924ae3cf97f87d8095fc0052fc160 Mon Sep 17 00:00:00 2001 From: Andre Basche Date: Fri, 17 Mar 2023 01:56:04 +0100 Subject: [PATCH] Clean up authentication --- pyhon/auth.py | 166 +++++++++++++++++++++++++++++--------------------- 1 file changed, 96 insertions(+), 70 deletions(-) diff --git a/pyhon/auth.py b/pyhon/auth.py index 33c89bd..eed4a49 100644 --- a/pyhon/auth.py +++ b/pyhon/auth.py @@ -6,6 +6,7 @@ import urllib from urllib import parse import aiohttp as aiohttp +from yarl import URL from pyhon import const @@ -14,7 +15,8 @@ _LOGGER = logging.getLogger() class HonAuth: def __init__(self) -> None: - self._framework = "" + self._access_token = "" + self._refresh_token = "" self._cognito_token = "" self._id_token = "" @@ -26,71 +28,15 @@ class HonAuth: def id_token(self): return self._id_token - async def _get_frontdoor_url(self, session, email, password): - data = { - "message": { - "actions": [ - { - "id": "79;a", - "descriptor": "apex://LightningLoginCustomController/ACTION$login", - "callingDescriptor": "markup://c:loginForm", - "params": { - "username": email, - "password": password, - "startUrl": "" - } - } - ] - }, - "aura.context": { - "mode": "PROD", - "fwuid": self._framework, - "app": "siteforce:loginApp2", - "loaded": {"APPLICATION@markup://siteforce:loginApp2": "YtNc5oyHTOvavSB9Q4rtag"}, - "dn": [], - "globals": {}, - "uad": False}, - "aura.pageURI": f"SmartHome/s/login/?language={const.LANGUAGE}", - "aura.token": None} + @property + def access_token(self): + return self._access_token - params = {"r": 3, "other.LightningLoginCustom.login": 1} - async with session.post( - const.AUTH_API + "/s/sfsites/aura", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), - params=params - ) as response: - if response.status != 200: - _LOGGER.error("Unable to connect to the login service: %s\n%s", response.status, await response.text()) - return "" - try: - text = await response.text() - return (await response.json())["events"][0]["attributes"]["values"]["url"] - except json.JSONDecodeError: - if framework := re.findall('clientOutOfSync.*?Expected: ([\\w-]+?) Actual: (.*?)"', text): - self._framework, actual = framework[0] - _LOGGER.debug('Framework update from "%s" to "%s"', self._framework, actual) - return await self._get_frontdoor_url(session, email, password) - _LOGGER.error("Unable to retrieve the frontdoor URL. Message: " + text) - return "" + @property + def refresh_token(self): + return self._refresh_token - async def _prepare_login(self, session, email, password): - if not (frontdoor_url := await self._get_frontdoor_url(session, email, password)): - return False - - async with session.get(frontdoor_url) as resp: - if resp.status != 200: - _LOGGER.error("Unable to connect to the login service: %s", resp.status) - return False - - params = {"retURL": "/SmartHome/apex/CustomCommunitiesLanding"} - async with session.get(f"{const.AUTH_API}/apex/ProgressiveLogin", params=params) as resp: - if resp.status != 200: - _LOGGER.error("Unable to connect to the login service: %s", resp.status) - return False - return True - - async def _login(self, session): + async def _load_login(self, session): nonce = secrets.token_hex(16) nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" params = { @@ -101,18 +47,98 @@ class HonAuth: "scope": "api openid refresh_token web", "nonce": nonce } + headers = {"user-agent": "Chrome/110.0.5481.153"} params = "&".join([f"{k}={v}" for k, v in params.items()]) - async with session.get(f"{const.AUTH_API}/services/oauth2/authorize?{params}") as resp: - if id_token := re.findall("id_token=(.*?)&", await resp.text()): - self._id_token = id_token[0] - return True + async with session.get(f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}") as resp: + if not (login_url := re.findall("url = '(.+?)'", await resp.text())): + return False + async with session.get(login_url[0], allow_redirects=False) as redirect1: + if not (url := redirect1.headers.get("Location")): + return False + async with session.get(url, allow_redirects=False) as redirect2: + if not (url := redirect2.headers.get("Location") + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"): + return False + async with session.get(URL(url, encoded=True), headers=headers) as login_screen: + if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()): + fw_uid, loaded_str = context[0] + loaded = json.loads(loaded_str) + login_url = login_url[0].replace("/".join(const.AUTH_API.split("/")[:-1]), "") + return fw_uid, loaded, login_url return False + async def _login(self, session, email, password, fw_uid, loaded, login_url): + data = { + "message": { + "actions": [ + { + "id": "79;a", + "descriptor": "apex://LightningLoginCustomController/ACTION$login", + "callingDescriptor": "markup://c:loginForm", + "params": { + "username": email, + "password": password, + "startUrl": parse.unquote(login_url.split("startURL=")[-1]).split("%3D")[0] + } + } + ] + }, + "aura.context": { + "mode": "PROD", + "fwuid": fw_uid, + "app": "siteforce:loginApp2", + "loaded": loaded, + "dn": [], + "globals": {}, + "uad": False}, + "aura.pageURI": login_url, + "aura.token": None} + + params = {"r": 3, "other.LightningLoginCustom.login": 1} + async with session.post( + const.AUTH_API + "/s/sfsites/aura", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), + params=params + ) as response: + if response.status == 200: + try: + return (await response.json())["events"][0]["attributes"]["values"]["url"] + except json.JSONDecodeError: + pass + _LOGGER.error("Unable to login: %s\n%s", response.status, await response.text()) + return "" + + async def _get_token(self, session, url): + async with session.get(url) as resp: + if resp.status != 200: + _LOGGER.error("Unable to get token: %s", resp.status) + return False + url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) + async with session.get(url[0]) as resp: + if resp.status != 200: + _LOGGER.error("Unable to get token: %s", resp.status) + return False + url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) + url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] + async with session.get(url) as resp: + if resp.status != 200: + _LOGGER.error("Unable to connect to the login service: %s", resp.status) + return False + text = await resp.text() + if access_token := re.findall("access_token=(.*?)&", text): + self._access_token = access_token[0] + if refresh_token := re.findall("refresh_token=(.*?)&", text): + self._refresh_token = refresh_token[0] + if id_token := re.findall("id_token=(.*?)&", text): + self._id_token = id_token[0] + return True + async def authorize(self, email, password, mobile_id): async with aiohttp.ClientSession() as session: - if not await self._prepare_login(session, email, password): + fw_uid, loaded, login_url = await self._load_login(session) + if not (url := await self._login(session, email, password, fw_uid, loaded, login_url)): return False - if not await self._login(session): + if not await self._get_token(session, url): return False post_headers = {"Content-Type": "application/json", "id-token": self._id_token}