From e553adc03378484ea8954b9ecc83d6c96b390ea7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Pluta?= Date: Thu, 21 Nov 2024 14:10:01 +0100 Subject: [PATCH] Wrap hc_login module logic to a main function --- pyproject.toml | 2 +- src/hcpy/hc_login.py | 545 ++++++++++++++++++++++--------------------- 2 files changed, 275 insertions(+), 272 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e50a7c7..92e6e6d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,5 +20,5 @@ dependencies = [ ] [project.scripts] -hc-login = "hcpy:hc_login" +hc-login = "hcpy.hc_login:main" hc2mqtt = "hcpy.hc2mqtt:hc2mqtt" \ No newline at end of file diff --git a/src/hcpy/hc_login.py b/src/hcpy/hc_login.py index 4934a31..d862d88 100755 --- a/src/hcpy/hc_login.py +++ b/src/hcpy/hc_login.py @@ -31,295 +31,298 @@ from hcpy.HCxml2json import xml2json # requests_log.setLevel(logging.DEBUG) # requests_log.propagate = True - -def debug(*args): - print(*args, file=sys.stderr) +def main(): + def debug(*args): + print(*args, file=sys.stderr) -email = sys.argv[1] -password = sys.argv[2] -devicefile = sys.argv[3] + email = sys.argv[1] + password = sys.argv[2] + devicefile = sys.argv[3] -headers = {"User-Agent": "hc-login/1.0"} + headers = {"User-Agent": "hc-login/1.0"} -session = requests.Session() -session.headers.update(headers) + session = requests.Session() + session.headers.update(headers) -base_url = "https://api.home-connect.com/security/oauth/" -asset_urls = [ - "https://prod.reu.rest.homeconnectegw.com/", # EU - "https://prod.rna.rest.homeconnectegw.com/", # US -] + base_url = "https://api.home-connect.com/security/oauth/" + asset_urls = [ + "https://prod.reu.rest.homeconnectegw.com/", # EU + "https://prod.rna.rest.homeconnectegw.com/", # US + ] -# -# Start by fetching the old login page, which gives -# us the verifier and challenge for getting the token, -# even after the singlekey detour. -# -# The app_id and scope are hardcoded in the application -app_id = "9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3" -scope = [ - "ReadAccount", - "Settings", - "IdentifyAppliance", - "Control", - "DeleteAppliance", - "WriteAppliance", - "ReadOrigApi", - "Monitor", - "WriteOrigApi", - "Images", -] -scope = [ - "ReadOrigApi", -] + # + # Start by fetching the old login page, which gives + # us the verifier and challenge for getting the token, + # even after the singlekey detour. + # + # The app_id and scope are hardcoded in the application + app_id = "9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3" + scope = [ + "ReadAccount", + "Settings", + "IdentifyAppliance", + "Control", + "DeleteAppliance", + "WriteAppliance", + "ReadOrigApi", + "Monitor", + "WriteOrigApi", + "Images", + ] + scope = [ + "ReadOrigApi", + ] -def b64(b): - return re.sub(r"=", "", base64url_encode(b).decode("UTF-8")) + def b64(b): + return re.sub(r"=", "", base64url_encode(b).decode("UTF-8")) -def b64random(num): - return b64(base64url_encode(get_random_bytes(num))) + def b64random(num): + return b64(base64url_encode(get_random_bytes(num))) -verifier = b64(get_random_bytes(32)) + verifier = b64(get_random_bytes(32)) -login_query = { - "response_type": "code", - "prompt": "login", - "code_challenge": b64(SHA256.new(verifier.encode("UTF-8")).digest()), - "code_challenge_method": "S256", - "client_id": app_id, - "scope": " ".join(scope), - "nonce": b64random(16), - "state": b64random(16), - "redirect_uri": "hcauth://auth/prod", - "redirect_target": "icore", -} - -loginpage_url = base_url + "authorize?" + urlencode(login_query) -token_url = base_url + "token" - -debug(f"{loginpage_url=}") -r = session.get(loginpage_url) -if r.status_code != requests.codes.ok: - print("error fetching login url!", loginpage_url, r.text, file=sys.stderr) - exit(1) - -# get the session from the text -if not (match := re.search(r'"sessionId" value="(.*?)"', r.text)): - print("Unable to find session id in login page") - exit(1) -session_id = match[1] -if not (match := re.search(r'"sessionData" value="(.*?)"', r.text)): - print("Unable to find session data in login page") - exit(1) -session_data = match[1] - -debug("--------") - -# now that we have a session id, contact the -# single key host to start the new login flow -singlekey_host = "https://singlekey-id.com" -login_url = singlekey_host + "/auth/en-us/log-in/" - -preauth_url = singlekey_host + "/auth/connect/authorize" -preauth_query = { - "client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256", - "redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target", - "response_type": "code", - "scope": "openid email profile offline_access homeconnect.general", - "prompt": "login", - "style_id": "bsh_hc_01", - "state": '{"session_id":"' + session_id + '"}', # important: no spaces! -} - -# fetch the preauth state to get the final callback url -preauth_url += "?" + urlencode(preauth_query) - -# loop until we have the callback url -while True: - debug(f"next {preauth_url=}") - r = session.get(preauth_url, allow_redirects=False) - if r.status_code == 200: - break - if r.status_code > 300 and r.status_code < 400: - preauth_url = r.headers["location"] - # Make relative locations absolute - if not bool(urlparse(preauth_url).netloc): - preauth_url = singlekey_host + preauth_url - continue - print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr) - exit(1) - -# get the ReturnUrl from the response -query = parse_qs(urlparse(preauth_url).query) -return_url = query["ReturnUrl"][0] -debug(f"{return_url=}") - -if "X-CSRF-FORM-TOKEN" in r.cookies: - headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"] -session.headers.update(headers) - -debug("--------") - -soup = BeautifulSoup(r.text, "html.parser") -requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value") -r = session.post( - preauth_url, - data={ - "UserIdentifierInput.EmailInput.StringValue": email, - "__RequestVerificationToken": requestVerificationToken, - }, - allow_redirects=False, -) - -password_url = r.headers["location"] -if not bool(urlparse(password_url).netloc): - password_url = singlekey_host + password_url - -r = session.get(password_url, allow_redirects=False) -soup = BeautifulSoup(r.text, "html.parser") -requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value") - -r = session.post( - password_url, - data={ - "Password": password, - "RememberMe": "false", - "__RequestVerificationToken": requestVerificationToken, - }, - allow_redirects=False, -) - -while True: - if return_url.startswith("/"): - return_url = singlekey_host + return_url - r = session.get(return_url, allow_redirects=False) - debug(f"{return_url=}, {r} {r.text}") - if r.status_code != 302: - break - return_url = r.headers["location"] - if return_url.startswith("hcauth://"): - break -debug(f"{return_url=}") - -debug("--------") - -url = urlparse(return_url) -query = parse_qs(url.query) - -if query.get("ReturnUrl") is not None: - print("Wrong credentials.") - print( - "If you forgot your login/password, you can restore them by opening " - "https://singlekey-id.com/auth/en-us/login in browser" - ) - exit(1) - -code = query.get("code")[0] -state = query.get("state")[0] -grant_type = query.get("grant_type")[0] # "authorization_code" - -debug(f"{code=} {grant_type=} {state=}") - -auth_url = base_url + "login" -token_url = base_url + "token" - -token_fields = { - "grant_type": grant_type, - "client_id": app_id, - "code_verifier": verifier, - "code": code, - "redirect_uri": login_query["redirect_uri"], -} - -debug(f"{token_url=} {token_fields=}") - -r = requests.post(token_url, data=token_fields, allow_redirects=False) -if r.status_code != requests.codes.ok: - print("Bad code?", file=sys.stderr) - print(r.headers, r.text) - exit(1) - -debug("--------- got token page ----------") - -token = json.loads(r.text)["access_token"] -debug(f"Received access {token=}") - -headers = { - "Authorization": "Bearer " + token, -} - - -# Try to request account details from all geos. Whichever works, we'll use next. -for asset_url in asset_urls: - r = requests.get(asset_url + "account/details", headers=headers) - if r.status_code == requests.codes.ok: - break - -# now we can fetch the rest of the account info -if r.status_code != requests.codes.ok: - print("unable to fetch account details", file=sys.stderr) - print(r.headers, r.text) - exit(1) - -# print(r.text) -account = json.loads(r.text) -configs = [] - -print(account, file=sys.stderr) - -for app in account["data"]["homeAppliances"]: - app_brand = app["brand"] - app_type = app["type"] - app_id = app["identifier"] - - config = { - "name": app_type.lower(), + login_query = { + "response_type": "code", + "prompt": "login", + "code_challenge": b64(SHA256.new(verifier.encode("UTF-8")).digest()), + "code_challenge_method": "S256", + "client_id": app_id, + "scope": " ".join(scope), + "nonce": b64random(16), + "state": b64random(16), + "redirect_uri": "hcauth://auth/prod", + "redirect_target": "icore", } - configs.append(config) + loginpage_url = base_url + "authorize?" + urlencode(login_query) + token_url = base_url + "token" - if "tls" in app: - # fancy machine with TLS support - config["host"] = app_brand + "-" + app_type + "-" + app_id - config["key"] = app["tls"]["key"] - else: - # less fancy machine with HTTP support - config["host"] = app_id - config["key"] = app["aes"]["key"] - config["iv"] = app["aes"]["iv"] - - # Fetch the XML zip file for this device - app_url = asset_url + "api/iddf/v1/iddf/" + app_id - print("fetching", app_url, file=sys.stderr) - r = requests.get(app_url, headers=headers) + debug(f"{loginpage_url=}") + r = session.get(loginpage_url) if r.status_code != requests.codes.ok: - print(app_id, ": unable to fetch machine description?") - next + print("error fetching login url!", loginpage_url, r.text, file=sys.stderr) + exit(1) - # we now have a zip file with XML, let's unpack them - content = r.content - print(app_url + ": " + app_id + ".zip", file=sys.stderr) - with open(app_id + ".zip", "wb") as f: - f.write(content) - z = ZipFile(io.BytesIO(content)) - # print(z.infolist()) - features = z.open(app_id + "_FeatureMapping.xml").read() - description = z.open(app_id + "_DeviceDescription.xml").read() + # get the session from the text + if not (match := re.search(r'"sessionId" value="(.*?)"', r.text)): + print("Unable to find session id in login page") + exit(1) + session_id = match[1] + if not (match := re.search(r'"sessionData" value="(.*?)"', r.text)): + print("Unable to find session data in login page") + exit(1) + session_data = match[1] - machine = xml2json(features, description) - config["description"] = machine["description"] - config["features"] = augment_device_features(machine["features"]) - print("Discovered device: " + config["name"] + " - Device hostname: " + config["host"]) + debug("--------") -with open(devicefile, "w") as f: - json.dump(configs, f, ensure_ascii=True, indent=4) + # now that we have a session id, contact the + # single key host to start the new login flow + singlekey_host = "https://singlekey-id.com" + login_url = singlekey_host + "/auth/en-us/log-in/" -print( - "Success. You can now edit " - + devicefile - + ", if needed, and run hc2mqtt.py or start Home Assistant addon again" -) + preauth_url = singlekey_host + "/auth/connect/authorize" + preauth_query = { + "client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256", + "redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target", + "response_type": "code", + "scope": "openid email profile offline_access homeconnect.general", + "prompt": "login", + "style_id": "bsh_hc_01", + "state": '{"session_id":"' + session_id + '"}', # important: no spaces! + } + + # fetch the preauth state to get the final callback url + preauth_url += "?" + urlencode(preauth_query) + + # loop until we have the callback url + while True: + debug(f"next {preauth_url=}") + r = session.get(preauth_url, allow_redirects=False) + if r.status_code == 200: + break + if r.status_code > 300 and r.status_code < 400: + preauth_url = r.headers["location"] + # Make relative locations absolute + if not bool(urlparse(preauth_url).netloc): + preauth_url = singlekey_host + preauth_url + continue + print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr) + exit(1) + + # get the ReturnUrl from the response + query = parse_qs(urlparse(preauth_url).query) + return_url = query["ReturnUrl"][0] + debug(f"{return_url=}") + + if "X-CSRF-FORM-TOKEN" in r.cookies: + headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"] + session.headers.update(headers) + + debug("--------") + + soup = BeautifulSoup(r.text, "html.parser") + requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value") + r = session.post( + preauth_url, + data={ + "UserIdentifierInput.EmailInput.StringValue": email, + "__RequestVerificationToken": requestVerificationToken, + }, + allow_redirects=False, + ) + + password_url = r.headers["location"] + if not bool(urlparse(password_url).netloc): + password_url = singlekey_host + password_url + + r = session.get(password_url, allow_redirects=False) + soup = BeautifulSoup(r.text, "html.parser") + requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value") + + r = session.post( + password_url, + data={ + "Password": password, + "RememberMe": "false", + "__RequestVerificationToken": requestVerificationToken, + }, + allow_redirects=False, + ) + + while True: + if return_url.startswith("/"): + return_url = singlekey_host + return_url + r = session.get(return_url, allow_redirects=False) + debug(f"{return_url=}, {r} {r.text}") + if r.status_code != 302: + break + return_url = r.headers["location"] + if return_url.startswith("hcauth://"): + break + debug(f"{return_url=}") + + debug("--------") + + url = urlparse(return_url) + query = parse_qs(url.query) + + if query.get("ReturnUrl") is not None: + print("Wrong credentials.") + print( + "If you forgot your login/password, you can restore them by opening " + "https://singlekey-id.com/auth/en-us/login in browser" + ) + exit(1) + + code = query.get("code")[0] + state = query.get("state")[0] + grant_type = query.get("grant_type")[0] # "authorization_code" + + debug(f"{code=} {grant_type=} {state=}") + + auth_url = base_url + "login" + token_url = base_url + "token" + + token_fields = { + "grant_type": grant_type, + "client_id": app_id, + "code_verifier": verifier, + "code": code, + "redirect_uri": login_query["redirect_uri"], + } + + debug(f"{token_url=} {token_fields=}") + + r = requests.post(token_url, data=token_fields, allow_redirects=False) + if r.status_code != requests.codes.ok: + print("Bad code?", file=sys.stderr) + print(r.headers, r.text) + exit(1) + + debug("--------- got token page ----------") + + token = json.loads(r.text)["access_token"] + debug(f"Received access {token=}") + + headers = { + "Authorization": "Bearer " + token, + } + + + # Try to request account details from all geos. Whichever works, we'll use next. + for asset_url in asset_urls: + r = requests.get(asset_url + "account/details", headers=headers) + if r.status_code == requests.codes.ok: + break + + # now we can fetch the rest of the account info + if r.status_code != requests.codes.ok: + print("unable to fetch account details", file=sys.stderr) + print(r.headers, r.text) + exit(1) + + # print(r.text) + account = json.loads(r.text) + configs = [] + + print(account, file=sys.stderr) + + for app in account["data"]["homeAppliances"]: + app_brand = app["brand"] + app_type = app["type"] + app_id = app["identifier"] + + config = { + "name": app_type.lower(), + } + + configs.append(config) + + if "tls" in app: + # fancy machine with TLS support + config["host"] = app_brand + "-" + app_type + "-" + app_id + config["key"] = app["tls"]["key"] + else: + # less fancy machine with HTTP support + config["host"] = app_id + config["key"] = app["aes"]["key"] + config["iv"] = app["aes"]["iv"] + + # Fetch the XML zip file for this device + app_url = asset_url + "api/iddf/v1/iddf/" + app_id + print("fetching", app_url, file=sys.stderr) + r = requests.get(app_url, headers=headers) + if r.status_code != requests.codes.ok: + print(app_id, ": unable to fetch machine description?") + next + + # we now have a zip file with XML, let's unpack them + content = r.content + print(app_url + ": " + app_id + ".zip", file=sys.stderr) + with open(app_id + ".zip", "wb") as f: + f.write(content) + z = ZipFile(io.BytesIO(content)) + # print(z.infolist()) + features = z.open(app_id + "_FeatureMapping.xml").read() + description = z.open(app_id + "_DeviceDescription.xml").read() + + machine = xml2json(features, description) + config["description"] = machine["description"] + config["features"] = augment_device_features(machine["features"]) + print("Discovered device: " + config["name"] + " - Device hostname: " + config["host"]) + + with open(devicefile, "w") as f: + json.dump(configs, f, ensure_ascii=True, indent=4) + + print( + "Success. You can now edit " + + devicefile + + ", if needed, and run hc2mqtt.py or start Home Assistant addon again" + ) + +if __name__ == "__main__": + main()