Juice/models.py

352 lines
8.9 KiB
Python
Raw Permalink Normal View History

2019-07-02 08:56:17 -04:00
#!/usr/bin/env python3
"""
Data models for different device types managed by the Juice IOT hub.
"""
import re
import json
2020-05-15 19:06:31 -04:00
import requests
2019-07-02 08:56:17 -04:00
2019-11-18 13:05:37 -05:00
import tools
2019-11-15 10:35:56 -05:00
class Network(list):
"""
Represents a network of IOT devices.
"""
def find(self, device_id):
"""
Searches through the network and returns the matching device,
or None.
"""
for device in self:
if device.id == device_id:
break
else:
return None
return device
class Device:
"""
Represents a generic device on the Network.
"""
def __init__(self):
self.id = ""
self.type = ""
self.description = ""
self.location = ""
self.ip_address = ""
2019-11-18 13:05:37 -05:00
self.mqtt_root = ""
self.locked = False
self.sub_devices = []
2019-11-15 10:35:56 -05:00
def find(self, sub_device_id):
"""
Searches through the devices sub_devices and returns the matching
sub_device, or None.
"""
for sub_device in self.sub_devices:
if sub_device.id == sub_device_id:
break
else:
return None
return sub_device
def from_dict(self, data):
"""
Initializes self with data from dict.
"""
for key, value in data.items():
if key == 'sub_devices':
continue
else:
setattr(self, key, value)
for sub_device_dict in data.get('sub_devices'):
if sub_device_dict.get('type') == 'RelayOutlet':
sub_device = RelayOutlet().from_dict(sub_device_dict)
self.sub_devices.append(sub_device)
elif sub_device_dict.get('type') == 'NeoPixel':
sub_device = NeoPixel().from_dict(sub_device_dict)
self.sub_devices.append(sub_device)
2019-11-25 08:39:42 -05:00
elif sub_device_dict.get('type') == 'LixieDisplay':
sub_device = LixieDisplay().from_dict(sub_device_dict)
self.sub_devices.append(sub_device)
else:
typ = sub_device_dict.get('type')
raise ValueError(f"Unknown sub_device type {typ}")
return self
2019-11-18 13:05:37 -05:00
async def mqtt_callback(self, msg):
"""
Called when the MQTT client receives a message directed to this
device. Should be overridden by the device implementation.
"""
pass
class SubDevice:
"""
Represents a generic subdevice in a Device.
"""
def from_dict(self, data):
"""
Initializes self with data from dict.
"""
for key, value in data.items():
setattr(self, key, value)
return self
2019-11-15 10:35:56 -05:00
class RelayDevice(Device):
2019-07-02 08:56:17 -04:00
"""
Represents a relay receptacle device controlled by an ESP-01. The
ESP-01 acts a simple switch to turn each receptable on or off via
a relay. Each device only has two outputs, `OUT0` and `OUT2`.
"""
def __init__(self):
super().__init__()
2019-07-02 08:56:17 -04:00
self.type = "RelayDevice"
def update(self):
"""
Queries the physical device and updates the internal model as
appropriate.
"""
if not self.ip_address:
return
for sub_dev in self.sub_devices:
res = requests.get(self.ip_address)
gpio0 = re.search(r"GPIO0: (\bLow|\bHigh)", res.text).groups()[0]
sub_dev.state = gpio0 == 'High'
2019-11-15 10:35:56 -05:00
def toggle(self, sub_device_id):
2019-07-02 08:56:17 -04:00
"""
Toggles the state of a sub device.
"""
2019-11-15 10:35:56 -05:00
sub_device = self.find(sub_device_id)
if not sub_device:
2019-07-02 08:56:17 -04:00
return None
2019-11-15 10:35:56 -05:00
params = {sub_device.gpio: 'toggle'}
2019-07-02 08:56:17 -04:00
res = requests.get('http://' + self.ip_address + '/gpio',params=params)
res.raise_for_status()
state = re.search(
2019-11-15 10:35:56 -05:00
rf"GPIO{sub_device.gpio}: (\bLow|\bHigh)",
2019-07-02 08:56:17 -04:00
res.text
).groups()[0] == 'High'
2019-11-15 10:35:56 -05:00
sub_device.state = state
2019-07-02 08:56:17 -04:00
2019-11-15 10:35:56 -05:00
return state
2019-07-02 08:56:17 -04:00
class RelayOutlet(SubDevice):
2019-07-02 08:56:17 -04:00
"""
Represents a single outlet on a RelayDevice.
"""
def __init__(self):
self.id = ""
self.type = "RelayOutlet"
self.description = ""
self.gpio = ""
self.state = False
2019-11-15 10:35:56 -05:00
class LightStrip(Device):
2019-07-02 13:09:31 -04:00
"""
Represents an RGB LED light strip controller.
"""
def __init__(self):
super().__init__()
2019-07-02 13:09:31 -04:00
self.type = "LightStrip"
2019-11-20 11:00:37 -05:00
self.state = "solid"
2020-12-11 14:03:44 -05:00
self.animation = "static"
self.rainbow_params = [0.3, 0.3, 0.3, 0, 2, 4, 127, 128]
self.america_params = [8, 128]
self.animation_rotate_count = 1
self.animation_delay = 200
2019-07-02 13:09:31 -04:00
2020-05-15 19:06:31 -04:00
def mqtt_callback(self, app, msg):
2019-11-18 13:05:37 -05:00
topic = msg.topic.split('/')
2019-11-20 18:37:48 -05:00
payload = msg.payload.decode('utf8')
2019-11-18 13:05:37 -05:00
data = {}
data['ok'] = True
data['device_id'] = self.id
data['change_mode'] = topic[1]
2019-11-20 12:59:42 -05:00
if topic[1] == 'state':
if topic[2] == 'solid':
2019-11-22 07:36:10 -05:00
self.state = topic[2]
2019-11-21 20:39:47 -05:00
data['type'] = topic[2]
2019-11-20 18:37:48 -05:00
payload = payload.split(',')
payload = [int(num) for num in payload]
2019-11-20 12:59:42 -05:00
if topic[3] == 'single':
2019-11-21 20:39:47 -05:00
data['amount'] = topic[3]
2019-11-20 12:59:42 -05:00
data['sub_device_id'] = 'led' + topic[4]
sub_device = self.find('led' + topic[4])
sub_device.color = payload
elif topic[3] == 'all':
2019-11-21 20:39:47 -05:00
data['amount'] = topic[3]
2019-11-20 12:59:42 -05:00
for sub_device in self.sub_devices:
sub_device.color = payload
data['color'] = tools.to_html_color(payload)
2019-11-20 18:37:48 -05:00
elif topic[2] == 'rainbow':
2019-11-22 07:36:10 -05:00
self.state = topic[2]
2019-11-21 20:39:47 -05:00
data['type'] = topic[2]
2019-11-20 18:37:48 -05:00
payload = payload.split(',')
self.rainbow_params = [float(p) for p in payload[:3]]
self.rainbow_params += [int(p) for p in payload[3:]]
2019-11-21 08:43:02 -05:00
data['rainbow_params'] = self.rainbow_params
elif topic[2] == 'america':
2019-11-22 07:36:10 -05:00
self.state = topic[2]
2019-11-21 20:39:47 -05:00
data['type'] = topic[2]
2019-11-21 08:43:02 -05:00
payload = payload.split(',')
self.america_params = [int(n) for n in payload]
data['america_params'] = self.america_params
2019-11-21 20:39:47 -05:00
elif topic[1] == 'animation':
if topic[2] == 'mode':
data['property_type'] = topic[2]
if topic[3] == 'static':
self.animation = topic[3]
data['type'] = topic[3]
2019-11-22 07:36:10 -05:00
elif topic[3] == 'rotate_left':
2019-11-21 20:39:47 -05:00
self.animation = topic[3]
self.animation_rotate_count = int(payload)
2019-11-22 07:36:10 -05:00
data['type'] = topic[3]
data['rotate_count'] = payload
2019-11-21 20:39:47 -05:00
elif topic[3] == 'rotate_right':
data['type'] = topic[3]
data['rotate_count'] = payload
self.animation = topic[3]
self.animation_rotate_count = int(payload)
elif topic[2] == 'delay':
2019-11-22 07:36:10 -05:00
self.animation_delay = int(payload)
2019-11-21 20:39:47 -05:00
data['property_type'] = topic[2]
data['delay'] = payload
2019-11-21 08:21:02 -05:00
elif topic[1] == 'strip':
data['amount'] = topic[2]
if topic[2] == 'full':
payload = json.loads(payload)
data['colors'] = []
for i, color in enumerate(payload):
self.sub_devices[i].color = color
data['colors'].append(tools.to_html_color(color))
2019-11-20 18:37:48 -05:00
save_network(network)
2019-11-20 12:59:42 -05:00
2019-11-18 13:05:37 -05:00
res = {'event': 'neopixel', 'data': data}
2020-05-15 19:06:31 -04:00
app.loop.create_task(app.send_json_all(res))
2019-11-18 13:05:37 -05:00
class NeoPixel(SubDevice):
"""
Represents a single NeoPixel in a LightStrip.
"""
def __init__(self):
self.id = ""
self.type = "NeoPixel"
self.color = [0,0,0] # JSON doesn't have tuples
2019-07-02 13:09:31 -04:00
2019-11-25 08:39:42 -05:00
class LixieClock(Device):
"""
Represents a clock made of Lixie displays.
"""
def __init__(self):
super().__init__()
self.type = "LixieClock"
self.color = [0,0,0]
self.display_mode = 'time'
self.time_offset = -5
self.display_number = 0
2020-05-15 19:06:31 -04:00
def mqtt_callback(self, app, msg):
2019-11-25 08:39:42 -05:00
topic = msg.topic.split('/')
payload = msg.payload.decode('utf8')
data = {}
data['ok'] = True
data['device_id'] = self.id
if topic[1] == 'time':
data['change_mode'] = 'time'
self.display_mode = 'time'
elif topic[1] == 'number':
payload = int(payload)
data['change_mode'] = 'number'
data['display_number'] = payload
self.display_mode = 'number'
self.display_number = payload
elif topic[1] == 'color':
payload = payload.split(',')
payload = [int(num) for num in payload]
data['change_mode'] = 'color'
data['color'] = tools.to_html_color(payload)
self.color = payload
2020-03-18 13:07:07 -04:00
elif topic[1] == 'time_offset':
data['change_mode'] = 'time_offset'
data['time_offset'] = int(payload)
self.time_offset = int(payload)
2020-05-15 19:06:31 -04:00
elif topic[1] == 'helo':
app['mqtt'].publish(topic[0] + '/' + self.display_mode, '')
if self.display_mode == 'number':
app['mqtt'].publish(topic[0] + '/number', self.display_number)
app['mqtt'].publish(topic[0] + '/time_offset', self.time_offset)
color = ','.join([str(n) for n in self.color])
app['mqtt'].publish(topic[0] + '/color', color)
2019-11-25 08:39:42 -05:00
save_network(network)
res = {'event': 'lixie_clock', 'data': data}
2020-05-15 19:06:31 -04:00
app.loop.create_task(app.send_json_all(res))
2019-11-25 08:39:42 -05:00
class LixieDisplay(SubDevice):
"""
Represents a single Lixie display.
"""
def __init__(self):
self.id = ""
self.type = "LixieDisplay"
2019-07-02 08:56:17 -04:00
class DeviceEncoder(json.JSONEncoder):
"""
A custom json encoder for dumping devices to file.
"""
def default(self, o):
return vars(o)
def init_network(filepath="devices.json"):
"""
Initializes the network of IOT devices.
"""
with open(filepath, 'r') as file:
data = file.read()
data = json.loads(data)
2019-11-15 10:35:56 -05:00
network = Network()
2019-07-02 08:56:17 -04:00
for device_dict in data:
if device_dict.get('type') == 'RelayDevice':
device = RelayDevice().from_dict(device_dict)
network.append(device)
elif device_dict.get('type') == 'LightStrip':
2019-07-02 13:09:31 -04:00
device = LightStrip().from_dict(device_dict)
network.append(device)
2019-11-25 08:39:42 -05:00
elif device_dict.get('type') == 'LixieClock':
device = LixieClock().from_dict(device_dict)
network.append(device)
2019-07-02 08:56:17 -04:00
else:
raise ValueError(f"Unknown device type {device_dict.get('type')}")
2019-07-02 08:56:17 -04:00
return network
def save_network(network, filepath="devices.json"):
"""
Dumps the network to file.
"""
with open(filepath, 'w') as file:
file.write(json.dumps(network, cls=DeviceEncoder, indent='\t'))
2019-11-15 10:35:56 -05:00
network = init_network()