Files
hcpy/hc-login

181 lines
5.0 KiB
Python
Executable File

#!/usr/bin/env python3
# This directly follows the OAuth login flow that is opaquely described at:
# https://github.com/openid/AppAuth-Android
# A really nice walk through of how it works is:
# https://auth0.com/docs/get-started/authentication-and-authorization-flow/call-your-api-using-the-authorization-code-flow-with-pkce
import requests
from urllib.parse import urlparse, parse_qs, urlencode
from lxml import html
import io
import re
import sys
import json
from time import time
from base64 import b64decode as base64_decode
from base64 import urlsafe_b64encode as base64url_encode
from Crypto.Random import get_random_bytes
from Crypto.Hash import SHA256
from zipfile import ZipFile
from HCxml2json import xml2json
# Command line: hc-login EMAIL PASSWORD
# Both values are submitted to the login form further down.
if len(sys.argv) < 3:
    # Without this guard a missing argument surfaces as a bare IndexError
    # traceback instead of a usable message.
    print("Usage: " + sys.argv[0] + " EMAIL PASSWORD", file=sys.stderr)
    sys.exit(1)
email = sys.argv[1]
password = sys.argv[2]

# base_url is the prefix of the authorize/login/token endpoints; asset_url is
# the prefix of the account and appliance description requests.
base_url = 'https://api.home-connect.com/security/oauth/'
asset_url = 'https://prod.reu.rest.homeconnectegw.com/'

# The app_id and scope are hardcoded in the application
app_id = '9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3'
scope = [
    "ReadAccount",
    "Settings",
    "IdentifyAppliance",
    "Control",
    "DeleteAppliance",
    "WriteAppliance",
    "ReadOrigApi",
    "Monitor",
    "WriteOrigApi",
    "Images",
]
def b64(b):
    """Return the URL-safe base64 text of bytes *b* with every '=' removed."""
    encoded = base64url_encode(b).decode('UTF-8')
    return encoded.replace('=', '')
def b64random(num):
    """Return a random URL-safe token built from *num* random bytes.

    The random bytes are base64url-encoded once and that encoded form is then
    passed through b64(), so the data is encoded twice and the result is
    longer than a single encoding of *num* bytes would be.
    """
    encoded_once = base64url_encode(get_random_bytes(num))
    return b64(encoded_once)
# PKCE: a random verifier is kept locally and only the encoded SHA-256 digest
# of it (the challenge) goes into the authorization request; the verifier
# itself is sent with the token request.
verifier = b64(get_random_bytes(32))
code_challenge = b64(SHA256.new(verifier.encode('UTF-8')).digest())

# Query string of the authorization request
login_query = dict(
    response_type="code",
    prompt="login",
    code_challenge=code_challenge,
    code_challenge_method="S256",
    client_id=app_id,
    scope=' '.join(scope),
    nonce=b64random(16),
    state=b64random(16),
    redirect_uri='hcauth://auth/prod',
)

# Endpoints derived from the OAuth base URL
loginpage_url = '{}authorize?{}'.format(base_url, urlencode(login_query))
auth_url = '{}login'.format(base_url)
token_url = '{}token'.format(base_url)
# Fetch the login page; the form it contains is parsed from `tree` below.
r = requests.get(loginpage_url)
if r.status_code != requests.codes.ok:
    print("error fetching login url!", file=sys.stderr)
    # sys.exit() instead of the site-provided exit(): the latter is meant for
    # the interactive shell and is missing when Python runs without `site`.
    sys.exit(1)
loginpage = r.text

# Debugging aid: a saved copy of the page can be substituted here.
#print('--------- got login page ----------')
#with open("login.html") as fd:
#    loginpage = fd.read()

tree = html.fromstring(loginpage)
# Start with the credentials, then copy the PKCE/redirect parameters that
# were already sent in the authorization request.
auth_fields = {"email": email, "password": password}
auth_fields.update({
    key: login_query[key]
    for key in ("code_challenge", "code_challenge_method", "redirect_uri")
})

