Changeset - 88b94840f125
[Not reviewed]
default
0 3 0
Dennis Fink - 3 years ago 2022-07-19 22:16:34
dennis.fink@c3l.lu
Add more typing
3 files changed with 9 insertions and 8 deletions:
0 comments (0 inline, 0 general)
spaceapi/auth.py
Show inline comments
 
from hmac import compare_digest
 
from typing import Optional
 
from typing import Optional, cast
 

	
 
from flask import current_app
 
from flask_httpauth import HTTPBasicAuth, HTTPDigestAuth
 

	
 
basicauth = HTTPBasicAuth()
 
httpauth = HTTPDigestAuth()
 

	
 

	
 
@httpauth.get_password
 
def get_pw(username: str) -> Optional[str]:
 
    if username in current_app.config["HTTP_DIGEST_AUTH_USERS"]:
 
        return current_app.config["HTTP_DIGEST_AUTH_USERS"][username]
 
        return cast(str, current_app.config["HTTP_DIGEST_AUTH_USERS"][username])
 
    return None
 

	
 

	
 
@basicauth.verify_password
 
def verify_password(username: str, password: str) -> Optional[bool]:
 
    if username in current_app.config["HTTP_DIGEST_AUTH_USERS"]:
 
        return compare_digest(
 
            current_app.config["HTTP_DIGEST_AUTH_USERS"][username], password
 
        )
 
    return None
spaceapi/sensors.py
Show inline comments
 
import json
 
from typing import Any, Dict, cast
 

	
 
import jsonschema
 
from flask import Blueprint, abort, current_app, jsonify, request
 
from flask import Blueprint, Response, abort, current_app, jsonify, request
 
from pkg_resources import resource_filename
 

	
 
from .auth import httpauth
 
from .utils import ActiveStatusv14
 

	
 
sensors_views = Blueprint("sensors", __name__)
 

	
 
ALLOWED_SENSORS_KEYS = json.load(
 
    open(resource_filename("spaceapi", "schema/sensors.json"), encoding="utf-8")
 
)
 

	
 

	
 
@sensors_views.route("/set/<key>", methods=["POST"])
 
@httpauth.login_required
 
def set_sensors(key):
 
def set_sensors(key: str) -> Response:
 

	
 
    active = ActiveStatusv14()
 

	
 
    if key in ALLOWED_SENSORS_KEYS and key in active["sensors"]:
 
        data = request.json
 
        data = cast(Dict[str, Any], request.json)
 
        try:
 
            try:
 
                jsonschema.validate(data, ALLOWED_SENSORS_KEYS[key])
 
            except jsonschema.ValidationError:
 
                current_app.logger.error("Validation Error")
 
                return abort(400)
 

	
 
            if key != "radiation":
 
                active.set_sensor_value(data, key)
 
            else:
 
                active.set_radiation_sensor_value(data)
 

	
 
            active.save_last_state()
 
            return jsonify(active)
 

	
 
        except ValueError:
 
            current_app.logger.error("Value Error")
 
            return abort(400)
 
    else:
 
        current_app.logger.error("Sensor not allowed")
 
        return abort(400)
spaceapi/utils.py
Show inline comments
 
@@ -33,62 +33,62 @@ possible_open_messages = (
 
    "What do we hack now? Come and see, we've just opened!",
 
    "TUWAT! Come and hack. We are open!",
 
    "It's {time:%H:%M} and we are open! Come and hack!",
 
    "We just opened our doors at {time:%H:%M}!",
 
    "Bootup process finished at {time:%H:%M}! Doors opened!",
 
)
 

	
 
possible_closed_messages = (
 
    standard_close_message,
 
    "We're closed now! See you soon.",
 
    "Sorry, we are closed now!",
 
    "The ChaosStuff is now closed! Come back another time!",
 
    "Poweroff process finished! We're closed!",
 
    "Singularity reached! The space is closed!",
 
    "Dream of electric sheeps! We are closed!",
 
    "We closed our doors at {time:%H:%M}. See you soon.",
 
    "Poweroff process finished at {time:%H:%M}. We're closed!",
 
)
 

	
 

	
 
RADIATON_SUBKEYS = frozenset(("alpha", "beta", "gamma", "beta_gamma"))
 

	
 

	
 
class Singleton:
 
    def __new__(cls, *args, **kwargs):
 
    def __new__(cls, *args: Any, **kwargs: Any) -> Any:
 
        key = str(hash(cls))
 

	
 
        if not hasattr(cls, "_instance_dict"):
 
            cls._instance_dict = {}
 
            cls._instance_dict = {}  # type: Dict[str, Any]
 

	
 
        if key not in cls._instance_dict:
 
            cls._instance_dict[key] = super().__new__(cls, *args, **kwargs)
 

	
 
        return cls._instance_dict[key]
 

	
 

	
 
class ActiveStatusv14(Singleton, dict):
 
    def __init__(self):
 
    def __init__(self) -> None:
 
        self.default_json_file = default_json_file_v14
 
        self.last_state_file = last_state_file_v14
 

	
 
    def reload(self) -> None:
 

	
 
        with open(self.default_json_file, encoding="utf-8") as f:
 
            self.update(json.load(f))
 

	
 
        if os.path.exists(self.last_state_file) and os.path.isfile(
 
            self.last_state_file
 
        ):
 
            with open(self.last_state_file, encoding="utf-8") as f:
 
                last_state_json = json.load(f)
 

	
 
            self["state"] = last_state_json["state"]
 
            self["sensors"].update(last_state_json["sensors"])
 

	
 
    def save_last_state(self) -> None:
 

	
 
        with open(self.last_state_file, mode="w", encoding="utf-8") as f:
 
            last_state = {}
 
            last_state["state"] = self["state"]
 
            last_state["sensors"] = self["sensors"]
 
            json.dump(last_state, f, sort_keys=True)
0 comments (0 inline, 0 general)