From 8fa01343bcb27b4512c93e26918ea16a44d714a5 Mon Sep 17 00:00:00 2001 From: Andre Basche Date: Sun, 9 Apr 2023 18:13:50 +0200 Subject: [PATCH] Use connection handler --- pyhon/__init__.py | 2 +- pyhon/__main__.py | 6 +- pyhon/{device.py => appliance.py} | 2 +- pyhon/connection/__init__.py | 0 pyhon/{ => connection}/api.py | 168 ++++++++++++------------------ pyhon/{ => connection}/auth.py | 73 ++++++------- pyhon/connection/connection.py | 100 ++++++++++++++++++ pyhon/connection/device.py | 36 +++++++ 8 files changed, 247 insertions(+), 140 deletions(-) mode change 100755 => 100644 pyhon/__main__.py rename pyhon/{device.py => appliance.py} (99%) create mode 100644 pyhon/connection/__init__.py rename pyhon/{ => connection}/api.py (50%) rename pyhon/{ => connection}/auth.py (68%) create mode 100644 pyhon/connection/connection.py create mode 100644 pyhon/connection/device.py diff --git a/pyhon/__init__.py b/pyhon/__init__.py index b13705c..c761612 100644 --- a/pyhon/__init__.py +++ b/pyhon/__init__.py @@ -1 +1 @@ -from .api import HonConnection +from .connection.api import HonAPI diff --git a/pyhon/__main__.py b/pyhon/__main__.py old mode 100755 new mode 100644 index e5843d4..cc0c1be --- a/pyhon/__main__.py +++ b/pyhon/__main__.py @@ -11,7 +11,7 @@ from pprint import pprint if __name__ == "__main__": sys.path.insert(0, str(Path(__file__).parent.parent)) -from pyhon import HonConnection +from pyhon import HonAPI _LOGGER = logging.getLogger(__name__) @@ -85,7 +85,7 @@ def create_command(commands, concat=False): async def translate(language, json_output=False): - async with HonConnection() as hon: + async with HonAPI() as hon: keys = await hon.translation_keys(language) if json_output: print(json.dumps(keys, indent=4)) @@ -104,7 +104,7 @@ async def main(): user = input("User for hOn account: ") if not (password := args["password"]): password = getpass("Password for hOn account: ") - async with HonConnection(user, password) as hon: + async with HonAPI(user, password) as hon: for device in hon.devices: print("=" * 10, device.appliance_type, "-", device.nick_name, "=" * 10) if args.get("keys"): diff --git a/pyhon/device.py b/pyhon/appliance.py similarity index 99% rename from pyhon/device.py rename to pyhon/appliance.py index 711b7e1..4137466 100644 --- a/pyhon/device.py +++ b/pyhon/appliance.py @@ -5,7 +5,7 @@ from pyhon.commands import HonCommand from pyhon.parameter import HonParameterFixed -class HonDevice: +class HonAppliance: def __init__(self, connector, appliance): if attributes := appliance.get("attributes"): appliance["attributes"] = {v["parName"]: v["parValue"] for v in attributes} diff --git a/pyhon/connection/__init__.py b/pyhon/connection/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyhon/api.py b/pyhon/connection/api.py similarity index 50% rename from pyhon/api.py rename to pyhon/connection/api.py index 4a72274..62196a3 100644 --- a/pyhon/api.py +++ b/pyhon/connection/api.py @@ -1,74 +1,58 @@ import asyncio import json import logging -import secrets from datetime import datetime from typing import List -import aiohttp as aiohttp - from pyhon import const -from pyhon.auth import HonAuth -from pyhon.device import HonDevice +from pyhon.appliance import HonAppliance +from pyhon.connection.connection import HonConnectionHandler, HonAnonymousConnectionHandler _LOGGER = logging.getLogger() -class HonConnection: - def __init__(self, email="", password="", session=None) -> None: +class HonAPI: + def __init__(self, email="", password="") -> None: super().__init__() self._email = email self._password = password - self._request_headers = {"Content-Type": "application/json"} - self._session = session self._devices = [] - self._mobile_id = secrets.token_hex(8) + self._hon = None + self._hon_anonymous = HonAnonymousConnectionHandler() async def __aenter__(self): - self._session = aiohttp.ClientSession() - if self._email and self._password: - await self.setup() + self._hon = HonConnectionHandler(self._email, self._password) + await self._hon.create() + await self.setup() + return self async def __aexit__(self, exc_type, exc_val, exc_tb): - await self._session.close() + await self._hon.close() @property - def devices(self) -> List[HonDevice]: + def devices(self) -> List[HonAppliance]: return self._devices - @property - async def _headers(self): - if "cognito-token" not in self._request_headers or "id-token" not in self._request_headers: - auth = HonAuth() - if await auth.authorize(self._email, self._password, self._mobile_id): - self._request_headers["cognito-token"] = auth.cognito_token - self._request_headers["id-token"] = auth.id_token - else: - raise PermissionError("Can't Login") - return self._request_headers - async def setup(self): - async with aiohttp.ClientSession() as session: - async with session.get(f"{const.API_URL}/commands/v1/appliance", - headers=await self._headers) as resp: - try: - appliances = (await resp.json())["payload"]["appliances"] - for appliance in appliances: - device = HonDevice(self, appliance) - if device.mac_address is None: - continue - await asyncio.gather(*[ - device.load_attributes(), - device.load_commands(), - device.load_statistics()]) - self._devices.append(device) - except json.JSONDecodeError: - _LOGGER.error("No JSON Data after GET: %s", await resp.text()) - return False + async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp: + try: + appliances = (await resp.json())["payload"]["appliances"] + for appliance in appliances: + device = HonAppliance(self, appliance) + if device.mac_address is None: + continue + await asyncio.gather(*[ + device.load_attributes(), + device.load_commands(), + device.load_statistics()]) + self._devices.append(device) + except json.JSONDecodeError: + _LOGGER.error("No JSON Data after GET: %s", await resp.text()) + return False return True - async def load_commands(self, device: HonDevice): + async def load_commands(self, device: HonAppliance): params = { "applianceType": device.appliance_type, "code": device.appliance["code"], @@ -81,84 +65,46 @@ class HonConnection: "series": device.appliance["series"], } url = f"{const.API_URL}/commands/v1/retrieve" - async with self._session.get(url, params=params, headers=await self._headers) as response: + async with self._hon.get(url, params=params) as response: result = (await response.json()).get("payload", {}) if not result or result.pop("resultCode") != "0": return {} return result - async def command_history(self, device: HonDevice): + async def command_history(self, device: HonAppliance): url = f"{const.API_URL}/commands/v1/appliance/{device.mac_address}/history" - async with self._session.get(url, headers=await self._headers) as response: + async with self._hon.get(url) as response: result = await response.json() if not result or not result.get("payload"): return {} return result["payload"]["history"] - async def last_activity(self, device: HonDevice): + async def last_activity(self, device: HonAppliance): url = f"{const.API_URL}/commands/v1/retrieve-last-activity" params = {"macAddress": device.mac_address} - async with self._session.get(url, params=params, headers=await self._headers) as response: + async with self._hon.get(url, params=params) as response: result = await response.json() if result and (activity := result.get("attributes")): return activity return {} - async def appliance_configuration(self): - url = f"{const.API_URL}/config/v1/appliance-configuration" - headers = {"x-api-key": const.API_KEY, "content-type": "application/json"} - async with self._session.get(url, headers=headers) as response: - result = await response.json() - if result and (data := result.get("payload")): - return data - return {} - - async def app_config(self, language="en", beta=True): - headers = {"x-api-key": const.API_KEY, "content-type": "application/json"} - url = f"{const.API_URL}/app-config" - payload = { - "languageCode": language, - "beta": beta, - "appVersion": const.APP_VERSION, - "os": const.OS - } - payload = json.dumps(payload, separators=(',', ':')) - async with self._session.post(url, headers=headers, data=payload) as response: - if (result := await response.json()) and (data := result.get("payload")): - return data - return {} - - async def translation_keys(self, language="en"): - headers = {"x-api-key": const.API_KEY, "content-type": "application/json"} - config = await self.app_config(language=language) - if url := config.get("language", {}).get("jsonPath"): - async with self._session.get(url, headers=headers) as response: - if result := await response.json(): - return result - return {} - - async def load_attributes(self, device: HonDevice, loop=False): + async def load_attributes(self, device: HonAppliance): params = { "macAddress": device.mac_address, "applianceType": device.appliance_type, "category": "CYCLE" } url = f"{const.API_URL}/commands/v1/context" - async with self._session.get(url, params=params, headers=await self._headers) as response: - if response.status == 403 and not loop: - _LOGGER.error("%s - Error %s - %s", url, response.status, await response.text()) - self._request_headers.pop("cognito-token", None) - self._request_headers.pop("id-token", None) - return await self.load_attributes(device, loop=True) + async with self._hon.get(url, params=params) as response: return (await response.json()).get("payload", {}) - async def load_statistics(self, device: HonDevice): + async def load_statistics(self, device: HonAppliance): params = { "macAddress": device.mac_address, "applianceType": device.appliance_type } url = f"{const.API_URL}/commands/v1/statistics" - async with self._session.get(url, params=params, headers=await self._headers) as response: + async with self._hon.get(url, params=params) as response: return (await response.json()).get("payload", {}) async def send_command(self, device, command, parameters, ancillary_parameters): @@ -169,13 +115,7 @@ class HonConnection: "commandName": command, "transactionId": f"{device.mac_address}_{now[:-3]}Z", "applianceOptions": device.commands_options, - "device": { - "mobileId": self._mobile_id, - "mobileOs": const.OS, - "osVersion": const.OS_VERSION, - "appVersion": const.APP_VERSION, - "deviceModel": const.DEVICE_MODEL - }, + "device": self._hon.device.get(), "attributes": { "channel": "mobileApp", "origin": "standardProgram", @@ -186,7 +126,7 @@ class HonConnection: "applianceType": device.appliance_type } url = f"{const.API_URL}/commands/v1/send" - async with self._session.post(url, headers=await self._headers, json=data) as resp: + async with self._hon.post(url, json=data) as resp: try: json_data = await resp.json() except json.JSONDecodeError: @@ -194,3 +134,33 @@ class HonConnection: if json_data["payload"]["resultCode"] == "0": return True return False + + async def appliance_configuration(self): + url = f"{const.API_URL}/config/v1/appliance-configuration" + async with self._hon_anonymous.get(url) as response: + result = await response.json() + if result and (data := result.get("payload")): + return data + return {} + + async def app_config(self, language="en", beta=True): + url = f"{const.API_URL}/app-config" + payload = { + "languageCode": language, + "beta": beta, + "appVersion": const.APP_VERSION, + "os": const.OS + } + payload = json.dumps(payload, separators=(',', ':')) + async with self._hon_anonymous.post(url, data=payload) as response: + if (result := await response.json()) and (data := result.get("payload")): + return data + return {} + + async def translation_keys(self, language="en"): + config = await self.app_config(language=language) + if url := config.get("language", {}).get("jsonPath"): + async with self._hon_anonymous.get(url) as response: + if result := await response.json(): + return result + return {} diff --git a/pyhon/auth.py b/pyhon/connection/auth.py similarity index 68% rename from pyhon/auth.py rename to pyhon/connection/auth.py index 1baca58..72a7426 100644 --- a/pyhon/auth.py +++ b/pyhon/connection/auth.py @@ -3,9 +3,9 @@ import logging import re import secrets import urllib +from pprint import pprint from urllib import parse -import aiohttp as aiohttp from yarl import URL from pyhon import const @@ -14,11 +14,15 @@ _LOGGER = logging.getLogger() class HonAuth: - def __init__(self) -> None: + def __init__(self, session, email, password, device) -> None: + self._session = session + self._email = email + self._password = password self._access_token = "" self._refresh_token = "" self._cognito_token = "" self._id_token = "" + self._device = device @property def cognito_token(self): @@ -36,7 +40,7 @@ class HonAuth: def refresh_token(self): return self._refresh_token - async def _load_login(self, session): + async def _load_login(self): nonce = secrets.token_hex(16) nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" params = { @@ -48,16 +52,16 @@ class HonAuth: "nonce": nonce } params = "&".join([f"{k}={v}" for k, v in params.items()]) - async with session.get(f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}") as resp: + async with self._session.get(f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}") as resp: if not (login_url := re.findall("url = '(.+?)'", await resp.text())): return False - async with session.get(login_url[0], allow_redirects=False) as redirect1: + async with self._session.get(login_url[0], allow_redirects=False) as redirect1: if not (url := redirect1.headers.get("Location")): return False - async with session.get(url, allow_redirects=False) as redirect2: + async with self._session.get(url, allow_redirects=False) as redirect2: if not (url := redirect2.headers.get("Location") + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"): return False - async with session.get(URL(url, encoded=True)) as login_screen: + async with self._session.get(URL(url, encoded=True)) as login_screen: if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()): fw_uid, loaded_str = context[0] loaded = json.loads(loaded_str) @@ -65,7 +69,7 @@ class HonAuth: return fw_uid, loaded, login_url return False - async def _login(self, session, email, password, fw_uid, loaded, login_url): + async def _login(self, fw_uid, loaded, login_url): data = { "message": { "actions": [ @@ -74,8 +78,8 @@ class HonAuth: "descriptor": "apex://LightningLoginCustomController/ACTION$login", "callingDescriptor": "markup://c:loginForm", "params": { - "username": email, - "password": password, + "username": self._email, + "password": self._password, "startUrl": parse.unquote(login_url.split("startURL=")[-1]).split("%3D")[0] } } @@ -93,7 +97,7 @@ class HonAuth: "aura.token": None} params = {"r": 3, "other.LightningLoginCustom.login": 1} - async with session.post( + async with self._session.post( const.AUTH_API + "/s/sfsites/aura", headers={"Content-Type": "application/x-www-form-urlencoded"}, data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), @@ -107,19 +111,19 @@ class HonAuth: _LOGGER.error("Unable to login: %s\n%s", response.status, await response.text()) return "" - async def _get_token(self, session, url): - async with session.get(url) as resp: + async def _get_token(self, url): + async with self._session.get(url) as resp: if resp.status != 200: _LOGGER.error("Unable to get token: %s", resp.status) return False url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) - async with session.get(url[0]) as resp: + async with self._session.get(url[0]) as resp: if resp.status != 200: _LOGGER.error("Unable to get token: %s", resp.status) return False url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] - async with session.get(url) as resp: + async with self._session.get(url) as resp: if resp.status != 200: _LOGGER.error("Unable to connect to the login service: %s", resp.status) return False @@ -132,26 +136,23 @@ class HonAuth: self._id_token = id_token[0] return True - async def authorize(self, email, password, mobile_id): - headers = {"user-agent": const.USER_AGENT} - async with aiohttp.ClientSession(headers=headers) as session: - if login_site := await self._load_login(session): - fw_uid, loaded, login_url = login_site - else: - return False - if not (url := await self._login(session, email, password, fw_uid, loaded, login_url)): - return False - if not await self._get_token(session, url): - return False + async def authorize(self): + if login_site := await self._load_login(): + fw_uid, loaded, login_url = login_site + else: + return False + if not (url := await self._login(fw_uid, loaded, login_url)): + return False + if not await self._get_token(url): + return False - post_headers = {"Content-Type": "application/json", "id-token": self._id_token} - data = {"appVersion": const.APP_VERSION, "mobileId": mobile_id, "osVersion": const.OS_VERSION, - "os": const.OS, "deviceModel": const.DEVICE_MODEL} - async with session.post(f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data) as resp: - try: - json_data = await resp.json() - except json.JSONDecodeError: - _LOGGER.error("No JSON Data after POST: %s", await resp.text()) - return False - self._cognito_token = json_data["cognitoUser"]["Token"] + post_headers = {"Content-Type": "application/json", "id-token": self._id_token} + data = self._device.get() + async with self._session.post(f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data) as resp: + try: + json_data = await resp.json() + except json.JSONDecodeError: + _LOGGER.error("No JSON Data after POST: %s", await resp.text()) + return False + self._cognito_token = json_data["cognitoUser"]["Token"] return True diff --git a/pyhon/connection/connection.py b/pyhon/connection/connection.py new file mode 100644 index 0000000..6c9bf56 --- /dev/null +++ b/pyhon/connection/connection.py @@ -0,0 +1,100 @@ +from contextlib import asynccontextmanager + +import aiohttp + +from pyhon import const +from pyhon.connection.auth import HonAuth, _LOGGER +from pyhon.connection.device import HonDevice + + +class HonBaseConnectionHandler: + _HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"} + + def __init__(self): + self._session = None + self._auth = None + + async def __aenter__(self): + await self.create() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + async def create(self): + self._session = aiohttp.ClientSession(headers=self._HEADERS) + + @asynccontextmanager + async def get(self, *args, **kwargs): + raise NotImplemented + + @asynccontextmanager + async def post(self, *args, **kwargs): + raise NotImplemented + + async def close(self): + await self._session.close() + + +class HonConnectionHandler(HonBaseConnectionHandler): + def __init__(self, email, password): + super().__init__() + self._device = HonDevice() + self._email = email + self._password = password + if not self._email: + raise PermissionError("Login-Error - An email address must be specified") + if not self._password: + raise PermissionError("Login-Error - A password address must be specified") + self._request_headers = {} + + @property + def device(self): + return self._device + + async def create(self): + await super().create() + self._auth = HonAuth(self._session, self._email, self._password, self._device) + + async def _check_headers(self, headers): + if "cognito-token" not in self._request_headers or "id-token" not in self._request_headers: + if await self._auth.authorize(): + self._request_headers["cognito-token"] = self._auth.cognito_token + self._request_headers["id-token"] = self._auth.id_token + else: + raise PermissionError("Can't Login") + return {h: v for h, v in self._request_headers.items() if h not in headers} + + @asynccontextmanager + async def get(self, *args, loop=0, **kwargs): + kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) + async with self._session.get(*args, **kwargs) as response: + if response.status == 403 and not loop: + _LOGGER.warning("%s - Error %s - %s", response.request_info.url, response.status, await response.text()) + await self.create() + yield await self.get(*args, loop=loop + 1, **kwargs) + elif loop >= 2: + _LOGGER.error("%s - Error %s - %s", response.request_info.url, response.status, await response.text()) + raise PermissionError() + else: + yield response + + @asynccontextmanager + async def post(self, *args, **kwargs): + kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) + async with self._session.post(*args, **kwargs) as response: + yield response + + +class HonAnonymousConnectionHandler(HonBaseConnectionHandler): + _HEADERS = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} + + @asynccontextmanager + async def get(self, *args, **kwargs): + async with self._session.post(*args, **kwargs) as response: + yield response + + @asynccontextmanager + async def post(self, *args, **kwargs): + async with self._session.post(*args, **kwargs) as response: + yield response diff --git a/pyhon/connection/device.py b/pyhon/connection/device.py new file mode 100644 index 0000000..f817885 --- /dev/null +++ b/pyhon/connection/device.py @@ -0,0 +1,36 @@ +import secrets + +from pyhon import const + + +class HonDevice: + def __init__(self): + self._app_version = const.APP_VERSION + self._os_version = const.OS_VERSION + self._os = const.OS + self._device_model = const.DEVICE_MODEL + self._mobile_id = secrets.token_hex(8) + + @property + def app_version(self): + return self._app_version + + @property + def os_version(self): + return self._os_version + + @property + def os(self): + return self._os + + @property + def device_model(self): + return self._device_model + + @property + def mobile_id(self): + return self._mobile_id + + def get(self): + return {"appVersion": self.app_version, "mobileId": self.mobile_id, "osVersion": self.os_version, + "os": self.os, "deviceModel": self.device_model}