218 lines
5.2 KiB
Python
218 lines
5.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Websocket events."""
|
|
import re
|
|
import types
|
|
|
|
import tools
|
|
import models
|
|
from models import network
|
|
|
|
async def toggle_outlet(request, ws, data):
    """Toggle one outlet of a RelayDevice and announce the new state.

    Reads ``device_id`` and ``sub_device_id`` from ``data``. When the device
    lookup fails, or ``device.toggle`` returns ``None``, an error payload
    (``ok`` False plus an ``error`` message) is sent to ``ws`` only.
    Otherwise the network is saved and a ``toggle_outlet`` event carrying
    the new state is sent through ``request.app.send_json_all``.
    """
    reply = {
        'device_id': data.get('device_id'),
        'sub_device_id': data.get('sub_device_id'),
    }

    async def fail(message):
        # Error replies go to the requesting socket only.
        reply['ok'] = False
        reply['error'] = message
        await ws.send_json(reply)

    target = network.find(reply['device_id'])
    if not target:
        await fail("device_id not found")
        return

    # ``toggle`` signals an unknown sub-device by returning None.
    new_state = target.toggle(reply['sub_device_id'])
    if new_state is None:
        await fail("sub_device_id not found")
        return

    models.save_network(network)

    reply['ok'] = True
    reply['state'] = new_state
    await request.app.send_json_all({'event': 'toggle_outlet', 'data': reply})
|
|
|
|
|
|
async def edit_field(request, ws, data):
    """Set a single attribute on a device or on one of its sub-devices.

    Reads ``device_id``, ``sub_device_id`` (optional), ``field`` and
    ``value`` from ``data``. The attribute is written on the sub-device when
    ``sub_device_id`` is truthy, otherwise on the device itself.

    An error payload (``ok`` False plus ``error``) is sent to ``ws`` only
    when the device is unknown, the device is locked, the sub-device is
    unknown, or ``field`` is not an editable attribute of the target. On
    success the network is saved and an ``edit_field`` event is sent through
    ``request.app.send_json_all``.

    ``field`` comes from the client, so it is only accepted when it is a
    string naming an existing, public, non-callable attribute. This avoids
    the ``TypeError`` that ``hasattr`` raises for a missing/non-string name
    and prevents a client from overwriting methods or private attributes.
    """
    device_id = data.get('device_id')
    sub_device_id = data.get('sub_device_id')
    field = data.get('field')
    value = data.get('value')

    # Response payload; ``sub_device_id`` is only echoed when it was given.
    data = {'device_id': device_id}
    if sub_device_id:
        data['sub_device_id'] = sub_device_id
    data['field'] = field
    data['value'] = value

    async def reject(message):
        # Error replies go to the requesting socket only.
        data['ok'] = False
        data['error'] = message
        await ws.send_json(data)

    device = network.find(device_id)
    if not device:
        await reject("device_id not found")
        return

    if device.locked:
        await reject("device is locked for editing")
        return

    if sub_device_id:
        target = device.find(sub_device_id)
        if not target:
            await reject("sub_device_id not found")
            return
        missing_field_error = "sub_device field not found"
    else:
        target = device
        missing_field_error = "device field not found"

    # Short-circuit order matters: the isinstance check must run before
    # startswith/hasattr, which would raise on a non-string field.
    editable = (
        isinstance(field, str)
        and not field.startswith('_')
        and hasattr(target, field)
        and not callable(getattr(target, field))
    )
    if not editable:
        await reject(missing_field_error)
        return

    setattr(target, field, value)
    models.save_network(network)

    data['ok'] = True
    res = {'event': 'edit_field', 'data': data}
    await request.app.send_json_all(res)
|
|
|
|
|
|
async def new_device(request, ws, data):
    """
    Add a new device to the network.

    Reads ``device_type`` from ``data``. Only ``'RelayDevice'`` is handled;
    any other value gets an "unknown device type" error payload sent to
    ``ws``. A new RelayDevice is created with two RelayOutlet sub-devices
    (``OUT0`` on gpio ``'0'`` and ``OUT2`` on gpio ``'2'``).

    The new id is ``device_type`` followed by a zero-padded number one
    higher than the largest numeric suffix among existing devices of the
    same type (``01`` when there is none). On success the network is saved
    and a ``new_device`` event containing ``device_id`` is sent through
    ``request.app.send_json_all``.
    """
    device_type = data.get('device_type')

    data = {'device_type': device_type}

    if device_type != 'RelayDevice':
        data['ok'] = False
        data['error'] = "unknown device type"
        await ws.send_json(data)
        return

    device = models.RelayDevice()
    device.sub_devices.append(models.RelayOutlet())
    device.sub_devices.append(models.RelayOutlet())
    device.sub_devices[0].id = 'OUT0'
    device.sub_devices[0].gpio = '0'
    device.sub_devices[1].id = 'OUT2'
    device.sub_devices[1].gpio = '2'
    device.update()

    # Use the highest numeric suffix rather than the lexicographically last
    # id: string order breaks past 99 ('...100' sorts before '...99') and an
    # id without trailing digits must be skipped instead of fed to int().
    highest = 0
    for dev in network:
        if dev.type != device_type:
            continue
        suffix = re.search(r'(\d+)$', str(dev.id))
        if suffix:
            highest = max(highest, int(suffix.group(1)))
    device.id = device_type + str(highest + 1).zfill(2)

    network.append(device)
    models.save_network(network)

    data['ok'] = True
    data['device_id'] = device.id
    res = {'event': 'new_device', 'data': data}
    await request.app.send_json_all(res)
|
|
|
|
|
|
async def lock_device(request, ws, data):
    """Set or clear the ``locked`` flag of a device.

    Reads ``device_id`` and ``locked`` (coerced with ``bool``) from
    ``data``. An unknown device yields an error payload sent to ``ws``
    only; otherwise the flag is stored, the network is saved and a
    ``lock_device`` event is sent through ``request.app.send_json_all``.
    """
    reply = {
        'device_id': data.get('device_id'),
        'locked': bool(data.get('locked')),
    }

    target = network.find(reply['device_id'])
    if target:
        target.locked = reply['locked']
        models.save_network(network)

        reply['ok'] = True
        await request.app.send_json_all({'event': 'lock_device', 'data': reply})
        return

    # Lookup failed: report back to the requesting socket only.
    reply['ok'] = False
    reply['error'] = "device_id not found"
    await ws.send_json(reply)
|
|
|
|
|
|
async def delete_device(request, ws, data):
    """Remove a device from the network.

    Reads ``device_id`` from ``data``. An unknown device yields an error
    payload sent to ``ws`` only; otherwise the device is removed, the
    network is saved and a ``delete_device`` event is sent through
    ``request.app.send_json_all``.
    """
    reply = {'device_id': data.get('device_id')}

    doomed = network.find(reply['device_id'])
    if not doomed:
        reply.update(ok=False, error="device_id not found")
        await ws.send_json(reply)
        return

    network.remove(doomed)
    models.save_network(network)

    reply['ok'] = True
    await request.app.send_json_all({'event': 'delete_device', 'data': reply})
|
|
|
|
|
|
async def neopixel(request, ws, data):
    """Publish a NeoPixel strip change over MQTT.

    Reads ``device_id``, ``change_mode``, ``type`` and ``color`` from
    ``data``; ``amount`` is also read when ``type`` is ``'solid'``, and
    ``sub_device_id`` when ``amount`` is ``'single'``. The MQTT topic is
    ``device.mqtt_root`` followed by those values joined with ``/`` (the
    ``sub_device_id`` with ``'led'`` stripped). The payload is the result of
    ``tools.from_html_color(color)`` rendered as text with its first and
    last characters and all spaces removed.

    Error replies reuse the incoming ``data`` dict (adding ``ok`` False and
    ``error``) and are sent to ``ws`` only. That happens when the device is
    unknown or when a value needed for the topic is missing or not a
    string; previously the latter raised inside ``'/'.join`` / ``.replace``.
    Nothing is sent on success here; the websocket response is handled
    under ``LightStrip.mqtt_callback``.
    """
    device_id = data.get('device_id')

    device = network.find(device_id)
    if not device:
        data['ok'] = False
        data['error'] = "device_id not found"
        await ws.send_json(data)
        return

    # Names of the request values that make up the topic, in topic order.
    topic_fields = ['change_mode', 'type']
    if data.get('type') == 'solid':
        topic_fields.append('amount')
        if data.get('amount') == 'single':
            topic_fields.append('sub_device_id')

    # Every topic segment must be a string, otherwise join/replace would
    # raise; report the first offending field instead.
    for name in topic_fields:
        if not isinstance(data.get(name), str):
            data['ok'] = False
            data['error'] = name + " missing or invalid"
            await ws.send_json(data)
            return

    # TODO: form validation
    # NOTE(review): ``color`` is still passed through unchecked; what
    # ``tools.from_html_color`` accepts is not visible here.

    mqtt_msg = [device.mqtt_root]
    for name in topic_fields:
        segment = data[name]
        if name == 'sub_device_id':
            segment = segment.replace('led', '')
        mqtt_msg.append(segment)
    mqtt_msg = '/'.join(mqtt_msg)

    payload = tools.from_html_color(data.get('color'))
    payload = str(payload)[1:-1].replace(' ', '')
    request.app['mqtt'].publish(mqtt_msg, payload)
    # websocket response is handled under LightStrip.mqtt_callback
|
|
|
|
|
|
# Registry of event handlers, keyed by function name. Only public functions
# defined in this module are registered, so functions imported by name from
# other modules and underscore-prefixed helpers are left out. Names are
# visited in sorted order, and the comprehension keeps its loop variables
# out of the module namespace.
events = {
    handler.__name__: handler
    for name, handler in sorted(globals().items())
    if isinstance(handler, types.FunctionType)
    and handler.__module__ == __name__
    and not name.startswith('_')
}
|