384 lines
13 KiB
Python
384 lines
13 KiB
Python
import importlib
|
|
import json
|
|
import logging
|
|
from contextlib import suppress
|
|
from copy import copy
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any
|
|
from typing import TYPE_CHECKING
|
|
|
|
from pyhon import helper
|
|
from pyhon.attributes import HonAttribute
|
|
from pyhon.commands import HonCommand
|
|
from pyhon.parameter.base import HonParameter
|
|
from pyhon.parameter.fixed import HonParameterFixed
|
|
from pyhon.parameter.range import HonParameterRange
|
|
|
|
if TYPE_CHECKING:
|
|
from pyhon import HonAPI
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
class HonAppliance:
    """A single hOn appliance (or one zone of a multi-zone appliance).

    The instance stores the raw appliance ``info`` record and lazily loads
    commands, attributes and statistics through the ``api`` object handed to
    the constructor. Values can be looked up by (dotted) key via
    ``appliance[key]`` or ``appliance.get(key, default)``.
    """

    # Minimum age of the last refresh before update() reloads attributes.
    _MINIMAL_UPDATE_INTERVAL = 5  # seconds

    def __init__(
        self, api: Optional["HonAPI"], info: Dict[str, Any], zone: int = 0
    ) -> None:
        """Create an appliance wrapper.

        Args:
            api: API object used by the ``load_*`` coroutines; may be ``None``.
            info: Raw appliance record. If it has a truthy ``"attributes"``
                entry, that list of ``parName``/``parValue`` records is
                replaced *in place* by a ``{parName: parValue}`` mapping.
            zone: Zone number; ``0`` means the appliance has no zones.
        """
        if attributes := info.get("attributes"):
            info["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
        self._info: Dict = info
        self._api: Optional["HonAPI"] = api
        self._appliance_model: Dict = {}

        self._commands: Dict = {}
        self._statistics: Dict = {}
        self._attributes: Dict = {}
        self._zone: int = zone
        self._additional_data: Dict[str, Any] = {}
        self._last_update = None
        # Fallback returned by `settings` for keys a command does not define.
        self._default_setting = HonParameter("", {}, "")

        # Optional per-appliance-type extension module; absent for most types.
        try:
            self._extra = importlib.import_module(
                f"pyhon.appliances.{self.appliance_type.lower()}"
            ).Appliance(self)
        except ModuleNotFoundError:
            self._extra = None

    def __getitem__(self, item):
        """Look up ``item`` in data, attribute parameters or info.

        When a zone is set, ``Z<zone>`` is appended to the key first. A key
        containing ``.`` is treated as a path into ``self.data``; all-digit
        path segments index into lists.

        Raises:
            KeyError, IndexError: if the key or path cannot be resolved.
        """
        if self._zone:
            item += f"Z{self._zone}"
        # `data` is rebuilt on every property access, so build it only once.
        data = self.data
        if "." in item:
            result = data
            for key in item.split("."):
                if all(k in "0123456789" for k in key) and isinstance(result, list):
                    result = result[int(key)]
                else:
                    result = result[key]
            return result
        if item in data:
            return data[item]
        if item in self.attributes["parameters"]:
            return self.attributes["parameters"][item].value
        return self.info[item]

    def get(self, item, default=None):
        """Return ``self[item]``, or ``default`` on ``KeyError``/``IndexError``."""
        try:
            return self[item]
        except (KeyError, IndexError):
            return default

    def _check_name_zone(self, name: str, frontend: bool = True) -> str:
        """Return info field ``name``, suffixed with the zone when one is set.

        The suffix is `` Z<zone>`` for frontend names and ``_z<zone>``
        otherwise. A missing or empty field yields ``""`` without a suffix.
        """
        middle = " Z" if frontend else "_z"
        if (attribute := self._info.get(name, "")) and self._zone:
            return f"{attribute}{middle}{self._zone}"
        return attribute

    @property
    def appliance_model_id(self) -> str:
        """Value of ``applianceModelId`` from info, or ``""``."""
        return self._info.get("applianceModelId", "")

    @property
    def appliance_type(self) -> str:
        """Value of ``applianceTypeName`` from info, or ``""``."""
        return self._info.get("applianceTypeName", "")

    @property
    def mac_address(self) -> str:
        """Value of ``macAddress`` from info, or ``""``."""
        return self.info.get("macAddress", "")

    @property
    def unique_id(self) -> str:
        """MAC address with a ``_z<zone>`` suffix when a zone is set."""
        return self._check_name_zone("macAddress", frontend=False)

    @property
    def model_name(self) -> str:
        """Model name with a `` Z<zone>`` suffix when a zone is set."""
        return self._check_name_zone("modelName")

    @property
    def nick_name(self) -> str:
        """Nick name with a `` Z<zone>`` suffix when a zone is set."""
        return self._check_name_zone("nickName")

    @property
    def code(self) -> str:
        """Appliance code: info ``code`` if truthy, else a serial-number prefix.

        The prefix is the first 8 characters for serial numbers shorter than
        18 characters and the first 11 characters otherwise.
        """
        if code := self.info.get("code"):
            return code
        serial_number = self.info.get("serialNumber", "")
        return serial_number[:8] if len(serial_number) < 18 else serial_number[:11]

    @property
    def options(self):
        """``options`` entry of the loaded appliance model, or ``{}``."""
        return self._appliance_model.get("options", {})

    @property
    def commands(self):
        """Mapping of command name to loaded command object."""
        return self._commands

    @property
    def attributes(self):
        """Attributes as stored by the last ``load_attributes`` call."""
        return self._attributes

    @property
    def statistics(self):
        """Statistics as stored by the last ``load_statistics`` call."""
        return self._statistics

    @property
    def info(self):
        """The raw appliance info record."""
        return self._info

    @property
    def additional_data(self):
        """Non-command entries collected while parsing the command payload."""
        return self._additional_data

    @property
    def zone(self) -> int:
        """Zone number of this appliance (``0`` if unzoned)."""
        return self._zone

    @property
    def api(self) -> Optional["HonAPI"]:
        """API object passed to the constructor."""
        return self._api

    async def _recover_last_command_states(self):
        """Apply the parameters of the first matching history entry per command."""
        command_history = await self.api.command_history(self)
        for name, command in self._commands.items():
            entry = next(
                (
                    d
                    for d in command_history
                    if d.get("command", {}).get("commandName") == name
                ),
                None,
            )
            if entry is None:
                continue
            parameters = entry.get("command", {}).get("parameters", {})
            if command.categories and (
                parameters.get("program") or parameters.get("category")
            ):
                if parameters.get("program"):
                    command.category = parameters.pop("program").split(".")[-1].lower()
                else:
                    command.category = parameters.pop("category")
                # NOTE(review): re-read on purpose - assigning `category`
                # presumably swaps the command registered under `name`;
                # confirm against HonCommand.category before simplifying.
                command = self.commands[name]
            for key, data in command.settings.items():
                if (
                    not isinstance(data, HonParameterFixed)
                    and parameters.get(key) is not None
                ):
                    # Values rejected by the parameter are skipped.
                    with suppress(ValueError):
                        data.value = parameters.get(key)

    def _get_categories(self, command, data):
        """Parse a category mapping and return at most one representative command.

        Every entry of ``data`` is parsed with ``_get_command``; the results
        share one ``categories`` dict. Keys containing ``"PROGRAM"`` are
        shortened to their lower-cased last dotted segment. The
        ``setParameters`` category is preferred, otherwise the first one
        found is returned. Returns ``[]`` when nothing was parsed.
        """
        categories = {}
        for category, value in data.items():
            result = self._get_command(value, command, category, categories)
            if result:
                if "PROGRAM" in category:
                    category = category.split(".")[-1].lower()
                categories[category] = result[0]
        if categories:
            if "setParameters" in categories:
                return [categories["setParameters"]]
            return [list(categories.values())[0]]
        return []

    def _get_commands(self, data):
        """Parse the raw command payload into a ``{name: command}`` mapping."""
        commands = []
        for command, value in data.items():
            commands += self._get_command(value, command, "")
        return {c.name: c for c in commands}

    def _get_command(self, data, command="", category="", categories=None):
        """Parse one payload node into a list of commands.

        A dict with truthy ``description`` and ``protocolType`` becomes a
        ``HonCommand``; any other dict is treated as a category mapping.
        Non-dict values are stored in ``additional_data`` (nested under
        ``category`` when one is given) and produce no command.
        """
        commands = []
        if isinstance(data, dict):
            if data.get("description") and data.get("protocolType", None):
                commands += [
                    HonCommand(
                        command,
                        data,
                        self,
                        category_name=category,
                        categories=categories,
                    )
                ]
            else:
                commands += self._get_categories(command, data)
        elif category:
            self._additional_data.setdefault(command, {})[category] = data
        else:
            self._additional_data[command] = data
        return commands

    async def load_commands(self):
        """Load commands from the API, then add favourites and restore state.

        Raises:
            KeyError: if the payload has no ``applianceModel`` entry.
        """
        raw = await self.api.load_commands(self)
        self._appliance_model = raw.pop("applianceModel")
        raw.pop("dictionaryId", None)
        self._commands = self._get_commands(raw)
        await self._add_favourites()
        await self._recover_last_command_states()

    async def _add_favourites(self):
        """Register each stored favourite as an extra category of its command.

        Favourites that reference a command or program which was not loaded
        are skipped instead of aborting the whole command load.
        """
        favourites = await self.api.command_favourites(self)
        for favourite in favourites:
            name = favourite.get("favouriteName")
            command = favourite.get("command") or {}
            command_name = command.get("commandName")
            program_name = command.get("programName", "").split(".")[-1].lower()
            base_command = self._commands.get(command_name)
            if base_command is None or program_name not in base_command.categories:
                _LOGGER.debug(
                    "Skipping favourite %s: unknown command %s / program %s",
                    name,
                    command_name,
                    program_name,
                )
                continue
            # NOTE(review): copy() is shallow; whether `parameters` is shared
            # with the source category depends on HonCommand - confirm.
            base = copy(base_command.categories[program_name])
            for data in command.values():
                if isinstance(data, str):
                    continue
                for key, value in data.items():
                    if parameter := base.parameters.get(key):
                        with suppress(ValueError):
                            parameter.value = value
            extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
            base.parameters.update(favourite=extra_param)
            base.parameters["program"].set_value(name)
            base_command.categories[name] = base

    async def load_attributes(self):
        """Load attributes from the API and fold ``shadow`` parameters in.

        A response without a ``shadow`` section (or with an empty
        ``parameters`` entry inside it) is accepted and adds no parameters.
        """
        self._attributes = await self.api.load_attributes(self)
        shadow = self._attributes.pop("shadow", None) or {}
        for name, values in (shadow.get("parameters") or {}).items():
            if name in self._attributes.get("parameters", {}):
                self._attributes["parameters"][name].update(values)
            else:
                self._attributes.setdefault("parameters", {})[name] = HonAttribute(values)
        if self._extra:
            self._attributes = self._extra.attributes(self._attributes)

    async def load_statistics(self):
        """Load statistics and merge the maintenance data into them."""
        self._statistics = await self.api.load_statistics(self)
        self._statistics |= await self.api.load_maintenance(self)

    async def update(self, force=False):
        """Reload attributes unless they were refreshed very recently.

        Args:
            force: Reload even if the last refresh is younger than
                ``_MINIMAL_UPDATE_INTERVAL`` seconds.
        """
        now = datetime.now()
        if (
            force
            or not self._last_update
            or self._last_update
            < now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
        ):
            # Stamped before awaiting, exactly as before this change.
            self._last_update = now
            await self.load_attributes()

    @property
    def command_parameters(self):
        """Mapping of command name to that command's ``parameter_value``."""
        return {n: c.parameter_value for n, c in self._commands.items()}

    @property
    def settings(self):
        """Mapping ``"<command>.<key>"`` to setting for every setting key.

        Keys a command does not define map to the shared default setting.
        The result is passed through the type-specific extension if present.
        """
        result = {}
        for name, command in self._commands.items():
            for key in command.setting_keys:
                setting = command.settings.get(key, self._default_setting)
                result[f"{name}.{key}"] = setting
        if self._extra:
            return self._extra.settings(result)
        return result

    @property
    def available_settings(self):
        """List of ``"<command>.<key>"`` names for all command setting keys."""
        return [
            f"{name}.{key}"
            for name, command in self._commands.items()
            for key in command.setting_keys
        ]

    @property
    def data(self):
        """Snapshot dict of everything that ``__getitem__`` can look up."""
        result = {
            "attributes": self.attributes,
            "appliance": self.info,
            "statistics": self.statistics,
            "additional_data": self._additional_data,
            **self.command_parameters,
            **self.attributes,
        }
        return result

    def diagnose(self, whitespace=" ", command_only=False):
        """Return a printable, redacted dump of the appliance state.

        Args:
            whitespace: Passed through to ``helper.pretty_print``.
            command_only: Leave attributes, appliance info and statistics
                out of the dump.

        The keys "PK", "SK", "serialNumber", "coords" and "device" are
        removed from the dumped appliance info and the MAC address is
        masked. The appliance's own ``info`` is left untouched.
        """
        data = {
            "attributes": self.attributes.copy(),
            # Shallow copy: redaction below must not strip the live info.
            "appliance": dict(self.info),
            "statistics": self.statistics,
            "additional_data": self._additional_data,
        }
        if command_only:
            data.pop("attributes")
            data.pop("appliance")
            data.pop("statistics")
        data |= {n: c.parameter_groups for n, c in self._commands.items()}
        extra = {n: c.data for n, c in self._commands.items() if c.data}
        if extra:
            data |= {"extra_command_data": extra}
        for sensible in ["PK", "SK", "serialNumber", "coords", "device"]:
            data.get("appliance", {}).pop(sensible, None)
        result = helper.pretty_print({"data": data}, whitespace=whitespace)
        result += helper.pretty_print(
            {
                "commands": helper.create_command(self.commands),
                "rules": helper.create_rules(self.commands),
            },
            whitespace=whitespace,
        )
        # str.replace("", x) would insert x between every character, so an
        # unknown (empty) MAC address must not be "masked".
        mac_address = self.mac_address
        if not mac_address:
            return result
        return result.replace(mac_address, "xx-xx-xx-xx-xx-xx")

    def sync_to_params(self, command_name):
        """Copy a command's parameter values onto matching string attributes.

        Does nothing when ``command_name`` is not a loaded command.
        """
        command: HonCommand = self.commands.get(command_name)
        if command is None:
            return
        for key, value in self.attributes.get("parameters", {}).items():
            if isinstance(value, str) and (new := command.parameters.get(key)):
                self.attributes["parameters"][key].update(str(new.intern_value), shield=True)

    def sync_command(self, main, target=None) -> None:
        """Propagate parameter values of command ``main`` to other commands.

        Args:
            main: Name of the command whose parameters are the source.
            target: Optional collection of command names to restrict the
                sync to; when falsy every other command is updated.

        Does nothing when ``main`` is not a loaded command.
        """
        base: HonCommand = self.commands.get(main)
        if base is None:
            return
        for command, data in self.commands.items():
            if command == main or target and command not in target:
                continue
            for name, parameter in data.parameters.items():
                if base_value := base.parameters.get(name):
                    if isinstance(base_value, HonParameterRange) and isinstance(
                        parameter, HonParameterRange
                    ):
                        parameter.max = base_value.max
                        parameter.min = base_value.min
                        parameter.step = base_value.step
                    elif isinstance(parameter, HonParameterRange):
                        # Source is not a range: pin the range to its value.
                        parameter.max = int(base_value.value)
                        parameter.min = int(base_value.value)
                        parameter.step = 1
                    parameter.value = base_value.value
|
|
|
|
|
|
class HonApplianceTest(HonAppliance):
    """Offline appliance whose commands come from a bundled JSON fixture.

    The fixture is read from ``test_data/<name>.json`` next to this module;
    no API object is used.
    """

    def __init__(self, name):
        """Load the fixture called ``name`` and use its model as info."""
        super().__init__(None, {})
        self._name = name
        self.load_commands()
        # The inherited load_attributes() is a coroutine function and needs
        # an API object; calling it here without awaiting only created a
        # discarded coroutine, so no attribute load is attempted.
        self._info = self._appliance_model

    def load_commands(self):
        """Synchronously parse commands from the fixture file.

        Raises:
            FileNotFoundError: if the fixture does not exist.
            KeyError: if the fixture has no ``applianceModel`` entry.
        """
        device = Path(__file__).parent / "test_data" / f"{self._name}.json"
        with open(device, encoding="utf-8") as f:
            raw = json.load(f)
        self._appliance_model = raw.pop("applianceModel")
        raw.pop("dictionaryId", None)
        self._commands = self._get_commands(raw)

    async def update(self, force=False):
        """Do nothing; ``force`` is accepted to match the base signature."""
        return

    @property
    def nick_name(self) -> str:
        """The fixture name."""
        return self._name

    @property
    def unique_id(self) -> str:
        """The fixture name."""
        return self._name

    @property
    def mac_address(self) -> str:
        """A fixed placeholder MAC address."""
        return "xx-xx-xx-xx-xx-xx"
|