Files @ 5ddd2b163b42
Branch filter:

Location: C3L-NOC/spaceapi/spaceapi/utils.py

Dennis Fink
Remove Purpose.SERVER_AUTH from create_default_context
import email
import email.message
import email.policy
import json
import os.path
import random
import smtplib
import ssl
from functools import wraps
from time import time

import mastodon
import tweepy
from flask import current_app, request

# Both paths are resolved against the current working directory at import time.
default_json_file_v14 = os.path.abspath("default_v14.json")
last_state_file_v14 = os.path.abspath("laststate_v14.json")

# Fail at import time when the defaults file is missing or is not a regular
# file.  Both checks must look at the same ``*_v14`` path; the second one used
# to reference an undefined name and raised NameError instead of RuntimeError.
if not os.path.exists(default_json_file_v14):
    raise RuntimeError("default_v14.json does not exists!")
elif not os.path.isfile(default_json_file_v14):
    raise RuntimeError("default_v14.json is not a file!")

# Canonical announcement texts.  They are used as the e-mail subject in
# ``ActiveStatusv14.send_email`` and are also the first entry of the tuples below.
standard_open_message = "The space is now open!"
standard_close_message = "The space is now closed!"

# Candidate texts for the "open" announcement; ``ActiveStatusv14.send_tweet``
# picks one of them with ``random.choice``.
possible_open_tweets = (
    standard_open_message,
    "The space is open! Come in and hack something!",
    "Yes, we're open! Come in and create something!",
    "Come by and hack something! We've just opened!",
    "The ChaosStuff is now open for everyone!",
    "Let the Chaos begin! We're open!",
    "What do we hack now? Come and see, we've just opened!",
    "TUWAT! Come and hack. We are open!",
)

# Candidate texts for the "closed" announcement, chosen the same way.
possible_closed_tweets = (
    standard_close_message,
    "We're closed now! See you soon.",
    "Sorry, we are closed now!",
    "The ChaosStuff is now closed! Come back another time!",
    "Poweroff process finished! We're closed!",
    "Singularity reached! The space is closed!",
    "Dream of electric sheeps! We are closed!",
)


def post_tweet(tweet, spaceapi=None):
    """Post ``tweet`` as a Twitter status update via tweepy.

    Nothing happens unless ``TWITTER_CONSUMER_KEY`` is present in the Flask
    application config.

    Args:
        tweet: Text of the status update.
        spaceapi: Optional mapping with a ``"location"`` entry that provides
            ``"lat"`` and ``"lon"``; when given, the status is geo-tagged with
            these coordinates.
    """
    config = current_app.config
    if "TWITTER_CONSUMER_KEY" not in config:
        return

    auth = tweepy.OAuthHandler(
        config["TWITTER_CONSUMER_KEY"],
        config["TWITTER_CONSUMER_SECRET"],
    )
    auth.set_access_token(
        config["TWITTER_ACCESS_TOKEN_KEY"],
        config["TWITTER_ACCESS_TOKEN_SECRET"],
    )
    api = tweepy.API(auth)

    if spaceapi is None:
        api.update_status(tweet)
        return

    location = spaceapi["location"]
    # The status-update parameter for the longitude is named ``long``; the
    # mapping passed in stores it under ``"lon"``.  Passing ``lon=`` meant the
    # longitude was never sent as a recognised coordinate.
    api.update_status(tweet, lat=location["lat"], long=location["lon"])


def post_toot(toot):
    """Publish ``toot`` as an unlisted status on the chaos.social instance.

    Does nothing unless ``MASTODON_USERCRED_FILE`` is present in the Flask
    application config; its value is handed to the client as access token.
    """
    config = current_app.config
    if "MASTODON_USERCRED_FILE" not in config:
        return

    client = mastodon.Mastodon(
        client_id="c3l_spaceapi_clientcred.secret",
        access_token=config["MASTODON_USERCRED_FILE"],
        api_base_url="https://chaos.social",
    )
    client.status_post(toot, visibility="unlisted")


