Use run_forever websocket for ping/pong keepalives

This commit is contained in:
Meatballs1
2024-03-26 22:46:12 +00:00
parent 203c0c8c37
commit 935543f81a
3 changed files with 118 additions and 57 deletions

View File

@@ -44,6 +44,8 @@ import json
import re import re
import sys import sys
import traceback import traceback
import threading
import time
from base64 import urlsafe_b64encode as base64url_encode from base64 import urlsafe_b64encode as base64url_encode
from datetime import datetime from datetime import datetime
@@ -55,18 +57,19 @@ def now():
class HCDevice: class HCDevice:
def __init__(self, ws, features, name): def __init__(self, ws, device):
self.ws = ws self.ws = ws
self.features = features self.features = device.get("features")
self.name = device.get("name")
self.session_id = None self.session_id = None
self.tx_msg_id = None self.tx_msg_id = None
self.device_name = "hcpy" self.device_name = "hcpy"
self.device_id = "0badcafe" self.device_id = "0badcafe"
self.debug = False self.debug = False
self.name = name
self.services_initialized = False self.services_initialized = False
self.services = {} self.services = {}
self.token = None self.token = None
self.connected = False
def parse_values(self, values): def parse_values(self, values):
if not self.features: if not self.features:
@@ -152,7 +155,7 @@ class HCDevice:
feature = self.features[uid] feature = self.features[uid]
# check the access level of the feature # check the access level of the feature
print(now(), self.name, f"Processing feature {feature['name']} with uid {uid}") self.print(f"Processing feature {feature['name']} with uid {uid}")
if "access" not in feature: if "access" not in feature:
raise Exception( raise Exception(
"Unable to configure appliance. " "Unable to configure appliance. "
@@ -209,7 +212,7 @@ class HCDevice:
try: try:
return self.handle_message(buf) return self.handle_message(buf)
except Exception as e: except Exception as e:
print(self.name, "error handling msg", e, buf, traceback.format_exc()) self.print("error handling msg", e, buf, traceback.format_exc())
return None return None
# reply to a POST or GET message with new data # reply to a POST or GET message with new data
@@ -234,7 +237,7 @@ class HCDevice:
if service in self.services.keys(): if service in self.services.keys():
version = self.services[service]["version"] version = self.services[service]["version"]
else: else:
print(now(), self.name, "ERROR service not known") self.print("ERROR service not known")
msg = { msg = {
"sID": self.session_id, "sID": self.session_id,
@@ -265,16 +268,14 @@ class HCDevice:
self.tx_msg_id += 1 self.tx_msg_id += 1
def reconnect(self): def reconnect(self):
self.ws.reconnect()
# Receive initialization message /ei/initialValues # Receive initialization message /ei/initialValues
# Automatically responds in the handle_message function # Automatically responds in the handle_message function
self.recv()
# ask the device which services it supports # ask the device which services it supports
# registered devices gets pushed down too hence the loop # registered devices gets pushed down too hence the loop
self.get("/ci/services") self.get("/ci/services")
while True: while True:
self.recv() time.sleep(1)
if self.services_initialized: if self.services_initialized:
break break
@@ -308,7 +309,7 @@ class HCDevice:
def handle_message(self, buf): def handle_message(self, buf):
msg = json.loads(buf) msg = json.loads(buf)
if self.debug: if self.debug:
print(now(), self.name, "RX:", msg) self.print("RX:", msg)
sys.stdout.flush() sys.stdout.flush()
resource = msg["resource"] resource = msg["resource"]
@@ -336,8 +337,11 @@ class HCDevice:
"deviceID": self.device_id, "deviceID": self.device_id,
}, },
) )
threading.Thread(target=self.reconnect).start()
else: else:
print(now(), self.name, "Unknown resource", resource, file=sys.stderr) self.print("Unknown resource", resource, file=sys.stderr)
elif action == "RESPONSE" or action == "NOTIFY": elif action == "RESPONSE" or action == "NOTIFY":
if resource == "/iz/info" or resource == "/ci/info": if resource == "/iz/info" or resource == "/ci/info":
@@ -363,7 +367,7 @@ class HCDevice:
if "data" in msg: if "data" in msg:
values = self.parse_values(msg["data"]) values = self.parse_values(msg["data"])
else: else:
print(now(), self.name, f"received {msg}") self.print(f"received {msg}")
elif resource == "/ci/registeredDevices": elif resource == "/ci/registeredDevices":
# This contains details of Phone/HCPY registered as clients to the device # This contains details of Phone/HCPY registered as clients to the device
@@ -386,10 +390,29 @@ class HCDevice:
self.services_initialized = True self.services_initialized = True
else: else:
print(now(), self.name, "Unknown response or notify:", msg) self.print("Unknown response or notify:", msg)
else: else:
print(now(), self.name, "Unknown message", msg) self.print("Unknown message", msg)
# return whatever we've parsed out of it # return whatever we've parsed out of it
return values return values
def run_forever(self, on_message, on_open, on_close):
def _on_message(ws, message):
values = self.handle_message(message)
on_message(values)
def _on_open(ws):
self.connected = True
on_open(ws)
def _on_close(ws, code, message):
self.connected = False
on_close(ws, code, message)
def on_error(ws, message):
self.print("Websocket error:", message)
self.ws.run_forever(on_message=_on_message, on_open=_on_open, on_close=_on_close,
on_error=on_error)
def print(self, *args):
print(now(), self.name, *args)

