Changeset - 935a6b927e50
[Not reviewed]
default
0 3 0
Dennis Fink - 7 years ago 2018-03-31 16:20:58
dennis.fink@c3l.lu
Added mastodon support
3 files changed with 27 insertions and 6 deletions:
0 comments (0 inline, 0 general)
requirements.txt
Show inline comments
 
#
 
# This file is autogenerated by pip-compile
 
# Make changes in requirements.in, then run this to update:
 
# To update, run:
 
#
 
#    pip-compile requirements.in
 
#    pip-compile --output-file requirements.txt requirements.in
 
#
 
decorator==4.2.1          # via mastodon.py
 
dominate==2.1.17          # via flask-bootstrap
 
flask-bootstrap==3.3.5.7
 
flask-httpauth==3.1.0
 
flask==0.10.1
 
itsdangerous==0.24        # via flask
 
jinja2==2.8               # via flask
 
jsonschema==2.5.1
 
markupsafe==0.23          # via jinja2
 
mastodon.py==1.2.2
 
oauthlib==1.0.3           # via requests-oauthlib
 
python-dateutil==2.7.2    # via mastodon.py
 
pytz==2018.3              # via mastodon.py
 
requests-oauthlib==0.6.1  # via tweepy
 
requests==2.9.1           # via requests-oauthlib, tweepy
 
six==1.10.0               # via tweepy
 
requests==2.9.1           # via mastodon.py, requests-oauthlib, tweepy
 
six==1.10.0               # via mastodon.py, python-dateutil, tweepy
 
tweepy==3.5.0
 
visitor==0.1.2            # via flask-bootstrap
 
werkzeug==0.11.4          # via flask
setup.py
Show inline comments
 
from pprint import pprint
 

	
 
from setuptools import setup, find_packages
 

	
 
packages = find_packages()
 
packages.append('spaceapi.templates')
 
packages.append('spaceapi.static')
 
packages.append('spaceapi.schema')
 

	
 
setup(
 
    name='c3l_spaceapi',
 
    version='0.0.12',
 
    version='0.0.13',
 
    url=None,
 
    license='GPLv3+',
 
    author='Dennis Fink',
 
    author_email='dennis.fink@c3l.lu',
 
    description='spaceapi endpoint for c3l.lu',
 
    packages=packages,
 
    package_data={
 
        'spaceapi.templates': ['*'],
 
        'spaceapi.static': ['*'],
 
        'spaceapi.schema': ['*'],
 
    },
 
    install_requires=[
 
        'Flask',
 
        'Flask-HTTPAuth',
 
        'Flask-Bootstrap',
 
        'jsonschema',
 
        'tweepy',
 
        'Mastodon.py',
 
    ]
 
)
spaceapi/utils.py
Show inline comments
 
import json
 
import os.path
 
from time import time
 
import random
 
from functools import wraps
 

	
 
from flask import request, current_app
 

	
 
import tweepy
 

	
 
import mastodon
 

	
 
default_json_file = os.path.abspath('default.json')
 
last_state_file = os.path.abspath('laststate.json')
 

	
 
default_json_file_v14 = os.path.abspath('default_v14.json')
 
last_state_file_v14 = os.path.abspath('laststate_v14.json')
 

	
 
if not os.path.exists(default_json_file):
 
    raise RuntimeError('default.json does not exists!')
 
elif not os.path.isfile(default_json_file):
 
    raise RuntimeError('default.json is not a file!')
 

	
 
if not os.path.exists(default_json_file_v14):
 
    raise RuntimeError('default_v14.json does not exists!')
 
elif not os.path.isfile(default_json_file):
 
    raise RuntimeError('default_v14.json is not a file!')
 

	
 

	
 
possible_open_tweets = (
 
    'The space is now open!',
 
    'The space is open! Come in and hack something!',
 
    'Yes, we\'re open! Come in and create something!',
 
    'Come by and hack something! We\'ve just opened!',
 
    'The ChaosStuff is now open for everyone!',
 
    'Let the Chaos begin! We\'re open!',
 
    'What do we hack now? Come and see, we\'ve just opened!',
 
    'TUWAT! Come and hack. We are open!',
 
)
 

	
 
possible_closed_tweets = (
 
    'The space is now closed!',
 
    'We\'re closed now! See you soon.',
 
    'Sorry, we are closed now!',
 
    'The ChaosStuff is now closed! Come back another time!',
 
    'Poweroff process finished! We\'re closed!',
 
    'Singularity reached! The space is closed!',
 
    'Dream of electric sheeps! We are closed!',
 
)
 

	
 

	
 
def post_tweet(tweet, spaceapi=None):
 
    if 'TWITTER_CONSUMER_KEY' in current_app.config:
 
        auth = tweepy.OAuthHandler(
 
            current_app.config['TWITTER_CONSUMER_KEY'],
 
            current_app.config['TWITTER_CONSUMER_SECRET']
 
        )
 
        auth.set_access_token(
 
            current_app.config['TWITTER_ACCESS_TOKEN_KEY'],
 
            current_app.config['TWITTER_ACCESS_TOKEN_SECRET']
 
        )
 
        api = tweepy.API(auth)
 
        if spaceapi is None:
 
            api.update_status(tweet)
 
        else:
 
            api.update_status(tweet, lat=spaceapi['location']['lat'], lon=spaceapi['location']['lon'])
 

	
 

	
 
def post_toot(toot):
 
    if 'MASTODON_USERCRED_FILE' in current_app.config:
 
        api = mastodon.Mastodon(
 
            client_id='c3l_spaceapi_clientcred.secret',
 
            access_token=current_app.config['MASTODON_USERCRED_FILE'],
 
            api_base_url='https://chaos.social'
 
        )
 
        api.status_post(toot, visibility='public')
 

	
 

	
 
