diff --git a/spaceapi/__init__.py b/spaceapi/__init__.py --- a/spaceapi/__init__.py +++ b/spaceapi/__init__.py @@ -1,20 +1,20 @@ +import base64 import json +import logging +import logging.handlers import os import os.path -import base64 - -import logging -import logging.handlers from flask import Flask from flask_bootstrap import Bootstrap -config_file = os.path.abspath('config.json') +config_file = os.path.abspath("config.json") bootstrap = Bootstrap() -logging_debug_string = ('%(levelname)s:%(name)s:%(asctime)s:%(filename)s' - ':%(lineno)d: %(message)s') -logging_string = '%(levelname)s - %(name)s - %(asctime)s - %(message)s' +logging_debug_string = ( + "%(levelname)s:%(name)s:%(asctime)s:%(filename)s" ":%(lineno)d: %(message)s" +) +logging_string = "%(levelname)s - %(name)s - %(asctime)s - %(message)s" logging_debug_formatter = logging.Formatter(logging_debug_string) logging_formatter = logging.Formatter(logging_string) @@ -22,14 +22,14 @@ logging_formatter = logging.Formatter(lo def create_app(): app = Flask(__name__) - _default_secret_key = base64.b64encode(os.urandom(32)).decode('utf-8') - app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', - _default_secret_key) + _default_secret_key = base64.b64encode(os.urandom(32)).decode("utf-8") + app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", _default_secret_key) - if not hasattr(app.config, 'from_json'): + if not hasattr(app.config, "from_json"): + def from_json(file, silent=True): try: - with open(file, encoding='utf-8') as json_file: + with open(file, encoding="utf-8") as json_file: obj = json.load(json_file) except IOError: if silent: @@ -45,7 +45,7 @@ def create_app(): app.config.from_json = from_json app.config.from_json(config_file, silent=True) - app.config.setdefault('BOOTSTRAP_SERVE_LOCAL', True) + app.config.setdefault("BOOTSTRAP_SERVE_LOCAL", True) bootstrap.init_app(app) @@ -54,28 +54,25 @@ def create_app(): stream_handler.setLevel(logging.DEBUG) stream_handler.setFormatter(logging_debug_formatter) - if not os.path.exists('spaceapi.log'): - open('spaceapi.log', mode='a').close() + if not os.path.exists("spaceapi.log"): + open("spaceapi.log", mode="a").close() rotating_file_handler = logging.handlers.RotatingFileHandler( - 'spaceapi.log', - maxBytes=1300000, - backupCount=10, - encoding='utf-8') + "spaceapi.log", maxBytes=1300000, backupCount=10, encoding="utf-8" + ) rotating_file_handler.setLevel(logging.INFO) rotating_file_handler.setFormatter(logging_formatter) - if app.debug or ('ENABLE_DEBUG_LOG' in app.config and - app.config['ENABLE_DEBUG_LOG']): + if app.debug or ( + "ENABLE_DEBUG_LOG" in app.config and app.config["ENABLE_DEBUG_LOG"] + ): - if not os.path.exists('spaceapi_debug.log'): - open('spaceapi_debug.log', mode='a').close() + if not os.path.exists("spaceapi_debug.log"): + open("spaceapi_debug.log", mode="a").close() second_rotating_file_handler = logging.handlers.RotatingFileHandler( - 'spaceapi_debug.log', - maxBytes=1300000, - backupCount=20, - encoding='utf-8') + "spaceapi_debug.log", maxBytes=1300000, backupCount=20, encoding="utf-8" + ) second_rotating_file_handler.setLevel(logging.DEBUG) second_rotating_file_handler.setFormatter(logging_debug_formatter) app.logger.addHandler(second_rotating_file_handler) @@ -85,21 +82,25 @@ def create_app(): @app.after_request def add_headers(response): - response.headers.setdefault('Access-Control-Allow-Origin', '*') - response.headers.setdefault('Cache-Control', 'no-cache') + response.headers.setdefault("Access-Control-Allow-Origin", "*") + response.headers.setdefault("Cache-Control", "no-cache") return response from .views import root_views + app.register_blueprint(root_views) from .state import state_views - app.register_blueprint(state_views, url_prefix='/state') + + app.register_blueprint(state_views, url_prefix="/state") from .sensors import sensors_views - app.register_blueprint(sensors_views, url_prefix='/sensors') + + app.register_blueprint(sensors_views, url_prefix="/sensors") from .utils import ActiveStatus, ActiveStatusv14 + ActiveStatus().reload() ActiveStatusv14().reload() diff --git a/spaceapi/auth.py b/spaceapi/auth.py --- a/spaceapi/auth.py +++ b/spaceapi/auth.py @@ -6,6 +6,6 @@ httpauth = HTTPDigestAuth() @httpauth.get_password def get_pw(username): - if username in current_app.config['HTTP_DIGEST_AUTH_USERS']: - return current_app.config['HTTP_DIGEST_AUTH_USERS'][username] + if username in current_app.config["HTTP_DIGEST_AUTH_USERS"]: + return current_app.config["HTTP_DIGEST_AUTH_USERS"][username] return None diff --git a/spaceapi/sensors.py b/spaceapi/sensors.py --- a/spaceapi/sensors.py +++ b/spaceapi/sensors.py @@ -1,14 +1,13 @@ import json - from functools import partial from pkg_resources import resource_filename import jsonschema -from flask import Blueprint, jsonify, request, abort, current_app +from flask import Blueprint, abort, current_app, jsonify, request from .auth import httpauth -from .utils import first, fuzzy_list_find, ActiveStatus, ActiveStatusv14 +from .utils import ActiveStatus, ActiveStatusv14, first, fuzzy_list_find sensors_views = Blueprint("sensors", __name__) diff --git a/spaceapi/state.py b/spaceapi/state.py --- a/spaceapi/state.py +++ b/spaceapi/state.py @@ -1,20 +1,19 @@ import json - from time import time -from flask import Blueprint, jsonify, request, current_app, abort +from flask import Blueprint, abort, current_app, jsonify, request +from .auth import httpauth from .utils import pass_active_status -from .auth import httpauth -state_views = Blueprint('state', __name__) +state_views = Blueprint("state", __name__) -@state_views.route('/set/', methods=['POST']) +@state_views.route("/set/", methods=["POST"]) @httpauth.login_required @pass_active_status def set_state(active, key): - value = json.loads(request.data.decode('utf-8'))['value'] + value = json.loads(request.data.decode("utf-8"))["value"] current_app.logger.info(value) current_app.logger.info(type(value)) active.set_new_state(**{key: value}) diff --git a/spaceapi/utils.py b/spaceapi/utils.py --- a/spaceapi/utils.py +++ b/spaceapi/utils.py @@ -1,86 +1,86 @@ import json import os.path -from time import time import random from functools import wraps - -from flask import request, current_app +from time import time +import mastodon import tweepy -import mastodon +from flask import current_app, request -default_json_file = os.path.abspath('default.json') -last_state_file = os.path.abspath('laststate.json') +default_json_file = os.path.abspath("default.json") +last_state_file = os.path.abspath("laststate.json") -default_json_file_v14 = os.path.abspath('default_v14.json') -last_state_file_v14 = os.path.abspath('laststate_v14.json') +default_json_file_v14 = os.path.abspath("default_v14.json") +last_state_file_v14 = os.path.abspath("laststate_v14.json") if not os.path.exists(default_json_file): - raise RuntimeError('default.json does not exists!') + raise RuntimeError("default.json does not exists!") elif not os.path.isfile(default_json_file): - raise RuntimeError('default.json is not a file!') + raise RuntimeError("default.json is not a file!") if not os.path.exists(default_json_file_v14): - raise RuntimeError('default_v14.json does not exists!') + raise RuntimeError("default_v14.json does not exists!") elif not os.path.isfile(default_json_file): - raise RuntimeError('default_v14.json is not a file!') + raise RuntimeError("default_v14.json is not a file!") possible_open_tweets = ( - 'The space is now open!', - 'The space is open! Come in and hack something!', - 'Yes, we\'re open! Come in and create something!', - 'Come by and hack something! We\'ve just opened!', - 'The ChaosStuff is now open for everyone!', - 'Let the Chaos begin! We\'re open!', - 'What do we hack now? Come and see, we\'ve just opened!', - 'TUWAT! Come and hack. We are open!', + "The space is now open!", + "The space is open! Come in and hack something!", + "Yes, we're open! Come in and create something!", + "Come by and hack something! We've just opened!", + "The ChaosStuff is now open for everyone!", + "Let the Chaos begin! We're open!", + "What do we hack now? Come and see, we've just opened!", + "TUWAT! Come and hack. We are open!", ) possible_closed_tweets = ( - 'The space is now closed!', - 'We\'re closed now! See you soon.', - 'Sorry, we are closed now!', - 'The ChaosStuff is now closed! Come back another time!', - 'Poweroff process finished! We\'re closed!', - 'Singularity reached! The space is closed!', - 'Dream of electric sheeps! We are closed!', + "The space is now closed!", + "We're closed now! See you soon.", + "Sorry, we are closed now!", + "The ChaosStuff is now closed! Come back another time!", + "Poweroff process finished! We're closed!", + "Singularity reached! The space is closed!", + "Dream of electric sheeps! We are closed!", ) def post_tweet(tweet, spaceapi=None): - if 'TWITTER_CONSUMER_KEY' in current_app.config: + if "TWITTER_CONSUMER_KEY" in current_app.config: auth = tweepy.OAuthHandler( - current_app.config['TWITTER_CONSUMER_KEY'], - current_app.config['TWITTER_CONSUMER_SECRET'] + current_app.config["TWITTER_CONSUMER_KEY"], + current_app.config["TWITTER_CONSUMER_SECRET"], ) auth.set_access_token( - current_app.config['TWITTER_ACCESS_TOKEN_KEY'], - current_app.config['TWITTER_ACCESS_TOKEN_SECRET'] + current_app.config["TWITTER_ACCESS_TOKEN_KEY"], + current_app.config["TWITTER_ACCESS_TOKEN_SECRET"], ) api = tweepy.API(auth) if spaceapi is None: api.update_status(tweet) else: - api.update_status(tweet, lat=spaceapi['location']['lat'], lon=spaceapi['location']['lon']) + api.update_status( + tweet, lat=spaceapi["location"]["lat"], lon=spaceapi["location"]["lon"] + ) def post_toot(toot): - if 'MASTODON_USERCRED_FILE' in current_app.config: + if "MASTODON_USERCRED_FILE" in current_app.config: api = mastodon.Mastodon( - client_id='c3l_spaceapi_clientcred.secret', - access_token=current_app.config['MASTODON_USERCRED_FILE'], - api_base_url='https://chaos.social' + client_id="c3l_spaceapi_clientcred.secret", + access_token=current_app.config["MASTODON_USERCRED_FILE"], + api_base_url="https://chaos.social", ) - api.status_post(toot, visibility='unlisted') + api.status_post(toot, visibility="unlisted") class Singleton: - def __new__(cls, *args, **kwargs): key = str(hash(cls)) - if not hasattr(cls, '_instance_dict'): + if not hasattr(cls, "_instance_dict"): cls._instance_dict = {} if key not in cls._instance_dict: @@ -90,120 +90,125 @@ class Singleton: class ActiveStatus(Singleton, dict): - def __init__(self): self.default_json_file = default_json_file self.last_state_file = last_state_file def reload(self): - with open(self.default_json_file, encoding='utf-8') as f: + with open(self.default_json_file, encoding="utf-8") as f: self.update(json.load(f)) - if os.path.exists(self.last_state_file) and os.path.isfile(self.last_state_file): - with open(self.last_state_file, encoding='utf-8') as f: + if os.path.exists(self.last_state_file) and os.path.isfile( + self.last_state_file + ): + with open(self.last_state_file, encoding="utf-8") as f: last_state_json = json.load(f) - self['state'] = last_state_json['state'] - self['sensors'].update(last_state_json['sensors']) + self["state"] = last_state_json["state"] + self["sensors"].update(last_state_json["sensors"]) def save_last_state(self): - with open(self.last_state_file, mode='w', encoding='utf-8') as f: + with open(self.last_state_file, mode="w", encoding="utf-8") as f: last_state = {} - last_state['state'] = self['state'] - last_state['sensors'] = self['sensors'] + last_state["state"] = self["state"] + last_state["sensors"] = self["sensors"] json.dump(last_state, f, sort_keys=True) def add_user_present(self, username): - if self['state']['open']: - if 'people_now_present' not in self['sensors']: - self['sensors']['people_now_present'] = [{'value': 0}] + if self["state"]["open"]: + if "people_now_present" not in self["sensors"]: + self["sensors"]["people_now_present"] = [{"value": 0}] - people_now_present = self['sensors']['people_now_present'][0] - - if 'names' in people_now_present and username not in people_now_present['names']: - people_now_present['value'] += 1 + people_now_present = self["sensors"]["people_now_present"][0] - if username in current_app.config['PEOPLE_NOW_PRESENT_ALLOWED']: - people_now_present['names'].append(username) + if ( + "names" in people_now_present + and username not in people_now_present["names"] + ): + people_now_present["value"] += 1 - elif 'names' not in people_now_present: - people_now_present['value'] += 1 + if username in current_app.config["PEOPLE_NOW_PRESENT_ALLOWED"]: + people_now_present["names"].append(username) - if username in current_app.config['PEOPLE_NOW_PRESENT_ALLOWED']: - people_now_present['names'] = [username] + elif "names" not in people_now_present: + people_now_present["value"] += 1 - self['sensors']['people_now_present'][0] = people_now_present + if username in current_app.config["PEOPLE_NOW_PRESENT_ALLOWED"]: + people_now_present["names"] = [username] + + self["sensors"]["people_now_present"][0] = people_now_present else: pass def remove_user_present(self, username): - if self['state']['open'] and 'people_now_present' in self['sensors']: - people_now_present = self['sensors']['people_now_present'][0] + if self["state"]["open"] and "people_now_present" in self["sensors"]: + people_now_present = self["sensors"]["people_now_present"][0] - if people_now_present['value'] > 0: - people_now_present['value'] -= 1 + if people_now_present["value"] > 0: + people_now_present["value"] -= 1 - if 'names' in people_now_present: + if "names" in people_now_present: - if username in people_now_present['names']: - people_now_present['names'].remove(username) + if username in people_now_present["names"]: + people_now_present["names"].remove(username) - if not people_now_present['names'] or people_now_present['value'] == 0: - del people_now_present['names'] + if not people_now_present["names"] or people_now_present["value"] == 0: + del people_now_present["names"] - self['sensors']['people_now_present'][0] = people_now_present + self["sensors"]["people_now_present"][0] = people_now_present else: pass def clear_user_present(self): - self['sensors']['people_now_present'][0]['value'] = 0 - if 'names' in self['sensors']['people_now_present'][0]: - del self['sensors']['people_now_present'][0]['names'] + self["sensors"]["people_now_present"][0]["value"] = 0 + if "names" in self["sensors"]["people_now_present"][0]: + del self["sensors"]["people_now_present"][0]["names"] def send_tweet(self, value): - tweet = random.choice(possible_open_tweets) if value else random.choice(possible_closed_tweets) + tweet = ( + random.choice(possible_open_tweets) + if value + else random.choice(possible_closed_tweets) + ) try: post_tweet(tweet, self) except Exception as e: - current_app.logger.error('Sending tweet failed! %s' % e, - exc_info=True) + current_app.logger.error("Sending tweet failed! %s" % e, exc_info=True) try: post_toot(tweet) except Exception as e: - current_app.logger.error('Sending toot failed! %s' % e, - exc_info=True) + current_app.logger.error("Sending toot failed! %s" % e, exc_info=True) def set_new_state(self, value=None, trigger_person=None, message=None): - if value is not None and value != self['state']['open']: - self['state']['open'] = value + if value is not None and value != self["state"]["open"]: + self["state"]["open"] = value self.send_tweet(value) if not value: - if 'people_now_present' in self['sensors']: + if "people_now_present" in self["sensors"]: self.clear_user_present() - if 'message' in self['state']: - del self['state']['message'] + if "message" in self["state"]: + del self["state"]["message"] if trigger_person is None: - if 'trigger_person' in self['state']: - del self['state']['trigger_person'] + if "trigger_person" in self["state"]: + del self["state"]["trigger_person"] else: - self['state']['trigger_person'] = trigger_person + self["state"]["trigger_person"] = trigger_person - self['state']['lastchange'] = int(time()) + self["state"]["lastchange"] = int(time()) if message is not None and message: - self['state']['message'] = message + self["state"]["message"] = message class ActiveStatusv14(ActiveStatus): - def __init__(self): self.default_json_file = default_json_file_v14 self.last_state_file = last_state_file_v14 @@ -213,12 +218,11 @@ class ActiveStatusv14(ActiveStatus): def request_wants_json(): - best = request.accept_mimetypes.best_match( - ['application/json', 'text/html'] + best = request.accept_mimetypes.best_match(["application/json", "text/html"]) + return ( + best == "application/json" + and request.accept_mimetypes[best] > request.accept_mimetypes["text/html"] ) - return best == 'application/json' and \ - request.accept_mimetypes[best] > \ - request.accept_mimetypes['text/html'] def fuzzy_list_find(lst, key, value): @@ -245,4 +249,5 @@ def pass_active_status(f): rv = f(status, *args, **kwargs) status.save_last_state() return rv + return wrapper diff --git a/spaceapi/views.py b/spaceapi/views.py --- a/spaceapi/views.py +++ b/spaceapi/views.py @@ -1,29 +1,38 @@ -from flask import Blueprint, jsonify, render_template, abort, request, redirect, url_for, current_app +from flask import ( + Blueprint, + abort, + current_app, + jsonify, + redirect, + render_template, + request, + url_for, +) -from .utils import request_wants_json, ActiveStatus, ActiveStatusv14 from .auth import httpauth +from .utils import ActiveStatus, ActiveStatusv14, request_wants_json -root_views = Blueprint('root', __name__) +root_views = Blueprint("root", __name__) -@root_views.route('/') +@root_views.route("/") def index(): if request_wants_json(): return jsonify(ActiveStatus()) - return render_template('index.html', status=ActiveStatus()) + return render_template("index.html", status=ActiveStatus()) -@root_views.route('/status.json') +@root_views.route("/status.json") def status_json(): return jsonify(ActiveStatus()) -@root_views.route('/v14/status.json') +@root_views.route("/v14/status.json") def v14_json(): return jsonify(ActiveStatusv14()) -@root_views.route('/reload') +@root_views.route("/reload") @httpauth.login_required def reload(): active = ActiveStatus() @@ -31,7 +40,7 @@ def reload(): return jsonify(active) -@root_views.route('/v14/reload') +@root_views.route("/v14/reload") @httpauth.login_required def v14_reload(): active = ActiveStatusv14() @@ -39,58 +48,69 @@ def v14_reload(): return jsonify(active) -@root_views.route('/open', methods=('GET', 'POST')) +@root_views.route("/open", methods=("GET", "POST")) @httpauth.login_required def open(): - if request.method == 'POST': + if request.method == "POST": active = ActiveStatus() activev14 = ActiveStatusv14() try: - if httpauth.username() in current_app.config['STATE_TRIGGER_PERSON_ALLOWED']: + if ( + httpauth.username() + in current_app.config["STATE_TRIGGER_PERSON_ALLOWED"] + ): trigger_person = httpauth.username() else: trigger_person = None except KeyError: trigger_person = None - if 'close' in request.form: + if "close" in request.form: new_state = False - elif 'open' in request.form: + elif "open" in request.form: new_state = True - if 'message' in request.form: - message = request.form.get('message') + if "message" in request.form: + message = request.form.get("message") else: message = None - active.set_new_state(value=new_state, trigger_person=trigger_person, message=message) - activev14.set_new_state(value=new_state, trigger_person=trigger_person, message=message) + active.set_new_state( + value=new_state, trigger_person=trigger_person, message=message + ) + activev14.set_new_state( + value=new_state, trigger_person=trigger_person, message=message + ) active.save_last_state() activev14.save_last_state() - return redirect(url_for('root.index')) + return redirect(url_for("root.index")) - return render_template('open.html') + return render_template("open.html") -@root_views.route('/present', methods=('GET', 'POST')) +@root_views.route("/present", methods=("GET", "POST")) @httpauth.login_required def present(): - if request.method == 'POST': + if request.method == "POST": active = ActiveStatus() activev14 = ActiveStatusv14() - if active['state']['open']: - user = httpauth.username() if 'user' not in request.form else request.form['user'] - if 'present' in request.form: + if active["state"]["open"]: + user = ( + httpauth.username() + if "user" not in request.form + else request.form["user"] + ) + if "present" in request.form: active.add_user_present(user) activev14.add_user_present(user) - elif 'leave' in request.form: + elif "leave" in request.form: active.remove_user_present(user) activev14.remove_user_present(user) active.save_last_state() activev14.save_last_state() - return redirect(url_for('root.index')) + return redirect(url_for("root.index")) - return render_template('present.html') + return render_template("present.html")