Improve logging for authentication errors

This commit is contained in:
Andre Basche 2023-04-11 00:59:00 +02:00
parent e857fe91de
commit b4b782c52c
4 changed files with 46 additions and 29 deletions

View file

@ -2,7 +2,6 @@ import json
import logging import logging
import re import re
import secrets import secrets
import sys
import urllib import urllib
from pprint import pformat from pprint import pformat
from urllib import parse from urllib import parse
@ -10,6 +9,7 @@ from urllib import parse
from yarl import URL from yarl import URL
from pyhon import const from pyhon import const
from pyhon.exceptions import HonAuthenticationError
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -24,6 +24,7 @@ class HonAuth:
self._cognito_token = "" self._cognito_token = ""
self._id_token = "" self._id_token = ""
self._device = device self._device = device
self._called_urls = []
@property @property
def cognito_token(self): def cognito_token(self):
@ -41,6 +42,16 @@ class HonAuth:
def refresh_token(self): def refresh_token(self):
return self._refresh_token return self._refresh_token
async def _error_logger(self, response, fail=True):
result = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._called_urls):
result += f" {i + 1: 2d} {status} - {url}\n"
result += f"ERROR - {response.status} - {response.request_info.url}\n"
result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(result)
if fail:
raise HonAuthenticationError("Can't login")
async def _load_login(self): async def _load_login(self):
nonce = secrets.token_hex(16) nonce = secrets.token_hex(16)
nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
@ -58,22 +69,27 @@ class HonAuth:
async with self._session.get( async with self._session.get(
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
) as response: ) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url) self._called_urls.append((response.status, response.request_info.url))
if not (login_url := re.findall("url = '(.+?)'", await response.text())): if not (login_url := re.findall("url = '(.+?)'", await response.text())):
await self._error_logger(response)
return False return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1: async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
_LOGGER.debug("%s - %s", redirect1.status, redirect1.request_info.url) self._called_urls.append((redirect1.status, redirect1.request_info.url))
if not (url := redirect1.headers.get("Location")): if not (url := redirect1.headers.get("Location")):
await self._error_logger(redirect1)
return False return False
async with self._session.get(url, allow_redirects=False) as redirect2: async with self._session.get(url, allow_redirects=False) as redirect2:
_LOGGER.debug("%s - %s", redirect2.status, redirect2.request_info.url) self._called_urls.append((redirect2.status, redirect2.request_info.url))
if not ( if not (
url := redirect2.headers.get("Location") url := redirect2.headers.get("Location")
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
): ):
await self._error_logger(redirect2)
return False return False
async with self._session.get(URL(url, encoded=True)) as login_screen: async with self._session.get(URL(url, encoded=True)) as login_screen:
_LOGGER.debug("%s - %s", login_screen.status, login_screen.request_info.url) self._called_urls.append(
(login_screen.status, login_screen.request_info.url)
)
if context := re.findall( if context := re.findall(
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
): ):
@ -83,6 +99,7 @@ class HonAuth:
"/".join(const.AUTH_API.split("/")[:-1]), "" "/".join(const.AUTH_API.split("/")[:-1]), ""
) )
return fw_uid, loaded, login_url return fw_uid, loaded, login_url
await self._error_logger(login_screen)
return False return False
async def _login(self, fw_uid, loaded, login_url): async def _login(self, fw_uid, loaded, login_url):
@ -122,7 +139,7 @@ class HonAuth:
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params, params=params,
) as response: ) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url) self._called_urls.append((response.status, response.request_info.url))
if response.status == 200: if response.status == 200:
try: try:
data = await response.json() data = await response.json()
@ -133,35 +150,31 @@ class HonAuth:
_LOGGER.error( _LOGGER.error(
"Can't get login url - %s", pformat(await response.json()) "Can't get login url - %s", pformat(await response.json())
) )
_LOGGER.error( await self._error_logger(response)
"Unable to login: %s\n%s", response.status, await response.text()
)
return "" return ""
async def _get_token(self, url): async def _get_token(self, url):
async with self._session.get(url) as response: async with self._session.get(url) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url) self._called_urls.append((response.status, response.request_info.url))
if response.status != 200: if response.status != 200:
_LOGGER.error("Unable to get token: %s", response.status) await self._error_logger(response)
return False return False
url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text()) url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text())
if not url: if not url:
_LOGGER.error("Can't get login url - \n%s", await response.text()) await self._error_logger(response)
raise PermissionError return False
if "ProgressiveLogin" in url[0]: if "ProgressiveLogin" in url[0]:
async with self._session.get(url[0]) as response: async with self._session.get(url[0]) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url) self._called_urls.append((response.status, response.request_info.url))
if response.status != 200: if response.status != 200:
_LOGGER.error("Unable to get token: %s", response.status) await self._error_logger(response)
return False return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text()) url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as response: async with self._session.get(url) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url) self._called_urls.append((response.status, response.request_info.url))
if response.status != 200: if response.status != 200:
_LOGGER.error( await self._error_logger(response)
"Unable to connect to the login service: %s", response.status
)
return False return False
text = await response.text() text = await response.text()
if access_token := re.findall("access_token=(.*?)&", text): if access_token := re.findall("access_token=(.*?)&", text):
@ -187,11 +200,11 @@ class HonAuth:
async with self._session.post( async with self._session.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as response: ) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url) self._called_urls.append((response.status, response.request_info.url))
try: try:
json_data = await response.json() json_data = await response.json()
except json.JSONDecodeError: except json.JSONDecodeError:
_LOGGER.error("No JSON Data after POST: %s", await response.text()) await self._error_logger(response)
return False return False
self._cognito_token = json_data["cognitoUser"]["Token"] self._cognito_token = json_data["cognitoUser"]["Token"]
return True return True
@ -205,8 +218,9 @@ class HonAuth:
async with self._session.post( async with self._session.post(
f"{const.AUTH_API}/services/oauth2/token", params=params f"{const.AUTH_API}/services/oauth2/token", params=params
) as response: ) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url) self._called_urls.append((response.status, response.request_info.url))
if response.status >= 400: if response.status >= 400:
await self._error_logger(response, fail=False)
return False return False
data = await response.json() data = await response.json()
self._id_token = data["id_token"] self._id_token = data["id_token"]

