#!/usr/bin/env python3
"""
Data models for different device types managed by the Juice IOT hub.
"""

import re
import json

import requests

import tools


class Network(list):
    """
    Represents a network of IOT devices.
    """
    def find(self, device_id):
        """
        Searches through the network and returns the first device whose
        `id` equals `device_id`, or None when there is no such device.
        """
        return next(
            (device for device in self if device.id == device_id),
            None,
        )


class Device:
    """
    Represents a generic device on the Network.
    """
    def __init__(self):
        self.id = ""
        self.type = ""
        self.description = ""
        self.location = ""
        self.ip_address = ""
        self.mqtt_root = ""
        self.locked = False
        self.sub_devices = []

    def find(self, sub_device_id):
        """
        Searches through the devices sub_devices and returns the first
        sub_device whose `id` equals `sub_device_id`, or None.
        """
        return next(
            (sub for sub in self.sub_devices if sub.id == sub_device_id),
            None,
        )

    def from_dict(self, data):
        """
        Initializes self with data from dict and returns self.

        Every key except 'sub_devices' is copied onto the instance as an
        attribute. Each entry of 'sub_devices' is turned into the
        sub-device class named by its 'type' and appended to
        `self.sub_devices`. A missing (or null) 'sub_devices' entry is
        treated as an empty list.

        Raises ValueError when a sub-device has an unknown 'type'.
        """
        for key, value in data.items():
            if key != 'sub_devices':
                setattr(self, key, value)

        # `or []` covers both an absent key and an explicit null.
        for sub_device_dict in data.get('sub_devices') or []:
            typ = sub_device_dict.get('type')
            # Only strings can name a type; anything else is "unknown".
            sub_device_cls = (
                _SUB_DEVICE_TYPES.get(typ) if isinstance(typ, str) else None
            )
            if sub_device_cls is None:
                raise ValueError(f"Unknown sub_device type {typ}")
            self.sub_devices.append(
                sub_device_cls().from_dict(sub_device_dict)
            )
        return self

    # NOTE(review): the overrides visible in this file (LightStrip,
    # LixieClock) are synchronous and take (self, app, msg); this base
    # signature differs. Confirm which form the MQTT dispatcher expects.
    async def mqtt_callback(self, msg):
        """
        Called when the MQTT client receives a message directed to this
        device. Should be overridden by the device implementation.
        """
        pass


class SubDevice:
    """
    Represents a generic subdevice in a Device.
    """
    def from_dict(self, data):
        """
        Initializes self with data from dict (every key becomes an
        attribute) and returns self.
        """
        for key, value in data.items():
            setattr(self, key, value)
        return self


class RelayDevice(Device):
    """
    Represents a relay receptacle device controlled by an ESP-01. The
    ESP-01 acts as a simple switch to turn each receptacle on or off via
    a relay. Each device only has two outputs, `OUT0` and `OUT2`.
    """
    def __init__(self):
        super().__init__()
        self.type = "RelayDevice"

    def _base_url(self):
        """
        Returns the device URL, adding 'http://' when `ip_address` does
        not already carry a scheme.
        """
        if '://' in self.ip_address:
            return self.ip_address
        return 'http://' + self.ip_address

    def update(self):
        """
        Queries the physical device and updates the internal model as
        appropriate.

        Does nothing when the device has no `ip_address` or no
        sub-devices. The device is queried once; each outlet's `state`
        is then read from the `GPIO<n>: Low|High` line matching its own
        `gpio`. Outlets whose line is absent from the response keep
        their previous state.

        Raises requests.HTTPError when the device answers with an error
        status.
        """
        if not self.ip_address or not self.sub_devices:
            return

        # One request serves every outlet; the response is the same.
        res = requests.get(self._base_url())
        res.raise_for_status()

        for sub_dev in self.sub_devices:
            match = re.search(
                rf"GPIO{sub_dev.gpio}: (\bLow|\bHigh)",
                res.text
            )
            if match is None:
                continue
            sub_dev.state = match.group(1) == 'High'

    def toggle(self, sub_device_id):
        """
        Toggles the state of a sub device.

        Returns the new state (True for 'High'), or None when no
        sub-device has the given id.

        Raises requests.HTTPError on an error status, and AttributeError
        when the response does not contain the expected GPIO line.
        """
        sub_device = self.find(sub_device_id)
        if not sub_device:
            return None

        params = {sub_device.gpio: 'toggle'}
        res = requests.get(self._base_url() + '/gpio', params=params)
        res.raise_for_status()

        state = re.search(
            rf"GPIO{sub_device.gpio}: (\bLow|\bHigh)",
            res.text
        ).groups()[0] == 'High'
        sub_device.state = state
        return state


