pyhOn/pyhon/auth.py
2023-03-21 01:11:33 +01:00

157 lines
6.4 KiB
Python

import json
import logging
import re
import secrets
import urllib
from urllib import parse
import aiohttp as aiohttp
from yarl import URL
from pyhon import const
_LOGGER = logging.getLogger()
class HonAuth:
def __init__(self) -> None:
self._access_token = ""
self._refresh_token = ""
self._cognito_token = ""
self._id_token = ""
@property
def cognito_token(self):
return self._cognito_token
@property
def id_token(self):
return self._id_token
@property
def access_token(self):
return self._access_token
@property
def refresh_token(self):
return self._refresh_token
async def _load_login(self, session):
nonce = secrets.token_hex(16)
nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
params = {
"response_type": "token+id_token",
"client_id": const.CLIENT_ID,
"redirect_uri": urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done"),
"display": "touch",
"scope": "api openid refresh_token web",
"nonce": nonce
}
params = "&".join([f"{k}={v}" for k, v in params.items()])
async with session.get(f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}") as resp:
if not (login_url := re.findall("url = '(.+?)'", await resp.text())):
return False
async with session.get(login_url[0], allow_redirects=False) as redirect1:
if not (url := redirect1.headers.get("Location")):
return False
async with session.get(url, allow_redirects=False) as redirect2:
if not (url := redirect2.headers.get("Location") + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"):
return False
async with session.get(URL(url, encoded=True)) as login_screen:
if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()):
fw_uid, loaded_str = context[0]
loaded = json.loads(loaded_str)
login_url = login_url[0].replace("/".join(const.AUTH_API.split("/")[:-1]), "")
return fw_uid, loaded, login_url
return False
async def _login(self, session, email, password, fw_uid, loaded, login_url):
data = {
"message": {
"actions": [
{
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": email,
"password": password,
"startUrl": parse.unquote(login_url.split("startURL=")[-1]).split("%3D")[0]
}
}
]
},
"aura.context": {
"mode": "PROD",
"fwuid": fw_uid,
"app": "siteforce:loginApp2",
"loaded": loaded,
"dn": [],
"globals": {},
"uad": False},
"aura.pageURI": login_url,
"aura.token": None}
params = {"r": 3, "other.LightningLoginCustom.login": 1}
async with session.post(
const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params
) as response:
if response.status == 200:
try:
return (await response.json())["events"][0]["attributes"]["values"]["url"]
except json.JSONDecodeError:
pass
_LOGGER.error("Unable to login: %s\n%s", response.status, await response.text())
return ""
async def _get_token(self, session, url):
async with session.get(url) as resp:
if resp.status != 200:
_LOGGER.error("Unable to get token: %s", resp.status)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text())
async with session.get(url[0]) as resp:
if resp.status != 200:
_LOGGER.error("Unable to get token: %s", resp.status)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with session.get(url) as resp:
if resp.status != 200:
_LOGGER.error("Unable to connect to the login service: %s", resp.status)
return False
text = await resp.text()
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
return True
async def authorize(self, email, password, mobile_id):
headers = {"user-agent": const.USER_AGENT}
async with aiohttp.ClientSession(headers=headers) as session:
if login_site := await self._load_login(session):
fw_uid, loaded, login_url = login_site
else:
return False
if not (url := await self._login(session, email, password, fw_uid, loaded, login_url)):
return False
if not await self._get_token(session, url):
return False
post_headers = {"Content-Type": "application/json", "id-token": self._id_token}
data = {"appVersion": const.APP_VERSION, "mobileId": mobile_id, "osVersion": const.OS_VERSION,
"os": const.OS, "deviceModel": const.DEVICE_MODEL}
async with session.post(f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data) as resp:
try:
json_data = await resp.json()
except json.JSONDecodeError:
_LOGGER.error("No JSON Data after POST: %s", await resp.text())
return False
self._cognito_token = json_data["cognitoUser"]["Token"]
return True