Changeset - 188deab9696b
[Not reviewed]
default
0 4 0
Dennis Fink - 3 years ago 2022-07-19 22:10:06
dennis.fink@c3l.lu
Add typing annotation, fix some bugs and update to newest version of Flask
4 files changed with 56 insertions and 60 deletions:
0 comments (0 inline, 0 general)
spaceapi/__init__.py
Show inline comments
 
import base64
 
import json
 
import logging
 
import logging.handlers
 
import os
 
import os.path
 
import secrets
 

	
 
from flask import Flask
 
from flask import Flask, Response
 
from flask_bootstrap import Bootstrap
 

	
 
config_file = os.path.abspath("config.json")
 
bootstrap = Bootstrap()
 

	
 
logging_debug_string = (
 
    "%(levelname)s:%(name)s:%(asctime)s:%(filename)s" ":%(lineno)d: %(message)s"
 
)
 
logging_string = "%(levelname)s - %(name)s - %(asctime)s - %(message)s"
 
logging_debug_formatter = logging.Formatter(logging_debug_string)
 
logging_formatter = logging.Formatter(logging_string)
 

	
 

	
 
def create_app():
 
def create_app() -> Flask:
 
    app = Flask(__name__)
 

	
 
    _default_secret_key = base64.b64encode(os.urandom(32)).decode("utf-8")
 
    config_file = (
 
        os.path.abspath("config.json")
 
        if app.debug
 
        else os.path.abspath("/etc/spaceapi.json")
 
    )
 

	
 
    try:
 
        app.config.from_file(config_file, load=json.load)
 
    except FileNotFoundError:
 
        pass
 

	
 
    _default_secret_key = base64.b64encode(secrets.token_bytes()).decode("utf-8")
 
    app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", _default_secret_key)
 

	
 
    if not hasattr(app.config, "from_json"):
 

	
 
        def from_json(file, silent=True):
 
            try:
 
                with open(file, encoding="utf-8") as json_file:
 
                    obj = json.load(json_file)
 
            except IOError:
 
                if silent:
 
                    return False
 
                raise
 

	
 
            for key in obj:
 
                if key.isupper():
 
                    app.config[key] = obj[key]
 

	
 
            return True
 

	
 
        app.config.from_json = from_json
 

	
 
    app.config.from_json(config_file, silent=True)
 
    app.config.setdefault("BOOTSTRAP_SERVE_LOCAL", True)
 

	
 
    bootstrap.init_app(app)
 

	
 
    app.logger.setLevel(logging.DEBUG)
 
    stream_handler = logging.StreamHandler()
 
    stream_handler.setLevel(logging.DEBUG)
 
    stream_handler.setFormatter(logging_debug_formatter)
 

	
 
    if not os.path.exists("spaceapi.log"):
 
        open("spaceapi.log", mode="a").close()
 

	
 
@@ -72,25 +62,25 @@ def create_app():
 

	
 
        second_rotating_file_handler = logging.handlers.RotatingFileHandler(
 
            "spaceapi_debug.log", maxBytes=1300000, backupCount=20, encoding="utf-8"
 
        )
 
        second_rotating_file_handler.setLevel(logging.DEBUG)
 
        second_rotating_file_handler.setFormatter(logging_debug_formatter)
 
        app.logger.addHandler(second_rotating_file_handler)
 

	
 
    app.logger.addHandler(stream_handler)
 
    app.logger.addHandler(rotating_file_handler)
 

	
 
    @app.after_request
 
    def add_headers(response):
 
    def add_headers(response: Response) -> Response:
 
        response.headers.setdefault("Access-Control-Allow-Origin", "*")
 
        response.headers.setdefault("Cache-Control", "no-cache")
 

	
 
        return response
 

	
 
    from .views import root_views
 

	
 
    app.register_blueprint(root_views)
 

	
 
    from .sensors import sensors_views
 

	
 
    app.register_blueprint(sensors_views, url_prefix="/sensors")