# Take every remaining field of the form(s) with id "login_form" from the
# fetched page, never overriding a value that was set above.
login_forms = [f for f in tree.forms if f.attrib.get("id") == "login_form"]
for login_form in login_forms:
    form_fields = login_form.fields
    for name in form_fields:
        if name in auth_fields:
            continue
        auth_fields[name] = form_fields.get(name)
#print(auth_fields)
# Submit the login form and read the authorization code from the redirect.
# allow_redirects=False keeps requests from following the redirect, so the
# Location header can be inspected directly.
r = requests.post(auth_url, data=auth_fields, allow_redirects=False)
if r.status_code != 302:
    print("Did not get a redirect; wrong username/password?", file=sys.stderr)
    # sys.exit() for consistency with the check below; the site-provided
    # exit() is intended for interactive use only.
    sys.exit(1)

# Yes!
location = r.headers["location"]
url = urlparse(location)
query = parse_qs(url.query)
#print("response:", location, query)

# parse_qs() maps every parameter to a list of values, so `code` is a list
# (or None when the parameter is absent).
code = query.get("code")
if not code:
    print("Unable to find code in response?", location, file=sys.stderr)
    sys.exit(1)
#print('--------- got code page ----------')

# Exchange the authorization code, together with the PKCE verifier, for an
# access token.
token_fields = {
    "grant_type": "authorization_code",
    "client_id": app_id,
    "code_verifier": verifier,
    "code": code[0],
    "redirect_uri": login_query["redirect_uri"],
}
#print(token_fields)

r = requests.post(token_url, data=token_fields, allow_redirects=False)
if r.status_code != requests.codes.ok:
    print("Bad code?", file=sys.stderr)
    # stdout is where the final JSON is printed, so the diagnostics go to
    # stderr like every other message.
    print(r.headers, r.text, file=sys.stderr)
    sys.exit(1)
#print('--------- got token page ----------')

# Yes!
#print(r.text)
token = json.loads(r.text)["access_token"]
print("Received access token", file=sys.stderr)

# Authorization header sent with the requests to asset_url
headers = {
    "Authorization": "Bearer " + token,
}
# now we can fetch the rest of the account info
r = requests.get(asset_url + "account/details", headers=headers)
if r.status_code != requests.codes.ok:
    print("unable to fetch account details", file=sys.stderr)
    # Diagnostics go to stderr; stdout is where the final JSON is printed.
    print(r.headers, r.text, file=sys.stderr)
    sys.exit(1)
#print(r.text)

# The appliance list is read from account["data"]["homeAppliances"]
account = json.loads(r.text)
# Build one config entry per appliance listed in the account details, then
# print the whole list as JSON on stdout.
configs = []
for app in account["data"]["homeAppliances"]:
    app_brand = app["brand"]
    app_type = app["type"]
    # Kept in its own name so the OAuth client id in app_id is not overwritten.
    appliance_id = app["identifier"]

    config = {
        "name": app_type.lower(),
    }
    # Appended before the description download: when that download fails the
    # entry stays in the output with its host/key, just without
    # "description" and "features".
    configs.append(config)

    if "tls" in app:
        # fancy machine with TLS support
        config["host"] = app_brand + "-" + app_type + "-" + appliance_id
        config["key"] = app["tls"]["key"]
    else:
        # less fancy machine with HTTP support
        config["host"] = appliance_id
        config["key"] = app["aes"]["key"]
        config["iv"] = app["aes"]["iv"]

    # Fetch the XML zip file for this device
    app_url = asset_url + "api/iddf/v1/iddf/" + appliance_id
    print("fetching", app_url, file=sys.stderr)
    r = requests.get(app_url, headers=headers)
    if r.status_code != requests.codes.ok:
        # stderr, so the message cannot end up inside the JSON on stdout
        print(appliance_id, ": unable to fetch machine description?", file=sys.stderr)
        # Skip to the next appliance; the response body is not a zip file.
        continue

    # we now have a zip file with XML, let's unpack them
    with ZipFile(io.BytesIO(r.content)) as z:
        #print(z.infolist())
        features = z.read(appliance_id + "_FeatureMapping.xml")
        description = z.read(appliance_id + "_DeviceDescription.xml")

    machine = xml2json(features, description)
    config["description"] = machine["description"]
    config["features"] = machine["features"]

print(json.dumps(configs, indent=4))