Files @ 63736b19abb8
Branch filter:

Location: C3L-NOC/spaceapi/spaceapi/sensors.py

Dennis Fink
Autoformat
import json
from functools import partial

from pkg_resources import resource_filename

import jsonschema
from flask import Blueprint, abort, current_app, jsonify, request

from .auth import httpauth
from .utils import ActiveStatus, ActiveStatusv14, first, fuzzy_list_find

sensors_views = Blueprint("sensors", __name__)

ALLOWED_SENSORS_KEYS = json.load(
    open(resource_filename("spaceapi", "schema/sensors.json"), encoding="utf-8")
)

RADIATON_SUBKEYS = frozenset(("alpha", "beta", "gamma", "beta_gamma"))

get_identification_key = partial(first, keys=frozenset(("name", "location")))


def set_value(data, key):

    active = ActiveStatus()
    active_v14 = ActiveStatusv14()

    try:
        subkey = get_identification_key(data)
    except ValueError:
        current_app.logger.error("Subkey Error")
        return abort(400)

    try:
        index = fuzzy_list_find(active["sensors"][key], subkey, data[subkey])
        if key == "barometer":
            data["unit"] == "hPA"
        active["sensors"][key][index].update(data)
    except ValueError:
        active["sensors"][key].append(data)

    try:
        index_v14 = fuzzy_list_find(active_v14["sensors"][key], subkey, data[subkey])
        if key == "barometer":
            data["unit"] = "hPa"
        active_v14["sensors"][key][index].update(data)
    except ValueError:
        active_v14["sensors"][key].append(data)

    return jsonify(active)


def set_radiation_value(data):

    active = ActiveStatus()

    radiation_keys = [k for k in RADIATON_SUBKEYS if k in data]

    if not radiation_keys:
        return abort(400)

    for first_subkey in radiation_keys:

        try:
            second_subkey = get_identification_key(data[first_subkey])
        except ValueError:
            return abort(400)

        try:
            index = fuzzy_list_find(
                active["sensors"]["radiation"][first_subkey],
                second_subkey,
                data[first_subkey][second_subkey],
            )
            active["sensors"]["radiation"][first_subkey][index].update(data)
        except ValueError:
            active["sensors"]["radiaton"][first_subkey].append(data)

    return jsonify(active)


@sensors_views.route("/set/<key>", methods=["POST"])
@httpauth.login_required
def set_sensors(key):

    active = ActiveStatus()

    if key in ALLOWED_SENSORS_KEYS and key in active["sensors"]:
        data = request.json
        try:
            try:
                jsonschema.validate(data, ALLOWED_SENSORS_KEYS[key])
            except jsonschema.ValidationError:
                current_app.logger.error("Validation Error")
                return abort(400)

            if key != "radiation":
                return set_value(data, key)
            else:
                return set_radiation_value(data)

        except ValueError:
            current_app.logger.error("Value Error")
            return abort(400)
    else:
        current_app.logger.error("Sensor not allowed")
        return abort(400)