#!/usr/bin/env python3 """ Data models for different device types managed by the Juice IOT hub. """ import re import json import tools import requests class Network(list): """ Represents a network of IOT devices. """ def find(self, device_id): """ Searches through the network and returns the matching device, or None. """ for device in self: if device.id == device_id: break else: return None return device class Device: """ Represents a generic device on the Network. """ def __init__(self): self.id = "" self.type = "" self.description = "" self.location = "" self.ip_address = "" self.mqtt_root = "" self.locked = False self.sub_devices = [] def find(self, sub_device_id): """ Searches through the devices sub_devices and returns the matching sub_device, or None. """ for sub_device in self.sub_devices: if sub_device.id == sub_device_id: break else: return None return sub_device def from_dict(self, data): """ Initializes self with data from dict. """ for key, value in data.items(): if key == 'sub_devices': continue else: setattr(self, key, value) for sub_device_dict in data.get('sub_devices'): if sub_device_dict.get('type') == 'RelayOutlet': sub_device = RelayOutlet().from_dict(sub_device_dict) self.sub_devices.append(sub_device) elif sub_device_dict.get('type') == 'NeoPixel': sub_device = NeoPixel().from_dict(sub_device_dict) self.sub_devices.append(sub_device) else: typ = sub_device_dict.get('type') raise ValueError(f"Unknown sub_device type {typ}") return self async def mqtt_callback(self, msg): """ Called when the MQTT client receives a message directed to this device. Should be overridden by the device implementation. """ pass class SubDevice: """ Represents a generic subdevice in a Device. """ def from_dict(self, data): """ Initializes self with data from dict. """ for key, value in data.items(): setattr(self, key, value) return self class RelayDevice(Device): """ Represents a relay receptacle device controlled by an ESP-01. The ESP-01 acts a simple switch to turn each receptable on or off via a relay. Each device only has two outputs, `OUT0` and `OUT2`. """ def __init__(self): super().__init__() self.type = "RelayDevice" def update(self): """ Queries the physical device and updates the internal model as appropriate. """ if not self.ip_address: return for sub_dev in self.sub_devices: res = requests.get(self.ip_address) gpio0 = re.search(r"GPIO0: (\bLow|\bHigh)", res.text).groups()[0] sub_dev.state = gpio0 == 'High' def toggle(self, sub_device_id): """ Toggles the state of a sub device. """ sub_device = self.find(sub_device_id) if not sub_device: return None params = {sub_device.gpio: 'toggle'} res = requests.get('http://' + self.ip_address + '/gpio',params=params) res.raise_for_status() state = re.search( rf"GPIO{sub_device.gpio}: (\bLow|\bHigh)", res.text ).groups()[0] == 'High' sub_device.state = state return state class RelayOutlet(SubDevice): """ Represents a single outlet on a RelayDevice. """ def __init__(self): self.id = "" self.type = "RelayOutlet" self.description = "" self.gpio = "" self.state = False class LightStrip(Device): """ Represents an RGB LED light strip controller. """ def __init__(self): super().__init__() self.type = "LightStrip" async def mqtt_callback(self, app, msg): topic = msg.topic.split('/') payload = msg.payload.decode('utf8').split(',') payload = [int(num) for num in payload] sub_device = self.find('led' + topic[4]) sub_device.color = payload save_network(network) data = {} data['ok'] = True data['device_id'] = self.id data['change_mode'] = topic[1] data['type'] = topic[2] if topic[2] == 'solid': data['amount'] = topic[3] if topic[3] == 'single': data['sub_device_id'] = 'led' + topic[4] data['color'] = tools.to_html_color(payload) res = {'event': 'neopixel', 'data': data} await app.send_json_all(res) class NeoPixel(SubDevice): """ Represents a single NeoPixel in a LightStrip. """ def __init__(self): self.id = "" self.type = "NeoPixel" self.color = [0,0,0] # JSON doesn't have tuples class DeviceEncoder(json.JSONEncoder): """ A custom json encoder for dumping devices to file. """ def default(self, o): return vars(o) def init_network(filepath="devices.json"): """ Initializes the network of IOT devices. """ with open(filepath, 'r') as file: data = file.read() data = json.loads(data) network = Network() for device_dict in data: if device_dict.get('type') == 'RelayDevice': device = RelayDevice().from_dict(device_dict) network.append(device) elif device_dict.get('type') == 'LightStrip': device = LightStrip().from_dict(device_dict) network.append(device) else: raise ValueError(f"Unknown device type {device_dict.get('type')}") return network def save_network(network, filepath="devices.json"): """ Dumps the network to file. """ with open(filepath, 'w') as file: file.write(json.dumps(network, cls=DeviceEncoder, indent='\t')) network = init_network()