Merge pull request #16 from osresearch/singlekey
hc-login: hacks to work with the new singlekey login system
This commit is contained in:
208
hc-login
208
hc-login
@@ -18,15 +18,45 @@ from Crypto.Hash import SHA256
|
||||
from zipfile import ZipFile
|
||||
from HCxml2json import xml2json
|
||||
|
||||
import logging
|
||||
|
||||
# These two lines enable debugging at httplib level (requests->urllib3->http.client)
|
||||
# You will see the REQUEST, including HEADERS and DATA, and RESPONSE with HEADERS but without DATA.
|
||||
# The only thing missing will be the response.body which is not logged.
|
||||
import http.client as http_client
|
||||
#http_client.HTTPConnection.debuglevel = 1
|
||||
|
||||
# You must initialize logging, otherwise you'll not see debug output.
|
||||
#logging.basicConfig()
|
||||
#logging.getLogger().setLevel(logging.DEBUG)
|
||||
#requests_log = logging.getLogger("requests.packages.urllib3")
|
||||
#requests_log.setLevel(logging.DEBUG)
|
||||
#requests_log.propagate = True
|
||||
|
||||
def debug(*args):
|
||||
print(*args, file=sys.stderr)
|
||||
|
||||
email = sys.argv[1]
|
||||
password = sys.argv[2]
|
||||
|
||||
headers = {"User-Agent": "hc-login/1.0"}
|
||||
|
||||
session = requests.Session()
|
||||
session.headers.update(headers)
|
||||
|
||||
base_url = 'https://api.home-connect.com/security/oauth/'
|
||||
asset_url = 'https://prod.reu.rest.homeconnectegw.com/'
|
||||
|
||||
##############3
|
||||
#
|
||||
# Start by fetching the old login page, which gives
|
||||
# us the verifier and challenge for getting the token,
|
||||
# even after the singlekey detour.
|
||||
#
|
||||
# The app_id and scope are hardcoded in the application
|
||||
app_id = '9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3'
|
||||
scope = ["ReadAccount","Settings","IdentifyAppliance","Control","DeleteAppliance","WriteAppliance","ReadOrigApi","Monitor","WriteOrigApi","Images"]
|
||||
scope = ["ReadOrigApi",]
|
||||
|
||||
def b64(b):
|
||||
return re.sub(r'=', '', base64url_encode(b).decode('UTF-8'))
|
||||
@@ -49,68 +79,124 @@ login_query = {
|
||||
}
|
||||
|
||||
loginpage_url = base_url + 'authorize?' + urlencode(login_query)
|
||||
token_url = base_url + 'token'
|
||||
|
||||
debug(f"{loginpage_url=}")
|
||||
r = session.get(loginpage_url)
|
||||
if r.status_code != requests.codes.ok:
|
||||
print("error fetching login url!", loginpage_url, r.text, file=sys.stderr)
|
||||
exit(1)
|
||||
|
||||
# get the session from the text
|
||||
if not (match := re.search(r'"sessionId" value="(.*?)"', r.text)):
|
||||
print("Unable to find session id in login page")
|
||||
exit(1)
|
||||
session_id = match[1]
|
||||
if not (match := re.search(r'"sessionData" value="(.*?)"', r.text)):
|
||||
print("Unable to find session data in login page")
|
||||
exit(1)
|
||||
session_data = match[1]
|
||||
|
||||
debug("--------")
|
||||
|
||||
# now that we have a session id, contact the
|
||||
# single key host to start the new login flow
|
||||
singlekey_host = 'https://singlekey-id.com'
|
||||
login_url = singlekey_host + '/auth/en-us/log-in/'
|
||||
|
||||
preauth_url = singlekey_host + "/auth/connect/authorize"
|
||||
preauth_query = {
|
||||
"client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256",
|
||||
"redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target",
|
||||
"response_type": "code",
|
||||
"scope": "openid email profile offline_access homeconnect.general",
|
||||
"prompt": "login",
|
||||
"style_id": "bsh_hc_01",
|
||||
"state": '{"session_id":"' + session_id + '"}', # important: no spaces!
|
||||
}
|
||||
|
||||
# fetch the preauth state to get the final callback url
|
||||
preauth_url += "?" + urlencode(preauth_query)
|
||||
|
||||
# loop until we have the callback url
|
||||
while True:
|
||||
debug(f"next {preauth_url=}")
|
||||
r = session.get(preauth_url, allow_redirects=False)
|
||||
if r.status_code == 200:
|
||||
break
|
||||
if r.status_code == 302 or r.status_code == 301:
|
||||
preauth_url = r.headers["location"]
|
||||
continue
|
||||
print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr)
|
||||
exit(1)
|
||||
|
||||
# get the ReturnUrl from the response
|
||||
query = parse_qs(urlparse(preauth_url).query)
|
||||
return_url = query["ReturnUrl"][0]
|
||||
debug(f"{return_url=}")
|
||||
|
||||
headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"]
|
||||
session.headers.update(headers)
|
||||
|
||||
debug("--------")
|
||||
|
||||
valid_url = singlekey_host + '/auth/api/v1/authentication/UserExists'
|
||||
auth_url = singlekey_host + '/auth/api/v1/authentication/login'
|
||||
|
||||
r = session.post(valid_url, json={"username": email})
|
||||
debug(f"{valid_url=}: {r} {r.text}")
|
||||
|
||||
|
||||
login_fields = {
|
||||
"username": email,
|
||||
"password": password,
|
||||
"keepMeSignedIn": False,
|
||||
"returnUrl": return_url,
|
||||
}
|
||||
|
||||
r = session.post(auth_url, json=login_fields, allow_redirects=False)
|
||||
|
||||
if r.status_code != 200:
|
||||
debug(f"auth failed: {auth_url=}, {login_fields=} {r} {r.text}")
|
||||
exit(-1)
|
||||
|
||||
debug(f"{auth_url=}, {r} {r.text}")
|
||||
return_url = json.loads(r.text)["returnUrl"]
|
||||
if return_url.startswith("/"):
|
||||
return_url = singlekey_host + return_url
|
||||
|
||||
while True:
|
||||
r = session.get(return_url, allow_redirects=False)
|
||||
debug(f"{return_url=}, {r} {r.text}")
|
||||
if r.status_code != 302:
|
||||
break
|
||||
return_url = r.headers["location"]
|
||||
if return_url.startswith("hcauth://"):
|
||||
break
|
||||
debug(f"{return_url=}")
|
||||
|
||||
debug("--------")
|
||||
|
||||
url = urlparse(return_url)
|
||||
query = parse_qs(url.query)
|
||||
code = query.get("code")[0]
|
||||
state = query.get("state")[0]
|
||||
grant_type = query.get("grant_type")[0] # "authorization_code"
|
||||
|
||||
debug(f"{code=} {grant_type=} {state=}")
|
||||
|
||||
auth_url = base_url + 'login'
|
||||
token_url = base_url + 'token'
|
||||
|
||||
r = requests.get(loginpage_url)
|
||||
if r.status_code != requests.codes.ok:
|
||||
print("error fetching login url!", login_url, file=sys.stderr)
|
||||
exit(1)
|
||||
loginpage = r.text
|
||||
|
||||
#print('--------- got login page ----------')
|
||||
|
||||
#with open("login.html") as fd:
|
||||
# loginpage = fd.read()
|
||||
|
||||
tree = html.fromstring(loginpage)
|
||||
|
||||
# add in the email and password
|
||||
auth_fields = {
|
||||
"email": email,
|
||||
"password": password,
|
||||
"code_challenge": login_query["code_challenge"],
|
||||
"code_challenge_method": login_query["code_challenge_method"],
|
||||
"redirect_uri": login_query["redirect_uri"],
|
||||
}
|
||||
|
||||
for form in tree.forms:
|
||||
if form.attrib.get("id") != "login_form":
|
||||
continue
|
||||
for field in form.fields:
|
||||
if field not in auth_fields:
|
||||
auth_fields[field] = form.fields.get(field)
|
||||
|
||||
#print(auth_fields)
|
||||
|
||||
# try to submit the form and get the redirect URL with the token
|
||||
r = requests.post(auth_url, data=auth_fields, allow_redirects=False)
|
||||
if r.status_code != 302:
|
||||
print("Did not get a redirect; wrong username/password?", file=sys.stderr)
|
||||
print(r.text)
|
||||
exit(1)
|
||||
|
||||
# Yes!
|
||||
location = r.headers["location"]
|
||||
url = urlparse(location)
|
||||
query = parse_qs(url.query)
|
||||
#print("response:", location, query)
|
||||
code = query.get("code")
|
||||
if not code:
|
||||
print("Unable to find code in response?", location, file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
#print('--------- got code page ----------')
|
||||
|
||||
token_fields = {
|
||||
"grant_type": "authorization_code",
|
||||
"grant_type": grant_type,
|
||||
"client_id": app_id,
|
||||
"code_verifier": verifier,
|
||||
"code": code[0],
|
||||
"code": code,
|
||||
"redirect_uri": login_query["redirect_uri"],
|
||||
}
|
||||
|
||||
#print(token_fields)
|
||||
debug(f"{token_url=} {token_fields=}")
|
||||
|
||||
r = requests.post(token_url, data=token_fields, allow_redirects=False)
|
||||
if r.status_code != requests.codes.ok:
|
||||
@@ -118,13 +204,11 @@ if r.status_code != requests.codes.ok:
|
||||
print(r.headers, r.text)
|
||||
exit(1)
|
||||
|
||||
#print('--------- got token page ----------')
|
||||
|
||||
# Yes!
|
||||
#print(r.text)
|
||||
debug('--------- got token page ----------')
|
||||
|
||||
token = json.loads(r.text)["access_token"]
|
||||
print("Received access token", file=sys.stderr)
|
||||
debug(f"Received access {token=}")
|
||||
|
||||
headers = {
|
||||
"Authorization": "Bearer " + token,
|
||||
}
|
||||
@@ -140,6 +224,8 @@ if r.status_code != requests.codes.ok:
|
||||
account = json.loads(r.text)
|
||||
configs = []
|
||||
|
||||
print(account, file=sys.stderr)
|
||||
|
||||
for app in account["data"]["homeAppliances"]:
|
||||
app_brand = app["brand"]
|
||||
app_type = app["type"]
|
||||
@@ -170,7 +256,11 @@ for app in account["data"]["homeAppliances"]:
|
||||
next
|
||||
|
||||
# we now have a zip file with XML, let's unpack them
|
||||
z = ZipFile(io.BytesIO(r.content))
|
||||
content = r.content
|
||||
print(app_url + ": " + app_id + ".zip", file=sys.stderr)
|
||||
with open(app_id + ".zip", "wb") as f:
|
||||
f.write(content)
|
||||
z = ZipFile(io.BytesIO(content))
|
||||
#print(z.infolist())
|
||||
features = z.open(app_id + "_FeatureMapping.xml").read()
|
||||
description = z.open(app_id + "_DeviceDescription.xml").read()
|
||||
|
||||
Reference in New Issue
Block a user