#!/usr/bin/env python3 # This directly follows the OAuth login flow that is opaquely described # https://github.com/openid/AppAuth-Android # A really nice walk through of how it works is: # https://auth0.com/docs/get-started/authentication-and-authorization-flow/call-your-api-using-the-authorization-code-flow-with-pkce import requests from urllib.parse import urlparse, parse_qs, urlencode from lxml import html import io import re import sys import json from time import time from base64 import b64decode as base64_decode from base64 import urlsafe_b64encode as base64url_encode from Crypto.Random import get_random_bytes from Crypto.Hash import SHA256 from zipfile import ZipFile email = sys.argv[1] password = sys.argv[2] base_url = 'https://api.home-connect.com/security/oauth/' asset_url = 'https://prod.reu.rest.homeconnectegw.com/' # The app_id and scope are hardcoded in the application app_id = '9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3' scope = ["ReadAccount","Settings","IdentifyAppliance","Control","DeleteAppliance","WriteAppliance","ReadOrigApi","Monitor","WriteOrigApi","Images"] def b64(b): return re.sub(r'=', '', base64url_encode(b).decode('UTF-8')) def b64random(num): return b64(base64url_encode(get_random_bytes(num))) verifier = b64(get_random_bytes(32)) login_query = { "response_type": "code", "prompt": "login", "code_challenge": b64(SHA256.new(verifier.encode('UTF-8')).digest()), "code_challenge_method": "S256", "client_id": app_id, "scope": ' '.join(scope), "nonce": b64random(16), "state": b64random(16), "redirect_uri": 'hcauth://auth/prod', } loginpage_url = base_url + 'authorize?' + urlencode(login_query) auth_url = base_url + 'login' token_url = base_url + 'token' r = requests.get(loginpage_url) if r.status_code != requests.codes.ok: print("error fetching login url!", file=sys.stderr) exit(1) loginpage = r.text #print('--------- got login page ----------') #with open("login.html") as fd: # loginpage = fd.read() tree = html.fromstring(loginpage) # add in the email and password auth_fields = { "email": email, "password": password, "code_challenge": login_query["code_challenge"], "code_challenge_method": login_query["code_challenge_method"], "redirect_uri": login_query["redirect_uri"], } for form in tree.forms: if form.attrib.get("id") != "login_form": continue for field in form.fields: if field not in auth_fields: auth_fields[field] = form.fields.get(field) #print(auth_fields) # try to submit the form and get the redirect URL with the token r = requests.post(auth_url, data=auth_fields, allow_redirects=False) if r.status_code != 302: print("Did not get a redirect; wrong username/password?", file=sys.stderr) exit(1) # Yes! location = r.headers["location"] url = urlparse(location) query = parse_qs(url.query) #print("response:", location, query) code = query.get("code") if not code: print("Unable to find code in response?", location, file=sys.stderr) sys.exit(1) #print('--------- got code page ----------') token_fields = { "grant_type": "authorization_code", "client_id": app_id, "code_verifier": verifier, "code": code[0], "redirect_uri": login_query["redirect_uri"], } #print(token_fields) r = requests.post(token_url, data=token_fields, allow_redirects=False) if r.status_code != requests.codes.ok: print("Bad code?", file=sys.stderr) print(r.headers, r.text) exit(1) #print('--------- got token page ----------') # Yes! #print(r.text) token = json.loads(r.text)["access_token"] headers = { "Authorization": "Bearer " + token, } # now we can fetch the rest of the account info r = requests.get(asset_url + "account/details", headers=headers) if r.status_code != requests.codes.ok: print("unable to fetch account details", file=sys.stderr) print(r.headers, r.text) exit(1) #print(r.text) account = json.loads(r.text) configs = [] for app in account["data"]["homeAppliances"]: app_brand = app["brand"] app_type = app["type"] app_id = app["identifier"] config = { "name": app_type.lower(), } configs.append(config) if "tls" in app: # fancy machine with TLS support config["host"] =app_brand + "-" + app_type + "-" + app_id config["key"] = app["tls"]["key"] else: # less fancy machine with HTTP support config["host"] = app_id config["key"] = app["aes"]["key"] config["iv"] = app["aes"]["iv"] # Fetch the XML zip file for this device app_url = asset_url + "api/iddf/v1/iddf/" + app_id print("fetching", app_url) r = requests.get(app_url, headers=headers) if r.status_code != requests.codes.ok: print(app_id, ": unable to fetch machine description?") next # we now have a zip file with XML, let's unpack them z = ZipFile(io.BytesIO(r.content)) print(z.infolist()) print(configs)