Format with black

This commit is contained in:
Andre Basche 2023-04-09 20:55:36 +02:00
parent 36fad84ee2
commit 8dc6cd71cd
11 changed files with 166 additions and 65 deletions

View file

@ -25,8 +25,12 @@ def get_arguments():
keys = subparser.add_parser("keys", help="print as key format")
keys.add_argument("keys", help="print as key format", action="store_true")
keys.add_argument("--all", help="print also full keys", action="store_true")
translate = subparser.add_parser("translate", help="print available translation keys")
translate.add_argument("translate", help="language (de, en, fr...)", metavar="LANGUAGE")
translate = subparser.add_parser(
"translate", help="print available translation keys"
)
translate.add_argument(
"translate", help="language (de, en, fr...)", metavar="LANGUAGE"
)
translate.add_argument("--json", help="print as json", action="store_true")
return vars(parser.parse_args())
@ -51,7 +55,9 @@ def pretty_print(data, key="", intend=0, is_list=False):
else:
pretty_print(value, key=key, intend=intend)
else:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}")
print(
f"{' ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}"
)
def key_print(data, key="", start=True):
@ -90,7 +96,12 @@ async def translate(language, json_output=False):
if json_output:
print(json.dumps(keys, indent=4))
else:
clean_keys = json.dumps(keys).replace("\\n", "\\\\n").replace("\\\\r", "").replace("\\r", "")
clean_keys = (
json.dumps(keys)
.replace("\\n", "\\\\n")
.replace("\\\\r", "")
.replace("\\r", "")
)
keys = json.loads(clean_keys)
pretty_print(keys)
@ -126,5 +137,5 @@ def start():
print("Aborted.")
if __name__ == '__main__':
if __name__ == "__main__":
start()

View file

@ -18,7 +18,9 @@ class HonAppliance:
self._attributes = {}
try:
self._extra = importlib.import_module(f'pyhon.appliances.{self.appliance_type.lower()}').Appliance()
self._extra = importlib.import_module(
f"pyhon.appliances.{self.appliance_type.lower()}"
).Appliance()
except ModuleNotFoundError:
self._extra = None
@ -87,7 +89,14 @@ class HonAppliance:
async def _recover_last_command_states(self, commands):
command_history = await self._api.command_history(self)
for name, command in commands.items():
last = next((index for (index, d) in enumerate(command_history) if d.get("command", {}).get("commandName") == name), None)
last = next(
(
index
for (index, d) in enumerate(command_history)
if d.get("command", {}).get("commandName") == name
),
None,
)
if last is None:
continue
parameters = command_history[last].get("command", {}).get("parameters", {})
@ -95,7 +104,10 @@ class HonAppliance:
command.set_program(parameters.pop("program").split(".")[-1].lower())
command = self.commands[name]
for key, data in command.settings.items():
if not isinstance(data, HonParameterFixed) and parameters.get(key) is not None:
if (
not isinstance(data, HonParameterFixed)
and parameters.get(key) is not None
):
with suppress(ValueError):
data.value = parameters.get(key)
@ -112,7 +124,9 @@ class HonAppliance:
multi = {}
for program, attr2 in attr.items():
program = program.split(".")[-1].lower()
cmd = HonCommand(command, attr2, self._api, self, multi=multi, program=program)
cmd = HonCommand(
command, attr2, self._api, self, multi=multi, program=program
)
multi[program] = cmd
commands[command] = cmd
self._commands = commands
@ -149,8 +163,12 @@ class HonAppliance:
@property
def data(self):
result = {"attributes": self.attributes, "appliance": self.info, "statistics": self.statistics,
**self.parameters}
result = {
"attributes": self.attributes,
"appliance": self.info,
"statistics": self.statistics,
**self.parameters,
}
if self._extra:
return self._extra.data(result)
return result

View file

@ -4,7 +4,7 @@ from pyhon.parameter import HonParameterEnum
class Appliance:
def __init__(self):
filters = ["receipt", "standard, special"]
data = {'defaultValue': filters[0], 'enumValues': filters}
data = {"defaultValue": filters[0], "enumValues": filters}
self._program_filter = HonParameterEnum("program_filter", data)
def data(self, data):

View file

