Juice/mqtt.py

39 lines
970 B
Python

#!/usr/bin/env python3
"""
Contains MQTT funtionality.
"""
import asyncio
import paho.mqtt.client as mqtt
from models import network
def on_connect(client, userdata, flags, rc):
"""Called when the client successfully connects to the broker."""
client.subscribe("juice/#")
for device in network:
if device.mqtt_root:
client.subscribe(device.mqtt_root + '/#')
def on_message(client, userdata, msg):
"""Called when the client receives a message."""
root = msg.topic.partition('/')[0]
devices = [device for device in network if root == device.mqtt_root]
for device in devices:
coro = device.mqtt_callback(client.app, msg)
future = asyncio.run_coroutine_threadsafe(coro, client.app.loop)
future.result()
def init_client(app):
"""Initializes the MQTT client."""
client = mqtt.Client()
client.app = app
client.on_connect = on_connect
client.on_message = on_message
client.connect_async("localhost", 1883, 60)
client.loop_start()
return client