pyhOn/pyhon/connection/api.py

167 lines
6.2 KiB
Python
Raw Normal View History

2023-02-13 03:36:09 +01:00
import asyncio
2023-02-13 01:41:38 +01:00
import json
import logging
from datetime import datetime
from typing import List
2023-02-13 03:36:09 +01:00
from pyhon import const
2023-04-09 18:13:50 +02:00
from pyhon.appliance import HonAppliance
from pyhon.connection.connection import HonConnectionHandler, HonAnonymousConnectionHandler
2023-02-13 01:41:38 +01:00
# Module-scoped logger: records carry this module's name so applications can
# filter or configure them separately instead of everything landing on the
# root logger. Records still propagate to handlers installed on the root.
_LOGGER = logging.getLogger(__name__)
2023-04-09 18:13:50 +02:00
class HonAPI:
    """Asynchronous client for the REST endpoints below ``const.API_URL``.

    Intended to be used as an async context manager: entering creates the
    authenticated connection handler and loads the appliances, leaving closes
    that handler again. Requests that are sent through ``_hon_anonymous`` do
    not use the authenticated handler.
    """

    def __init__(self, email="", password="") -> None:
        """Store the credentials; the authenticated handler is created on enter.

        Args:
            email: Account e-mail passed to ``HonConnectionHandler``.
            password: Account password passed to ``HonConnectionHandler``.
        """
        super().__init__()
        self._email = email
        self._password = password
        self._devices = []
        # Created in __aenter__, stays None until the context is entered.
        self._hon = None
        self._hon_anonymous = HonAnonymousConnectionHandler()

    async def __aenter__(self):
        """Create the authenticated connection, load all appliances, return self."""
        self._hon = HonConnectionHandler(self._email, self._password)
        await self._hon.create()
        try:
            await self.setup()
        except BaseException:
            # __aexit__ is not invoked when __aenter__ raises, so the handler
            # created above has to be closed here to avoid leaking it.
            await self._hon.close()
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the authenticated connection handler if one was created."""
        if self._hon is not None:
            await self._hon.close()

    @property
    def devices(self) -> "List[HonAppliance]":
        """Appliances collected by :meth:`setup`."""
        return self._devices

    async def setup(self):
        """Query the appliance list and load the data of every appliance.

        Appliances whose ``mac_address`` is ``None`` are skipped. For every
        other appliance attributes, commands and statistics are loaded
        concurrently before it is appended to :attr:`devices`.

        Returns:
            ``True`` on success, ``False`` if a ``json.JSONDecodeError`` was
            raised while processing (the response text is logged then).
        """
        async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
            try:
                appliances = (await resp.json())["payload"]["appliances"]
                for appliance in appliances:
                    device = HonAppliance(self, appliance)
                    if device.mac_address is None:
                        continue
                    await asyncio.gather(
                        device.load_attributes(),
                        device.load_commands(),
                        device.load_statistics(),
                    )
                    self._devices.append(device)
            except json.JSONDecodeError:
                _LOGGER.error("No JSON Data after GET: %s", await resp.text())
                return False
        return True

    async def load_commands(self, device: "HonAppliance"):
        """Retrieve the command definitions for ``device``.

        Returns:
            The response payload without its ``resultCode`` entry, or ``{}``
            when the payload is empty or ``resultCode`` is missing or not
            ``"0"``.
        """
        params = {
            "applianceType": device.appliance_type,
            "code": device.appliance["code"],
            "applianceModelId": device.appliance_model_id,
            "firmwareId": device.appliance["eepromId"],
            "macAddress": device.mac_address,
            "fwVersion": device.appliance["fwVersion"],
            "os": const.OS,
            "appVersion": const.APP_VERSION,
            "series": device.appliance["series"],
        }
        url = f"{const.API_URL}/commands/v1/retrieve"
        async with self._hon.get(url, params=params) as response:
            result = (await response.json()).get("payload", {})
            # A payload without "resultCode" is handled like a failed request
            # instead of raising KeyError.
            if not result or result.pop("resultCode", None) != "0":
                return {}
            return result

    async def command_history(self, device: "HonAppliance"):
        """Return the ``history`` entry of the command history payload.

        Returns ``{}`` when the response, its payload or the ``history`` entry
        is missing or empty.
        """
        url = f"{const.API_URL}/commands/v1/appliance/{device.mac_address}/history"
        async with self._hon.get(url) as response:
            result = await response.json()
            if not result or not result.get("payload"):
                return {}
            return result["payload"].get("history", {})

    async def last_activity(self, device: "HonAppliance"):
        """Return the ``attributes`` entry of the last-activity response or ``{}``."""
        url = f"{const.API_URL}/commands/v1/retrieve-last-activity"
        params = {"macAddress": device.mac_address}
        async with self._hon.get(url, params=params) as response:
            result = await response.json()
            if result and (activity := result.get("attributes")):
                return activity
        return {}

    async def load_attributes(self, device: "HonAppliance"):
        """Return the payload of the context endpoint for ``device`` or ``{}``."""
        params = {
            "macAddress": device.mac_address,
            "applianceType": device.appliance_type,
            "category": "CYCLE",
        }
        url = f"{const.API_URL}/commands/v1/context"
        async with self._hon.get(url, params=params) as response:
            return (await response.json()).get("payload", {})

    async def load_statistics(self, device: "HonAppliance"):
        """Return the payload of the statistics endpoint for ``device`` or ``{}``."""
        params = {
            "macAddress": device.mac_address,
            "applianceType": device.appliance_type,
        }
        url = f"{const.API_URL}/commands/v1/statistics"
        async with self._hon.get(url, params=params) as response:
            return (await response.json()).get("payload", {})

    @staticmethod
    def _format_timestamp(moment: datetime) -> str:
        """Format ``moment`` as ``YYYY-MM-DDTHH:MM:SS.mmmZ``.

        ``timespec="milliseconds"`` always emits three fractional digits.
        Slicing the default ``isoformat()`` output breaks when the microsecond
        part is 0, because the fraction is omitted entirely in that case.
        """
        return f"{moment.isoformat(timespec='milliseconds')}Z"

    async def send_command(self, device, command, parameters, ancillary_parameters):
        """Send ``command`` with its parameters to ``device``.

        Returns:
            ``True`` if the response payload reports ``resultCode == "0"``,
            ``False`` otherwise (including a response that is not valid JSON
            or lacks the payload).
        """
        # The same timestamp is used for the request and the transaction id.
        timestamp = self._format_timestamp(datetime.utcnow())
        data = {
            "macAddress": device.mac_address,
            "timestamp": timestamp,
            "commandName": command,
            "transactionId": f"{device.mac_address}_{timestamp}",
            "applianceOptions": device.commands_options,
            "device": self._hon.device.get(),
            "attributes": {
                "channel": "mobileApp",
                "origin": "standardProgram",
                "energyLabel": "0",
            },
            "ancillaryParameters": ancillary_parameters,
            "parameters": parameters,
            "applianceType": device.appliance_type,
        }
        url = f"{const.API_URL}/commands/v1/send"
        async with self._hon.post(url, json=data) as resp:
            try:
                json_data = await resp.json()
            except json.JSONDecodeError:
                return False
            # Tolerate replies without payload/resultCode instead of raising.
            payload = (json_data or {}).get("payload") or {}
            return payload.get("resultCode") == "0"

    async def appliance_configuration(self):
        """Return the payload of the appliance-configuration endpoint or ``{}``."""
        url = f"{const.API_URL}/config/v1/appliance-configuration"
        async with self._hon_anonymous.get(url) as response:
            result = await response.json()
            if result and (data := result.get("payload")):
                return data
        return {}

    async def app_config(self, language="en", beta=True):
        """Return the payload of the app-config endpoint or ``{}``.

        Args:
            language: Value sent as ``languageCode``.
            beta: Value sent as ``beta``.
        """
        url = f"{const.API_URL}/app-config"
        payload = {
            "languageCode": language,
            "beta": beta,
            "appVersion": const.APP_VERSION,
            "os": const.OS,
        }
        # Serialized by hand in compact form (no spaces after separators).
        body = json.dumps(payload, separators=(",", ":"))
        async with self._hon_anonymous.post(url, data=body) as response:
            if (result := await response.json()) and (data := result.get("payload")):
                return data
        return {}

    async def translation_keys(self, language="en"):
        """Download the JSON referenced by ``language.jsonPath`` of the app config.

        Returns ``{}`` when the app config contains no such path or the
        downloaded document is empty.
        """
        config = await self.app_config(language=language)
        if url := config.get("language", {}).get("jsonPath"):
            async with self._hon_anonymous.get(url) as response:
                if result := await response.json():
                    return result
        return {}