Changeset - f591cd1f3c49
[Not reviewed]
default
0 1 0
Dennis Fink - 3 years ago 2022-07-19 12:09:50
dennis.fink@c3l.lu
Merge send_tweet and post_tweet, send_toot and post_toot, send_email and post_email into the spaceapi class
1 file changed with 56 insertions and 68 deletions:
0 comments (0 inline, 0 general)
spaceapi/utils.py
Show inline comments
 
@@ -20,83 +20,34 @@ if not os.path.exists(default_json_file_
 
elif not os.path.isfile(default_json_file_v14):
 
    raise RuntimeError("default_v14.json is not a file!")
 

	
 
standard_open_message = "The space is now open!"
 
standard_close_message = "The space is now closed!"
 

	
 
possible_open_tweets = (
 
possible_open_messages = (
 
    standard_open_message,
 
    "The space is open! Come in and hack something!",
 
    "Yes, we're open! Come in and create something!",
 
    "Come by and hack something! We've just opened!",
 
    "The ChaosStuff is now open for everyone!",
 
    "Let the Chaos begin! We're open!",
 
    "What do we hack now? Come and see, we've just opened!",
 
    "TUWAT! Come and hack. We are open!",
 
)
 

	
 
possible_closed_tweets = (
 
possible_closed_messages = (
 
    standard_close_message,
 
    "We're closed now! See you soon.",
 
    "Sorry, we are closed now!",
 
    "The ChaosStuff is now closed! Come back another time!",
 
    "Poweroff process finished! We're closed!",
 
    "Singularity reached! The space is closed!",
 
    "Dream of electric sheeps! We are closed!",
 
)
 

	
 

	
 
def post_tweet(tweet, spaceapi=None):
 
    if "TWITTER_CONSUMER_KEY" in current_app.config:
 
        auth = tweepy.OAuthHandler(
 
            current_app.config["TWITTER_CONSUMER_KEY"],
 
            current_app.config["TWITTER_CONSUMER_SECRET"],
 
        )
 
        auth.set_access_token(
 
            current_app.config["TWITTER_ACCESS_TOKEN_KEY"],
 
            current_app.config["TWITTER_ACCESS_TOKEN_SECRET"],
 
        )
 
        api = tweepy.API(auth)
 
        if spaceapi is None:
 
            api.update_status(tweet)
 
        else:
 
            api.update_status(
 
                tweet, lat=spaceapi["location"]["lat"], lon=spaceapi["location"]["lon"]
 
            )
 

	
 

	
 
def post_toot(toot):
 
    if "MASTODON_USERCRED_FILE" in current_app.config:
 
        api = mastodon.Mastodon(
 
            client_id="c3l_spaceapi_clientcred.secret",
 
            access_token=current_app.config["MASTODON_USERCRED_FILE"],
 
            api_base_url="https://chaos.social",
 
        )
 
        api.status_post(toot, visibility="unlisted")
 

	
 

	
 
def post_email(subject, body):
 
    if "EMAIL_PASS" in current_app.config:
 

	
 
        msg = email.message.EmailMessage(policy=email.policy.default)
 
        msg["To"] = current_app.config["EMAIL_ANNOUNCE_ADDRESS"]
 

	
 
        email_user = current_app.config["EMAIL_USER"]
 
        msg["From"] = "spaceapibot <{email}>".format(email=email_user)
 

	
 
        msg["Subject"] = subject
 
        msg.set_content(body)
 

	
 
        with smtplib.SMTP(
 
            current_app.config["EMAIL_HOST"], port=current_app.config["EMAIL_PORT"]
 
        ) as smtp:
 
            smtp.starttls(context=ssl.create_default_context())
 
            smtp.login(email_user, current_app.config["EMAIL_PASS"])
 
            smtp.send_message(msg)
 

	
 

	
 
class Singleton:
 
    def __new__(cls, *args, **kwargs):
 
        key = str(hash(cls))
 

	
 
        if not hasattr(cls, "_instance_dict"):
 
            cls._instance_dict = {}
 
@@ -181,47 +132,84 @@ class ActiveStatusv14(Singleton, dict):
 

	
 
    def clear_user_present(self):
 
        self["sensors"]["people_now_present"][0]["value"] = 0
 
        if "names" in self["sensors"]["people_now_present"][0]:
 
            del self["sensors"]["people_now_present"][0]["names"]
 

	
 
    def send_tweet(self, value):
 
        tweet = (
 
            random.choice(possible_open_tweets)
 
            if value
 
            else random.choice(possible_closed_tweets)
 
    def notify(self):
 
        message = (
 
            random.choice(possible_open_messages)
 
            if self["state"]["open"]
 
            else random.choice(possible_closed_messages)
 
        )
 
        self.send_tweet(message)
 
        self.send_toot(message)
 

	
 
        subject = (
 
            standard_open_message if self["state"]["open"] else standard_close_message
 
        )
 
        self.send_email(subject, message)
 

	
 
    def send_tweet(self, message):
 
        if "TWITTER_CONSUMER_KEY" in current_app.config:
 
        try:
 
            post_tweet(tweet, self)
 
                auth = tweepy.OAuthHandler(
 
                    current_app.config["TWITTER_CONSUMER_KEY"],
 
                    current_app.config["TWITTER_CONSUMER_SECRET"],
 
                )
 
                auth.set_access_token(
 
                    current_app.config["TWITTER_ACCESS_TOKEN_KEY"],
 
                    current_app.config["TWITTER_ACCESS_TOKEN_SECRET"],
 
                )
 
                api = tweepy.API(auth)
 
                api.update_status(
 
                    tweet, lat=self["location"]["lat"], long=self["location"]["lon"]
 
                )
 
        except Exception as e:
 
            current_app.logger.error("Sending tweet failed! %s" % e, exc_info=True)
 

	
 
    def send_toot(self, message):
 
        if "MASTODON_USERCRED_FILE" in current_app.config:
 
        try:
 
            post_toot(tweet)
 
                api = mastodon.Mastodon(
 
                    client_id="c3l_spaceapi_clientcred.secret",
 
                    access_token=current_app.config["MASTODON_USERCRED_FILE"],
 
                    api_base_url="https://chaos.social",
 
                )
 
                api.status_post(toot, visibility="unlisted")
 
        except Exception as e:
 
            current_app.logger.error("Sending toot failed! %s" % e, exc_info=True)
 

	
 
    def send_email(self, value):
 
        subject = standard_open_message if value else standard_close_message
 
        message = (
 
            random.choice(possible_open_tweets)
 
            if value
 
            else random.choice(possible_closed_tweets)
 
        )
 
    def send_email(self, subject, message):
 
        if "EMAIL_PASS" in current_app.config:
 
        try:
 
            post_email(subject, message)
 
                msg = email.message.EmailMessage(policy=email.policy.default)
 
                msg["To"] = current_app.config["EMAIL_ANNOUNCE_ADDRESS"]
 

	
 
                email_user = current_app.config["EMAIL_USER"]
 
                msg["From"] = "spaceapibot <{email}>".format(email=email_user)
 

	
 
                msg["Subject"] = subject
 
                msg.set_content(body)
 

	
 
                with smtplib.SMTP(
 
                    current_app.config["EMAIL_HOST"],
 
                    port=current_app.config["EMAIL_PORT"],
 
                ) as smtp:
 
                    smtp.starttls(context=ssl.create_default_context())
 
                    smtp.login(email_user, current_app.config["EMAIL_PASS"])
 
                    smtp.send_message(msg)
 
        except Exception as e:
 
            current_app.logger.error("Sending email failed! %s" % e, exc_info=True)
 

	
 
    def set_new_state(self, value=None, trigger_person=None, message=None):
 

	
 
        if value is not None and value != self["state"]["open"]:
 
            self["state"]["open"] = value
 

	
 
            self.send_tweet(value)
 
            self.send_email(value)
 
            self.notify()
 

	
 
            if not value:
 
                if "people_now_present" in self["sensors"]:
 
                    self.clear_user_present()
 

	
 
                if "message" in self["state"]:
0 comments (0 inline, 0 general)