Changeset - 6950214de3af
[Not reviewed]
default
0 2 0
Dennis Fink - 3 years ago 2022-07-19 13:57:48
dennis.fink@c3l.lu
Move sensor handling into ActiveStatus
2 files changed with 46 insertions and 61 deletions:
0 comments (0 inline, 0 general)
spaceapi/sensors.py
Show inline comments
 
import json
 
from functools import partial
 

	
 
import jsonschema
 
from flask import Blueprint, abort, current_app, jsonify, request
 
from pkg_resources import resource_filename
 

	
 
from .auth import httpauth
 
from .utils import ActiveStatusv14, first, fuzzy_list_find
 
from .utils import ActiveStatusv14
 

	
 
sensors_views = Blueprint("sensors", __name__)
 

	
 
@@ -14,62 +13,6 @@ ALLOWED_SENSORS_KEYS = json.load(
 
    open(resource_filename("spaceapi", "schema/sensors.json"), encoding="utf-8")
 
)
 

	
 
RADIATON_SUBKEYS = frozenset(("alpha", "beta", "gamma", "beta_gamma"))
 

	
 
get_identification_key = partial(first, keys=frozenset(("name", "location")))
 

	
 

	
 
def set_value(data, key):
 

	
 
    active = ActiveStatusv14()
 

	
 
    try:
 
        subkey = get_identification_key(data)
 
    except ValueError:
 
        current_app.logger.error("Subkey Error")
 
        return abort(400)
 

	
 
    try:
 
        index = fuzzy_list_find(active["sensors"][key], subkey, data[subkey])
 
        if key == "barometer":
 
            data["unit"] == "hPa"
 
        active["sensors"][key][index].update(data)
 
    except ValueError:
 
        active["sensors"][key].append(data)
 

	
 
    active.save_last_state()
 
    return jsonify(active)
 

	
 

	
 
def set_radiation_value(data):
 

	
 
    active = ActiveStatusv14()
 

	
 
    radiation_keys = [k for k in RADIATON_SUBKEYS if k in data]
 

	
 
    if not radiation_keys:
 
        return abort(400)
 

	
 
    for first_subkey in radiation_keys:
 

	
 
        try:
 
            second_subkey = get_identification_key(data[first_subkey])
 
        except ValueError:
 
            return abort(400)
 

	
 
        try:
 
            index = fuzzy_list_find(
 
                active["sensors"]["radiation"][first_subkey],
 
                second_subkey,
 
                data[first_subkey][second_subkey],
 
            )
 
            active["sensors"]["radiation"][first_subkey][index].update(data)
 
        except ValueError:
 
            active["sensors"]["radiaton"][first_subkey].append(data)
 

	
 
    active.save_last_state()
 
    return jsonify(active)
 

	
 

	
 
@sensors_views.route("/set/<key>", methods=["POST"])
 
@httpauth.login_required
 
@@ -87,9 +30,12 @@ def set_sensors(key):
 
                return abort(400)
 

	
 
            if key != "radiation":
 
                return set_value(data, key)
 
                active.set_sensor_value(data, key)
 
            else:
 
                return set_radiation_value(data)
 
                active.set_radiation_sensor_value(data)
 

	
 
            active.save_last_state()
 
            return jsonify(active)
 

	
 
        except ValueError:
 
            current_app.logger.error("Value Error")
spaceapi/utils.py
Show inline comments
 
@@ -5,7 +5,7 @@ import os.path
 
import random
 
import smtplib
 
import ssl
 
from functools import wraps
 
from functools import partial, wraps
 
from time import time
 

	
 
import mastodon
 
@@ -44,6 +44,10 @@ possible_closed_messages = (
 
    "Dream of electric sheeps! We are closed!",
 
)
 

	
 
get_identification_key = partial(first, keys=frozenset(("name", "location")))
 

	
 
RADIATON_SUBKEYS = frozenset(("alpha", "beta", "gamma", "beta_gamma"))
 

	
 

	
 
class Singleton:
 
    def __new__(cls, *args, **kwargs):
 
@@ -226,6 +230,41 @@ class ActiveStatusv14(Singleton, dict):
 
        if message is not None and message:
 
            self["state"]["message"] = message
 

	
 
    def set_sensor_value(self, data, key):
 
        try:
 
            subkey = get_identification_key(data)
 
        except ValueError:
 
            raise
 

	
 
        try:
 
            index = fuzzy_list_find(self["sensors"][key], subkey, data[subkey])
 
            if key == "barometer":
 
                data["unit"] = "hPa"
 
            self["sensors"][key][index].update(data)
 
        except ValueError:
 
            self["sensors"][key].append(data)
 

	
 
    def set_radiation_sensor_value(self, data):
 
        radiation_keys = [k for k in RADIATON_SUBKEYS if k in data]
 
        if not radiation_keys:
 
            raise ValueErrr
 

	
 
        for first_subkey in radiation_keys:
 
            try:
 
                second_subkey = get_identification_key(data[first_subkey])
 
            except ValueError:
 
                raise
 

	
 
            try:
 
                index = fuzzy_list_find(
 
                    self["sensors"][first_subkey][second_subkey],
 
                    second_subkey,
 
                    data[first_subkey][second_subkey],
 
                )
 
                self["sensors"]["radiation"][first_subkey][index].update(data)
 
            except ValueError:
 
                self["sensors"]["radiation"][first_subkey].append(data)
 

	
 

	
 
def request_wants_json():
 
    best = request.accept_mimetypes.best_match(["application/json", "text/html"])
0 comments (0 inline, 0 general)