223 lines
6.5 KiB
Python
Executable File
223 lines
6.5 KiB
Python
Executable File
# Create a websocket that wraps a connection to a
|
|
# Bosh-Siemens Home Connect device
|
|
import json
|
|
import re
|
|
import socket
|
|
import ssl
|
|
import sys
|
|
from base64 import urlsafe_b64decode as base64url
|
|
from datetime import datetime
|
|
|
|
import sslpsk
|
|
import websocket
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Hash import HMAC, SHA256
|
|
from Crypto.Random import get_random_bytes
|
|
|
|
|
|
# Convience to compute an HMAC on a message
|
|
def hmac(key, msg):
|
|
mac = HMAC.new(key, msg=msg, digestmod=SHA256).digest()
|
|
return mac
|
|
|
|
|
|
def now():
|
|
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
|
|
|
|
|
|
# Monkey patch for sslpsk in pip using the old _sslobj
|
|
def _sslobj(sock):
|
|
if (3, 5) <= sys.version_info <= (3, 7):
|
|
return sock._sslobj._sslobj
|
|
else:
|
|
return sock._sslobj
|
|
|
|
|
|
sslpsk.sslpsk._sslobj = _sslobj
|
|
|
|
|
|
class HCSocket:
|
|
def __init__(self, host, psk64, iv64=None, domain_suffix=""):
|
|
self.host = host
|
|
if domain_suffix:
|
|
self.host = f"{host}.{domain_suffix}"
|
|
|
|
self.psk = base64url(psk64 + "===")
|
|
self.debug = False
|
|
|
|
if iv64:
|
|
# an HTTP self-encrypted socket
|
|
self.http = True
|
|
self.iv = base64url(iv64 + "===")
|
|
self.enckey = hmac(self.psk, b"ENC")
|
|
self.mackey = hmac(self.psk, b"MAC")
|
|
self.port = 80
|
|
self.uri = f"ws://{host}:80/homeconnect"
|
|
else:
|
|
self.http = False
|
|
self.port = 443
|
|
self.uri = f"wss://{host}:443/homeconnect"
|
|
|
|
# don't connect automatically so that debug etc can be set
|
|
# self.reconnect()
|
|
|
|
# restore the encryption state for a fresh connection
|
|
# this is only used by the HTTP connection
|
|
def reset(self):
|
|
if not self.http:
|
|
return
|
|
self.last_rx_hmac = bytes(16)
|
|
self.last_tx_hmac = bytes(16)
|
|
|
|
self.aes_encrypt = AES.new(self.enckey, AES.MODE_CBC, self.iv)
|
|
self.aes_decrypt = AES.new(self.enckey, AES.MODE_CBC, self.iv)
|
|
|
|
# hmac an inbound or outbound message, chaining the last hmac too
|
|
def hmac_msg(self, direction, enc_msg):
|
|
hmac_msg = self.iv + direction + enc_msg
|
|
return hmac(self.mackey, hmac_msg)[0:16]
|
|
|
|
def decrypt(self, buf):
|
|
if len(buf) < 32:
|
|
print("Short message?", buf.hex(), file=sys.stderr)
|
|
return None
|
|
if len(buf) % 16 != 0:
|
|
print("Unaligned message? probably bad", buf.hex(), file=sys.stderr)
|
|
|
|
# split the message into the encrypted message and the first 16-bytes of the HMAC
|
|
enc_msg = buf[0:-16]
|
|
their_hmac = buf[-16:]
|
|
|
|
# compute the expected hmac on the encrypted message
|
|
our_hmac = self.hmac_msg(b"\x43" + self.last_rx_hmac, enc_msg)
|
|
|
|
if their_hmac != our_hmac:
|
|
print("HMAC failure", their_hmac.hex(), our_hmac.hex(), file=sys.stderr)
|
|
return None
|
|
|
|
self.last_rx_hmac = their_hmac
|
|
|
|
# decrypt the message with CBC, so the last message block is mixed in
|
|
msg = self.aes_decrypt.decrypt(enc_msg)
|
|
|
|
# check for padding and trim it off the end
|
|
pad_len = msg[-1]
|
|
if len(msg) < pad_len:
|
|
print("padding error?", msg.hex())
|
|
return None
|
|
|
|
return msg[0:-pad_len]
|
|
|
|
def encrypt(self, clear_msg):
|
|
# convert the UTF-8 string into a byte array
|
|
clear_msg = bytes(clear_msg, "utf-8")
|
|
|
|
# pad the buffer, adding an extra block if necessary
|
|
pad_len = 16 - (len(clear_msg) % 16)
|
|
if pad_len == 1:
|
|
pad_len += 16
|
|
pad = b"\x00" + get_random_bytes(pad_len - 2) + bytearray([pad_len])
|
|
|
|
clear_msg = clear_msg + pad
|
|
|
|
# encrypt the padded message with CBC, so there is chained
|
|
# state from the last cipher block sent
|
|
enc_msg = self.aes_encrypt.encrypt(clear_msg)
|
|
|
|
# compute the hmac of the encrypted message, chaining the
|
|
# hmac of the previous message plus direction 'E'
|
|
self.last_tx_hmac = self.hmac_msg(b"\x45" + self.last_tx_hmac, enc_msg)
|
|
|
|
# append the new hmac to the message
|
|
return enc_msg + self.last_tx_hmac
|
|
|
|
def reconnect(self):
|
|
self.reset()
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.connect((self.host, self.port))
|
|
|
|
if not self.http:
|
|
sock = sslpsk.wrap_socket(
|
|
sock,
|
|
ssl_version=ssl.PROTOCOL_TLSv1_2,
|
|
ciphers="ECDHE-PSK-CHACHA20-POLY1305",
|
|
psk=self.psk,
|
|
)
|
|
|
|
print(now(), "CON:", self.uri)
|
|
self.ws = websocket.WebSocket()
|
|
self.ws.connect(self.uri, socket=sock, origin="")
|
|
|
|
def send(self, msg):
|
|
buf = json.dumps(msg, separators=(",", ":"))
|
|
# swap " for '
|
|
buf = re.sub("'", '"', buf)
|
|
self.dprint("TX:", buf)
|
|
if self.http:
|
|
self.ws.send_bytes(self.encrypt(buf))
|
|
else:
|
|
self.ws.send(buf)
|
|
|
|
def recv(self):
|
|
buf = self.ws.recv()
|
|
if buf is None or buf == "":
|
|
return None
|
|
|
|
if self.http:
|
|
buf = self.decrypt(buf)
|
|
if buf is None:
|
|
return None
|
|
|
|
self.dprint("RX:", buf)
|
|
return buf
|
|
|
|
def run_forever(self, on_message, on_open, on_close, on_error):
|
|
self.reset()
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.connect((self.host, self.port))
|
|
|
|
if not self.http:
|
|
sock = sslpsk.wrap_socket(
|
|
sock,
|
|
ssl_version=ssl.PROTOCOL_TLSv1_2,
|
|
ciphers="ECDHE-PSK-CHACHA20-POLY1305",
|
|
psk=self.psk,
|
|
)
|
|
|
|
def _on_open(ws):
|
|
self.dprint("on connect")
|
|
on_open(ws)
|
|
|
|
def _on_close(ws, close_status_code, close_msg):
|
|
self.dprint(f"close: {close_msg}")
|
|
on_close(ws, close_status_code, close_msg)
|
|
|
|
def _on_message(ws, message):
|
|
if self.http:
|
|
message = self.decrypt(message)
|
|
self.dprint("RX:", message)
|
|
on_message(ws, message)
|
|
|
|
def _on_error(ws, error):
|
|
self.dprint(f"error {error}")
|
|
on_error(ws, error)
|
|
|
|
print(now(), "CON:", self.uri)
|
|
self.ws = websocket.WebSocketApp(
|
|
self.uri,
|
|
socket=sock,
|
|
on_open=_on_open,
|
|
on_message=_on_message,
|
|
on_close=_on_close,
|
|
on_error=_on_error,
|
|
)
|
|
|
|
websocket.setdefaulttimeout(30)
|
|
|
|
self.ws.run_forever(ping_interval=120, ping_timeout=10)
|
|
|
|
# Debug print
|
|
def dprint(self, *args):
|
|
if self.debug:
|
|
print(now(), *args)
|