@ -1,4 +1,9 @@
from pyhon.parameter import HonParameterFixed, HonParameterEnum, HonParameterRange, HonParameterProgram
from pyhon.parameter import (
HonParameterFixed,
HonParameterEnum,
HonParameterRange,
HonParameterProgram,
)
class HonCommand:
@ -10,7 +15,9 @@ class HonCommand:
self._program = program
self._description = attributes.get("description", "")
self._parameters = self._create_parameters(attributes.get("parameters", {}))
self._ancillary_parameters = self._create_parameters(attributes.get("ancillaryParameters", {}))
self._ancillary_parameters = self._create_parameters(
attributes.get("ancillaryParameters", {})
)
def __repr__(self):
return f"{self._name} command"
@ -35,11 +42,18 @@ class HonCommand:
@property
def ancillary_parameters(self):
return {key: parameter.value for key, parameter in self._ancillary_parameters.items()}
return {
key: parameter.value
for key, parameter in self._ancillary_parameters.items()
}
async def send(self):
parameters = {name: parameter.value for name, parameter in self._parameters.items()}
return await self._connector.send_command(self._device, self._name, parameters, self.ancillary_parameters)
parameters = {
name: parameter.value for name, parameter in self._parameters.items()
}
return await self._connector.send_command(
self._device, self._name, parameters, self.ancillary_parameters
)
def get_programs(self):
return self._multi
@ -61,11 +75,16 @@ class HonCommand:
def setting_keys(self):
if not self._multi:
return self._get_settings_keys()
result = [key for cmd in self._multi.values() for key in self._get_settings_keys(cmd)]
result = [
key for cmd in self._multi.values() for key in self._get_settings_keys(cmd)
]
return list(set(result + ["program"]))
@property
def settings(self):
"""Parameters with typology enum and range"""
return {s: self._parameters.get(s) for s in self.setting_keys if self._parameters.get(s) is not None}
return {
s: self._parameters.get(s)
for s in self.setting_keys
if self._parameters.get(s) is not None
}

View file

@ -71,7 +71,7 @@ class HonAPI:
params = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
"category": "CYCLE"
"category": "CYCLE",
}
url = f"{const.API_URL}/commands/v1/context"
async with self._hon.get(url, params=params) as response:
@ -80,7 +80,7 @@ class HonAPI:
async def load_statistics(self, appliance: HonAppliance):
params = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type
"applianceType": appliance.appliance_type,
}
url = f"{const.API_URL}/commands/v1/statistics"
async with self._hon.get(url, params=params) as response:
@ -98,11 +98,11 @@ class HonAPI:
"attributes": {
"channel": "mobileApp",
"origin": "standardProgram",
"energyLabel": "0"
"energyLabel": "0",
},
"ancillaryParameters": ancillary_parameters,
"parameters": parameters,
"applianceType": appliance.appliance_type
"applianceType": appliance.appliance_type,
}
url = f"{const.API_URL}/commands/v1/send"
async with self._hon.post(url, json=data) as resp:
@ -125,9 +125,9 @@ class HonAPI:
"languageCode": language,
"beta": beta,
"appVersion": const.APP_VERSION,
"os": const.OS
"os": const.OS,
}
payload = json.dumps(payload, separators=(',', ':'))
payload = json.dumps(payload, separators=(",", ":"))
async with self._hon_anonymous.post(url, data=payload) as response:
if (result := await response.json()) and (data := result.get("payload")):
return data

View file

@ -45,26 +45,37 @@ class HonAuth:
params = {
"response_type": "token+id_token",
"client_id": const.CLIENT_ID,
"redirect_uri": urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done"),
"redirect_uri": urllib.parse.quote(
f"{const.APP}://mobilesdk/detect/oauth/done"
),
"display": "touch",
"scope": "api openid refresh_token web",
"nonce": nonce
"nonce": nonce,
}
params = "&".join([f"{k}={v}" for k, v in params.items()])
async with self._session.get(f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}") as resp:
async with self._session.get(
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
) as resp:
if not (login_url := re.findall("url = '(.+?)'", await resp.text())):
return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
if not (url := redirect1.headers.get("Location")):
return False
async with self._session.get(url, allow_redirects=False) as redirect2:
if not (url := redirect2.headers.get("Location") + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"):
if not (
url := redirect2.headers.get("Location")
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
):
return False
async with self._session.get(URL(url, encoded=True)) as login_screen:
if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()):
if context := re.findall(
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
):
fw_uid, loaded_str = context[0]
loaded = json.loads(loaded_str)
login_url = login_url[0].replace("/".join(const.AUTH_API.split("/")[:-1]), "")
login_url = login_url[0].replace(
"/".join(const.AUTH_API.split("/")[:-1]), ""
)
return fw_uid, loaded, login_url
return False
@ -79,8 +90,10 @@ class HonAuth:
"params": {
"username": self._email,
"password": self._password,
"startUrl": parse.unquote(login_url.split("startURL=")[-1]).split("%3D")[0]
}
"startUrl": parse.unquote(
login_url.split("startURL=")[-1]
).split("%3D")[0],
},
}
]
},
@ -91,23 +104,28 @@ class HonAuth:
"loaded": loaded,
"dn": [],
"globals": {},
"uad": False},
"uad": False,
},
"aura.pageURI": login_url,
"aura.token": None}
"aura.token": None,
}
params = {"r": 3, "other.LightningLoginCustom.login": 1}
async with self._session.post(
const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params
const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params,
) as response:
if response.status == 200:
try:
return (await response.json())["events"][0]["attributes"]["values"]["url"]
return (await response.json())["events"][0]["attributes"]["values"][
"url"
]
except json.JSONDecodeError:
pass
_LOGGER.error("Unable to login: %s\n%s", response.status, await response.text())
_LOGGER.error(
"Unable to login: %s\n%s", response.status, await response.text()
)
return ""
async def _get_token(self, url):
@ -147,7 +165,9 @@ class HonAuth:
post_headers = {"id-token": self._id_token}
data = self._device.get()
async with self._session.post(f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data) as resp:
async with self._session.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as resp:
try:
json_data = await resp.json()
except json.JSONDecodeError:
@ -160,13 +180,13 @@ class HonAuth:
params = {
"client_id": const.CLIENT_ID,
"refresh_token": self._refresh_token,
"grant_type": "refresh_token"
"grant_type": "refresh_token",
}
async with self._session.post(f"{const.AUTH_API}/services/oauth2/token", params=params) as resp:
async with self._session.post(
f"{const.AUTH_API}/services/oauth2/token", params=params
) as resp:
if resp.status >= 400:
return False
data = await resp.json()
self._id_token = data["id_token"]
self._access_token = data["access_token"]

