Juice/events.py

292 lines
7.6 KiB
Python

#!/usr/bin/env python3
"""Websocket events."""
import re
import types
import tools
import models
from models import network
async def toggle_outlet(request, ws, data):
    """Flip one outlet of a device and announce the resulting state.

    ``device_id`` and ``sub_device_id`` are read from ``data`` and echoed
    back in every reply.  When the device lookup fails, or when
    ``device.toggle`` returns ``None``, an ``ok: False`` reply carrying an
    ``error`` text is sent through ``ws``.  Otherwise the network is saved
    and a ``toggle_outlet`` event with the new ``state`` is sent through
    ``request.app.send_json_all``.
    """
    reply = {
        'device_id': data.get('device_id'),
        'sub_device_id': data.get('sub_device_id'),
    }

    target = network.find(reply['device_id'])
    if not target:
        reply.update(ok=False, error="device_id not found")
        await ws.send_json(reply)
        return

    # toggle() signals an unknown sub-device by returning None.
    new_state = target.toggle(reply['sub_device_id'])
    if new_state is None:
        reply.update(ok=False, error="sub_device_id not found")
        await ws.send_json(reply)
        return

    models.save_network(network)
    reply.update(ok=True, state=new_state)
    await request.app.send_json_all({'event': 'toggle_outlet', 'data': reply})
async def edit_field(request, ws, data):
    """Set one attribute of a device, or of one of its sub-devices.

    Request fields read from ``data``:
        device_id: id of the device to edit.
        sub_device_id: optional; when truthy the edit targets that
            sub-device instead of the device itself.
        field: name of the attribute to overwrite.
        value: new value for the attribute.

    Failures (unknown device, locked device, unknown sub-device, field not
    editable) are answered through ``ws`` with ``ok: False`` and an
    ``error`` text.  On success the network is saved and an ``edit_field``
    event is sent through ``request.app.send_json_all``.
    """
    device_id = data.get('device_id')
    sub_device_id = data.get('sub_device_id')
    field = data.get('field')
    value = data.get('value')

    reply = {'device_id': device_id}
    if sub_device_id:
        reply['sub_device_id'] = sub_device_id
    reply['field'] = field
    reply['value'] = value

    async def fail(message):
        # Error replies go to the requesting socket only.
        reply['ok'] = False
        reply['error'] = message
        await ws.send_json(reply)

    device = network.find(device_id)
    if not device:
        await fail("device_id not found")
        return

    if device.locked:
        await fail("device is locked for editing")
        return

    if sub_device_id:
        target = device.find(sub_device_id)
        if not target:
            await fail("sub_device_id not found")
            return
        missing_field_error = "sub_device field not found"
    else:
        target = device
        missing_field_error = "device field not found"

    # ``field`` comes straight from the client, so it must not be usable to
    # crash hasattr() with a non-string, to reach private/dunder attributes,
    # or to overwrite a method such as ``toggle`` or ``find`` with a value.
    editable = (
        isinstance(field, str)
        and not field.startswith('_')
        and hasattr(target, field)
        and not callable(getattr(target, field))
    )
    if not editable:
        await fail(missing_field_error)
        return

    setattr(target, field, value)
    models.save_network(network)
    reply['ok'] = True
    await request.app.send_json_all({'event': 'edit_field', 'data': reply})