View file

@ -6,6 +6,7 @@ import aiohttp
from pyhon import const from pyhon import const
from pyhon.connection.auth import HonAuth, _LOGGER from pyhon.connection.auth import HonAuth, _LOGGER
from pyhon.connection.device import HonDevice from pyhon.connection.device import HonDevice
from pyhon.exceptions import HonAuthenticationError
class HonBaseConnectionHandler: class HonBaseConnectionHandler:
@ -50,9 +51,9 @@ class HonConnectionHandler(HonBaseConnectionHandler):
self._email = email self._email = email
self._password = password self._password = password
if not self._email: if not self._email:
raise PermissionError("Login-Error - An email address must be specified") raise HonAuthenticationError("An email address must be specified")
if not self._password: if not self._password:
raise PermissionError("Login-Error - A password address must be specified") raise HonAuthenticationError("A password address must be specified")
self._request_headers = {} self._request_headers = {}
@property @property
@ -73,7 +74,7 @@ class HonConnectionHandler(HonBaseConnectionHandler):
self._request_headers["cognito-token"] = self._auth.cognito_token self._request_headers["cognito-token"] = self._auth.cognito_token
self._request_headers["id-token"] = self._auth.id_token self._request_headers["id-token"] = self._auth.id_token
else: else:
raise PermissionError("Can't Login") raise HonAuthenticationError("Can't login")
return {h: v for h, v in self._request_headers.items() if h not in headers} return {h: v for h, v in self._request_headers.items() if h not in headers}
@asynccontextmanager @asynccontextmanager
@ -100,7 +101,7 @@ class HonConnectionHandler(HonBaseConnectionHandler):
response.status, response.status,
await response.text(), await response.text(),
) )
raise PermissionError("Login failure") raise HonAuthenticationError("Login failure")
else: else:
try: try:
await response.json() await response.json()
@ -123,5 +124,5 @@ class HonAnonymousConnectionHandler(HonBaseConnectionHandler):
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response: async with method(*args, **kwargs) as response:
if response.status == 403: if response.status == 403:
print("Can't authorize") _LOGGER.error("Can't authenticate anymore")
yield response yield response

2
pyhon/exceptions.py Normal file
View file

@ -0,0 +1,2 @@
class HonAuthenticationError(Exception):
pass

View file

@ -7,7 +7,7 @@ with open("README.md", "r") as f:
setup( setup(
name="pyhOn", name="pyhOn",
version="0.6.3", version="0.6.4",
author="Andre Basche", author="Andre Basche",
description="Control hOn devices with python", description="Control hOn devices with python",
long_description=long_description, long_description=long_description,