def post_email(subject, body):
    """Send an announcement e-mail over SMTP with STARTTLS.

    Nothing happens unless ``EMAIL_PASS`` is present in the Flask application
    config.  The message goes from ``EMAIL_USER`` to
    ``EMAIL_ANNOUNCE_ADDRESS``.

    Args:
        subject: Value of the ``Subject`` header.
        body: Plain-text content of the message.
    """
    config = current_app.config
    if "EMAIL_PASS" not in config:
        return

    # Build the message first so a missing config key fails before a
    # connection is opened.
    msg = email.message.EmailMessage(email.policy.SMTP)
    msg["To"] = config["EMAIL_ANNOUNCE_ADDRESS"]
    msg["From"] = config["EMAIL_USER"]
    msg["Subject"] = subject
    msg.set_content(body)

    # The context manager closes the connection even when a step below raises.
    with smtplib.SMTP(config["EMAIL_HOST"], port=config["EMAIL_PORT"]) as smtp_conn:
        # The context must be passed by keyword: positionally it would be
        # taken as ``keyfile`` on older Pythons and is rejected on newer ones.
        smtp_conn.starttls(context=ssl.create_default_context())
        smtp_conn.login(config["EMAIL_USER"], config["EMAIL_PASS"])
        smtp_conn.send_message(msg)


class Singleton:
    """Mixin whose ``__new__`` hands out one cached instance per class.

    Instances are kept in a ``_instance_dict`` class attribute, keyed by the
    stringified hash of the class being instantiated.
    """

    def __new__(cls, *args, **kwargs):
        # Create the registry lazily the first time no ``_instance_dict`` is
        # reachable from ``cls``.
        try:
            registry = cls._instance_dict
        except AttributeError:
            registry = cls._instance_dict = {}

        slot = str(hash(cls))
        if slot in registry:
            return registry[slot]

        registry[slot] = super().__new__(cls, *args, **kwargs)
        return registry[slot]


class ActiveStatusv14(Singleton, dict):
    """Shared dict holding the status document built from the v14 JSON files.

    The content comes from ``default_json_file_v14`` and is overlaid with the
    ``"state"`` and ``"sensors"`` entries stored in ``last_state_file_v14``
    (see :meth:`reload`).  Every other method indexes ``self["state"]`` and/or
    ``self["sensors"]``, so those entries must exist before they are called.
    """

    def __init__(self):
        # Runs on every ``ActiveStatusv14()`` call, including the ones that
        # return the already existing instance; it only re-binds the two file
        # paths and does not touch the dict content.
        self.default_json_file = default_json_file_v14
        self.last_state_file = last_state_file_v14

    def reload(self):
        """Merge the defaults file into the dict, then overlay the saved state.

        Existing keys are updated rather than cleared.  When the last-state
        file exists, its ``"state"`` replaces the current one and its
        ``"sensors"`` are merged into the current sensors.
        """
        with open(self.default_json_file, encoding="utf-8") as f:
            self.update(json.load(f))

        # ``isfile`` is already false for a path that does not exist.
        if os.path.isfile(self.last_state_file):
            with open(self.last_state_file, encoding="utf-8") as f:
                last_state_json = json.load(f)

            self["state"] = last_state_json["state"]
            self["sensors"].update(last_state_json["sensors"])

    def save_last_state(self):
        """Write the current ``"state"`` and ``"sensors"`` to the last-state file."""
        # Build the payload before opening the file so a missing key does not
        # leave a truncated, empty last-state file behind.
        last_state = {"state": self["state"], "sensors": self["sensors"]}

        with open(self.last_state_file, mode="w", encoding="utf-8") as f:
            json.dump(last_state, f, sort_keys=True)

    def add_user_present(self, username):
        """Count ``username`` as present while the space is open.

        The counter is incremented unless ``username`` is already in the
        published name list.  The name itself is only published when it is
        contained in the ``PEOPLE_NOW_PRESENT_ALLOWED`` config value.
        """
        if not self["state"]["open"]:
            return

        sensors = self["sensors"]
        if "people_now_present" not in sensors:
            sensors["people_now_present"] = [{"value": 0}]

        people_now_present = sensors["people_now_present"][0]

        if "names" in people_now_present and username in people_now_present["names"]:
            # Already listed by name: nothing to count or add.
            return

        people_now_present["value"] += 1

        if username in current_app.config["PEOPLE_NOW_PRESENT_ALLOWED"]:
            people_now_present.setdefault("names", []).append(username)

    def remove_user_present(self, username):
        """Count one person less and drop ``username`` from the name list.

        Only acts while the space is open and a ``people_now_present`` sensor
        exists.  The counter never goes below zero.  The ``"names"`` entry is
        removed once it is empty or the counter reaches zero.
        """
        if not self["state"]["open"] or "people_now_present" not in self["sensors"]:
            return

        people_now_present = self["sensors"]["people_now_present"][0]

        if people_now_present["value"] > 0:
            people_now_present["value"] -= 1

        if "names" in people_now_present:
            names = people_now_present["names"]

            if username in names:
                names.remove(username)

            if not names or people_now_present["value"] == 0:
                del people_now_present["names"]

    def clear_user_present(self):
        """Reset the presence counter to zero and drop the name list.

        Expects the ``people_now_present`` sensor to exist.
        """
        people_now_present = self["sensors"]["people_now_present"][0]
        people_now_present["value"] = 0
        people_now_present.pop("names", None)

    def send_tweet(self, value):
        """Announce the new state on Twitter and Mastodon.

        A random text is picked from the open or closed candidates depending
        on the truthiness of ``value``.  Failures of either service are logged
        and not propagated.
        """
        tweet = random.choice(possible_open_tweets if value else possible_closed_tweets)

        try:
            post_tweet(tweet, self)
        except Exception as e:
            # Deliberate best effort: a failing announcement must not abort
            # the state change.
            current_app.logger.error("Sending tweet failed! %s", e, exc_info=True)

        try:
            post_toot(tweet)
        except Exception as e:
            current_app.logger.error("Sending toot failed! %s", e, exc_info=True)

    def send_email(self, value):
        """Announce the new state by e-mail; failures are logged, not raised.

        The standard open/close message is used both as subject and as body.
        """
        subject = standard_open_message if value else standard_close_message
        try:
            # ``post_email`` needs a subject and a body.  The previous call
            # passed a single undefined name, so it always failed with a
            # NameError that was swallowed below and no mail was ever sent.
            post_email(subject, subject)
        except Exception as e:
            current_app.logger.error("Sending email failed! %s", e, exc_info=True)

    def set_new_state(self, value=None, trigger_person=None, message=None):
        """Apply a state change and/or set a state message.

        Args:
            value: New value for ``state["open"]``.  ``None`` or a value equal
                to the current one leaves the open state untouched and sends
                no announcements.
            trigger_person: Stored as ``state["trigger_person"]`` when the open
                state changes; ``None`` removes a previously stored entry.
            message: Stored as ``state["message"]`` when truthy, whether or not
                the open state changed.
        """
        state = self["state"]

        if value is not None and value != state["open"]:
            state["open"] = value

            self.send_tweet(value)
            self.send_email(value)

            if not value:
                # Closing: nobody is present any more and the old message is
                # dropped.
                if "people_now_present" in self["sensors"]:
                    self.clear_user_present()

                state.pop("message", None)

            if trigger_person is None:
                state.pop("trigger_person", None)
            else:
                state["trigger_person"] = trigger_person

            state["lastchange"] = int(time())

        if message:
            state["message"] = message