View File

@@ -131,10 +131,6 @@ class HCSocket:
def reconnect(self): def reconnect(self):
self.reset() self.reset()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 3)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
sock.connect((self.host, self.port)) sock.connect((self.host, self.port))
if not self.http: if not self.http:
@@ -153,10 +149,9 @@ class HCSocket:
buf = json.dumps(msg, separators=(",", ":")) buf = json.dumps(msg, separators=(",", ":"))
# swap " for ' # swap " for '
buf = re.sub("'", '"', buf) buf = re.sub("'", '"', buf)
if self.debug: self.dprint("TX:", buf)
print(now(), "TX:", buf)
if self.http: if self.http:
self.ws.send_binary(self.encrypt(buf)) self.ws.send_bytes(self.encrypt(buf))
else: else:
self.ws.send(buf) self.ws.send(buf)
@@ -170,6 +165,54 @@ class HCSocket:
if buf is None: if buf is None:
return None return None
if self.debug: self.dprint("RX:", buf)
print(now(), "RX:", buf)
return buf return buf
def run_forever(self, on_message, on_open, on_close, on_error):
self.reset()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
if not self.http:
sock = sslpsk.wrap_socket(
sock,
ssl_version=ssl.PROTOCOL_TLSv1_2,
ciphers="ECDHE-PSK-CHACHA20-POLY1305",
psk=self.psk,
)
def _on_open(ws):
self.dprint("on connect")
on_open(ws)
def _on_close(ws, close_status_code, close_msg):
self.dprint(f"close: {close_msg}")
on_close(ws, close_status_code, close_msg)
def _on_message(ws, message):
if self.http:
message = self.decrypt(message)
self.dprint("RX:", message)
on_message(ws, message)
def _on_error(ws, error):
self.dprint(f"error {error}")
on_error(ws, error)
print(now(), "CON:", self.uri)
self.ws = websocket.WebSocketApp(
self.uri,
socket=sock,
on_open=_on_open,
on_message=_on_message,
on_close=_on_close,
on_error=_on_error)
websocket.setdefaulttimeout(30)
self.ws.run_forever(ping_interval=120, ping_timeout=10)
# Debug print
def dprint(self, *args):
if self.debug:
print(now(), *args)

View File

