#!/usr/bin/env python3 """Websocket events.""" import re import types import tools import models from models import network async def toggle_outlet(request, ws, data): """Toggles the state of a RelayDevice.""" device_id = data.get('device_id') sub_device_id = data.get('sub_device_id') data = {} data['device_id'] = device_id data['sub_device_id'] = sub_device_id device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return state = device.toggle(sub_device_id) if state is None: data['ok'] = False data['error'] = "sub_device_id not found" await ws.send_json(data) return models.save_network(network) data['ok'] = True data['state'] = state res = {'event': 'toggle_outlet', 'data': data} await request.app.send_json_all(res) async def edit_field(request, ws, data): """Edits the text of a particular field.""" device_id = data.get('device_id') sub_device_id = data.get('sub_device_id') field = data.get('field') value = data.get('value') data = {} data['device_id'] = device_id if sub_device_id: data['sub_device_id'] = sub_device_id data['field'] = field data['value'] = value device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return if device.locked: data['ok'] = False data['error'] = "device is locked for editing" await ws.send_json(data) return if sub_device_id: sub_device = device.find(sub_device_id) if not sub_device: data['ok'] = False data['error'] = "sub_device_id not found" await ws.send_json(data) return if hasattr(sub_device, field): setattr(sub_device, field, value) else: data['ok'] = False data['error'] = "sub_device field not found" await ws.send_json(data) return else: if hasattr(device, field): setattr(device, field, value) else: data['ok'] = False data['error'] = "device field not found" await ws.send_json(data) return models.save_network(network) data['ok'] = True res = {'event': 'edit_field', 'data': data} await request.app.send_json_all(res) async def new_device(request, ws, data): """ Allows adding a new device. Accepts device_type parameter, returns the device_id. """ device_id = data.get('device_id', '') device_type = data.get('device_type') num_sub_devices = data.get('num_sub_devices') try: num_sub_devices = int(num_sub_devices) except (TypeError, ValueError): data['ok'] = False data['error'] = "num_sub_devices value error" await ws.send_json(data) return data = {} if device_type == 'relay_device': device = models.RelayDevice() for n in [0,2]: sub_device = models.RelayOutlet() sub_device.id = 'OUT' + str(n) sub_device.gpio = str(n) device.sub_devices.append(sub_device) device.update() elif device_type == 'light_strip': device = models.LightStrip() for n in range(num_sub_devices): sub_device = models.NeoPixel() sub_device.id = 'LED' + str(n) device.sub_devices.append(sub_device) elif device_type == 'lixie_clock': device = models.LixieClock() for n in range(num_sub_devices): sub_device = models.LixieDisplay() sub_device.id = 'LixieDisplay' + str(n) device.sub_devices.append(sub_device) else: data['ok'] = False data['error'] = "unknown device type" await ws.send_json(data) return while network.find(device_id): device_id += '0' device.id = device_id network.append(device) models.save_network(network) data['ok'] = True data['device_id'] = device.id data['device_type'] = device_type data['sub_device_ids'] = [sub_dev.id for sub_dev in device.sub_devices] res = {'event': 'new_device', 'data': data} await request.app.send_json_all(res) async def lock_device(request, ws, data): """Locks or unlocks a device to prevent or allow editing it's fields.""" device_id = data.get('device_id') locked = bool(data.get('locked')) data = {} data['device_id'] = device_id data['locked'] = locked device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return device.locked = locked models.save_network(network) data['ok'] = True res = {'event': 'lock_device', 'data': data} await request.app.send_json_all(res) async def delete_device(request, ws, data): """Deletes a device.""" device_id = data.get('device_id') data = {} data['device_id'] = device_id device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return network.remove(device) models.save_network(network) data['ok'] = True res = {'event': 'delete_device', 'data': data} await request.app.send_json_all(res) async def neopixel(request, ws, data): """Changes the state of a NeoPixel strip.""" device_id = data.get('device_id') device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return mqtt_msg = [device.mqtt_root] if data.get('change_mode') == 'state': mqtt_msg.append(data.get('change_mode')) if data.get('type') == 'solid': mqtt_msg.append(data.get('type')) if data.get('amount') == 'single': mqtt_msg.append(data.get('amount')) mqtt_msg.append(data.get('sub_device_id').replace('led', '')) elif data.get('amount') == 'all': mqtt_msg.append(data.get('amount')) else: error = "invalid amount" payload = tools.from_html_color(data.get('color')) payload = str(payload)[1:-1].replace(' ', '') elif data.get('type') == 'rainbow': mqtt_msg.append(data.get('type')) payload = ','.join(data.get('rainbow_params')) elif data.get('type') == 'america': mqtt_msg.append(data.get('type')) payload = ','.join(data.get('america_params')) else: error = "invalid state type" elif data.get('change_mode') == 'animation': mqtt_msg.append(data.get('change_mode')) if data.get('property_type') == 'mode': mqtt_msg.append(data.get('property_type')) if data.get('type') == 'static': mqtt_msg.append(data.get('type')) payload = '' elif data.get('type') == 'rotate_left': mqtt_msg.append(data.get('type')) payload = data.get('rotate_count') elif data.get('type') == 'rotate_right': mqtt_msg.append(data.get('type')) payload = data.get('rotate_count') elif data.get('property_type') == 'delay': mqtt_msg.append(data.get('property_type')) payload = data.get('delay') mqtt_msg = '/'.join(mqtt_msg) request.app['mqtt'].publish(mqtt_msg, payload) # websocket response is handled under LightStrip.mqtt_callback async def lixie_clock(request, ws, data): """Changes the state of a Lixie Clock.""" device_id = data.get('device_id') device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return mqtt_msg = [device.mqtt_root] payload = '' if data.get('change_mode') == 'time': mqtt_msg.append('time') payload = '' elif data.get('change_mode') == 'number': mqtt_msg.append('number') payload = data.get('display_number') elif data.get('change_mode') == 'color': mqtt_msg.append('color') payload = tools.from_html_color(data.get('color')) payload = str(payload)[1:-1].replace(' ', '') elif data.get('change_mode') == 'time_offset': mqtt_msg.append('time_offset') payload = data.get('time_offset') mqtt_msg = '/'.join(mqtt_msg) request.app['mqtt'].publish(mqtt_msg, payload) # websocket response is handled under LixieClock.mqtt_callback events = {} for obj in dir(): if type(locals()[obj]) == types.FunctionType: events[locals()[obj].__name__] = locals()[obj]