From de3d965897bf29f80fcbba48ff30951d73bb802e Mon Sep 17 00:00:00 2001 From: Trammell Hudson Date: Sun, 12 Mar 2023 21:27:13 +0100 Subject: [PATCH] hc-login: hacks to work with the new singlekey login system --- hc-login | 208 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 149 insertions(+), 59 deletions(-) diff --git a/hc-login b/hc-login index 2503f98..985c474 100755 --- a/hc-login +++ b/hc-login @@ -18,15 +18,45 @@ from Crypto.Hash import SHA256 from zipfile import ZipFile from HCxml2json import xml2json +import logging + +# These two lines enable debugging at httplib level (requests->urllib3->http.client) +# You will see the REQUEST, including HEADERS and DATA, and RESPONSE with HEADERS but without DATA. +# The only thing missing will be the response.body which is not logged. +import http.client as http_client +#http_client.HTTPConnection.debuglevel = 1 + +# You must initialize logging, otherwise you'll not see debug output. +#logging.basicConfig() +#logging.getLogger().setLevel(logging.DEBUG) +#requests_log = logging.getLogger("requests.packages.urllib3") +#requests_log.setLevel(logging.DEBUG) +#requests_log.propagate = True + +def debug(*args): + print(*args, file=sys.stderr) + email = sys.argv[1] password = sys.argv[2] +headers = {"User-Agent": "hc-login/1.0"} + +session = requests.Session() +session.headers.update(headers) + base_url = 'https://api.home-connect.com/security/oauth/' asset_url = 'https://prod.reu.rest.homeconnectegw.com/' +##############3 +# +# Start by fetching the old login page, which gives +# us the verifier and challenge for getting the token, +# even after the singlekey detour. +# # The app_id and scope are hardcoded in the application app_id = '9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3' scope = ["ReadAccount","Settings","IdentifyAppliance","Control","DeleteAppliance","WriteAppliance","ReadOrigApi","Monitor","WriteOrigApi","Images"] +scope = ["ReadOrigApi",] def b64(b): return re.sub(r'=', '', base64url_encode(b).decode('UTF-8')) @@ -49,68 +79,124 @@ login_query = { } loginpage_url = base_url + 'authorize?' + urlencode(login_query) +token_url = base_url + 'token' + +debug(f"{loginpage_url=}") +r = session.get(loginpage_url) +if r.status_code != requests.codes.ok: + print("error fetching login url!", loginpage_url, r.text, file=sys.stderr) + exit(1) + +# get the session from the text +if not (match := re.search(r'"sessionId" value="(.*?)"', r.text)): + print("Unable to find session id in login page") + exit(1) +session_id = match[1] +if not (match := re.search(r'"sessionData" value="(.*?)"', r.text)): + print("Unable to find session data in login page") + exit(1) +session_data = match[1] + +debug("--------") + +# now that we have a session id, contact the +# single key host to start the new login flow +singlekey_host = 'https://singlekey-id.com' +login_url = singlekey_host + '/auth/en-us/log-in/' + +preauth_url = singlekey_host + "/auth/connect/authorize" +preauth_query = { + "client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256", + "redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target", + "response_type": "code", + "scope": "openid email profile offline_access homeconnect.general", + "prompt": "login", + "style_id": "bsh_hc_01", + "state": '{"session_id":"' + session_id + '"}', # important: no spaces! +} + +# fetch the preauth state to get the final callback url +preauth_url += "?" + urlencode(preauth_query) + +# loop until we have the callback url +while True: + debug(f"next {preauth_url=}") + r = session.get(preauth_url, allow_redirects=False) + if r.status_code == 200: + break + if r.status_code == 302 or r.status_code == 301: + preauth_url = r.headers["location"] + continue + print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr) + exit(1) + +# get the ReturnUrl from the response +query = parse_qs(urlparse(preauth_url).query) +return_url = query["ReturnUrl"][0] +debug(f"{return_url=}") + +headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"] +session.headers.update(headers) + +debug("--------") + +valid_url = singlekey_host + '/auth/api/v1/authentication/UserExists' +auth_url = singlekey_host + '/auth/api/v1/authentication/login' + +r = session.post(valid_url, json={"username": email}) +debug(f"{valid_url=}: {r} {r.text}") + + +login_fields = { + "username": email, + "password": password, + "keepMeSignedIn": False, + "returnUrl": return_url, +} + +r = session.post(auth_url, json=login_fields, allow_redirects=False) + +if r.status_code != 200: + debug(f"auth failed: {auth_url=}, {login_fields=} {r} {r.text}") + exit(-1) + +debug(f"{auth_url=}, {r} {r.text}") +return_url = json.loads(r.text)["returnUrl"] +if return_url.startswith("/"): + return_url = singlekey_host + return_url + +while True: + r = session.get(return_url, allow_redirects=False) + debug(f"{return_url=}, {r} {r.text}") + if r.status_code != 302: + break + return_url = r.headers["location"] + if return_url.startswith("hcauth://"): + break +debug(f"{return_url=}") + +debug("--------") + +url = urlparse(return_url) +query = parse_qs(url.query) +code = query.get("code")[0] +state = query.get("state")[0] +grant_type = query.get("grant_type")[0] # "authorization_code" + +debug(f"{code=} {grant_type=} {state=}") + auth_url = base_url + 'login' token_url = base_url + 'token' -r = requests.get(loginpage_url) -if r.status_code != requests.codes.ok: - print("error fetching login url!", login_url, file=sys.stderr) - exit(1) -loginpage = r.text - -#print('--------- got login page ----------') - -#with open("login.html") as fd: -# loginpage = fd.read() - -tree = html.fromstring(loginpage) - -# add in the email and password -auth_fields = { - "email": email, - "password": password, - "code_challenge": login_query["code_challenge"], - "code_challenge_method": login_query["code_challenge_method"], - "redirect_uri": login_query["redirect_uri"], -} - -for form in tree.forms: - if form.attrib.get("id") != "login_form": - continue - for field in form.fields: - if field not in auth_fields: - auth_fields[field] = form.fields.get(field) - -#print(auth_fields) - -# try to submit the form and get the redirect URL with the token -r = requests.post(auth_url, data=auth_fields, allow_redirects=False) -if r.status_code != 302: - print("Did not get a redirect; wrong username/password?", file=sys.stderr) - print(r.text) - exit(1) - -# Yes! -location = r.headers["location"] -url = urlparse(location) -query = parse_qs(url.query) -#print("response:", location, query) -code = query.get("code") -if not code: - print("Unable to find code in response?", location, file=sys.stderr) - sys.exit(1) - -#print('--------- got code page ----------') - token_fields = { - "grant_type": "authorization_code", + "grant_type": grant_type, "client_id": app_id, "code_verifier": verifier, - "code": code[0], + "code": code, "redirect_uri": login_query["redirect_uri"], } -#print(token_fields) +debug(f"{token_url=} {token_fields=}") r = requests.post(token_url, data=token_fields, allow_redirects=False) if r.status_code != requests.codes.ok: @@ -118,13 +204,11 @@ if r.status_code != requests.codes.ok: print(r.headers, r.text) exit(1) -#print('--------- got token page ----------') - -# Yes! -#print(r.text) +debug('--------- got token page ----------') token = json.loads(r.text)["access_token"] -print("Received access token", file=sys.stderr) +debug(f"Received access {token=}") + headers = { "Authorization": "Bearer " + token, } @@ -140,6 +224,8 @@ if r.status_code != requests.codes.ok: account = json.loads(r.text) configs = [] +print(account, file=sys.stderr) + for app in account["data"]["homeAppliances"]: app_brand = app["brand"] app_type = app["type"] @@ -170,7 +256,11 @@ for app in account["data"]["homeAppliances"]: next # we now have a zip file with XML, let's unpack them - z = ZipFile(io.BytesIO(r.content)) + content = r.content + print(app_url + ": " + app_id + ".zip", file=sys.stderr) + with open(app_id + ".zip", "wb") as f: + f.write(content) + z = ZipFile(io.BytesIO(content)) #print(z.infolist()) features = z.open(app_id + "_FeatureMapping.xml").read() description = z.open(app_id + "_DeviceDescription.xml").read()