Changeset - 9cb4b622c930
[Not reviewed]
default
0 1 0
Dennis Fink - 3 years ago 2022-07-19 11:54:17
dennis.fink@c3l.lu
Use context manager with stmp connection
1 file changed with 12 insertions and 12 deletions:
0 comments (0 inline, 0 general)
spaceapi/utils.py
Show inline comments
 
@@ -31,112 +31,112 @@ possible_open_tweets = (
 
    "The ChaosStuff is now open for everyone!",
 
    "Let the Chaos begin! We're open!",
 
    "What do we hack now? Come and see, we've just opened!",
 
    "TUWAT! Come and hack. We are open!",
 
)
 

	
 
possible_closed_tweets = (
 
    standard_close_message,
 
    "We're closed now! See you soon.",
 
    "Sorry, we are closed now!",
 
    "The ChaosStuff is now closed! Come back another time!",
 
    "Poweroff process finished! We're closed!",
 
    "Singularity reached! The space is closed!",
 
    "Dream of electric sheeps! We are closed!",
 
)
 

	
 

	
 
def post_tweet(tweet, spaceapi=None):
 
    if "TWITTER_CONSUMER_KEY" in current_app.config:
 
        auth = tweepy.OAuthHandler(
 
            current_app.config["TWITTER_CONSUMER_KEY"],
 
            current_app.config["TWITTER_CONSUMER_SECRET"],
 
        )
 
        auth.set_access_token(
 
            current_app.config["TWITTER_ACCESS_TOKEN_KEY"],
 
            current_app.config["TWITTER_ACCESS_TOKEN_SECRET"],
 
        )
 
        api = tweepy.API(auth)
 
        if spaceapi is None:
 
            api.update_status(tweet)
 
        else:
 
            api.update_status(
 
                tweet, lat=spaceapi["location"]["lat"], lon=spaceapi["location"]["lon"]
 
            )
 

	
 

	
 
def post_toot(toot):
 
    if "MASTODON_USERCRED_FILE" in current_app.config:
 
        api = mastodon.Mastodon(
 
            client_id="c3l_spaceapi_clientcred.secret",
 
            access_token=current_app.config["MASTODON_USERCRED_FILE"],
 
            api_base_url="https://chaos.social",
 
        )
 
        api.status_post(toot, visibility="unlisted")
 

	
 

	
 
def post_email(subject, body):
 
    if "EMAIL_PASS" in current_app.config:
 
        smtp_conn = smtplib.SMTP(
 
            current_app.config["EMAIL_HOST"], port=current_app.config["EMAIL_PORT"]
 
        )
 
        ssl_context = ssl.create_default_context()
 
        smtp_conn.starttls(context=ssl_context)
 
        smtp_conn.login(
 
            current_app.config["EMAIL_USER"], current_app.config["EMAIL_PASS"]
 
        )
 

	
 
        msg = email.message.EmailMessage(policy=email.policy.default)
 
        msg["To"] = current_app.config["EMAIL_ANNOUNCE_ADDRESS"]
 
        msg["From"] = "spaceapibot <{email}>".format(
 
            email=current_app.config["EMAIL_USER"]
 
        )
 

	
 
        email_user = current_app.config["EMAIL_USER"]
 
        msg["From"] = "spaceapibot <{email}>".format(email=email_user)
 

	
 
        msg["Subject"] = subject
 
        msg.set_content(body)
 
        smtp_conn.send_message(msg)
 

	
 
        with smtplib.SMTP(
 
            current_app.config["EMAIL_HOST"], port=current_app.config["EMAIL_PORT"]
 
        ) as smtp:
 
            smtp.starttls(context=ssl.create_default_context())
 
            smtp.login(email_user, current_app.config["EMAIL_PASS"])
 
            smtp.send_message(msg)
 

	
 

	
 
class Singleton:
 
    def __new__(cls, *args, **kwargs):
 
        key = str(hash(cls))
 

	
 
        if not hasattr(cls, "_instance_dict"):
 
            cls._instance_dict = {}
 

	
 
        if key not in cls._instance_dict:
 
            cls._instance_dict[key] = super().__new__(cls, *args, **kwargs)
 

	
 
        return cls._instance_dict[key]
 

	
 

	
 
class ActiveStatusv14(Singleton, dict):
 
    def __init__(self):
 
        self.default_json_file = default_json_file_v14
 
        self.last_state_file = last_state_file_v14
 

	
 
    def reload(self):
 

	
 
        with open(self.default_json_file, encoding="utf-8") as f:
 
            self.update(json.load(f))
 

	
 
        if os.path.exists(self.last_state_file) and os.path.isfile(
 
            self.last_state_file
 
        ):
 
            with open(self.last_state_file, encoding="utf-8") as f:
 
                last_state_json = json.load(f)
 

	
 
            self["state"] = last_state_json["state"]
 
            self["sensors"].update(last_state_json["sensors"])
 

	
 
    def save_last_state(self):
 

	
 
        with open(self.last_state_file, mode="w", encoding="utf-8") as f:
 
            last_state = {}
 
            last_state["state"] = self["state"]
 
            last_state["sensors"] = self["sensors"]
 
            json.dump(last_state, f, sort_keys=True)
 

	
 
    def add_user_present(self, username):
 
        if self["state"]["open"]:
 
            if "people_now_present" not in self["sensors"]:
 
                self["sensors"]["people_now_present"] = [{"value": 0}]
 

	
 
            people_now_present = self["sensors"]["people_now_present"][0]
0 comments (0 inline, 0 general)