class RelayOutlet(SubDevice):
    """
    Represents a single outlet on a RelayDevice.
    """
    def __init__(self):
        self.id = ""
        self.type = "RelayOutlet"
        self.description = ""
        self.gpio = ""
        self.state = False


class LightStrip(Device):
    """
    Represents an RGB LED light strip controller.
    """
    def __init__(self):
        super().__init__()
        self.type = "LightStrip"
        self.state = "solid"
        self.animation = "static"
        # mqtt_callback parses the first three values as floats and the
        # remaining ones as ints.
        self.rainbow_params = [0.3, 0.3, 0.3, 0, 2, 4, 127, 128]
        self.america_params = [8, 128]
        self.animation_rotate_count = 1
        self.animation_delay = 200

    def mqtt_callback(self, app, msg):
        """
        Applies an MQTT message to the strip model, saves the network
        and broadcasts a 'neopixel' event describing the change.

        The topic is split on '/'; element 1 selects the kind of change
        ('state', 'animation' or 'strip') and the following elements
        refine it. Payloads are comma separated numbers, except for
        'strip/full' which carries a JSON list of colors.
        """
        topic = msg.topic.split('/')
        payload = msg.payload.decode('utf8')
        data = {}
        data['ok'] = True
        data['device_id'] = self.id
        data['change_mode'] = topic[1]

        if topic[1] == 'state':
            if topic[2] == 'solid':
                self.state = topic[2]
                data['type'] = topic[2]
                payload = [int(num) for num in payload.split(',')]
                if topic[3] == 'single':
                    data['amount'] = topic[3]
                    data['sub_device_id'] = 'led' + topic[4]
                    sub_device = self.find('led' + topic[4])
                    sub_device.color = payload
                elif topic[3] == 'all':
                    data['amount'] = topic[3]
                    for sub_device in self.sub_devices:
                        sub_device.color = payload
                data['color'] = tools.to_html_color(payload)
            elif topic[2] == 'rainbow':
                self.state = topic[2]
                data['type'] = topic[2]
                payload = payload.split(',')
                self.rainbow_params = [float(p) for p in payload[:3]]
                self.rainbow_params += [int(p) for p in payload[3:]]
                data['rainbow_params'] = self.rainbow_params
            elif topic[2] == 'america':
                self.state = topic[2]
                data['type'] = topic[2]
                payload = payload.split(',')
                self.america_params = [int(n) for n in payload]
                data['america_params'] = self.america_params
        elif topic[1] == 'animation':
            if topic[2] == 'mode':
                data['property_type'] = topic[2]
                if topic[3] == 'static':
                    self.animation = topic[3]
                    data['type'] = topic[3]
                elif topic[3] in ('rotate_left', 'rotate_right'):
                    # Both directions carry the rotate count as payload.
                    rotate_count = int(payload)
                    self.animation = topic[3]
                    self.animation_rotate_count = rotate_count
                    data['type'] = topic[3]
                    data['rotate_count'] = payload
            elif topic[2] == 'delay':
                self.animation_delay = int(payload)
                data['property_type'] = topic[2]
                data['delay'] = payload
        elif topic[1] == 'strip':
            data['amount'] = topic[2]
            if topic[2] == 'full':
                payload = json.loads(payload)
                data['colors'] = []
                # zip stops at the shorter side, so colors beyond the
                # number of known LEDs are ignored.
                for sub_device, color in zip(self.sub_devices, payload):
                    sub_device.color = color
                    data['colors'].append(tools.to_html_color(color))

        save_network(network)
        res = {'event': 'neopixel', 'data': data}
        app.loop.create_task(app.send_json_all(res))


