From 84592e0c992aea3628853596bb182deb01125257 Mon Sep 17 00:00:00 2001 From: Trammell Hudson Date: Sat, 5 Feb 2022 14:31:16 +0100 Subject: [PATCH] split hcpy into HCSocket and restore HTTPS PSK support as well --- HCSocket.py | 166 ++++++++++++++++++++++++++++++++++++++++++++++++++++ hcpy | 165 +++++---------------------------------------------- 2 files changed, 182 insertions(+), 149 deletions(-) create mode 100755 HCSocket.py diff --git a/HCSocket.py b/HCSocket.py new file mode 100755 index 0000000..e6e93a2 --- /dev/null +++ b/HCSocket.py @@ -0,0 +1,166 @@ +# Create a websocket that wraps a connection to a +# Bosh-Siemens Home Connect device +import socket +import ssl +import sslpsk +import websocket +import sys +import json +import re +import time +import io +from base64 import urlsafe_b64decode as base64url +from datetime import datetime +from Crypto.Cipher import AES +from Crypto.Hash import HMAC, SHA256 +from Crypto.Random import get_random_bytes + +# Convience to compute an HMAC on a message +def hmac(key,msg): + mac = HMAC.new(key, msg=msg, digestmod=SHA256).digest() + return mac + +def now(): + return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + +# Monkey patch for sslpsk in pip using the old _sslobj +def _sslobj(sock): + if (3, 5) <= sys.version_info <= (3, 7): + return sock._sslobj._sslobj + else: + return sock._sslobj +sslpsk.sslpsk._sslobj = _sslobj + + +class HCSocket: + def __init__(self, host, psk64, iv64=None): + self.host = host + self.psk = base64url(psk64 + '===') + + if iv64: + # an HTTP self-encrypted socket + self.http = True + self.iv = base64url(iv64 + '===') + self.enckey = hmac(self.psk, b'ENC') + self.mackey = hmac(self.psk, b'MAC') + self.port = 80 + self.uri = "ws://" + host + ":80/homeconnect" + else: + self.http = False + self.port = 443 + self.uri = "wss://" + host + ":443/homeconnect" + + self.reconnect() + + # restore the encryption state for a fresh connection + # this is only used by the HTTP connection + def reset(self): + if not self.http: + return + self.last_rx_hmac = bytes(16) + self.last_tx_hmac = bytes(16) + + self.aes_encrypt = AES.new(self.enckey, AES.MODE_CBC, self.iv) + self.aes_decrypt = AES.new(self.enckey, AES.MODE_CBC, self.iv) + + # hmac an inbound or outbound message, chaining the last hmac too + def hmac_msg(self, direction, enc_msg): + hmac_msg = self.iv + direction + enc_msg + return hmac(self.mackey, hmac_msg)[0:16] + + def decrypt(self,buf): + if len(buf) < 32: + print("Short message?", buf.hex(), file=sys.stderr) + return None + if len(buf) % 16 != 0: + print("Unaligned message? probably bad", buf.hex(), file=sys.stderr) + + # split the message into the encrypted message and the first 16-bytes of the HMAC + enc_msg = buf[0:-16] + their_hmac = buf[-16:] + + # compute the expected hmac on the encrypted message + our_hmac = self.hmac_msg(b'\x43' + self.last_rx_hmac, enc_msg) + + if their_hmac != our_hmac: + print("HMAC failure", their_hmac.hex(), our_hmac.hex(), file=sys.stderr) + return None + + self.last_rx_hmac = their_hmac + + # decrypt the message with CBC, so the last message block is mixed in + msg = self.aes_decrypt.decrypt(enc_msg) + + # check for padding and trim it off the end + pad_len = msg[-1] + if len(msg) < pad_len: + print("padding error?", msg.hex()) + return None + + return msg[0:-pad_len] + + def encrypt(self, clear_msg): + # convert the UTF-8 string into a byte array + clear_msg = bytes(clear_msg, 'utf-8') + + # pad the buffer, adding an extra block if necessary + pad_len = 16 - (len(clear_msg) % 16) + if pad_len == 1: + pad_len += 16 + pad = b'\x00' + get_random_bytes(pad_len-2) + bytearray([pad_len]) + + clear_msg = clear_msg + pad + + # encrypt the padded message with CBC, so there is chained + # state from the last cipher block sent + enc_msg = self.aes_encrypt.encrypt(clear_msg) + + # compute the hmac of the encrypted message, chaining the + # hmac of the previous message plus direction 'E' + self.last_tx_hmac = self.hmac_msg(b'\x45' + self.last_tx_hmac, enc_msg) + + # append the new hmac to the message + return enc_msg + self.last_tx_hmac + + def reconnect(self): + self.reset() + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((self.host,self.port)) + + if not self.http: + sock = sslpsk.wrap_socket( + sock, + ssl_version = ssl.PROTOCOL_TLSv1_2, + ciphers = 'ECDHE-PSK-CHACHA20-POLY1305', + psk = self.psk, + ) + + print(now(), "CON:", self.uri) + self.ws = websocket.WebSocket() + self.ws.connect(self.uri, + socket = sock, + origin = "", + ) + + def send(self, msg): + print(now(), "TX:", msg) + buf = json.dumps(msg, separators=(',', ':') ) + # swap " for ' + buf = re.sub("'", '"', buf) + if self.http: + self.ws.send_binary(self.encrypt(buf)) + else: + self.ws.send(buf) + + def recv(self): + buf = self.ws.recv() + if buf is None or buf == "": + return None + + if self.http: + buf = self.decrypt(buf) + if buf is None: + return None + + print(now(), "RX:", buf) + return buf diff --git a/hcpy b/hcpy index 5bd142c..7fc400d 100755 --- a/hcpy +++ b/hcpy @@ -1,106 +1,22 @@ #!/usr/bin/env python3 # Contact a Bosh-Siemens Home Connect device # todo: document how to extract the psk -import socket -import ssl -import sslpsk -import websocket import sys import json import re import time import io -from base64 import urlsafe_b64decode as base64url -from datetime import datetime -from Crypto.Cipher import AES -from Crypto.Hash import HMAC, SHA256 -from Crypto.Random import get_random_bytes +from HCSocket import HCSocket, now -psk64 = 'KlRQQyG8AkEfRFPr0v7vultz96zcal5lxj2fAc2ohaY' -iv64 = 'tTUvqcsBldtkhHvDwE2DpQ' +#host = '10.1.0.145' +#psk64 = 'KlRQQyG8AkEfRFPr0v7vultz96zcal5lxj2fAc2ohaY' +#iv64 = 'tTUvqcsBldtkhHvDwE2DpQ' -psk = base64url(psk64 + '===') -iv = base64url(iv64 + '===') +host = "10.1.0.133" +psk64 = "Dsgf2MZJ-ti85_00M1QT1HP5LgH82CaASYlMGdcuzcs=" +iv64 = None # no iv == https -def hmac(key,msg): - mac = HMAC.new(key, msg=msg, digestmod=SHA256).digest() - return mac - -enckey = hmac(psk, b'ENC') -mackey = hmac(psk, b'MAC') -#print("ENC", enckey.hex()) -#print("MAC", mackey.hex()) - -last_rx_hmac = bytes(16) -last_tx_hmac = bytes(16) - -encrypt = AES.new(enckey, AES.MODE_CBC, iv) -decrypt = AES.new(enckey, AES.MODE_CBC, iv) - -def hc_decrypt(buf): - global last_rx_hmac - - if len(buf) < 32: - print("Short message?", buf.hex(), file=sys.stderr) - return None - if len(buf) % 16 != 0: - print("Unaligned message? probably bad", buf.hex(), file=sys.stderr) - - # validate the message using the first 16-bytes of the HMAC - their_hmac = buf[-16:] - enc_msg = buf[0:-16] - hmac_msg = iv + b'\x43' + last_rx_hmac + enc_msg - our_hmac = hmac(mackey, hmac_msg)[0:16] - - if their_hmac != our_hmac: - print("HMAC failure", their_hmac.hex(), our_hmac.hex(), file=sys.stderr) - return None - - last_rx_hmac = their_hmac - - # decrypt the message with CBC, so the last message block is mixed in - msg = decrypt.decrypt(enc_msg) - - # check for padding and trim it off the end - pad_len = msg[-1] - if len(msg) < pad_len: - print("padding error?", msg.hex()) - return None - - return msg[0:-pad_len] - -def hc_encrypt(clear_msg, pad=True): - global last_tx_hmac - - # convert the UTF-8 string into a byte array - clear_msg = bytes(clear_msg, 'utf-8') - - # pad the buffer, adding an extra block if necessary - if pad: - pad_len = 16 - (len(clear_msg) % 16) - if pad_len == 1: - pad_len += 16 - pad = b'\x00' + get_random_bytes(pad_len-2) + bytearray([pad_len]) - - clear_msg = clear_msg + pad - - # encrypt the padded message with CBC, so there is chained - # state from the last cipher block sent - enc_msg = encrypt.encrypt(clear_msg) - - # hmac it, chaining the last hmac too - hmac_msg = iv + b'\x45' + last_tx_hmac + enc_msg - last_tx_hmac = hmac(mackey, hmac_msg)[0:16] - - out_msg = enc_msg + last_tx_hmac - return out_msg - -#print(hc_encrypt('{"sID":2642355413,"msgID":1520318190,"resource":"/ei/initialValues","version":2,"action":"RESPONSE","data":[{"deviceType":"Application","deviceName":"Pixel","deviceID":"d304ee06571f0d09"}]}', True).hex()) -#exit(1) - - -#print(enckey.hex()) -#print(valkey.hex()) +ws = HCSocket(host, psk64, iv64) # read in a machine description if provided machine = None @@ -108,45 +24,14 @@ if len(sys.argv) > 1: with io.open(sys.argv[1], "r") as fp: machine = json.load(fp) -# Monkey patch for sslpsk in pip using the old _sslobj -def _sslobj(sock): - if (3, 5) <= sys.version_info <= (3, 7): - return sock._sslobj._sslobj - else: - return sock._sslobj -sslpsk.sslpsk._sslobj = _sslobj - -def now(): - return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") - -debug = False -host = '10.1.0.145' -port = 80; -uri = "ws://"+host+":"+str(port)+ "/homeconnect" session_id = None tx_msg_id = None -tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -tcp_sock.connect((host,port)) -#ssl_sock = sslpsk.wrap_socket( -# tcp_sock, -# ssl_version = ssl.PROTOCOL_TLSv1_2, -# ciphers = 'ECDHE-PSK-CHACHA20-POLY1305', -# psk = psk, #(psk, b'py-hca'), -#) - -ws = None -def tx(msg): - print(now(), "TX:", msg) - buf = json.dumps(msg, separators=(',', ':') ) - # swap " for ' - buf = re.sub("'", '"', buf) - ws.send_binary(hc_encrypt(buf)) def send_initial_messages(): global session_id, tx_msg_id # subscribe to stuff - tx({ + ws.send({ "sID": session_id, "msgID": tx_msg_id, "resource": "/ci/services", @@ -156,7 +41,7 @@ def send_initial_messages(): tx_msg_id += 1 - tx({ + ws.send({ "sID": session_id, "msgID": tx_msg_id, "resource": "/iz/info", @@ -166,7 +51,7 @@ def send_initial_messages(): tx_msg_id += 1 - tx({ + ws.send({ "sID": session_id, "msgID": tx_msg_id, "resource": "/ei/deviceReady", @@ -191,7 +76,7 @@ def handle_message(buf): tx_msg_id = msg["data"][0]["edMsgID"] # reply with a response and our initial get - tx({ + ws.send({ 'sID': session_id, 'msgID': msg["msgID"], # same one they sent to us 'resource': msg["resource"], @@ -206,7 +91,7 @@ def handle_message(buf): }], }) - tx({ + ws.send({ "sID": session_id, "msgID": tx_msg_id, "resource": "/ci/services", @@ -234,30 +119,12 @@ def handle_message(buf): print(status["name"] + "=" + value) - -#session_id = "1" -#handle_message('{"sID": 2887352564, "msgID": 195493504, "resource": "/ro/values", "version": 1, "action": "NOTIFY", "data": [{"uid": 527, "value": 1}]}') -#exit(0) - -if debug: - websocket.enableTrace(True) -ws = websocket.WebSocket() -ws.connect(uri, - #socket=ssl_sock, - socket=tcp_sock, - origin = "", #"https://" + host, -) - -#ws.run_forever() - while True: buf = ws.recv() if buf is None or buf == "": continue + try: - msg = hc_decrypt(buf) - if msg is None: - continue - handle_message(msg) + handle_message(buf) except Exception as e: - print("error handling msg", e, buf.hex()) + print("error handling msg", e, buf)