Files
hcpy/hcpy

264 lines
6.4 KiB
Python
Executable File

#!/usr/bin/env python3
# Contact a Bosch-Siemens Home Connect device
# todo: document how to extract the psk
import socket
import ssl
import sslpsk
import websocket
import sys
import json
import re
import time
import io
from base64 import urlsafe_b64decode as base64url
from datetime import datetime
from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA256
from Crypto.Random import get_random_bytes
# Pre-shared key (PSK) and CBC initialisation vector, stored as URL-safe
# base64 text without the trailing '=' padding.
psk64 = 'KlRQQyG8AkEfRFPr0v7vultz96zcal5lxj2fAc2ohaY'
iv64 = 'tTUvqcsBldtkhHvDwE2DpQ'

# Appending three pad characters is always enough to complete the final
# base64 group; the decoder ignores whatever padding is surplus.
psk, iv = (base64url(text + '===') for text in (psk64, iv64))
def hmac(key, msg):
    """Return the full HMAC-SHA256 digest of ``msg`` computed with ``key``."""
    hasher = HMAC.new(key, msg=msg, digestmod=SHA256)
    return hasher.digest()
# The encryption key and the authentication key are both derived from the
# PSK by HMAC-ing a fixed three-byte label.
mackey = hmac(psk, b'MAC')
enckey = hmac(psk, b'ENC')
#print("ENC", enckey.hex())
#print("MAC", mackey.hex())

# One CBC cipher object per direction. Each object keeps its own chaining
# state across calls, so successive messages continue the same CBC stream.
decrypt = AES.new(enckey, AES.MODE_CBC, iv)
encrypt = AES.new(enckey, AES.MODE_CBC, iv)

# Truncated HMAC of the previous message in each direction; both chains
# start out as sixteen zero bytes.
last_tx_hmac = bytes(16)
last_rx_hmac = bytes(16)
def hc_decrypt(buf):
    """Authenticate and decrypt one frame received from the device.

    ``buf`` is the raw binary frame: AES-CBC ciphertext followed by the
    first 16 bytes of an HMAC-SHA256 tag. The tag is computed over the IV,
    the byte 0x43, the tag of the previously accepted received frame and
    the ciphertext.

    Returns the plaintext bytes with the trailing padding removed, or
    ``None`` (after logging the reason to stderr) when the frame is too
    short, is not a whole number of 16-byte blocks, fails authentication
    or carries an impossible padding length. The receive HMAC chain and
    the CBC decryption state only advance for frames that authenticate.
    """
    global last_rx_hmac
    # Imported locally because the module-level name ``hmac`` is this
    # file's own helper function, not the standard-library module.
    from hmac import compare_digest

    if len(buf) < 32:
        print("Short message?", buf.hex(), file=sys.stderr)
        return None
    if len(buf) % 16 != 0:
        # CBC can only decrypt whole blocks, so there is no point going on.
        print("Unaligned message? probably bad", buf.hex(), file=sys.stderr)
        return None

    # validate the message using the first 16-bytes of the HMAC
    their_hmac = buf[-16:]
    enc_msg = buf[0:-16]
    hmac_msg = iv + b'\x43' + last_rx_hmac + enc_msg
    our_hmac = hmac(mackey, hmac_msg)[0:16]

    # Constant-time comparison so the check does not leak how many leading
    # bytes of the tag matched.
    if not compare_digest(their_hmac, our_hmac):
        print("HMAC failure", their_hmac.hex(), our_hmac.hex(), file=sys.stderr)
        return None

    last_rx_hmac = their_hmac

    # decrypt the message with CBC, so the last message block is mixed in
    msg = decrypt.decrypt(enc_msg)

    # The final byte gives the padding length. Zero is never valid and
    # would make the slice below return an empty message by accident.
    pad_len = msg[-1]
    if pad_len == 0 or pad_len > len(msg):
        print("padding error?", msg.hex(), file=sys.stderr)
        return None

    return msg[0:-pad_len]
