Changeset - 6950214de3af
[Not reviewed]
default
0 2 0
Dennis Fink - 3 years ago 2022-07-19 13:57:48
dennis.fink@c3l.lu
Move sensor handling into ActiveStatus
2 files changed with 46 insertions and 61 deletions:
0 comments (0 inline, 0 general)
spaceapi/sensors.py
Show inline comments
 
import json
 
from functools import partial
 

	
 
import jsonschema
 
from flask import Blueprint, abort, current_app, jsonify, request
 
from pkg_resources import resource_filename
 

	
 
from .auth import httpauth
 
from .utils import ActiveStatusv14, first, fuzzy_list_find
 
from .utils import ActiveStatusv14
 

	
 
sensors_views = Blueprint("sensors", __name__)
 

	
 
ALLOWED_SENSORS_KEYS = json.load(
 
    open(resource_filename("spaceapi", "schema/sensors.json"), encoding="utf-8")
 
)
 

	
 
RADIATON_SUBKEYS = frozenset(("alpha", "beta", "gamma", "beta_gamma"))
 

	
 
get_identification_key = partial(first, keys=frozenset(("name", "location")))
 

	
 

	
 
def set_value(data, key):
 

	
 
    active = ActiveStatusv14()
 

	
 
    try:
 
        subkey = get_identification_key(data)
 
    except ValueError:
 
        current_app.logger.error("Subkey Error")
 
        return abort(400)
 

	
 
    try:
 
        index = fuzzy_list_find(active["sensors"][key], subkey, data[subkey])
 
        if key == "barometer":
 
            data["unit"] == "hPa"
 
        active["sensors"][key][index].update(data)
 
    except ValueError:
 
        active["sensors"][key].append(data)
 

	
 
    active.save_last_state()
 
    return jsonify(active)
 

	
 

	
 
def set_radiation_value(data):
 

	
 
    active = ActiveStatusv14()
 

	
 
    radiation_keys = [k for k in RADIATON_SUBKEYS if k in data]
 

	
 
    if not radiation_keys:
 
        return abort(400)
 

	
 
    for first_subkey in radiation_keys:
 

	
 
        try:
 
            second_subkey = get_identification_key(data[first_subkey])
 
        except ValueError:
 
            return abort(400)
 

	
 
        try:
 
            index = fuzzy_list_find(
 
                active["sensors"]["radiation"][first_subkey],
 
                second_subkey,
 
                data[first_subkey][second_subkey],
 
            )
 
            active["sensors"]["radiation"][first_subkey][index].update(data)
 
        except ValueError:
 
            active["sensors"]["radiaton"][first_subkey].append(data)
 

	
 
    active.save_last_state()
 
    return jsonify(active)
 

	
 

	
 
@sensors_views.route("/set/<key>", methods=["POST"])
 
@httpauth.login_required
 
def set_sensors(key):
 

	
 
    active = ActiveStatusv14()
 

	
 
    if key in ALLOWED_SENSORS_KEYS and key in active["sensors"]:
 
        data = request.json
 
        try:
 
            try:
 
                jsonschema.validate(data, ALLOWED_SENSORS_KEYS[key])
 
            except jsonschema.ValidationError:
 
                current_app.logger.error("Validation Error")
 
                return abort(400)
 

	
 
            if key != "radiation":
 
                return set_value(data, key)
 
                active.set_sensor_value(data, key)
 
            else:
 
                return set_radiation_value(data)
 
                active.set_radiation_sensor_value(data)
 

	
 
            active.save_last_state()
 
            return jsonify(active)
 

	
 
        except ValueError:
 
            current_app.logger.error("Value Error")
 
            return abort(400)
 
    else:
 
        current_app.logger.error("Sensor not allowed")
 
        return abort(400)
spaceapi/utils.py
Show inline comments
 
import email
 
import email.policy
 
import json
 
import os.path
 
import random
 
import smtplib
 
import ssl
 
from functools import wraps
 
from functools import partial, wraps
 
from time import time
 

	
 
import mastodon
 
import tweepy
 
from flask import current_app, request
 

	
 
default_json_file_v14 = os.path.abspath("default_v14.json")
 
last_state_file_v14 = os.path.abspath("laststate_v14.json")
 

	
 
if not os.path.exists(default_json_file_v14):
 
    raise RuntimeError("default_v14.json does not exists!")
 
elif not os.path.isfile(default_json_file_v14):
 
    raise RuntimeError("default_v14.json is not a file!")
 

	
 
standard_open_message = "The space is now open!"
 
