Source code for app.mqtt.device_scheduler

import time
from random import randint
from copy import deepcopy
from threading import Thread, Condition, Lock
from xml.etree.ElementTree import TreeBuilder

from flask import Flask
from flask_mqtt import Mqtt

from app.models import Device, User
from app.models.db import db
from app.mqtt.mqtt_message import MqttMesssage, SetEnergyLevelSignal, ShutdownSignal, StartupSignal

# TODO remove after testing
# Number of real seconds that the schedule trackers sleep for each scheduled
# hour (hour deltas are multiplied by this value before time.sleep()).
# NOTE(review): 10 compresses every scheduled hour to ten seconds, presumably
# to speed up testing; a real hour would be 3600 - confirm before release.
SECONDS_PER_HOUR = 10


class ScheduleState:
    """Class for representing scheduler internal state

    Producers (handlers, other modules) append requests with the
    ``assign_*`` methods and then call :meth:`notify` to wake the scheduler
    loop that consumes :attr:`queue`.
    """

    # Request kinds stored as the first element of every queued tuple.
    PUBLISH = "publish"
    EXEC_SYNC = "execs"
    EXEC_BACKGROUND = "execb"

    def __init__(self):
        self.queue = []
        """FIFO queue that accumulates requests from every module of this app

        it is usually populated by ScheduleHandlers or, for example, anomaly detection routines"""
        self.info = {}
        """Dictionary to represent current state"""
        self.info_lock = Lock()
        self.wakeup = Condition()
        """Variable that, when notified, wakes up the scheduler"""

    def notify(self):
        """Notify wakeup condition"""
        # The context manager guarantees the condition's lock is released
        # even if notify() raises.
        with self.wakeup:
            self.wakeup.notify()

    @staticmethod
    def _handler_name(to_call):
        """Return the dispatcher key for *to_call* (a name or a callable)."""
        if isinstance(to_call, str):
            return to_call
        return to_call.__name__

    def assign_publish(self, channel, content):
        """Queue a publish request of *content* on *channel*.

        Text content (including ``str`` subclasses) is encoded to bytes;
        anything else is queued unchanged.
        """
        if isinstance(content, str):
            content = content.encode()
        self.queue.append((ScheduleState.PUBLISH, channel, content))

    def assign_exec_sync(self, to_call, kwargs):
        """Queue a handler to be run synchronously by the scheduler.

        *to_call* is either the handler's name or a callable whose
        ``__name__`` is used as the name; *kwargs* are its keyword arguments.
        """
        self.queue.append(
            (ScheduleState.EXEC_SYNC, self._handler_name(to_call), kwargs)
        )

    def assign_exec_background(self, to_call, kwargs):
        """Queue a handler to be run by the scheduler in a background thread.

        *to_call* is either the handler's name or a callable whose
        ``__name__`` is used as the name; *kwargs* are its keyword arguments.
        """
        self.queue.append(
            (ScheduleState.EXEC_BACKGROUND, self._handler_name(to_call), kwargs)
        )