def hc_encrypt(clear_msg, pad=True):
    """Encrypt and authenticate one outgoing message.

    ``clear_msg`` is a text string; it is encoded as UTF-8 and, unless
    ``pad`` is false, padded to a multiple of 16 bytes. The padding is a
    zero byte, random filler and a final byte holding the padding length.

    Returns the CBC ciphertext followed by the first 16 bytes of an
    HMAC-SHA256 tag. The tag covers the IV, the byte 0x45, the previous
    transmit tag and the ciphertext, and becomes the new ``last_tx_hmac``.
    """
    global last_tx_hmac

    payload = bytes(clear_msg, 'utf-8')

    if pad:
        # Between 1 and 16 bytes are needed to reach the next boundary.
        fill = 16 - len(payload) % 16
        # The padding needs at least two bytes (leading zero plus length
        # byte), so a single spare byte spills over into one more block.
        if fill == 1:
            fill = 17
        payload += b'\x00' + get_random_bytes(fill - 2) + bytes([fill])

    # The shared cipher object carries the CBC chaining state on from the
    # last block that was sent.
    ciphertext = encrypt.encrypt(payload)

    # Chain the previous transmit tag into the new one.
    tag = hmac(mackey, iv + b'\x45' + last_tx_hmac + ciphertext)[0:16]
    last_tx_hmac = tag

    return ciphertext + tag
#print(hc_encrypt('{"sID":2642355413,"msgID":1520318190,"resource":"/ei/initialValues","version":2,"action":"RESPONSE","data":[{"deviceType":"Application","deviceName":"Pixel","deviceID":"d304ee06571f0d09"}]}', True).hex())
#exit(1)
#print(enckey.hex())
#print(valkey.hex())
# An optional first command-line argument names a JSON machine description;
# without it ``machine`` stays None and status values are not decoded.
machine = None
if sys.argv[1:]:
    with open(sys.argv[1], "r") as description_file:
        machine = json.load(description_file)
# Monkey patch for the sslpsk release on pip, which uses the old _sslobj
def _sslobj(sock):
    """Return the low-level SSL object behind ``sock`` for sslpsk to use.

    On interpreters from 3.5 up to, but not including, 3.7 the object is
    one level further down, so it is unwrapped twice there.
    """
    wrapped = sock._sslobj
    # sys.version_info has five fields, so every 3.7.x release compares
    # greater than (3, 7) and takes the single-unwrap path.
    if (3, 5) <= sys.version_info <= (3, 7):
        return wrapped._sslobj
    return wrapped

