Files
hcpy/HCDevice.py
2022-02-06 10:54:50 +01:00

230 lines
5.4 KiB
Python
Executable File

#!/usr/bin/env python3
# Parse messages from a Home Connect websocket (HCSocket)
# and keep the connection alive
#
# Possible resources to fetch from the devices:
#
# /ro/values
# /ro/descriptionChange
# /ro/allMandatoryValues
# /ro/allDescriptionChanges
# /ro/activeProgram
# /ro/selectedProgram
#
# /ei/initialValues
# /ei/deviceReady
#
# /ci/services
# /ci/registeredDevices
# /ci/pairableDevices
# /ci/delregistration
# /ci/networkDetails
# /ci/networkDetails2
# /ci/wifiNetworks
# /ci/wifiSetting
# /ci/wifiSetting2
# /ci/tzInfo
# /ci/authentication
# /ci/register
# /ci/deregister
#
# /ce/serverDeviceType
# /ce/serverCredential
# /ce/clientCredential
# /ce/hubInformation
# /ce/hubConnected
# /ce/status
#
# /ni/config
#
# /iz/services
import sys
import json
import re
import time
import io
import traceback
from datetime import datetime
def now():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
class HCDevice:
def __init__(self, ws):
self.ws = ws
self.machine = None
self.session_id = None
self.tx_msg_id = None
self.device_name = "hcpy"
self.device_id = "0badcafe"
self.debug = False
def load_description(self, device_type):
json_file = "xml/" + device_type + ".json"
try:
with io.open(json_file, "r") as fp:
self.machine = json.load(fp)
print(now(), json_file + ": parsed machine description")
except Exception as e:
print(now(), json_file + ": unable to load machine description", e)
def parse_values(self, values):
if not self.machine:
return
result = {}
for msg in values:
uid = str(msg["uid"])
value = msg["value"]
value_str = str(value)
name = uid
status = None
if uid in self.machine["features"]:
status = self.machine["features"][uid]
if status:
name = status["name"]
if "values" in status \
and value_str in status["values"]:
value = status["values"][value_str]
# trim everything off the name except the last part
name = re.sub(r'^.*\.', '', name)
result[name] = value
return result
def recv(self):
try:
buf = self.ws.recv()
if buf is None:
return None
except Exception as e:
print("receive error", e, traceback.format_exc())
return None
try:
return self.handle_message(buf)
except Exception as e:
print("error handling msg", e, buf, traceback.format_exc())
return None
# reply to a POST or GET message with new data
def reply(self, msg, reply):
self.ws.send({
'sID': msg["sID"],
'msgID': msg["msgID"], # same one they sent to us
'resource': msg["resource"],
'version': msg["version"],
'action': 'RESPONSE',
'data': [reply],
})
# send a message to the device
def get(self, resource, version=1, action="GET", data=None):
msg = {
"sID": self.session_id,
"msgID": self.tx_msg_id,
"resource": resource,
"version": version,
"action": action,
}
if data is not None:
msg["data"] = [data]
self.ws.send(msg)
self.tx_msg_id += 1
def handle_message(self, buf):
msg = json.loads(buf)
if self.debug:
print(now(), "RX:", msg)
sys.stdout.flush()
resource = msg["resource"]
action = msg["action"]
values = {}
if "code" in msg:
#print(now(), "ERROR", msg["code"])
values = {
"error": msg["code"],
"resource": msg.get("resource", ''),
}
elif action == "POST":
if resource == "/ei/initialValues":
# this is the first message they send to us and
# establishes our session plus message ids
self.session_id = msg["sID"]
self.tx_msg_id = msg["data"][0]["edMsgID"]
self.reply(msg, {
"deviceType": "Application",
"deviceName": self.device_name,
"deviceID": self.device_id,
})
# ask the device which services it supports
self.get("/ci/services")
#self.get("/ci/authentication", version=2, data={"nonce": "aGVsbG93b3JsZAo="})
self.get("/ci/info", version=2) # clothes washer
self.get("/iz/info") # dish washer
#self.get("/ci/tzInfo", version=2)
self.get("/ni/info")
#self.get("/ni/config", data={"interfaceID": 0})
self.get("/ei/deviceReady", version=2, action="NOTIFY")
self.get("/ro/allDescriptionChanges")
self.get("/ro/allDescriptionChanges")
self.get("/ro/allMandatoryValues")
#self.get("/ro/values")
else:
print(now(), "Unknown resource", resource, file=sys.stderr)
elif action == "RESPONSE" or action == "NOTIFY":
if resource == "/iz/info" or resource == "/ci/info":
# see if we have a device file for this model
if not "data" in msg:
return values
values = msg["data"][0]
self.load_description(values["vib"])
elif resource == "/ro/allMandatoryValues" \
or resource == "/ro/values":
values = self.parse_values(msg["data"])
#print(now(), values)
elif resource == "/ci/registeredDevices":
# we don't care
pass
elif resource == "/ci/services":
self.services = {}
for service in msg["data"]:
self.services[service["service"]] = {
"version": service["version"],
}
#print(now(), "services", self.services)
# we should figure out which ones to query now
# if "iz" in self.services:
# self.get("/iz/info", version=self.services["iz"]["version"])
# if "ni" in self.services:
# self.get("/ni/info", version=self.services["ni"]["version"])
# if "ei" in self.services:
# self.get("/ei/deviceReady", version=self.services["ei"]["version"], action="NOTIFY")
#self.get("/if/info")
else:
#print(now(), "Unknown reponse", resource)
pass
# return whatever we've parsed out of it
return values