class ScheduleHandlers:
    """Collection of all (default) schedule-related handlers

    Rules for implementing a handler:

    1. every handler must receive as first argument the current state
    2. for multithreading safety, lock before using info from (the) state object

    The handlers are plain functions (no ``self`` and no ``@staticmethod``):
    they are collected in :attr:`call` inside the class body and are always
    invoked through that dispatcher or through the class itself.
    """

    def _sorted_intervals(intervals):
        """Return *intervals* ordered numerically by (start hour, end hour)."""
        return sorted(intervals, key=lambda interval: (interval[0], interval[1]))

    def _first_interval(intervals, hour):
        """Return ``(index, hours_to_wait)`` for the next interval to start.

        *intervals* must be non-empty and sorted by start hour. When *hour*
        is past the last start, the wait runs through midnight to the first
        interval of the following day.
        """
        wait = 0
        if hour > intervals[-1][0]:
            wait = 24 - hour
            hour = 0
        index = 0
        while intervals[index][0] < hour:
            index += 1
        return index, wait + intervals[index][0] - hour

    def _run_intervals(current_state, channel, intervals, on_enter, on_exit):
        """Publish ``on_enter``/``on_exit`` payloads around every interval, forever.

        Returns immediately when *intervals* is empty (nothing to enforce).
        """
        if not intervals:
            return
        intervals = ScheduleHandlers._sorted_intervals(intervals)
        index, wait = ScheduleHandlers._first_interval(
            intervals, time.localtime().tm_hour
        )
        time.sleep(wait * SECONDS_PER_HOUR)
        while True:
            current = intervals[index]
            # Wake the scheduler right after queueing so the signal is sent
            # at the interval boundary, not one sleep later.
            current_state.assign_publish(channel, on_enter(current))
            current_state.notify()
            # Modulo 24 keeps the duration positive for intervals that cross
            # midnight (e.g. 22 -> 2).
            time.sleep(((current[1] - current[0]) % 24) * SECONDS_PER_HOUR)
            current_state.assign_publish(channel, on_exit(current))
            current_state.notify()
            index = (index + 1) % len(intervals)
            # Gap until the next interval starts, wrapping to the next day.
            time.sleep(((intervals[index][0] - current[1]) % 24) * SECONDS_PER_HOUR)

    def global_shutdown(current_state: "ScheduleState"):
        """Global shutdown broadcast"""
        with current_state.info_lock:
            global_channel = current_state.info["global_channel"]
        current_state.assign_publish(global_channel, ShutdownSignal().pack())
        current_state.notify()

    def global_startup(current_state: "ScheduleState"):
        """Global startup broadcast"""
        with current_state.info_lock:
            global_channel = current_state.info["global_channel"]
        current_state.assign_publish(global_channel, StartupSignal().pack())
        current_state.notify()

    def schedule_tracker(current_state: "ScheduleState", device_uuid):
        """Enforces schedule for a specific device

        Reads the device's ``channel`` and ``schedule`` (a list of
        ``[start_hour, end_hour, ...]`` entries) from the state, then loops
        forever publishing a startup signal at each interval start and a
        shutdown signal at each interval end. Meant to run in its own thread.
        """
        with current_state.info_lock:
            channel = current_state.info[device_uuid]["channel"]
            intervals = deepcopy(current_state.info[device_uuid]["schedule"])
        ScheduleHandlers._run_intervals(
            current_state,
            channel,
            intervals,
            lambda _interval: StartupSignal().pack(),
            lambda _interval: ShutdownSignal().pack(),
        )

    def power_schedule_tracker(current_state: "ScheduleState", device_uuid):
        """Enforces ACPI schedule for a specific device

        Reads the device's ``channel`` and ``power_schedule`` from the state,
        then loops forever publishing an energy-level signal built from
        element 3 of the entry at each interval start and from element 2 at
        each interval end. Meant to run in its own thread.
        """
        # NOTE(review): elements 2 and 3 are presumably the energy levels
        # outside / inside the interval - confirm against the settings schema.
        with current_state.info_lock:
            channel = current_state.info[device_uuid]["channel"]
            intervals = deepcopy(current_state.info[device_uuid]["power_schedule"])
        ScheduleHandlers._run_intervals(
            current_state,
            channel,
            intervals,
            lambda interval: SetEnergyLevelSignal(interval[3]).pack(),
            lambda interval: SetEnergyLevelSignal(interval[2]).pack(),
        )

    def alarm(current_state: "ScheduleState", seconds, repeats, device_uuid,
              condition="always_true", content_generator="default_content"):
        """General-purpose alarm

        Every *seconds* seconds, evaluates the dispatcher entry named
        *condition*; when it returns ``True`` the payload produced by the
        entry named *content_generator* is published on the device channel.

        *repeats* is the number of wake-ups to perform; ``-1`` repeats
        forever.
        """
        with current_state.info_lock:
            channel = current_state.info[device_uuid]["channel"]
        endless = repeats == -1
        done = 0
        while endless or done < repeats:
            time.sleep(seconds)
            if ScheduleHandlers.call[condition](current_state) is True:
                current_state.assign_publish(
                    channel, ScheduleHandlers.call[content_generator](current_state)
                )
                current_state.notify()
            if not endless:
                done += 1

    call = {
        "alarm": alarm,
        "schedule_tracker": schedule_tracker,
        "power_schedule_tracker": power_schedule_tracker,
        "global_shutdown": global_shutdown,
        "global_startup": global_startup,
        "always_true": lambda _: True,
        "default_content": lambda _: MqttMesssage(payload = f"ping {randint(0, 10000)}", sender = "sched").pack(),
        "ping_alive": lambda _: MqttMesssage(payload = f"ping {randint(0, 10000)}", sender = "sched").pack(),
    }
    """Function dispatcher"""
