Files
@ 22285473c090
Branch filter:
Location: C3L-NOC/spaceapi/spaceapi/utils.py
22285473c090
5.2 KiB
text/x-python
Version bump
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | import json
import os.path
from time import time
from flask import request, current_app
import tweepy
# Both paths are resolved against the working directory at import time.
default_json_file = os.path.abspath('default.json')
last_state_file = os.path.abspath('laststate.json')

# Fail fast while importing: the defaults file is mandatory and has to be a
# regular file. The last-state file is optional and is not checked here.
if not os.path.isfile(default_json_file):
    if os.path.exists(default_json_file):
        raise RuntimeError('default.json is not a file!')
    raise RuntimeError('default.json does not exists!')
class Singleton:
    """Mixin that returns one shared instance per concrete class.

    The first construction of a class creates the instance and stores it in
    the class-level ``_instance_dict`` registry; every later construction
    returns that same object.  ``__init__`` is still invoked by Python on
    each call, so initialisers of subclasses run again on the shared
    instance.
    """

    def __new__(cls, *args, **kwargs):
        """Return the shared instance of *cls*, creating it on first use.

        Constructor arguments are forwarded to the next ``__new__`` in the
        MRO, except when that is ``object.__new__``, which rejects extra
        arguments whenever ``__new__`` has been overridden.
        """
        # hasattr() also sees a registry inherited from a base class, so
        # related classes may share one dict; the per-class key below keeps
        # their instances separate.
        if not hasattr(cls, '_instance_dict'):
            cls._instance_dict = {}
        key = str(hash(cls))
        if key not in cls._instance_dict:
            base_new = super().__new__
            if base_new is object.__new__:
                # object.__new__ raises TypeError when handed constructor
                # arguments here, so they are left for __init__ to consume.
                instance = base_new(cls)
            else:
                instance = base_new(cls, *args, **kwargs)
            cls._instance_dict[key] = instance
        return cls._instance_dict[key]
