Merge remote-tracking branch 'origin/main'

# Conflicts:
#	pyhon/appliances/ov.py
This commit is contained in:
Miguel Ángel 2023-04-16 09:58:00 +02:00
commit f2aa3dc8fd
33 changed files with 1441 additions and 770 deletions

42
.github/workflows/python-check.yml vendored Normal file
View file

@ -0,0 +1,42 @@
name: Python check
on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.11"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
python -m pip install -r requirements_dev.txt
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
- name: Type check with mypy
run: |
mypy pyhon/
# - name: Analysing the code with pylint
# run: |
# pylint --max-line-length 88 $(git ls-files '*.py')
- name: Check black style
run: |
black . --check

1
.gitignore vendored
View file

@ -4,3 +4,4 @@ __pycache__/
dist/
**/*.egg-info/
test*
build/

View file

@ -6,7 +6,7 @@
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/pyhOn)](https://www.python.org/)
[![PyPI - License](https://img.shields.io/pypi/l/pyhOn)](https://github.com/Andre0512/pyhOn/blob/main/LICENSE)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/pyhOn)](https://pypistats.org/packages/pyhon)
Control your Haier appliances with python!
Control your Haier, Candy and Hoover appliances with python!
The idea behind this library is, to make the use of all available commands as simple as possible.
## Installation
@ -47,28 +47,28 @@ settings:
### List devices
```python
import asyncio
from pyhon import HonConnection
from pyhon import Hon
async def devices_example():
async with HonConnection(USER, PASSWORD) as hon:
for device in hon.devices:
print(device.nick_name)
async with Hon(USER, PASSWORD) as hon:
for appliance in hon.appliances:
print(appliance.nick_name)
asyncio.run(devices_example())
```
### Execute a command
```python
async with HonConnection(USER, PASSWORD) as hon:
washing_machine = hon.devices[0]
async with Hon(USER, PASSWORD) as hon:
washing_machine = hon.appliances[0]
pause_command = washing_machine.commands["pauseProgram"]
await pause_command.send()
```
### Set command parameter
```python
async with HonConnection(USER, PASSWORD) as hon:
washing_machine = hon.devices[0]
async with Hon(USER, PASSWORD) as hon:
washing_machine = hon.appliances[0]
start_command = washing_machine.commands["startProgram"]
for name, setting in start_command.settings:
print("Setting", name)
@ -100,8 +100,6 @@ This generates a huge output. It is recommended to pipe this into a file
$ pyhOn translate fr > hon_fr.yaml
$ pyhOn translate en --json > hon_en.json
```
## Tested devices
- Haier Washing Machine HW90
## Usage example
This library is used for the custom [HomeAssistant Integration "Haier hOn"](https://github.com/Andre0512/hOn).

View file

@ -1 +1,4 @@
from .api import HonConnection
from .connection.api import HonAPI
from .hon import Hon
__all__ = ["Hon", "HonAPI"]

View file

@ -6,12 +6,11 @@ import logging
import sys
from getpass import getpass
from pathlib import Path
from pprint import pprint
if __name__ == "__main__":
sys.path.insert(0, str(Path(__file__).parent.parent))
from pyhon import HonConnection
from pyhon import Hon, HonAPI, helper
_LOGGER = logging.getLogger(__name__)
@ -25,74 +24,30 @@ def get_arguments():
keys = subparser.add_parser("keys", help="print as key format")
keys.add_argument("keys", help="print as key format", action="store_true")
keys.add_argument("--all", help="print also full keys", action="store_true")
translate = subparser.add_parser("translate", help="print available translation keys")
translate.add_argument("translate", help="language (de, en, fr...)", metavar="LANGUAGE")
translate = subparser.add_parser(
"translate", help="print available translation keys"
)
translate.add_argument(
"translate", help="language (de, en, fr...)", metavar="LANGUAGE"
)
translate.add_argument("--json", help="print as json", action="store_true")
return vars(parser.parse_args())
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False):
if type(data) is list:
if key:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}:")
intend += 1
for i, value in enumerate(data):
pretty_print(value, intend=intend, is_list=True)
elif type(data) is dict:
if key:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}:")
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
pretty_print(value, key=key, intend=intend, is_list=True)
elif is_list:
pretty_print(value, key=key, intend=intend + 1)
else:
pretty_print(value, key=key, intend=intend)
else:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}")
def key_print(data, key="", start=True):
if type(data) is list:
for i, value in enumerate(data):
key_print(value, key=f"{key}.{i}", start=False)
elif type(data) is dict:
for k, value in sorted(data.items()):
key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
print(f"{key}: {data}")
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
if not concat:
result[name] = {}
for parameter, data in command.parameters.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result[name][parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result
async def translate(language, json_output=False):
async with HonConnection() as hon:
async with HonAPI(anonymous=True) as hon:
keys = await hon.translation_keys(language)
if json_output:
print(json.dumps(keys, indent=4))
else:
clean_keys = json.dumps(keys).replace("\\n", "\\\\n").replace("\\\\r", "").replace("\\r", "")
clean_keys = (
json.dumps(keys)
.replace("\\n", "\\\\n")
.replace("\\\\r", "")
.replace("\\r", "")
)
keys = json.loads(clean_keys)
pretty_print(keys)
print(helper.pretty_print(keys))
async def main():
@ -104,19 +59,31 @@ async def main():
user = input("User for hOn account: ")
if not (password := args["password"]):
password = getpass("Password for hOn account: ")
async with HonConnection(user, password) as hon:
for device in hon.devices:
async with Hon(user, password) as hon:
for device in hon.appliances:
print("=" * 10, device.appliance_type, "-", device.nick_name, "=" * 10)
if args.get("keys"):
data = device.data.copy()
attr = "get" if args.get("all") else "pop"
key_print(data["attributes"].__getattribute__(attr)("parameters"))
key_print(data.__getattribute__(attr)("appliance"))
key_print(data)
pretty_print(create_command(device.commands, concat=True))
print(
helper.key_print(
data["attributes"].__getattribute__(attr)("parameters")
)
)
print(helper.key_print(data.__getattribute__(attr)("appliance")))
print(helper.key_print(data))
print(
helper.pretty_print(
helper.create_command(device.commands, concat=True)
)
)
else:
pretty_print({"data": device.data})
pretty_print({"settings": create_command(device.commands)})
print(helper.pretty_print({"data": device.data}))
print(
helper.pretty_print(
{"settings": helper.create_command(device.commands)}
)
)
def start():
@ -126,5 +93,5 @@ def start():
print("Aborted.")
if __name__ == '__main__':
if __name__ == "__main__":
start()

View file

@ -1,196 +0,0 @@
import asyncio
import json
import logging
import secrets
from datetime import datetime
from typing import List
import aiohttp as aiohttp
from pyhon import const
from pyhon.auth import HonAuth
from pyhon.device import HonDevice
_LOGGER = logging.getLogger()
class HonConnection:
def __init__(self, email="", password="", session=None) -> None:
super().__init__()
self._email = email
self._password = password
self._request_headers = {"Content-Type": "application/json"}
self._session = session
self._devices = []
self._mobile_id = secrets.token_hex(8)
async def __aenter__(self):
self._session = aiohttp.ClientSession()
if self._email and self._password:
await self.setup()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._session.close()
@property
def devices(self) -> List[HonDevice]:
return self._devices
@property
async def _headers(self):
if "cognito-token" not in self._request_headers or "id-token" not in self._request_headers:
auth = HonAuth()
if await auth.authorize(self._email, self._password, self._mobile_id):
self._request_headers["cognito-token"] = auth.cognito_token
self._request_headers["id-token"] = auth.id_token
else:
raise PermissionError("Can't Login")
return self._request_headers
async def setup(self):
async with aiohttp.ClientSession() as session:
async with session.get(f"{const.API_URL}/commands/v1/appliance",
headers=await self._headers) as resp:
try:
appliances = (await resp.json())["payload"]["appliances"]
for appliance in appliances:
device = HonDevice(self, appliance)
if device.mac_address is None:
continue
await asyncio.gather(*[
device.load_attributes(),
device.load_commands(),
device.load_statistics()])
self._devices.append(device)
except json.JSONDecodeError:
_LOGGER.error("No JSON Data after GET: %s", await resp.text())
return False
return True
async def load_commands(self, device: HonDevice):
params = {
"applianceType": device.appliance_type,
"code": device.appliance["code"],
"applianceModelId": device.appliance_model_id,
"firmwareId": device.appliance["eepromId"],
"macAddress": device.mac_address,
"fwVersion": device.appliance["fwVersion"],
"os": const.OS,
"appVersion": const.APP_VERSION,
"series": device.appliance["series"],
}
url = f"{const.API_URL}/commands/v1/retrieve"
async with self._session.get(url, params=params, headers=await self._headers) as response:
result = (await response.json()).get("payload", {})
if not result or result.pop("resultCode") != "0":
return {}
return result
async def command_history(self, device: HonDevice):
url = f"{const.API_URL}/commands/v1/appliance/{device.mac_address}/history"
async with self._session.get(url, headers=await self._headers) as response:
result = await response.json()
if not result or not result.get("payload"):
return {}
return result["payload"]["history"]
async def last_activity(self, device: HonDevice):
url = f"{const.API_URL}/commands/v1/retrieve-last-activity"
params = {"macAddress": device.mac_address}
async with self._session.get(url, params=params, headers=await self._headers) as response:
result = await response.json()
if result and (activity := result.get("attributes")):
return activity
return {}
async def appliance_configuration(self):
url = f"{const.API_URL}/config/v1/appliance-configuration"
headers = {"x-api-key": const.API_KEY, "content-type": "application/json"}
async with self._session.get(url, headers=headers) as response:
result = await response.json()
if result and (data := result.get("payload")):
return data
return {}
async def app_config(self, language="en", beta=True):
headers = {"x-api-key": const.API_KEY, "content-type": "application/json"}
url = f"{const.API_URL}/app-config"
payload = {
"languageCode": language,
"beta": beta,
"appVersion": const.APP_VERSION,
"os": const.OS
}
payload = json.dumps(payload, separators=(',', ':'))
async with self._session.post(url, headers=headers, data=payload) as response:
if (result := await response.json()) and (data := result.get("payload")):
return data
return {}
async def translation_keys(self, language="en"):
headers = {"x-api-key": const.API_KEY, "content-type": "application/json"}
config = await self.app_config(language=language)
if url := config.get("language", {}).get("jsonPath"):
async with self._session.get(url, headers=headers) as response:
if result := await response.json():
return result
return {}
async def load_attributes(self, device: HonDevice, loop=False):
params = {
"macAddress": device.mac_address,
"applianceType": device.appliance_type,
"category": "CYCLE"
}
url = f"{const.API_URL}/commands/v1/context"
async with self._session.get(url, params=params, headers=await self._headers) as response:
if response.status == 403 and not loop:
_LOGGER.error("%s - Error %s - %s", url, response.status, await response.text())
self._request_headers.pop("cognito-token", None)
self._request_headers.pop("id-token", None)
return await self.load_attributes(device, loop=True)
return (await response.json()).get("payload", {})
async def load_statistics(self, device: HonDevice):
params = {
"macAddress": device.mac_address,
"applianceType": device.appliance_type
}
url = f"{const.API_URL}/commands/v1/statistics"
async with self._session.get(url, params=params, headers=await self._headers) as response:
return (await response.json()).get("payload", {})
async def send_command(self, device, command, parameters, ancillary_parameters):
now = datetime.utcnow().isoformat()
data = {
"macAddress": device.mac_address,
"timestamp": f"{now[:-3]}Z",
"commandName": command,
"transactionId": f"{device.mac_address}_{now[:-3]}Z",
"applianceOptions": device.commands_options,
"device": {
"mobileId": self._mobile_id,
"mobileOs": const.OS,
"osVersion": const.OS_VERSION,
"appVersion": const.APP_VERSION,
"deviceModel": const.DEVICE_MODEL
},
"attributes": {
"channel": "mobileApp",
"origin": "standardProgram",
"energyLabel": "0"
},
"ancillaryParameters": ancillary_parameters,
"parameters": parameters,
"applianceType": device.appliance_type
}
url = f"{const.API_URL}/commands/v1/send"
async with self._session.post(url, headers=await self._headers, json=data) as resp:
try:
json_data = await resp.json()
except json.JSONDecodeError:
return False
if json_data["payload"]["resultCode"] == "0":
return True
return False

215
pyhon/appliance.py Normal file
View file

@ -0,0 +1,215 @@
import importlib
from contextlib import suppress
from typing import Optional, Dict, Any
from typing import TYPE_CHECKING
from pyhon import helper
from pyhon.commands import HonCommand
from pyhon.parameter.fixed import HonParameterFixed
if TYPE_CHECKING:
from pyhon import HonAPI
class HonAppliance:
def __init__(
self, api: Optional["HonAPI"], info: Dict[str, Any], zone: int = 0
) -> None:
if attributes := info.get("attributes"):
info["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
self._info: Dict = info
self._api: Optional[HonAPI] = api
self._appliance_model: Dict = {}
self._commands: Dict = {}
self._statistics: Dict = {}
self._attributes: Dict = {}
self._zone: int = zone
try:
self._extra = importlib.import_module(
f"pyhon.appliances.{self.appliance_type.lower()}"
).Appliance()
except ModuleNotFoundError:
self._extra = None
def __getitem__(self, item):
if self._zone:
item += f"Z{self._zone}"
if "." in item:
result = self.data
for key in item.split("."):
if all(k in "0123456789" for k in key) and isinstance(result, list):
result = result[int(key)]
else:
result = result[key]
return result
if item in self.data:
return self.data[item]
if item in self.attributes["parameters"]:
return self.attributes["parameters"].get(item)
return self.info[item]
def get(self, item, default=None):
try:
return self[item]
except (KeyError, IndexError):
return default
def _check_name_zone(self, name: str, frontend: bool = True) -> str:
middle = " Z" if frontend else "_z"
if (attribute := self._info.get(name, "")) and self._zone:
return f"{attribute}{middle}{self._zone}"
return attribute
@property
def appliance_model_id(self) -> str:
return self._info.get("applianceModelId", "")
@property
def appliance_type(self) -> str:
return self._info.get("applianceTypeName", "")
@property
def mac_address(self) -> str:
return self.info.get("macAddress", "")
@property
def unique_id(self) -> str:
return self._check_name_zone("macAddress", frontend=False)
@property
def model_name(self) -> str:
return self._check_name_zone("modelName")
@property
def nick_name(self) -> str:
return self._check_name_zone("nickName")
@property
def commands_options(self):
return self._appliance_model.get("options")
@property
def commands(self):
return self._commands
@property
def attributes(self):
return self._attributes
@property
def statistics(self):
return self._statistics
@property
def info(self):
return self._info
@property
def zone(self) -> int:
return self._zone
async def _recover_last_command_states(self, commands):
command_history = await self._api.command_history(self)
for name, command in commands.items():
last = next(
(
index
for (index, d) in enumerate(command_history)
if d.get("command", {}).get("commandName") == name
),
None,
)
if last is None:
continue
parameters = command_history[last].get("command", {}).get("parameters", {})
if command.programs and parameters.get("program"):
command.program = parameters.pop("program").split(".")[-1].lower()
command = self.commands[name]
for key, data in command.settings.items():
if (
not isinstance(data, HonParameterFixed)
and parameters.get(key) is not None
):
with suppress(ValueError):
data.value = parameters.get(key)
async def load_commands(self):
raw = await self._api.load_commands(self)
self._appliance_model = raw.pop("applianceModel")
for item in ["settings", "options", "dictionaryId"]:
raw.pop(item)
commands = {}
for command, attr in raw.items():
if "parameters" in attr:
commands[command] = HonCommand(command, attr, self._api, self)
elif "parameters" in attr[list(attr)[0]]:
multi = {}
for program, attr2 in attr.items():
program = program.split(".")[-1].lower()
cmd = HonCommand(
command,
attr2,
self._api,
self,
programs=multi,
program_name=program,
)
multi[program] = cmd
commands[command] = cmd
self._commands = commands
await self._recover_last_command_states(commands)
@property
def settings(self):
result = {}
for name, command in self._commands.items():
for key, setting in command.settings.items():
result[f"{name}.{key}"] = setting
if self._extra:
return self._extra.settings(result)
return result
@property
def parameters(self):
result = {}
for name, command in self._commands.items():
for key, parameter in (command.parameters | command.ancillary_parameters).items():
result.setdefault(name, {})[key] = parameter.value
return result
async def load_attributes(self):
self._attributes = await self._api.load_attributes(self)
for name, values in self._attributes.pop("shadow").get("parameters").items():
self._attributes.setdefault("parameters", {})[name] = values["parNewVal"]
async def load_statistics(self):
self._statistics = await self._api.load_statistics(self)
async def update(self):
await self.load_attributes()
@property
def data(self):
result = {
"attributes": self.attributes,
"appliance": self.info,
"statistics": self.statistics,
**self.parameters,
}
if self._extra:
return self._extra.data(result)
return result
@property
def diagnose(self):
data = self.data.copy()
for sensible in ["PK", "SK", "serialNumber", "code", "coords"]:
data["appliance"].pop(sensible, None)
result = helper.pretty_print({"data": self.data}, whitespace="\u200B \u200B ")
result += helper.pretty_print(
{"commands": helper.create_command(self.commands)},
whitespace="\u200B \u200B ",
)
return result.replace(self.mac_address, "12-34-56-78-90-ab")

9
pyhon/appliances/dw.py Normal file
View file

@ -0,0 +1,9 @@
class Appliance:
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["machMode"] = "0"
data["active"] = bool(data.get("attributes", {}).get("activity"))
return data
def settings(self, settings):
return settings

View file

@ -1,157 +0,0 @@
import json
import logging
import re
import secrets
import urllib
from urllib import parse
import aiohttp as aiohttp
from yarl import URL
from pyhon import const
_LOGGER = logging.getLogger()
class HonAuth:
def __init__(self) -> None:
self._access_token = ""
self._refresh_token = ""
self._cognito_token = ""
self._id_token = ""
@property
def cognito_token(self):
return self._cognito_token
@property
def id_token(self):
return self._id_token
@property
def access_token(self):
return self._access_token
@property
def refresh_token(self):
return self._refresh_token
async def _load_login(self, session):
nonce = secrets.token_hex(16)
nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
params = {
"response_type": "token+id_token",
"client_id": const.CLIENT_ID,
"redirect_uri": urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done"),
"display": "touch",
"scope": "api openid refresh_token web",
"nonce": nonce
}
params = "&".join([f"{k}={v}" for k, v in params.items()])
async with session.get(f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}") as resp:
if not (login_url := re.findall("url = '(.+?)'", await resp.text())):
return False
async with session.get(login_url[0], allow_redirects=False) as redirect1:
if not (url := redirect1.headers.get("Location")):
return False
async with session.get(url, allow_redirects=False) as redirect2:
if not (url := redirect2.headers.get("Location") + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"):
return False
async with session.get(URL(url, encoded=True)) as login_screen:
if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()):
fw_uid, loaded_str = context[0]
loaded = json.loads(loaded_str)
login_url = login_url[0].replace("/".join(const.AUTH_API.split("/")[:-1]), "")
return fw_uid, loaded, login_url
return False
async def _login(self, session, email, password, fw_uid, loaded, login_url):
data = {
"message": {
"actions": [
{
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": email,
"password": password,
"startUrl": parse.unquote(login_url.split("startURL=")[-1]).split("%3D")[0]
}
}
]
},
"aura.context": {
"mode": "PROD",
"fwuid": fw_uid,
"app": "siteforce:loginApp2",
"loaded": loaded,
"dn": [],
"globals": {},
"uad": False},
"aura.pageURI": login_url,
"aura.token": None}
params = {"r": 3, "other.LightningLoginCustom.login": 1}
async with session.post(
const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params
) as response:
if response.status == 200:
try:
return (await response.json())["events"][0]["attributes"]["values"]["url"]
except json.JSONDecodeError:
pass
_LOGGER.error("Unable to login: %s\n%s", response.status, await response.text())
return ""
async def _get_token(self, session, url):
async with session.get(url) as resp:
if resp.status != 200:
_LOGGER.error("Unable to get token: %s", resp.status)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text())
async with session.get(url[0]) as resp:
if resp.status != 200:
_LOGGER.error("Unable to get token: %s", resp.status)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with session.get(url) as resp:
if resp.status != 200:
_LOGGER.error("Unable to connect to the login service: %s", resp.status)
return False
text = await resp.text()
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
return True
async def authorize(self, email, password, mobile_id):
headers = {"user-agent": const.USER_AGENT}
async with aiohttp.ClientSession(headers=headers) as session:
if login_site := await self._load_login(session):
fw_uid, loaded, login_url = login_site
else:
return False
if not (url := await self._login(session, email, password, fw_uid, loaded, login_url)):
return False
if not await self._get_token(session, url):
return False
post_headers = {"Content-Type": "application/json", "id-token": self._id_token}
data = {"appVersion": const.APP_VERSION, "mobileId": mobile_id, "osVersion": const.OS_VERSION,
"os": const.OS, "deviceModel": const.DEVICE_MODEL}
async with session.post(f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data) as resp:
try:
json_data = await resp.json()
except json.JSONDecodeError:
_LOGGER.error("No JSON Data after POST: %s", await resp.text())
return False
self._cognito_token = json_data["cognitoUser"]["Token"]
return True

View file

@ -1,23 +1,47 @@
from pyhon.parameter import HonParameterFixed, HonParameterEnum, HonParameterRange, HonParameterProgram
from typing import Optional, Dict, Any, List, TYPE_CHECKING
from pyhon.parameter.base import HonParameter
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram
from pyhon.parameter.range import HonParameterRange
if TYPE_CHECKING:
from pyhon import HonAPI
from pyhon.appliance import HonAppliance
class HonCommand:
def __init__(self, name, attributes, connector, device, multi=None, program=""):
self._connector = connector
self._device = device
self._name = name
self._multi = multi or {}
self._program = program
self._description = attributes.get("description", "")
self._parameters = self._create_parameters(attributes.get("parameters", {}))
self._ancillary_parameters = self._create_parameters(attributes.get("ancillaryParameters", {}))
def __init__(
self,
name: str,
attributes: Dict[str, Any],
api: "HonAPI",
appliance: "HonAppliance",
programs: Optional[Dict[str, "HonCommand"]] = None,
program_name: str = "",
):
self._api: HonAPI = api
self._appliance: "HonAppliance" = appliance
self._name: str = name
self._programs: Optional[Dict[str, "HonCommand"]] = programs or {}
self._program_name: str = program_name
self._description: str = attributes.get("description", "")
self._parameters: Dict[str, HonParameter] = self._create_parameters(
attributes.get("parameters", {})
)
self._ancillary_parameters: Dict[str, HonParameter] = self._create_parameters(
attributes.get("ancillaryParameters", {})
)
def __repr__(self):
def __repr__(self) -> str:
return f"{self._name} command"
def _create_parameters(self, parameters):
result = {}
def _create_parameters(self, parameters: Dict) -> Dict[str, HonParameter]:
result: Dict[str, HonParameter] = {}
for parameter, attributes in parameters.items():
if parameter == "zoneMap" and self._appliance.zone:
attributes["default"] = self._appliance.zone
match attributes.get("typology"):
case "range":
result[parameter] = HonParameterRange(parameter, attributes)
@ -25,32 +49,47 @@ class HonCommand:
result[parameter] = HonParameterEnum(parameter, attributes)
case "fixed":
result[parameter] = HonParameterFixed(parameter, attributes)
if self._multi:
if self._programs:
result["program"] = HonParameterProgram("program", self)
return result
@property
def parameters(self):
def parameters(self) -> Dict[str, HonParameter]:
return self._parameters
@property
def ancillary_parameters(self):
return {key: parameter.value for key, parameter in self._ancillary_parameters.items()}
def ancillary_parameters(self) -> Dict[str, HonParameter]:
return self._ancillary_parameters
async def send(self):
parameters = {name: parameter.value for name, parameter in self._parameters.items()}
return await self._connector.send_command(self._device, self._name, parameters, self.ancillary_parameters)
async def send(self) -> bool:
parameters = {
name: parameter.value for name, parameter in self._parameters.items()
}
return await self._api.send_command(
self._appliance, self._name, parameters, self.ancillary_parameters
)
def get_programs(self):
return self._multi
@property
def programs(self) -> Dict[str, "HonCommand"]:
if self._programs is None:
return {}
return self._programs
def set_program(self, program):
self._device.commands[self._name] = self._multi[program]
@property
def program(self) -> str:
return self._program_name
def _get_settings_keys(self, command=None):
command = command or self
@program.setter
def program(self, program: str) -> None:
self._appliance.commands[self._name] = self.programs[program]
def _get_settings_keys(self, command: Optional["HonCommand"] = None) -> List[str]:
if command is None:
command = self
keys = []
for key, parameter in command._parameters.items():
for key, parameter in (
command._parameters | command._ancillary_parameters
).items():
if isinstance(parameter, HonParameterFixed):
continue
if key not in keys:
@ -58,14 +97,22 @@ class HonCommand:
return keys
@property
def setting_keys(self):
if not self._multi:
def setting_keys(self) -> List[str]:
if not self._programs:
return self._get_settings_keys()
result = [key for cmd in self._multi.values() for key in self._get_settings_keys(cmd)]
result = [
key
for cmd in self._programs.values()
for key in self._get_settings_keys(cmd)
]
return list(set(result + ["program"]))
@property
def settings(self):
def settings(self) -> Dict[str, HonParameter]:
"""Parameters with typology enum and range"""
return {s: self._parameters.get(s) for s in self.setting_keys if self._parameters.get(s) is not None}
return {
s: param
for s in self.setting_keys
if (param := self._parameters.get(s)) is not None
or (param := self._ancillary_parameters.get(s)) is not None
}

View file

195
pyhon/connection/api.py Normal file
View file

@ -0,0 +1,195 @@
import json
import logging
from datetime import datetime
from typing import Dict, Optional
from aiohttp import ClientSession
from typing_extensions import Self
from pyhon import const, exceptions
from pyhon.appliance import HonAppliance
from pyhon.connection.auth import HonAuth
from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler
from pyhon.connection.handler.hon import HonConnectionHandler
_LOGGER = logging.getLogger()
class HonAPI:
def __init__(
self,
email: str = "",
password: str = "",
anonymous: bool = False,
session: Optional[ClientSession] = None,
) -> None:
super().__init__()
self._email: str = email
self._password: str = password
self._anonymous: bool = anonymous
self._hon_handler: Optional[HonConnectionHandler] = None
self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None
self._session: Optional[ClientSession] = session
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()
@property
def auth(self) -> HonAuth:
if self._hon is None or self._hon.auth is None:
raise exceptions.NoAuthenticationException
return self._hon.auth
@property
def _hon(self):
if self._hon_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_handler
@property
def _hon_anonymous(self):
if self._hon_anonymous_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_anonymous_handler
async def create(self) -> Self:
self._hon_anonymous_handler = await HonAnonymousConnectionHandler(
self._session
).create()
if not self._anonymous:
self._hon_handler = await HonConnectionHandler(
self._email, self._password, self._session
).create()
return self
async def load_appliances(self) -> Dict:
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
return await resp.json()
async def load_commands(self, appliance: HonAppliance) -> Dict:
params: Dict = {
"applianceType": appliance.appliance_type,
"code": appliance.info["code"],
"applianceModelId": appliance.appliance_model_id,
"firmwareId": appliance.info["eepromId"],
"macAddress": appliance.mac_address,
"fwVersion": appliance.info["fwVersion"],
"os": const.OS,
"appVersion": const.APP_VERSION,
"series": appliance.info["series"],
}
url: str = f"{const.API_URL}/commands/v1/retrieve"
async with self._hon.get(url, params=params) as response:
result: Dict = (await response.json()).get("payload", {})
if not result or result.pop("resultCode") != "0":
return {}
return result
async def command_history(self, appliance: HonAppliance) -> Dict:
url: str = (
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
)
async with self._hon.get(url) as response:
result: Dict = await response.json()
if not result or not result.get("payload"):
return {}
return result["payload"]["history"]
async def last_activity(self, appliance: HonAppliance) -> Dict:
url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity"
params: Dict = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response:
result: Dict = await response.json()
if result and (activity := result.get("attributes")):
return activity
return {}
async def load_attributes(self, appliance: HonAppliance) -> Dict:
params: Dict = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
"category": "CYCLE",
}
url: str = f"{const.API_URL}/commands/v1/context"
async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {})
async def load_statistics(self, appliance: HonAppliance) -> Dict:
params: Dict = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
}
url: str = f"{const.API_URL}/commands/v1/statistics"
async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {})
async def send_command(
self,
appliance: HonAppliance,
command: str,
parameters: Dict,
ancillary_parameters: Dict,
) -> bool:
now: str = datetime.utcnow().isoformat()
data: Dict = {
"macAddress": appliance.mac_address,
"timestamp": f"{now[:-3]}Z",
"commandName": command,
"transactionId": f"{appliance.mac_address}_{now[:-3]}Z",
"applianceOptions": appliance.commands_options,
"device": self._hon.device.get(mobile=True),
"attributes": {
"channel": "mobileApp",
"origin": "standardProgram",
"energyLabel": "0",
},
"ancillaryParameters": ancillary_parameters,
"parameters": parameters,
"applianceType": appliance.appliance_type,
}
url: str = f"{const.API_URL}/commands/v1/send"
async with self._hon.post(url, json=data) as response:
json_data: Dict = await response.json()
if json_data.get("payload", {}).get("resultCode") == "0":
return True
_LOGGER.error(await response.text())
return False
async def appliance_configuration(self) -> Dict:
url: str = f"{const.API_URL}/config/v1/appliance-configuration"
async with self._hon_anonymous.get(url) as response:
result: Dict = await response.json()
if result and (data := result.get("payload")):
return data
return {}
async def app_config(self, language: str = "en", beta: bool = True) -> Dict:
url: str = f"{const.API_URL}/app-config"
payload_data: Dict = {
"languageCode": language,
"beta": beta,
"appVersion": const.APP_VERSION,
"os": const.OS,
}
payload: str = json.dumps(payload_data, separators=(",", ":"))
async with self._hon_anonymous.post(url, data=payload) as response:
if (result := await response.json()) and (data := result.get("payload")):
return data
return {}
async def translation_keys(self, language: str = "en") -> Dict:
config = await self.app_config(language=language)
if url := config.get("language", {}).get("jsonPath"):
async with self._hon_anonymous.get(url) as response:
if result := await response.json():
return result
return {}
async def close(self) -> None:
if self._hon_handler is not None:
await self._hon_handler.close()
if self._hon_anonymous_handler is not None:
await self._hon_anonymous_handler.close()

272
pyhon/connection/auth.py Normal file
View file

@ -0,0 +1,272 @@
import json
import logging
import re
import secrets
import urllib
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional
from urllib import parse
from urllib.parse import quote
from aiohttp import ClientResponse
from yarl import URL
from pyhon import const, exceptions
from pyhon.connection.handler.auth import HonAuthConnectionHandler
_LOGGER = logging.getLogger(__name__)
@dataclass
class HonLoginData:
url: str = ""
email: str = ""
password: str = ""
fw_uid: str = ""
loaded: Optional[Dict] = None
class HonAuth:
_TOKEN_EXPIRES_AFTER_HOURS = 8
_TOKEN_EXPIRE_WARNING_HOURS = 7
def __init__(self, session, email, password, device) -> None:
self._session = session
self._request = HonAuthConnectionHandler(session)
self._login_data = HonLoginData()
self._login_data.email = email
self._login_data.password = password
self._access_token = ""
self._refresh_token = ""
self._cognito_token = ""
self._id_token = ""
self._device = device
self._expires: datetime = datetime.utcnow()
@property
def cognito_token(self) -> str:
return self._cognito_token
@property
def id_token(self) -> str:
return self._id_token
@property
def access_token(self) -> str:
return self._access_token
@property
def refresh_token(self) -> str:
return self._refresh_token
def _check_token_expiration(self, hours: int) -> bool:
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
@property
def token_is_expired(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRES_AFTER_HOURS)
@property
def token_expires_soon(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS)
async def _error_logger(self, response: ClientResponse, fail: bool = True) -> None:
output = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._request.called_urls):
output += f" {i + 1: 2d} {status} - {url}\n"
output += f"ERROR - {response.status} - {response.request_info.url}\n"
output += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(output)
if fail:
raise exceptions.HonAuthenticationError("Can't login")
@staticmethod
def _generate_nonce() -> str:
nonce = secrets.token_hex(16)
return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
async def _load_login(self) -> bool:
login_url = await self._introduce()
login_url = await self._handle_redirects(login_url)
return await self._login_url(login_url)
async def _introduce(self) -> str:
redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done")
params = {
"response_type": "token+id_token",
"client_id": const.CLIENT_ID,
"redirect_uri": redirect_uri,
"display": "touch",
"scope": "api openid refresh_token web",
"nonce": self._generate_nonce(),
}
params_encode = "&".join([f"{k}={v}" for k, v in params.items()])
url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params_encode}"
async with self._request.get(url) as response:
text = await response.text()
self._expires = datetime.utcnow()
if not (login_url := re.findall("url = '(.+?)'", text)):
if "oauth/done#access_token=" in text:
self._parse_token_data(text)
raise exceptions.HonNoAuthenticationNeeded()
await self._error_logger(response)
return login_url[0]
async def _manual_redirect(self, url: str) -> str:
async with self._request.get(url, allow_redirects=False) as response:
if not (new_location := response.headers.get("Location", "")):
await self._error_logger(response)
return new_location
async def _handle_redirects(self, login_url) -> str:
redirect1 = await self._manual_redirect(login_url)
redirect2 = await self._manual_redirect(redirect1)
return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
async def _login_url(self, login_url: str) -> bool:
headers = {"user-agent": const.USER_AGENT}
url = URL(login_url, encoded=True)
async with self._request.get(url, headers=headers) as response:
text = await response.text()
if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text):
fw_uid, loaded_str = context[0]
self._login_data.fw_uid = fw_uid
self._login_data.loaded = json.loads(loaded_str)
self._login_data.url = login_url.replace(
"/".join(const.AUTH_API.split("/")[:-1]), ""
)
return True
await self._error_logger(response)
return False
async def _login(self) -> str:
start_url = self._login_data.url.rsplit("startURL=", maxsplit=1)[-1]
start_url = parse.unquote(start_url).split("%3D")[0]
action = {
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": quote(self._login_data.email),
"password": quote(self._login_data.password),
"startUrl": start_url,
},
}
data = {
"message": {"actions": [action]},
"aura.context": {
"mode": "PROD",
"fwuid": self._login_data.fw_uid,
"app": "siteforce:loginApp2",
"loaded": self._login_data.loaded,
"dn": [],
"globals": {},
"uad": False,
},
"aura.pageURI": self._login_data.url,
"aura.token": None,
}
params = {"r": 3, "other.LightningLoginCustom.login": 1}
async with self._request.post(
const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params,
) as response:
if response.status == 200:
with suppress(json.JSONDecodeError, KeyError):
result = await response.json()
return result["events"][0]["attributes"]["values"]["url"]
await self._error_logger(response)
return ""
def _parse_token_data(self, text: str) -> None:
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
async def _get_token(self, url: str) -> bool:
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
url_search = re.findall(
"href\\s*=\\s*[\"'](.+?)[\"']", await response.text()
)
if not url_search:
await self._error_logger(response)
return False
if "ProgressiveLogin" in url_search[0]:
async with self._request.get(url_search[0]) as response:
if response.status != 200:
await self._error_logger(response)
return False
url_search = re.findall(
"href\\s*=\\s*[\"'](.*?)[\"']", await response.text()
)
url = "/".join(const.AUTH_API.split("/")[:-1]) + url_search[0]
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
self._parse_token_data(await response.text())
return True
async def _api_auth(self) -> bool:
post_headers = {"id-token": self._id_token}
data = self._device.get()
async with self._request.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as response:
try:
json_data = await response.json()
except json.JSONDecodeError:
await self._error_logger(response)
return False
self._cognito_token = json_data["cognitoUser"]["Token"]
return True
async def authenticate(self) -> None:
self.clear()
try:
if not await self._load_login():
raise exceptions.HonAuthenticationError("Can't open login page")
if not (url := await self._login()):
raise exceptions.HonAuthenticationError("Can't login")
if not await self._get_token(url):
raise exceptions.HonAuthenticationError("Can't get token")
if not await self._api_auth():
raise exceptions.HonAuthenticationError("Can't get api token")
except exceptions.HonNoAuthenticationNeeded:
return
async def refresh(self) -> bool:
params = {
"client_id": const.CLIENT_ID,
"refresh_token": self._refresh_token,
"grant_type": "refresh_token",
}
async with self._request.post(
f"{const.AUTH_API}/services/oauth2/token", params=params
) as response:
if response.status >= 400:
await self._error_logger(response, fail=False)
return False
data = await response.json()
self._expires = datetime.utcnow()
self._id_token = data["id_token"]
self._access_token = data["access_token"]
return await self._api_auth()
def clear(self) -> None:
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._request.called_urls = []
self._cognito_token = ""
self._id_token = ""
self._access_token = ""
self._refresh_token = ""

View file

@ -0,0 +1,43 @@
import secrets
from typing import Dict
from pyhon import const
class HonDevice:
def __init__(self) -> None:
self._app_version: str = const.APP_VERSION
self._os_version: int = const.OS_VERSION
self._os: str = const.OS
self._device_model: str = const.DEVICE_MODEL
self._mobile_id: str = secrets.token_hex(8)
@property
def app_version(self) -> str:
return self._app_version
@property
def os_version(self) -> int:
return self._os_version
@property
def os(self) -> str:
return self._os
@property
def device_model(self) -> str:
return self._device_model
@property
def mobile_id(self) -> str:
return self._mobile_id
def get(self, mobile: bool = False) -> Dict:
result = {
"appVersion": self.app_version,
"mobileId": self.mobile_id,
"os": self.os,
"osVersion": self.os_version,
"deviceModel": self.device_model,
}
return (result | {"mobileOs": result.pop("os")}) if mobile else result

View file

View file

@ -0,0 +1,21 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Callable, Dict
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
_LOGGER = logging.getLogger(__name__)
class HonAnonymousConnectionHandler(ConnectionHandler):
_HEADERS: Dict = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
@asynccontextmanager
async def _intercept(self, method: Callable, *args, **kwargs) -> AsyncIterator:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
if response.status == 403:
_LOGGER.error("Can't authenticate anymore")
yield response

View file

@ -0,0 +1,36 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, List, Tuple
import aiohttp
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
_LOGGER = logging.getLogger(__name__)
class HonAuthConnectionHandler(ConnectionHandler):
_HEADERS = {"user-agent": const.USER_AGENT}
def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
super().__init__(session)
self._called_urls: List[Tuple[int, str]] = []
@property
def called_urls(self) -> List[Tuple[int, str]]:
return self._called_urls
@called_urls.setter
def called_urls(self, called_urls: List[Tuple[int, str]]) -> None:
self._called_urls = called_urls
@asynccontextmanager
async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
self._called_urls.append((response.status, response.request_info.url))
yield response

View file

@ -0,0 +1,57 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, Dict
import aiohttp
from typing_extensions import Self
from pyhon import const, exceptions
_LOGGER = logging.getLogger(__name__)
class ConnectionHandler:
_HEADERS: Dict = {
"user-agent": const.USER_AGENT,
"Content-Type": "application/json",
}
def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
self._create_session: bool = session is None
self._session: Optional[aiohttp.ClientSession] = session
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()
async def create(self) -> Self:
if self._create_session:
self._session = aiohttp.ClientSession()
return self
@asynccontextmanager
def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs):
raise NotImplementedError
@asynccontextmanager
async def get(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
async with self._intercept(self._session.get, *args, **kwargs) as response:
yield response
@asynccontextmanager
async def post(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
async with self._intercept(self._session.post, *args, **kwargs) as response:
yield response
async def close(self) -> None:
if self._create_session and self._session is not None:
await self._session.close()

View file

@ -0,0 +1,102 @@
import json
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, Dict
import aiohttp
from typing_extensions import Self
from pyhon.connection.auth import HonAuth
from pyhon.connection.device import HonDevice
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException
_LOGGER = logging.getLogger(__name__)
class HonConnectionHandler(ConnectionHandler):
def __init__(
self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None
) -> None:
super().__init__(session=session)
self._device: HonDevice = HonDevice()
self._email: str = email
self._password: str = password
if not self._email:
raise HonAuthenticationError("An email address must be specified")
if not self._password:
raise HonAuthenticationError("A password address must be specified")
self._auth: Optional[HonAuth] = None
@property
def auth(self) -> HonAuth:
if self._auth is None:
raise NoAuthenticationException()
return self._auth
@property
def device(self) -> HonDevice:
return self._device
async def create(self) -> Self:
await super().create()
self._auth = HonAuth(self._session, self._email, self._password, self._device)
return self
async def _check_headers(self, headers: Dict) -> Dict:
if not (self.auth.cognito_token and self.auth.id_token):
await self.auth.authenticate()
headers["cognito-token"] = self.auth.cognito_token
headers["id-token"] = self.auth.id_token
return self._HEADERS | headers
@asynccontextmanager
async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response:
if (
self.auth.token_expires_soon or response.status in [401, 403]
) and loop == 0:
_LOGGER.info("Try refreshing token...")
await self.auth.refresh()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif (
self.auth.token_is_expired or response.status in [401, 403]
) and loop == 1:
_LOGGER.warning(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
await self.create()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif loop >= 2:
_LOGGER.error(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Login failure")
else:
try:
await response.json()
yield response
except json.JSONDecodeError:
_LOGGER.warning(
"%s - JsonDecodeError %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Decode Error")

View file

@ -1,156 +0,0 @@
import importlib
from contextlib import suppress
from pyhon.commands import HonCommand
from pyhon.parameter import HonParameterFixed
class HonDevice:
def __init__(self, connector, appliance):
if attributes := appliance.get("attributes"):
appliance["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
self._appliance = appliance
self._connector = connector
self._appliance_model = {}
self._commands = {}
self._statistics = {}
self._attributes = {}
try:
self._extra = importlib.import_module(f'pyhon.appliances.{self.appliance_type.lower()}').Appliance()
except ModuleNotFoundError:
self._extra = None
def __getitem__(self, item):
if "." in item:
result = self.data
for key in item.split("."):
if all([k in "0123456789" for k in key]) and type(result) is list:
result = result[int(key)]
else:
result = result[key]
return result
else:
if item in self.data:
return self.data[item]
if item in self.attributes["parameters"]:
return self.attributes["parameters"].get(item)
return self.appliance[item]
def get(self, item, default=None):
try:
return self[item]
except (KeyError, IndexError):
return default
@property
def appliance_model_id(self):
return self._appliance.get("applianceModelId")
@property
def appliance_type(self):
return self._appliance.get("applianceTypeName")
@property
def mac_address(self):
return self._appliance.get("macAddress")
@property
def model_name(self):
return self._appliance.get("modelName")
@property
def nick_name(self):
return self._appliance.get("nickName")
@property
def commands_options(self):
return self._appliance_model.get("options")
@property
def commands(self):
return self._commands
@property
def attributes(self):
return self._attributes
@property
def statistics(self):
return self._statistics
@property
def appliance(self):
return self._appliance
async def _recover_last_command_states(self, commands):
command_history = await self._connector.command_history(self)
for name, command in commands.items():
last = next((index for (index, d) in enumerate(command_history) if d.get("command", {}).get("commandName") == name), None)
if last is None:
continue
parameters = command_history[last].get("command", {}).get("parameters", {})
if command._multi and parameters.get("program"):
command.set_program(parameters.pop("program").split(".")[-1].lower())
command = self.commands[name]
for key, data in command.settings.items():
if not isinstance(data, HonParameterFixed) and parameters.get(key) is not None:
with suppress(ValueError):
data.value = parameters.get(key)
async def load_commands(self):
raw = await self._connector.load_commands(self)
self._appliance_model = raw.pop("applianceModel")
for item in ["settings", "options", "dictionaryId"]:
raw.pop(item)
commands = {}
for command, attr in raw.items():
if "parameters" in attr:
commands[command] = HonCommand(command, attr, self._connector, self)
elif "parameters" in attr[list(attr)[0]]:
multi = {}
for program, attr2 in attr.items():
program = program.split(".")[-1].lower()
cmd = HonCommand(command, attr2, self._connector, self, multi=multi, program=program)
multi[program] = cmd
commands[command] = cmd
self._commands = commands
await self._recover_last_command_states(commands)
@property
def settings(self):
result = {}
for name, command in self._commands.items():
for key, setting in command.settings.items():
result[f"{name}.{key}"] = setting
if self._extra:
return self._extra.settings(result)
return result
@property
def parameters(self):
result = {}
for name, command in self._commands.items():
for key, parameter in command.parameters.items():
result.setdefault(name, {})[key] = parameter.value
return result
async def load_attributes(self):
self._attributes = await self._connector.load_attributes(self)
for name, values in self._attributes.pop("shadow").get("parameters").items():
self._attributes.setdefault("parameters", {})[name] = values["parNewVal"]
async def load_statistics(self):
self._statistics = await self._connector.load_statistics(self)
async def update(self):
await self.load_attributes()
@property
def data(self):
result = {"attributes": self.attributes, "appliance": self.appliance, "statistics": self.statistics,
**self.parameters}
if self._extra:
return self._extra.data(result)
return result

14
pyhon/exceptions.py Normal file
View file

@ -0,0 +1,14 @@
class HonAuthenticationError(Exception):
pass
class HonNoAuthenticationNeeded(Exception):
pass
class NoSessionException(Exception):
pass
class NoAuthenticationException(Exception):
pass

63
pyhon/helper.py Normal file
View file

@ -0,0 +1,63 @@
def key_print(data, key="", start=True):
result = ""
if isinstance(data, list):
for i, value in enumerate(data):
result += key_print(value, key=f"{key}.{i}", start=False)
elif isinstance(data, dict):
for k, value in sorted(data.items()):
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
result += f"{key}: {data}\n"
return result
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False, whitespace=" "):
result = ""
if isinstance(data, list):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, value in enumerate(data):
result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace
)
elif isinstance(data, dict):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
result += pretty_print(
value, key=key, intend=intend, is_list=True, whitespace=whitespace
)
elif is_list:
result += pretty_print(
value, key=key, intend=intend + 1, whitespace=whitespace
)
else:
result += pretty_print(
value, key=key, intend=intend, whitespace=whitespace
)
else:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
if not concat:
result[name] = {}
for parameter, data in command.settings.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result[name][parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result

69
pyhon/hon.py Normal file
View file

@ -0,0 +1,69 @@
import asyncio
from types import TracebackType
from typing import List, Optional, Dict, Any, Type
from aiohttp import ClientSession
from typing_extensions import Self
from pyhon import HonAPI, exceptions
from pyhon.appliance import HonAppliance
class Hon:
def __init__(self, email: str, password: str, session: ClientSession | None = None):
self._email: str = email
self._password: str = password
self._session: ClientSession | None = session
self._appliances: List[HonAppliance] = []
self._api: Optional[HonAPI] = None
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
await self.close()
@property
def api(self) -> HonAPI:
if self._api is None:
raise exceptions.NoAuthenticationException
return self._api
async def create(self) -> Self:
self._api = await HonAPI(
self._email, self._password, session=self._session
).create()
await self.setup()
return self
@property
def appliances(self) -> List[HonAppliance]:
return self._appliances
async def _create_appliance(self, appliance_data: Dict[str, Any], zone=0) -> None:
appliance = HonAppliance(self._api, appliance_data, zone=zone)
if appliance.mac_address is None:
return
await asyncio.gather(
*[
appliance.load_attributes(),
appliance.load_commands(),
appliance.load_statistics(),
]
)
self._appliances.append(appliance)
async def setup(self) -> None:
appliance: Dict
for appliance in (await self.api.load_appliances())["payload"]["appliances"]:
for zone in range(int(appliance.get("zone", "0"))):
await self._create_appliance(appliance.copy(), zone=zone + 1)
await self._create_appliance(appliance)
async def close(self) -> None:
await self.api.close()

View file

@ -1,141 +0,0 @@
class HonParameter:
def __init__(self, key, attributes):
self._key = key
self._category = attributes.get("category")
self._typology = attributes.get("typology")
self._mandatory = attributes.get("mandatory")
self._value = ""
@property
def key(self):
return self._key
@property
def value(self):
return self._value if self._value is not None else "0"
@property
def category(self):
return self._category
@property
def typology(self):
return self._typology
@property
def mandatory(self):
return self._mandatory
class HonParameterFixed(HonParameter):
def __init__(self, key, attributes):
super().__init__(key, attributes)
self._value = attributes.get("fixedValue", None)
def __repr__(self):
return f"{self.__class__} (<{self.key}> fixed)"
@property
def value(self):
return self._value if self._value is not None else "0"
@value.setter
def value(self, value):
if not value == self._value:
raise ValueError("Can't change fixed value")
class HonParameterRange(HonParameter):
def __init__(self, key, attributes):
super().__init__(key, attributes)
self._min = int(attributes["minimumValue"])
self._max = int(attributes["maximumValue"])
self._step = int(attributes["incrementValue"])
self._default = int(attributes.get("defaultValue", self._min))
self._value = self._default
def __repr__(self):
return f"{self.__class__} (<{self.key}> [{self._min} - {self._max}])"
@property
def min(self):
return self._min
@property
def max(self):
return self._max
@property
def step(self):
return self._step
@property
def value(self):
return self._value if self._value is not None else self._min
@value.setter
def value(self, value):
value = int(value)
if self._min <= value <= self._max and not value % self._step:
self._value = value
else:
raise ValueError(f"Allowed: min {self._min} max {self._max} step {self._step}")
class HonParameterEnum(HonParameter):
def __init__(self, key, attributes):
super().__init__(key, attributes)
self._default = attributes.get("defaultValue")
self._value = self._default or "0"
self._values = attributes.get("enumValues")
def __repr__(self):
return f"{self.__class__} (<{self.key}> {self.values})"
@property
def values(self):
return [str(value) for value in self._values]
@property
def value(self):
return self._value if self._value is not None else self.values[0]
@value.setter
def value(self, value):
if value in self.values:
self._value = value
else:
raise ValueError(f"Allowed values {self._value}")
class HonParameterProgram(HonParameterEnum):
def __init__(self, key, command):
super().__init__(key, {})
self._command = command
self._value = command._program
self._values = command._multi
self._typology = "enum"
self._filter = ""
@property
def value(self):
return self._value
@value.setter
def value(self, value):
if value in self.values:
self._command.set_program(value)
else:
raise ValueError(f"Allowed values {self._values}")
@property
def filter(self):
return self._filter
@filter.setter
def filter(self, filter):
self._filter = filter
@property
def values(self):
return sorted([str(value) for value in self._values if not self._filter or self._filter in str(value)])

View file

30
pyhon/parameter/base.py Normal file
View file

@ -0,0 +1,30 @@
from typing import Dict, Any
class HonParameter:
def __init__(self, key: str, attributes: Dict[str, Any]) -> None:
self._key = key
self._category: str = attributes.get("category", "")
self._typology: str = attributes.get("typology", "")
self._mandatory: int = attributes.get("mandatory", 0)
self._value: str | float = ""
@property
def key(self) -> str:
return self._key
@property
def value(self) -> str | float:
return self._value if self._value is not None else "0"
@property
def category(self) -> str:
return self._category
@property
def typology(self) -> str:
return self._typology
@property
def mandatory(self) -> int:
return self._mandatory

29
pyhon/parameter/enum.py Normal file
View file

@ -0,0 +1,29 @@
from typing import Dict, Any, List
from pyhon.parameter.base import HonParameter
class HonParameterEnum(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any]) -> None:
super().__init__(key, attributes)
self._default = attributes.get("defaultValue")
self._value = self._default or "0"
self._values: List[str] = attributes.get("enumValues", [])
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> {self.values})"
@property
def values(self) -> List[str]:
return [str(value) for value in self._values]
@property
def value(self) -> str | float:
return self._value if self._value is not None else self.values[0]
@value.setter
def value(self, value: str) -> None:
if value in self.values:
self._value = value
else:
raise ValueError(f"Allowed values {self._value}")

21
pyhon/parameter/fixed.py Normal file
View file

@ -0,0 +1,21 @@
from typing import Dict, Any
from pyhon.parameter.base import HonParameter
class HonParameterFixed(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any]) -> None:
super().__init__(key, attributes)
self._value = attributes.get("fixedValue", None)
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> fixed)"
@property
def value(self) -> str | float:
return self._value if self._value is not None else "0"
@value.setter
def value(self, value: str | float) -> None:
# Fixed values seems being not so fixed as thought
self._value = value

View file

@ -0,0 +1,33 @@
from typing import List, TYPE_CHECKING
from pyhon.parameter.enum import HonParameterEnum
if TYPE_CHECKING:
from pyhon.commands import HonCommand
class HonParameterProgram(HonParameterEnum):
_FILTER = ["iot_recipe", "iot_guided"]
def __init__(self, key: str, command: "HonCommand") -> None:
super().__init__(key, {})
self._command = command
self._value: str = command.program
self._values: List[str] = list(command.programs)
self._typology: str = "enum"
@property
def value(self) -> str | float:
return self._value
@value.setter
def value(self, value: str) -> None:
if value in self.values:
self._command.program = value
else:
raise ValueError(f"Allowed values {self._values}")
@property
def values(self) -> List[str]:
values = [v for v in self._values if all(f not in v for f in self._FILTER)]
return sorted(values)

49
pyhon/parameter/range.py Normal file
View file

@ -0,0 +1,49 @@
from typing import Dict, Any
from pyhon.parameter.base import HonParameter
def str_to_float(string: str | float) -> float:
try:
return int(string)
except ValueError:
return float(str(string).replace(",", "."))
class HonParameterRange(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any]) -> None:
super().__init__(key, attributes)
self._min: float = str_to_float(attributes["minimumValue"])
self._max: float = str_to_float(attributes["maximumValue"])
self._step: float = str_to_float(attributes["incrementValue"])
self._default: float = str_to_float(attributes.get("defaultValue", self._min))
self._value: float = self._default
def __repr__(self):
return f"{self.__class__} (<{self.key}> [{self._min} - {self._max}])"
@property
def min(self) -> float:
return self._min
@property
def max(self) -> float:
return self._max
@property
def step(self) -> float:
return self._step
@property
def value(self) -> float:
return self._value if self._value is not None else self._min
@value.setter
def value(self, value: float) -> None:
value = str_to_float(value)
if self._min <= value <= self._max and not value % self._step:
self._value = value
else:
raise ValueError(
f"Allowed: min {self._min} max {self._max} step {self._step}"
)

View file

@ -1 +1,2 @@
aiohttp
aiohttp==3.8.4
yarl==1.8.2

4
requirements_dev.txt Normal file
View file

@ -0,0 +1,4 @@
black==23.3.0
flake8==6.0.0
mypy==1.2.0
pylint==2.17.2

View file

@ -7,11 +7,11 @@ with open("README.md", "r") as f:
setup(
name="pyhOn",
version="0.5.0",
version="0.8.0b4",
author="Andre Basche",
description="Control hOn devices with python",
long_description=long_description,
long_description_content_type='text/markdown',
long_description_content_type="text/markdown",
project_urls={
"GitHub": "https://github.com/Andre0512/pyhOn",
"PyPI": "https://pypi.org/project/pyhOn",
@ -33,8 +33,8 @@ setup(
"Topic :: Software Development :: Libraries :: Python Modules",
],
entry_points={
'console_scripts': [
'pyhOn = pyhon.__main__:start',
"console_scripts": [
"pyhOn = pyhon.__main__:start",
]
}
},
)