346 lines
8.5 KiB
Python
346 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Data models for different device types managed by the Juice IOT hub.
|
|
"""
|
|
import re
|
|
import json
|
|
|
|
import tools
|
|
|
|
import requests
|
|
|
|
class Network(list):
    """
    A list of the IOT devices known to the hub.
    """
    def find(self, device_id):
        """
        Returns the first device in the network whose `id` equals
        `device_id`, or None when no device matches.
        """
        matches = (candidate for candidate in self if candidate.id == device_id)
        return next(matches, None)
|
|
|
|
|
|
class Device:
    """
    Represents a generic device on the Network.

    All state is kept in plain instance attributes so that a device can be
    populated from, and dumped back to, a JSON-style dict.
    """
    def __init__(self):
        self.id = ""
        self.type = ""
        self.description = ""
        self.location = ""
        self.ip_address = ""
        self.mqtt_root = ""
        self.locked = False
        self.sub_devices = []

    def find(self, sub_device_id):
        """
        Searches through the device's sub_devices and returns the first
        one whose `id` equals `sub_device_id`, or None.
        """
        for sub_device in self.sub_devices:
            if sub_device.id == sub_device_id:
                return sub_device
        return None

    def from_dict(self, data):
        """
        Initializes self with data from dict and returns self.

        Every key except 'sub_devices' is copied onto the instance as an
        attribute. Each entry of 'sub_devices' is turned into the
        sub-device class named by its 'type' and appended to
        `self.sub_devices`. A missing or null 'sub_devices' entry is
        treated as "no sub-devices".

        Raises ValueError when a sub-device dict has an unknown 'type'.
        """
        for key, value in data.items():
            # Sub-devices need to be built as objects below, not stored
            # as raw dicts.
            if key != 'sub_devices':
                setattr(self, key, value)

        # `or []` covers both an absent key and an explicit null.
        for sub_device_dict in data.get('sub_devices') or []:
            typ = sub_device_dict.get('type')
            if typ == 'RelayOutlet':
                sub_device = RelayOutlet()
            elif typ == 'NeoPixel':
                sub_device = NeoPixel()
            elif typ == 'LixieDisplay':
                sub_device = LixieDisplay()
            else:
                raise ValueError(f"Unknown sub_device type {typ}")
            self.sub_devices.append(sub_device.from_dict(sub_device_dict))
        return self

    async def mqtt_callback(self, msg):
        """
        Called when the MQTT client receives a message directed to this
        device. Should be overridden by the device implementation.

        NOTE(review): the overrides in LightStrip and LixieClock take
        `(app, msg)` rather than `(msg)` -- confirm which signature the
        caller actually uses.
        """
        pass
|
|
|
|
|
|
class SubDevice:
    """
    Base class for the individual components that belong to a Device.
    """
    def from_dict(self, data):
        """
        Copies every key/value pair of `data` onto this object as an
        attribute, then returns the object so the call can be chained
        onto a constructor.
        """
        for name in data:
            setattr(self, name, data[name])
        return self
|
|
|
|
|
|
class RelayDevice(Device):
    """
    Represents a relay receptacle device controlled by an ESP-01. The
    ESP-01 acts as a simple switch to turn each receptacle on or off via
    a relay. Each device only has two outputs, `OUT0` and `OUT2`.
    """
    def __init__(self):
        super().__init__()
        self.type = "RelayDevice"

    def _base_url(self):
        """
        Returns the HTTP base URL of the physical device, adding the
        'http://' scheme unless `ip_address` already carries one.
        """
        if self.ip_address.startswith(('http://', 'https://')):
            return self.ip_address
        return 'http://' + self.ip_address

    @staticmethod
    def _search_state(text, gpio):
        """
        Looks for the `GPIO<gpio>: Low|High` marker in a device response
        and returns the regex match, or None when it is absent.
        """
        return re.search(rf"GPIO{gpio}: (\bLow|\bHigh)", text)

    def update(self):
        """
        Queries the physical device and updates the internal model as
        appropriate.

        Does nothing when the device has no IP address or no sub-devices.
        The device is queried once and each sub-device's `state` is set
        from the line reporting its own GPIO pin; a sub-device whose pin
        is not reported keeps its previous state.

        Raises requests.HTTPError when the device answers with an error
        status.
        """
        if not self.ip_address or not self.sub_devices:
            return

        # One request serves every outlet; the response is identical no
        # matter which sub-device is being inspected.
        # NOTE(review): assumes the root page reports every GPIO the same
        # way the '/gpio' response used by toggle() does -- confirm
        # against the firmware.
        res = requests.get(self._base_url())
        res.raise_for_status()

        for sub_dev in self.sub_devices:
            match = self._search_state(res.text, sub_dev.gpio)
            if match is not None:
                sub_dev.state = match.group(1) == 'High'

    def toggle(self, sub_device_id):
        """
        Toggles the state of a sub device.

        Returns the new state (True for 'High', False for 'Low') as
        reported by the device, or None when no sub-device with
        `sub_device_id` exists.

        Raises requests.HTTPError when the device answers with an error
        status.
        """
        sub_device = self.find(sub_device_id)
        if not sub_device:
            return None

        params = {sub_device.gpio: 'toggle'}
        res = requests.get(self._base_url() + '/gpio', params=params)
        res.raise_for_status()

        state = self._search_state(res.text, sub_device.gpio).group(1) == 'High'
        sub_device.state = state

        return state
|
|
|
|
|
|
class RelayOutlet(SubDevice):
    """
    One switchable outlet belonging to a RelayDevice.
    """
    def __init__(self):
        # Identification.
        self.id: str = ""
        self.type: str = "RelayOutlet"
        self.description: str = ""
        # GPIO identifier used when talking to the physical device.
        self.gpio: str = ""
        # Last known output state.
        self.state: bool = False
|
|
|
|
|
|
class LightStrip(Device):
    """
    Represents an RGB LED light strip controller.
    """
    def __init__(self):
        super().__init__()
        self.type = "LightStrip"
        self.state = "solid"
        self.animation = "static"
        self.rainbow_params = [0.3, 0.3, 0.3, 0, 2, 4, 127, 128]
        self.america_params = [8, 128]
        self.animation_rotate_count = 1
        self.animation_delay = 200

    async def mqtt_callback(self, app, msg):
        """
        Applies an MQTT message to the strip model, saves the network and
        broadcasts the change through `app.send_json_all`.

        `msg.topic` is split on '/'; element 0 is not inspected here and
        the remaining elements select the action:

            state/solid/single/<n>   payload "r,g,b"; colors sub-device
                                     'led<n>'
            state/solid/all          payload "r,g,b"; colors every
                                     sub-device
            state/rainbow            comma separated payload, first three
                                     values floats, the rest ints
            state/america            comma separated ints
            animation/mode/<mode>    'static', 'rotate_left' or
                                     'rotate_right'; the rotate modes
                                     take an int rotate count as payload
            animation/delay          int payload
            strip/full               JSON list with one color per
                                     sub-device

        `msg.payload` is decoded as UTF-8. When a single LED is addressed
        that does not exist, no color is changed and the broadcast
        carries `ok = False`.
        """
        topic = msg.topic.split('/')
        payload = msg.payload.decode('utf8')

        data = {}
        data['ok'] = True
        data['device_id'] = self.id
        data['change_mode'] = topic[1]

        if topic[1] == 'state':
            if topic[2] == 'solid':
                self.state = topic[2]
                data['type'] = topic[2]
                payload = [int(num) for num in payload.split(',')]
                if topic[3] == 'single':
                    data['amount'] = topic[3]
                    data['sub_device_id'] = 'led' + topic[4]

                    sub_device = self.find('led' + topic[4])
                    if sub_device is None:
                        # Unknown LED: report the failure instead of
                        # crashing on `None.color`.
                        data['ok'] = False
                    else:
                        sub_device.color = payload
                elif topic[3] == 'all':
                    data['amount'] = topic[3]
                    for sub_device in self.sub_devices:
                        sub_device.color = payload
                data['color'] = tools.to_html_color(payload)
            elif topic[2] == 'rainbow':
                self.state = topic[2]
                data['type'] = topic[2]
                payload = payload.split(',')
                self.rainbow_params = [float(p) for p in payload[:3]]
                self.rainbow_params += [int(p) for p in payload[3:]]
                data['rainbow_params'] = self.rainbow_params
            elif topic[2] == 'america':
                self.state = topic[2]
                data['type'] = topic[2]
                payload = payload.split(',')
                self.america_params = [int(n) for n in payload]
                data['america_params'] = self.america_params
        elif topic[1] == 'animation':
            if topic[2] == 'mode':
                data['property_type'] = topic[2]
                if topic[3] in ('static', 'rotate_left', 'rotate_right'):
                    self.animation = topic[3]
                    data['type'] = topic[3]
                    # Only the rotating modes carry a rotate count.
                    if topic[3] != 'static':
                        self.animation_rotate_count = int(payload)
                        data['rotate_count'] = payload
            elif topic[2] == 'delay':
                self.animation_delay = int(payload)
                data['property_type'] = topic[2]
                data['delay'] = payload
        elif topic[1] == 'strip':
            data['amount'] = topic[2]
            if topic[2] == 'full':
                payload = json.loads(payload)
                data['colors'] = []
                # zip() stops at the shorter sequence, so a payload that
                # is longer than the strip cannot index past the end.
                for sub_device, color in zip(self.sub_devices, payload):
                    sub_device.color = color
                    data['colors'].append(tools.to_html_color(color))

        # `network` is the module-level Network this device lives in.
        save_network(network)

        res = {'event': 'neopixel', 'data': data}
        await app.send_json_all(res)
|
|
|
|
|
|
class NeoPixel(SubDevice):
    """
    One individually addressable pixel of a LightStrip.
    """
    def __init__(self):
        self.id: str = ""
        self.type: str = "NeoPixel"
        # Kept as a list rather than a tuple because JSON has no tuples.
        self.color: list = [0] * 3
|
|
|
|
|
|
class LixieClock(Device):
    """
    A clock assembled from Lixie displays.
    """
    def __init__(self):
        super().__init__()
        self.type = "LixieClock"
        self.color = [0] * 3
        self.display_mode = 'time'
        self.time_offset = -5
        self.display_number = 0

    async def mqtt_callback(self, app, msg):
        """
        Applies an MQTT message to the clock model, saves the network and
        broadcasts the change through `app.send_json_all`.

        The second element of the '/'-separated topic picks the action:
        'time', 'number' (int payload), 'color' (payload "r,g,b") or
        'time_offset' (int payload). Any other value changes nothing but
        is still saved and broadcast.
        """
        topic = msg.topic.split('/')
        payload = msg.payload.decode('utf8')

        data = {'ok': True, 'device_id': self.id}
        command = topic[1]

        if command == 'time':
            data['change_mode'] = 'time'
            self.display_mode = 'time'
        elif command == 'number':
            number = int(payload)
            data['change_mode'] = 'number'
            data['display_number'] = number
            self.display_mode = 'number'
            self.display_number = number
        elif command == 'color':
            rgb = [int(part) for part in payload.split(',')]
            data['change_mode'] = 'color'
            data['color'] = tools.to_html_color(rgb)
            self.color = rgb
        elif command == 'time_offset':
            offset = int(payload)
            data['change_mode'] = 'time_offset'
            data['time_offset'] = offset
            self.time_offset = offset

        # `network` is the module-level Network this device lives in.
        save_network(network)

        await app.send_json_all({'event': 'lixie_clock', 'data': data})
|
|
|
|
|
|
class LixieDisplay(SubDevice):
    """
    One Lixie display, used as a sub-device.
    """
    def __init__(self):
        self.id: str = ""
        self.type: str = "LixieDisplay"
|
|
|
|
|
|
class DeviceEncoder(json.JSONEncoder):
    """
    JSON encoder that serializes otherwise unsupported objects as a dict
    of their instance attributes.
    """
    def default(self, o):
        """
        Returns the attribute dictionary of `o`; the encoder only calls
        this for objects it cannot serialize natively.
        """
        attributes = vars(o)
        return attributes
|
|
|
|
|
|
def init_network(filepath="devices.json"):
    """
    Initializes the network of IOT devices.

    Reads the JSON list stored at `filepath` and builds one device object
    per entry, choosing the class from the entry's 'type'. Returns the
    populated Network.

    Raises ValueError when an entry has an unknown 'type'.
    """
    device_classes = {
        'RelayDevice': RelayDevice,
        'LightStrip': LightStrip,
        'LixieClock': LixieClock,
    }

    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(filepath, 'r', encoding='utf-8') as file:
        data = json.load(file)

    network = Network()
    for device_dict in data:
        typ = device_dict.get('type')
        # Guard the lookup so a non-string (possibly unhashable) type
        # still ends in the ValueError below.
        device_class = device_classes.get(typ) if isinstance(typ, str) else None
        if device_class is None:
            raise ValueError(f"Unknown device type {typ}")
        network.append(device_class().from_dict(device_dict))
    return network
|
|
|
|
|
|
def save_network(network, filepath="devices.json"):
    """
    Dumps the network to file.

    The network is serialized to tab-indented JSON with DeviceEncoder and
    written to `filepath`, replacing its previous content.
    """
    # Serialize before opening the file: opening with 'w' truncates it,
    # so a serialization error must not be able to leave it empty.
    serialized = json.dumps(network, cls=DeviceEncoder, indent='\t')
    with open(filepath, 'w', encoding='utf-8') as file:
        file.write(serialized)
|
|
|
|
|
|
# Module-level network, loaded from the default "devices.json" as soon as
# this module is imported. The LightStrip and LixieClock MQTT callbacks
# pass this global to save_network() after every change.
network = init_network()
|