spaceapi/auth.py
Show inline comments
 
from hmac import compare_digest
 
from typing import Optional
 

	
 
from flask import current_app
 
from flask_httpauth import HTTPBasicAuth, HTTPDigestAuth
 

	
 
basicauth = HTTPBasicAuth()
 
httpauth = HTTPDigestAuth()
 

	
 

	
 
@httpauth.get_password
 
def get_pw(username):
 
def get_pw(username: str) -> Optional[str]:
 
    if username in current_app.config["HTTP_DIGEST_AUTH_USERS"]:
 
        return current_app.config["HTTP_DIGEST_AUTH_USERS"][username]
 
    return None
 

	
 

	
 
@basicauth.verify_password
 
def verify_password(username, password):
 
def verify_password(username: str, password: str) -> Optional[bool]:
 
    if username in current_app.config["HTTP_DIGEST_AUTH_USERS"]:
 
        return compare_digest(
 
            current_app.config["HTTP_DIGEST_AUTH_USERS"][username], password
 
        )
 
    return None
spaceapi/utils.py
Show inline comments
 
import email
 
import email.policy
 
import json
 
import os.path
 
import random
 
import smtplib
 
import ssl
 
from datetime import datetime
 
from functools import partial
 
from typing import Any, Dict, Iterable, List, Optional
 

	
 
import mastodon
 
import tweepy
 
from flask import current_app, request
 

	
 
default_json_file_v14 = os.path.abspath("default_v14.json")
 
last_state_file_v14 = os.path.abspath("laststate_v14.json")
 

	
 
if not os.path.exists(default_json_file_v14):
 
    raise RuntimeError("default_v14.json does not exists!")
 
elif not os.path.isfile(default_json_file_v14):
 
    raise RuntimeError("default_v14.json is not a file!")
 
@@ -62,47 +62,47 @@ class Singleton:
 

	
 
        if key not in cls._instance_dict:
 
            cls._instance_dict[key] = super().__new__(cls, *args, **kwargs)
 

	
 
        return cls._instance_dict[key]
 

	
 

	
 
class ActiveStatusv14(Singleton, dict):
 
    def __init__(self):
 
        self.default_json_file = default_json_file_v14
 
        self.last_state_file = last_state_file_v14
 

	
 
    def reload(self):
 
    def reload(self) -> None:
 

	
 
        with open(self.default_json_file, encoding="utf-8") as f:
 
            self.update(json.load(f))
 

	
 
        if os.path.exists(self.last_state_file) and os.path.isfile(
 
            self.last_state_file
 
        ):
 
            with open(self.last_state_file, encoding="utf-8") as f:
 
                last_state_json = json.load(f)
 

	
 
            self["state"] = last_state_json["state"]
 
            self["sensors"].update(last_state_json["sensors"])
 

	
 
    def save_last_state(self):
 
    def save_last_state(self) -> None:
 

	
 
        with open(self.last_state_file, mode="w", encoding="utf-8") as f:
 
            last_state = {}
 
            last_state["state"] = self["state"]
 
            last_state["sensors"] = self["sensors"]
 
            json.dump(last_state, f, sort_keys=True)
 

	
 
    def add_user_present(self, username):
 
    def add_user_present(self, username: str) -> None:
 
        if self["state"]["open"]:
 
            if "people_now_present" not in self["sensors"]:
 
                self["sensors"]["people_now_present"] = [{"value": 0}]
 

	
 
            people_now_present = self["sensors"]["people_now_present"][0]
 

	
 
            if (
 
                "names" in people_now_present
 
                and username not in people_now_present["names"]
 
            ):
 
                people_now_present["value"] += 1
 

	
 
