Merge branch 'device_details' into websocket_keepalive

This commit is contained in:
Meatballs1
2024-03-26 17:48:43 +00:00
2 changed files with 96 additions and 59 deletions

View File

@@ -36,6 +36,7 @@
# /ce/status
#
# /ni/config
# /ni/info
#
# /iz/services
@@ -63,6 +64,9 @@ class HCDevice:
self.device_id = "0badcafe"
self.debug = False
self.name = name
self.services_initialized = False
self.services = {}
self.token = None
def parse_values(self, values):
if not self.features:
@@ -223,6 +227,15 @@ class HCDevice:
# send a message to the device
def get(self, resource, version=1, action="GET", data=None):
if self.services_initialized:
resource_parts = resource.split("/")
if len(resource_parts) > 1:
service = resource.split("/")[1]
if service in self.services.keys():
version = self.services[service]["version"]
else:
print(now(), self.name, "ERROR service not known")
msg = {
"sID": self.session_id,
"msgID": self.tx_msg_id,
@@ -251,6 +264,47 @@ class HCDevice:
print(self.name, "Failed to send", e, msg, traceback.format_exc())
self.tx_msg_id += 1
def reconnect(self):
self.ws.reconnect()
# Receive initialization message /ei/initialValues
# Automatically responds in the handle_message function
self.recv()
# ask the device which services it supports
# registered devices gets pushed down too hence the loop
self.get("/ci/services")
while True:
self.recv()
if self.services_initialized:
break
# We override the version based on the registered services received above
# the clothes washer wants this, the token doesn't matter,
# although they do not handle padding characters
# they send a response, not sure how to interpet it
self.token = base64url_encode(get_random_bytes(32)).decode("UTF-8")
self.token = re.sub(r"=", "", self.token)
self.get("/ci/authentication", version=2, data={"nonce": self.token})
self.get("/ci/info") # clothes washer
self.get("/iz/info") # dish washer
# Retrieves registered clients like phone/hcpy itself
# self.get("/ci/registeredDevices")
# tzInfo all returns empty?
# self.get("/ci/tzInfo")
# We need to send deviceReady for some devices or /ni/ will come back as 403 unauth
self.get("/ei/deviceReady", version=2, action="NOTIFY")
self.get("/ni/info")
# self.get("/ni/config", data={"interfaceID": 0})
# self.get("/ro/allDescriptionChanges")
self.get("/ro/allMandatoryValues")
self.get("/ro/values")
def handle_message(self, buf):
msg = json.loads(buf)
if self.debug:
@@ -263,7 +317,6 @@ class HCDevice:
values = {}
if "code" in msg:
print(now(), self.name, "ERROR", msg["code"])
values = {
"error": msg["code"],
"resource": msg.get("resource", ""),
@@ -283,41 +336,27 @@ class HCDevice:
"deviceID": self.device_id,
},
)
# ask the device which services it supports
self.get("/ci/services")
# the clothes washer wants this, the token doesn't matter,
# although they do not handle padding characters
# they send a response, not sure how to interpet it
token = base64url_encode(get_random_bytes(32)).decode("UTF-8")
token = re.sub(r"=", "", token)
self.get("/ci/authentication", version=2, data={"nonce": token})
self.get("/ci/info", version=2) # clothes washer
self.get("/iz/info") # dish washer
# self.get("/ci/tzInfo", version=2)
self.get("/ni/info")
# self.get("/ni/config", data={"interfaceID": 0})
self.get("/ei/deviceReady", version=2, action="NOTIFY")
self.get("/ro/allDescriptionChanges")
self.get("/ro/allDescriptionChanges")
self.get("/ro/allMandatoryValues")
# self.get("/ro/values")
else:
print(now(), self.name, "Unknown resource", resource, file=sys.stderr)
elif action == "RESPONSE" or action == "NOTIFY":
if resource == "/iz/info" or resource == "/ci/info":
# we could validate that this matches our machine
pass
if "data" in msg and len(msg["data"]) > 0:
# Return Device Information such as Serial Number, SW Versions, MAC Address
values = msg["data"][0]
elif resource == "/ro/descriptionChange" or resource == "/ro/allDescriptionChanges":
# we asked for these but don't know have to parse yet
pass
elif resource == "/ni/info":
# we're already talking, so maybe we don't care?
if "data" in msg and len(msg["data"]) > 0:
# Return Network Information/IP Address etc
values = msg["data"][0]
elif resource == "/ni/config":
# Returns some data about network interfaces e.g.
# [{'interfaceID': 0, 'automaticIPv4': True, 'automaticIPv6': True}]
pass
elif resource == "/ro/allMandatoryValues" or resource == "/ro/values":
@@ -325,19 +364,32 @@ class HCDevice:
values = self.parse_values(msg["data"])
else:
print(now(), self.name, f"received {msg}")
elif resource == "/ci/registeredDevices":
# we don't care
# This contains details of Phone/HCPY registered as clients to the device
pass
elif resource == "/ci/tzInfo":
pass
elif resource == "/ci/authentication":
if "data" in msg and len(msg["data"]) > 0:
# Grab authentication token - unsure if this is for us to use
# or to authenticate the server. Doesn't appear to be needed
self.token = msg["data"][0]["response"]
elif resource == "/ci/services":
self.services = {}
for service in msg["data"]:
self.services[service["service"]] = {
"version": service["version"],
}
self.services_initialized = True
else:
print(now(), self.name, "Unknown response or notify:", msg)
else:
print(now(), self.name, "Unknown", msg)
print(now(), self.name, "Unknown message", msg)
# return whatever we've parsed out of it
return values