#!/usr/bin/env python3 """Websocket events.""" import re import types import tools import models from models import network async def toggle_outlet(request, ws, data): """Toggles the state of a RelayDevice.""" device_id = data.get('device_id') sub_device_id = data.get('sub_device_id') data = {} data['device_id'] = device_id data['sub_device_id'] = sub_device_id device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return state = device.toggle(sub_device_id) if state is None: data['ok'] = False data['error'] = "sub_device_id not found" await ws.send_json(data) return models.save_network(network) data['ok'] = True data['state'] = state res = {'event': 'toggle_outlet', 'data': data} await request.app.send_json_all(res) async def edit_field(request, ws, data): """Edits the text of a particular field.""" device_id = data.get('device_id') sub_device_id = data.get('sub_device_id') field = data.get('field') value = data.get('value') data = {} data['device_id'] = device_id if sub_device_id: data['sub_device_id'] = sub_device_id data['field'] = field data['value'] = value device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return if device.locked: data['ok'] = False data['error'] = "device is locked for editing" await ws.send_json(data) return if sub_device_id: sub_device = device.find(sub_device_id) if not sub_device: data['ok'] = False data['error'] = "sub_device_id not found" await ws.send_json(data) return if hasattr(sub_device, field): setattr(sub_device, field, value) else: data['ok'] = False data['error'] = "sub_device field not found" await ws.send_json(data) return else: if hasattr(device, field): setattr(device, field, value) else: data['ok'] = False data['error'] = "device field not found" await ws.send_json(data) return models.save_network(network) data['ok'] = True res = {'event': 'edit_field', 'data': data} await request.app.send_json_all(res) async def new_device(request, ws, data): """ Allows adding a new device. Accepts device_type parameter, returns the device_id. """ device_type = data.get('device_type') data = {} data['device_type'] = device_type if device_type == 'RelayDevice': device = models.RelayDevice() device.sub_devices.append(models.RelayOutlet()) device.sub_devices.append(models.RelayOutlet()) device.sub_devices[0].id = 'OUT0' device.sub_devices[0].gpio = '0' device.sub_devices[1].id = 'OUT2' device.sub_devices[1].gpio = '2' device.update() else: data['ok'] = False data['error'] = "unknown device type" await ws.send_json(data) return devices = [dev for dev in network if dev.type == device_type] devices.sort(key=lambda dev: dev.id) if not devices: device.id = device_type + '01' else: num = re.search(r'(\d*)$', devices[-1].id).groups() if not num: device.id = device_type + '01' else: num = str(int(num[0]) + 1).zfill(2) device.id = device_type + num network.append(device) models.save_network(network) data['ok'] = True data['device_id'] = device.id res = {'event': 'new_device', 'data': data} await request.app.send_json_all(res) async def lock_device(request, ws, data): """Locks or unlocks a device to prevent or allow editing it's fields.""" device_id = data.get('device_id') locked = bool(data.get('locked')) data = {} data['device_id'] = device_id data['locked'] = locked device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return device.locked = locked models.save_network(network) data['ok'] = True res = {'event': 'lock_device', 'data': data} await request.app.send_json_all(res) async def delete_device(request, ws, data): """Deletes a device.""" device_id = data.get('device_id') data = {} data['device_id'] = device_id device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return network.remove(device) models.save_network(network) data['ok'] = True res = {'event': 'delete_device', 'data': data} await request.app.send_json_all(res) async def neopixel(request, ws, data): """Changes the state of a NeoPixel strip.""" device_id = data.get('device_id') device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return mqtt_msg = [device.mqtt_root] if data.get('change_mode') == 'state': mqtt_msg.append(data.get('change_mode')) if data.get('type') == 'solid': mqtt_msg.append(data.get('type')) if data.get('amount') == 'single': mqtt_msg.append(data.get('amount')) mqtt_msg.append(data.get('sub_device_id').replace('led', '')) elif data.get('amount') == 'all': mqtt_msg.append(data.get('amount')) else: error = "invalid amount" payload = tools.from_html_color(data.get('color')) payload = str(payload)[1:-1].replace(' ', '') elif data.get('type') == 'rainbow': mqtt_msg.append(data.get('type')) payload = ','.join(data.get('rainbow_params')) elif data.get('type') == 'america': mqtt_msg.append(data.get('type')) payload = ','.join(data.get('america_params')) else: error = "invalid state type" elif data.get('change_mode') == 'animation': mqtt_msg.append(data.get('change_mode')) if data.get('property_type') == 'mode': mqtt_msg.append(data.get('property_type')) if data.get('type') == 'static': mqtt_msg.append(data.get('type')) payload = '' elif data.get('type') == 'rotate_left': mqtt_msg.append(data.get('type')) payload = data.get('rotate_count') elif data.get('type') == 'rotate_right': mqtt_msg.append(data.get('type')) payload = data.get('rotate_count') elif data.get('property_type') == 'delay': mqtt_msg.append(data.get('property_type')) payload = data.get('delay') mqtt_msg = '/'.join(mqtt_msg) request.app['mqtt'].publish(mqtt_msg, payload) # websocket response is handled under LightStrip.mqtt_callback async def lixie_clock(request, ws, data): """Changes the state of a Lixie Clock.""" device_id = data.get('device_id') device = network.find(device_id) if not device: data['ok'] = False data['error'] = "device_id not found" await ws.send_json(data) return mqtt_msg = [device.mqtt_root] payload = '' if data.get('change_mode') == 'time': mqtt_msg.append('time') payload = '' elif data.get('change_mode') == 'number': mqtt_msg.append('number') payload = data.get('display_number') elif data.get('change_mode') == 'color': mqtt_msg.append('color') payload = tools.from_html_color(data.get('color')) payload = str(payload)[1:-1].replace(' ', '') elif data.get('change_mode') == 'time_offset': mqtt_msg.append('time_offset') payload = data.get('time_offset') mqtt_msg = '/'.join(mqtt_msg) request.app['mqtt'].publish(mqtt_msg, payload) # websocket response is handled under LixieClock.mqtt_callback events = {} for obj in dir(): if type(locals()[obj]) == types.FunctionType: events[locals()[obj].__name__] = locals()[obj]