standard_close_message = "The space is now closed!"
 

	
 
possible_open_messages = (
 
    standard_open_message,
 
    "The space is open! Come in and hack something!",
 
    "Yes, we're open! Come in and create something!",
 
    "Come by and hack something! We've just opened!",
 
    "The ChaosStuff is now open for everyone!",
 
    "Let the Chaos begin! We're open!",
 
    "What do we hack now? Come and see, we've just opened!",
 
    "TUWAT! Come and hack. We are open!",
 
)
 

	
 
possible_closed_messages = (
 
    standard_close_message,
 
    "We're closed now! See you soon.",
 
    "Sorry, we are closed now!",
 
    "The ChaosStuff is now closed! Come back another time!",
 
    "Poweroff process finished! We're closed!",
 
    "Singularity reached! The space is closed!",
 
    "Dream of electric sheeps! We are closed!",
 
)
 

	
 
get_identification_key = partial(first, keys=frozenset(("name", "location")))
 

	
 
RADIATON_SUBKEYS = frozenset(("alpha", "beta", "gamma", "beta_gamma"))
 

	
 

	
 
class Singleton:
 
    def __new__(cls, *args, **kwargs):
 
        key = str(hash(cls))
 

	
 
        if not hasattr(cls, "_instance_dict"):
 
            cls._instance_dict = {}
 

	
 
        if key not in cls._instance_dict:
 
            cls._instance_dict[key] = super().__new__(cls, *args, **kwargs)
 

	
 
        return cls._instance_dict[key]
 

	
 

	
 
class ActiveStatusv14(Singleton, dict):
 
    def __init__(self):
 
        self.default_json_file = default_json_file_v14
 
        self.last_state_file = last_state_file_v14
 

	
 
    def reload(self):
 

	
 
        with open(self.default_json_file, encoding="utf-8") as f:
 
            self.update(json.load(f))
 

	
 
@@ -205,48 +209,83 @@ class ActiveStatusv14(Singleton, dict):
 

	
 
        if value is not None and value != self["state"]["open"]:
 
            self["state"]["open"] = value
 

	
 
            self.notify()
 

	
 
            if not value:
 
                if "people_now_present" in self["sensors"]:
 
                    self.clear_user_present()
 

	
 
                if "message" in self["state"]:
 
                    del self["state"]["message"]
 

	
 
            if trigger_person is None:
 
                if "trigger_person" in self["state"]:
 
                    del self["state"]["trigger_person"]
 
            else:
 
                self["state"]["trigger_person"] = trigger_person
 

	
 
            self["state"]["lastchange"] = int(time())
 

	
 
        if message is not None and message:
 
            self["state"]["message"] = message
 

	
 
    def set_sensor_value(self, data, key):
 
        try:
 
            subkey = get_identification_key(data)
 
        except ValueError:
 
            raise
 

	
 
        try:
 
            index = fuzzy_list_find(self["sensors"][key], subkey, data[subkey])
 
            if key == "barometer":
 
                data["unit"] = "hPa"
 
            self["sensors"][key][index].update(data)
 
        except ValueError:
 
            self["sensors"][key].append(data)
 

	
 
    def set_radiation_sensor_value(self, data):
 
        radiation_keys = [k for k in RADIATON_SUBKEYS if k in data]
 
        if not radiation_keys:
 
            raise ValueErrr
 

	
 
        for first_subkey in radiation_keys:
 
            try:
 
                second_subkey = get_identification_key(data[first_subkey])
 
            except ValueError:
 
                raise
 

	
 
            try:
 
                index = fuzzy_list_find(
 
                    self["sensors"][first_subkey][second_subkey],
 
                    second_subkey,
 
                    data[first_subkey][second_subkey],
 
                )
 
                self["sensors"]["radiation"][first_subkey][index].update(data)
 
            except ValueError:
 
                self["sensors"]["radiation"][first_subkey].append(data)
 

	
 

	
 
def request_wants_json():
 
    best = request.accept_mimetypes.best_match(["application/json", "text/html"])
 
    return (
 
        best == "application/json"
 
        and request.accept_mimetypes[best] > request.accept_mimetypes["text/html"]
 
    )
 

	
 

	
 
def fuzzy_list_find(lst, key, value):
 

	
 
    for i, dic in enumerate(lst):
 
        if dic[key] == value:
 
            return i
 

	
 
    raise ValueError
 

	
 

	
 
def first(iterable, keys):
 
    for key in keys:
 
        if key in iterable:
 
            return key
 

	
 
    raise ValueError
0 comments (0 inline, 0 general)