Compare commits
38 commits
Author | SHA1 | Date | |
---|---|---|---|
|
327d4a1814 | ||
|
4dc60c29ee | ||
|
7bc9e718a0 | ||
|
d4e5793186 | ||
|
e9f2bb9f4f | ||
|
ea81e2891f | ||
|
c71f8f17f5 | ||
|
27d974abba | ||
|
ab18e45f97 | ||
|
e44b9b6773 | ||
|
5c650d722b | ||
|
6ae40481e3 | ||
|
ff8ae160bb | ||
|
658e80a8f4 | ||
|
ffba85bf0d | ||
|
10c8d961c4 | ||
|
61dd470588 | ||
|
1ed81c2a77 | ||
|
e4dc3cb1d0 | ||
|
2523069ce9 | ||
|
eeb458cb1b | ||
|
2764700bc7 | ||
|
e6c796e822 | ||
|
454f2d8916 | ||
|
59ca8b6caf | ||
|
44c55c681d | ||
|
cfee10df5f | ||
|
e0774677eb | ||
|
fc60d15e60 | ||
|
8ef8c0405d | ||
|
5575b240e1 | ||
|
22367242a2 | ||
|
4f7d4860db | ||
|
5a778373b6 | ||
|
e1c8bc5835 | ||
|
442e7a07dd | ||
|
b0e3b15ff0 | ||
|
2788a3ac91 |
36 changed files with 436 additions and 218 deletions
3
.flake8
Normal file
3
.flake8
Normal file
|
@ -0,0 +1,3 @@
|
|||
[flake8]
|
||||
max-line-length = 88
|
||||
max-complexity = 7
|
12
.github/workflows/python-check.yml
vendored
12
.github/workflows/python-check.yml
vendored
|
@ -13,7 +13,7 @@ jobs:
|
|||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
python-version: ["3.10", "3.11"]
|
||||
python-version: ["3.10", "3.11", "3.12"]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
@ -28,15 +28,13 @@ jobs:
|
|||
python -m pip install -r requirements_dev.txt
|
||||
- name: Lint with flake8
|
||||
run: |
|
||||
# stop the build if there are Python syntax errors or undefined names
|
||||
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
|
||||
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
|
||||
flake8 . --count --statistics
|
||||
- name: Type check with mypy
|
||||
run: |
|
||||
mypy pyhon/
|
||||
# - name: Analysing the code with pylint
|
||||
# run: |
|
||||
# pylint --max-line-length 88 $(git ls-files '*.py')
|
||||
- name: Analysing the code with pylint
|
||||
run: |
|
||||
pylint $(git ls-files '*.py')
|
||||
- name: Check black style
|
||||
run: |
|
||||
black . --check
|
||||
|
|
9
.pylintrc
Normal file
9
.pylintrc
Normal file
|
@ -0,0 +1,9 @@
|
|||
[MESSAGES CONTROL]
|
||||
|
||||
disable=missing-docstring
|
||||
|
||||
[FORMAT]
|
||||
|
||||
max-args=6
|
||||
max-attributes=8
|
||||
max-line-length=88
|
1
MANIFEST.in
Normal file
1
MANIFEST.in
Normal file
|
@ -0,0 +1 @@
|
|||
include pyhon/py.typed
|
11
README.md
11
README.md
|
@ -1,3 +1,14 @@
|
|||
# Announcement: I have to take the project down in the next few days
|
||||
> Dear User,
|
||||
>
|
||||
> We are writing to inform you that we have discovered two Home Assistant integration plug-ins developed by you ( https://github.com/Andre0512/hon and https://github.com/Andre0512/pyhOn ) that are in violation of our terms of service. Specifically, the plug-ins are using our services in an unauthorized manner which is causing significant economic harm to our Company.
|
||||
> We take the protection of our intellectual property very seriously and demand that you immediately cease and desist all illegal activities related to the development and distribution of these plug-ins. We also request that you remove the plug-ins from all stores and code hosting platforms where they are currently available.
|
||||
> Please be advised that we will take all necessary legal action to protect our interests if you fail to comply with this notice. We reserve the right to pursue all available remedies, including but not limited to monetary damages, injunctive relief, and attorney's fees.
|
||||
> We strongly urge you to take immediate action to rectify this situation and avoid any further legal action. If you have any questions or concerns, please do not hesitate to contact us.
|
||||
>
|
||||
> Haier Europe Security and Governance Department
|
||||
|
||||
|
||||
**This python package is unofficial and is not related in any way to Haier. It was developed by reversed engineered requests and can stop working at anytime!**
|
||||
|
||||
# pyhOn
|
||||
|
|
31
mypy.ini
31
mypy.ini
|
@ -1,9 +1,24 @@
|
|||
[mypy]
|
||||
check_untyped_defs = True
|
||||
disallow_any_generics = True
|
||||
disallow_untyped_defs = True
|
||||
disallow_any_unimported = True
|
||||
no_implicit_optional = True
|
||||
warn_return_any = True
|
||||
show_error_codes = True
|
||||
warn_unused_ignores = True
|
||||
check_untyped_defs = true
|
||||
disallow_any_generics = true
|
||||
disallow_any_unimported = true
|
||||
disallow_incomplete_defs = true
|
||||
disallow_subclassing_any = true
|
||||
disallow_untyped_calls = true
|
||||
disallow_untyped_decorators = true
|
||||
disallow_untyped_defs = true
|
||||
disable_error_code = annotation-unchecked
|
||||
enable_error_code = ignore-without-code, redundant-self, truthy-iterable
|
||||
follow_imports = silent
|
||||
local_partial_types = true
|
||||
no_implicit_optional = true
|
||||
no_implicit_reexport = true
|
||||
show_error_codes = true
|
||||
strict_concatenate = false
|
||||
strict_equality = true
|
||||
warn_incomplete_stub = true
|
||||
warn_redundant_casts = true
|
||||
warn_return_any = true
|
||||
warn_unreachable = true
|
||||
warn_unused_configs = true
|
||||
warn_unused_ignores = true
|
||||
|
|
|
@ -11,6 +11,7 @@ from typing import Tuple, Dict, Any
|
|||
if __name__ == "__main__":
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
# pylint: disable=wrong-import-position
|
||||
from pyhon import Hon, HonAPI, diagnose, printer
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
@ -30,13 +31,13 @@ def get_arguments() -> Dict[str, Any]:
|
|||
export.add_argument("--zip", help="create zip archive", action="store_true")
|
||||
export.add_argument("--anonymous", help="anonymize data", action="store_true")
|
||||
export.add_argument("directory", nargs="?", default=Path().cwd())
|
||||
translate = subparser.add_parser(
|
||||
translation = subparser.add_parser(
|
||||
"translate", help="print available translation keys"
|
||||
)
|
||||
translate.add_argument(
|
||||
translation.add_argument(
|
||||
"translate", help="language (de, en, fr...)", metavar="LANGUAGE"
|
||||
)
|
||||
translate.add_argument("--json", help="print as json", action="store_true")
|
||||
translation.add_argument("--json", help="print as json", action="store_true")
|
||||
parser.add_argument(
|
||||
"-i", "--import", help="import pyhon data", nargs="?", default=Path().cwd()
|
||||
)
|
||||
|
@ -91,11 +92,9 @@ async def main() -> None:
|
|||
data = device.data.copy()
|
||||
attr = "get" if args.get("all") else "pop"
|
||||
print(
|
||||
printer.key_print(
|
||||
data["attributes"].__getattribute__(attr)("parameters")
|
||||
)
|
||||
printer.key_print(getattr(data["attributes"], attr)("parameters"))
|
||||
)
|
||||
print(printer.key_print(data.__getattribute__(attr)("appliance")))
|
||||
print(printer.key_print(getattr(data, attr)("appliance")))
|
||||
print(printer.key_print(data))
|
||||
print(
|
||||
printer.pretty_print(
|
||||
|
|
|
@ -3,7 +3,7 @@ import logging
|
|||
import re
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any, TYPE_CHECKING, List
|
||||
from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload
|
||||
|
||||
from pyhon import diagnose, exceptions
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
|
@ -20,7 +20,10 @@ if TYPE_CHECKING:
|
|||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
# pylint: disable=too-many-public-methods,too-many-instance-attributes
|
||||
class HonAppliance:
|
||||
_MINIMAL_UPDATE_INTERVAL = 5 # seconds
|
||||
|
||||
|
@ -48,24 +51,35 @@ class HonAppliance:
|
|||
except ModuleNotFoundError:
|
||||
self._extra = None
|
||||
|
||||
def _get_nested_item(self, item: str) -> Any:
|
||||
result: List[Any] | Dict[str, Any] = self.data
|
||||
for key in item.split("."):
|
||||
if all(k in "0123456789" for k in key) and isinstance(result, list):
|
||||
result = result[int(key)]
|
||||
elif isinstance(result, dict):
|
||||
result = result[key]
|
||||
return result
|
||||
|
||||
def __getitem__(self, item: str) -> Any:
|
||||
if self._zone:
|
||||
item += f"Z{self._zone}"
|
||||
if "." in item:
|
||||
result = self.data
|
||||
for key in item.split("."):
|
||||
if all(k in "0123456789" for k in key) and isinstance(result, list):
|
||||
result = result[int(key)]
|
||||
else:
|
||||
result = result[key]
|
||||
return result
|
||||
return self._get_nested_item(item)
|
||||
if item in self.data:
|
||||
return self.data[item]
|
||||
if item in self.attributes["parameters"]:
|
||||
return self.attributes["parameters"][item].value
|
||||
return self.info[item]
|
||||
|
||||
def get(self, item: str, default: Any = None) -> Any:
|
||||
@overload
|
||||
def get(self, item: str, default: None = None) -> Any:
|
||||
...
|
||||
|
||||
@overload
|
||||
def get(self, item: str, default: T) -> T:
|
||||
...
|
||||
|
||||
def get(self, item: str, default: Optional[T] = None) -> Any:
|
||||
try:
|
||||
return self[item]
|
||||
except (KeyError, IndexError):
|
||||
|
@ -80,15 +94,15 @@ class HonAppliance:
|
|||
|
||||
@property
|
||||
def appliance_model_id(self) -> str:
|
||||
return self._info.get("applianceModelId", "")
|
||||
return str(self._info.get("applianceModelId", ""))
|
||||
|
||||
@property
|
||||
def appliance_type(self) -> str:
|
||||
return self._info.get("applianceTypeName", "")
|
||||
return str(self._info.get("applianceTypeName", ""))
|
||||
|
||||
@property
|
||||
def mac_address(self) -> str:
|
||||
return self.info.get("macAddress", "")
|
||||
return str(self.info.get("macAddress", ""))
|
||||
|
||||
@property
|
||||
def unique_id(self) -> str:
|
||||
|
@ -124,11 +138,11 @@ class HonAppliance:
|
|||
|
||||
@property
|
||||
def model_id(self) -> int:
|
||||
return self._info.get("applianceModelId", 0)
|
||||
return int(self._info.get("applianceModelId", 0))
|
||||
|
||||
@property
|
||||
def options(self) -> Dict[str, Any]:
|
||||
return self._appliance_model.get("options", {})
|
||||
return dict(self._appliance_model.get("options", {}))
|
||||
|
||||
@property
|
||||
def commands(self) -> Dict[str, HonCommand]:
|
||||
|
@ -188,12 +202,8 @@ class HonAppliance:
|
|||
|
||||
async def update(self, force: bool = False) -> None:
|
||||
now = datetime.now()
|
||||
if (
|
||||
force
|
||||
or not self._last_update
|
||||
or self._last_update
|
||||
< now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
|
||||
):
|
||||
min_age = now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
|
||||
if force or not self._last_update or self._last_update < min_age:
|
||||
self._last_update = now
|
||||
await self.load_attributes()
|
||||
self.sync_params_to_command("settings")
|
||||
|
@ -267,25 +277,40 @@ class HonAppliance:
|
|||
_LOGGER.info("Can't set %s - %s", key, error)
|
||||
continue
|
||||
|
||||
def sync_command(self, main: str, target: Optional[List[str] | str] = None) -> None:
|
||||
def sync_command(
|
||||
self,
|
||||
main: str,
|
||||
target: Optional[List[str] | str] = None,
|
||||
to_sync: Optional[List[str] | bool] = None,
|
||||
) -> None:
|
||||
base: Optional[HonCommand] = self.commands.get(main)
|
||||
if not base:
|
||||
return
|
||||
for command, data in self.commands.items():
|
||||
if command == main or target and command not in target:
|
||||
continue
|
||||
for name, parameter in data.parameters.items():
|
||||
if base_value := base.parameters.get(name):
|
||||
if isinstance(base_value, HonParameterRange) and isinstance(
|
||||
parameter, HonParameterRange
|
||||
):
|
||||
parameter.max = base_value.max
|
||||
parameter.min = base_value.min
|
||||
parameter.step = base_value.step
|
||||
elif isinstance(parameter, HonParameterRange):
|
||||
parameter.max = int(base_value.value)
|
||||
parameter.min = int(base_value.value)
|
||||
parameter.step = 1
|
||||
elif isinstance(parameter, HonParameterEnum):
|
||||
parameter.values = base_value.values
|
||||
parameter.value = base_value.value
|
||||
|
||||
for name, target_param in data.parameters.items():
|
||||
if not (base_param := base.parameters.get(name)):
|
||||
continue
|
||||
if to_sync and (
|
||||
(isinstance(to_sync, list) and name not in to_sync)
|
||||
or not base_param.mandatory
|
||||
):
|
||||
continue
|
||||
self.sync_parameter(base_param, target_param)
|
||||
|
||||
def sync_parameter(self, main: Parameter, target: Parameter) -> None:
|
||||
if isinstance(main, HonParameterRange) and isinstance(
|
||||
target, HonParameterRange
|
||||
):
|
||||
target.max = main.max
|
||||
target.min = main.min
|
||||
target.step = main.step
|
||||
elif isinstance(target, HonParameterRange):
|
||||
target.max = int(main.value)
|
||||
target.min = int(main.value)
|
||||
target.step = 1
|
||||
elif isinstance(target, HonParameterEnum):
|
||||
target.values = main.values
|
||||
target.value = main.value
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
# pylint: disable=duplicate-code
|
||||
from typing import Any, Dict
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
|
|
|
@ -1,17 +1,16 @@
|
|||
from typing import Any, Dict
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
from pyhon.parameter.program import HonParameterProgram
|
||||
|
||||
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data = super().attributes(data)
|
||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||
data["parameters"]["temp"].value = "0"
|
||||
data["parameters"]["onOffStatus"].value = "0"
|
||||
data["parameters"]["remoteCtrValid"].value = "0"
|
||||
data["parameters"]["remainingTimeMM"].value = "0"
|
||||
data["parameters"]["temp"].value = 0
|
||||
data["parameters"]["onOffStatus"].value = 0
|
||||
data["parameters"]["remoteCtrValid"].value = 0
|
||||
data["parameters"]["remainingTimeMM"].value = 0
|
||||
|
||||
data["active"] = data["parameters"]["onOffStatus"] == "1"
|
||||
data["active"] = data["parameters"]["onOffStatus"].value == 1
|
||||
return data
|
||||
|
|
|
@ -10,12 +10,12 @@ class Appliance(ApplianceBase):
|
|||
data["modeZ1"] = "holiday"
|
||||
elif data["parameters"]["intelligenceMode"] == "1":
|
||||
data["modeZ1"] = "auto_set"
|
||||
elif data["parameters"]["quickModeZ1"] == "1":
|
||||
elif data["parameters"].get("quickModeZ1") == "1":
|
||||
data["modeZ1"] = "super_cool"
|
||||
else:
|
||||
data["modeZ1"] = "no_mode"
|
||||
|
||||
if data["parameters"]["quickModeZ2"] == "1":
|
||||
if data["parameters"].get("quickModeZ2") == "1":
|
||||
data["modeZ2"] = "super_freeze"
|
||||
elif data["parameters"]["intelligenceMode"] == "1":
|
||||
data["modeZ2"] = "auto_set"
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
# pylint: disable=duplicate-code
|
||||
from typing import Any, Dict
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
# pylint: disable=duplicate-code
|
||||
from typing import Dict, Any
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
|
|
16
pyhon/appliances/wh.py
Normal file
16
pyhon/appliances/wh.py
Normal file
|
@ -0,0 +1,16 @@
|
|||
from typing import Any, Dict
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
from pyhon.parameter.base import HonParameter
|
||||
|
||||
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
data = super().attributes(data)
|
||||
parameter = data.get("parameters", {}).get("onOffStatus")
|
||||
is_class = isinstance(parameter, HonParameter)
|
||||
data["active"] = parameter.value == 1 if is_class else parameter == 1
|
||||
return data
|
||||
|
||||
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return settings
|
|
@ -1,3 +1,4 @@
|
|||
# pylint: disable=duplicate-code
|
||||
from typing import Any, Dict
|
||||
|
||||
from pyhon.appliances.base import ApplianceBase
|
||||
|
|
|
@ -1,14 +1,15 @@
|
|||
import asyncio
|
||||
from contextlib import suppress
|
||||
from copy import copy
|
||||
from typing import Dict, Any, Optional, TYPE_CHECKING, List, Collection
|
||||
from typing import Dict, Any, Optional, TYPE_CHECKING, List
|
||||
|
||||
from pyhon.commands import HonCommand
|
||||
from pyhon.exceptions import NoAuthenticationException
|
||||
from pyhon.parameter.fixed import HonParameterFixed
|
||||
from pyhon.parameter.program import HonParameterProgram
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pyhon import HonAPI, exceptions
|
||||
from pyhon import HonAPI
|
||||
from pyhon.appliance import HonAppliance
|
||||
|
||||
|
||||
|
@ -16,12 +17,12 @@ class HonCommandLoader:
|
|||
"""Loads and parses hOn command data"""
|
||||
|
||||
def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None:
|
||||
self._api: "HonAPI" = api
|
||||
self._appliance: "HonAppliance" = appliance
|
||||
self._api_commands: Dict[str, Any] = {}
|
||||
self._favourites: List[Dict[str, Any]] = []
|
||||
self._command_history: List[Dict[str, Any]] = []
|
||||
self._commands: Dict[str, HonCommand] = {}
|
||||
self._api: "HonAPI" = api
|
||||
self._appliance: "HonAppliance" = appliance
|
||||
self._appliance_data: Dict[str, Any] = {}
|
||||
self._additional_data: Dict[str, Any] = {}
|
||||
|
||||
|
@ -29,7 +30,7 @@ class HonCommandLoader:
|
|||
def api(self) -> "HonAPI":
|
||||
"""api connection object"""
|
||||
if self._api is None:
|
||||
raise exceptions.NoAuthenticationException("Missing hOn login")
|
||||
raise NoAuthenticationException("Missing hOn login")
|
||||
return self._api
|
||||
|
||||
@property
|
||||
|
@ -183,23 +184,44 @@ class HonCommandLoader:
|
|||
def _add_favourites(self) -> None:
|
||||
"""Patch program categories with favourites"""
|
||||
for favourite in self._favourites:
|
||||
name = favourite.get("favouriteName", {})
|
||||
command = favourite.get("command", {})
|
||||
command_name = command.get("commandName", "")
|
||||
program_name = self._clean_name(command.get("programName", ""))
|
||||
if not (base := self.commands[command_name].categories.get(program_name)):
|
||||
name, command_name, base = self._get_favourite_info(favourite)
|
||||
if not base:
|
||||
continue
|
||||
base_command: HonCommand = copy(base)
|
||||
for data in command.values():
|
||||
if isinstance(data, str):
|
||||
self._update_base_command_with_data(base_command, favourite)
|
||||
self._update_base_command_with_favourite(base_command)
|
||||
self._update_program_categories(command_name, name, base_command)
|
||||
|
||||
def _get_favourite_info(
|
||||
self, favourite: Dict[str, Any]
|
||||
) -> tuple[str, str, HonCommand | None]:
|
||||
name: str = favourite.get("favouriteName", {})
|
||||
command = favourite.get("command", {})
|
||||
command_name: str = command.get("commandName", "")
|
||||
program_name = self._clean_name(command.get("programName", ""))
|
||||
base_command = self.commands[command_name].categories.get(program_name)
|
||||
return name, command_name, base_command
|
||||
|
||||
def _update_base_command_with_data(
|
||||
self, base_command: HonCommand, command: Dict[str, Any]
|
||||
) -> None:
|
||||
for data in command.values():
|
||||
if isinstance(data, str):
|
||||
continue
|
||||
for key, value in data.items():
|
||||
if not (parameter := base_command.parameters.get(key)):
|
||||
continue
|
||||
for key, value in data.items():
|
||||
if parameter := base_command.parameters.get(key):
|
||||
with suppress(ValueError):
|
||||
parameter.value = value
|
||||
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
|
||||
base_command.parameters.update(favourite=extra_param)
|
||||
program = base_command.parameters["program"]
|
||||
if isinstance(program, HonParameterProgram):
|
||||
program.set_value(name)
|
||||
self.commands[command_name].categories[name] = base_command
|
||||
with suppress(ValueError):
|
||||
parameter.value = value
|
||||
|
||||
def _update_base_command_with_favourite(self, base_command: HonCommand) -> None:
|
||||
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
|
||||
base_command.parameters.update(favourite=extra_param)
|
||||
|
||||
def _update_program_categories(
|
||||
self, command_name: str, name: str, base_command: HonCommand
|
||||
) -> None:
|
||||
program = base_command.parameters["program"]
|
||||
if isinstance(program, HonParameterProgram):
|
||||
program.set_value(name)
|
||||
self.commands[command_name].categories[name] = base_command
|
||||
|
|
|
@ -27,17 +27,16 @@ class HonCommand:
|
|||
categories: Optional[Dict[str, "HonCommand"]] = None,
|
||||
category_name: str = "",
|
||||
):
|
||||
self._api: Optional[HonAPI] = appliance.api
|
||||
self._appliance: "HonAppliance" = appliance
|
||||
self._name: str = name
|
||||
self._api: Optional[HonAPI] = None
|
||||
self._appliance: "HonAppliance" = appliance
|
||||
self._categories: Optional[Dict[str, "HonCommand"]] = categories
|
||||
self._category_name: str = category_name
|
||||
self._description: str = attributes.pop("description", "")
|
||||
self._protocol_type: str = attributes.pop("protocolType", "")
|
||||
self._parameters: Dict[str, HonParameter] = {}
|
||||
self._parameters: Dict[str, Parameter] = {}
|
||||
self._data: Dict[str, Any] = {}
|
||||
self._available_settings: Dict[str, HonParameter] = {}
|
||||
self._rules: List[HonRuleSet] = []
|
||||
attributes.pop("description", "")
|
||||
attributes.pop("protocolType", "")
|
||||
self._load_parameters(attributes)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
|
@ -49,6 +48,8 @@ class HonCommand:
|
|||
|
||||
@property
|
||||
def api(self) -> "HonAPI":
|
||||
if self._api is None and self._appliance:
|
||||
self._api = self._appliance.api
|
||||
if self._api is None:
|
||||
raise exceptions.NoAuthenticationException("Missing hOn login")
|
||||
return self._api
|
||||
|
@ -76,12 +77,23 @@ class HonCommand:
|
|||
result.setdefault(parameter.group, {})[name] = parameter.intern_value
|
||||
return result
|
||||
|
||||
@property
|
||||
def mandatory_parameter_groups(self) -> Dict[str, Dict[str, Union[str, float]]]:
|
||||
result: Dict[str, Dict[str, Union[str, float]]] = {}
|
||||
for name, parameter in self._parameters.items():
|
||||
if parameter.mandatory:
|
||||
result.setdefault(parameter.group, {})[name] = parameter.intern_value
|
||||
return result
|
||||
|
||||
@property
|
||||
def parameter_value(self) -> Dict[str, Union[str, float]]:
|
||||
return {n: p.value for n, p in self._parameters.items()}
|
||||
|
||||
def _load_parameters(self, attributes: Dict[str, Dict[str, Any]]) -> None:
|
||||
def _load_parameters(self, attributes: Dict[str, Dict[str, Any] | Any]) -> None:
|
||||
for key, items in attributes.items():
|
||||
if not isinstance(items, dict):
|
||||
_LOGGER.info("Loading Attributes - Skipping %s", str(items))
|
||||
continue
|
||||
for name, data in items.items():
|
||||
self._create_parameters(data, name, key)
|
||||
for rule in self._rules:
|
||||
|
@ -93,10 +105,12 @@ class HonCommand:
|
|||
if name == "zoneMap" and self._appliance.zone:
|
||||
data["default"] = self._appliance.zone
|
||||
if data.get("category") == "rule":
|
||||
if "fixedValue" not in data:
|
||||
_LOGGER.error("Rule not supported: %s", data)
|
||||
else:
|
||||
if "fixedValue" in data:
|
||||
self._rules.append(HonRuleSet(self, data["fixedValue"]))
|
||||
elif "enumValues" in data:
|
||||
self._rules.append(HonRuleSet(self, data["enumValues"]))
|
||||
else:
|
||||
_LOGGER.warning("Rule not supported: %s", data)
|
||||
match data.get("typology"):
|
||||
case "range":
|
||||
self._parameters[name] = HonParameterRange(name, data, parameter)
|
||||
|
@ -111,14 +125,33 @@ class HonCommand:
|
|||
name = "program" if "PROGRAM" in self._category_name else "category"
|
||||
self._parameters[name] = HonParameterProgram(name, self, "custom")
|
||||
|
||||
async def send(self) -> bool:
|
||||
params = self.parameter_groups.get("parameters", {})
|
||||
async def send(self, only_mandatory: bool = False) -> bool:
|
||||
grouped_params = (
|
||||
self.mandatory_parameter_groups if only_mandatory else self.parameter_groups
|
||||
)
|
||||
params = grouped_params.get("parameters", {})
|
||||
return await self.send_parameters(params)
|
||||
|
||||
async def send_specific(self, param_names: List[str]) -> bool:
|
||||
params: Dict[str, str | float] = {}
|
||||
for key, parameter in self._parameters.items():
|
||||
if key in param_names or parameter.mandatory:
|
||||
params[key] = parameter.value
|
||||
return await self.send_parameters(params)
|
||||
|
||||
async def send_parameters(self, params: Dict[str, str | float]) -> bool:
|
||||
ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
|
||||
ancillary_params.pop("programRules", None)
|
||||
if "prStr" in params:
|
||||
params["prStr"] = self._category_name.upper()
|
||||
self.appliance.sync_command_to_params(self.name)
|
||||
try:
|
||||
result = await self.api.send_command(
|
||||
self._appliance, self._name, params, ancillary_params
|
||||
self._appliance,
|
||||
self._name,
|
||||
params,
|
||||
ancillary_params,
|
||||
self._category_name,
|
||||
)
|
||||
if not result:
|
||||
_LOGGER.error(result)
|
||||
|
@ -169,3 +202,7 @@ class HonCommand:
|
|||
else:
|
||||
result[name] = parameter
|
||||
return result
|
||||
|
||||
def reset(self) -> None:
|
||||
for parameter in self._parameters.values():
|
||||
parameter.reset()
|
||||
|
|
|
@ -190,6 +190,7 @@ class HonAPI:
|
|||
command: str,
|
||||
parameters: Dict[str, Any],
|
||||
ancillary_parameters: Dict[str, Any],
|
||||
program_name: str = "",
|
||||
) -> bool:
|
||||
now: str = datetime.utcnow().isoformat()
|
||||
data: Dict[str, Any] = {
|
||||
|
@ -208,6 +209,8 @@ class HonAPI:
|
|||
"parameters": parameters,
|
||||
"applianceType": appliance.appliance_type,
|
||||
}
|
||||
if command == "startProgram" and program_name:
|
||||
data.update({"programName": program_name.upper()})
|
||||
url: str = f"{const.API_URL}/commands/v1/send"
|
||||
async with self._hon.post(url, json=data) as response:
|
||||
json_data: Dict[str, Any] = await response.json()
|
||||
|
@ -278,10 +281,12 @@ class TestAPI(HonAPI):
|
|||
async def load_appliances(self) -> List[Dict[str, Any]]:
|
||||
result = []
|
||||
for appliance in self._path.glob("*/"):
|
||||
with open(
|
||||
appliance / "appliance_data.json", "r", encoding="utf-8"
|
||||
) as json_file:
|
||||
result.append(json.loads(json_file.read()))
|
||||
file = appliance / "appliance_data.json"
|
||||
with open(file, "r", encoding="utf-8") as json_file:
|
||||
try:
|
||||
result.append(json.loads(json_file.read()))
|
||||
except json.decoder.JSONDecodeError as error:
|
||||
_LOGGER.error("%s - %s", str(file), error)
|
||||
return result
|
||||
|
||||
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||
|
@ -317,5 +322,12 @@ class TestAPI(HonAPI):
|
|||
command: str,
|
||||
parameters: Dict[str, Any],
|
||||
ancillary_parameters: Dict[str, Any],
|
||||
program_name: str = "",
|
||||
) -> bool:
|
||||
_LOGGER.info(
|
||||
"%s - %s - %s",
|
||||
str(parameters),
|
||||
str(ancillary_parameters),
|
||||
str(program_name),
|
||||
)
|
||||
return True
|
||||
|
|
|
@ -30,6 +30,14 @@ class HonLoginData:
|
|||
loaded: Optional[Dict[str, Any]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class HonAuthData:
|
||||
access_token: str = ""
|
||||
refresh_token: str = ""
|
||||
cognito_token: str = ""
|
||||
id_token: str = ""
|
||||
|
||||
|
||||
class HonAuth:
|
||||
_TOKEN_EXPIRES_AFTER_HOURS = 8
|
||||
_TOKEN_EXPIRE_WARNING_HOURS = 7
|
||||
|
@ -46,28 +54,25 @@ class HonAuth:
|
|||
self._login_data = HonLoginData()
|
||||
self._login_data.email = email
|
||||
self._login_data.password = password
|
||||
self._access_token = ""
|
||||
self._refresh_token = ""
|
||||
self._cognito_token = ""
|
||||
self._id_token = ""
|
||||
self._device = device
|
||||
self._expires: datetime = datetime.utcnow()
|
||||
self._auth = HonAuthData()
|
||||
|
||||
@property
|
||||
def cognito_token(self) -> str:
|
||||
return self._cognito_token
|
||||
return self._auth.cognito_token
|
||||
|
||||
@property
|
||||
def id_token(self) -> str:
|
||||
return self._id_token
|
||||
return self._auth.id_token
|
||||
|
||||
@property
|
||||
def access_token(self) -> str:
|
||||
return self._access_token
|
||||
return self._auth.access_token
|
||||
|
||||
@property
|
||||
def refresh_token(self) -> str:
|
||||
return self._refresh_token
|
||||
return self._auth.refresh_token
|
||||
|
||||
def _check_token_expiration(self, hours: int) -> bool:
|
||||
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
|
||||
|
@ -192,12 +197,12 @@ class HonAuth:
|
|||
|
||||
def _parse_token_data(self, text: str) -> bool:
|
||||
if access_token := re.findall("access_token=(.*?)&", text):
|
||||
self._access_token = access_token[0]
|
||||
self._auth.access_token = access_token[0]
|
||||
if refresh_token := re.findall("refresh_token=(.*?)&", text):
|
||||
self._refresh_token = refresh_token[0]
|
||||
self._auth.refresh_token = refresh_token[0]
|
||||
if id_token := re.findall("id_token=(.*?)&", text):
|
||||
self._id_token = id_token[0]
|
||||
return True if access_token and refresh_token and id_token else False
|
||||
self._auth.id_token = id_token[0]
|
||||
return bool(access_token and refresh_token and id_token)
|
||||
|
||||
async def _get_token(self, url: str) -> bool:
|
||||
async with self._request.get(url) as response:
|
||||
|
@ -229,7 +234,7 @@ class HonAuth:
|
|||
return True
|
||||
|
||||
async def _api_auth(self) -> bool:
|
||||
post_headers = {"id-token": self._id_token}
|
||||
post_headers = {"id-token": self._auth.id_token}
|
||||
data = self._device.get()
|
||||
async with self._request.post(
|
||||
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
|
||||
|
@ -239,8 +244,8 @@ class HonAuth:
|
|||
except json.JSONDecodeError:
|
||||
await self._error_logger(response)
|
||||
return False
|
||||
self._cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
|
||||
if not self._cognito_token:
|
||||
self._auth.cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
|
||||
if not self._auth.cognito_token:
|
||||
_LOGGER.error(json_data)
|
||||
raise exceptions.HonAuthenticationError()
|
||||
return True
|
||||
|
@ -262,7 +267,7 @@ class HonAuth:
|
|||
async def refresh(self) -> bool:
|
||||
params = {
|
||||
"client_id": const.CLIENT_ID,
|
||||
"refresh_token": self._refresh_token,
|
||||
"refresh_token": self._auth.refresh_token,
|
||||
"grant_type": "refresh_token",
|
||||
}
|
||||
async with self._request.post(
|
||||
|
@ -273,14 +278,14 @@ class HonAuth:
|
|||
return False
|
||||
data = await response.json()
|
||||
self._expires = datetime.utcnow()
|
||||
self._id_token = data["id_token"]
|
||||
self._access_token = data["access_token"]
|
||||
self._auth.id_token = data["id_token"]
|
||||
self._auth.access_token = data["access_token"]
|
||||
return await self._api_auth()
|
||||
|
||||
def clear(self) -> None:
|
||||
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
|
||||
self._request.called_urls = []
|
||||
self._cognito_token = ""
|
||||
self._id_token = ""
|
||||
self._access_token = ""
|
||||
self._refresh_token = ""
|
||||
self._auth.cognito_token = ""
|
||||
self._auth.id_token = ""
|
||||
self._auth.access_token = ""
|
||||
self._auth.refresh_token = ""
|
||||
|
|
|
@ -21,7 +21,7 @@ class HonDevice:
|
|||
return self._os_version
|
||||
|
||||
@property
|
||||
def os(self) -> str:
|
||||
def os_type(self) -> str:
|
||||
return self._os
|
||||
|
||||
@property
|
||||
|
@ -36,7 +36,7 @@ class HonDevice:
|
|||
result: Dict[str, str | int] = {
|
||||
"appVersion": self.app_version,
|
||||
"mobileId": self.mobile_id,
|
||||
"os": self.os,
|
||||
"os": self.os_type,
|
||||
"osVersion": self.os_version,
|
||||
"deviceModel": self.device_model,
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@ import logging
|
|||
from collections.abc import AsyncIterator
|
||||
from contextlib import asynccontextmanager
|
||||
from types import TracebackType
|
||||
from typing import Optional, Dict, Type, Any, Protocol
|
||||
from typing import Optional, Dict, Type, Any
|
||||
|
||||
import aiohttp
|
||||
from typing_extensions import Self
|
||||
|
@ -47,10 +47,11 @@ class ConnectionHandler:
|
|||
return self
|
||||
|
||||
@asynccontextmanager
|
||||
def _intercept(
|
||||
async def _intercept(
|
||||
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
|
||||
) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||
raise NotImplementedError
|
||||
async with method(url, *args, **kwargs) as response:
|
||||
yield response
|
||||
|
||||
@asynccontextmanager
|
||||
async def get(
|
||||
|
@ -59,7 +60,8 @@ class ConnectionHandler:
|
|||
if self._session is None:
|
||||
raise exceptions.NoSessionException()
|
||||
response: aiohttp.ClientResponse
|
||||
async with self._intercept(self._session.get, *args, **kwargs) as response: # type: ignore[arg-type]
|
||||
args = self._session.get, *args
|
||||
async with self._intercept(*args, **kwargs) as response:
|
||||
yield response
|
||||
|
||||
@asynccontextmanager
|
||||
|
@ -69,7 +71,8 @@ class ConnectionHandler:
|
|||
if self._session is None:
|
||||
raise exceptions.NoSessionException()
|
||||
response: aiohttp.ClientResponse
|
||||
async with self._intercept(self._session.post, *args, **kwargs) as response: # type: ignore[arg-type]
|
||||
args = self._session.post, *args
|
||||
async with self._intercept(*args, **kwargs) as response:
|
||||
yield response
|
||||
|
||||
async def close(self) -> None:
|
||||
|
|
|
@ -95,11 +95,11 @@ class HonConnectionHandler(ConnectionHandler):
|
|||
try:
|
||||
await response.json()
|
||||
yield response
|
||||
except json.JSONDecodeError:
|
||||
except json.JSONDecodeError as exc:
|
||||
_LOGGER.warning(
|
||||
"%s - JsonDecodeError %s - %s",
|
||||
response.request_info.url,
|
||||
response.status,
|
||||
await response.text(),
|
||||
)
|
||||
raise HonAuthenticationError("Decode Error")
|
||||
raise HonAuthenticationError("Decode Error") from exc
|
||||
|
|
|
@ -2,9 +2,11 @@ AUTH_API = "https://account2.hon-smarthome.com"
|
|||
API_URL = "https://api-iot.he.services"
|
||||
API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"
|
||||
APP = "hon"
|
||||
# All seen id's (different accounts, different devices) are the same, so I guess this hash is static
|
||||
CLIENT_ID = "3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9.HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
|
||||
APP_VERSION = "2.1.2"
|
||||
CLIENT_ID = (
|
||||
"3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9."
|
||||
"HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
|
||||
)
|
||||
APP_VERSION = "2.4.7"
|
||||
OS_VERSION = 31
|
||||
OS = "android"
|
||||
DEVICE_MODEL = "exynos9820"
|
||||
|
|
|
@ -7,9 +7,10 @@ from typing import List, Optional, Dict, Any, Type
|
|||
from aiohttp import ClientSession
|
||||
from typing_extensions import Self
|
||||
|
||||
from pyhon import HonAPI, exceptions
|
||||
from pyhon.appliance import HonAppliance
|
||||
from pyhon.connection.api import HonAPI
|
||||
from pyhon.connection.api import TestAPI
|
||||
from pyhon.exceptions import NoAuthenticationException
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
@ -43,7 +44,7 @@ class Hon:
|
|||
@property
|
||||
def api(self) -> HonAPI:
|
||||
if self._api is None:
|
||||
raise exceptions.NoAuthenticationException
|
||||
raise NoAuthenticationException
|
||||
return self._api
|
||||
|
||||
@property
|
||||
|
|
|
@ -7,14 +7,21 @@ if TYPE_CHECKING:
|
|||
class HonParameter:
|
||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||
self._key = key
|
||||
self._category: str = attributes.get("category", "")
|
||||
self._typology: str = attributes.get("typology", "")
|
||||
self._mandatory: int = attributes.get("mandatory", 0)
|
||||
self._attributes = attributes
|
||||
self._category: str = ""
|
||||
self._typology: str = ""
|
||||
self._mandatory: int = 0
|
||||
self._value: str | float = ""
|
||||
self._group: str = group
|
||||
self._triggers: Dict[
|
||||
str, List[Tuple[Callable[["HonRule"], None], "HonRule"]]
|
||||
] = {}
|
||||
self._set_attributes()
|
||||
|
||||
def _set_attributes(self) -> None:
|
||||
self._category = self._attributes.get("category", "")
|
||||
self._typology = self._attributes.get("typology", "")
|
||||
self._mandatory = self._attributes.get("mandatory", 0)
|
||||
|
||||
@property
|
||||
def key(self) -> str:
|
||||
|
@ -61,8 +68,9 @@ class HonParameter:
|
|||
self._triggers.setdefault(value, []).append((func, data))
|
||||
|
||||
def check_trigger(self, value: str | float) -> None:
|
||||
if str(value) in self._triggers:
|
||||
for trigger in self._triggers[str(value)]:
|
||||
triggers = {str(k).lower(): v for k, v in self._triggers.items()}
|
||||
if str(value).lower() in triggers:
|
||||
for trigger in triggers[str(value)]:
|
||||
func, args = trigger
|
||||
func(args)
|
||||
|
||||
|
@ -85,3 +93,6 @@ class HonParameter:
|
|||
param[rule.param_key] = rule.param_data.get("defaultValue", "")
|
||||
|
||||
return result
|
||||
|
||||
def reset(self) -> None:
|
||||
self._set_attributes()
|
||||
|
|
|
@ -10,12 +10,19 @@ def clean_value(value: str | float) -> str:
|
|||
class HonParameterEnum(HonParameter):
|
||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||
super().__init__(key, attributes, group)
|
||||
self._default = attributes.get("defaultValue")
|
||||
self._value = self._default or "0"
|
||||
self._values: List[str] = attributes.get("enumValues", [])
|
||||
self._default: str | float = ""
|
||||
self._value: str | float = ""
|
||||
self._values: List[str] = []
|
||||
self._set_attributes()
|
||||
if self._default and clean_value(self._default.strip("[]")) not in self.values:
|
||||
self._values.append(self._default)
|
||||
|
||||
def _set_attributes(self) -> None:
|
||||
super()._set_attributes()
|
||||
self._default = self._attributes.get("defaultValue", "")
|
||||
self._value = self._default or "0"
|
||||
self._values = self._attributes.get("enumValues", [])
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.__class__} (<{self.key}> {self.values})"
|
||||
|
||||
|
|
|
@ -6,14 +6,19 @@ from pyhon.parameter.base import HonParameter
|
|||
class HonParameterFixed(HonParameter):
|
||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||
super().__init__(key, attributes, group)
|
||||
self._value = attributes.get("fixedValue", None)
|
||||
self._value: str | float = ""
|
||||
self._set_attributes()
|
||||
|
||||
def _set_attributes(self) -> None:
|
||||
super()._set_attributes()
|
||||
self._value = self._attributes.get("fixedValue", "")
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.__class__} (<{self.key}> fixed)"
|
||||
|
||||
@property
|
||||
def value(self) -> str | float:
|
||||
return self._value if self._value is not None else "0"
|
||||
return self._value if self._value != "" else "0"
|
||||
|
||||
@value.setter
|
||||
def value(self, value: str | float) -> None:
|
||||
|
|
|
@ -37,17 +37,19 @@ class HonParameterProgram(HonParameterEnum):
|
|||
|
||||
@values.setter
|
||||
def values(self, values: List[str]) -> None:
|
||||
return
|
||||
raise ValueError("Cant set values {values}")
|
||||
|
||||
@property
|
||||
def ids(self) -> Dict[int, str]:
|
||||
values = {
|
||||
int(p.parameters["prCode"].value): n
|
||||
for i, (n, p) in enumerate(self._programs.items())
|
||||
if "iot_" not in n
|
||||
and p.parameters.get("prCode")
|
||||
and not ((fav := p.parameters.get("favourite")) and fav.value == "1")
|
||||
}
|
||||
values: Dict[int, str] = {}
|
||||
for name, parameter in self._programs.items():
|
||||
if "iot_" in name:
|
||||
continue
|
||||
if parameter.parameters.get("prCode"):
|
||||
continue
|
||||
if (fav := parameter.parameters.get("favourite")) and fav.value == "1":
|
||||
continue
|
||||
values[int(parameter.parameters["prCode"].value)] = name
|
||||
return dict(sorted(values.items()))
|
||||
|
||||
def set_value(self, value: str) -> None:
|
||||
|
|
|
@ -7,11 +7,20 @@ from pyhon.parameter.base import HonParameter
|
|||
class HonParameterRange(HonParameter):
|
||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||
super().__init__(key, attributes, group)
|
||||
self._min: float = str_to_float(attributes["minimumValue"])
|
||||
self._max: float = str_to_float(attributes["maximumValue"])
|
||||
self._step: float = str_to_float(attributes["incrementValue"])
|
||||
self._default: float = str_to_float(attributes.get("defaultValue", self.min))
|
||||
self._value: float = self._default
|
||||
self._min: float = 0
|
||||
self._max: float = 0
|
||||
self._step: float = 0
|
||||
self._default: float = 0
|
||||
self._value: float = 0
|
||||
self._set_attributes()
|
||||
|
||||
def _set_attributes(self) -> None:
|
||||
super()._set_attributes()
|
||||
self._min = str_to_float(self._attributes.get("minimumValue", 0))
|
||||
self._max = str_to_float(self._attributes.get("maximumValue", 0))
|
||||
self._step = str_to_float(self._attributes.get("incrementValue", 0))
|
||||
self._default = str_to_float(self._attributes.get("defaultValue", self.min))
|
||||
self._value = self._default
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"{self.__class__} (<{self.key}> [{self.min} - {self.max}])"
|
||||
|
@ -55,9 +64,8 @@ class HonParameterRange(HonParameter):
|
|||
self._value = value
|
||||
self.check_trigger(value)
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Allowed: min {self.min} max {self.max} step {self.step} But was: {value}"
|
||||
)
|
||||
allowed = f"min {self.min} max {self.max} step {self.step}"
|
||||
raise ValueError(f"Allowed: {allowed} But was: {value}")
|
||||
|
||||
@property
|
||||
def values(self) -> List[str]:
|
||||
|
|
|
@ -29,33 +29,26 @@ def pretty_print(
|
|||
whitespace: str = " ",
|
||||
) -> str:
|
||||
result = ""
|
||||
space = whitespace * intend
|
||||
if isinstance(data, (dict, list)) and key:
|
||||
result += f"{space}{'- ' if is_list else ''}{key}:\n"
|
||||
intend += 1
|
||||
if isinstance(data, list):
|
||||
if key:
|
||||
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
|
||||
intend += 1
|
||||
for i, value in enumerate(data):
|
||||
result += pretty_print(
|
||||
value, intend=intend, is_list=True, whitespace=whitespace
|
||||
)
|
||||
elif isinstance(data, dict):
|
||||
if key:
|
||||
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
|
||||
intend += 1
|
||||
for i, (key, value) in enumerate(sorted(data.items())):
|
||||
if is_list and not i:
|
||||
result += pretty_print(
|
||||
value, key=key, intend=intend, is_list=True, whitespace=whitespace
|
||||
)
|
||||
elif is_list:
|
||||
result += pretty_print(
|
||||
value, key=key, intend=intend + 1, whitespace=whitespace
|
||||
)
|
||||
else:
|
||||
result += pretty_print(
|
||||
value, key=key, intend=intend, whitespace=whitespace
|
||||
)
|
||||
for i, (list_key, value) in enumerate(sorted(data.items())):
|
||||
result += pretty_print(
|
||||
value,
|
||||
key=list_key,
|
||||
intend=intend + (is_list if i else 0),
|
||||
is_list=is_list and not i,
|
||||
whitespace=whitespace,
|
||||
)
|
||||
else:
|
||||
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
|
||||
result += f"{space}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
|
||||
return result
|
||||
|
||||
|
||||
|
|
0
pyhon/py.typed
Normal file
0
pyhon/py.typed
Normal file
|
@ -3,6 +3,7 @@ from typing import List, Dict, TYPE_CHECKING, Any, Optional
|
|||
|
||||
from pyhon.parameter.enum import HonParameterEnum
|
||||
from pyhon.parameter.range import HonParameterRange
|
||||
from pyhon.typedefs import Parameter
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pyhon.commands import HonCommand
|
||||
|
@ -24,6 +25,10 @@ class HonRuleSet:
|
|||
self._rules: Dict[str, List[HonRule]] = {}
|
||||
self._parse_rule(rule)
|
||||
|
||||
@property
|
||||
def rules(self) -> Dict[str, List[HonRule]]:
|
||||
return self._rules
|
||||
|
||||
def _parse_rule(self, rule: Dict[str, Any]) -> None:
|
||||
for param_key, params in rule.items():
|
||||
param_key = self._command.appliance.options.get(param_key, param_key)
|
||||
|
@ -51,6 +56,11 @@ class HonRuleSet:
|
|||
extra[trigger_key] = trigger_value
|
||||
for extra_key, extra_data in param_data.items():
|
||||
self._parse_conditions(param_key, extra_key, extra_data, extra)
|
||||
else:
|
||||
param_data = {"typology": "fixed", "fixedValue": param_data}
|
||||
self._create_rule(
|
||||
param_key, trigger_key, trigger_value, param_data, extra
|
||||
)
|
||||
|
||||
def _create_rule(
|
||||
self,
|
||||
|
@ -83,28 +93,46 @@ class HonRuleSet:
|
|||
for rule in rules:
|
||||
self._rules.setdefault(key, []).append(rule)
|
||||
|
||||
def _extra_rules_matches(self, rule: HonRule) -> bool:
|
||||
if rule.extras:
|
||||
for key, value in rule.extras.items():
|
||||
if not self._command.parameters.get(key):
|
||||
return False
|
||||
if str(self._command.parameters.get(key)) != str(value):
|
||||
return False
|
||||
return True
|
||||
|
||||
def _apply_fixed(self, param: Parameter, value: str | float) -> None:
|
||||
if isinstance(param, HonParameterEnum) and set(param.values) != {str(value)}:
|
||||
param.values = [str(value)]
|
||||
param.value = str(value)
|
||||
elif isinstance(param, HonParameterRange):
|
||||
if float(value) < param.min:
|
||||
param.min = float(value)
|
||||
elif float(value) > param.max:
|
||||
param.max = float(value)
|
||||
param.value = float(value)
|
||||
return
|
||||
param.value = str(value)
|
||||
|
||||
def _apply_enum(self, param: Parameter, rule: HonRule) -> None:
|
||||
if not isinstance(param, HonParameterEnum):
|
||||
return
|
||||
if enum_values := rule.param_data.get("enumValues"):
|
||||
param.values = enum_values.split("|")
|
||||
if default_value := rule.param_data.get("defaultValue"):
|
||||
param.value = default_value
|
||||
|
||||
def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None:
|
||||
def apply(rule: HonRule) -> None:
|
||||
if rule.extras is not None:
|
||||
for key, value in rule.extras.items():
|
||||
if str(self._command.parameters.get(key)) != str(value):
|
||||
return
|
||||
if param := self._command.parameters.get(rule.param_key):
|
||||
if value := rule.param_data.get("fixedValue", ""):
|
||||
if isinstance(param, HonParameterEnum) and set(param.values) != {
|
||||
str(value)
|
||||
}:
|
||||
param.values = [str(value)]
|
||||
elif isinstance(param, HonParameterRange):
|
||||
param.value = float(value)
|
||||
return
|
||||
param.value = str(value)
|
||||
elif rule.param_data.get("typology") == "enum":
|
||||
if isinstance(param, HonParameterEnum):
|
||||
if enum_values := rule.param_data.get("enumValues"):
|
||||
param.values = enum_values.split("|")
|
||||
if default_value := rule.param_data.get("defaultValue"):
|
||||
param.value = default_value
|
||||
if not self._extra_rules_matches(rule):
|
||||
return
|
||||
if not (param := self._command.parameters.get(rule.param_key)):
|
||||
return
|
||||
if fixed_value := rule.param_data.get("fixedValue", ""):
|
||||
self._apply_fixed(param, fixed_value)
|
||||
elif rule.param_data.get("typology") == "enum":
|
||||
self._apply_enum(param, rule)
|
||||
|
||||
parameter.add_trigger(data.trigger_value, apply, data)
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ if TYPE_CHECKING:
|
|||
from pyhon.parameter.range import HonParameterRange
|
||||
|
||||
|
||||
class Callback(Protocol):
|
||||
class Callback(Protocol): # pylint: disable=too-few-public-methods
|
||||
def __call__(
|
||||
self, url: str | URL, *args: Any, **kwargs: Any
|
||||
) -> aiohttp.client._RequestContextManager:
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
aiohttp~=3.8.5
|
||||
yarl~=1.9.2
|
||||
typing-extensions~=4.7.1
|
||||
aiohttp>=3.8
|
||||
yarl>=1.8
|
||||
typing-extensions>=4.8
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
black==23.3.0
|
||||
flake8==6.0.0
|
||||
mypy==1.2.0
|
||||
pylint==2.17.2
|
||||
black>=22.12
|
||||
flake8>=6.0
|
||||
mypy>=0.991
|
||||
pylint>=2.15
|
||||
setuptools>=62.3
|
||||
|
|
7
setup.py
7
setup.py
|
@ -2,12 +2,12 @@
|
|||
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
with open("README.md", "r") as f:
|
||||
with open("README.md", "r", encoding="utf-8") as f:
|
||||
long_description = f.read()
|
||||
|
||||
setup(
|
||||
name="pyhOn",
|
||||
version="0.14.12",
|
||||
version="0.15.15",
|
||||
author="Andre Basche",
|
||||
description="Control hOn devices with python",
|
||||
long_description=long_description,
|
||||
|
@ -21,7 +21,7 @@ setup(
|
|||
packages=find_packages(),
|
||||
include_package_data=True,
|
||||
python_requires=">=3.10",
|
||||
install_requires=["aiohttp~=3.8.5", "typing-extensions~=4.7.1", "yarl~=1.9.2"],
|
||||
install_requires=["aiohttp>=3.8", "typing-extensions>=4.8", "yarl>=1.8"],
|
||||
classifiers=[
|
||||
"Development Status :: 4 - Beta",
|
||||
"Environment :: Console",
|
||||
|
@ -30,6 +30,7 @@ setup(
|
|||
"Operating System :: OS Independent",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Programming Language :: Python :: 3.12",
|
||||
"Topic :: Software Development :: Libraries :: Python Modules",
|
||||
],
|
||||
entry_points={
|
||||
|
|
Loading…
Reference in a new issue