net/forest/models.py
2024-05-04 02:09:53 +03:00

215 lines
6.8 KiB
Python

from werkzeug.security import generate_password_hash, check_password_hash
from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer
from itsdangerous.exc import BadSignature, SignatureExpired
from flask import current_app, request, url_for
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, AnonymousUserMixin
import os
import random
import uuid
import base64
import hashlib
import json
from datetime import date, time, datetime, timedelta
import onetimepass
db = SQLAlchemy()
lm = LoginManager()
lm.login_view = 'auth.login'
lm.login_message = 'Login Required.'
lm.session_protection = 'strong'
lm.anonymous_user = AnonymousUserMixin
class Permission:
DEPLOY = 0x01
ADMINISTER = 0xff
class Role(db.Model):
__tablename__ = 'roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String, unique=True)
default = db.Column(db.Boolean, default=False, index=True)
permissions = db.Column(db.Integer)
users = db.relationship('User', backref='role', lazy='dynamic')
@staticmethod
def insert_roles():
roles = {
'User': (Permission.DEPLOY, True),
'Administrator': (Permission.ADMINISTER, False)
}
for r in roles:
role = Role.query.filter_by(name=r).first()
if role is None:
role = Role(name=r)
role.permissions = roles[r][0]
role.default = roles[r][1]
db.session.add(role)
db.session.commit()
def __repr__(self):
#return '<Role %r>' % self.name
return self.name
class User(db.Model, UserMixin):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(128), unique=True, nullable=False)
active = db.Column(db.Boolean(), default=True, nullable=False)
confirmed = db.Column(db.Boolean, default=False)
setup = db.Column(db.Boolean, default=False)
role_id = db.Column(db.ForeignKey('roles.id')) #FK
password_hash = db.Column(db.String)
tokens = db.Column(db.Text)
member_since = db.Column(db.DateTime(), default=datetime.utcnow)
last_seen = db.Column(db.DateTime(), default=datetime.utcnow)
last_ip = db.Column(db.String)
twofactor = db.Column(db.Boolean, default=False) #optional 2factor auth
otp_secret = db.Column(db.String)
avatar_hash = db.Column(db.String)
uuid = db.Column(db.String)
name = db.Column(db.Unicode)
address = db.Column(db.Unicode)
city = db.Column(db.Unicode)
postcode = db.Column(db.String)
country = db.Column(db.String, default='BG')
phone = db.Column(db.String)
inv_items = db.relationship('Item', backref='owner', lazy='dynamic')
def __init__(self, **kwargs):
super(User, self).__init__(**kwargs)
if self.role is None:
if self.email == current_app.config['ADMIN_MAIL']:
#if email match config admin name create admin user
self.role = Role.query.filter_by(permissions=0xff).first()
if self.role is None:
#if role is stil not set, create default user role
self.role = Role.query.filter_by(default=True).first()
if self.avatar_hash is None and self.email is not None:
self.avatar_hash = hashlib.md5(self.email.encode('utf-8')).hexdigest()
if self.otp_secret is None:
# generate a random secret
self.otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8')
if self.uuid is None:
# generate uuid
self.uuid = uuid.uuid4()
@property
def password(self):
raise AttributeError('password is not a readable attribute')
@password.setter
def password(self, password):
self.password_hash = generate_password_hash(password)
def verify_password(self, password):
return check_password_hash(self.password_hash, password)
def get_totp_uri(self):
return 'otpauth://totp/DataPanel:{0}?secret={1}&issuer=datapanel'.format(self.email, self.otp_secret)
def get_otp_secret(self):
return self.otp_secret
def verify_totp(self, token):
return onetimepass.valid_totp(token, self.otp_secret)
def generate_confirmation_token(self):
s = Serializer(current_app.config['SECRET_KEY'])
return s.dumps({'confirm': self.id})
def confirm(self, token):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return False
if data.get('confirm') != self.id:
return False
self.confirmed = True
db.session.add(self)
db.session.commit()
return True
def generate_reset_token(self):
s = Serializer(current_app.config['SECRET_KEY'])
return s.dumps({'reset': self.id})
def reset_password(self, token, new_password):
s = Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return False
if data.get('reset') != self.id:
return False
self.password = new_password
db.session.add(self)
db.session.commit()
return True
def can(self, permissions):
return self.role is not None and (self.role.permissions & permissions) == permissions
def is_administrator(self):
if self.can(Permission.ADMINISTER):
return True
else:
return False
def ping(self):
self.last_seen = datetime.utcnow()
db.session.add(self)
db.session.commit()
def gravatar(self, size=100, default='identicon', rating='g'):
#this check is disabled because it didnt work for me but forcing https to gravatar is okay.
#if request.is_secure:
# url = 'https://secure.gravatar.com/avatar'
#else:
# url = 'http://www.gravatar.com/avatar'
url = 'https://secure.gravatar.com/avatar'
hash = self.avatar_hash or hashlib.md5(self.email.encode('utf-8')).hexdigest()
return '{url}/{hash}?s={size}&d={default}&r={rating}'.format(url=url, hash=hash, size=size, default=default, rating=rating)
def is_authenticated(self):
return self.is_authenticated
def get_id(self):
return str(self.id)
def __repr__(self):
return '<User %r>' % self.email
class AnonymousUser(AnonymousUserMixin):
def can(self, permissions):
return False
def is_administrator(self):
return False
@lm.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
#ITEMS
class Item(db.Model):
__tablename__ = 'items'
id = db.Column(db.Integer, primary_key=True) #PK
user_id = db.Column(db.ForeignKey('users.id')) #FK
key = db.Column(db.Integer, default=0)
description = db.Column(db.Unicode)
date_created = db.Column(db.DateTime, default=datetime.utcnow)