From 1ca89995a2e35d2f537225ccc210311f0d37c2ed Mon Sep 17 00:00:00 2001 From: Andre Basche Date: Tue, 13 Jun 2023 00:39:18 +0200 Subject: [PATCH] Lock attributes --- pyhon/appliance.py | 2 +- pyhon/attributes.py | 29 +++++++++++++++++++++++++---- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/pyhon/appliance.py b/pyhon/appliance.py index bfb16b2..76514ff 100644 --- a/pyhon/appliance.py +++ b/pyhon/appliance.py @@ -330,7 +330,7 @@ class HonAppliance: command: HonCommand = self.commands.get(command_name) for key, value in self.attributes.get("parameters", {}).items(): if isinstance(value, str) and (new := command.parameters.get(key)): - self.attributes["parameters"][key].value = str(new.intern_value) + self.attributes["parameters"][key].update(str(new.intern_value), shield=True) def sync_command(self, main, target=None) -> None: base: HonCommand = self.commands.get(main) diff --git a/pyhon/attributes.py b/pyhon/attributes.py index 3a45c29..55ec0b7 100644 --- a/pyhon/attributes.py +++ b/pyhon/attributes.py @@ -1,17 +1,21 @@ -from datetime import datetime -from typing import Optional +from datetime import datetime, timedelta +from typing import Optional, Final, Dict from pyhon.helper import str_to_float class HonAttribute: + _LOCK_TIMEOUT: Final = 10 + def __init__(self, data): self._value: str = "" self._last_update: Optional[datetime] = None + self._lock_timestamp: Optional[datetime] = None self.update(data) @property def value(self) -> float | str: + """Attribute value""" try: return str_to_float(self._value) except ValueError: @@ -23,15 +27,32 @@ class HonAttribute: @property def last_update(self) -> Optional[datetime]: + """Timestamp of last api update""" return self._last_update - def update(self, data): - self._value = data.get("parNewVal", "") + @property + def lock(self) -> bool: + """Shows if value changes are forbidden""" + if not self._lock_timestamp: + return False + lock_until = self._lock_timestamp + timedelta(seconds=self._LOCK_TIMEOUT) + return lock_until >= datetime.utcnow() + + def update(self, data: Dict[str, str] | str, shield: bool = False) -> bool: + if self.lock and not shield: + return False + if shield: + self._lock_timestamp = datetime.utcnow() + if isinstance(data, str): + self.value = data + return True + self.value = data.get("parNewVal", "") if last_update := data.get("lastUpdate"): try: self._last_update = datetime.fromisoformat(last_update) except ValueError: self._last_update = None + return True def __str__(self) -> str: return self._value