91 lines
2.4 KiB
Python
91 lines
2.4 KiB
Python
from base64 import b64decode, b64encode
from dataclasses import dataclass
import hmac
import json
from random import randbytes
import secrets
from typing import Optional, Any

from bson.objectid import ObjectId
import jwt
import scrypt

from roc_fnb.util.base64 import base64_encode, base64_decode
|
|
|
|
# Load the PEM key pair once, at import time. Both paths are relative, so they
# resolve against the process's current working directory; importing this
# module fails if either file is missing. The keys are used below to sign
# (private) and verify (public) RS256 JWTs.
with open('private-key.pem', 'r') as file:
    PRIVATE_KEY = file.read()

with open('public-key.pem', 'r') as file:
    PUBLIC_KEY = file.read()
|
|
|
|
@dataclass
class JwtUser:
    """Identity claims recovered from a verified JWT.

    Holds the same fields that ``User.public_fields`` places in a token; it
    carries no password hash or salt.
    """

    # Identifier of the user the token was issued for.
    _id: ObjectId
    # E-mail address claim.
    email: str
    # Display-name claim.
    name: str
    # Value of the token's "moderator" claim.
    moderator: bool
    # Value of the token's "admin" claim.
    admin: bool
|
|
|
|
@dataclass
class User:
    """A user account with a salted scrypt password hash.

    ``_id`` is ``None`` until an identifier has been assigned; ``create``
    always builds the instance with ``_id=None``.
    """

    _id: Optional[ObjectId]
    email: str
    name: str
    password_hash: bytes
    salt: bytes
    moderator: bool
    admin: bool

    @classmethod
    def create(cls, email: str, name: str, password: str|bytes, moderator: bool = False, admin: bool = False):
        """Alternate constructor which hashes a given password.

        A fresh 32-byte salt is generated for every call and stored next to
        the resulting hash.

        Args:
            email: E-mail address of the new user.
            name: Display name of the new user.
            password: Plain-text password; only its scrypt hash is kept.
            moderator: Whether the user gets the moderator flag.
            admin: Whether the user gets the admin flag.

        Returns:
            A new ``User`` whose ``_id`` is ``None``.
        """
        # Salts must come from a CSPRNG; ``random.randbytes`` is a
        # deterministic Mersenne Twister and is not suitable for security use.
        salt = secrets.token_bytes(32)
        password_hash = scrypt.hash(password, salt)
        return cls(
            _id=None,
            email=email,
            name=name,
            password_hash=password_hash,
            salt=salt,
            moderator=moderator,
            admin=admin,
        )

    @property
    def document(self):
        """Return every field of the user as a plain dictionary.

        The ``_id`` key is only present when an identifier has been set.
        Includes ``password_hash`` and ``salt``, so the result must not be
        exposed to clients.
        """
        doc = {
            "email": self.email,
            "name": self.name,
            "password_hash": self.password_hash,
            "salt": self.salt,
            "moderator": self.moderator,
            "admin": self.admin,
        }
        if self._id is not None:
            doc['_id'] = self._id
        return doc

    @property
    def public_fields(self):
        """Return the non-secret fields, with ``_id`` base64-encoded.

        Requires ``_id`` to be set: accessing this on a user whose ``_id`` is
        ``None`` raises ``AttributeError``.
        """
        return {
            '_id': base64_encode(self._id.binary),
            "email": self.email,
            "name": self.name,
            "moderator": self.moderator,
            "admin": self.admin,
        }

    def check_password(self, password: str) -> bool:
        """Return True when ``password`` hashes to the stored password hash.

        The candidate is hashed with this user's salt and compared in
        constant time so the comparison does not leak how many leading bytes
        matched.
        """
        candidate = scrypt.hash(password, self.salt)
        return hmac.compare_digest(self.password_hash, candidate)

    @property
    def jwt(self) -> str:
        """Return ``public_fields`` encoded as an RS256 JWT signed with PRIVATE_KEY."""
        return jwt.encode(self.public_fields, PRIVATE_KEY, algorithm='RS256')

    @staticmethod
    def verify_jwt(token: str) -> JwtUser:
        """Verify ``token`` against PUBLIC_KEY and return its claims.

        Only RS256 is accepted. Signature verification is PyJWT's default, so
        the deprecated ``verify=True`` keyword is not passed.

        Args:
            token: Encoded JWT, as produced by the ``jwt`` property.

        Returns:
            A ``JwtUser`` built from the token's claims.

        Raises:
            Whatever ``jwt.decode`` raises for an invalid token, and
            ``KeyError`` if an expected claim is missing.
        """
        verified = jwt.decode(token, PUBLIC_KEY, algorithms=['RS256'])
        return JwtUser(
            _id=ObjectId(base64_decode(verified['_id'])),
            name=verified['name'],
            email=verified['email'],
            moderator=verified['moderator'],
            admin=verified['admin'],
        )
|