initial version of a very hacky websocket tool

This commit is contained in:
Trammell Hudson
2022-01-30 19:01:31 +01:00
commit afe099672c
3 changed files with 272 additions and 0 deletions

129
hcpy Executable file
View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
# Contact a Bosh-Siemens Home Connect device
# todo: document how to extract the psk
import socket
import ssl
import sslpsk
import websocket
import sys
import json
import re
import time
# Monkey patch for sslpsk in pip using the old _sslobj
def _sslobj(sock):
if (3, 5) <= sys.version_info <= (3, 7):
return sock._sslobj._sslobj
else:
return sock._sslobj
sslpsk.sslpsk._sslobj = _sslobj
debug = False
host = '10.1.0.133'
port = 443;
uri = "wss://"+host+":"+str(port)+ "/homeconnect"
psk = b'\x0e\xc8\x1f\xd8\xc6\x49\xfa\xd8\xbc\xe7\xfd\x34\x33\x54\x13\xd4\x73\xf9\x2e\x01\xfc\xd8\x26\x80\x49\x89\x4c\x19\xd7\x2e\xcd\xcb';
session_id = None
msg_id = None
tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_sock.connect((host,port))
ssl_sock = sslpsk.wrap_socket(
tcp_sock,
ssl_version = ssl.PROTOCOL_TLSv1_2,
ciphers = 'ECDHE-PSK-CHACHA20-POLY1305',
psk = psk, #(psk, b'py-hca'),
)
ws = None
def tx(msg):
buf = json.dumps(msg, separators=(',', ':') )
# swap " for '
buf = re.sub("'", '"', buf)
print("TX:", buf)
ws.send(buf)
def send_initial_messages():
global session_id, msg_id
# subscribe to stuff
tx({
"sID": session_id,
"msgID": msg_id,
"resource": "/ci/services",
"version": 1,
"action": "GET",
})
msg_id += 1
tx({
"sID": session_id,
"msgID": msg_id,
"resource": "/iz/info",
"version": 1,
"action": "GET",
})
msg_id += 1
tx({
"sID": session_id,
"msgID": msg_id,
"resource": "/ei/deviceReady",
"version": 2,
"action": "NOTIFY",
})
msg_id += 1
def handle_message(buf):
global session_id, msg_id
msg = json.loads(buf)
print("RX:", msg)
# first message from the device establishes the session etc
# {'sID': 926468163, 'msgID': 3785595876, 'resource': '/ei/initialValues', 'version': 2, 'action': 'POST', 'data': [{'edMsgID': 2569124008}]}
if session_id == None:
session_id = msg["sID"]
msg_id = msg["data"][0]["edMsgID"]
# reply with a bogus message
tx({
'sID': session_id,
'msgID': msg["msgID"],
'resource': msg["resource"],
'version': msg["version"],
'action': 'RESPONSE',
'data': [{
"deviceType": "Application",
"deviceName": "py-hca",
"deviceID": "1234",
#"deviceName": "Pixel",
#"deviceID": "d304ee06571f0d09",
}],
})
send_initial_messages()
# do other stuff?
if debug:
websocket.enableTrace(True)
ws = websocket.WebSocket()
ws.connect(uri,
socket=ssl_sock,
origin = "", #"https://" + host,
)
# on_message = handle_message,
# on_error = exit,
#)
#ws.run_forever()
while True:
buf = ws.recv()
if buf is None or buf == "":
continue
handle_message(buf)