proxadmin/app/models.py

461 lines
16 KiB
Python

# FAT MODEL
from werkzeug.security import generate_password_hash, check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from flask import current_app, request, url_for
from flask_login import UserMixin, AnonymousUserMixin
from app.exceptions import ValidationError
from . import db, lm
import os
import random
import uuid
import base64
import hashlib
import json
from decimal import Decimal
from datetime import date, time, datetime, timedelta
from sortedcontainers import SortedDict
import requests
import onetimepass
class Permission:
DEPLOY = 0x01
ADMINISTER = 0xff
class Role(db.Model):
__tablename__ = 'roles'
pid = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String, unique=True)
default = db.Column(db.Boolean, default=False, index=True)
permissions = db.Column(db.Integer)
users = db.relationship('User', backref='role', lazy='dynamic')
@staticmethod
def insert_roles():
roles = {
'User': (Permission.DEPLOY, True),
'Administrator': (Permission.ADMINISTER, False)
}
for r in roles:
role = Role.query.filter_by(name=r).first()
if role is None:
role = Role(name=r)
role.permissions = roles[r][0]
role.default = roles[r][1]
db.session.add(role)
db.session.commit()
def __repr__(self):
return '<Role %r>' % self.name
class User(db.Model, UserMixin):
__tablename__ = 'users'
pid = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String, unique=True, index=True)
role_id = db.Column(db.ForeignKey('roles.pid')) #FK
password_hash = db.Column(db.String)
tokens = db.Column(db.Text)
confirmed = db.Column(db.Boolean, default=False)
active = db.Column(db.Boolean, default=True)
member_since = db.Column(db.DateTime(), default=datetime.utcnow)
last_seen = db.Column(db.DateTime(), default=datetime.utcnow)
last_ip = db.Column(db.String)
twofactor = db.Column(db.Boolean, default=False) #optional 2factor auth
otp_secret = db.Column(db.String)
avatar_hash = db.Column(db.String)
uuid = db.Column(db.String)
name = db.Column(db.Unicode)
address = db.Column(db.Unicode)
city = db.Column(db.Unicode)
postcode = db.Column(db.String)
country = db.Column(db.String, default='BG')
phone = db.Column(db.String)
org_account = db.Column(db.Boolean, default=False)
org_companyname = db.Column(db.Unicode)
org_regaddress = db.Column(db.Unicode)
org_responsible = db.Column(db.Unicode)
org_vatnum = db.Column(db.String)
group = db.Column(db.String, default='User')
language = db.Column(db.String, default='BG')
wallet = db.Column(db.Float)
currency = db.Column(db.String, default='BGN')
inv_transactions = db.relationship('Transaction', backref='owner', lazy='dynamic')
inv_orders = db.relationship('Order', backref='owner', lazy='dynamic')
inv_servers = db.relationship('Server', backref='owner', lazy='dynamic')
inv_deployments = db.relationship('Deployment', backref='owner', lazy='dynamic')
inv_services = db.relationship('Service', backref='owner', lazy='dynamic')
inv_domains = db.relationship('Domain', backref='owner', lazy='dynamic')
inv_addresses = db.relationship('Address', backref='owner', lazy='dynamic')
def __init__(self, **kwargs):
super(User, self).__init__(**kwargs)
if self.role is None:
if self.email == current_app.config['ADMIN_EMAIL']:
#if email match config admin name create admin user
self.role = Role.query.filter_by(permissions=0xff).first()
if self.role is None:
#if role is stil not set, create default user role
self.role = Role.query.filter_by(default=True).first()
if self.avatar_hash is None and self.email is not None:
self.avatar_hash = hashlib.md5(self.email.encode('utf-8')).hexdigest()
if self.otp_secret is None:
# generate a random secret
self.otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8')
if self.uuid is None:
# generate uuid
self.uuid = uuid.uuid4()
@property
def password(self):
raise AttributeError('password is not a readable attribute')
@password.setter
def password(self, password):
self.password_hash = generate_password_hash(password)
def verify_password(self, password):
return check_password_hash(self.password_hash, password)
def get_totp_uri(self):
return 'otpauth://totp/DataPanel:{0}?secret={1}&issuer=datapanel'.format(self.email, self.otp_secret)
def verify_totp(self, token):
return onetimepass.valid_totp(token, self.otp_secret)
def generate_confirmation_token(self, expiration=86400):
s = Serializer(current_app.config['SECRET_KEY'], expiration)
return s.dumps({'confirm': self.pid})
def confirm(self, token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return False
if data.get('confirm') != self.pid:
return False
self.confirmed = True
db.session.add(self)
db.session.commit()
return True
def generate_reset_token(self, expiration=86400):
s = Serializer(current_app.config['SECRET_KEY'], expiration)
return s.dumps({'reset': self.pid})
def reset_password(self, token, new_password):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return False
if data.get('reset') != self.pid:
return False
self.password = new_password
db.session.add(self)
db.session.commit()
return True
def can(self, permissions):
return self.role is not None and (self.role.permissions & permissions) == permissions
def is_administrator(self):
if self.can(Permission.ADMINISTER):
return True
else:
return False
def ping(self):
self.last_seen = datetime.utcnow()
db.session.add(self)
db.session.commit()
def gravatar(self, size=100, default='identicon', rating='g'):
#this check is disabled because it didnt work for me but forcing https to gravatar is okay.
#if request.is_secure:
# url = 'https://secure.gravatar.com/avatar'
#else:
# url = 'http://www.gravatar.com/avatar'
url = 'https://secure.gravatar.com/avatar'
hash = self.avatar_hash or hashlib.md5(self.email.encode('utf-8')).hexdigest()
return '{url}/{hash}?s={size}&d={default}&r={rating}'.format(url=url, hash=hash, size=size, default=default, rating=rating)
def is_authenticated(self):
return self.is_authenticated
def get_id(self):
return str(self.pid)
def __repr__(self):
return '<User %r>' % self.email
class AnonymousUser(AnonymousUserMixin):
def can(self, permissions):
return False
def is_administrator(self):
return False
lm.anonymous_user = AnonymousUser
@lm.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
def contact_proxmaster(data, method):
data['apikey'] = current_app.config['APIKEY']
data_json = json.dumps(data)
url = current_app.config['PROXMASTER_URL'] + '/' + str(method)
try:
db_result = requests.post( url, data=data_json, headers={"content-type": "application/json"}, timeout=30 )
proxjson = db_result.json()
#if current_app.config['DEBUG'] == 1:
# current_app.logger.info('API> {}'.format(str(proxjson)))
return proxjson
except:
return { 'status': 'UNREACHABLE' }
#MARKET
class Order(db.Model):
__tablename__ = 'orders'
pid = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.ForeignKey('users.pid')) #FK
date_created = db.Column(db.DateTime, default=datetime.utcnow)
status = db.Column(db.String)
recipe_id = db.Column(db.ForeignKey('recipes.pid')) #FK
region_id = db.Column(db.ForeignKey('regions.pid')) #FK
parameter1 = db.Column(db.String)
parameter2 = db.Column(db.String)
parameter3 = db.Column(db.String)
parameter4 = db.Column(db.String)
class Recipe(db.Model):
__tablename__ = 'recipes'
pid = db.Column(db.Integer, primary_key=True)
inv_orders = db.relationship('Order', backref='recipe', lazy='dynamic')
category = db.Column(db.String)
templatefile = db.Column(db.String)
description = db.Column(db.String)
#VDC
class Region(db.Model):
__tablename__ = 'regions'
pid = db.Column(db.Integer, primary_key=True)
inv_servers = db.relationship('Server', backref='region', lazy='dynamic')
inv_addresses = db.relationship('Address', backref='region', lazy='dynamic')
inv_orders = db.relationship('Order', backref='region', lazy='dynamic')
enabled = db.Column(db.Boolean)
name = db.Column(db.String)
description = db.Column(db.String)
extraprice = db.Column(db.Float)
class Server(db.Model):
__tablename__ = 'servers'
pid = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.ForeignKey('users.pid')) #FK
region_id = db.Column(db.ForeignKey('regions.pid')) #FK
enabled = db.Column(db.Boolean)
name = db.Column(db.String)
cpu = db.Column(db.String)
mem = db.Column(db.String)
hdd = db.Column(db.String)
address = db.Column(db.String)
inv_deployments = db.relationship('Deployment', backref='server', lazy='dynamic')
class Deployment(db.Model):
__tablename__ = 'deployments'
pid = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.ForeignKey('users.pid')) #FK
server_id = db.Column(db.ForeignKey('servers.pid')) #FK
topic_id = db.Column(db.ForeignKey('support_topic.pid')) #FK
date_created = db.Column(db.DateTime, default=datetime.utcnow)
deleted = db.Column(db.Boolean, default=False)
enabled = db.Column(db.Boolean, default=True)
warning = db.Column(db.Boolean, default=False)
protected = db.Column(db.Boolean, default=False) #machines with this False will be autodeleted after the warning period is over.
date_last_charge = db.Column(db.DateTime)
discount = db.Column(db.Integer)
period = db.Column(db.Integer)
daysleft = db.Column(db.Integer)
machine_id = db.Column(db.BigInteger) #unit_id
machine_alias = db.Column(db.String) #dns name
machine_cpu = db.Column(db.Integer)
machine_mem = db.Column(db.Integer)
machine_hdd = db.Column(db.Integer)
inv_pubvlans = db.relationship('PubVLAN', backref='deploy', lazy='dynamic')
def __init__(self, **kwargs):
super(Deployment, self).__init__(**kwargs)
if self.topic_id is None:
#create new topic
new_topic = SupportTopic(hashtag='deploy-' + query['unit_id'])
db.session.add(new_topic)
db.session.commit()
self.topic_id = new_topic.pid
class PubVLAN(db.Model):
__tablename__ = 'pubvlans'
pid = db.Column(db.Integer, primary_key=True)
deploy_id = db.Column(db.ForeignKey('deployments.pid')) #FK
vlan_id = db.Column(db.Integer)
pubaddr = db.relationship('Address', uselist=False, backref='assignee')
class Address(db.Model):
__tablename__ = 'address'
pid = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.ForeignKey('users.pid')) #FK
region_id = db.Column(db.ForeignKey('regions.pid')) #FK
pubvlan_id = db.Column(db.ForeignKey('pubvlans.pid')) #FK
enabled = db.Column(db.Boolean)
ip = db.Column(db.String)
#mac = db.Column(db.String)
rdns = db.Column(db.String)
reserved = db.Column(db.Boolean, default=False) #this ip SHOULD NOT be listed as available to assign even if its not currently owned by anyone
#def __init__(self, **kwargs):
# super(Address, self).__init__(**kwargs)
# if self.mac is None:
# self.mac = self.genmac()
#def genmac(self):
# alladdr = Address.query.all()
# allmacs = []
# current_app.logger.info('populating mac addr pool')
# for addr in alladdr:
# allmacs.append(str(addr.mac))
# while True:
# mac = [ random.randint(0, 255) for x in range(0, 6) ]
# mac[0] = (mac[0] & 0xfc) | 0x02
# mac = ':'.join([ '{0:02x}'.format(x) for x in mac ])
# if mac in allmacs:
# current_app.logger.warning('mac address {} is in the pool. regenerating...'.format(mac))
# continue
# else:
# return mac
#MISC ITEMS
class Service(db.Model):
__tablename__ = 'services'
pid = db.Column(db.Integer, primary_key=True) #PK
user_id = db.Column(db.ForeignKey('users.pid')) #FK
date_created = db.Column(db.DateTime, default=datetime.utcnow)
deleted = db.Column(db.Boolean, default=False)
enabled = db.Column(db.Boolean, default=False)
warning = db.Column(db.Boolean, default=False)
date_last_charge = db.Column(db.DateTime)
period = db.Column(db.Integer)
daysleft = db.Column(db.Integer)
category = db.Column(db.String)
description = db.Column(db.Unicode)
price = db.Column(db.Float)
class Domain(db.Model):
__tablename__ = 'domains'
pid = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.ForeignKey('users.pid')) #FK
date_created = db.Column(db.DateTime, default=datetime.utcnow)
deleted = db.Column(db.Boolean, default=False)
date_expire = db.Column(db.DateTime)
daysleft = db.Column(db.Integer)
warning = db.Column(db.Boolean, default=False)
enabled = db.Column(db.Boolean, default=False)
fqdn = db.Column(db.String, unique=True)
auto_update = db.Column(db.Boolean)
#UINVOICE
class Transaction(db.Model):
__tablename__ = 'transaction'
pid = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.ForeignKey('users.pid')) #FK
date_created = db.Column(db.DateTime, default=datetime.utcnow)
currency = db.Column(db.String, default='BGN')
description = db.Column(db.String)
value = db.Column(db.Float)
class Invoice(db.Model):
__tablename__ = 'invoice'
pid = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.ForeignKey('users.pid')) #FK
date_created = db.Column(db.DateTime, default=datetime.utcnow)
invoice_number = db.Column(db.Integer, unique=True)
currency = db.Column(db.String, default='BGN')
tax = db.Column(db.Float) #VAT
description = db.Column(db.Unicode)
def sub_total(self):
items = self.items
sub_total = 0
for item in items:
sub_total += item.total()
return sub_total
def tax_amount(self):
if not self.tax:
return 0
return self.sub_total() * self.tax / 100.0
def grand_total(self):
amount_str = str(self.sub_total() + self.tax_amount())
amount = Decimal(amount_str)
rounder = Decimal("0.05") # precision for rounding
return amount - amount.remainder_near(rounder)
class InvoiceItem(db.Model):
__tablename__ = 'invoice_item'
pid = db.Column(db.Integer, primary_key=True)
item_number = db.Column(db.Integer)
invoice_id = db.Column(db.ForeignKey('invoice.pid')) #FK
item_title = db.Column(db.Unicode)
item_quantity = db.Column(db.Float)
item_price = db.Column(db.Float)
amount = db.Column(db.Float)
invoice = db.relationship(Invoice, backref=db.backref('items', order_by=item_number, cascade="delete"))
def total(self):
return self.item_quantity * self.item_price
#SUPPORT
class SupportTopic(db.Model):
__tablename__ = 'support_topic'
pid = db.Column(db.Integer, primary_key=True)
hashtag = db.Column(db.String) #topic
timestamp = db.Column(db.DateTime(), default=datetime.utcnow)
inv_lines = db.relationship('SupportLine', backref='topic', lazy='dynamic')
inv_deployments = db.relationship('Deployment', backref='topic', lazy='dynamic')
class SupportLine(db.Model):
__tablename__ = 'support_line'
pid = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.ForeignKey('users.pid')) #FK
topic_id = db.Column(db.ForeignKey('support_topic.pid')) #FK
line = db.Column(db.Unicode)
timestamp = db.Column(db.DateTime(), default=datetime.utcnow)