@@ -110,190 +110,194 @@ class ActiveStatusv14(Singleton, dict):
 
                    people_now_present["names"].append(username)
 

	
 
            elif "names" not in people_now_present:
 
                people_now_present["value"] += 1
 

	
 
                if username in current_app.config["PEOPLE_NOW_PRESENT_ALLOWED"]:
 
                    people_now_present["names"] = [username]
 

	
 
            self["sensors"]["people_now_present"][0] = people_now_present
 
        else:
 
            pass
 

	
 
    def remove_user_present(self, username):
 
    def remove_user_present(self, username: str) -> None:
 
        if self["state"]["open"] and "people_now_present" in self["sensors"]:
 
            people_now_present = self["sensors"]["people_now_present"][0]
 

	
 
            if people_now_present["value"] > 0:
 
                people_now_present["value"] -= 1
 

	
 
            if "names" in people_now_present:
 

	
 
                if username in people_now_present["names"]:
 
                    people_now_present["names"].remove(username)
 

	
 
                if not people_now_present["names"] or people_now_present["value"] == 0:
 
                    del people_now_present["names"]
 

	
 
            self["sensors"]["people_now_present"][0] = people_now_present
 
        else:
 
            pass
 

	
 
    def clear_user_present(self):
 
    def clear_user_present(self) -> None:
 
        self["sensors"]["people_now_present"][0]["value"] = 0
 
        if "names" in self["sensors"]["people_now_present"][0]:
 
            del self["sensors"]["people_now_present"][0]["names"]
 

	
 
    def notify(self):
 
    def notify(self) -> None:
 
        message = (
 
            random.choice(possible_open_messages)
 
            if self["state"]["open"]
 
            else random.choice(possible_closed_messages)
 
        )
 
        message = message.format(time=datetime.now())
 
        self.send_tweet(message)
 
        self.send_toot(message)
 

	
 
        subject = (
 
            standard_open_message if self["state"]["open"] else standard_close_message
 
        )
 
        self.send_email(subject, message)
 

	
 
    def send_tweet(self, message):
 
    def send_tweet(self, message: str) -> None:
 
        if "TWITTER_CONSUMER_KEY" in current_app.config:
 
            try:
 
                auth = tweepy.OAuthHandler(
 
                    current_app.config["TWITTER_CONSUMER_KEY"],
 
                    current_app.config["TWITTER_CONSUMER_SECRET"],
 
                )
 
                auth.set_access_token(
 
                    current_app.config["TWITTER_ACCESS_TOKEN_KEY"],
 
                    current_app.config["TWITTER_ACCESS_TOKEN_SECRET"],
 
                )
 
                api = tweepy.API(auth)
 
                api.update_status(
 
                    tweet, lat=self["location"]["lat"], long=self["location"]["lon"]
 
                    message, lat=self["location"]["lat"], long=self["location"]["lon"]
 
                )
 
            except Exception as e:
 
                current_app.logger.error("Sending tweet failed! %s" % e, exc_info=True)
 

	
 
    def send_toot(self, message):
 
    def send_toot(self, message: str) -> None:
 
        if "MASTODON_USERCRED_FILE" in current_app.config:
 
            try:
 
                api = mastodon.Mastodon(
 
                    client_id="c3l_spaceapi_clientcred.secret",
 
                    access_token=current_app.config["MASTODON_USERCRED_FILE"],
 
                    api_base_url="https://chaos.social",
 
                )
 
                api.status_post(toot, visibility="unlisted")
 
                api.status_post(message, visibility="unlisted")
 
            except Exception as e:
 
                current_app.logger.error("Sending toot failed! %s" % e, exc_info=True)
 

	
 
    def send_email(self, subject, message):
 
    def send_email(self, subject: str, message: str) -> None:
 
        if "EMAIL_PASS" in current_app.config:
 
            try:
 
                msg = email.message.EmailMessage(policy=email.policy.default)
 
                msg["To"] = current_app.config["EMAIL_ANNOUNCE_ADDRESS"]
 

	
 
                email_user = current_app.config["EMAIL_USER"]
 
                msg["From"] = "spaceapibot <{email}>".format(email=email_user)
 

	
 
                msg["Subject"] = subject
 
                msg.set_content(body)
 
                msg.set_content(message)
 

	
 
                with smtplib.SMTP(
 
                    current_app.config["EMAIL_HOST"],
 
                    port=current_app.config["EMAIL_PORT"],
 
                ) as smtp:
 
                    smtp.starttls(context=ssl.create_default_context())
 
                    smtp.login(email_user, current_app.config["EMAIL_PASS"])
 
                    smtp.send_message(msg)
 
            except Exception as e:
 
                current_app.logger.error("Sending email failed! %s" % e, exc_info=True)
 

	
 
    def set_new_state(self, value=None, trigger_person=None, message=None):
 
    def set_new_state(
 
        self,
 
        value: Optional[bool] = None,
 
        trigger_person: Optional[str] = None,
 
        message: Optional[str] = None,
 
    ) -> None:
 

	
 
        if value is not None and value != self["state"]["open"]:
 
            self["state"]["open"] = value
 

	
 
            self.notify()
 

	
 
            if not value:
 
                if "people_now_present" in self["sensors"]:
 
                    self.clear_user_present()
 

	
 
                if "message" in self["state"]:
 
                    del self["state"]["message"]
 

	
 
            if trigger_person is None:
 
                if "trigger_person" in self["state"]:
 
                    del self["state"]["trigger_person"]
 
            else:
 
                self["state"]["trigger_person"] = trigger_person
 

	
 
            self["state"]["lastchange"] = int(datetime.now().timestamp())
 

	
 
        if message is not None and message:
 
            self["state"]["message"] = message
 

	
 
    def set_sensor_value(self, data, key):
 
    def set_sensor_value(self, data: Dict[str, Any], key: str) -> None:
 
        try:
 
            subkey = get_identification_key(data)
 
            subkey = first(data, frozenset(("name", "location")))
 
        except ValueError:
 
            raise
 

	
 
        try:
 
            index = fuzzy_list_find(self["sensors"][key], subkey, data[subkey])
 
            if key == "barometer":
 
                data["unit"] = "hPa"
 
            self["sensors"][key][index].update(data)
 
        except ValueError:
 
            self["sensors"][key].append(data)
 

	
 
    def set_radiation_sensor_value(self, data):
 
    def set_radiation_sensor_value(self, data: Dict[str, Any]) -> None:
 
        radiation_keys = [k for k in RADIATON_SUBKEYS if k in data]
 
        if not radiation_keys:
 
            raise ValueErrr
 
            raise ValueError
 

	
 
        for first_subkey in radiation_keys:
 
            try:
 
                second_subkey = get_identification_key(data[first_subkey])
 
                second_subkey = first(
 
                    data[first_subkey], frozenset(("name", "location"))
 
                )
 
            except ValueError:
 
                raise
 

	
 
            try:
 
                index = fuzzy_list_find(
 
                    self["sensors"][first_subkey][second_subkey],
 
                    second_subkey,
 
                    data[first_subkey][second_subkey],
 
                )
 
                self["sensors"]["radiation"][first_subkey][index].update(data)
 
            except ValueError:
 
                self["sensors"]["radiation"][first_subkey].append(data)
 

	
 

	
 
