[pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
This commit is contained in:
pre-commit-ci[bot]
2024-03-19 18:38:07 +00:00
parent d8a5e22cb9
commit 30e12f54ba
5 changed files with 705 additions and 609 deletions

304
hc-login
View File

@@ -3,39 +3,37 @@
# https://github.com/openid/AppAuth-Android
# A really nice walk through of how it works is:
# https://auth0.com/docs/get-started/authentication-and-authorization-flow/call-your-api-using-the-authorization-code-flow-with-pkce
import requests
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from lxml import html
import io
import json
import re
import sys
import json
from time import time
from base64 import b64decode as base64_decode
from base64 import urlsafe_b64encode as base64url_encode
from bs4 import BeautifulSoup
from Crypto.Random import get_random_bytes
from Crypto.Hash import SHA256
from urllib.parse import parse_qs, urlencode, urlparse
from zipfile import ZipFile
from HCxml2json import xml2json
import logging
import requests
from bs4 import BeautifulSoup
from Crypto.Hash import SHA256
from Crypto.Random import get_random_bytes
from HCxml2json import xml2json
# These two lines enable debugging at httplib level (requests->urllib3->http.client)
# You will see the REQUEST, including HEADERS and DATA, and RESPONSE with HEADERS but without DATA.
# The only thing missing will be the response.body which is not logged.
import http.client as http_client
#http_client.HTTPConnection.debuglevel = 1
# http_client.HTTPConnection.debuglevel = 1
# You must initialize logging, otherwise you'll not see debug output.
#logging.basicConfig()
#logging.getLogger().setLevel(logging.DEBUG)
#requests_log = logging.getLogger("requests.packages.urllib3")
#requests_log.setLevel(logging.DEBUG)
#requests_log.propagate = True
# logging.basicConfig()
# logging.getLogger().setLevel(logging.DEBUG)
# requests_log = logging.getLogger("requests.packages.urllib3")
# requests_log.setLevel(logging.DEBUG)
# requests_log.propagate = True
def debug(*args):
print(*args, file=sys.stderr)
print(*args, file=sys.stderr)
email = sys.argv[1]
password = sys.argv[2]
@@ -45,8 +43,8 @@ headers = {"User-Agent": "hc-login/1.0"}
session = requests.Session()
session.headers.update(headers)
base_url = 'https://api.home-connect.com/security/oauth/'
asset_url = 'https://prod.reu.rest.homeconnectegw.com/'
base_url = "https://api.home-connect.com/security/oauth/"
asset_url = "https://prod.reu.rest.homeconnectegw.com/"
##############3
#
@@ -55,65 +53,82 @@ asset_url = 'https://prod.reu.rest.homeconnectegw.com/'
# even after the singlekey detour.
#
# The app_id and scope are hardcoded in the application
app_id = '9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3'
scope = ["ReadAccount","Settings","IdentifyAppliance","Control","DeleteAppliance","WriteAppliance","ReadOrigApi","Monitor","WriteOrigApi","Images"]
scope = ["ReadOrigApi",]
app_id = "9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3"
scope = [
"ReadAccount",
"Settings",
"IdentifyAppliance",
"Control",
"DeleteAppliance",
"WriteAppliance",
"ReadOrigApi",
"Monitor",
"WriteOrigApi",
"Images",
]
scope = [
"ReadOrigApi",
]
def b64(b):
return re.sub(r'=', '', base64url_encode(b).decode('UTF-8'))
return re.sub(r"=", "", base64url_encode(b).decode("UTF-8"))
def b64random(num):
return b64(base64url_encode(get_random_bytes(num)))
return b64(base64url_encode(get_random_bytes(num)))
verifier = b64(get_random_bytes(32))
login_query = {
"response_type": "code",
"prompt": "login",
"code_challenge": b64(SHA256.new(verifier.encode('UTF-8')).digest()),
"code_challenge_method": "S256",
"client_id": app_id,
"scope": ' '.join(scope),
"nonce": b64random(16),
"state": b64random(16),
"redirect_uri": 'hcauth://auth/prod',
"redirect_target": 'icore',
"response_type": "code",
"prompt": "login",
"code_challenge": b64(SHA256.new(verifier.encode("UTF-8")).digest()),
"code_challenge_method": "S256",
"client_id": app_id,
"scope": " ".join(scope),
"nonce": b64random(16),
"state": b64random(16),
"redirect_uri": "hcauth://auth/prod",
"redirect_target": "icore",
}
loginpage_url = base_url + 'authorize?' + urlencode(login_query)
token_url = base_url + 'token'
loginpage_url = base_url + "authorize?" + urlencode(login_query)
token_url = base_url + "token"
debug(f"{loginpage_url=}")
r = session.get(loginpage_url)
if r.status_code != requests.codes.ok:
print("error fetching login url!", loginpage_url, r.text, file=sys.stderr)
exit(1)
print("error fetching login url!", loginpage_url, r.text, file=sys.stderr)
exit(1)
# get the session from the text
if not (match := re.search(r'"sessionId" value="(.*?)"', r.text)):
print("Unable to find session id in login page")
exit(1)
print("Unable to find session id in login page")
exit(1)
session_id = match[1]
if not (match := re.search(r'"sessionData" value="(.*?)"', r.text)):
print("Unable to find session data in login page")
exit(1)
print("Unable to find session data in login page")
exit(1)
session_data = match[1]
debug("--------")
# now that we have a session id, contact the
# now that we have a session id, contact the
# single key host to start the new login flow
singlekey_host = 'https://singlekey-id.com'
login_url = singlekey_host + '/auth/en-us/log-in/'
singlekey_host = "https://singlekey-id.com"
login_url = singlekey_host + "/auth/en-us/log-in/"
preauth_url = singlekey_host + "/auth/connect/authorize"
preauth_query = {
"client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256",
"redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target",
"response_type": "code",
"scope": "openid email profile offline_access homeconnect.general",
"prompt": "login",
"style_id": "bsh_hc_01",
"state": '{"session_id":"' + session_id + '"}', # important: no spaces!
"client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256",
"redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target",
"response_type": "code",
"scope": "openid email profile offline_access homeconnect.general",
"prompt": "login",
"style_id": "bsh_hc_01",
"state": '{"session_id":"' + session_id + '"}', # important: no spaces!
}
# fetch the preauth state to get the final callback url
@@ -121,18 +136,18 @@ preauth_url += "?" + urlencode(preauth_query)
# loop until we have the callback url
while True:
debug(f"next {preauth_url=}")
r = session.get(preauth_url, allow_redirects=False)
if r.status_code == 200:
break
if r.status_code > 300 and r.status_code < 400:
preauth_url = r.headers["location"]
# Make relative locations absolute
if not bool(urlparse(preauth_url).netloc):
preauth_url = singlekey_host + preauth_url
continue
print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr)
exit(1)
debug(f"next {preauth_url=}")
r = session.get(preauth_url, allow_redirects=False)
if r.status_code == 200:
break
if r.status_code > 300 and r.status_code < 400:
preauth_url = r.headers["location"]
# Make relative locations absolute
if not bool(urlparse(preauth_url).netloc):
preauth_url = singlekey_host + preauth_url
continue
print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr)
exit(1)
# get the ReturnUrl from the response
query = parse_qs(urlparse(preauth_url).query)
@@ -140,36 +155,55 @@ return_url = query["ReturnUrl"][0]
debug(f"{return_url=}")
if "X-CSRF-FORM-TOKEN" in r.cookies:
headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"]
headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"]
session.headers.update(headers)
debug("--------")
soup = BeautifulSoup(r.text, 'html.parser')
requestVerificationToken = soup.find('input', {'name': '__RequestVerificationToken'}).get('value')
r = session.post(preauth_url, data={"UserIdentifierInput.EmailInput.StringValue": email, "__RequestVerificationToken": requestVerificationToken }, allow_redirects=False)
soup = BeautifulSoup(r.text, "html.parser")
requestVerificationToken = soup.find(
"input", {"name": "__RequestVerificationToken"}
).get("value")
r = session.post(
preauth_url,
data={
"UserIdentifierInput.EmailInput.StringValue": email,
"__RequestVerificationToken": requestVerificationToken,
},
allow_redirects=False,
)
password_url = r.headers['location']
password_url = r.headers["location"]
if not bool(urlparse(password_url).netloc):
password_url = singlekey_host + password_url
password_url = singlekey_host + password_url
r = session.get(password_url, allow_redirects=False)
soup = BeautifulSoup(r.text, 'html.parser')
requestVerificationToken = soup.find('input', {'name': '__RequestVerificationToken'}).get('value')
soup = BeautifulSoup(r.text, "html.parser")
requestVerificationToken = soup.find(
"input", {"name": "__RequestVerificationToken"}
).get("value")
r = session.post(password_url, data={"Password": password, "RememberMe": "false", "__RequestVerificationToken": requestVerificationToken }, allow_redirects=False)
r = session.post(
password_url,
data={
"Password": password,
"RememberMe": "false",
"__RequestVerificationToken": requestVerificationToken,
},
allow_redirects=False,
)
if return_url.startswith("/"):
return_url = singlekey_host + return_url
return_url = singlekey_host + return_url
while True:
r = session.get(return_url, allow_redirects=False)
debug(f"{return_url=}, {r} {r.text}")
if r.status_code != 302:
break
return_url = r.headers["location"]
if return_url.startswith("hcauth://"):
break
r = session.get(return_url, allow_redirects=False)
debug(f"{return_url=}, {r} {r.text}")
if r.status_code != 302:
break
return_url = r.headers["location"]
if return_url.startswith("hcauth://"):
break
debug(f"{return_url=}")
debug("--------")
@@ -178,92 +212,92 @@ url = urlparse(return_url)
query = parse_qs(url.query)
code = query.get("code")[0]
state = query.get("state")[0]
grant_type = query.get("grant_type")[0] # "authorization_code"
grant_type = query.get("grant_type")[0] # "authorization_code"
debug(f"{code=} {grant_type=} {state=}")
auth_url = base_url + 'login'
token_url = base_url + 'token'
auth_url = base_url + "login"
token_url = base_url + "token"
token_fields = {
"grant_type": grant_type,
"client_id": app_id,
"code_verifier": verifier,
"code": code,
"redirect_uri": login_query["redirect_uri"],
"grant_type": grant_type,
"client_id": app_id,
"code_verifier": verifier,
"code": code,
"redirect_uri": login_query["redirect_uri"],
}
debug(f"{token_url=} {token_fields=}")
r = requests.post(token_url, data=token_fields, allow_redirects=False)
if r.status_code != requests.codes.ok:
print("Bad code?", file=sys.stderr)
print(r.headers, r.text)
exit(1)
print("Bad code?", file=sys.stderr)
print(r.headers, r.text)
exit(1)
debug('--------- got token page ----------')
debug("--------- got token page ----------")
token = json.loads(r.text)["access_token"]
debug(f"Received access {token=}")
headers = {
"Authorization": "Bearer " + token,
"Authorization": "Bearer " + token,
}
# now we can fetch the rest of the account info
r = requests.get(asset_url + "account/details", headers=headers)
if r.status_code != requests.codes.ok:
print("unable to fetch account details", file=sys.stderr)
print(r.headers, r.text)
exit(1)
print("unable to fetch account details", file=sys.stderr)
print(r.headers, r.text)
exit(1)
#print(r.text)
# print(r.text)
account = json.loads(r.text)
configs = []
print(account, file=sys.stderr)
for app in account["data"]["homeAppliances"]:
app_brand = app["brand"]
app_type = app["type"]
app_id = app["identifier"]
app_brand = app["brand"]
app_type = app["type"]
app_id = app["identifier"]
config = {
"name": app_type.lower(),
}
config = {
"name": app_type.lower(),
}
configs.append(config)
configs.append(config)
if "tls" in app:
# fancy machine with TLS support
config["host"] =app_brand + "-" + app_type + "-" + app_id
config["key"] = app["tls"]["key"]
else:
# less fancy machine with HTTP support
config["host"] = app_id
config["key"] = app["aes"]["key"]
config["iv"] = app["aes"]["iv"]
if "tls" in app:
# fancy machine with TLS support
config["host"] = app_brand + "-" + app_type + "-" + app_id
config["key"] = app["tls"]["key"]
else:
# less fancy machine with HTTP support
config["host"] = app_id
config["key"] = app["aes"]["key"]
config["iv"] = app["aes"]["iv"]
# Fetch the XML zip file for this device
app_url = asset_url + "api/iddf/v1/iddf/" + app_id
print("fetching", app_url, file=sys.stderr)
r = requests.get(app_url, headers=headers)
if r.status_code != requests.codes.ok:
print(app_id, ": unable to fetch machine description?")
next
# Fetch the XML zip file for this device
app_url = asset_url + "api/iddf/v1/iddf/" + app_id
print("fetching", app_url, file=sys.stderr)
r = requests.get(app_url, headers=headers)
if r.status_code != requests.codes.ok:
print(app_id, ": unable to fetch machine description?")
next
# we now have a zip file with XML, let's unpack them
content = r.content
print(app_url + ": " + app_id + ".zip", file=sys.stderr)
with open(app_id + ".zip", "wb") as f:
f.write(content)
z = ZipFile(io.BytesIO(content))
#print(z.infolist())
features = z.open(app_id + "_FeatureMapping.xml").read()
description = z.open(app_id + "_DeviceDescription.xml").read()
# we now have a zip file with XML, let's unpack them
content = r.content
print(app_url + ": " + app_id + ".zip", file=sys.stderr)
with open(app_id + ".zip", "wb") as f:
f.write(content)
z = ZipFile(io.BytesIO(content))
# print(z.infolist())
features = z.open(app_id + "_FeatureMapping.xml").read()
description = z.open(app_id + "_DeviceDescription.xml").read()
machine = xml2json(features, description)
config["description"] = machine["description"]
config["features"] = machine["features"]
machine = xml2json(features, description)
config["description"] = machine["description"]
config["features"] = machine["features"]
print(json.dumps(configs, indent=4))