Changeset - f591cd1f3c49
[Not reviewed]
default
0 1 0
Dennis Fink - 3 years ago 2022-07-19 12:09:50
dennis.fink@c3l.lu
Merge send_tweet and post_tweet, send_toot and post_toot, send_email and post_email into the spaceapi class
1 file changed with 56 insertions and 68 deletions:
0 comments (0 inline, 0 general)
spaceapi/utils.py
Show inline comments
 
@@ -23,7 +23,7 @@ elif not os.path.isfile(default_json_fil
 
standard_open_message = "The space is now open!"
 
standard_close_message = "The space is now closed!"
 

	
 
possible_open_tweets = (
 
possible_open_messages = (
 
    standard_open_message,
 
    "The space is open! Come in and hack something!",
 
    "Yes, we're open! Come in and create something!",
 
@@ -34,7 +34,7 @@ possible_open_tweets = (
 
    "TUWAT! Come and hack. We are open!",
 
)
 

	
 
possible_closed_tweets = (
 
possible_closed_messages = (
 
    standard_close_message,
 
    "We're closed now! See you soon.",
 
    "Sorry, we are closed now!",
 
@@ -45,55 +45,6 @@ possible_closed_tweets = (
 
)
 

	
 

	
 
def post_tweet(tweet, spaceapi=None):
 
    if "TWITTER_CONSUMER_KEY" in current_app.config:
 
        auth = tweepy.OAuthHandler(
 
            current_app.config["TWITTER_CONSUMER_KEY"],
 
            current_app.config["TWITTER_CONSUMER_SECRET"],
 
        )
 
        auth.set_access_token(
 
            current_app.config["TWITTER_ACCESS_TOKEN_KEY"],
 
            current_app.config["TWITTER_ACCESS_TOKEN_SECRET"],
 
        )
 
        api = tweepy.API(auth)
 
        if spaceapi is None:
 
            api.update_status(tweet)
 
        else:
 
            api.update_status(
 
                tweet, lat=spaceapi["location"]["lat"], lon=spaceapi["location"]["lon"]
 
            )
 

	
 

	
 
def post_toot(toot):
 
    if "MASTODON_USERCRED_FILE" in current_app.config:
 
        api = mastodon.Mastodon(
 
            client_id="c3l_spaceapi_clientcred.secret",
 
            access_token=current_app.config["MASTODON_USERCRED_FILE"],
 
            api_base_url="https://chaos.social",
 
        )
 
        api.status_post(toot, visibility="unlisted")
 

	
 

	
 
def post_email(subject, body):
 
    if "EMAIL_PASS" in current_app.config:
 

	
 
        msg = email.message.EmailMessage(policy=email.policy.default)
 
        msg["To"] = current_app.config["EMAIL_ANNOUNCE_ADDRESS"]
 

	
 
        email_user = current_app.config["EMAIL_USER"]
 
        msg["From"] = "spaceapibot <{email}>".format(email=email_user)
 

	
 
        msg["Subject"] = subject
 
        msg.set_content(body)
 

	
 
        with smtplib.SMTP(
 
            current_app.config["EMAIL_HOST"], port=current_app.config["EMAIL_PORT"]
 
        ) as smtp:
 
            smtp.starttls(context=ssl.create_default_context())
 
            smtp.login(email_user, current_app.config["EMAIL_PASS"])
 
            smtp.send_message(msg)
 

	
 

	
 
class Singleton:
 
    def __new__(cls, *args, **kwargs):
 
        key = str(hash(cls))
 
@@ -184,31 +135,69 @@ class ActiveStatusv14(Singleton, dict):
 
        if "names" in self["sensors"]["people_now_present"][0]:
 
            del self["sensors"]["people_now_present"][0]["names"]
 

	
 
    def send_tweet(self, value):
 
        tweet = (
 
            random.choice(possible_open_tweets)
 
            if value
 
            else random.choice(possible_closed_tweets)
 
    def notify(self):
 
        message = (
 
            random.choice(possible_open_messages)
 
            if self["state"]["open"]
 
            else random.choice(possible_closed_messages)
 
        )
 
        self.send_tweet(message)
 
        self.send_toot(message)
 

	
 
        subject = (
 
            standard_open_message if self["state"]["open"] else standard_close_message
 
        )
 
        self.send_email(subject, message)
 

	
 
    def send_tweet(self, message):
 
        if "TWITTER_CONSUMER_KEY" in current_app.config:
 
        try:
 
            post_tweet(tweet, self)
 
                auth = tweepy.OAuthHandler(
 
                    current_app.config["TWITTER_CONSUMER_KEY"],
 
                    current_app.config["TWITTER_CONSUMER_SECRET"],
 
                )
 
                auth.set_access_token(
 
                    current_app.config["TWITTER_ACCESS_TOKEN_KEY"],
 
                    current_app.config["TWITTER_ACCESS_TOKEN_SECRET"],
 
                )
 
                api = tweepy.API(auth)
 
                api.update_status(
 
                    tweet, lat=self["location"]["lat"], long=self["location"]["lon"]
 
                )
 
        except Exception as e:
 
            current_app.logger.error("Sending tweet failed! %s" % e, exc_info=True)
 

	
 
    def send_toot(self, message):
 
        if "MASTODON_USERCRED_FILE" in current_app.config:
 
        try:
 
            post_toot(tweet)
 
                api = mastodon.Mastodon(
 
                    client_id="c3l_spaceapi_clientcred.secret",
 
                    access_token=current_app.config["MASTODON_USERCRED_FILE"],
 
                    api_base_url="https://chaos.social",
 
                )
 
                api.status_post(toot, visibility="unlisted")
 
        except Exception as e:
 
            current_app.logger.error("Sending toot failed! %s" % e, exc_info=True)
 

	
 
    def send_email(self, value):
 
        subject = standard_open_message if value else standard_close_message
 
        message = (
 
            random.choice(possible_open_tweets)
 
            if value
 
            else random.choice(possible_closed_tweets)
 
        )
 
    def send_email(self, subject, message):
 
        if "EMAIL_PASS" in current_app.config:
 
        try:
 
            post_email(subject, message)
 
                msg = email.message.EmailMessage(policy=email.policy.default)
 
                msg["To"] = current_app.config["EMAIL_ANNOUNCE_ADDRESS"]
 

	
 
                email_user = current_app.config["EMAIL_USER"]
 
                msg["From"] = "spaceapibot <{email}>".format(email=email_user)
 

	
 
                msg["Subject"] = subject
 
                msg.set_content(body)
 

	
 
                with smtplib.SMTP(
 
                    current_app.config["EMAIL_HOST"],
 
                    port=current_app.config["EMAIL_PORT"],
 
                ) as smtp:
 
                    smtp.starttls(context=ssl.create_default_context())
 
                    smtp.login(email_user, current_app.config["EMAIL_PASS"])
 
                    smtp.send_message(msg)
 
        except Exception as e:
 
            current_app.logger.error("Sending email failed! %s" % e, exc_info=True)
 

	
 
@@ -217,8 +206,7 @@ class ActiveStatusv14(Singleton, dict):
 
        if value is not None and value != self["state"]["open"]:
 
            self["state"]["open"] = value
 

	
 
            self.send_tweet(value)
 
            self.send_email(value)
 
            self.notify()
 

	
 
            if not value:
 
                if "people_now_present" in self["sensors"]:
0 comments (0 inline, 0 general)