def request_wants_json():
 
def request_wants_json() -> bool:
 
    best = request.accept_mimetypes.best_match(["application/json", "text/html"])
 
    return (
 
        best == "application/json"
 
        and request.accept_mimetypes[best] > request.accept_mimetypes["text/html"]
 
    )
 

	
 

	
 
def fuzzy_list_find(lst, key, value):
 
def fuzzy_list_find(lst: List[Any], key: str, value: Any) -> int:
 

	
 
    for i, dic in enumerate(lst):
 
        if dic[key] == value:
 
            return i
 

	
 
    raise ValueError
 

	
 

	
 
def first(iterable, keys):
 
def first(iterable: Iterable[Any], keys: Iterable[str]) -> str:
 
    for key in keys:
 
        if key in iterable:
 
            return key
 

	
 
    raise ValueError
 

	
 

	
 
get_identification_key = partial(first, keys=frozenset(("name", "location")))
spaceapi/views.py
Show inline comments
 
from flask import (
 
    Blueprint,
 
    Response,
 
    abort,
 
    current_app,
 
    jsonify,
 
    redirect,
 
    render_template,
 
    request,
 
    url_for,
 
)
 

	
 
from .auth import basicauth, httpauth
 
from .utils import ActiveStatusv14, request_wants_json
 

	
 