View file

@ -32,5 +32,10 @@ class HonDevice:
return self._mobile_id
def get(self):
return {"appVersion": self.app_version, "mobileId": self.mobile_id, "osVersion": self.os_version,
"os": self.os, "deviceModel": self.device_model}
return {
"appVersion": self.app_version,
"mobileId": self.mobile_id,
"osVersion": self.os_version,
"os": self.os,
"deviceModel": self.device_model,
}

View file

@ -59,7 +59,10 @@ class HonConnectionHandler(HonBaseConnectionHandler):
return self
async def _check_headers(self, headers):
if "cognito-token" not in self._request_headers or "id-token" not in self._request_headers:
if (
"cognito-token" not in self._request_headers
or "id-token" not in self._request_headers
):
if await self._auth.authorize():
self._request_headers["cognito-token"] = self._auth.cognito_token
self._request_headers["id-token"] = self._auth.id_token
@ -76,19 +79,33 @@ class HonConnectionHandler(HonBaseConnectionHandler):
await self._auth.refresh()
yield await self._intercept(method, *args, loop=loop + 1, **kwargs)
elif response.status == 403 and loop < 2:
_LOGGER.warning("%s - Error %s - %s", response.request_info.url, response.status, await response.text())
_LOGGER.warning(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
await self.create()
yield await self._intercept(method, *args, loop=loop + 1, **kwargs)
elif loop >= 2:
_LOGGER.error("%s - Error %s - %s", response.request_info.url, response.status, await response.text())
_LOGGER.error(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise PermissionError("Login failure")
else:
try:
await response.json()
yield response
except json.JSONDecodeError:
_LOGGER.warning("%s - JsonDecodeError %s - %s", response.request_info.url, response.status,
await response.text())
_LOGGER.warning(
"%s - JsonDecodeError %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
yield {}
@asynccontextmanager

View file

@ -29,8 +29,11 @@ class Hon:
appliance = HonAppliance(self._api, appliance)
if appliance.mac_address is None:
continue
await asyncio.gather(*[
appliance.load_attributes(),
appliance.load_commands(),
appliance.load_statistics()])
await asyncio.gather(
*[
appliance.load_attributes(),
appliance.load_commands(),
appliance.load_statistics(),
]
)
self._appliances.append(appliance)

View file

@ -79,7 +79,9 @@ class HonParameterRange(HonParameter):
if self._min <= value <= self._max and not value % self._step:
self._value = value
else:
raise ValueError(f"Allowed: min {self._min} max {self._max} step {self._step}")
raise ValueError(
f"Allowed: min {self._min} max {self._max} step {self._step}"
)
class HonParameterEnum(HonParameter):
@ -138,4 +140,10 @@ class HonParameterProgram(HonParameterEnum):
@property
def values(self):
return sorted([str(value) for value in self._values if not self._filter or self._filter in str(value)])
return sorted(
[
str(value)
for value in self._values
if not self._filter or self._filter in str(value)
]
)

View file

@ -11,7 +11,7 @@ setup(
author="Andre Basche",
description="Control hOn devices with python",
long_description=long_description,
long_description_content_type='text/markdown',
long_description_content_type="text/markdown",
project_urls={
"GitHub": "https://github.com/Andre0512/pyhOn",
"PyPI": "https://pypi.org/project/pyhOn",
@ -33,8 +33,8 @@ setup(
"Topic :: Software Development :: Libraries :: Python Modules",
],
entry_points={
'console_scripts': [
'pyhOn = pyhon.__main__:start',
"console_scripts": [
"pyhOn = pyhon.__main__:start",
]
}
},
)