class ActiveStatus(Singleton, dict):
    """Shared status document, kept as a dict with 'state' and 'sensors'.

    The contents come from ``default.json`` overlaid with the optional
    ``laststate.json``; the methods below mutate the document in place.
    """

    def reload(self):
        """Merge the defaults file, then the last saved state, into self.

        Existing keys are updated, not cleared.  The last-state file is
        optional; when it exists its 'state' replaces the current one and
        its 'sensors' are merged into the current sensors.
        """
        with open(default_json_file, encoding='utf-8') as f:
            self.update(json.load(f))
        if os.path.isfile(last_state_file):
            with open(last_state_file, encoding='utf-8') as f:
                last_state_json = json.load(f)
            self['state'] = last_state_json['state']
            self['sensors'].update(last_state_json['sensors'])

    def save_last_state(self):
        """Write the current 'state' and 'sensors' to the last-state file.

        Raises:
            KeyError: if 'state' or 'sensors' is missing from the document.
        """
        # Collect the data before opening the file: opening in 'w' mode
        # truncates it, so a missing key must fail before that happens.
        last_state = {
            'state': self['state'],
            'sensors': self['sensors'],
        }
        with open(last_state_file, mode='w', encoding='utf-8') as f:
            json.dump(last_state, f, sort_keys=True)

    def add_user_present(self, username):
        """Count *username* as present; does nothing while the space is closed.

        The counter is created on demand.  The name itself is only recorded
        when it is listed in the app's ``PEOPLE_NOW_PRESENT_ALLOWED`` config;
        a user whose name is already recorded is not counted twice.
        """
        if not self['state']['open']:
            return
        sensors = self['sensors']
        if 'people_now_present' not in sensors:
            sensors['people_now_present'] = [{'value': 0}]
        present = sensors['people_now_present'][0]
        if 'names' in present and username in present['names']:
            return
        present['value'] += 1
        if username in current_app.config['PEOPLE_NOW_PRESENT_ALLOWED']:
            present.setdefault('names', []).append(username)

    def remove_user_present(self, username):
        """Count one person less and forget *username* if it was recorded.

        Does nothing while the space is closed or when no presence counter
        exists.  The counter is decremented (never below zero) whether or
        not the name was recorded; the 'names' list is dropped once it is
        empty or the counter reaches zero.
        """
        if not self['state']['open'] or 'people_now_present' not in self['sensors']:
            return
        present = self['sensors']['people_now_present'][0]
        if present['value'] > 0:
            present['value'] -= 1
        if 'names' not in present:
            return
        if present['value'] == 0:
            del present['names']
        elif username in present['names']:
            present['names'].remove(username)
            if not present['names']:
                del present['names']

    def _announce_on_twitter(self, value):
        """Post an open/closed status update using the app's Twitter keys."""
        config = current_app.config
        auth = tweepy.OAuthHandler(
            config['TWITTER_CONSUMER_KEY'],
            config['TWITTER_CONSUMER_SECRET']
        )
        auth.set_access_token(
            config['TWITTER_ACCESS_TOKEN_KEY'],
            config['TWITTER_ACCESS_TOKEN_SECRET']
        )
        api = tweepy.API(auth)
        if value:
            api.update_status('The space is now open!')
        else:
            api.update_status('The space is now closed!')

    def set_new_state(self, value=None, trigger_person=None, message=None):
        """Update the open flag, trigger person and/or message of the state.

        Args:
            value: new open flag, or None to leave it unchanged.  A change
                is announced on Twitter when ``TWITTER_CONSUMER_KEY`` is
                configured; a falsy value also resets the presence counter
                and drops the stored message.
            trigger_person: name to store, or None.  When the open flag is
                changed without a trigger person, any stored one is removed.
            message: message to store, or None to leave it as is.

        'lastchange' is set to the current Unix time whenever at least one
        argument was given.
        """
        if value is None and trigger_person is None and message is None:
            return
        state = self['state']
        # NOTE(review): the nesting of the cleanup steps below was
        # reconstructed from a listing that had lost its indentation --
        # confirm against the repository.
        if value is not None:
            state['open'] = value
            if 'TWITTER_CONSUMER_KEY' in current_app.config:
                self._announce_on_twitter(value)
            if not value:
                sensors = self['sensors']
                if 'people_now_present' in sensors:
                    present = sensors['people_now_present'][0]
                    present['value'] = 0
                    present.pop('names', None)
                state.pop('message', None)
            if trigger_person is None:
                # pop() instead of del: the key is absent until somebody has
                # been recorded, and that must not raise KeyError.
                state.pop('trigger_person', None)
        if trigger_person is not None:
            state['trigger_person'] = trigger_person
        if message is not None:
            state['message'] = message
        state['lastchange'] = int(time())
def request_wants_json():
    """Tell whether the current request prefers JSON over HTML.

    True only when 'application/json' is the best match among JSON and HTML
    in the Accept header and its quality is strictly higher than the one
    given to 'text/html'.
    """
    accepted = request.accept_mimetypes
    preferred = accepted.best_match(['application/json', 'text/html'])
    if preferred != 'application/json':
        return False
    return accepted[preferred] > accepted['text/html']
def fuzzy_list_find(lst, key, value):
    """Return the index of the first mapping in *lst* whose *key* equals *value*.

    Entries that do not contain *key* are skipped, so one incomplete entry
    does not abort the search for a later match.

    Raises:
        ValueError: if no entry matches.
    """
    for index, entry in enumerate(lst):
        try:
            candidate = entry[key]
        except KeyError:
            # An entry without the key simply cannot match.
            continue
        if candidate == value:
            return index
    raise ValueError('no entry with {!r} == {!r}'.format(key, value))
def first(iterable, keys):
    """Return the earliest element of *keys* that is contained in *iterable*.

    Raises:
        ValueError: if none of *keys* is found in *iterable*.
    """
    # A private sentinel is used because None could itself be a valid key.
    nothing = object()
    hit = next(
        (candidate for candidate in keys if candidate in iterable),
        nothing
    )
    if hit is nothing:
        raise ValueError
    return hit
|