Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
b28bb18287
|
|||
|
088643cae5
|
|||
|
89e4c5e7f0
|
|||
|
e553adc033
|
|||
|
f77d2a6efa
|
|||
|
9c9f9b5d14
|
22
flake.lock
generated
22
flake.lock
generated
@@ -2,7 +2,9 @@
|
||||
"nodes": {
|
||||
"dream2nix": {
|
||||
"inputs": {
|
||||
"nixpkgs": "nixpkgs",
|
||||
"nixpkgs": [
|
||||
"nixpkgs"
|
||||
],
|
||||
"purescript-overlay": "purescript-overlay",
|
||||
"pyproject-nix": "pyproject-nix"
|
||||
},
|
||||
@@ -38,18 +40,17 @@
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1729850857,
|
||||
"narHash": "sha256-WvLXzNNnnw+qpFOmgaM3JUlNEH+T4s22b5i2oyyCpXE=",
|
||||
"lastModified": 1732014248,
|
||||
"narHash": "sha256-y/MEyuJ5oBWrWAic/14LaIr/u5E0wRVzyYsouYY3W6w=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "41dea55321e5a999b17033296ac05fe8a8b5a257",
|
||||
"rev": "23e89b7da85c3640bbc2173fe04f4bd114342367",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"ref": "nixpkgs-unstable",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
"id": "nixpkgs",
|
||||
"ref": "nixos-unstable",
|
||||
"type": "indirect"
|
||||
}
|
||||
},
|
||||
"purescript-overlay": {
|
||||
@@ -95,10 +96,7 @@
|
||||
"root": {
|
||||
"inputs": {
|
||||
"dream2nix": "dream2nix",
|
||||
"nixpkgs": [
|
||||
"dream2nix",
|
||||
"nixpkgs"
|
||||
]
|
||||
"nixpkgs": "nixpkgs"
|
||||
}
|
||||
},
|
||||
"slimlock": {
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
{
|
||||
inputs = {
|
||||
nixpkgs.url = "nixpkgs/nixos-unstable";
|
||||
dream2nix.url = "github:nix-community/dream2nix";
|
||||
nixpkgs.follows = "dream2nix/nixpkgs";
|
||||
dream2nix.inputs.nixpkgs.follows = "nixpkgs";
|
||||
};
|
||||
|
||||
outputs = {
|
||||
@@ -16,6 +17,11 @@
|
||||
"x86_64-linux"
|
||||
];
|
||||
in {
|
||||
nixosModules = rec {
|
||||
hcpy = import ./module.nix self;
|
||||
default = hcpy;
|
||||
};
|
||||
|
||||
packages = eachSystem (system: let
|
||||
pkgs = nixpkgs.legacyPackages.${system};
|
||||
in rec {
|
||||
@@ -25,6 +31,7 @@
|
||||
./package.nix
|
||||
{
|
||||
paths.projectRootFile = "flake.nix";
|
||||
paths.lockFile = "lock.${system}.json";
|
||||
paths.projectRoot = ./.;
|
||||
paths.package = ./.;
|
||||
}
|
||||
|
||||
@@ -24,9 +24,9 @@
|
||||
},
|
||||
"charset-normalizer": {
|
||||
"is_direct": false,
|
||||
"sha256": "8cda06946eac330cbe6598f77bb54e690b4ca93f593dee1568ad22b04f347c15",
|
||||
"sha256": "7f683ddc7eedd742e2889d2bfb96d69573fde1d92fcb811979cdb7165bb9c7d3",
|
||||
"type": "url",
|
||||
"url": "https://files.pythonhosted.org/packages/16/92/92a76dc2ff3a12e69ba94e7e05168d37d0345fa08c87e1fe24d0c2a42223/charset_normalizer-3.4.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
|
||||
"url": "https://files.pythonhosted.org/packages/f8/01/344ec40cf5d85c1da3c1f57566c59e0c9b56bcc5566c08804a95a6cc8257/charset_normalizer-3.4.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
|
||||
"version": "3.4.0"
|
||||
},
|
||||
"click": {
|
||||
@@ -59,9 +59,9 @@
|
||||
},
|
||||
"lxml": {
|
||||
"is_direct": false,
|
||||
"sha256": "3879cc6ce938ff4eb4900d901ed63555c778731a96365e53fadb36437a131a99",
|
||||
"sha256": "b1c8c20847b9f34e98080da785bb2336ea982e7f913eed5809e5a3c872900f32",
|
||||
"type": "url",
|
||||
"url": "https://files.pythonhosted.org/packages/0a/6e/94537acfb5b8f18235d13186d247bca478fea5e87d224644e0fe907df976/lxml-5.3.0-cp312-cp312-manylinux_2_28_x86_64.whl",
|
||||
"url": "https://files.pythonhosted.org/packages/05/9e/87492d03ff604fbf656ed2bf3e2e8d28f5d58ea1f00ff27ac27b06509079/lxml-5.3.0-cp310-cp310-manylinux_2_28_x86_64.whl",
|
||||
"version": "5.3.0"
|
||||
},
|
||||
"paho-mqtt": {
|
||||
@@ -155,5 +155,5 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"invalidationHash": "8b77dd84722cef2a9d624d9674be2397c777c8b180b813df1327f67470dd3476"
|
||||
"invalidationHash": "ffec585af8e2f405564c1aa26829f11dbf9265334a938576ea9f416033be7da5"
|
||||
}
|
||||
104
module.nix
Normal file
104
module.nix
Normal file
@@ -0,0 +1,104 @@
|
||||
self: {
|
||||
config,
|
||||
lib,
|
||||
pkgs,
|
||||
system,
|
||||
...
|
||||
}:
|
||||
with lib; let
|
||||
cfg = config.services.hcpy;
|
||||
bridgeConfig =
|
||||
cfg.config
|
||||
// {
|
||||
devices_file = cfg.devicesFile;
|
||||
};
|
||||
hcpy = pkgs.writeShellApplication {
|
||||
name = "hcpy-service";
|
||||
runtimeInputs = [self.packages.${system}.default];
|
||||
text = ''
|
||||
if [ ! -f "${cfg.devicesFile}" ]; then
|
||||
echo "Device file does not exist, requesting login to HomeConnect cloud";
|
||||
hcpy login "$(cat "${cfg.homeconnect.usernameFile}")" \
|
||||
"$(cat "${cfg.homeconnect.passwordFile}")" \
|
||||
"${cfg.devicesFile}";
|
||||
fi
|
||||
|
||||
hcpy run ${cli.toGNUCommandLineShell {} bridgeConfig} \
|
||||
--mqtt_username "$(cat "${cfg.mqtt.usernameFile}")" \
|
||||
--mqtt_password "$(cat "${cfg.mqtt.passwordFile}")"
|
||||
'';
|
||||
};
|
||||
in {
|
||||
options.services.hcpy = {
|
||||
enable = mkEnableOption "HomeConnect bridge service";
|
||||
|
||||
homeconnect = {
|
||||
usernameFile = mkOption {
|
||||
type = types.path;
|
||||
description = "Path to file containing single line with HomeConnect account username";
|
||||
example = "/run/hc.username.key";
|
||||
};
|
||||
|
||||
passwordFile = mkOption {
|
||||
type = types.path;
|
||||
description = "Path to file containing single line with HomeConnect account password";
|
||||
example = "/run/hc.password.key";
|
||||
};
|
||||
};
|
||||
|
||||
mqtt = {
|
||||
usernameFile = mkOption {
|
||||
type = types.path;
|
||||
description = "Path to file containing single line with MQTT client username";
|
||||
example = "/run/mqtt.username.key";
|
||||
};
|
||||
|
||||
passwordFile = mkOption {
|
||||
type = types.path;
|
||||
description = "Path to file containing single line with MQTT client password";
|
||||
example = "/run/mqtt.password.key";
|
||||
};
|
||||
};
|
||||
|
||||
devicesFile = mkOption {
|
||||
type = types.path;
|
||||
description = "Path to JSON file fetched from HomeConnect API";
|
||||
example = "/var/lib/hcpy/devices.json";
|
||||
};
|
||||
|
||||
config = mkOption {
|
||||
type = types.attrs;
|
||||
description = "The configuration of hcpy bridge";
|
||||
example = {
|
||||
mqtt_host = "localhost";
|
||||
mqtt_prefix = "homeconnect/";
|
||||
mqtt_port = 8883;
|
||||
mqtt_ssl = true;
|
||||
mqtt_cafile = "/run/ca.pem";
|
||||
mqtt_certfile = "/run/cert.crt";
|
||||
mqtt_keyfile = "/run/cert.key";
|
||||
mqtt_clientname = "hcpy1";
|
||||
domain_suffix = "";
|
||||
debug = true;
|
||||
ha-discovery = true;
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
config = mkIf cfg.enable {
|
||||
systemd.services.hcpy = {
|
||||
enable = true;
|
||||
description = "HomeConnect bridge service";
|
||||
|
||||
wantedBy = ["multi-user.target"];
|
||||
wants = ["network-online.target"];
|
||||
after = ["network-online.target"];
|
||||
|
||||
serviceConfig = {
|
||||
ExecStart = "${hcpy}/bin/hcpy-service";
|
||||
Restart = "on-failure";
|
||||
RestartSec = 10;
|
||||
};
|
||||
};
|
||||
};
|
||||
}
|
||||
@@ -14,7 +14,7 @@ in rec {
|
||||
inherit (pyproject.project) name version;
|
||||
|
||||
deps = {nixpkgs, ...}: {
|
||||
python = nixpkgs.python3;
|
||||
python = nixpkgs.python310;
|
||||
inherit (nixpkgs) openssl;
|
||||
};
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
|
||||
name = "hcpy"
|
||||
version = "0.1.5"
|
||||
description = "HC to MQTT bridge"
|
||||
requires-python = ">=3.7"
|
||||
requires-python = "==3.10"
|
||||
dependencies = [
|
||||
"bs4",
|
||||
"requests",
|
||||
@@ -20,5 +20,5 @@ dependencies = [
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
hc-login = "hcpy:hc_login"
|
||||
hc-login = "hcpy.hc_login:main"
|
||||
hc2mqtt = "hcpy.hc2mqtt:hc2mqtt"
|
||||
@@ -31,295 +31,298 @@ from hcpy.HCxml2json import xml2json
|
||||
# requests_log.setLevel(logging.DEBUG)
|
||||
# requests_log.propagate = True
|
||||
|
||||
|
||||
def debug(*args):
|
||||
print(*args, file=sys.stderr)
|
||||
def main():
|
||||
def debug(*args):
|
||||
print(*args, file=sys.stderr)
|
||||
|
||||
|
||||
email = sys.argv[1]
|
||||
password = sys.argv[2]
|
||||
devicefile = sys.argv[3]
|
||||
email = sys.argv[1]
|
||||
password = sys.argv[2]
|
||||
devicefile = sys.argv[3]
|
||||
|
||||
headers = {"User-Agent": "hc-login/1.0"}
|
||||
headers = {"User-Agent": "hc-login/1.0"}
|
||||
|
||||
session = requests.Session()
|
||||
session.headers.update(headers)
|
||||
session = requests.Session()
|
||||
session.headers.update(headers)
|
||||
|
||||
base_url = "https://api.home-connect.com/security/oauth/"
|
||||
asset_urls = [
|
||||
"https://prod.reu.rest.homeconnectegw.com/", # EU
|
||||
"https://prod.rna.rest.homeconnectegw.com/", # US
|
||||
]
|
||||
base_url = "https://api.home-connect.com/security/oauth/"
|
||||
asset_urls = [
|
||||
"https://prod.reu.rest.homeconnectegw.com/", # EU
|
||||
"https://prod.rna.rest.homeconnectegw.com/", # US
|
||||
]
|
||||
|
||||
#
|
||||
# Start by fetching the old login page, which gives
|
||||
# us the verifier and challenge for getting the token,
|
||||
# even after the singlekey detour.
|
||||
#
|
||||
# The app_id and scope are hardcoded in the application
|
||||
app_id = "9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3"
|
||||
scope = [
|
||||
"ReadAccount",
|
||||
"Settings",
|
||||
"IdentifyAppliance",
|
||||
"Control",
|
||||
"DeleteAppliance",
|
||||
"WriteAppliance",
|
||||
"ReadOrigApi",
|
||||
"Monitor",
|
||||
"WriteOrigApi",
|
||||
"Images",
|
||||
]
|
||||
scope = [
|
||||
"ReadOrigApi",
|
||||
]
|
||||
#
|
||||
# Start by fetching the old login page, which gives
|
||||
# us the verifier and challenge for getting the token,
|
||||
# even after the singlekey detour.
|
||||
#
|
||||
# The app_id and scope are hardcoded in the application
|
||||
app_id = "9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3"
|
||||
scope = [
|
||||
"ReadAccount",
|
||||
"Settings",
|
||||
"IdentifyAppliance",
|
||||
"Control",
|
||||
"DeleteAppliance",
|
||||
"WriteAppliance",
|
||||
"ReadOrigApi",
|
||||
"Monitor",
|
||||
"WriteOrigApi",
|
||||
"Images",
|
||||
]
|
||||
scope = [
|
||||
"ReadOrigApi",
|
||||
]
|
||||
|
||||
|
||||
def b64(b):
|
||||
return re.sub(r"=", "", base64url_encode(b).decode("UTF-8"))
|
||||
def b64(b):
|
||||
return re.sub(r"=", "", base64url_encode(b).decode("UTF-8"))
|
||||
|
||||
|
||||
def b64random(num):
|
||||
return b64(base64url_encode(get_random_bytes(num)))
|
||||
def b64random(num):
|
||||
return b64(base64url_encode(get_random_bytes(num)))
|
||||
|
||||
|
||||
verifier = b64(get_random_bytes(32))
|
||||
verifier = b64(get_random_bytes(32))
|
||||
|
||||
login_query = {
|
||||
"response_type": "code",
|
||||
"prompt": "login",
|
||||
"code_challenge": b64(SHA256.new(verifier.encode("UTF-8")).digest()),
|
||||
"code_challenge_method": "S256",
|
||||
"client_id": app_id,
|
||||
"scope": " ".join(scope),
|
||||
"nonce": b64random(16),
|
||||
"state": b64random(16),
|
||||
"redirect_uri": "hcauth://auth/prod",
|
||||
"redirect_target": "icore",
|
||||
}
|
||||
|
||||
loginpage_url = base_url + "authorize?" + urlencode(login_query)
|
||||
token_url = base_url + "token"
|
||||
|
||||
debug(f"{loginpage_url=}")
|
||||
r = session.get(loginpage_url)
|
||||
if r.status_code != requests.codes.ok:
|
||||
print("error fetching login url!", loginpage_url, r.text, file=sys.stderr)
|
||||
exit(1)
|
||||
|
||||
# get the session from the text
|
||||
if not (match := re.search(r'"sessionId" value="(.*?)"', r.text)):
|
||||
print("Unable to find session id in login page")
|
||||
exit(1)
|
||||
session_id = match[1]
|
||||
if not (match := re.search(r'"sessionData" value="(.*?)"', r.text)):
|
||||
print("Unable to find session data in login page")
|
||||
exit(1)
|
||||
session_data = match[1]
|
||||
|
||||
debug("--------")
|
||||
|
||||
# now that we have a session id, contact the
|
||||
# single key host to start the new login flow
|
||||
singlekey_host = "https://singlekey-id.com"
|
||||
login_url = singlekey_host + "/auth/en-us/log-in/"
|
||||
|
||||
preauth_url = singlekey_host + "/auth/connect/authorize"
|
||||
preauth_query = {
|
||||
"client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256",
|
||||
"redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target",
|
||||
"response_type": "code",
|
||||
"scope": "openid email profile offline_access homeconnect.general",
|
||||
"prompt": "login",
|
||||
"style_id": "bsh_hc_01",
|
||||
"state": '{"session_id":"' + session_id + '"}', # important: no spaces!
|
||||
}
|
||||
|
||||
# fetch the preauth state to get the final callback url
|
||||
preauth_url += "?" + urlencode(preauth_query)
|
||||
|
||||
# loop until we have the callback url
|
||||
while True:
|
||||
debug(f"next {preauth_url=}")
|
||||
r = session.get(preauth_url, allow_redirects=False)
|
||||
if r.status_code == 200:
|
||||
break
|
||||
if r.status_code > 300 and r.status_code < 400:
|
||||
preauth_url = r.headers["location"]
|
||||
# Make relative locations absolute
|
||||
if not bool(urlparse(preauth_url).netloc):
|
||||
preauth_url = singlekey_host + preauth_url
|
||||
continue
|
||||
print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr)
|
||||
exit(1)
|
||||
|
||||
# get the ReturnUrl from the response
|
||||
query = parse_qs(urlparse(preauth_url).query)
|
||||
return_url = query["ReturnUrl"][0]
|
||||
debug(f"{return_url=}")
|
||||
|
||||
if "X-CSRF-FORM-TOKEN" in r.cookies:
|
||||
headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"]
|
||||
session.headers.update(headers)
|
||||
|
||||
debug("--------")
|
||||
|
||||
soup = BeautifulSoup(r.text, "html.parser")
|
||||
requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value")
|
||||
r = session.post(
|
||||
preauth_url,
|
||||
data={
|
||||
"UserIdentifierInput.EmailInput.StringValue": email,
|
||||
"__RequestVerificationToken": requestVerificationToken,
|
||||
},
|
||||
allow_redirects=False,
|
||||
)
|
||||
|
||||
password_url = r.headers["location"]
|
||||
if not bool(urlparse(password_url).netloc):
|
||||
password_url = singlekey_host + password_url
|
||||
|
||||
r = session.get(password_url, allow_redirects=False)
|
||||
soup = BeautifulSoup(r.text, "html.parser")
|
||||
requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value")
|
||||
|
||||
r = session.post(
|
||||
password_url,
|
||||
data={
|
||||
"Password": password,
|
||||
"RememberMe": "false",
|
||||
"__RequestVerificationToken": requestVerificationToken,
|
||||
},
|
||||
allow_redirects=False,
|
||||
)
|
||||
|
||||
while True:
|
||||
if return_url.startswith("/"):
|
||||
return_url = singlekey_host + return_url
|
||||
r = session.get(return_url, allow_redirects=False)
|
||||
debug(f"{return_url=}, {r} {r.text}")
|
||||
if r.status_code != 302:
|
||||
break
|
||||
return_url = r.headers["location"]
|
||||
if return_url.startswith("hcauth://"):
|
||||
break
|
||||
debug(f"{return_url=}")
|
||||
|
||||
debug("--------")
|
||||
|
||||
url = urlparse(return_url)
|
||||
query = parse_qs(url.query)
|
||||
|
||||
if query.get("ReturnUrl") is not None:
|
||||
print("Wrong credentials.")
|
||||
print(
|
||||
"If you forgot your login/password, you can restore them by opening "
|
||||
"https://singlekey-id.com/auth/en-us/login in browser"
|
||||
)
|
||||
exit(1)
|
||||
|
||||
code = query.get("code")[0]
|
||||
state = query.get("state")[0]
|
||||
grant_type = query.get("grant_type")[0] # "authorization_code"
|
||||
|
||||
debug(f"{code=} {grant_type=} {state=}")
|
||||
|
||||
auth_url = base_url + "login"
|
||||
token_url = base_url + "token"
|
||||
|
||||
token_fields = {
|
||||
"grant_type": grant_type,
|
||||
"client_id": app_id,
|
||||
"code_verifier": verifier,
|
||||
"code": code,
|
||||
"redirect_uri": login_query["redirect_uri"],
|
||||
}
|
||||
|
||||
debug(f"{token_url=} {token_fields=}")
|
||||
|
||||
r = requests.post(token_url, data=token_fields, allow_redirects=False)
|
||||
if r.status_code != requests.codes.ok:
|
||||
print("Bad code?", file=sys.stderr)
|
||||
print(r.headers, r.text)
|
||||
exit(1)
|
||||
|
||||
debug("--------- got token page ----------")
|
||||
|
||||
token = json.loads(r.text)["access_token"]
|
||||
debug(f"Received access {token=}")
|
||||
|
||||
headers = {
|
||||
"Authorization": "Bearer " + token,
|
||||
}
|
||||
|
||||
|
||||
# Try to request account details from all geos. Whichever works, we'll use next.
|
||||
for asset_url in asset_urls:
|
||||
r = requests.get(asset_url + "account/details", headers=headers)
|
||||
if r.status_code == requests.codes.ok:
|
||||
break
|
||||
|
||||
# now we can fetch the rest of the account info
|
||||
if r.status_code != requests.codes.ok:
|
||||
print("unable to fetch account details", file=sys.stderr)
|
||||
print(r.headers, r.text)
|
||||
exit(1)
|
||||
|
||||
# print(r.text)
|
||||
account = json.loads(r.text)
|
||||
configs = []
|
||||
|
||||
print(account, file=sys.stderr)
|
||||
|
||||
for app in account["data"]["homeAppliances"]:
|
||||
app_brand = app["brand"]
|
||||
app_type = app["type"]
|
||||
app_id = app["identifier"]
|
||||
|
||||
config = {
|
||||
"name": app_type.lower(),
|
||||
login_query = {
|
||||
"response_type": "code",
|
||||
"prompt": "login",
|
||||
"code_challenge": b64(SHA256.new(verifier.encode("UTF-8")).digest()),
|
||||
"code_challenge_method": "S256",
|
||||
"client_id": app_id,
|
||||
"scope": " ".join(scope),
|
||||
"nonce": b64random(16),
|
||||
"state": b64random(16),
|
||||
"redirect_uri": "hcauth://auth/prod",
|
||||
"redirect_target": "icore",
|
||||
}
|
||||
|
||||
configs.append(config)
|
||||
loginpage_url = base_url + "authorize?" + urlencode(login_query)
|
||||
token_url = base_url + "token"
|
||||
|
||||
if "tls" in app:
|
||||
# fancy machine with TLS support
|
||||
config["host"] = app_brand + "-" + app_type + "-" + app_id
|
||||
config["key"] = app["tls"]["key"]
|
||||
else:
|
||||
# less fancy machine with HTTP support
|
||||
config["host"] = app_id
|
||||
config["key"] = app["aes"]["key"]
|
||||
config["iv"] = app["aes"]["iv"]
|
||||
|
||||
# Fetch the XML zip file for this device
|
||||
app_url = asset_url + "api/iddf/v1/iddf/" + app_id
|
||||
print("fetching", app_url, file=sys.stderr)
|
||||
r = requests.get(app_url, headers=headers)
|
||||
debug(f"{loginpage_url=}")
|
||||
r = session.get(loginpage_url)
|
||||
if r.status_code != requests.codes.ok:
|
||||
print(app_id, ": unable to fetch machine description?")
|
||||
next
|
||||
print("error fetching login url!", loginpage_url, r.text, file=sys.stderr)
|
||||
exit(1)
|
||||
|
||||
# we now have a zip file with XML, let's unpack them
|
||||
content = r.content
|
||||
print(app_url + ": " + app_id + ".zip", file=sys.stderr)
|
||||
with open(app_id + ".zip", "wb") as f:
|
||||
f.write(content)
|
||||
z = ZipFile(io.BytesIO(content))
|
||||
# print(z.infolist())
|
||||
features = z.open(app_id + "_FeatureMapping.xml").read()
|
||||
description = z.open(app_id + "_DeviceDescription.xml").read()
|
||||
# get the session from the text
|
||||
if not (match := re.search(r'"sessionId" value="(.*?)"', r.text)):
|
||||
print("Unable to find session id in login page")
|
||||
exit(1)
|
||||
session_id = match[1]
|
||||
if not (match := re.search(r'"sessionData" value="(.*?)"', r.text)):
|
||||
print("Unable to find session data in login page")
|
||||
exit(1)
|
||||
session_data = match[1]
|
||||
|
||||
machine = xml2json(features, description)
|
||||
config["description"] = machine["description"]
|
||||
config["features"] = augment_device_features(machine["features"])
|
||||
print("Discovered device: " + config["name"] + " - Device hostname: " + config["host"])
|
||||
debug("--------")
|
||||
|
||||
with open(devicefile, "w") as f:
|
||||
json.dump(configs, f, ensure_ascii=True, indent=4)
|
||||
# now that we have a session id, contact the
|
||||
# single key host to start the new login flow
|
||||
singlekey_host = "https://singlekey-id.com"
|
||||
login_url = singlekey_host + "/auth/en-us/log-in/"
|
||||
|
||||
print(
|
||||
"Success. You can now edit "
|
||||
+ devicefile
|
||||
+ ", if needed, and run hc2mqtt.py or start Home Assistant addon again"
|
||||
)
|
||||
preauth_url = singlekey_host + "/auth/connect/authorize"
|
||||
preauth_query = {
|
||||
"client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256",
|
||||
"redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target",
|
||||
"response_type": "code",
|
||||
"scope": "openid email profile offline_access homeconnect.general",
|
||||
"prompt": "login",
|
||||
"style_id": "bsh_hc_01",
|
||||
"state": '{"session_id":"' + session_id + '"}', # important: no spaces!
|
||||
}
|
||||
|
||||
# fetch the preauth state to get the final callback url
|
||||
preauth_url += "?" + urlencode(preauth_query)
|
||||
|
||||
# loop until we have the callback url
|
||||
while True:
|
||||
debug(f"next {preauth_url=}")
|
||||
r = session.get(preauth_url, allow_redirects=False)
|
||||
if r.status_code == 200:
|
||||
break
|
||||
if r.status_code > 300 and r.status_code < 400:
|
||||
preauth_url = r.headers["location"]
|
||||
# Make relative locations absolute
|
||||
if not bool(urlparse(preauth_url).netloc):
|
||||
preauth_url = singlekey_host + preauth_url
|
||||
continue
|
||||
print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr)
|
||||
exit(1)
|
||||
|
||||
# get the ReturnUrl from the response
|
||||
query = parse_qs(urlparse(preauth_url).query)
|
||||
return_url = query["ReturnUrl"][0]
|
||||
debug(f"{return_url=}")
|
||||
|
||||
if "X-CSRF-FORM-TOKEN" in r.cookies:
|
||||
headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"]
|
||||
session.headers.update(headers)
|
||||
|
||||
debug("--------")
|
||||
|
||||
soup = BeautifulSoup(r.text, "html.parser")
|
||||
requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value")
|
||||
r = session.post(
|
||||
preauth_url,
|
||||
data={
|
||||
"UserIdentifierInput.EmailInput.StringValue": email,
|
||||
"__RequestVerificationToken": requestVerificationToken,
|
||||
},
|
||||
allow_redirects=False,
|
||||
)
|
||||
|
||||
password_url = r.headers["location"]
|
||||
if not bool(urlparse(password_url).netloc):
|
||||
password_url = singlekey_host + password_url
|
||||
|
||||
r = session.get(password_url, allow_redirects=False)
|
||||
soup = BeautifulSoup(r.text, "html.parser")
|
||||
requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value")
|
||||
|
||||
r = session.post(
|
||||
password_url,
|
||||
data={
|
||||
"Password": password,
|
||||
"RememberMe": "false",
|
||||
"__RequestVerificationToken": requestVerificationToken,
|
||||
},
|
||||
allow_redirects=False,
|
||||
)
|
||||
|
||||
while True:
|
||||
if return_url.startswith("/"):
|
||||
return_url = singlekey_host + return_url
|
||||
r = session.get(return_url, allow_redirects=False)
|
||||
debug(f"{return_url=}, {r} {r.text}")
|
||||
if r.status_code != 302:
|
||||
break
|
||||
return_url = r.headers["location"]
|
||||
if return_url.startswith("hcauth://"):
|
||||
break
|
||||
debug(f"{return_url=}")
|
||||
|
||||
debug("--------")
|
||||
|
||||
url = urlparse(return_url)
|
||||
query = parse_qs(url.query)
|
||||
|
||||
if query.get("ReturnUrl") is not None:
|
||||
print("Wrong credentials.")
|
||||
print(
|
||||
"If you forgot your login/password, you can restore them by opening "
|
||||
"https://singlekey-id.com/auth/en-us/login in browser"
|
||||
)
|
||||
exit(1)
|
||||
|
||||
code = query.get("code")[0]
|
||||
state = query.get("state")[0]
|
||||
grant_type = query.get("grant_type")[0] # "authorization_code"
|
||||
|
||||
debug(f"{code=} {grant_type=} {state=}")
|
||||
|
||||
auth_url = base_url + "login"
|
||||
token_url = base_url + "token"
|
||||
|
||||
token_fields = {
|
||||
"grant_type": grant_type,
|
||||
"client_id": app_id,
|
||||
"code_verifier": verifier,
|
||||
"code": code,
|
||||
"redirect_uri": login_query["redirect_uri"],
|
||||
}
|
||||
|
||||
debug(f"{token_url=} {token_fields=}")
|
||||
|
||||
r = requests.post(token_url, data=token_fields, allow_redirects=False)
|
||||
if r.status_code != requests.codes.ok:
|
||||
print("Bad code?", file=sys.stderr)
|
||||
print(r.headers, r.text)
|
||||
exit(1)
|
||||
|
||||
debug("--------- got token page ----------")
|
||||
|
||||
token = json.loads(r.text)["access_token"]
|
||||
debug(f"Received access {token=}")
|
||||
|
||||
headers = {
|
||||
"Authorization": "Bearer " + token,
|
||||
}
|
||||
|
||||
|
||||
# Try to request account details from all geos. Whichever works, we'll use next.
|
||||
for asset_url in asset_urls:
|
||||
r = requests.get(asset_url + "account/details", headers=headers)
|
||||
if r.status_code == requests.codes.ok:
|
||||
break
|
||||
|
||||
# now we can fetch the rest of the account info
|
||||
if r.status_code != requests.codes.ok:
|
||||
print("unable to fetch account details", file=sys.stderr)
|
||||
print(r.headers, r.text)
|
||||
exit(1)
|
||||
|
||||
# print(r.text)
|
||||
account = json.loads(r.text)
|
||||
configs = []
|
||||
|
||||
print(account, file=sys.stderr)
|
||||
|
||||
for app in account["data"]["homeAppliances"]:
|
||||
app_brand = app["brand"]
|
||||
app_type = app["type"]
|
||||
app_id = app["identifier"]
|
||||
|
||||
config = {
|
||||
"name": app_type.lower(),
|
||||
}
|
||||
|
||||
configs.append(config)
|
||||
|
||||
if "tls" in app:
|
||||
# fancy machine with TLS support
|
||||
config["host"] = app_brand + "-" + app_type + "-" + app_id
|
||||
config["key"] = app["tls"]["key"]
|
||||
else:
|
||||
# less fancy machine with HTTP support
|
||||
config["host"] = app_id
|
||||
config["key"] = app["aes"]["key"]
|
||||
config["iv"] = app["aes"]["iv"]
|
||||
|
||||
# Fetch the XML zip file for this device
|
||||
app_url = asset_url + "api/iddf/v1/iddf/" + app_id
|
||||
print("fetching", app_url, file=sys.stderr)
|
||||
r = requests.get(app_url, headers=headers)
|
||||
if r.status_code != requests.codes.ok:
|
||||
print(app_id, ": unable to fetch machine description?")
|
||||
next
|
||||
|
||||
# we now have a zip file with XML, let's unpack them
|
||||
content = r.content
|
||||
print(app_url + ": " + app_id + ".zip", file=sys.stderr)
|
||||
with open(app_id + ".zip", "wb") as f:
|
||||
f.write(content)
|
||||
z = ZipFile(io.BytesIO(content))
|
||||
# print(z.infolist())
|
||||
features = z.open(app_id + "_FeatureMapping.xml").read()
|
||||
description = z.open(app_id + "_DeviceDescription.xml").read()
|
||||
|
||||
machine = xml2json(features, description)
|
||||
config["description"] = machine["description"]
|
||||
config["features"] = augment_device_features(machine["features"])
|
||||
print("Discovered device: " + config["name"] + " - Device hostname: " + config["host"])
|
||||
|
||||
with open(devicefile, "w") as f:
|
||||
json.dump(configs, f, ensure_ascii=True, indent=4)
|
||||
|
||||
print(
|
||||
"Success. You can now edit "
|
||||
+ devicefile
|
||||
+ ", if needed, and run hc2mqtt.py or start Home Assistant addon again"
|
||||
)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -34,7 +34,7 @@ pkgs.writeShellApplication rec {
|
||||
#/ --mqtt_cafile - CA certifications file for SSL connection
|
||||
#/ --mqtt_certfile - certification file for SSL connection
|
||||
#/ --mqtt_keyfile - certification key file for SSL connection
|
||||
#/ --mqtt_clientname" - name of MQTT broker client, default="hcpy1"
|
||||
#/ --mqtt_clientname - name of MQTT broker client, default="hcpy1"
|
||||
#/ --domain_suffix - the suffix of the domain, default=""
|
||||
#/ --debug/--no-debug - enable debug mode
|
||||
#/ --ha-discovery - enable HomeAssistant discovery
|
||||
|
||||
Reference in New Issue
Block a user