Files
@ 63736b19abb8
Branch filter:
Location: C3L-NOC/spaceapi/spaceapi/sensors.py
63736b19abb8
2.9 KiB
text/x-python
Autoformat
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | import json
from functools import partial
from pkg_resources import resource_filename
import jsonschema
from flask import Blueprint, abort, current_app, jsonify, request
from .auth import httpauth
from .utils import ActiveStatus, ActiveStatusv14, first, fuzzy_list_find
# Blueprint under which the sensor endpoints of this module are registered.
sensors_views = Blueprint("sensors", __name__)

# JSON schemas used to validate incoming sensor payloads, indexed by sensor
# key. Loaded once at import time; the context manager makes sure the file
# handle is closed right away instead of being left to the garbage collector.
with open(
    resource_filename("spaceapi", "schema/sensors.json"), encoding="utf-8"
) as _schema_file:
    ALLOWED_SENSORS_KEYS = json.load(_schema_file)

# Sub-keys under which radiation readings are nested in a request payload.
# NOTE: the constant name is misspelled ("RADIATON"); it is kept as-is because
# other code refers to it by this name.
RADIATON_SUBKEYS = frozenset(("alpha", "beta", "gamma", "beta_gamma"))

# Selects which of "name" / "location" identifies a sensor entry in a payload.
# Callers treat ValueError as "payload carries no usable identifier".
get_identification_key = partial(first, keys=frozenset(("name", "location")))
def _upsert_sensor(status, key, subkey, payload):
    """Update the matching entry of ``status["sensors"][key]`` or append one.

    An entry matches when ``fuzzy_list_find`` locates ``payload[subkey]`` in
    the list; ``fuzzy_list_find`` is expected to raise ``ValueError`` when
    nothing matches, in which case ``payload`` is appended as a new entry.
    """
    entries = status["sensors"][key]
    try:
        index = fuzzy_list_find(entries, subkey, payload[subkey])
    except ValueError:
        # No existing entry carries this identifier: register a new sensor.
        entries.append(payload)
    else:
        entries[index].update(payload)


def set_value(data, key):
    """Store a sensor reading in both the regular and the v14 status.

    Args:
        data: The reading sent by the client; must contain a "name" or
            "location" field that identifies the sensor.
        key: Sensor type, i.e. the key below ``status["sensors"]``.

    Returns:
        The JSON response for the (non-v14) active status.

    Aborts with HTTP 400 when ``data`` has no identification field.
    """
    active = ActiveStatus()
    active_v14 = ActiveStatusv14()
    try:
        subkey = get_identification_key(data)
    except ValueError:
        current_app.logger.error("Subkey Error")
        return abort(400)

    # The two status documents spell the barometer unit differently
    # ("hPA" vs. "hPa"). Each document gets its own copy of the reading so
    # that the unit chosen for one cannot leak into the other through a
    # shared dict, and so the caller's ``data`` is left untouched.
    for status, barometer_unit in ((active, "hPA"), (active_v14, "hPa")):
        payload = dict(data)
        if key == "barometer":
            payload["unit"] = barometer_unit
        _upsert_sensor(status, key, subkey, payload)

    return jsonify(active)
def set_radiation_value(data):
    """Store radiation readings in the active status.

    Args:
        data: Mapping with one or more of the radiation sub-keys ("alpha",
            "beta", "gamma", "beta_gamma"); each value is a single reading
            that must contain a "name" or "location" identification field.

    Returns:
        The JSON response for the active status.

    Aborts with HTTP 400 when ``data`` contains no radiation sub-key or when
    any reading lacks an identification field.
    """
    active = ActiveStatus()
    radiation_keys = [k for k in RADIATON_SUBKEYS if k in data]
    if not radiation_keys:
        return abort(400)

    # Resolve every identifier before touching the status, so a malformed
    # reading is rejected without leaving a partially applied update behind.
    try:
        id_keys = {
            kind: get_identification_key(data[kind]) for kind in radiation_keys
        }
    except ValueError:
        current_app.logger.error("Subkey Error")
        return abort(400)

    # NOTE(review): unlike set_value, only the regular status is updated here,
    # not the v14 one -- confirm whether that is intentional.
    radiation = active["sensors"]["radiation"]
    for kind, id_key in id_keys.items():
        reading = data[kind]
        entries = radiation[kind]
        try:
            index = fuzzy_list_find(entries, id_key, reading[id_key])
        except ValueError:
            # Unknown sensor: add the reading itself (not the whole request),
            # so that later lookups can find it by its identifier.
            entries.append(reading)
        else:
            entries[index].update(reading)
    return jsonify(active)
@sensors_views.route("/set/<key>", methods=["POST"])
@httpauth.login_required
def set_sensors(key):
    """Handle an authenticated POST that sets the reading of sensor ``key``.

    The request is rejected with HTTP 400 when ``key`` has no schema or is
    not present in the active status, when the JSON body fails schema
    validation, or when storing the reading raises ``ValueError``. Otherwise
    the response produced by the matching setter is returned.
    """
    status = ActiveStatus()
    # Guard clause: the sensor type must be known to the schema and to the
    # active status before the body is even looked at.
    if key not in ALLOWED_SENSORS_KEYS or key not in status["sensors"]:
        current_app.logger.error("Sensor not allowed")
        return abort(400)

    payload = request.json
    try:
        try:
            jsonschema.validate(payload, ALLOWED_SENSORS_KEYS[key])
        except jsonschema.ValidationError:
            current_app.logger.error("Validation Error")
            return abort(400)

        # Radiation payloads are nested one level deeper and have their own
        # setter; every other sensor type shares the generic one.
        if key == "radiation":
            return set_radiation_value(payload)
        return set_value(payload, key)
    except ValueError:
        current_app.logger.error("Value Error")
        return abort(400)
|