split hcpy into HCSocket and restore HTTPS PSK support as well

This commit is contained in:
Trammell Hudson
2022-02-05 14:31:16 +01:00
parent 7e3bd97853
commit 84592e0c99
2 changed files with 182 additions and 149 deletions

166
HCSocket.py Executable file
View File

@@ -0,0 +1,166 @@
# Create a websocket that wraps a connection to a
# Bosh-Siemens Home Connect device
import socket
import ssl
import sslpsk
import websocket
import sys
import json
import re
import time
import io
from base64 import urlsafe_b64decode as base64url
from datetime import datetime
from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA256
from Crypto.Random import get_random_bytes
# Convience to compute an HMAC on a message
def hmac(key,msg):
mac = HMAC.new(key, msg=msg, digestmod=SHA256).digest()
return mac
def now():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
# Monkey patch for sslpsk in pip using the old _sslobj
def _sslobj(sock):
if (3, 5) <= sys.version_info <= (3, 7):
return sock._sslobj._sslobj
else:
return sock._sslobj
sslpsk.sslpsk._sslobj = _sslobj
class HCSocket:
def __init__(self, host, psk64, iv64=None):
self.host = host
self.psk = base64url(psk64 + '===')
if iv64:
# an HTTP self-encrypted socket
self.http = True
self.iv = base64url(iv64 + '===')
self.enckey = hmac(self.psk, b'ENC')
self.mackey = hmac(self.psk, b'MAC')
self.port = 80
self.uri = "ws://" + host + ":80/homeconnect"
else:
self.http = False
self.port = 443
self.uri = "wss://" + host + ":443/homeconnect"
self.reconnect()
# restore the encryption state for a fresh connection
# this is only used by the HTTP connection
def reset(self):
if not self.http:
return
self.last_rx_hmac = bytes(16)
self.last_tx_hmac = bytes(16)
self.aes_encrypt = AES.new(self.enckey, AES.MODE_CBC, self.iv)
self.aes_decrypt = AES.new(self.enckey, AES.MODE_CBC, self.iv)
# hmac an inbound or outbound message, chaining the last hmac too
def hmac_msg(self, direction, enc_msg):
hmac_msg = self.iv + direction + enc_msg
return hmac(self.mackey, hmac_msg)[0:16]
def decrypt(self,buf):
if len(buf) < 32:
print("Short message?", buf.hex(), file=sys.stderr)
return None
if len(buf) % 16 != 0:
print("Unaligned message? probably bad", buf.hex(), file=sys.stderr)
# split the message into the encrypted message and the first 16-bytes of the HMAC
enc_msg = buf[0:-16]
their_hmac = buf[-16:]
# compute the expected hmac on the encrypted message
our_hmac = self.hmac_msg(b'\x43' + self.last_rx_hmac, enc_msg)
if their_hmac != our_hmac:
print("HMAC failure", their_hmac.hex(), our_hmac.hex(), file=sys.stderr)
return None
self.last_rx_hmac = their_hmac
# decrypt the message with CBC, so the last message block is mixed in
msg = self.aes_decrypt.decrypt(enc_msg)
# check for padding and trim it off the end
pad_len = msg[-1]
if len(msg) < pad_len:
print("padding error?", msg.hex())
return None
return msg[0:-pad_len]
def encrypt(self, clear_msg):
# convert the UTF-8 string into a byte array
clear_msg = bytes(clear_msg, 'utf-8')
# pad the buffer, adding an extra block if necessary
pad_len = 16 - (len(clear_msg) % 16)
if pad_len == 1:
pad_len += 16
pad = b'\x00' + get_random_bytes(pad_len-2) + bytearray([pad_len])
clear_msg = clear_msg + pad
# encrypt the padded message with CBC, so there is chained
# state from the last cipher block sent
enc_msg = self.aes_encrypt.encrypt(clear_msg)
# compute the hmac of the encrypted message, chaining the
# hmac of the previous message plus direction 'E'
self.last_tx_hmac = self.hmac_msg(b'\x45' + self.last_tx_hmac, enc_msg)
# append the new hmac to the message
return enc_msg + self.last_tx_hmac
def reconnect(self):
self.reset()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.host,self.port))
if not self.http:
sock = sslpsk.wrap_socket(
sock,
ssl_version = ssl.PROTOCOL_TLSv1_2,
ciphers = 'ECDHE-PSK-CHACHA20-POLY1305',
psk = self.psk,
)
print(now(), "CON:", self.uri)
self.ws = websocket.WebSocket()
self.ws.connect(self.uri,
socket = sock,
origin = "",
)
def send(self, msg):
print(now(), "TX:", msg)
buf = json.dumps(msg, separators=(',', ':') )
# swap " for '
buf = re.sub("'", '"', buf)
if self.http:
self.ws.send_binary(self.encrypt(buf))
else:
self.ws.send(buf)
def recv(self):
buf = self.ws.recv()
if buf is None or buf == "":
return None
if self.http:
buf = self.decrypt(buf)
if buf is None:
return None
print(now(), "RX:", buf)
return buf

