Wrap hc_login module logic to a main function
This commit is contained in:
@@ -20,5 +20,5 @@ dependencies = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
hc-login = "hcpy:hc_login"
|
hc-login = "hcpy.hc_login:main"
|
||||||
hc2mqtt = "hcpy.hc2mqtt:hc2mqtt"
|
hc2mqtt = "hcpy.hc2mqtt:hc2mqtt"
|
||||||
@@ -31,34 +31,34 @@ from hcpy.HCxml2json import xml2json
|
|||||||
# requests_log.setLevel(logging.DEBUG)
|
# requests_log.setLevel(logging.DEBUG)
|
||||||
# requests_log.propagate = True
|
# requests_log.propagate = True
|
||||||
|
|
||||||
|
def main():
|
||||||
def debug(*args):
|
def debug(*args):
|
||||||
print(*args, file=sys.stderr)
|
print(*args, file=sys.stderr)
|
||||||
|
|
||||||
|
|
||||||
email = sys.argv[1]
|
email = sys.argv[1]
|
||||||
password = sys.argv[2]
|
password = sys.argv[2]
|
||||||
devicefile = sys.argv[3]
|
devicefile = sys.argv[3]
|
||||||
|
|
||||||
headers = {"User-Agent": "hc-login/1.0"}
|
headers = {"User-Agent": "hc-login/1.0"}
|
||||||
|
|
||||||
session = requests.Session()
|
session = requests.Session()
|
||||||
session.headers.update(headers)
|
session.headers.update(headers)
|
||||||
|
|
||||||
base_url = "https://api.home-connect.com/security/oauth/"
|
base_url = "https://api.home-connect.com/security/oauth/"
|
||||||
asset_urls = [
|
asset_urls = [
|
||||||
"https://prod.reu.rest.homeconnectegw.com/", # EU
|
"https://prod.reu.rest.homeconnectegw.com/", # EU
|
||||||
"https://prod.rna.rest.homeconnectegw.com/", # US
|
"https://prod.rna.rest.homeconnectegw.com/", # US
|
||||||
]
|
]
|
||||||
|
|
||||||
#
|
#
|
||||||
# Start by fetching the old login page, which gives
|
# Start by fetching the old login page, which gives
|
||||||
# us the verifier and challenge for getting the token,
|
# us the verifier and challenge for getting the token,
|
||||||
# even after the singlekey detour.
|
# even after the singlekey detour.
|
||||||
#
|
#
|
||||||
# The app_id and scope are hardcoded in the application
|
# The app_id and scope are hardcoded in the application
|
||||||
app_id = "9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3"
|
app_id = "9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3"
|
||||||
scope = [
|
scope = [
|
||||||
"ReadAccount",
|
"ReadAccount",
|
||||||
"Settings",
|
"Settings",
|
||||||
"IdentifyAppliance",
|
"IdentifyAppliance",
|
||||||
@@ -69,23 +69,23 @@ scope = [
|
|||||||
"Monitor",
|
"Monitor",
|
||||||
"WriteOrigApi",
|
"WriteOrigApi",
|
||||||
"Images",
|
"Images",
|
||||||
]
|
]
|
||||||
scope = [
|
scope = [
|
||||||
"ReadOrigApi",
|
"ReadOrigApi",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def b64(b):
|
def b64(b):
|
||||||
return re.sub(r"=", "", base64url_encode(b).decode("UTF-8"))
|
return re.sub(r"=", "", base64url_encode(b).decode("UTF-8"))
|
||||||
|
|
||||||
|
|
||||||
def b64random(num):
|
def b64random(num):
|
||||||
return b64(base64url_encode(get_random_bytes(num)))
|
return b64(base64url_encode(get_random_bytes(num)))
|
||||||
|
|
||||||
|
|
||||||
verifier = b64(get_random_bytes(32))
|
verifier = b64(get_random_bytes(32))
|
||||||
|
|
||||||
login_query = {
|
login_query = {
|
||||||
"response_type": "code",
|
"response_type": "code",
|
||||||
"prompt": "login",
|
"prompt": "login",
|
||||||
"code_challenge": b64(SHA256.new(verifier.encode("UTF-8")).digest()),
|
"code_challenge": b64(SHA256.new(verifier.encode("UTF-8")).digest()),
|
||||||
@@ -96,36 +96,36 @@ login_query = {
|
|||||||
"state": b64random(16),
|
"state": b64random(16),
|
||||||
"redirect_uri": "hcauth://auth/prod",
|
"redirect_uri": "hcauth://auth/prod",
|
||||||
"redirect_target": "icore",
|
"redirect_target": "icore",
|
||||||
}
|
}
|
||||||
|
|
||||||
loginpage_url = base_url + "authorize?" + urlencode(login_query)
|
loginpage_url = base_url + "authorize?" + urlencode(login_query)
|
||||||
token_url = base_url + "token"
|
token_url = base_url + "token"
|
||||||
|
|
||||||
debug(f"{loginpage_url=}")
|
debug(f"{loginpage_url=}")
|
||||||
r = session.get(loginpage_url)
|
r = session.get(loginpage_url)
|
||||||
if r.status_code != requests.codes.ok:
|
if r.status_code != requests.codes.ok:
|
||||||
print("error fetching login url!", loginpage_url, r.text, file=sys.stderr)
|
print("error fetching login url!", loginpage_url, r.text, file=sys.stderr)
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
# get the session from the text
|
# get the session from the text
|
||||||
if not (match := re.search(r'"sessionId" value="(.*?)"', r.text)):
|
if not (match := re.search(r'"sessionId" value="(.*?)"', r.text)):
|
||||||
print("Unable to find session id in login page")
|
print("Unable to find session id in login page")
|
||||||
exit(1)
|
exit(1)
|
||||||
session_id = match[1]
|
session_id = match[1]
|
||||||
if not (match := re.search(r'"sessionData" value="(.*?)"', r.text)):
|
if not (match := re.search(r'"sessionData" value="(.*?)"', r.text)):
|
||||||
print("Unable to find session data in login page")
|
print("Unable to find session data in login page")
|
||||||
exit(1)
|
exit(1)
|
||||||
session_data = match[1]
|
session_data = match[1]
|
||||||
|
|
||||||
debug("--------")
|
debug("--------")
|
||||||
|
|
||||||
# now that we have a session id, contact the
|
# now that we have a session id, contact the
|
||||||
# single key host to start the new login flow
|
# single key host to start the new login flow
|
||||||
singlekey_host = "https://singlekey-id.com"
|
singlekey_host = "https://singlekey-id.com"
|
||||||
login_url = singlekey_host + "/auth/en-us/log-in/"
|
login_url = singlekey_host + "/auth/en-us/log-in/"
|
||||||
|
|
||||||
preauth_url = singlekey_host + "/auth/connect/authorize"
|
preauth_url = singlekey_host + "/auth/connect/authorize"
|
||||||
preauth_query = {
|
preauth_query = {
|
||||||
"client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256",
|
"client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256",
|
||||||
"redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target",
|
"redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target",
|
||||||
"response_type": "code",
|
"response_type": "code",
|
||||||
@@ -133,13 +133,13 @@ preauth_query = {
|
|||||||
"prompt": "login",
|
"prompt": "login",
|
||||||
"style_id": "bsh_hc_01",
|
"style_id": "bsh_hc_01",
|
||||||
"state": '{"session_id":"' + session_id + '"}', # important: no spaces!
|
"state": '{"session_id":"' + session_id + '"}', # important: no spaces!
|
||||||
}
|
}
|
||||||
|
|
||||||
# fetch the preauth state to get the final callback url
|
# fetch the preauth state to get the final callback url
|
||||||
preauth_url += "?" + urlencode(preauth_query)
|
preauth_url += "?" + urlencode(preauth_query)
|
||||||
|
|
||||||
# loop until we have the callback url
|
# loop until we have the callback url
|
||||||
while True:
|
while True:
|
||||||
debug(f"next {preauth_url=}")
|
debug(f"next {preauth_url=}")
|
||||||
r = session.get(preauth_url, allow_redirects=False)
|
r = session.get(preauth_url, allow_redirects=False)
|
||||||
if r.status_code == 200:
|
if r.status_code == 200:
|
||||||
@@ -153,37 +153,37 @@ while True:
|
|||||||
print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr)
|
print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr)
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
# get the ReturnUrl from the response
|
# get the ReturnUrl from the response
|
||||||
query = parse_qs(urlparse(preauth_url).query)
|
query = parse_qs(urlparse(preauth_url).query)
|
||||||
return_url = query["ReturnUrl"][0]
|
return_url = query["ReturnUrl"][0]
|
||||||
debug(f"{return_url=}")
|
debug(f"{return_url=}")
|
||||||
|
|
||||||
if "X-CSRF-FORM-TOKEN" in r.cookies:
|
if "X-CSRF-FORM-TOKEN" in r.cookies:
|
||||||
headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"]
|
headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"]
|
||||||
session.headers.update(headers)
|
session.headers.update(headers)
|
||||||
|
|
||||||
debug("--------")
|
debug("--------")
|
||||||
|
|
||||||
soup = BeautifulSoup(r.text, "html.parser")
|
soup = BeautifulSoup(r.text, "html.parser")
|
||||||
requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value")
|
requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value")
|
||||||
r = session.post(
|
r = session.post(
|
||||||
preauth_url,
|
preauth_url,
|
||||||
data={
|
data={
|
||||||
"UserIdentifierInput.EmailInput.StringValue": email,
|
"UserIdentifierInput.EmailInput.StringValue": email,
|
||||||
"__RequestVerificationToken": requestVerificationToken,
|
"__RequestVerificationToken": requestVerificationToken,
|
||||||
},
|
},
|
||||||
allow_redirects=False,
|
allow_redirects=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
password_url = r.headers["location"]
|
password_url = r.headers["location"]
|
||||||
if not bool(urlparse(password_url).netloc):
|
if not bool(urlparse(password_url).netloc):
|
||||||
password_url = singlekey_host + password_url
|
password_url = singlekey_host + password_url
|
||||||
|
|
||||||
r = session.get(password_url, allow_redirects=False)
|
r = session.get(password_url, allow_redirects=False)
|
||||||
soup = BeautifulSoup(r.text, "html.parser")
|
soup = BeautifulSoup(r.text, "html.parser")
|
||||||
requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value")
|
requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value")
|
||||||
|
|
||||||
r = session.post(
|
r = session.post(
|
||||||
password_url,
|
password_url,
|
||||||
data={
|
data={
|
||||||
"Password": password,
|
"Password": password,
|
||||||
@@ -191,9 +191,9 @@ r = session.post(
|
|||||||
"__RequestVerificationToken": requestVerificationToken,
|
"__RequestVerificationToken": requestVerificationToken,
|
||||||
},
|
},
|
||||||
allow_redirects=False,
|
allow_redirects=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if return_url.startswith("/"):
|
if return_url.startswith("/"):
|
||||||
return_url = singlekey_host + return_url
|
return_url = singlekey_host + return_url
|
||||||
r = session.get(return_url, allow_redirects=False)
|
r = session.get(return_url, allow_redirects=False)
|
||||||
@@ -203,14 +203,14 @@ while True:
|
|||||||
return_url = r.headers["location"]
|
return_url = r.headers["location"]
|
||||||
if return_url.startswith("hcauth://"):
|
if return_url.startswith("hcauth://"):
|
||||||
break
|
break
|
||||||
debug(f"{return_url=}")
|
debug(f"{return_url=}")
|
||||||
|
|
||||||
debug("--------")
|
debug("--------")
|
||||||
|
|
||||||
url = urlparse(return_url)
|
url = urlparse(return_url)
|
||||||
query = parse_qs(url.query)
|
query = parse_qs(url.query)
|
||||||
|
|
||||||
if query.get("ReturnUrl") is not None:
|
if query.get("ReturnUrl") is not None:
|
||||||
print("Wrong credentials.")
|
print("Wrong credentials.")
|
||||||
print(
|
print(
|
||||||
"If you forgot your login/password, you can restore them by opening "
|
"If you forgot your login/password, you can restore them by opening "
|
||||||
@@ -218,60 +218,60 @@ if query.get("ReturnUrl") is not None:
|
|||||||
)
|
)
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
code = query.get("code")[0]
|
code = query.get("code")[0]
|
||||||
state = query.get("state")[0]
|
state = query.get("state")[0]
|
||||||
grant_type = query.get("grant_type")[0] # "authorization_code"
|
grant_type = query.get("grant_type")[0] # "authorization_code"
|
||||||
|
|
||||||
debug(f"{code=} {grant_type=} {state=}")
|
debug(f"{code=} {grant_type=} {state=}")
|
||||||
|
|
||||||
auth_url = base_url + "login"
|
auth_url = base_url + "login"
|
||||||
token_url = base_url + "token"
|
token_url = base_url + "token"
|
||||||
|
|
||||||
token_fields = {
|
token_fields = {
|
||||||
"grant_type": grant_type,
|
"grant_type": grant_type,
|
||||||
"client_id": app_id,
|
"client_id": app_id,
|
||||||
"code_verifier": verifier,
|
"code_verifier": verifier,
|
||||||
"code": code,
|
"code": code,
|
||||||
"redirect_uri": login_query["redirect_uri"],
|
"redirect_uri": login_query["redirect_uri"],
|
||||||
}
|
}
|
||||||
|
|
||||||
debug(f"{token_url=} {token_fields=}")
|
debug(f"{token_url=} {token_fields=}")
|
||||||
|
|
||||||
r = requests.post(token_url, data=token_fields, allow_redirects=False)
|
r = requests.post(token_url, data=token_fields, allow_redirects=False)
|
||||||
if r.status_code != requests.codes.ok:
|
if r.status_code != requests.codes.ok:
|
||||||
print("Bad code?", file=sys.stderr)
|
print("Bad code?", file=sys.stderr)
|
||||||
print(r.headers, r.text)
|
print(r.headers, r.text)
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
debug("--------- got token page ----------")
|
debug("--------- got token page ----------")
|
||||||
|
|
||||||
token = json.loads(r.text)["access_token"]
|
token = json.loads(r.text)["access_token"]
|
||||||
debug(f"Received access {token=}")
|
debug(f"Received access {token=}")
|
||||||
|
|
||||||
headers = {
|
headers = {
|
||||||
"Authorization": "Bearer " + token,
|
"Authorization": "Bearer " + token,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
# Try to request account details from all geos. Whichever works, we'll use next.
|
# Try to request account details from all geos. Whichever works, we'll use next.
|
||||||
for asset_url in asset_urls:
|
for asset_url in asset_urls:
|
||||||
r = requests.get(asset_url + "account/details", headers=headers)
|
r = requests.get(asset_url + "account/details", headers=headers)
|
||||||
if r.status_code == requests.codes.ok:
|
if r.status_code == requests.codes.ok:
|
||||||
break
|
break
|
||||||
|
|
||||||
# now we can fetch the rest of the account info
|
# now we can fetch the rest of the account info
|
||||||
if r.status_code != requests.codes.ok:
|
if r.status_code != requests.codes.ok:
|
||||||
print("unable to fetch account details", file=sys.stderr)
|
print("unable to fetch account details", file=sys.stderr)
|
||||||
print(r.headers, r.text)
|
print(r.headers, r.text)
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
# print(r.text)
|
# print(r.text)
|
||||||
account = json.loads(r.text)
|
account = json.loads(r.text)
|
||||||
configs = []
|
configs = []
|
||||||
|
|
||||||
print(account, file=sys.stderr)
|
print(account, file=sys.stderr)
|
||||||
|
|
||||||
for app in account["data"]["homeAppliances"]:
|
for app in account["data"]["homeAppliances"]:
|
||||||
app_brand = app["brand"]
|
app_brand = app["brand"]
|
||||||
app_type = app["type"]
|
app_type = app["type"]
|
||||||
app_id = app["identifier"]
|
app_id = app["identifier"]
|
||||||
@@ -315,11 +315,14 @@ for app in account["data"]["homeAppliances"]:
|
|||||||
config["features"] = augment_device_features(machine["features"])
|
config["features"] = augment_device_features(machine["features"])
|
||||||
print("Discovered device: " + config["name"] + " - Device hostname: " + config["host"])
|
print("Discovered device: " + config["name"] + " - Device hostname: " + config["host"])
|
||||||
|
|
||||||
with open(devicefile, "w") as f:
|
with open(devicefile, "w") as f:
|
||||||
json.dump(configs, f, ensure_ascii=True, indent=4)
|
json.dump(configs, f, ensure_ascii=True, indent=4)
|
||||||
|
|
||||||
print(
|
print(
|
||||||
"Success. You can now edit "
|
"Success. You can now edit "
|
||||||
+ devicefile
|
+ devicefile
|
||||||
+ ", if needed, and run hc2mqtt.py or start Home Assistant addon again"
|
+ ", if needed, and run hc2mqtt.py or start Home Assistant addon again"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|||||||
Reference in New Issue
Block a user