root_views = Blueprint("root", __name__)
 

	
 

	
 
@root_views.route("/")
 
def index():
 
def index() -> Response:
 
    if request_wants_json():
 
        return jsonify(ActiveStatusv14())
 
    return render_template("index.html", status=ActiveStatusv14())
 

	
 

	
 
@root_views.route("/status.json")
 
def status_json():
 
def status_json() -> Response:
 
    return jsonify(ActiveStatusv14())
 

	
 

	
 
@root_views.route("/reload")
 
@httpauth.login_required
 
def reload():
 
def reload() -> Response:
 
    active = ActiveStatusv14()
 
    active.reload()
 
    return jsonify(active)
 

	
 

	
 
@root_views.route("/open", methods=("GET", "POST"))
 
@httpauth.login_required
 
def open():
 
def open() -> Response:
 
    if request.method == "POST":
 
        active = ActiveStatusv14()
 

	
 
        try:
 
            if (
 
                httpauth.username()
 
                in current_app.config["STATE_TRIGGER_PERSON_ALLOWED"]
 
            ):
 
                trigger_person = httpauth.username()
 
            else:
 
                trigger_person = None
 
        except KeyError:
 
@@ -64,25 +65,25 @@ def open():
 

	
 
        active.set_new_state(
 
            value=new_state, trigger_person=trigger_person, message=message
 
        )
 
        active.save_last_state()
 
        return redirect(url_for("root.index"))
 

	
 
    return render_template("open.html")
 

	
 

	
 
@root_views.route("/present", methods=("GET", "POST"))
 
@httpauth.login_required
 
def present():
 
def present() -> Response:
 
    if request.method == "POST":
 

	
 
        active = ActiveStatusv14()
 

	
 
        if active["state"]["open"]:
 
            user = (
 
                httpauth.username()
 
                if "user" not in request.form
 
                else request.form["user"]
 
            )
 
            if "present" in request.form:
 
                active.add_user_present(user)
 
@@ -91,25 +92,25 @@ def present():
 
            else:
 
                return redirect(url_for("root.index"))
 

	
 
            active.save_last_state()
 

	
 
        return redirect(url_for("root.index"))
 

	
 
    return render_template("present.html")
 

	
 

	
 
@root_views.route("/basicopen", methods=("GET", "POST"))
 
@basicauth.login_required
 
def basicopen():
 
def basicopen() -> Response:
 
    if request.method == "POST":
 
        active = ActiveStatusv14()
 

	
 
        try:
 
            if (
 
                httpauth.username()
 
                in current_app.config["STATE_TRIGGER_PERSON_ALLOWED"]
 
            ):
 
                trigger_person = httpauth.username()
 
            else:
 
                trigger_person = None
 
        except KeyError:
 
@@ -127,25 +128,25 @@ def basicopen():
 

	
 
        active.set_new_state(
 
            value=new_state, trigger_person=trigger_person, message=message
 
        )
 
        active.save_last_state()
 
        return redirect(url_for("root.index"))
 

	
 
    return render_template("open.html")
 

	
 

	
 
@root_views.route("/basicpresent", methods=("GET", "POST"))
 
@basicauth.login_required
 
def basicpresent():
 
def basicpresent() -> Response:
 
    if request.method == "POST":
 

	
 
        active = ActiveStatusv14()
 

	
 
        if active["state"]["open"]:
 
            user = (
 
                httpauth.username()
 
                if "user" not in request.form
 
                else request.form["user"]
 
            )
 
            if "present" in request.form:
 
                active.add_user_present(user)
0 comments (0 inline, 0 general)