sslpsk.sslpsk._sslobj = _sslobj
def now():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS.ffffff``."""
    return "{:%Y-%m-%d %H:%M:%S.%f}".format(datetime.now())
# Set to True to turn on websocket tracing before the connection is made
debug = False

# Address of the device and the plain ws:// endpoint it serves
host = '10.1.0.145'
port = 80
uri = "ws://{}:{}/homeconnect".format(host, port)

# Session state, filled in from the first message the device sends
session_id = None
tx_msg_id = None

# Open the TCP connection up front; the websocket client is attached to
# this already-connected socket later on.
tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_sock.connect((host, port))

# TLS-PSK variant, currently disabled:
#ssl_sock = sslpsk.wrap_socket(
# tcp_sock,
# ssl_version = ssl.PROTOCOL_TLSv1_2,
# ciphers = 'ECDHE-PSK-CHACHA20-POLY1305',
# psk = psk, #(psk, b'py-hca'),
#)

# Placeholder until the websocket object is created below
ws = None
def tx(msg):
    """Log, serialise, encrypt and send one message to the device.

    ``msg`` is a JSON-serialisable object (the callers pass dicts). It is
    printed with a timestamp, encoded as compact JSON with no whitespace
    after separators, encrypted with ``hc_encrypt`` and sent as a single
    binary websocket frame.
    """
    print(now(), "TX:", msg)
    # json.dumps already emits double-quoted strings. Rewriting apostrophes
    # afterwards would corrupt any string value that contains one, so the
    # serialised text is sent exactly as produced.
    buf = json.dumps(msg, separators=(',', ':'))
    ws.send_binary(hc_encrypt(buf))
def send_initial_messages():
    """Send the three start-up requests, using one message ID for each.

    Every request carries the current ``session_id`` and ``tx_msg_id``;
    ``tx_msg_id`` is incremented after each one has been sent.
    """
    global session_id, tx_msg_id

    # subscribe to stuff: (resource, version, action) in sending order
    startup_requests = (
        ("/ci/services", 1, "GET"),
        ("/iz/info", 1, "GET"),
        ("/ei/deviceReady", 2, "NOTIFY"),
    )

    for resource, version, action in startup_requests:
        tx({
            "sID": session_id,
            "msgID": tx_msg_id,
            "resource": resource,
            "version": version,
            "action": action,
        })
        tx_msg_id += 1
def handle_message(buf):
    """Parse one decrypted JSON message from the device and react to it.

    The message is logged with a timestamp. While no session exists yet,
    the message supplies the session ID and the first transmit message ID,
    and the start-up exchange is sent in reply. If a machine description
    was loaded, every ``data`` element whose ``uid`` appears in the
    description's ``status`` table is printed as ``name=value``.
    """
    global session_id, tx_msg_id

    msg = json.loads(buf)
    print(now(), "RX:", msg)
    sys.stdout.flush()

    # The first message from the device establishes the session, e.g.
    # {'sID': 926468163, 'msgID': 3785595876, 'resource': '/ei/initialValues', 'version': 2, 'action': 'POST', 'data': [{'edMsgID': 2569124008}]}
    # and is answered with something like
    # {"sID":2642355413,"msgID":1520318190,"resource":"/ei/initialValues","version":2,"action":"RESPONSE","data":[{"deviceType":"Application","deviceName":"Pixel","deviceID":"d304ee06571f0d09"}]}
    if session_id is None:
        session_id = msg["sID"]
        tx_msg_id = msg["data"][0]["edMsgID"]

        # Reply with a response that echoes the device's own message ID.
        identity = {
            "deviceType": "Application",
            #"deviceName": "py-hca",
            #"deviceID": "1234",
            "deviceName": "Pixel",
            "deviceID": "d304ee06571f0d09",
        }
        tx({
            'sID': session_id,
            'msgID': msg["msgID"],
            'resource': msg["resource"],
            'version': msg["version"],
            'action': 'RESPONSE',
            'data': [identity],
        })

        # Our initial get, sent before the regular start-up requests.
        tx({
            "sID": session_id,
            "msgID": tx_msg_id,
            "resource": "/ci/services",
            "version": 1,
            "action": "GET",
        })
        tx_msg_id += 1

        send_initial_messages()

    # do other stuff?
    # Nothing to decode without a machine description or a data payload.
    if not machine or "data" not in msg:
        return

    for el in msg["data"]:
        if "uid" not in el:
            continue

        uid = str(el["uid"])
        if uid not in machine["status"]:
            continue
        status = machine["status"][uid]

        # Swap the raw value for its name when the description has one.
        value = str(el["value"])
        if "values" in status and value in status["values"]:
            value = status["values"][value]

        print(status["name"] + "=" + value)
#session_id = "1"
#handle_message('{"sID": 2887352564, "msgID": 195493504, "resource": "/ro/values", "version": 1, "action": "NOTIFY", "data": [{"uid": 527, "value": 1}]}')
#exit(0)
if debug:
    websocket.enableTrace(True)

# Run the websocket handshake over the already-connected TCP socket.
ws = websocket.WebSocket()
ws.connect(
    uri,
    #socket=ssl_sock,
    socket=tcp_sock,
    origin="",  #"https://" + host,
)
#ws.run_forever()

# Receive loop: decrypt each frame and hand the plaintext to the message
# handler. Frames that hc_decrypt rejects (None) are skipped, and any error
# raised while handling a frame is reported without ending the loop.
while True:
    buf = ws.recv()
    if buf is None or buf == "":
        continue
    try:
        msg = hc_decrypt(buf)
        if msg is not None:
            handle_message(msg)
    except Exception as e:
        print("error handling msg", e, buf.hex())