Changeset - 2cef966f277b
[Not reviewed]
version_5
0 1 1
Dennis Fink - 10 years ago 2015-08-30 15:32:48
dennis.fink@c3l.lu
Added new check_ip function
2 files changed with 13 insertions and 5 deletions:
0 comments (0 inline, 0 general)
ennstatus/api/model.py
Show inline comments
 
import ipaddress
 
import json
 
import functools
 
import statistics
 

	
 
from pathlib import Path
 
from datetime import datetime
 

	
 
import jsonschema
 
import strict_rfc3339
 
import requests
 

	
 
from flask import current_app
 
from pkg_resources import resource_filename
 

	
 
from ..utils import check_ip
 

	
 

	
 
schema = json.load(
 
    open(
 
        resource_filename('ennstatus.api', 'schema/server.json'),
 
        encoding='utf-8'
 
    )
 
)
 

	
 
validate = functools.partial(
 
    jsonschema.validate,
 
    schema=schema,
 
    format_checker=jsonschema.FormatChecker()
 
)
 

	
 

	
 
def calculate_weight(data):
 

	
 
    obj = {}
 

	
 
    for subkey in ('1_week', '1_month', '3_months', '1_year', '5_years'):
 

	
 
        subdata = data[subkey]
 
        factor = subdata['factor']
 

	
 
        values = [x * factor for x in subdata['values'] if x is not None]
 

	
 
        if values:
 
            obj[subkey] = statistics.mean(values) * 100
 
        else:
 
            obj[subkey] = None
 

	
 
    return obj
 

	
 

	
 
class ServerEncoder(json.JSONEncoder):
 

	
 
    def default(self, obj):
 

	
 
        if isinstance(obj, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
 
            return str(obj)
 

	
 
        if isinstance(obj, datetime):
 
            return strict_rfc3339.timestamp_to_rfc3339_utcoffset(
 
                obj.timestamp()
 
            )
 

	
 
        return json.JSONEncoder.default(self, obj)
 

	
 

	
 
@@ -134,102 +137,98 @@ class Server:
 

	
 
    @classmethod
 
    def from_file_by_name(cls, name):
 

	
 
        filepath = Path('data') / (name.lower() + '.json')
 

	
 
        current_app.logger.info('Loading {}'.format(str(filepath)))
 
        if filepath.exists() and filepath.is_file():
 
            try:
 
                with filepath.open(encoding='utf-8') as f:
 
                    data = json.load(f, cls=ServerDecoder)
 
            except (IOError, ValueError):
 
                current_app.logger.error('IOError or ValueError')
 
                return False
 
            else:
 
                return cls(**data)
 
        else:
 
            current_app.logger.error('File error!')
 
            return False
 

	
 
    @classmethod
 
    def from_json(cls, server):
 

	
 
        try:
 
            if cls.check_json_format(json.loads(server)):
 
                decoded = json.loads(server, cls=ServerDecoder)
 
                return cls(**decoded)
 
        except (jsonschema.ValidationError, ValueError) as e:
 
            raise e
 

	
 
    @classmethod
 
    def from_dict(cls, server):
 
        return cls.from_json(json.dumps(server))
 

	
 
    def json(self):
 
        return json.dumps(self.__dict__, cls=ServerEncoder)
 

	
 
    @staticmethod
 
    def check_json_format(server):
 

	
 
        try:
 
            validate(server)
 
        except jsonschema.ValidationError as e:
 
            raise e
 

	
 
        for key in ('ip', 'ip6'):
 
            if key in server:
 
                address = ipaddress.ip_address(server[key])
 

	
 
                if any({address.is_private, address.is_multicast,
 
                        address.is_unspecified, address.is_reserved,
 
                        address.is_loopback}):
 
                if not check_ip(address):
 
                    raise ValueError('{} is not accepted!\n'.format(key))
 

	
 
        return True
 

	
 
    def save(self):
 

	
 
        filepath = Path('data') / (self.name.lower() + '.json')
 

	
 
        try:
 
            with filepath.open(mode='w', encoding='utf-8') as f:
 
                json.dump(self.__dict__, f, cls=ServerEncoder)
 
        except Exception as e:
 
            raise e
 

	
 
    def update_weights(self):
 

	
 
        if self.type not in ('exit', 'relay'):
 
            raise NotImplementedError
 

	
 
        url = 'https://onionoo.torproject.org/weights?lookup={}'.format(
 
            self.fingerprint
 
        )
 

	
 
        data = requests.get(url)
 

	
 
        try:
 
            data.raise_for_status()
 
        except requests.HTTPError as e:
 
            raise e
 
        else:
 
            data = data.json()['relays'][0]
 

	
 
        self.mean_consensus_weight = calculate_weight(data['consensus_weight'])
 
        self.mean_exit_probability = calculate_weight(data['exit_probability'])
 
        self.mean_guard_probability = calculate_weight(
 
            data['guard_probability']
 
        )
 
        self.mean_middle_probability = calculate_weight(
 
            data['middle_probability']
 
        )
 
        self.mean_consensus_weight_fraction = calculate_weight(
 
            data['consensus_weight_fraction']
 
        )
 

	
 
    def check_status(self):
 

	
 
        now = datetime.utcnow()
 
        delta = now - self.last_updated
 

	
 
        if delta.seconds >= 3600:
ennstatus/utils.py
Show inline comments
 
new file 100644
 

	
 
def check_ip(ipaddress):
 
    return not any({
 
        ipaddress.is_private,
 
        ipaddress.is_multicast,
 
        ipaddress.is_unspecified,
 
        ipaddress.is_reserved,
 
        ipaddress.is_loopback
 
    })
0 comments (0 inline, 0 general)