diff --git a/spaceapi/__init__.py b/spaceapi/__init__.py --- a/spaceapi/__init__.py +++ b/spaceapi/__init__.py @@ -2,13 +2,12 @@ import base64 import json import logging import logging.handlers -import os import os.path +import secrets -from flask import Flask +from flask import Flask, Response from flask_bootstrap import Bootstrap -config_file = os.path.abspath("config.json") bootstrap = Bootstrap() logging_debug_string = ( @@ -19,32 +18,23 @@ logging_debug_formatter = logging.Format logging_formatter = logging.Formatter(logging_string) -def create_app(): +def create_app() -> Flask: app = Flask(__name__) - _default_secret_key = base64.b64encode(os.urandom(32)).decode("utf-8") + config_file = ( + os.path.abspath("config.json") + if app.debug + else os.path.abspath("/etc/spaceapi.json") + ) + + try: + app.config.from_file(config_file, load=json.load) + except FileNotFoundError: + pass + + _default_secret_key = base64.b64encode(secrets.token_bytes()).decode("utf-8") app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", _default_secret_key) - if not hasattr(app.config, "from_json"): - - def from_json(file, silent=True): - try: - with open(file, encoding="utf-8") as json_file: - obj = json.load(json_file) - except IOError: - if silent: - return False - raise - - for key in obj: - if key.isupper(): - app.config[key] = obj[key] - - return True - - app.config.from_json = from_json - - app.config.from_json(config_file, silent=True) app.config.setdefault("BOOTSTRAP_SERVE_LOCAL", True) bootstrap.init_app(app) @@ -81,7 +71,7 @@ def create_app(): app.logger.addHandler(rotating_file_handler) @app.after_request - def add_headers(response): + def add_headers(response: Response) -> Response: response.headers.setdefault("Access-Control-Allow-Origin", "*") response.headers.setdefault("Cache-Control", "no-cache") diff --git a/spaceapi/auth.py b/spaceapi/auth.py --- a/spaceapi/auth.py +++ b/spaceapi/auth.py @@ -1,4 +1,5 @@ from hmac import compare_digest +from typing import Optional from flask import current_app from flask_httpauth import HTTPBasicAuth, HTTPDigestAuth @@ -8,14 +9,14 @@ httpauth = HTTPDigestAuth() @httpauth.get_password -def get_pw(username): +def get_pw(username: str) -> Optional[str]: if username in current_app.config["HTTP_DIGEST_AUTH_USERS"]: return current_app.config["HTTP_DIGEST_AUTH_USERS"][username] return None @basicauth.verify_password -def verify_password(username, password): +def verify_password(username: str, password: str) -> Optional[bool]: if username in current_app.config["HTTP_DIGEST_AUTH_USERS"]: return compare_digest( current_app.config["HTTP_DIGEST_AUTH_USERS"][username], password diff --git a/spaceapi/utils.py b/spaceapi/utils.py --- a/spaceapi/utils.py +++ b/spaceapi/utils.py @@ -6,7 +6,7 @@ import random import smtplib import ssl from datetime import datetime -from functools import partial +from typing import Any, Dict, Iterable, List, Optional import mastodon import tweepy @@ -71,7 +71,7 @@ class ActiveStatusv14(Singleton, dict): self.default_json_file = default_json_file_v14 self.last_state_file = last_state_file_v14 - def reload(self): + def reload(self) -> None: with open(self.default_json_file, encoding="utf-8") as f: self.update(json.load(f)) @@ -85,7 +85,7 @@ class ActiveStatusv14(Singleton, dict): self["state"] = last_state_json["state"] self["sensors"].update(last_state_json["sensors"]) - def save_last_state(self): + def save_last_state(self) -> None: with open(self.last_state_file, mode="w", encoding="utf-8") as f: last_state = {} @@ -93,7 +93,7 @@ class ActiveStatusv14(Singleton, dict): last_state["sensors"] = self["sensors"] json.dump(last_state, f, sort_keys=True) - def add_user_present(self, username): + def add_user_present(self, username: str) -> None: if self["state"]["open"]: if "people_now_present" not in self["sensors"]: self["sensors"]["people_now_present"] = [{"value": 0}] @@ -119,7 +119,7 @@ class ActiveStatusv14(Singleton, dict): else: pass - def remove_user_present(self, username): + def remove_user_present(self, username: str) -> None: if self["state"]["open"] and "people_now_present" in self["sensors"]: people_now_present = self["sensors"]["people_now_present"][0] @@ -138,12 +138,12 @@ class ActiveStatusv14(Singleton, dict): else: pass - def clear_user_present(self): + def clear_user_present(self) -> None: self["sensors"]["people_now_present"][0]["value"] = 0 if "names" in self["sensors"]["people_now_present"][0]: del self["sensors"]["people_now_present"][0]["names"] - def notify(self): + def notify(self) -> None: message = ( random.choice(possible_open_messages) if self["state"]["open"] @@ -158,7 +158,7 @@ class ActiveStatusv14(Singleton, dict): ) self.send_email(subject, message) - def send_tweet(self, message): + def send_tweet(self, message: str) -> None: if "TWITTER_CONSUMER_KEY" in current_app.config: try: auth = tweepy.OAuthHandler( @@ -171,12 +171,12 @@ class ActiveStatusv14(Singleton, dict): ) api = tweepy.API(auth) api.update_status( - tweet, lat=self["location"]["lat"], long=self["location"]["lon"] + message, lat=self["location"]["lat"], long=self["location"]["lon"] ) except Exception as e: current_app.logger.error("Sending tweet failed! %s" % e, exc_info=True) - def send_toot(self, message): + def send_toot(self, message: str) -> None: if "MASTODON_USERCRED_FILE" in current_app.config: try: api = mastodon.Mastodon( @@ -184,11 +184,11 @@ class ActiveStatusv14(Singleton, dict): access_token=current_app.config["MASTODON_USERCRED_FILE"], api_base_url="https://chaos.social", ) - api.status_post(toot, visibility="unlisted") + api.status_post(message, visibility="unlisted") except Exception as e: current_app.logger.error("Sending toot failed! %s" % e, exc_info=True) - def send_email(self, subject, message): + def send_email(self, subject: str, message: str) -> None: if "EMAIL_PASS" in current_app.config: try: msg = email.message.EmailMessage(policy=email.policy.default) @@ -198,7 +198,7 @@ class ActiveStatusv14(Singleton, dict): msg["From"] = "spaceapibot <{email}>".format(email=email_user) msg["Subject"] = subject - msg.set_content(body) + msg.set_content(message) with smtplib.SMTP( current_app.config["EMAIL_HOST"], @@ -210,7 +210,12 @@ class ActiveStatusv14(Singleton, dict): except Exception as e: current_app.logger.error("Sending email failed! %s" % e, exc_info=True) - def set_new_state(self, value=None, trigger_person=None, message=None): + def set_new_state( + self, + value: Optional[bool] = None, + trigger_person: Optional[str] = None, + message: Optional[str] = None, + ) -> None: if value is not None and value != self["state"]["open"]: self["state"]["open"] = value @@ -235,9 +240,9 @@ class ActiveStatusv14(Singleton, dict): if message is not None and message: self["state"]["message"] = message - def set_sensor_value(self, data, key): + def set_sensor_value(self, data: Dict[str, Any], key: str) -> None: try: - subkey = get_identification_key(data) + subkey = first(data, frozenset(("name", "location"))) except ValueError: raise @@ -249,14 +254,16 @@ class ActiveStatusv14(Singleton, dict): except ValueError: self["sensors"][key].append(data) - def set_radiation_sensor_value(self, data): + def set_radiation_sensor_value(self, data: Dict[str, Any]) -> None: radiation_keys = [k for k in RADIATON_SUBKEYS if k in data] if not radiation_keys: - raise ValueErrr + raise ValueError for first_subkey in radiation_keys: try: - second_subkey = get_identification_key(data[first_subkey]) + second_subkey = first( + data[first_subkey], frozenset(("name", "location")) + ) except ValueError: raise @@ -271,7 +278,7 @@ class ActiveStatusv14(Singleton, dict): self["sensors"]["radiation"][first_subkey].append(data) -def request_wants_json(): +def request_wants_json() -> bool: best = request.accept_mimetypes.best_match(["application/json", "text/html"]) return ( best == "application/json" @@ -279,7 +286,7 @@ def request_wants_json(): ) -def fuzzy_list_find(lst, key, value): +def fuzzy_list_find(lst: List[Any], key: str, value: Any) -> int: for i, dic in enumerate(lst): if dic[key] == value: @@ -288,12 +295,9 @@ def fuzzy_list_find(lst, key, value): raise ValueError -def first(iterable, keys): +def first(iterable: Iterable[Any], keys: Iterable[str]) -> str: for key in keys: if key in iterable: return key raise ValueError - - -get_identification_key = partial(first, keys=frozenset(("name", "location"))) diff --git a/spaceapi/views.py b/spaceapi/views.py --- a/spaceapi/views.py +++ b/spaceapi/views.py @@ -1,5 +1,6 @@ from flask import ( Blueprint, + Response, abort, current_app, jsonify, @@ -16,20 +17,20 @@ root_views = Blueprint("root", __name__) @root_views.route("/") -def index(): +def index() -> Response: if request_wants_json(): return jsonify(ActiveStatusv14()) return render_template("index.html", status=ActiveStatusv14()) @root_views.route("/status.json") -def status_json(): +def status_json() -> Response: return jsonify(ActiveStatusv14()) @root_views.route("/reload") @httpauth.login_required -def reload(): +def reload() -> Response: active = ActiveStatusv14() active.reload() return jsonify(active) @@ -37,7 +38,7 @@ def reload(): @root_views.route("/open", methods=("GET", "POST")) @httpauth.login_required -def open(): +def open() -> Response: if request.method == "POST": active = ActiveStatusv14() @@ -73,7 +74,7 @@ def open(): @root_views.route("/present", methods=("GET", "POST")) @httpauth.login_required -def present(): +def present() -> Response: if request.method == "POST": active = ActiveStatusv14() @@ -100,7 +101,7 @@ def present(): @root_views.route("/basicopen", methods=("GET", "POST")) @basicauth.login_required -def basicopen(): +def basicopen() -> Response: if request.method == "POST": active = ActiveStatusv14() @@ -136,7 +137,7 @@ def basicopen(): @root_views.route("/basicpresent", methods=("GET", "POST")) @basicauth.login_required -def basicpresent(): +def basicpresent() -> Response: if request.method == "POST": active = ActiveStatusv14()