def request_wants_json():
    """Tell whether the current request ranks JSON strictly above HTML.

    Returns ``True`` only when ``application/json`` is the best match among
    ``application/json`` and ``text/html`` and its quality value is strictly
    greater than the one for ``text/html``.
    """
    accepted = request.accept_mimetypes
    preferred = accepted.best_match(["application/json", "text/html"])

    if preferred != "application/json":
        return False

    return accepted[preferred] > accepted["text/html"]


def fuzzy_list_find(lst, key, value):
    """Return the index of the first mapping in ``lst`` with ``item[key] == value``.

    Raises:
        ValueError: If no element matches.
        KeyError: If an element inspected before a match lacks ``key``.
    """
    hits = (position for position, item in enumerate(lst) if item[key] == value)
    found = next(hits, None)

    # Indices are never ``None``, so ``None`` safely signals "no match".
    if found is None:
        raise ValueError

    return found


def first(iterable, keys):
    """Return the first entry of ``keys`` that is contained in ``iterable``.

    Raises:
        ValueError: If none of ``keys`` is contained in ``iterable``.
    """
    # Private sentinel: ``None`` may itself be a legitimate key.
    nothing = object()
    match = next((candidate for candidate in keys if candidate in iterable), nothing)

    if match is nothing:
        raise ValueError

    return match


def pass_active_status(f):
    """Decorator that passes the ``ActiveStatusv14`` instance as first argument.

    After ``f`` returns, the status is saved with ``save_last_state()`` and the
    return value of ``f`` is passed through.  When ``f`` raises, nothing is
    saved.
    """

    def _with_status(*call_args, **call_kwargs):
        active = ActiveStatusv14()
        result = f(active, *call_args, **call_kwargs)
        # Only reached when ``f`` returned normally.
        active.save_last_state()
        return result

    return wraps(f)(_with_status)