Compare commits

6 Commits
nix ... main

8 changed files with 405 additions and 293 deletions

22
flake.lock generated
View File

@@ -2,7 +2,9 @@
"nodes": { "nodes": {
"dream2nix": { "dream2nix": {
"inputs": { "inputs": {
"nixpkgs": "nixpkgs", "nixpkgs": [
"nixpkgs"
],
"purescript-overlay": "purescript-overlay", "purescript-overlay": "purescript-overlay",
"pyproject-nix": "pyproject-nix" "pyproject-nix": "pyproject-nix"
}, },
@@ -38,18 +40,17 @@
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1729850857, "lastModified": 1732014248,
"narHash": "sha256-WvLXzNNnnw+qpFOmgaM3JUlNEH+T4s22b5i2oyyCpXE=", "narHash": "sha256-y/MEyuJ5oBWrWAic/14LaIr/u5E0wRVzyYsouYY3W6w=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "41dea55321e5a999b17033296ac05fe8a8b5a257", "rev": "23e89b7da85c3640bbc2173fe04f4bd114342367",
"type": "github" "type": "github"
}, },
"original": { "original": {
"owner": "NixOS", "id": "nixpkgs",
"ref": "nixpkgs-unstable", "ref": "nixos-unstable",
"repo": "nixpkgs", "type": "indirect"
"type": "github"
} }
}, },
"purescript-overlay": { "purescript-overlay": {
@@ -95,10 +96,7 @@
"root": { "root": {
"inputs": { "inputs": {
"dream2nix": "dream2nix", "dream2nix": "dream2nix",
"nixpkgs": [ "nixpkgs": "nixpkgs"
"dream2nix",
"nixpkgs"
]
} }
}, },
"slimlock": { "slimlock": {

View File

@@ -1,7 +1,8 @@
{ {
inputs = { inputs = {
nixpkgs.url = "nixpkgs/nixos-unstable";
dream2nix.url = "github:nix-community/dream2nix"; dream2nix.url = "github:nix-community/dream2nix";
nixpkgs.follows = "dream2nix/nixpkgs"; dream2nix.inputs.nixpkgs.follows = "nixpkgs";
}; };
outputs = { outputs = {
@@ -16,6 +17,11 @@
"x86_64-linux" "x86_64-linux"
]; ];
in { in {
nixosModules = rec {
hcpy = import ./module.nix self;
default = hcpy;
};
packages = eachSystem (system: let packages = eachSystem (system: let
pkgs = nixpkgs.legacyPackages.${system}; pkgs = nixpkgs.legacyPackages.${system};
in rec { in rec {
@@ -25,6 +31,7 @@
./package.nix ./package.nix
{ {
paths.projectRootFile = "flake.nix"; paths.projectRootFile = "flake.nix";
paths.lockFile = "lock.${system}.json";
paths.projectRoot = ./.; paths.projectRoot = ./.;
paths.package = ./.; paths.package = ./.;
} }

View File

@@ -24,9 +24,9 @@
}, },
"charset-normalizer": { "charset-normalizer": {
"is_direct": false, "is_direct": false,
"sha256": "8cda06946eac330cbe6598f77bb54e690b4ca93f593dee1568ad22b04f347c15", "sha256": "7f683ddc7eedd742e2889d2bfb96d69573fde1d92fcb811979cdb7165bb9c7d3",
"type": "url", "type": "url",
"url": "https://files.pythonhosted.org/packages/16/92/92a76dc2ff3a12e69ba94e7e05168d37d0345fa08c87e1fe24d0c2a42223/charset_normalizer-3.4.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", "url": "https://files.pythonhosted.org/packages/f8/01/344ec40cf5d85c1da3c1f57566c59e0c9b56bcc5566c08804a95a6cc8257/charset_normalizer-3.4.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
"version": "3.4.0" "version": "3.4.0"
}, },
"click": { "click": {
@@ -59,9 +59,9 @@
}, },
"lxml": { "lxml": {
"is_direct": false, "is_direct": false,
"sha256": "3879cc6ce938ff4eb4900d901ed63555c778731a96365e53fadb36437a131a99", "sha256": "b1c8c20847b9f34e98080da785bb2336ea982e7f913eed5809e5a3c872900f32",
"type": "url", "type": "url",
"url": "https://files.pythonhosted.org/packages/0a/6e/94537acfb5b8f18235d13186d247bca478fea5e87d224644e0fe907df976/lxml-5.3.0-cp312-cp312-manylinux_2_28_x86_64.whl", "url": "https://files.pythonhosted.org/packages/05/9e/87492d03ff604fbf656ed2bf3e2e8d28f5d58ea1f00ff27ac27b06509079/lxml-5.3.0-cp310-cp310-manylinux_2_28_x86_64.whl",
"version": "5.3.0" "version": "5.3.0"
}, },
"paho-mqtt": { "paho-mqtt": {
@@ -155,5 +155,5 @@
} }
} }
}, },
"invalidationHash": "8b77dd84722cef2a9d624d9674be2397c777c8b180b813df1327f67470dd3476" "invalidationHash": "ffec585af8e2f405564c1aa26829f11dbf9265334a938576ea9f416033be7da5"
} }

104
module.nix Normal file
View File

@@ -0,0 +1,104 @@
self: {
config,
lib,
pkgs,
system,
...
}:
with lib; let
cfg = config.services.hcpy;
bridgeConfig =
cfg.config
// {
devices_file = cfg.devicesFile;
};
hcpy = pkgs.writeShellApplication {
name = "hcpy-service";
runtimeInputs = [self.packages.${system}.default];
text = ''
if [ ! -f "${cfg.devicesFile}" ]; then
echo "Device file does not exist, requesting login to HomeConnect cloud";
hcpy login "$(cat "${cfg.homeconnect.usernameFile}")" \
"$(cat "${cfg.homeconnect.passwordFile}")" \
"${cfg.devicesFile}";
fi
hcpy run ${cli.toGNUCommandLineShell {} bridgeConfig} \
--mqtt_username "$(cat "${cfg.mqtt.usernameFile}")" \
--mqtt_password "$(cat "${cfg.mqtt.passwordFile}")"
'';
};
in {
options.services.hcpy = {
enable = mkEnableOption "HomeConnect bridge service";
homeconnect = {
usernameFile = mkOption {
type = types.path;
description = "Path to file containing single line with HomeConnect account username";
example = "/run/hc.username.key";
};
passwordFile = mkOption {
type = types.path;
description = "Path to file containing single line with HomeConnect account password";
example = "/run/hc.password.key";
};
};
mqtt = {
usernameFile = mkOption {
type = types.path;
description = "Path to file containing single line with MQTT client username";
example = "/run/mqtt.username.key";
};
passwordFile = mkOption {
type = types.path;
description = "Path to file containing single line with MQTT client password";
example = "/run/mqtt.password.key";
};
};
devicesFile = mkOption {
type = types.path;
description = "Path to JSON file fetched from HomeConnect API";
example = "/var/lib/hcpy/devices.json";
};
config = mkOption {
type = types.attrs;
description = "The configuration of hcpy bridge";
example = {
mqtt_host = "localhost";
mqtt_prefix = "homeconnect/";
mqtt_port = 8883;
mqtt_ssl = true;
mqtt_cafile = "/run/ca.pem";
mqtt_certfile = "/run/cert.crt";
mqtt_keyfile = "/run/cert.key";
mqtt_clientname = "hcpy1";
domain_suffix = "";
debug = true;
ha-discovery = true;
};
};
};
config = mkIf cfg.enable {
systemd.services.hcpy = {
enable = true;
description = "HomeConnect bridge service";
wantedBy = ["multi-user.target"];
wants = ["network-online.target"];
after = ["network-online.target"];
serviceConfig = {
ExecStart = "${hcpy}/bin/hcpy-service";
Restart = "on-failure";
RestartSec = 10;
};
};
};
}

View File

@@ -14,7 +14,7 @@ in rec {
inherit (pyproject.project) name version; inherit (pyproject.project) name version;
deps = {nixpkgs, ...}: { deps = {nixpkgs, ...}: {
python = nixpkgs.python3; python = nixpkgs.python310;
inherit (nixpkgs) openssl; inherit (nixpkgs) openssl;
}; };

View File

@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
name = "hcpy" name = "hcpy"
version = "0.1.5" version = "0.1.5"
description = "HC to MQTT bridge" description = "HC to MQTT bridge"
requires-python = ">=3.7" requires-python = "==3.10"
dependencies = [ dependencies = [
"bs4", "bs4",
"requests", "requests",
@@ -20,5 +20,5 @@ dependencies = [
] ]
[project.scripts] [project.scripts]
hc-login = "hcpy:hc_login" hc-login = "hcpy.hc_login:main"
hc2mqtt = "hcpy.hc2mqtt:hc2mqtt" hc2mqtt = "hcpy.hc2mqtt:hc2mqtt"

View File

@@ -31,34 +31,34 @@ from hcpy.HCxml2json import xml2json
# requests_log.setLevel(logging.DEBUG) # requests_log.setLevel(logging.DEBUG)
# requests_log.propagate = True # requests_log.propagate = True
def main():
def debug(*args): def debug(*args):
print(*args, file=sys.stderr) print(*args, file=sys.stderr)
email = sys.argv[1] email = sys.argv[1]
password = sys.argv[2] password = sys.argv[2]
devicefile = sys.argv[3] devicefile = sys.argv[3]
headers = {"User-Agent": "hc-login/1.0"} headers = {"User-Agent": "hc-login/1.0"}
session = requests.Session() session = requests.Session()
session.headers.update(headers) session.headers.update(headers)
base_url = "https://api.home-connect.com/security/oauth/" base_url = "https://api.home-connect.com/security/oauth/"
asset_urls = [ asset_urls = [
"https://prod.reu.rest.homeconnectegw.com/", # EU "https://prod.reu.rest.homeconnectegw.com/", # EU
"https://prod.rna.rest.homeconnectegw.com/", # US "https://prod.rna.rest.homeconnectegw.com/", # US
] ]
# #
# Start by fetching the old login page, which gives # Start by fetching the old login page, which gives
# us the verifier and challenge for getting the token, # us the verifier and challenge for getting the token,
# even after the singlekey detour. # even after the singlekey detour.
# #
# The app_id and scope are hardcoded in the application # The app_id and scope are hardcoded in the application
app_id = "9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3" app_id = "9B75AC9EC512F36C84256AC47D813E2C1DD0D6520DF774B020E1E6E2EB29B1F3"
scope = [ scope = [
"ReadAccount", "ReadAccount",
"Settings", "Settings",
"IdentifyAppliance", "IdentifyAppliance",
@@ -69,23 +69,23 @@ scope = [
"Monitor", "Monitor",
"WriteOrigApi", "WriteOrigApi",
"Images", "Images",
] ]
scope = [ scope = [
"ReadOrigApi", "ReadOrigApi",
] ]
def b64(b): def b64(b):
return re.sub(r"=", "", base64url_encode(b).decode("UTF-8")) return re.sub(r"=", "", base64url_encode(b).decode("UTF-8"))
def b64random(num): def b64random(num):
return b64(base64url_encode(get_random_bytes(num))) return b64(base64url_encode(get_random_bytes(num)))
verifier = b64(get_random_bytes(32)) verifier = b64(get_random_bytes(32))
login_query = { login_query = {
"response_type": "code", "response_type": "code",
"prompt": "login", "prompt": "login",
"code_challenge": b64(SHA256.new(verifier.encode("UTF-8")).digest()), "code_challenge": b64(SHA256.new(verifier.encode("UTF-8")).digest()),
@@ -96,36 +96,36 @@ login_query = {
"state": b64random(16), "state": b64random(16),
"redirect_uri": "hcauth://auth/prod", "redirect_uri": "hcauth://auth/prod",
"redirect_target": "icore", "redirect_target": "icore",
} }
loginpage_url = base_url + "authorize?" + urlencode(login_query) loginpage_url = base_url + "authorize?" + urlencode(login_query)
token_url = base_url + "token" token_url = base_url + "token"
debug(f"{loginpage_url=}") debug(f"{loginpage_url=}")
r = session.get(loginpage_url) r = session.get(loginpage_url)
if r.status_code != requests.codes.ok: if r.status_code != requests.codes.ok:
print("error fetching login url!", loginpage_url, r.text, file=sys.stderr) print("error fetching login url!", loginpage_url, r.text, file=sys.stderr)
exit(1) exit(1)
# get the session from the text # get the session from the text
if not (match := re.search(r'"sessionId" value="(.*?)"', r.text)): if not (match := re.search(r'"sessionId" value="(.*?)"', r.text)):
print("Unable to find session id in login page") print("Unable to find session id in login page")
exit(1) exit(1)
session_id = match[1] session_id = match[1]
if not (match := re.search(r'"sessionData" value="(.*?)"', r.text)): if not (match := re.search(r'"sessionData" value="(.*?)"', r.text)):
print("Unable to find session data in login page") print("Unable to find session data in login page")
exit(1) exit(1)
session_data = match[1] session_data = match[1]
debug("--------") debug("--------")
# now that we have a session id, contact the # now that we have a session id, contact the
# single key host to start the new login flow # single key host to start the new login flow
singlekey_host = "https://singlekey-id.com" singlekey_host = "https://singlekey-id.com"
login_url = singlekey_host + "/auth/en-us/log-in/" login_url = singlekey_host + "/auth/en-us/log-in/"
preauth_url = singlekey_host + "/auth/connect/authorize" preauth_url = singlekey_host + "/auth/connect/authorize"
preauth_query = { preauth_query = {
"client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256", "client_id": "11F75C04-21C2-4DA9-A623-228B54E9A256",
"redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target", "redirect_uri": "https://api.home-connect.com/security/oauth/redirect_target",
"response_type": "code", "response_type": "code",
@@ -133,13 +133,13 @@ preauth_query = {
"prompt": "login", "prompt": "login",
"style_id": "bsh_hc_01", "style_id": "bsh_hc_01",
"state": '{"session_id":"' + session_id + '"}', # important: no spaces! "state": '{"session_id":"' + session_id + '"}', # important: no spaces!
} }
# fetch the preauth state to get the final callback url # fetch the preauth state to get the final callback url
preauth_url += "?" + urlencode(preauth_query) preauth_url += "?" + urlencode(preauth_query)
# loop until we have the callback url # loop until we have the callback url
while True: while True:
debug(f"next {preauth_url=}") debug(f"next {preauth_url=}")
r = session.get(preauth_url, allow_redirects=False) r = session.get(preauth_url, allow_redirects=False)
if r.status_code == 200: if r.status_code == 200:
@@ -153,37 +153,37 @@ while True:
print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr) print(f"2: {preauth_url=}: failed to fetch {r} {r.text}", file=sys.stderr)
exit(1) exit(1)
# get the ReturnUrl from the response # get the ReturnUrl from the response
query = parse_qs(urlparse(preauth_url).query) query = parse_qs(urlparse(preauth_url).query)
return_url = query["ReturnUrl"][0] return_url = query["ReturnUrl"][0]
debug(f"{return_url=}") debug(f"{return_url=}")
if "X-CSRF-FORM-TOKEN" in r.cookies: if "X-CSRF-FORM-TOKEN" in r.cookies:
headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"] headers["RequestVerificationToken"] = r.cookies["X-CSRF-FORM-TOKEN"]
session.headers.update(headers) session.headers.update(headers)
debug("--------") debug("--------")
soup = BeautifulSoup(r.text, "html.parser") soup = BeautifulSoup(r.text, "html.parser")
requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value") requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value")
r = session.post( r = session.post(
preauth_url, preauth_url,
data={ data={
"UserIdentifierInput.EmailInput.StringValue": email, "UserIdentifierInput.EmailInput.StringValue": email,
"__RequestVerificationToken": requestVerificationToken, "__RequestVerificationToken": requestVerificationToken,
}, },
allow_redirects=False, allow_redirects=False,
) )
password_url = r.headers["location"] password_url = r.headers["location"]
if not bool(urlparse(password_url).netloc): if not bool(urlparse(password_url).netloc):
password_url = singlekey_host + password_url password_url = singlekey_host + password_url
r = session.get(password_url, allow_redirects=False) r = session.get(password_url, allow_redirects=False)
soup = BeautifulSoup(r.text, "html.parser") soup = BeautifulSoup(r.text, "html.parser")
requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value") requestVerificationToken = soup.find("input", {"name": "__RequestVerificationToken"}).get("value")
r = session.post( r = session.post(
password_url, password_url,
data={ data={
"Password": password, "Password": password,
@@ -191,9 +191,9 @@ r = session.post(
"__RequestVerificationToken": requestVerificationToken, "__RequestVerificationToken": requestVerificationToken,
}, },
allow_redirects=False, allow_redirects=False,
) )
while True: while True:
if return_url.startswith("/"): if return_url.startswith("/"):
return_url = singlekey_host + return_url return_url = singlekey_host + return_url
r = session.get(return_url, allow_redirects=False) r = session.get(return_url, allow_redirects=False)
@@ -203,14 +203,14 @@ while True:
return_url = r.headers["location"] return_url = r.headers["location"]
if return_url.startswith("hcauth://"): if return_url.startswith("hcauth://"):
break break
debug(f"{return_url=}") debug(f"{return_url=}")
debug("--------") debug("--------")
url = urlparse(return_url) url = urlparse(return_url)
query = parse_qs(url.query) query = parse_qs(url.query)
if query.get("ReturnUrl") is not None: if query.get("ReturnUrl") is not None:
print("Wrong credentials.") print("Wrong credentials.")
print( print(
"If you forgot your login/password, you can restore them by opening " "If you forgot your login/password, you can restore them by opening "
@@ -218,60 +218,60 @@ if query.get("ReturnUrl") is not None:
) )
exit(1) exit(1)
code = query.get("code")[0] code = query.get("code")[0]
state = query.get("state")[0] state = query.get("state")[0]
grant_type = query.get("grant_type")[0] # "authorization_code" grant_type = query.get("grant_type")[0] # "authorization_code"
debug(f"{code=} {grant_type=} {state=}") debug(f"{code=} {grant_type=} {state=}")
auth_url = base_url + "login" auth_url = base_url + "login"
token_url = base_url + "token" token_url = base_url + "token"
token_fields = { token_fields = {
"grant_type": grant_type, "grant_type": grant_type,
"client_id": app_id, "client_id": app_id,
"code_verifier": verifier, "code_verifier": verifier,
"code": code, "code": code,
"redirect_uri": login_query["redirect_uri"], "redirect_uri": login_query["redirect_uri"],
} }
debug(f"{token_url=} {token_fields=}") debug(f"{token_url=} {token_fields=}")
r = requests.post(token_url, data=token_fields, allow_redirects=False) r = requests.post(token_url, data=token_fields, allow_redirects=False)
if r.status_code != requests.codes.ok: if r.status_code != requests.codes.ok:
print("Bad code?", file=sys.stderr) print("Bad code?", file=sys.stderr)
print(r.headers, r.text) print(r.headers, r.text)
exit(1) exit(1)
debug("--------- got token page ----------") debug("--------- got token page ----------")
token = json.loads(r.text)["access_token"] token = json.loads(r.text)["access_token"]
debug(f"Received access {token=}") debug(f"Received access {token=}")
headers = { headers = {
"Authorization": "Bearer " + token, "Authorization": "Bearer " + token,
} }
# Try to request account details from all geos. Whichever works, we'll use next. # Try to request account details from all geos. Whichever works, we'll use next.
for asset_url in asset_urls: for asset_url in asset_urls:
r = requests.get(asset_url + "account/details", headers=headers) r = requests.get(asset_url + "account/details", headers=headers)
if r.status_code == requests.codes.ok: if r.status_code == requests.codes.ok:
break break
# now we can fetch the rest of the account info # now we can fetch the rest of the account info
if r.status_code != requests.codes.ok: if r.status_code != requests.codes.ok:
print("unable to fetch account details", file=sys.stderr) print("unable to fetch account details", file=sys.stderr)
print(r.headers, r.text) print(r.headers, r.text)
exit(1) exit(1)
# print(r.text) # print(r.text)
account = json.loads(r.text) account = json.loads(r.text)
configs = [] configs = []
print(account, file=sys.stderr) print(account, file=sys.stderr)
for app in account["data"]["homeAppliances"]: for app in account["data"]["homeAppliances"]:
app_brand = app["brand"] app_brand = app["brand"]
app_type = app["type"] app_type = app["type"]
app_id = app["identifier"] app_id = app["identifier"]
@@ -315,11 +315,14 @@ for app in account["data"]["homeAppliances"]:
config["features"] = augment_device_features(machine["features"]) config["features"] = augment_device_features(machine["features"])
print("Discovered device: " + config["name"] + " - Device hostname: " + config["host"]) print("Discovered device: " + config["name"] + " - Device hostname: " + config["host"])
with open(devicefile, "w") as f: with open(devicefile, "w") as f:
json.dump(configs, f, ensure_ascii=True, indent=4) json.dump(configs, f, ensure_ascii=True, indent=4)
print( print(
"Success. You can now edit " "Success. You can now edit "
+ devicefile + devicefile
+ ", if needed, and run hc2mqtt.py or start Home Assistant addon again" + ", if needed, and run hc2mqtt.py or start Home Assistant addon again"
) )
if __name__ == "__main__":
main()

View File

@@ -34,7 +34,7 @@ pkgs.writeShellApplication rec {
#/ --mqtt_cafile - CA certifications file for SSL connection #/ --mqtt_cafile - CA certifications file for SSL connection
#/ --mqtt_certfile - certification file for SSL connection #/ --mqtt_certfile - certification file for SSL connection
#/ --mqtt_keyfile - certification key file for SSL connection #/ --mqtt_keyfile - certification key file for SSL connection
#/ --mqtt_clientname" - name of MQTT broker client, default="hcpy1" #/ --mqtt_clientname - name of MQTT broker client, default="hcpy1"
#/ --domain_suffix - the suffix of the domain, default="" #/ --domain_suffix - the suffix of the domain, default=""
#/ --debug/--no-debug - enable debug mode #/ --debug/--no-debug - enable debug mode
#/ --ha-discovery - enable HomeAssistant discovery #/ --ha-discovery - enable HomeAssistant discovery