class Singleton:
 

	
 
    def __new__(cls, *args, **kwargs):
 
        key = str(hash(cls))
 

	
 
        if not hasattr(cls, '_instance_dict'):
 
            cls._instance_dict = {}
 

	
 
        if key not in cls._instance_dict:
 
            cls._instance_dict[key] = super().__new__(cls, *args, **kwargs)
 

	
 
        return cls._instance_dict[key]
 

	
 

	
 
class ActiveStatus(Singleton, dict):
 

	
 
    def __init__(self):
 
        self.default_json_file = default_json_file
 
        self.last_state_file = last_state_file
 

	
 
    def reload(self):
 

	
 
        with open(self.default_json_file, encoding='utf-8') as f:
 
            self.update(json.load(f))
 

	
 
        if os.path.exists(self.last_state_file) and os.path.isfile(self.last_state_file):
 
            with open(self.last_state_file, encoding='utf-8') as f:
 
                last_state_json = json.load(f)
 

	
 
            self['state'] = last_state_json['state']
 
            self['sensors'].update(last_state_json['sensors'])
 

	
 
    def save_last_state(self):
 

	
 
        with open(self.last_state_file, mode='w', encoding='utf-8') as f:
 
            last_state = {}
 
            last_state['state'] = self['state']
 
            last_state['sensors'] = self['sensors']
 
            json.dump(last_state, f, sort_keys=True)
 

	
 
    def add_user_present(self, username):
 
        if self['state']['open']:
 
            if 'people_now_present' not in self['sensors']:
 
                self['sensors']['people_now_present'] = [{'value': 0}]
 

	
 
            people_now_present = self['sensors']['people_now_present'][0]
 

	
 
            if 'names' in people_now_present and username not in people_now_present['names']:
 
                people_now_present['value'] += 1
 

	
 
                if username in current_app.config['PEOPLE_NOW_PRESENT_ALLOWED']:
 
                    people_now_present['names'].append(username)
 

	
 
            elif 'names' not in people_now_present:
 
                people_now_present['value'] += 1
 

	
 
                if username in current_app.config['PEOPLE_NOW_PRESENT_ALLOWED']:
 
                    people_now_present['names'] = [username]
 

	
 
            self['sensors']['people_now_present'][0] = people_now_present
 
        else:
 
            pass
 

	
 
    def remove_user_present(self, username):
 
        if self['state']['open'] and 'people_now_present' in self['sensors']:
 
            people_now_present = self['sensors']['people_now_present'][0]
 

	
 
            if people_now_present['value'] > 0:
 
                people_now_present['value'] -= 1
 

	
 
            if 'names' in people_now_present:
 

	
 
                if username in people_now_present['names']:
 
                    people_now_present['names'].remove(username)
 

	
 
                if not people_now_present['names'] or people_now_present['value'] == 0:
 
                    del people_now_present['names']
 

	
 
            self['sensors']['people_now_present'][0] = people_now_present
 
        else:
 
            pass
 

	
 
    def clear_user_present(self):
 
        self['sensors']['people_now_present'][0]['value'] = 0
 
        if 'names' in self['sensors']['people_now_present'][0]:
 
            del self['sensors']['people_now_present'][0]['names']
 

	
 
    def send_tweet(self, value):
 
        tweet = random.choice(possible_open_tweets) if value else random.choice(possible_closed_tweets)
 
        try:
 
            post_tweet(tweet, self)
 
        except Exception as e:
 
            current_app.logger.error('Sending tweet failed! %s' % e,
 
                                     exc_info=True)
 

	
 
        try:
 
            post_toot(tweet)
 
        except Exception as e:
 
            current_app.logger.error('Sending toot failed! %s' % e,
 
                                     exc_info=True)
 

	
 
    def set_new_state(self, value=None, trigger_person=None, message=None):
 

	
 
        if value is not None and value != self['state']['open']:
 
            self['state']['open'] = value
 

	
 
            self.send_tweet(value)
 

	
 
            if not value:
 
                if 'people_now_present' in self['sensors']:
 
                    self.clear_user_present()
 

	
 
                if 'message' in self['state']:
 
                    del self['state']['message']
 

	
 
            if trigger_person is None:
 
                del self['state']['trigger_person']
 
            else:
 
                self['state']['trigger_person'] = trigger_person
 

	
 
            self['state']['lastchange'] = int(time())
 

	
 
        if message is not None:
 
            self['state']['message'] = message
 

	
 

	
 
class ActiveStatusv14(ActiveStatus):
 

	
 
    def __init__(self):
 
        self.default_json_file = default_json_file_v14
 
        self.last_state_file = last_state_file_v14
 

	
 
    def send_tweet(self, value):
 
        pass
 

	
 

	
 
def request_wants_json():
 
    best = request.accept_mimetypes.best_match(
 
        ['application/json', 'text/html']
 
    )
 
    return best == 'application/json' and \
 
        request.accept_mimetypes[best] > \
 
        request.accept_mimetypes['text/html']
 

	
 

	
 
def fuzzy_list_find(lst, key, value):
 

	
 
    for i, dic in enumerate(lst):
 
        if dic[key] == value:
 
            return i
 

	
 
    raise ValueError
 

	
 

	
 
def first(iterable, keys):
 
    for key in keys:
 
        if key in iterable:
 
            return key
 

	
 
    raise ValueError
 

	
 

	
 
def pass_active_status(f):
 
    @wraps(f)
 
    def wrapper(*args, **kwargs):
 
        status = ActiveStatus()
 
        rv = f(status, *args, **kwargs)
 
        status.save_last_state()
 
        return rv
 
    return wrapper
0 comments (0 inline, 0 general)