165
hcpy
View File

@@ -1,106 +1,22 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# Contact a Bosh-Siemens Home Connect device # Contact a Bosh-Siemens Home Connect device
# todo: document how to extract the psk # todo: document how to extract the psk
import socket
import ssl
import sslpsk
import websocket
import sys import sys
import json import json
import re import re
import time import time
import io import io
from base64 import urlsafe_b64decode as base64url from HCSocket import HCSocket, now
from datetime import datetime
from Crypto.Cipher import AES
from Crypto.Hash import HMAC, SHA256
from Crypto.Random import get_random_bytes
psk64 = 'KlRQQyG8AkEfRFPr0v7vultz96zcal5lxj2fAc2ohaY' #host = '10.1.0.145'
iv64 = 'tTUvqcsBldtkhHvDwE2DpQ' #psk64 = 'KlRQQyG8AkEfRFPr0v7vultz96zcal5lxj2fAc2ohaY'
#iv64 = 'tTUvqcsBldtkhHvDwE2DpQ'
psk = base64url(psk64 + '===') host = "10.1.0.133"
iv = base64url(iv64 + '===') psk64 = "Dsgf2MZJ-ti85_00M1QT1HP5LgH82CaASYlMGdcuzcs="
iv64 = None # no iv == https
def hmac(key,msg): ws = HCSocket(host, psk64, iv64)
mac = HMAC.new(key, msg=msg, digestmod=SHA256).digest()
return mac
enckey = hmac(psk, b'ENC')
mackey = hmac(psk, b'MAC')
#print("ENC", enckey.hex())
#print("MAC", mackey.hex())
last_rx_hmac = bytes(16)
last_tx_hmac = bytes(16)
encrypt = AES.new(enckey, AES.MODE_CBC, iv)
decrypt = AES.new(enckey, AES.MODE_CBC, iv)
def hc_decrypt(buf):
global last_rx_hmac
if len(buf) < 32:
print("Short message?", buf.hex(), file=sys.stderr)
return None
if len(buf) % 16 != 0:
print("Unaligned message? probably bad", buf.hex(), file=sys.stderr)
# validate the message using the first 16-bytes of the HMAC
their_hmac = buf[-16:]
enc_msg = buf[0:-16]
hmac_msg = iv + b'\x43' + last_rx_hmac + enc_msg
our_hmac = hmac(mackey, hmac_msg)[0:16]
if their_hmac != our_hmac:
print("HMAC failure", their_hmac.hex(), our_hmac.hex(), file=sys.stderr)
return None
last_rx_hmac = their_hmac
# decrypt the message with CBC, so the last message block is mixed in
msg = decrypt.decrypt(enc_msg)
# check for padding and trim it off the end
pad_len = msg[-1]
if len(msg) < pad_len:
print("padding error?", msg.hex())
return None
return msg[0:-pad_len]
def hc_encrypt(clear_msg, pad=True):
global last_tx_hmac
# convert the UTF-8 string into a byte array
clear_msg = bytes(clear_msg, 'utf-8')
# pad the buffer, adding an extra block if necessary
if pad:
pad_len = 16 - (len(clear_msg) % 16)
if pad_len == 1:
pad_len += 16
pad = b'\x00' + get_random_bytes(pad_len-2) + bytearray([pad_len])
clear_msg = clear_msg + pad
# encrypt the padded message with CBC, so there is chained
# state from the last cipher block sent
enc_msg = encrypt.encrypt(clear_msg)
# hmac it, chaining the last hmac too
hmac_msg = iv + b'\x45' + last_tx_hmac + enc_msg
last_tx_hmac = hmac(mackey, hmac_msg)[0:16]
out_msg = enc_msg + last_tx_hmac
return out_msg
#print(hc_encrypt('{"sID":2642355413,"msgID":1520318190,"resource":"/ei/initialValues","version":2,"action":"RESPONSE","data":[{"deviceType":"Application","deviceName":"Pixel","deviceID":"d304ee06571f0d09"}]}', True).hex())
#exit(1)
#print(enckey.hex())
#print(valkey.hex())
# read in a machine description if provided # read in a machine description if provided
machine = None machine = None
@@ -108,45 +24,14 @@ if len(sys.argv) > 1:
with io.open(sys.argv[1], "r") as fp: with io.open(sys.argv[1], "r") as fp:
machine = json.load(fp) machine = json.load(fp)
# Monkey patch for sslpsk in pip using the old _sslobj
def _sslobj(sock):
if (3, 5) <= sys.version_info <= (3, 7):
return sock._sslobj._sslobj
else:
return sock._sslobj
sslpsk.sslpsk._sslobj = _sslobj
def now():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
debug = False
host = '10.1.0.145'
port = 80;
uri = "ws://"+host+":"+str(port)+ "/homeconnect"
session_id = None session_id = None
tx_msg_id = None tx_msg_id = None
tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_sock.connect((host,port))
#ssl_sock = sslpsk.wrap_socket(
# tcp_sock,
# ssl_version = ssl.PROTOCOL_TLSv1_2,
# ciphers = 'ECDHE-PSK-CHACHA20-POLY1305',
# psk = psk, #(psk, b'py-hca'),
#)
ws = None
def tx(msg):
print(now(), "TX:", msg)
buf = json.dumps(msg, separators=(',', ':') )
# swap " for '
buf = re.sub("'", '"', buf)
ws.send_binary(hc_encrypt(buf))
def send_initial_messages(): def send_initial_messages():
global session_id, tx_msg_id global session_id, tx_msg_id
# subscribe to stuff # subscribe to stuff
tx({ ws.send({
"sID": session_id, "sID": session_id,
"msgID": tx_msg_id, "msgID": tx_msg_id,
"resource": "/ci/services", "resource": "/ci/services",
@@ -156,7 +41,7 @@ def send_initial_messages():
tx_msg_id += 1 tx_msg_id += 1
tx({ ws.send({
"sID": session_id, "sID": session_id,
"msgID": tx_msg_id, "msgID": tx_msg_id,
"resource": "/iz/info", "resource": "/iz/info",
@@ -166,7 +51,7 @@ def send_initial_messages():
tx_msg_id += 1 tx_msg_id += 1
tx({ ws.send({
"sID": session_id, "sID": session_id,
"msgID": tx_msg_id, "msgID": tx_msg_id,
"resource": "/ei/deviceReady", "resource": "/ei/deviceReady",
@@ -191,7 +76,7 @@ def handle_message(buf):
tx_msg_id = msg["data"][0]["edMsgID"] tx_msg_id = msg["data"][0]["edMsgID"]
# reply with a response and our initial get # reply with a response and our initial get
tx({ ws.send({
'sID': session_id, 'sID': session_id,
'msgID': msg["msgID"], # same one they sent to us 'msgID': msg["msgID"], # same one they sent to us
'resource': msg["resource"], 'resource': msg["resource"],
@@ -206,7 +91,7 @@ def handle_message(buf):
}], }],
}) })
tx({ ws.send({
"sID": session_id, "sID": session_id,
"msgID": tx_msg_id, "msgID": tx_msg_id,
"resource": "/ci/services", "resource": "/ci/services",
@@ -234,30 +119,12 @@ def handle_message(buf):
print(status["name"] + "=" + value) print(status["name"] + "=" + value)
#session_id = "1"
#handle_message('{"sID": 2887352564, "msgID": 195493504, "resource": "/ro/values", "version": 1, "action": "NOTIFY", "data": [{"uid": 527, "value": 1}]}')
#exit(0)
if debug:
websocket.enableTrace(True)
ws = websocket.WebSocket()
ws.connect(uri,
#socket=ssl_sock,
socket=tcp_sock,
origin = "", #"https://" + host,
)
#ws.run_forever()
while True: while True:
buf = ws.recv() buf = ws.recv()
if buf is None or buf == "": if buf is None or buf == "":
continue continue
try: try:
msg = hc_decrypt(buf) handle_message(buf)
if msg is None:
continue
handle_message(msg)
except Exception as e: except Exception as e:
print("error handling msg", e, buf.hex()) print("error handling msg", e, buf)