Files
@ 69ec616687e6
Branch filter:
Location: FVDE/ennstatus/ennstatus/status/functions.py
69ec616687e6
2.9 KiB
text/x-python
Change from address in SMTPHandler
import os
import os.path
import json
from collections import defaultdict
from datetime import datetime
from flask import current_app
from flask_mail import Mail, Message
# Module-level Flask-Mail handle; _send_mail() below sends through it.
# NOTE(review): presumably bound to the Flask application elsewhere
# (e.g. via mail.init_app) -- confirm against the app factory.
mail = Mail()
def _send_mail(server_name, status, last_updated):
    """Notify the admin of ``server_name`` by mail about a status change.

    The recipient is read from ``current_app.config['ENNSTATUS_ADMINS']``,
    keyed by the server name.  Sending is best effort: every failure is
    logged on the application logger and never propagated to the caller.

    :param server_name: name of the affected server; also the lookup key
        for the admin address.
    :param status: the status the server was switched to.
    :param last_updated: timestamp of the last received update, inserted
        into the mail body as given.
    """
    logger = current_app.logger
    logger.debug('Try sending mail')
    try:
        try:
            recipient = current_app.config['ENNSTATUS_ADMINS'][server_name]
        except KeyError:
            # Only the lookup itself may report a missing admin; a KeyError
            # raised while building or sending the message must not be
            # mislabelled as a configuration problem.
            logger.error('Admin for %s not found!', server_name)
        else:
            msg = Message('[Ennstatus] %s went %s' % (server_name, status))
            msg.add_recipient(recipient)
            msg.body = ('%s went to %s. I received the last update at %s'
                        % (server_name, status, last_updated))
            mail.send(msg)
    except Exception as e:
        # Best-effort boundary: a broken mail setup must not take down the
        # status check that triggered the notification.
        logger.error('Unexpected error: %s', e, exc_info=True)
    logger.debug('Finished trying!')
def _check_server(data):
    """Age-check a stored server record and downgrade its status if stale.

    ``data`` is the decoded JSON record of one server.  Its
    ``last_updated`` field (format ``%d-%m-%Y %H:%M:%S``) is compared
    against the current UTC time:

    * 7 days or older: the record file is deleted and ``False`` is returned.
    * already ``'Offline'``: returned unchanged.
    * one hour or older: status becomes ``'Offline'``.
    * twenty minutes or older: status becomes ``'Unknown'``.

    When the status changes, the change is logged, the admin is mailed and
    the record is written back to ``data/<server_name lowercased>.json``.

    :param data: server record; must contain ``server_name``,
        ``last_updated`` and ``server_status``.
    :returns: the (possibly updated) record, or ``False`` if it was removed.
    :raises ValueError: if ``last_updated`` does not match the format.
    :raises OSError: if the record file cannot be removed or rewritten.
    """
    server_name = data['server_name']
    last_updated = data['last_updated']
    filename = os.path.join('data', '.'.join([server_name.lower(), 'json']))

    date = datetime.strptime(last_updated, '%d-%m-%Y %H:%M:%S')
    age = datetime.utcnow() - date

    if age.days >= 7:
        os.remove(filename)
        current_app.logger.error('%s was removed!' % server_name)
        return False
    if data['server_status'] == 'Offline':
        return data

    # total_seconds() covers the whole interval.  timedelta.seconds is only
    # the within-day remainder, so a record 1 day and 5 minutes old would
    # look fresh, and a timestamp slightly in the future (negative delta)
    # would look almost a day old.
    age_seconds = age.total_seconds()
    if age_seconds >= 3600:
        status = 'Offline'
    elif age_seconds >= 1200:
        status = 'Unknown'
    else:
        status = None

    status_keys = ('server_status', 'tor_status')
    # Act only on a real transition; otherwise a record already marked
    # 'Unknown' would trigger a new mail and file rewrite on every check.
    if status is not None and any(data.get(key) != status
                                  for key in status_keys):
        current_app.logger.error('%s is set to %s' % (server_name,
                                                      status))
        _send_mail(server_name, status, last_updated)
        for key in status_keys:
            data[key] = status
        with open(filename, mode='w', encoding='utf-8') as file_object:
            json.dump(data, file_object)

    return data
def _load_single_server(filename):
    """Read one server record from ``filename`` and run the age check on it.

    :param filename: path of a UTF-8 encoded JSON record file.
    :returns: ``False`` when the file cannot be opened or read (``IOError``),
        otherwise whatever ``_check_server`` returns for the decoded record.
    """
    try:
        with open(filename, encoding='utf-8') as source:
            record = json.loads(source.read())
    except IOError:
        return False
    else:
        # Deliberately outside the guarded section: errors raised by the
        # check itself propagate to the caller.
        return _check_server(record)
def single_server(name):
    """Load and check the stored record of the server called ``name``.

    The record is read from ``data/<name>.json``; the result is whatever
    ``_load_single_server`` returns for that path.
    """
    record_path = 'data/' + name + '.json'
    return _load_single_server(record_path)
def _get_json_files(root, files):
    """Yield ``root`` joined with every entry of ``files`` ending in ``.json``.

    :param root: directory path the file names are relative to.
    :param files: iterable of file names.
    """
    yield from (
        os.path.join(root, name)
        for name in files
        if name.endswith('.json')
    )
def all_servers():
    """Yield the checked record of every ``.json`` file below ``data``.

    The directory tree is walked recursively; each yielded item is the
    result of ``_load_single_server`` for one file, so it may be ``False``.
    """
    for directory, _subdirs, names in os.walk('data'):
        yield from map(_load_single_server, _get_json_files(directory, names))
def all_servers_by_type(type):
    """Yield every server record whose ``server_type`` equals ``type``.

    Entries from ``all_servers`` that cannot be subscripted (for example
    ``False`` for unreadable or removed records) are skipped.
    """
    for entry in all_servers():
        try:
            matches = entry['server_type'] == type
        except TypeError:
            # Not a record dict; nothing to compare.
            continue
        if matches:
            yield entry
def split_all_servers_to_types():
    """Group all server records by their ``server_type``.

    :returns: a ``defaultdict(list)`` mapping each server type to the list
        of records of that type.  Entries that raise ``TypeError`` on
        lookup (for example ``False``) are left out.
    """
    grouped = defaultdict(list)
    for entry in all_servers():
        try:
            kind = entry['server_type']
            grouped[kind].append(entry)
        except TypeError:
            continue
    return grouped
|