class DeviceScheduler:
    """The mqtt client associated with the flask webserver

    It manages the current state of the devices and their scheduling
    """

    # Device settings copied verbatim into the per-device state entry.
    _TRACKED_SETTINGS = ("channel", "always_on", "schedule", "power_schedule")

    def parse_device_settings(self, device, state: "ScheduleState"):
        """Update a state object based on given device settings

        Every entry of ``settings["handlers"]`` (handler name -> kwargs) is
        queued for background execution, and the tracked settings that are
        present are stored under ``state.info[device.uuid]``.
        """
        settings = device.settings
        if "handlers" in settings:
            for handler_name, kwargs in settings["handlers"].items():
                state.assign_exec_background(handler_name, kwargs)
        state.info[device.uuid] = {
            key: settings[key]
            for key in self._TRACKED_SETTINGS
            if key in settings
        }

    def scheduler_loop(self, state: "ScheduleState"):
        """Main scheduler infinite loop

        Sleeps on ``state.wakeup`` until the queue is non-empty, then drains
        it: publish requests go to the mqtt client, sync requests run in this
        thread and background requests each get a daemon thread.
        """
        while True:
            # NOT busy waiting. The emptiness check happens under the
            # condition's lock so a notify() sent between the check and the
            # wait() cannot be lost; the lock is released before processing
            # so producers calling notify() never block on a publish.
            with state.wakeup:
                while not state.queue:
                    state.wakeup.wait()
            while state.queue:
                kind, target, payload = state.queue.pop(0)
                if kind == ScheduleState.PUBLISH:
                    self.mqtt.publish(target, payload)
                elif kind == ScheduleState.EXEC_SYNC:
                    ScheduleHandlers.call[target](current_state=state, **payload)
                elif kind == ScheduleState.EXEC_BACKGROUND:
                    worker = Thread(
                        target=ScheduleHandlers.call[target],
                        daemon=True,
                        args=(state,),
                        kwargs=payload,
                    )
                    worker.start()

    def start_scheduler(self):
        """Parses each device settings for each user, calls required handlers

        and then starts the (infinite) publisher loop, for each different user
        """
        with self.app.app_context():
            for user in db.session.query(User).all():
                initial_state = ScheduleState()
                loop_thread = Thread(
                    target=self.scheduler_loop, daemon=True, args=(initial_state,)
                )
                self.per_user_scheds[user] = loop_thread
                # NOTE: since the state object is currently referenced only here
                # the locks are not (yet) used
                # Set once per user, independent of how many devices exist,
                # so the global handlers always find their channel.
                initial_state.info["global_channel"] = (
                    f"{self.config['global_channel_prefix']}_{user.id}"
                )
                initial_state.assign_exec_sync("global_startup", {})
                for device in db.session.query(Device).filter_by(user_id=user.id):
                    self.parse_device_settings(device, initial_state)
                loop_thread.start()

    def __init__(self, app: "Flask", config):
        """Create the mqtt client for *app* and start scheduling in the background.

        *config* must provide ``"global_channel_prefix"``. Any failure during
        setup is re-raised as ``Exception`` with the original error chained.
        """
        self.config = config
        try:
            self.app = app
            self.mqtt = Mqtt(app)
            self.per_user_scheds = {}
            self.global_sched_thr = Thread(target=self.start_scheduler, daemon=True)
            self.global_sched_thr.start()
        except Exception as err:
            raise Exception(f"error while executing scheduler-related code: {err}") from err