@@ -26,7 +26,7 @@ from HCSocket import HCSocket, now
@click.option("--mqtt_cafile") @click.option("--mqtt_cafile")
@click.option("--mqtt_certfile") @click.option("--mqtt_certfile")
@click.option("--mqtt_keyfile") @click.option("--mqtt_keyfile")
@click.option("--mqtt_clientname", default="hcpy") @click.option("--mqtt_clientname", default="hcpy1")
@click_config_file.configuration_option() @click_config_file.configuration_option()
def hc2mqtt( def hc2mqtt(
devices_file: str, devices_file: str,
@@ -90,7 +90,10 @@ def hc2mqtt(
else: else:
raise Exception(f"Payload topic {topic} is unknown.") raise Exception(f"Payload topic {topic} is unknown.")
dev[device_name].get(resource, 1, "POST", msg) if dev[device_name].connected:
dev[device_name].get(resource, 1, "POST", msg)
else:
print(now(), device_name, "ERROR cant send message as websocket is not connected")
except Exception as e: except Exception as e:
print(now(), device_name, "ERROR", e, file=sys.stderr) print(now(), device_name, "ERROR", e, file=sys.stderr)
@@ -127,7 +130,6 @@ def hc2mqtt(
for device in devices: for device in devices:
mqtt_topic = mqtt_prefix + device["name"] mqtt_topic = mqtt_prefix + device["name"]
print(now(), f"topic: {mqtt_topic}")
thread = Thread(target=client_connect, args=(client, device, mqtt_topic)) thread = Thread(target=client_connect, args=(client, device, mqtt_topic))
thread.start() thread.start()
@@ -137,38 +139,17 @@ def hc2mqtt(
global dev global dev
dev = {} dev = {}
def client_connect(client, device, mqtt_topic): def client_connect(client, device, mqtt_topic):
host = device["host"] host = device["host"]
name = device["name"]
# HCDevice should maintain its own state?
state = {} state = {}
mqtt_set_topic = mqtt_topic + "/set" def on_message(msg):
print(now(), device["name"], f"set topic: {mqtt_set_topic}") if msg is not None:
client.subscribe(mqtt_set_topic) if len(msg) > 0:
print(now(), name, msg)
while True:
time.sleep(3)
try:
print(now(), device["name"], f"connecting to {host}")
ws = HCSocket(host, device["key"], device.get("iv", None))
dev[device["name"]] = HCDevice(ws, device.get("features", None), device["name"])
# ws.debug = True
dev[device["name"]].reconnect()
while True:
if client.is_connected():
client.publish(f"{mqtt_topic}/LWT", "", retain=True)
break
while True:
msg = dev[device["name"]].recv()
client.publish(f"{mqtt_topic}/LWT", "online")
if msg is None:
break
if len(msg) > 0:
print(now(), device["name"], msg)
update = False update = False
for key in msg.keys(): for key in msg.keys():
@@ -186,25 +167,39 @@ def client_connect(client, device, mqtt_topic):
update = True update = True
if not update: if not update:
continue return
if client.is_connected(): if client.is_connected():
msg = json.dumps(state) msg = json.dumps(state)
print(now(), device["name"], f"publish to {mqtt_topic} with {msg}") print(now(), name, f"publish to {mqtt_topic} with {msg}")
client.publish(f"{mqtt_topic}/state", msg, retain=True) client.publish(f"{mqtt_topic}/state", msg, retain=True)
else: else:
print( print(
now(), now(),
device["name"], name,
"ERROR Unable to publish update as mqtt is not connected.", "ERROR Unable to publish update as mqtt is not connected.",
) )
def on_open(ws):
client.publish(f"{mqtt_topic}/LWT", "", retain=True)
client.publish(f"{mqtt_topic}/LWT", "online")
def on_close(ws, code, message):
client.publish(f"{mqtt_topic}/LWT", "offline", retain=True)
print(now(), device["name"], "websocket closed, reconnecting...")
while True:
time.sleep(3)
try:
print(now(), name, f"connecting to {host}")
ws = HCSocket(host, device["key"], device.get("iv", None))
dev[name] = HCDevice(ws, device)
dev[name].run_forever(on_message=on_message, on_open=on_open, on_close=on_close)
except Exception as e: except Exception as e:
print(now(), device["name"], "ERROR", e, file=sys.stderr) print(now(), device["name"], "ERROR", e, file=sys.stderr)
client.publish(f"{mqtt_topic}/LWT", "offline", retain=True) client.publish(f"{mqtt_topic}/LWT", "offline", retain=True)
time.sleep(57) time.sleep(57)
if __name__ == "__main__": if __name__ == "__main__":
hc2mqtt() hc2mqtt()