async def new_device(request, ws, data):
    """Create a device of the requested type and add it to the network.

    Request fields read from ``data``:
        device_type: 'relay_device', 'light_strip' or 'lixie_clock'.
        device_id: optional requested id; '0' is appended for as long as
            ``network.find`` reports an existing device under that id.
        num_sub_devices: how many sub-devices to create.  Only read for
            'light_strip' and 'lixie_clock'; a relay device always gets
            its fixed pair of outlets.

    Failures are answered through ``ws`` with ``ok: False`` and an
    ``error`` text.  On success the network is saved and a ``new_device``
    event carrying ``device_id``, ``device_type`` and ``sub_device_ids`` is
    sent through ``request.app.send_json_all``.
    """
    device_type = data.get('device_type')

    # Normalise the id to a string so a null or numeric id from the client
    # cannot break the '0'-suffix collision loop below.
    device_id = data.get('device_id')
    device_id = '' if device_id is None else str(device_id)

    if device_type == 'relay_device':
        device = models.RelayDevice()
        # NOTE(review): outlets are created for 0 and 2 only, and the same
        # number is used as the gpio value - presumably the board's wired
        # pins; confirm this is not meant to be range(2).
        for n in [0, 2]:
            sub_device = models.RelayOutlet()
            sub_device.id = 'OUT' + str(n)
            sub_device.gpio = str(n)
            device.sub_devices.append(sub_device)
        device.update()
    elif device_type in ('light_strip', 'lixie_clock'):
        # The count is only validated for the types that actually use it.
        try:
            num_sub_devices = int(data.get('num_sub_devices'))
        except (TypeError, ValueError):
            # Reply with a copy so the caller's request dict is not mutated;
            # the reply still echoes the request fields as before.
            failure = dict(data)
            failure['ok'] = False
            failure['error'] = "num_sub_devices value error"
            await ws.send_json(failure)
            return
        if device_type == 'light_strip':
            device = models.LightStrip()
            for n in range(num_sub_devices):
                sub_device = models.NeoPixel()
                sub_device.id = 'LED' + str(n)
                device.sub_devices.append(sub_device)
        else:
            device = models.LixieClock()
            for n in range(num_sub_devices):
                sub_device = models.LixieDisplay()
                sub_device.id = 'LixieDisplay' + str(n)
                device.sub_devices.append(sub_device)
    else:
        await ws.send_json({'ok': False, 'error': "unknown device type"})
        return

    # Make the id unique within the network.
    while network.find(device_id):
        device_id += '0'
    device.id = device_id

    network.append(device)
    models.save_network(network)

    reply = {
        'ok': True,
        'device_id': device.id,
        'device_type': device_type,
        'sub_device_ids': [sub_dev.id for sub_dev in device.sub_devices],
    }
    await request.app.send_json_all({'event': 'new_device', 'data': reply})
async def lock_device(request, ws, data):
    """Set a device's ``locked`` flag, which gates editing of its fields.

    ``device_id`` selects the device and ``locked`` (coerced with
    ``bool``) is the new flag value; both are echoed in the reply.  An
    unknown device is answered through ``ws`` with ``ok: False``; otherwise
    the network is saved and a ``lock_device`` event is sent through
    ``request.app.send_json_all``.
    """
    reply = {
        'device_id': data.get('device_id'),
        'locked': bool(data.get('locked')),
    }

    device = network.find(reply['device_id'])
    if device:
        device.locked = reply['locked']
        models.save_network(network)
        reply['ok'] = True
        await request.app.send_json_all({'event': 'lock_device', 'data': reply})
    else:
        reply['ok'] = False
        reply['error'] = "device_id not found"
        await ws.send_json(reply)
async def delete_device(request, ws, data):
    """Remove the device named by ``device_id`` from the network.

    An unknown id is answered through ``ws`` with ``ok: False`` and an
    ``error`` text.  Otherwise the device is removed, the network is saved
    and a ``delete_device`` event is sent through
    ``request.app.send_json_all``.
    """
    wanted_id = data.get('device_id')
    reply = {'device_id': wanted_id}

    doomed = network.find(wanted_id)
    if not doomed:
        reply.update(ok=False, error="device_id not found")
        await ws.send_json(reply)
        return

    network.remove(doomed)
    models.save_network(network)

    reply['ok'] = True
    await request.app.send_json_all({'event': 'delete_device', 'data': reply})