class NeoPixel(SubDevice):
    """
    Represents a single NeoPixel in a LightStrip.
    """
    def __init__(self):
        self.id = ""
        self.type = "NeoPixel"
        self.color = [0, 0, 0]  # JSON doesn't have tuples


class LixieClock(Device):
    """
    Represents a clock made of Lixie displays.
    """
    def __init__(self):
        super().__init__()
        self.type = "LixieClock"
        self.color = [0, 0, 0]
        self.display_mode = 'time'
        self.time_offset = -5
        self.display_number = 0

    def mqtt_callback(self, app, msg):
        """
        Applies an MQTT message to the clock model, saves the network
        and broadcasts a 'lixie_clock' event describing the change.

        The topic is split on '/'; element 1 selects the change ('time',
        'number', 'color', 'time_offset' or 'helo'). On 'helo' the
        current settings are published back under topic element 0.
        """
        topic = msg.topic.split('/')
        payload = msg.payload.decode('utf8')
        data = {}
        data['ok'] = True
        data['device_id'] = self.id

        if topic[1] == 'time':
            data['change_mode'] = 'time'
            self.display_mode = 'time'
        elif topic[1] == 'number':
            payload = int(payload)
            data['change_mode'] = 'number'
            data['display_number'] = payload
            self.display_mode = 'number'
            self.display_number = payload
        elif topic[1] == 'color':
            payload = [int(num) for num in payload.split(',')]
            data['change_mode'] = 'color'
            data['color'] = tools.to_html_color(payload)
            self.color = payload
        elif topic[1] == 'time_offset':
            offset = int(payload)
            data['change_mode'] = 'time_offset'
            data['time_offset'] = offset
            self.time_offset = offset
        elif topic[1] == 'helo':
            mqtt = app['mqtt']
            mqtt.publish(topic[0] + '/' + self.display_mode, '')
            if self.display_mode == 'number':
                mqtt.publish(topic[0] + '/number', self.display_number)
            mqtt.publish(topic[0] + '/time_offset', self.time_offset)
            color = ','.join([str(n) for n in self.color])
            mqtt.publish(topic[0] + '/color', color)

        save_network(network)
        res = {'event': 'lixie_clock', 'data': data}
        app.loop.create_task(app.send_json_all(res))


class LixieDisplay(SubDevice):
    """
    Represents a single Lixie display.
    """
    def __init__(self):
        self.id = ""
        self.type = "LixieDisplay"


class DeviceEncoder(json.JSONEncoder):
    """
    A custom json encoder for dumping devices to file.
    """
    def default(self, o):
        """
        Serializes an otherwise unsupported object as its attribute dict.
        """
        return vars(o)


# Maps the 'type' field of a serialized sub-device to its class.
_SUB_DEVICE_TYPES = {
    'RelayOutlet': RelayOutlet,
    'NeoPixel': NeoPixel,
    'LixieDisplay': LixieDisplay,
}

# Maps the 'type' field of a serialized device to its class.
_DEVICE_TYPES = {
    'RelayDevice': RelayDevice,
    'LightStrip': LightStrip,
    'LixieClock': LixieClock,
}


def init_network(filepath="devices.json"):
    """
    Initializes the network of IOT devices from the JSON file at
    `filepath` and returns it.

    Raises ValueError when a device has an unknown 'type'.
    """
    with open(filepath, 'r') as file:
        data = json.load(file)

    loaded = Network()
    for device_dict in data:
        typ = device_dict.get('type')
        # Only strings can name a type; anything else is "unknown".
        device_cls = _DEVICE_TYPES.get(typ) if isinstance(typ, str) else None
        if device_cls is None:
            raise ValueError(f"Unknown device type {device_dict.get('type')}")
        loaded.append(device_cls().from_dict(device_dict))
    return loaded


def save_network(network, filepath="devices.json"):
    """
    Dumps the network to file.
    """
    # Serialize before opening: opening with 'w' truncates the file, so a
    # serialization error must not be able to wipe the existing data.
    serialized = json.dumps(network, cls=DeviceEncoder, indent='\t')
    with open(filepath, 'w') as file:
        file.write(serialized)


network = init_network()