diff --git a/.github/workflows/python-check.yml b/.github/workflows/python-check.yml new file mode 100644 index 0000000..813ebd7 --- /dev/null +++ b/.github/workflows/python-check.yml @@ -0,0 +1,42 @@ +name: Python check + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.10", "3.11"] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install -r requirements.txt + python -m pip install -r requirements_dev.txt + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics + - name: Type check with mypy + run: | + mypy pyhon/ + # - name: Analysing the code with pylint + # run: | + # pylint --max-line-length 88 $(git ls-files '*.py') + - name: Check black style + run: | + black . --check diff --git a/.gitignore b/.gitignore index 6440f3c..078d48e 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ __pycache__/ dist/ **/*.egg-info/ test* +build/ diff --git a/README.md b/README.md index 46cfb42..b999d02 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/pyhOn)](https://www.python.org/) [![PyPI - License](https://img.shields.io/pypi/l/pyhOn)](https://github.com/Andre0512/pyhOn/blob/main/LICENSE) [![PyPI - Downloads](https://img.shields.io/pypi/dm/pyhOn)](https://pypistats.org/packages/pyhon) -Control your Haier appliances with python! +Control your Haier, Candy and Hoover appliances with python! The idea behind this library is, to make the use of all available commands as simple as possible. ## Installation @@ -47,28 +47,28 @@ settings: ### List devices ```python import asyncio -from pyhon import HonConnection +from pyhon import Hon async def devices_example(): - async with HonConnection(USER, PASSWORD) as hon: - for device in hon.devices: - print(device.nick_name) + async with Hon(USER, PASSWORD) as hon: + for appliance in hon.appliances: + print(appliance.nick_name) asyncio.run(devices_example()) ``` ### Execute a command ```python -async with HonConnection(USER, PASSWORD) as hon: - washing_machine = hon.devices[0] +async with Hon(USER, PASSWORD) as hon: + washing_machine = hon.appliances[0] pause_command = washing_machine.commands["pauseProgram"] await pause_command.send() ``` ### Set command parameter ```python -async with HonConnection(USER, PASSWORD) as hon: - washing_machine = hon.devices[0] +async with Hon(USER, PASSWORD) as hon: + washing_machine = hon.appliances[0] start_command = washing_machine.commands["startProgram"] for name, setting in start_command.settings: print("Setting", name) @@ -100,8 +100,6 @@ This generates a huge output. It is recommended to pipe this into a file $ pyhOn translate fr > hon_fr.yaml $ pyhOn translate en --json > hon_en.json ``` -## Tested devices -- Haier Washing Machine HW90 ## Usage example This library is used for the custom [HomeAssistant Integration "Haier hOn"](https://github.com/Andre0512/hOn). diff --git a/pyhon/__init__.py b/pyhon/__init__.py index b13705c..93728b7 100644 --- a/pyhon/__init__.py +++ b/pyhon/__init__.py @@ -1 +1,4 @@ -from .api import HonConnection +from .connection.api import HonAPI +from .hon import Hon + +__all__ = ["Hon", "HonAPI"] diff --git a/pyhon/__main__.py b/pyhon/__main__.py index e5843d4..ed28866 100755 --- a/pyhon/__main__.py +++ b/pyhon/__main__.py @@ -6,12 +6,11 @@ import logging import sys from getpass import getpass from pathlib import Path -from pprint import pprint if __name__ == "__main__": sys.path.insert(0, str(Path(__file__).parent.parent)) -from pyhon import HonConnection +from pyhon import Hon, HonAPI, helper _LOGGER = logging.getLogger(__name__) @@ -25,74 +24,30 @@ def get_arguments(): keys = subparser.add_parser("keys", help="print as key format") keys.add_argument("keys", help="print as key format", action="store_true") keys.add_argument("--all", help="print also full keys", action="store_true") - translate = subparser.add_parser("translate", help="print available translation keys") - translate.add_argument("translate", help="language (de, en, fr...)", metavar="LANGUAGE") + translate = subparser.add_parser( + "translate", help="print available translation keys" + ) + translate.add_argument( + "translate", help="language (de, en, fr...)", metavar="LANGUAGE" + ) translate.add_argument("--json", help="print as json", action="store_true") return vars(parser.parse_args()) -# yaml.dump() would be done the same, but needs an additional dependency... -def pretty_print(data, key="", intend=0, is_list=False): - if type(data) is list: - if key: - print(f"{' ' * intend}{'- ' if is_list else ''}{key}:") - intend += 1 - for i, value in enumerate(data): - pretty_print(value, intend=intend, is_list=True) - elif type(data) is dict: - if key: - print(f"{' ' * intend}{'- ' if is_list else ''}{key}:") - intend += 1 - for i, (key, value) in enumerate(sorted(data.items())): - if is_list and not i: - pretty_print(value, key=key, intend=intend, is_list=True) - elif is_list: - pretty_print(value, key=key, intend=intend + 1) - else: - pretty_print(value, key=key, intend=intend) - else: - print(f"{' ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}") - - -def key_print(data, key="", start=True): - if type(data) is list: - for i, value in enumerate(data): - key_print(value, key=f"{key}.{i}", start=False) - elif type(data) is dict: - for k, value in sorted(data.items()): - key_print(value, key=k if start else f"{key}.{k}", start=False) - else: - print(f"{key}: {data}") - - -def create_command(commands, concat=False): - result = {} - for name, command in commands.items(): - if not concat: - result[name] = {} - for parameter, data in command.parameters.items(): - if data.typology == "enum": - value = data.values - elif data.typology == "range": - value = {"min": data.min, "max": data.max, "step": data.step} - else: - continue - if not concat: - result[name][parameter] = value - else: - result[f"{name}.{parameter}"] = value - return result - - async def translate(language, json_output=False): - async with HonConnection() as hon: + async with HonAPI(anonymous=True) as hon: keys = await hon.translation_keys(language) if json_output: print(json.dumps(keys, indent=4)) else: - clean_keys = json.dumps(keys).replace("\\n", "\\\\n").replace("\\\\r", "").replace("\\r", "") + clean_keys = ( + json.dumps(keys) + .replace("\\n", "\\\\n") + .replace("\\\\r", "") + .replace("\\r", "") + ) keys = json.loads(clean_keys) - pretty_print(keys) + print(helper.pretty_print(keys)) async def main(): @@ -104,19 +59,31 @@ async def main(): user = input("User for hOn account: ") if not (password := args["password"]): password = getpass("Password for hOn account: ") - async with HonConnection(user, password) as hon: - for device in hon.devices: + async with Hon(user, password) as hon: + for device in hon.appliances: print("=" * 10, device.appliance_type, "-", device.nick_name, "=" * 10) if args.get("keys"): data = device.data.copy() attr = "get" if args.get("all") else "pop" - key_print(data["attributes"].__getattribute__(attr)("parameters")) - key_print(data.__getattribute__(attr)("appliance")) - key_print(data) - pretty_print(create_command(device.commands, concat=True)) + print( + helper.key_print( + data["attributes"].__getattribute__(attr)("parameters") + ) + ) + print(helper.key_print(data.__getattribute__(attr)("appliance"))) + print(helper.key_print(data)) + print( + helper.pretty_print( + helper.create_command(device.commands, concat=True) + ) + ) else: - pretty_print({"data": device.data}) - pretty_print({"settings": create_command(device.commands)}) + print(helper.pretty_print({"data": device.data})) + print( + helper.pretty_print( + {"settings": helper.create_command(device.commands)} + ) + ) def start(): @@ -126,5 +93,5 @@ def start(): print("Aborted.") -if __name__ == '__main__': +if __name__ == "__main__": start() diff --git a/pyhon/api.py b/pyhon/api.py deleted file mode 100644 index 4a72274..0000000 --- a/pyhon/api.py +++ /dev/null @@ -1,196 +0,0 @@ -import asyncio -import json -import logging -import secrets -from datetime import datetime -from typing import List - -import aiohttp as aiohttp - -from pyhon import const -from pyhon.auth import HonAuth -from pyhon.device import HonDevice - -_LOGGER = logging.getLogger() - - -class HonConnection: - def __init__(self, email="", password="", session=None) -> None: - super().__init__() - self._email = email - self._password = password - self._request_headers = {"Content-Type": "application/json"} - self._session = session - self._devices = [] - self._mobile_id = secrets.token_hex(8) - - async def __aenter__(self): - self._session = aiohttp.ClientSession() - if self._email and self._password: - await self.setup() - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self._session.close() - - @property - def devices(self) -> List[HonDevice]: - return self._devices - - @property - async def _headers(self): - if "cognito-token" not in self._request_headers or "id-token" not in self._request_headers: - auth = HonAuth() - if await auth.authorize(self._email, self._password, self._mobile_id): - self._request_headers["cognito-token"] = auth.cognito_token - self._request_headers["id-token"] = auth.id_token - else: - raise PermissionError("Can't Login") - return self._request_headers - - async def setup(self): - async with aiohttp.ClientSession() as session: - async with session.get(f"{const.API_URL}/commands/v1/appliance", - headers=await self._headers) as resp: - try: - appliances = (await resp.json())["payload"]["appliances"] - for appliance in appliances: - device = HonDevice(self, appliance) - if device.mac_address is None: - continue - await asyncio.gather(*[ - device.load_attributes(), - device.load_commands(), - device.load_statistics()]) - self._devices.append(device) - except json.JSONDecodeError: - _LOGGER.error("No JSON Data after GET: %s", await resp.text()) - return False - return True - - async def load_commands(self, device: HonDevice): - params = { - "applianceType": device.appliance_type, - "code": device.appliance["code"], - "applianceModelId": device.appliance_model_id, - "firmwareId": device.appliance["eepromId"], - "macAddress": device.mac_address, - "fwVersion": device.appliance["fwVersion"], - "os": const.OS, - "appVersion": const.APP_VERSION, - "series": device.appliance["series"], - } - url = f"{const.API_URL}/commands/v1/retrieve" - async with self._session.get(url, params=params, headers=await self._headers) as response: - result = (await response.json()).get("payload", {}) - if not result or result.pop("resultCode") != "0": - return {} - return result - - async def command_history(self, device: HonDevice): - url = f"{const.API_URL}/commands/v1/appliance/{device.mac_address}/history" - async with self._session.get(url, headers=await self._headers) as response: - result = await response.json() - if not result or not result.get("payload"): - return {} - return result["payload"]["history"] - - async def last_activity(self, device: HonDevice): - url = f"{const.API_URL}/commands/v1/retrieve-last-activity" - params = {"macAddress": device.mac_address} - async with self._session.get(url, params=params, headers=await self._headers) as response: - result = await response.json() - if result and (activity := result.get("attributes")): - return activity - return {} - - async def appliance_configuration(self): - url = f"{const.API_URL}/config/v1/appliance-configuration" - headers = {"x-api-key": const.API_KEY, "content-type": "application/json"} - async with self._session.get(url, headers=headers) as response: - result = await response.json() - if result and (data := result.get("payload")): - return data - return {} - - async def app_config(self, language="en", beta=True): - headers = {"x-api-key": const.API_KEY, "content-type": "application/json"} - url = f"{const.API_URL}/app-config" - payload = { - "languageCode": language, - "beta": beta, - "appVersion": const.APP_VERSION, - "os": const.OS - } - payload = json.dumps(payload, separators=(',', ':')) - async with self._session.post(url, headers=headers, data=payload) as response: - if (result := await response.json()) and (data := result.get("payload")): - return data - return {} - - async def translation_keys(self, language="en"): - headers = {"x-api-key": const.API_KEY, "content-type": "application/json"} - config = await self.app_config(language=language) - if url := config.get("language", {}).get("jsonPath"): - async with self._session.get(url, headers=headers) as response: - if result := await response.json(): - return result - return {} - - async def load_attributes(self, device: HonDevice, loop=False): - params = { - "macAddress": device.mac_address, - "applianceType": device.appliance_type, - "category": "CYCLE" - } - url = f"{const.API_URL}/commands/v1/context" - async with self._session.get(url, params=params, headers=await self._headers) as response: - if response.status == 403 and not loop: - _LOGGER.error("%s - Error %s - %s", url, response.status, await response.text()) - self._request_headers.pop("cognito-token", None) - self._request_headers.pop("id-token", None) - return await self.load_attributes(device, loop=True) - return (await response.json()).get("payload", {}) - - async def load_statistics(self, device: HonDevice): - params = { - "macAddress": device.mac_address, - "applianceType": device.appliance_type - } - url = f"{const.API_URL}/commands/v1/statistics" - async with self._session.get(url, params=params, headers=await self._headers) as response: - return (await response.json()).get("payload", {}) - - async def send_command(self, device, command, parameters, ancillary_parameters): - now = datetime.utcnow().isoformat() - data = { - "macAddress": device.mac_address, - "timestamp": f"{now[:-3]}Z", - "commandName": command, - "transactionId": f"{device.mac_address}_{now[:-3]}Z", - "applianceOptions": device.commands_options, - "device": { - "mobileId": self._mobile_id, - "mobileOs": const.OS, - "osVersion": const.OS_VERSION, - "appVersion": const.APP_VERSION, - "deviceModel": const.DEVICE_MODEL - }, - "attributes": { - "channel": "mobileApp", - "origin": "standardProgram", - "energyLabel": "0" - }, - "ancillaryParameters": ancillary_parameters, - "parameters": parameters, - "applianceType": device.appliance_type - } - url = f"{const.API_URL}/commands/v1/send" - async with self._session.post(url, headers=await self._headers, json=data) as resp: - try: - json_data = await resp.json() - except json.JSONDecodeError: - return False - if json_data["payload"]["resultCode"] == "0": - return True - return False diff --git a/pyhon/appliance.py b/pyhon/appliance.py new file mode 100644 index 0000000..ce89187 --- /dev/null +++ b/pyhon/appliance.py @@ -0,0 +1,215 @@ +import importlib +from contextlib import suppress +from typing import Optional, Dict, Any +from typing import TYPE_CHECKING + +from pyhon import helper +from pyhon.commands import HonCommand +from pyhon.parameter.fixed import HonParameterFixed + +if TYPE_CHECKING: + from pyhon import HonAPI + + +class HonAppliance: + def __init__( + self, api: Optional["HonAPI"], info: Dict[str, Any], zone: int = 0 + ) -> None: + if attributes := info.get("attributes"): + info["attributes"] = {v["parName"]: v["parValue"] for v in attributes} + self._info: Dict = info + self._api: Optional[HonAPI] = api + self._appliance_model: Dict = {} + + self._commands: Dict = {} + self._statistics: Dict = {} + self._attributes: Dict = {} + self._zone: int = zone + + try: + self._extra = importlib.import_module( + f"pyhon.appliances.{self.appliance_type.lower()}" + ).Appliance() + except ModuleNotFoundError: + self._extra = None + + def __getitem__(self, item): + if self._zone: + item += f"Z{self._zone}" + if "." in item: + result = self.data + for key in item.split("."): + if all(k in "0123456789" for k in key) and isinstance(result, list): + result = result[int(key)] + else: + result = result[key] + return result + if item in self.data: + return self.data[item] + if item in self.attributes["parameters"]: + return self.attributes["parameters"].get(item) + return self.info[item] + + def get(self, item, default=None): + try: + return self[item] + except (KeyError, IndexError): + return default + + def _check_name_zone(self, name: str, frontend: bool = True) -> str: + middle = " Z" if frontend else "_z" + if (attribute := self._info.get(name, "")) and self._zone: + return f"{attribute}{middle}{self._zone}" + return attribute + + @property + def appliance_model_id(self) -> str: + return self._info.get("applianceModelId", "") + + @property + def appliance_type(self) -> str: + return self._info.get("applianceTypeName", "") + + @property + def mac_address(self) -> str: + return self.info.get("macAddress", "") + + @property + def unique_id(self) -> str: + return self._check_name_zone("macAddress", frontend=False) + + @property + def model_name(self) -> str: + return self._check_name_zone("modelName") + + @property + def nick_name(self) -> str: + return self._check_name_zone("nickName") + + @property + def commands_options(self): + return self._appliance_model.get("options") + + @property + def commands(self): + return self._commands + + @property + def attributes(self): + return self._attributes + + @property + def statistics(self): + return self._statistics + + @property + def info(self): + return self._info + + @property + def zone(self) -> int: + return self._zone + + async def _recover_last_command_states(self, commands): + command_history = await self._api.command_history(self) + for name, command in commands.items(): + last = next( + ( + index + for (index, d) in enumerate(command_history) + if d.get("command", {}).get("commandName") == name + ), + None, + ) + if last is None: + continue + parameters = command_history[last].get("command", {}).get("parameters", {}) + if command.programs and parameters.get("program"): + command.program = parameters.pop("program").split(".")[-1].lower() + command = self.commands[name] + for key, data in command.settings.items(): + if ( + not isinstance(data, HonParameterFixed) + and parameters.get(key) is not None + ): + with suppress(ValueError): + data.value = parameters.get(key) + + async def load_commands(self): + raw = await self._api.load_commands(self) + self._appliance_model = raw.pop("applianceModel") + for item in ["settings", "options", "dictionaryId"]: + raw.pop(item) + commands = {} + for command, attr in raw.items(): + if "parameters" in attr: + commands[command] = HonCommand(command, attr, self._api, self) + elif "parameters" in attr[list(attr)[0]]: + multi = {} + for program, attr2 in attr.items(): + program = program.split(".")[-1].lower() + cmd = HonCommand( + command, + attr2, + self._api, + self, + programs=multi, + program_name=program, + ) + multi[program] = cmd + commands[command] = cmd + self._commands = commands + await self._recover_last_command_states(commands) + + @property + def settings(self): + result = {} + for name, command in self._commands.items(): + for key, setting in command.settings.items(): + result[f"{name}.{key}"] = setting + if self._extra: + return self._extra.settings(result) + return result + + @property + def parameters(self): + result = {} + for name, command in self._commands.items(): + for key, parameter in (command.parameters | command.ancillary_parameters).items(): + result.setdefault(name, {})[key] = parameter.value + return result + + async def load_attributes(self): + self._attributes = await self._api.load_attributes(self) + for name, values in self._attributes.pop("shadow").get("parameters").items(): + self._attributes.setdefault("parameters", {})[name] = values["parNewVal"] + + async def load_statistics(self): + self._statistics = await self._api.load_statistics(self) + + async def update(self): + await self.load_attributes() + + @property + def data(self): + result = { + "attributes": self.attributes, + "appliance": self.info, + "statistics": self.statistics, + **self.parameters, + } + if self._extra: + return self._extra.data(result) + return result + + @property + def diagnose(self): + data = self.data.copy() + for sensible in ["PK", "SK", "serialNumber", "code", "coords"]: + data["appliance"].pop(sensible, None) + result = helper.pretty_print({"data": self.data}, whitespace="\u200B \u200B ") + result += helper.pretty_print( + {"commands": helper.create_command(self.commands)}, + whitespace="\u200B \u200B ", + ) + return result.replace(self.mac_address, "12-34-56-78-90-ab") diff --git a/pyhon/appliances/dw.py b/pyhon/appliances/dw.py new file mode 100644 index 0000000..62311bd --- /dev/null +++ b/pyhon/appliances/dw.py @@ -0,0 +1,9 @@ +class Appliance: + def data(self, data): + if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED": + data["attributes"]["parameters"]["machMode"] = "0" + data["active"] = bool(data.get("attributes", {}).get("activity")) + return data + + def settings(self, settings): + return settings diff --git a/pyhon/auth.py b/pyhon/auth.py deleted file mode 100644 index 1baca58..0000000 --- a/pyhon/auth.py +++ /dev/null @@ -1,157 +0,0 @@ -import json -import logging -import re -import secrets -import urllib -from urllib import parse - -import aiohttp as aiohttp -from yarl import URL - -from pyhon import const - -_LOGGER = logging.getLogger() - - -class HonAuth: - def __init__(self) -> None: - self._access_token = "" - self._refresh_token = "" - self._cognito_token = "" - self._id_token = "" - - @property - def cognito_token(self): - return self._cognito_token - - @property - def id_token(self): - return self._id_token - - @property - def access_token(self): - return self._access_token - - @property - def refresh_token(self): - return self._refresh_token - - async def _load_login(self, session): - nonce = secrets.token_hex(16) - nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" - params = { - "response_type": "token+id_token", - "client_id": const.CLIENT_ID, - "redirect_uri": urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done"), - "display": "touch", - "scope": "api openid refresh_token web", - "nonce": nonce - } - params = "&".join([f"{k}={v}" for k, v in params.items()]) - async with session.get(f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}") as resp: - if not (login_url := re.findall("url = '(.+?)'", await resp.text())): - return False - async with session.get(login_url[0], allow_redirects=False) as redirect1: - if not (url := redirect1.headers.get("Location")): - return False - async with session.get(url, allow_redirects=False) as redirect2: - if not (url := redirect2.headers.get("Location") + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"): - return False - async with session.get(URL(url, encoded=True)) as login_screen: - if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()): - fw_uid, loaded_str = context[0] - loaded = json.loads(loaded_str) - login_url = login_url[0].replace("/".join(const.AUTH_API.split("/")[:-1]), "") - return fw_uid, loaded, login_url - return False - - async def _login(self, session, email, password, fw_uid, loaded, login_url): - data = { - "message": { - "actions": [ - { - "id": "79;a", - "descriptor": "apex://LightningLoginCustomController/ACTION$login", - "callingDescriptor": "markup://c:loginForm", - "params": { - "username": email, - "password": password, - "startUrl": parse.unquote(login_url.split("startURL=")[-1]).split("%3D")[0] - } - } - ] - }, - "aura.context": { - "mode": "PROD", - "fwuid": fw_uid, - "app": "siteforce:loginApp2", - "loaded": loaded, - "dn": [], - "globals": {}, - "uad": False}, - "aura.pageURI": login_url, - "aura.token": None} - - params = {"r": 3, "other.LightningLoginCustom.login": 1} - async with session.post( - const.AUTH_API + "/s/sfsites/aura", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), - params=params - ) as response: - if response.status == 200: - try: - return (await response.json())["events"][0]["attributes"]["values"]["url"] - except json.JSONDecodeError: - pass - _LOGGER.error("Unable to login: %s\n%s", response.status, await response.text()) - return "" - - async def _get_token(self, session, url): - async with session.get(url) as resp: - if resp.status != 200: - _LOGGER.error("Unable to get token: %s", resp.status) - return False - url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) - async with session.get(url[0]) as resp: - if resp.status != 200: - _LOGGER.error("Unable to get token: %s", resp.status) - return False - url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) - url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] - async with session.get(url) as resp: - if resp.status != 200: - _LOGGER.error("Unable to connect to the login service: %s", resp.status) - return False - text = await resp.text() - if access_token := re.findall("access_token=(.*?)&", text): - self._access_token = access_token[0] - if refresh_token := re.findall("refresh_token=(.*?)&", text): - self._refresh_token = refresh_token[0] - if id_token := re.findall("id_token=(.*?)&", text): - self._id_token = id_token[0] - return True - - async def authorize(self, email, password, mobile_id): - headers = {"user-agent": const.USER_AGENT} - async with aiohttp.ClientSession(headers=headers) as session: - if login_site := await self._load_login(session): - fw_uid, loaded, login_url = login_site - else: - return False - if not (url := await self._login(session, email, password, fw_uid, loaded, login_url)): - return False - if not await self._get_token(session, url): - return False - - post_headers = {"Content-Type": "application/json", "id-token": self._id_token} - data = {"appVersion": const.APP_VERSION, "mobileId": mobile_id, "osVersion": const.OS_VERSION, - "os": const.OS, "deviceModel": const.DEVICE_MODEL} - async with session.post(f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data) as resp: - try: - json_data = await resp.json() - except json.JSONDecodeError: - _LOGGER.error("No JSON Data after POST: %s", await resp.text()) - return False - self._cognito_token = json_data["cognitoUser"]["Token"] - return True diff --git a/pyhon/commands.py b/pyhon/commands.py index 7c1e9ee..33487a3 100644 --- a/pyhon/commands.py +++ b/pyhon/commands.py @@ -1,23 +1,47 @@ -from pyhon.parameter import HonParameterFixed, HonParameterEnum, HonParameterRange, HonParameterProgram +from typing import Optional, Dict, Any, List, TYPE_CHECKING + +from pyhon.parameter.base import HonParameter +from pyhon.parameter.enum import HonParameterEnum +from pyhon.parameter.fixed import HonParameterFixed +from pyhon.parameter.program import HonParameterProgram +from pyhon.parameter.range import HonParameterRange + +if TYPE_CHECKING: + from pyhon import HonAPI + from pyhon.appliance import HonAppliance class HonCommand: - def __init__(self, name, attributes, connector, device, multi=None, program=""): - self._connector = connector - self._device = device - self._name = name - self._multi = multi or {} - self._program = program - self._description = attributes.get("description", "") - self._parameters = self._create_parameters(attributes.get("parameters", {})) - self._ancillary_parameters = self._create_parameters(attributes.get("ancillaryParameters", {})) + def __init__( + self, + name: str, + attributes: Dict[str, Any], + api: "HonAPI", + appliance: "HonAppliance", + programs: Optional[Dict[str, "HonCommand"]] = None, + program_name: str = "", + ): + self._api: HonAPI = api + self._appliance: "HonAppliance" = appliance + self._name: str = name + self._programs: Optional[Dict[str, "HonCommand"]] = programs or {} + self._program_name: str = program_name + self._description: str = attributes.get("description", "") + self._parameters: Dict[str, HonParameter] = self._create_parameters( + attributes.get("parameters", {}) + ) + self._ancillary_parameters: Dict[str, HonParameter] = self._create_parameters( + attributes.get("ancillaryParameters", {}) + ) - def __repr__(self): + def __repr__(self) -> str: return f"{self._name} command" - def _create_parameters(self, parameters): - result = {} + def _create_parameters(self, parameters: Dict) -> Dict[str, HonParameter]: + result: Dict[str, HonParameter] = {} for parameter, attributes in parameters.items(): + if parameter == "zoneMap" and self._appliance.zone: + attributes["default"] = self._appliance.zone match attributes.get("typology"): case "range": result[parameter] = HonParameterRange(parameter, attributes) @@ -25,32 +49,47 @@ class HonCommand: result[parameter] = HonParameterEnum(parameter, attributes) case "fixed": result[parameter] = HonParameterFixed(parameter, attributes) - if self._multi: + if self._programs: result["program"] = HonParameterProgram("program", self) return result @property - def parameters(self): + def parameters(self) -> Dict[str, HonParameter]: return self._parameters @property - def ancillary_parameters(self): - return {key: parameter.value for key, parameter in self._ancillary_parameters.items()} + def ancillary_parameters(self) -> Dict[str, HonParameter]: + return self._ancillary_parameters - async def send(self): - parameters = {name: parameter.value for name, parameter in self._parameters.items()} - return await self._connector.send_command(self._device, self._name, parameters, self.ancillary_parameters) + async def send(self) -> bool: + parameters = { + name: parameter.value for name, parameter in self._parameters.items() + } + return await self._api.send_command( + self._appliance, self._name, parameters, self.ancillary_parameters + ) - def get_programs(self): - return self._multi + @property + def programs(self) -> Dict[str, "HonCommand"]: + if self._programs is None: + return {} + return self._programs - def set_program(self, program): - self._device.commands[self._name] = self._multi[program] + @property + def program(self) -> str: + return self._program_name - def _get_settings_keys(self, command=None): - command = command or self + @program.setter + def program(self, program: str) -> None: + self._appliance.commands[self._name] = self.programs[program] + + def _get_settings_keys(self, command: Optional["HonCommand"] = None) -> List[str]: + if command is None: + command = self keys = [] - for key, parameter in command._parameters.items(): + for key, parameter in ( + command._parameters | command._ancillary_parameters + ).items(): if isinstance(parameter, HonParameterFixed): continue if key not in keys: @@ -58,14 +97,22 @@ class HonCommand: return keys @property - def setting_keys(self): - if not self._multi: + def setting_keys(self) -> List[str]: + if not self._programs: return self._get_settings_keys() - result = [key for cmd in self._multi.values() for key in self._get_settings_keys(cmd)] + result = [ + key + for cmd in self._programs.values() + for key in self._get_settings_keys(cmd) + ] return list(set(result + ["program"])) @property - def settings(self): + def settings(self) -> Dict[str, HonParameter]: """Parameters with typology enum and range""" - return {s: self._parameters.get(s) for s in self.setting_keys if self._parameters.get(s) is not None} - + return { + s: param + for s in self.setting_keys + if (param := self._parameters.get(s)) is not None + or (param := self._ancillary_parameters.get(s)) is not None + } diff --git a/pyhon/connection/__init__.py b/pyhon/connection/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyhon/connection/api.py b/pyhon/connection/api.py new file mode 100644 index 0000000..4852a10 --- /dev/null +++ b/pyhon/connection/api.py @@ -0,0 +1,195 @@ +import json +import logging +from datetime import datetime +from typing import Dict, Optional + +from aiohttp import ClientSession +from typing_extensions import Self + +from pyhon import const, exceptions +from pyhon.appliance import HonAppliance +from pyhon.connection.auth import HonAuth +from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler +from pyhon.connection.handler.hon import HonConnectionHandler + +_LOGGER = logging.getLogger() + + +class HonAPI: + def __init__( + self, + email: str = "", + password: str = "", + anonymous: bool = False, + session: Optional[ClientSession] = None, + ) -> None: + super().__init__() + self._email: str = email + self._password: str = password + self._anonymous: bool = anonymous + self._hon_handler: Optional[HonConnectionHandler] = None + self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None + self._session: Optional[ClientSession] = session + + async def __aenter__(self) -> Self: + return await self.create() + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + await self.close() + + @property + def auth(self) -> HonAuth: + if self._hon is None or self._hon.auth is None: + raise exceptions.NoAuthenticationException + return self._hon.auth + + @property + def _hon(self): + if self._hon_handler is None: + raise exceptions.NoAuthenticationException + return self._hon_handler + + @property + def _hon_anonymous(self): + if self._hon_anonymous_handler is None: + raise exceptions.NoAuthenticationException + return self._hon_anonymous_handler + + async def create(self) -> Self: + self._hon_anonymous_handler = await HonAnonymousConnectionHandler( + self._session + ).create() + if not self._anonymous: + self._hon_handler = await HonConnectionHandler( + self._email, self._password, self._session + ).create() + return self + + async def load_appliances(self) -> Dict: + async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp: + return await resp.json() + + async def load_commands(self, appliance: HonAppliance) -> Dict: + params: Dict = { + "applianceType": appliance.appliance_type, + "code": appliance.info["code"], + "applianceModelId": appliance.appliance_model_id, + "firmwareId": appliance.info["eepromId"], + "macAddress": appliance.mac_address, + "fwVersion": appliance.info["fwVersion"], + "os": const.OS, + "appVersion": const.APP_VERSION, + "series": appliance.info["series"], + } + url: str = f"{const.API_URL}/commands/v1/retrieve" + async with self._hon.get(url, params=params) as response: + result: Dict = (await response.json()).get("payload", {}) + if not result or result.pop("resultCode") != "0": + return {} + return result + + async def command_history(self, appliance: HonAppliance) -> Dict: + url: str = ( + f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history" + ) + async with self._hon.get(url) as response: + result: Dict = await response.json() + if not result or not result.get("payload"): + return {} + return result["payload"]["history"] + + async def last_activity(self, appliance: HonAppliance) -> Dict: + url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity" + params: Dict = {"macAddress": appliance.mac_address} + async with self._hon.get(url, params=params) as response: + result: Dict = await response.json() + if result and (activity := result.get("attributes")): + return activity + return {} + + async def load_attributes(self, appliance: HonAppliance) -> Dict: + params: Dict = { + "macAddress": appliance.mac_address, + "applianceType": appliance.appliance_type, + "category": "CYCLE", + } + url: str = f"{const.API_URL}/commands/v1/context" + async with self._hon.get(url, params=params) as response: + return (await response.json()).get("payload", {}) + + async def load_statistics(self, appliance: HonAppliance) -> Dict: + params: Dict = { + "macAddress": appliance.mac_address, + "applianceType": appliance.appliance_type, + } + url: str = f"{const.API_URL}/commands/v1/statistics" + async with self._hon.get(url, params=params) as response: + return (await response.json()).get("payload", {}) + + async def send_command( + self, + appliance: HonAppliance, + command: str, + parameters: Dict, + ancillary_parameters: Dict, + ) -> bool: + now: str = datetime.utcnow().isoformat() + data: Dict = { + "macAddress": appliance.mac_address, + "timestamp": f"{now[:-3]}Z", + "commandName": command, + "transactionId": f"{appliance.mac_address}_{now[:-3]}Z", + "applianceOptions": appliance.commands_options, + "device": self._hon.device.get(mobile=True), + "attributes": { + "channel": "mobileApp", + "origin": "standardProgram", + "energyLabel": "0", + }, + "ancillaryParameters": ancillary_parameters, + "parameters": parameters, + "applianceType": appliance.appliance_type, + } + url: str = f"{const.API_URL}/commands/v1/send" + async with self._hon.post(url, json=data) as response: + json_data: Dict = await response.json() + if json_data.get("payload", {}).get("resultCode") == "0": + return True + _LOGGER.error(await response.text()) + return False + + async def appliance_configuration(self) -> Dict: + url: str = f"{const.API_URL}/config/v1/appliance-configuration" + async with self._hon_anonymous.get(url) as response: + result: Dict = await response.json() + if result and (data := result.get("payload")): + return data + return {} + + async def app_config(self, language: str = "en", beta: bool = True) -> Dict: + url: str = f"{const.API_URL}/app-config" + payload_data: Dict = { + "languageCode": language, + "beta": beta, + "appVersion": const.APP_VERSION, + "os": const.OS, + } + payload: str = json.dumps(payload_data, separators=(",", ":")) + async with self._hon_anonymous.post(url, data=payload) as response: + if (result := await response.json()) and (data := result.get("payload")): + return data + return {} + + async def translation_keys(self, language: str = "en") -> Dict: + config = await self.app_config(language=language) + if url := config.get("language", {}).get("jsonPath"): + async with self._hon_anonymous.get(url) as response: + if result := await response.json(): + return result + return {} + + async def close(self) -> None: + if self._hon_handler is not None: + await self._hon_handler.close() + if self._hon_anonymous_handler is not None: + await self._hon_anonymous_handler.close() diff --git a/pyhon/connection/auth.py b/pyhon/connection/auth.py new file mode 100644 index 0000000..6f26f1b --- /dev/null +++ b/pyhon/connection/auth.py @@ -0,0 +1,272 @@ +import json +import logging +import re +import secrets +import urllib +from contextlib import suppress +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Dict, Optional +from urllib import parse +from urllib.parse import quote + +from aiohttp import ClientResponse +from yarl import URL + +from pyhon import const, exceptions +from pyhon.connection.handler.auth import HonAuthConnectionHandler + +_LOGGER = logging.getLogger(__name__) + + +@dataclass +class HonLoginData: + url: str = "" + email: str = "" + password: str = "" + fw_uid: str = "" + loaded: Optional[Dict] = None + + +class HonAuth: + _TOKEN_EXPIRES_AFTER_HOURS = 8 + _TOKEN_EXPIRE_WARNING_HOURS = 7 + + def __init__(self, session, email, password, device) -> None: + self._session = session + self._request = HonAuthConnectionHandler(session) + self._login_data = HonLoginData() + self._login_data.email = email + self._login_data.password = password + self._access_token = "" + self._refresh_token = "" + self._cognito_token = "" + self._id_token = "" + self._device = device + self._expires: datetime = datetime.utcnow() + + @property + def cognito_token(self) -> str: + return self._cognito_token + + @property + def id_token(self) -> str: + return self._id_token + + @property + def access_token(self) -> str: + return self._access_token + + @property + def refresh_token(self) -> str: + return self._refresh_token + + def _check_token_expiration(self, hours: int) -> bool: + return datetime.utcnow() >= self._expires + timedelta(hours=hours) + + @property + def token_is_expired(self) -> bool: + return self._check_token_expiration(self._TOKEN_EXPIRES_AFTER_HOURS) + + @property + def token_expires_soon(self) -> bool: + return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS) + + async def _error_logger(self, response: ClientResponse, fail: bool = True) -> None: + output = "hOn Authentication Error\n" + for i, (status, url) in enumerate(self._request.called_urls): + output += f" {i + 1: 2d} {status} - {url}\n" + output += f"ERROR - {response.status} - {response.request_info.url}\n" + output += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}" + _LOGGER.error(output) + if fail: + raise exceptions.HonAuthenticationError("Can't login") + + @staticmethod + def _generate_nonce() -> str: + nonce = secrets.token_hex(16) + return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" + + async def _load_login(self) -> bool: + login_url = await self._introduce() + login_url = await self._handle_redirects(login_url) + return await self._login_url(login_url) + + async def _introduce(self) -> str: + redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done") + params = { + "response_type": "token+id_token", + "client_id": const.CLIENT_ID, + "redirect_uri": redirect_uri, + "display": "touch", + "scope": "api openid refresh_token web", + "nonce": self._generate_nonce(), + } + params_encode = "&".join([f"{k}={v}" for k, v in params.items()]) + url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params_encode}" + async with self._request.get(url) as response: + text = await response.text() + self._expires = datetime.utcnow() + if not (login_url := re.findall("url = '(.+?)'", text)): + if "oauth/done#access_token=" in text: + self._parse_token_data(text) + raise exceptions.HonNoAuthenticationNeeded() + await self._error_logger(response) + return login_url[0] + + async def _manual_redirect(self, url: str) -> str: + async with self._request.get(url, allow_redirects=False) as response: + if not (new_location := response.headers.get("Location", "")): + await self._error_logger(response) + return new_location + + async def _handle_redirects(self, login_url) -> str: + redirect1 = await self._manual_redirect(login_url) + redirect2 = await self._manual_redirect(redirect1) + return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn" + + async def _login_url(self, login_url: str) -> bool: + headers = {"user-agent": const.USER_AGENT} + url = URL(login_url, encoded=True) + async with self._request.get(url, headers=headers) as response: + text = await response.text() + if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text): + fw_uid, loaded_str = context[0] + self._login_data.fw_uid = fw_uid + self._login_data.loaded = json.loads(loaded_str) + self._login_data.url = login_url.replace( + "/".join(const.AUTH_API.split("/")[:-1]), "" + ) + return True + await self._error_logger(response) + return False + + async def _login(self) -> str: + start_url = self._login_data.url.rsplit("startURL=", maxsplit=1)[-1] + start_url = parse.unquote(start_url).split("%3D")[0] + action = { + "id": "79;a", + "descriptor": "apex://LightningLoginCustomController/ACTION$login", + "callingDescriptor": "markup://c:loginForm", + "params": { + "username": quote(self._login_data.email), + "password": quote(self._login_data.password), + "startUrl": start_url, + }, + } + data = { + "message": {"actions": [action]}, + "aura.context": { + "mode": "PROD", + "fwuid": self._login_data.fw_uid, + "app": "siteforce:loginApp2", + "loaded": self._login_data.loaded, + "dn": [], + "globals": {}, + "uad": False, + }, + "aura.pageURI": self._login_data.url, + "aura.token": None, + } + params = {"r": 3, "other.LightningLoginCustom.login": 1} + async with self._request.post( + const.AUTH_API + "/s/sfsites/aura", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), + params=params, + ) as response: + if response.status == 200: + with suppress(json.JSONDecodeError, KeyError): + result = await response.json() + return result["events"][0]["attributes"]["values"]["url"] + await self._error_logger(response) + return "" + + def _parse_token_data(self, text: str) -> None: + if access_token := re.findall("access_token=(.*?)&", text): + self._access_token = access_token[0] + if refresh_token := re.findall("refresh_token=(.*?)&", text): + self._refresh_token = refresh_token[0] + if id_token := re.findall("id_token=(.*?)&", text): + self._id_token = id_token[0] + + async def _get_token(self, url: str) -> bool: + async with self._request.get(url) as response: + if response.status != 200: + await self._error_logger(response) + return False + url_search = re.findall( + "href\\s*=\\s*[\"'](.+?)[\"']", await response.text() + ) + if not url_search: + await self._error_logger(response) + return False + if "ProgressiveLogin" in url_search[0]: + async with self._request.get(url_search[0]) as response: + if response.status != 200: + await self._error_logger(response) + return False + url_search = re.findall( + "href\\s*=\\s*[\"'](.*?)[\"']", await response.text() + ) + url = "/".join(const.AUTH_API.split("/")[:-1]) + url_search[0] + async with self._request.get(url) as response: + if response.status != 200: + await self._error_logger(response) + return False + self._parse_token_data(await response.text()) + return True + + async def _api_auth(self) -> bool: + post_headers = {"id-token": self._id_token} + data = self._device.get() + async with self._request.post( + f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data + ) as response: + try: + json_data = await response.json() + except json.JSONDecodeError: + await self._error_logger(response) + return False + self._cognito_token = json_data["cognitoUser"]["Token"] + return True + + async def authenticate(self) -> None: + self.clear() + try: + if not await self._load_login(): + raise exceptions.HonAuthenticationError("Can't open login page") + if not (url := await self._login()): + raise exceptions.HonAuthenticationError("Can't login") + if not await self._get_token(url): + raise exceptions.HonAuthenticationError("Can't get token") + if not await self._api_auth(): + raise exceptions.HonAuthenticationError("Can't get api token") + except exceptions.HonNoAuthenticationNeeded: + return + + async def refresh(self) -> bool: + params = { + "client_id": const.CLIENT_ID, + "refresh_token": self._refresh_token, + "grant_type": "refresh_token", + } + async with self._request.post( + f"{const.AUTH_API}/services/oauth2/token", params=params + ) as response: + if response.status >= 400: + await self._error_logger(response, fail=False) + return False + data = await response.json() + self._expires = datetime.utcnow() + self._id_token = data["id_token"] + self._access_token = data["access_token"] + return await self._api_auth() + + def clear(self) -> None: + self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2]) + self._request.called_urls = [] + self._cognito_token = "" + self._id_token = "" + self._access_token = "" + self._refresh_token = "" diff --git a/pyhon/connection/device.py b/pyhon/connection/device.py new file mode 100644 index 0000000..ea7f30e --- /dev/null +++ b/pyhon/connection/device.py @@ -0,0 +1,43 @@ +import secrets +from typing import Dict + +from pyhon import const + + +class HonDevice: + def __init__(self) -> None: + self._app_version: str = const.APP_VERSION + self._os_version: int = const.OS_VERSION + self._os: str = const.OS + self._device_model: str = const.DEVICE_MODEL + self._mobile_id: str = secrets.token_hex(8) + + @property + def app_version(self) -> str: + return self._app_version + + @property + def os_version(self) -> int: + return self._os_version + + @property + def os(self) -> str: + return self._os + + @property + def device_model(self) -> str: + return self._device_model + + @property + def mobile_id(self) -> str: + return self._mobile_id + + def get(self, mobile: bool = False) -> Dict: + result = { + "appVersion": self.app_version, + "mobileId": self.mobile_id, + "os": self.os, + "osVersion": self.os_version, + "deviceModel": self.device_model, + } + return (result | {"mobileOs": result.pop("os")}) if mobile else result diff --git a/pyhon/connection/handler/__init__.py b/pyhon/connection/handler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyhon/connection/handler/anonym.py b/pyhon/connection/handler/anonym.py new file mode 100644 index 0000000..1ed0410 --- /dev/null +++ b/pyhon/connection/handler/anonym.py @@ -0,0 +1,21 @@ +import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Callable, Dict + +from pyhon import const +from pyhon.connection.handler.base import ConnectionHandler + +_LOGGER = logging.getLogger(__name__) + + +class HonAnonymousConnectionHandler(ConnectionHandler): + _HEADERS: Dict = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} + + @asynccontextmanager + async def _intercept(self, method: Callable, *args, **kwargs) -> AsyncIterator: + kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS + async with method(*args, **kwargs) as response: + if response.status == 403: + _LOGGER.error("Can't authenticate anymore") + yield response diff --git a/pyhon/connection/handler/auth.py b/pyhon/connection/handler/auth.py new file mode 100644 index 0000000..ecba4cb --- /dev/null +++ b/pyhon/connection/handler/auth.py @@ -0,0 +1,36 @@ +import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Optional, Callable, List, Tuple + +import aiohttp + +from pyhon import const +from pyhon.connection.handler.base import ConnectionHandler + +_LOGGER = logging.getLogger(__name__) + + +class HonAuthConnectionHandler(ConnectionHandler): + _HEADERS = {"user-agent": const.USER_AGENT} + + def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: + super().__init__(session) + self._called_urls: List[Tuple[int, str]] = [] + + @property + def called_urls(self) -> List[Tuple[int, str]]: + return self._called_urls + + @called_urls.setter + def called_urls(self, called_urls: List[Tuple[int, str]]) -> None: + self._called_urls = called_urls + + @asynccontextmanager + async def _intercept( + self, method: Callable, *args, loop: int = 0, **kwargs + ) -> AsyncIterator: + kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS + async with method(*args, **kwargs) as response: + self._called_urls.append((response.status, response.request_info.url)) + yield response diff --git a/pyhon/connection/handler/base.py b/pyhon/connection/handler/base.py new file mode 100644 index 0000000..7ccf5f4 --- /dev/null +++ b/pyhon/connection/handler/base.py @@ -0,0 +1,57 @@ +import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Optional, Callable, Dict + +import aiohttp +from typing_extensions import Self + +from pyhon import const, exceptions + +_LOGGER = logging.getLogger(__name__) + + +class ConnectionHandler: + _HEADERS: Dict = { + "user-agent": const.USER_AGENT, + "Content-Type": "application/json", + } + + def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None: + self._create_session: bool = session is None + self._session: Optional[aiohttp.ClientSession] = session + + async def __aenter__(self) -> Self: + return await self.create() + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + await self.close() + + async def create(self) -> Self: + if self._create_session: + self._session = aiohttp.ClientSession() + return self + + @asynccontextmanager + def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs): + raise NotImplementedError + + @asynccontextmanager + async def get(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]: + if self._session is None: + raise exceptions.NoSessionException() + response: aiohttp.ClientResponse + async with self._intercept(self._session.get, *args, **kwargs) as response: + yield response + + @asynccontextmanager + async def post(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]: + if self._session is None: + raise exceptions.NoSessionException() + response: aiohttp.ClientResponse + async with self._intercept(self._session.post, *args, **kwargs) as response: + yield response + + async def close(self) -> None: + if self._create_session and self._session is not None: + await self._session.close() diff --git a/pyhon/connection/handler/hon.py b/pyhon/connection/handler/hon.py new file mode 100644 index 0000000..f50c3a9 --- /dev/null +++ b/pyhon/connection/handler/hon.py @@ -0,0 +1,102 @@ +import json +import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from typing import Optional, Callable, Dict + +import aiohttp +from typing_extensions import Self + +from pyhon.connection.auth import HonAuth +from pyhon.connection.device import HonDevice +from pyhon.connection.handler.base import ConnectionHandler +from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException + +_LOGGER = logging.getLogger(__name__) + + +class HonConnectionHandler(ConnectionHandler): + def __init__( + self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None + ) -> None: + super().__init__(session=session) + self._device: HonDevice = HonDevice() + self._email: str = email + self._password: str = password + if not self._email: + raise HonAuthenticationError("An email address must be specified") + if not self._password: + raise HonAuthenticationError("A password address must be specified") + self._auth: Optional[HonAuth] = None + + @property + def auth(self) -> HonAuth: + if self._auth is None: + raise NoAuthenticationException() + return self._auth + + @property + def device(self) -> HonDevice: + return self._device + + async def create(self) -> Self: + await super().create() + self._auth = HonAuth(self._session, self._email, self._password, self._device) + return self + + async def _check_headers(self, headers: Dict) -> Dict: + if not (self.auth.cognito_token and self.auth.id_token): + await self.auth.authenticate() + headers["cognito-token"] = self.auth.cognito_token + headers["id-token"] = self.auth.id_token + return self._HEADERS | headers + + @asynccontextmanager + async def _intercept( + self, method: Callable, *args, loop: int = 0, **kwargs + ) -> AsyncIterator: + kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) + async with method(*args, **kwargs) as response: + if ( + self.auth.token_expires_soon or response.status in [401, 403] + ) and loop == 0: + _LOGGER.info("Try refreshing token...") + await self.auth.refresh() + async with self._intercept( + method, *args, loop=loop + 1, **kwargs + ) as result: + yield result + elif ( + self.auth.token_is_expired or response.status in [401, 403] + ) and loop == 1: + _LOGGER.warning( + "%s - Error %s - %s", + response.request_info.url, + response.status, + await response.text(), + ) + await self.create() + async with self._intercept( + method, *args, loop=loop + 1, **kwargs + ) as result: + yield result + elif loop >= 2: + _LOGGER.error( + "%s - Error %s - %s", + response.request_info.url, + response.status, + await response.text(), + ) + raise HonAuthenticationError("Login failure") + else: + try: + await response.json() + yield response + except json.JSONDecodeError: + _LOGGER.warning( + "%s - JsonDecodeError %s - %s", + response.request_info.url, + response.status, + await response.text(), + ) + raise HonAuthenticationError("Decode Error") diff --git a/pyhon/device.py b/pyhon/device.py deleted file mode 100644 index 711b7e1..0000000 --- a/pyhon/device.py +++ /dev/null @@ -1,156 +0,0 @@ -import importlib -from contextlib import suppress - -from pyhon.commands import HonCommand -from pyhon.parameter import HonParameterFixed - - -class HonDevice: - def __init__(self, connector, appliance): - if attributes := appliance.get("attributes"): - appliance["attributes"] = {v["parName"]: v["parValue"] for v in attributes} - self._appliance = appliance - self._connector = connector - self._appliance_model = {} - - self._commands = {} - self._statistics = {} - self._attributes = {} - - try: - self._extra = importlib.import_module(f'pyhon.appliances.{self.appliance_type.lower()}').Appliance() - except ModuleNotFoundError: - self._extra = None - - def __getitem__(self, item): - if "." in item: - result = self.data - for key in item.split("."): - if all([k in "0123456789" for k in key]) and type(result) is list: - result = result[int(key)] - else: - result = result[key] - return result - else: - if item in self.data: - return self.data[item] - if item in self.attributes["parameters"]: - return self.attributes["parameters"].get(item) - return self.appliance[item] - - def get(self, item, default=None): - try: - return self[item] - except (KeyError, IndexError): - return default - - @property - def appliance_model_id(self): - return self._appliance.get("applianceModelId") - - @property - def appliance_type(self): - return self._appliance.get("applianceTypeName") - - @property - def mac_address(self): - return self._appliance.get("macAddress") - - @property - def model_name(self): - return self._appliance.get("modelName") - - @property - def nick_name(self): - return self._appliance.get("nickName") - - @property - def commands_options(self): - return self._appliance_model.get("options") - - @property - def commands(self): - return self._commands - - @property - def attributes(self): - return self._attributes - - @property - def statistics(self): - return self._statistics - - @property - def appliance(self): - return self._appliance - - async def _recover_last_command_states(self, commands): - command_history = await self._connector.command_history(self) - for name, command in commands.items(): - last = next((index for (index, d) in enumerate(command_history) if d.get("command", {}).get("commandName") == name), None) - if last is None: - continue - parameters = command_history[last].get("command", {}).get("parameters", {}) - if command._multi and parameters.get("program"): - command.set_program(parameters.pop("program").split(".")[-1].lower()) - command = self.commands[name] - for key, data in command.settings.items(): - if not isinstance(data, HonParameterFixed) and parameters.get(key) is not None: - with suppress(ValueError): - data.value = parameters.get(key) - - async def load_commands(self): - raw = await self._connector.load_commands(self) - self._appliance_model = raw.pop("applianceModel") - for item in ["settings", "options", "dictionaryId"]: - raw.pop(item) - commands = {} - for command, attr in raw.items(): - if "parameters" in attr: - commands[command] = HonCommand(command, attr, self._connector, self) - elif "parameters" in attr[list(attr)[0]]: - multi = {} - for program, attr2 in attr.items(): - program = program.split(".")[-1].lower() - cmd = HonCommand(command, attr2, self._connector, self, multi=multi, program=program) - multi[program] = cmd - commands[command] = cmd - self._commands = commands - await self._recover_last_command_states(commands) - - @property - def settings(self): - result = {} - for name, command in self._commands.items(): - for key, setting in command.settings.items(): - result[f"{name}.{key}"] = setting - if self._extra: - return self._extra.settings(result) - return result - - @property - def parameters(self): - result = {} - for name, command in self._commands.items(): - for key, parameter in command.parameters.items(): - result.setdefault(name, {})[key] = parameter.value - return result - - async def load_attributes(self): - self._attributes = await self._connector.load_attributes(self) - for name, values in self._attributes.pop("shadow").get("parameters").items(): - self._attributes.setdefault("parameters", {})[name] = values["parNewVal"] - - async def load_statistics(self): - self._statistics = await self._connector.load_statistics(self) - - async def update(self): - await self.load_attributes() - - @property - def data(self): - result = {"attributes": self.attributes, "appliance": self.appliance, "statistics": self.statistics, - **self.parameters} - if self._extra: - return self._extra.data(result) - return result diff --git a/pyhon/exceptions.py b/pyhon/exceptions.py new file mode 100644 index 0000000..64e8648 --- /dev/null +++ b/pyhon/exceptions.py @@ -0,0 +1,14 @@ +class HonAuthenticationError(Exception): + pass + + +class HonNoAuthenticationNeeded(Exception): + pass + + +class NoSessionException(Exception): + pass + + +class NoAuthenticationException(Exception): + pass diff --git a/pyhon/helper.py b/pyhon/helper.py new file mode 100644 index 0000000..c820dd1 --- /dev/null +++ b/pyhon/helper.py @@ -0,0 +1,63 @@ +def key_print(data, key="", start=True): + result = "" + if isinstance(data, list): + for i, value in enumerate(data): + result += key_print(value, key=f"{key}.{i}", start=False) + elif isinstance(data, dict): + for k, value in sorted(data.items()): + result += key_print(value, key=k if start else f"{key}.{k}", start=False) + else: + result += f"{key}: {data}\n" + return result + + +# yaml.dump() would be done the same, but needs an additional dependency... +def pretty_print(data, key="", intend=0, is_list=False, whitespace=" "): + result = "" + if isinstance(data, list): + if key: + result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n" + intend += 1 + for i, value in enumerate(data): + result += pretty_print( + value, intend=intend, is_list=True, whitespace=whitespace + ) + elif isinstance(data, dict): + if key: + result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n" + intend += 1 + for i, (key, value) in enumerate(sorted(data.items())): + if is_list and not i: + result += pretty_print( + value, key=key, intend=intend, is_list=True, whitespace=whitespace + ) + elif is_list: + result += pretty_print( + value, key=key, intend=intend + 1, whitespace=whitespace + ) + else: + result += pretty_print( + value, key=key, intend=intend, whitespace=whitespace + ) + else: + result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n" + return result + + +def create_command(commands, concat=False): + result = {} + for name, command in commands.items(): + if not concat: + result[name] = {} + for parameter, data in command.settings.items(): + if data.typology == "enum": + value = data.values + elif data.typology == "range": + value = {"min": data.min, "max": data.max, "step": data.step} + else: + continue + if not concat: + result[name][parameter] = value + else: + result[f"{name}.{parameter}"] = value + return result diff --git a/pyhon/hon.py b/pyhon/hon.py new file mode 100644 index 0000000..830d510 --- /dev/null +++ b/pyhon/hon.py @@ -0,0 +1,69 @@ +import asyncio +from types import TracebackType +from typing import List, Optional, Dict, Any, Type + +from aiohttp import ClientSession +from typing_extensions import Self + +from pyhon import HonAPI, exceptions +from pyhon.appliance import HonAppliance + + +class Hon: + def __init__(self, email: str, password: str, session: ClientSession | None = None): + self._email: str = email + self._password: str = password + self._session: ClientSession | None = session + self._appliances: List[HonAppliance] = [] + self._api: Optional[HonAPI] = None + + async def __aenter__(self) -> Self: + return await self.create() + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + await self.close() + + @property + def api(self) -> HonAPI: + if self._api is None: + raise exceptions.NoAuthenticationException + return self._api + + async def create(self) -> Self: + self._api = await HonAPI( + self._email, self._password, session=self._session + ).create() + await self.setup() + return self + + @property + def appliances(self) -> List[HonAppliance]: + return self._appliances + + async def _create_appliance(self, appliance_data: Dict[str, Any], zone=0) -> None: + appliance = HonAppliance(self._api, appliance_data, zone=zone) + if appliance.mac_address is None: + return + await asyncio.gather( + *[ + appliance.load_attributes(), + appliance.load_commands(), + appliance.load_statistics(), + ] + ) + self._appliances.append(appliance) + + async def setup(self) -> None: + appliance: Dict + for appliance in (await self.api.load_appliances())["payload"]["appliances"]: + for zone in range(int(appliance.get("zone", "0"))): + await self._create_appliance(appliance.copy(), zone=zone + 1) + await self._create_appliance(appliance) + + async def close(self) -> None: + await self.api.close() diff --git a/pyhon/parameter.py b/pyhon/parameter.py deleted file mode 100644 index 647fb5d..0000000 --- a/pyhon/parameter.py +++ /dev/null @@ -1,141 +0,0 @@ -class HonParameter: - def __init__(self, key, attributes): - self._key = key - self._category = attributes.get("category") - self._typology = attributes.get("typology") - self._mandatory = attributes.get("mandatory") - self._value = "" - - @property - def key(self): - return self._key - - @property - def value(self): - return self._value if self._value is not None else "0" - - @property - def category(self): - return self._category - - @property - def typology(self): - return self._typology - - @property - def mandatory(self): - return self._mandatory - - -class HonParameterFixed(HonParameter): - def __init__(self, key, attributes): - super().__init__(key, attributes) - self._value = attributes.get("fixedValue", None) - - def __repr__(self): - return f"{self.__class__} (<{self.key}> fixed)" - - @property - def value(self): - return self._value if self._value is not None else "0" - - @value.setter - def value(self, value): - if not value == self._value: - raise ValueError("Can't change fixed value") - - -class HonParameterRange(HonParameter): - def __init__(self, key, attributes): - super().__init__(key, attributes) - self._min = int(attributes["minimumValue"]) - self._max = int(attributes["maximumValue"]) - self._step = int(attributes["incrementValue"]) - self._default = int(attributes.get("defaultValue", self._min)) - self._value = self._default - - def __repr__(self): - return f"{self.__class__} (<{self.key}> [{self._min} - {self._max}])" - - @property - def min(self): - return self._min - - @property - def max(self): - return self._max - - @property - def step(self): - return self._step - - @property - def value(self): - return self._value if self._value is not None else self._min - - @value.setter - def value(self, value): - value = int(value) - if self._min <= value <= self._max and not value % self._step: - self._value = value - else: - raise ValueError(f"Allowed: min {self._min} max {self._max} step {self._step}") - - -class HonParameterEnum(HonParameter): - def __init__(self, key, attributes): - super().__init__(key, attributes) - self._default = attributes.get("defaultValue") - self._value = self._default or "0" - self._values = attributes.get("enumValues") - - def __repr__(self): - return f"{self.__class__} (<{self.key}> {self.values})" - - @property - def values(self): - return [str(value) for value in self._values] - - @property - def value(self): - return self._value if self._value is not None else self.values[0] - - @value.setter - def value(self, value): - if value in self.values: - self._value = value - else: - raise ValueError(f"Allowed values {self._value}") - - -class HonParameterProgram(HonParameterEnum): - def __init__(self, key, command): - super().__init__(key, {}) - self._command = command - self._value = command._program - self._values = command._multi - self._typology = "enum" - self._filter = "" - - @property - def value(self): - return self._value - - @value.setter - def value(self, value): - if value in self.values: - self._command.set_program(value) - else: - raise ValueError(f"Allowed values {self._values}") - - @property - def filter(self): - return self._filter - - @filter.setter - def filter(self, filter): - self._filter = filter - - @property - def values(self): - return sorted([str(value) for value in self._values if not self._filter or self._filter in str(value)]) diff --git a/pyhon/parameter/__init__.py b/pyhon/parameter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyhon/parameter/base.py b/pyhon/parameter/base.py new file mode 100644 index 0000000..c2c818a --- /dev/null +++ b/pyhon/parameter/base.py @@ -0,0 +1,30 @@ +from typing import Dict, Any + + +class HonParameter: + def __init__(self, key: str, attributes: Dict[str, Any]) -> None: + self._key = key + self._category: str = attributes.get("category", "") + self._typology: str = attributes.get("typology", "") + self._mandatory: int = attributes.get("mandatory", 0) + self._value: str | float = "" + + @property + def key(self) -> str: + return self._key + + @property + def value(self) -> str | float: + return self._value if self._value is not None else "0" + + @property + def category(self) -> str: + return self._category + + @property + def typology(self) -> str: + return self._typology + + @property + def mandatory(self) -> int: + return self._mandatory diff --git a/pyhon/parameter/enum.py b/pyhon/parameter/enum.py new file mode 100644 index 0000000..c5afcda --- /dev/null +++ b/pyhon/parameter/enum.py @@ -0,0 +1,29 @@ +from typing import Dict, Any, List + +from pyhon.parameter.base import HonParameter + + +class HonParameterEnum(HonParameter): + def __init__(self, key: str, attributes: Dict[str, Any]) -> None: + super().__init__(key, attributes) + self._default = attributes.get("defaultValue") + self._value = self._default or "0" + self._values: List[str] = attributes.get("enumValues", []) + + def __repr__(self) -> str: + return f"{self.__class__} (<{self.key}> {self.values})" + + @property + def values(self) -> List[str]: + return [str(value) for value in self._values] + + @property + def value(self) -> str | float: + return self._value if self._value is not None else self.values[0] + + @value.setter + def value(self, value: str) -> None: + if value in self.values: + self._value = value + else: + raise ValueError(f"Allowed values {self._value}") diff --git a/pyhon/parameter/fixed.py b/pyhon/parameter/fixed.py new file mode 100644 index 0000000..52595ed --- /dev/null +++ b/pyhon/parameter/fixed.py @@ -0,0 +1,21 @@ +from typing import Dict, Any + +from pyhon.parameter.base import HonParameter + + +class HonParameterFixed(HonParameter): + def __init__(self, key: str, attributes: Dict[str, Any]) -> None: + super().__init__(key, attributes) + self._value = attributes.get("fixedValue", None) + + def __repr__(self) -> str: + return f"{self.__class__} (<{self.key}> fixed)" + + @property + def value(self) -> str | float: + return self._value if self._value is not None else "0" + + @value.setter + def value(self, value: str | float) -> None: + # Fixed values seems being not so fixed as thought + self._value = value diff --git a/pyhon/parameter/program.py b/pyhon/parameter/program.py new file mode 100644 index 0000000..a41603c --- /dev/null +++ b/pyhon/parameter/program.py @@ -0,0 +1,33 @@ +from typing import List, TYPE_CHECKING + +from pyhon.parameter.enum import HonParameterEnum + +if TYPE_CHECKING: + from pyhon.commands import HonCommand + + +class HonParameterProgram(HonParameterEnum): + _FILTER = ["iot_recipe", "iot_guided"] + + def __init__(self, key: str, command: "HonCommand") -> None: + super().__init__(key, {}) + self._command = command + self._value: str = command.program + self._values: List[str] = list(command.programs) + self._typology: str = "enum" + + @property + def value(self) -> str | float: + return self._value + + @value.setter + def value(self, value: str) -> None: + if value in self.values: + self._command.program = value + else: + raise ValueError(f"Allowed values {self._values}") + + @property + def values(self) -> List[str]: + values = [v for v in self._values if all(f not in v for f in self._FILTER)] + return sorted(values) diff --git a/pyhon/parameter/range.py b/pyhon/parameter/range.py new file mode 100644 index 0000000..d0d26d9 --- /dev/null +++ b/pyhon/parameter/range.py @@ -0,0 +1,49 @@ +from typing import Dict, Any + +from pyhon.parameter.base import HonParameter + + +def str_to_float(string: str | float) -> float: + try: + return int(string) + except ValueError: + return float(str(string).replace(",", ".")) + + +class HonParameterRange(HonParameter): + def __init__(self, key: str, attributes: Dict[str, Any]) -> None: + super().__init__(key, attributes) + self._min: float = str_to_float(attributes["minimumValue"]) + self._max: float = str_to_float(attributes["maximumValue"]) + self._step: float = str_to_float(attributes["incrementValue"]) + self._default: float = str_to_float(attributes.get("defaultValue", self._min)) + self._value: float = self._default + + def __repr__(self): + return f"{self.__class__} (<{self.key}> [{self._min} - {self._max}])" + + @property + def min(self) -> float: + return self._min + + @property + def max(self) -> float: + return self._max + + @property + def step(self) -> float: + return self._step + + @property + def value(self) -> float: + return self._value if self._value is not None else self._min + + @value.setter + def value(self, value: float) -> None: + value = str_to_float(value) + if self._min <= value <= self._max and not value % self._step: + self._value = value + else: + raise ValueError( + f"Allowed: min {self._min} max {self._max} step {self._step}" + ) diff --git a/requirements.txt b/requirements.txt index ee4ba4f..9ba048c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -aiohttp +aiohttp==3.8.4 +yarl==1.8.2 diff --git a/requirements_dev.txt b/requirements_dev.txt new file mode 100644 index 0000000..9bb012e --- /dev/null +++ b/requirements_dev.txt @@ -0,0 +1,4 @@ +black==23.3.0 +flake8==6.0.0 +mypy==1.2.0 +pylint==2.17.2 diff --git a/setup.py b/setup.py index df6672c..caa5d11 100644 --- a/setup.py +++ b/setup.py @@ -7,11 +7,11 @@ with open("README.md", "r") as f: setup( name="pyhOn", - version="0.5.0", + version="0.8.0b4", author="Andre Basche", description="Control hOn devices with python", long_description=long_description, - long_description_content_type='text/markdown', + long_description_content_type="text/markdown", project_urls={ "GitHub": "https://github.com/Andre0512/pyhOn", "PyPI": "https://pypi.org/project/pyhOn", @@ -33,8 +33,8 @@ setup( "Topic :: Software Development :: Libraries :: Python Modules", ], entry_points={ - 'console_scripts': [ - 'pyhOn = pyhon.__main__:start', + "console_scripts": [ + "pyhOn = pyhon.__main__:start", ] - } + }, )