async def neopixel(request, ws, data):
    """Translate a NeoPixel request into an MQTT publish.

    The topic is built by joining ``device.mqtt_root`` and the request's
    selectors with '/':

        change_mode == 'state':
            type 'solid'   -> <root>/state/solid/single/<index> or
                              <root>/state/solid/all, payload is the colour
                              converted by ``tools.from_html_color``.
            type 'rainbow' -> <root>/state/rainbow, payload is the
                              comma-joined ``rainbow_params``.
            type 'america' -> <root>/state/america, payload is the
                              comma-joined ``america_params``.
        change_mode == 'animation':
            property_type 'mode'  -> <root>/animation/mode/<type> where type
                                     is 'static' (empty payload),
                                     'rotate_left' or 'rotate_right'
                                     (payload ``rotate_count``).
            property_type 'delay' -> <root>/animation/delay, payload
                                     ``delay``.

    Any request that does not match one of these shapes is answered through
    ``ws`` with ``ok: False`` and an ``error`` text, and nothing is
    published.
    """
    device_id = data.get('device_id')
    device = network.find(device_id)
    if not device:
        data['ok'] = False
        data['error'] = "device_id not found"
        await ws.send_json(data)
        return

    mqtt_msg = [device.mqtt_root]
    payload = ''
    error = None

    change_mode = data.get('change_mode')
    if change_mode == 'state':
        mqtt_msg.append(change_mode)
        state_type = data.get('type')
        if state_type == 'solid':
            mqtt_msg.append(state_type)
            amount = data.get('amount')
            if amount == 'single':
                sub_device_id = data.get('sub_device_id')
                if isinstance(sub_device_id, str):
                    mqtt_msg.append(amount)
                    # NOTE(review): new_device generates ids with an
                    # upper-case 'LED' prefix, which this lower-case replace
                    # does not strip - confirm what the client sends here.
                    mqtt_msg.append(sub_device_id.replace('led', ''))
                else:
                    error = "sub_device_id not found"
            elif amount == 'all':
                mqtt_msg.append(amount)
            else:
                error = "invalid amount"
            if error is None:
                payload = tools.from_html_color(data.get('color'))
                # Drop the enclosing brackets and the spaces from the
                # converted colour's string form.
                payload = str(payload)[1:-1].replace(' ', '')
        elif state_type in ('rainbow', 'america'):
            mqtt_msg.append(state_type)
            params = data.get(state_type + '_params')
            if isinstance(params, (list, tuple, str)):
                # str() lets numeric parameters be joined as well.
                payload = ','.join(str(param) for param in params)
            else:
                error = "invalid " + state_type + "_params"
        else:
            error = "invalid state type"
    elif change_mode == 'animation':
        mqtt_msg.append(change_mode)
        property_type = data.get('property_type')
        if property_type == 'mode':
            mqtt_msg.append(property_type)
            animation_type = data.get('type')
            if animation_type == 'static':
                mqtt_msg.append(animation_type)
                payload = ''
            elif animation_type in ('rotate_left', 'rotate_right'):
                mqtt_msg.append(animation_type)
                payload = data.get('rotate_count')
            else:
                error = "invalid animation type"
        elif property_type == 'delay':
            mqtt_msg.append(property_type)
            payload = data.get('delay')
        else:
            error = "invalid property_type"
    else:
        error = "invalid change_mode"

    if error is not None:
        # Report the problem instead of publishing a half-built message.
        data['ok'] = False
        data['error'] = error
        await ws.send_json(data)
        return

    mqtt_msg = '/'.join(mqtt_msg)
    request.app['mqtt'].publish(mqtt_msg, payload)
    # websocket response is handled under LightStrip.mqtt_callback
async def lixie_clock(request, ws, data):
    """Translate a Lixie Clock request into an MQTT publish.

    ``change_mode`` picks the sub-topic appended to ``device.mqtt_root``
    and the payload source:

        'time'        -> empty payload
        'number'      -> ``display_number``
        'color'       -> ``color`` converted by ``tools.from_html_color``
        'time_offset' -> ``time_offset``

    Any other ``change_mode`` publishes an empty payload on the root topic
    itself.  An unknown ``device_id`` is answered through ``ws`` with
    ``ok: False`` and nothing is published.
    """
    device = network.find(data.get('device_id'))
    if not device:
        data.update(ok=False, error="device_id not found")
        await ws.send_json(data)
        return

    mode = data.get('change_mode')
    topic_parts = [device.mqtt_root]
    payload = ''

    if mode in ('time', 'number', 'color', 'time_offset'):
        topic_parts.append(mode)

    if mode == 'number':
        payload = data.get('display_number')
    elif mode == 'color':
        rgb = tools.from_html_color(data.get('color'))
        # Drop the enclosing brackets and the spaces from the converted
        # colour's string form.
        payload = str(rgb)[1:-1].replace(' ', '')
    elif mode == 'time_offset':
        payload = data.get('time_offset')

    topic = '/'.join(topic_parts)
    request.app['mqtt'].publish(topic, payload)
    # websocket response is handled under LixieClock.mqtt_callback
# Lookup table from event name to handler.  Only public functions defined in
# this module are registered, so imported helpers or private functions can
# never be invoked as events.  The snapshot taken by sorted() keeps the
# iteration independent of later changes to the module namespace, and the
# comprehension leaves no loop variable behind at module level.
events = {
    func.__name__: func
    for _, func in sorted(globals().items())
    if isinstance(func, types.FunctionType)
    and func.__module__ == __name__
    and not func.__name__.startswith('_')
}