Compare commits

..

2 commits

Author SHA1 Message Date
D. Scott Boggs 313733f1e8 Add test CI job
Some checks are pending
/ run-tests (push) Waiting to run
2025-06-01 12:41:36 -04:00
D. Scott Boggs 600a6af27a Add invitations 2025-06-01 11:55:03 -04:00
12 changed files with 471 additions and 19 deletions

View file

@ -0,0 +1,6 @@
on: [push]
jobs:
run-tests:
runs-on: docker
steps:
- run: echo it works!

5
.prettierrc Normal file
View file

@ -0,0 +1,5 @@
{
"arrowParens": "avoid",
"semi": false,
"singleQuote": true
}

View file

@ -0,0 +1,6 @@
from roc_fnb.website.database import Database
db = Database.from_env()
db.db.users.delete_many({})
db.db.invitations.delete_many({})

View file

@ -1,2 +1,3 @@
#!/usr/bin/env bash
export APP_HOSTNAME=localhost:1312
python -m pytest --capture no --ignore mounts

View file

@ -1 +1,2 @@
from roc_fnb.util.logging import log
from roc_fnb.util.base64 import base64_decode, base64_encode

View file

@ -1,4 +1,6 @@
from datetime import datetime, UTC
from os import environ
from random import randbytes
from typing import Optional, Self
from pdb import set_trace as debugger
@ -6,7 +8,9 @@ from bson.objectid import ObjectId
import pymongo
from pymongo import MongoClient
from roc_fnb.website.models.user import User
from roc_fnb.util.base64 import base64_encode
from roc_fnb.util import log
from roc_fnb.website.models.user import User, JwtUser
KEYLEN = 64
with open('private-key.pem') as pk:
@ -58,3 +62,42 @@ class Database(MongoClient):
if user := self.db.users.find_one({'_id': id}):
return User(**user)
return None
def invite_new_user(self, inviter: JwtUser | User) -> bytes:
"""Save a new invite code to the database and return its raw bytes."""
if inviter_id := inviter._id:
invite_code = randbytes(32)
self.db.invitations.insert_one({
'created_at': datetime.now(UTC).timestamp(),
'invited_user_id': None,
'invite_code': invite_code,
'invited_by': inviter_id,
})
return invite_code
raise ValueError('unsaved user cannot create an invite')
def store_new_invitee(self, invite_code: bytes, invitee: User) -> User:
"""Store the new user if the given invite_code exists and is not already used."""
if invite := self.db.invitations.find_one({'invite_code': invite_code}):
if invite.get('invited_user_id') is None:
self.store_user(invitee)
self.db.invitations.update_one({'_id': invite['_id']}, {'$set': {'invited_user_id': invitee._id}})
return invitee
raise InvitationClaimed(invite, invitee)
raise InvitationNotFound(invite_code, invitee)
#
class InvitationClaimed(ValueError):
def __init__(self, invitation: dict, invitee: User):
self.invitation = invitation
self.invitiee = invitee
emsg = f'{invitee.name} tried to sign up with a used invite code: {base64_encode(invitation["invite_code"])}'
log.error(emsg, invitation=self.invitation, invitee=invitee)
super().__init__(emsg)
class InvitationNotFound(ValueError):
def __init__(self, invitation_code: bytes, invitee: User):
self.invitation_code = base64_encode(invitation_code)
self.invitiee = invitee
emsg = f'{invitee.name} tried to sign up with an invalid invite code: {self.invitation_code}'
log.error(emsg, invitation_code=self.invitation_code, invitee=invitee)
super().__init__(emsg)

View file

@ -16,6 +16,7 @@ with open('private-key.pem') as file:
with open('public-key.pem') as file:
PUBLIC_KEY = file.read()
@dataclass
class JwtUser:
_id: ObjectId
@ -41,17 +42,26 @@ class User:
admin: bool
@classmethod
def create(cls, email: str, name: str, password: str|bytes, moderator: bool = False, admin: bool = False):
def create(
cls,
email: str,
name: str,
password: str | bytes,
moderator: bool = False,
admin: bool = False,
):
"""Alternate constructor which hashes a given password"""
salt = randbytes(32)
password_hash = scrypt.hash(password, salt)
return cls(_id=None,
email=email,
name=name,
password_hash=password_hash,
salt=salt,
moderator=moderator,
admin=admin)
return cls(
_id=None,
email=email,
name=name,
password_hash=password_hash,
salt=salt,
moderator=moderator,
admin=admin,
)
@property
def document(self):

View file

@ -1,16 +1,17 @@
from functools import wraps
import json
from os import environ
from pathlib import Path
from random import randbytes
from sys import stderr
from flask import Flask, redirect
from flask import Flask, redirect, session, g
from roc_fnb.util.env_file import env_file
from roc_fnb.util import log
from roc_fnb.website.database import Database
from roc_fnb.website.server.user import setup_user_routes
from roc_fnb.website.models.user import JwtUser
db = Database.from_env()
@ -21,12 +22,19 @@ app = Flask(
static_folder=Path(__file__).absolute().parent / 'static',
)
app.secret_key = env_file('FLASK_SECRET', default_file='./flask.secret', default_fn=lambda: randbytes(12))
app.secret_key = env_file(
'FLASK_SECRET',
default_file='./flask.secret',
default_fn=lambda: randbytes(12)
)
APP_HOSTNAME = environ["APP_HOSTNAME"]
@app.before_request
def decode_user():
if user := session.get('user'):
g.user = JwtUser.from_json(data=json.loads(user))
g.app_hostname = APP_HOSTNAME
@app.route('/ig')
@ -43,4 +51,5 @@ def donate_redir():
def index():
return redirect('/index.html')
setup_user_routes(app, db)

View file

@ -0,0 +1,129 @@
from bson.objectid import ObjectId
from flask.testing import FlaskClient
import json
from pytest import fixture
from roc_fnb.util import base64_decode
from roc_fnb.website.server import app
from roc_fnb.website.models.user import User
from roc_fnb.website.database import Database
@fixture
def user() -> User:
return User.create('test@t.co', 'name', 'monkey')
@fixture
def database() -> Database:
return Database.from_env()
@fixture
def client():
return app.test_client()
def test_login(user: User, database: Database, client: FlaskClient):
database.store_user(user)
try:
assert user._id is not None # for mypy
resp = client.post(
'/login', json={
'name': user.name,
'password': 'monkey'
}
)
assert resp.json is not None # for mypy
assert resp.json['status'] == "OK"
with client.session_transaction() as session:
session_data = json.loads(session['user'])
assert base64_decode(session_data['_id']) == user._id.binary
assert session_data['name'] == user.name
assert 'password' not in session_data.keys()
assert session_data['email'] == user.email
assert not session.get('admin')
assert not session.get('moderator')
finally:
if _id := user._id:
database.delete_user(_id)
def test_create_user(user: User, database: Database, client: FlaskClient):
user.admin = True
try:
database.store_user(user)
assert user._id is not None # for mypy
resp = client.post(
'/login', json={
'name': user.name,
'password': 'monkey'
}
)
assert resp.json is not None # for mypy
assert resp.json['status'] == "OK"
with client.session_transaction() as session:
assert session.get('user') is not None
session_data = json.loads(session['user'])
assert session_data['admin']
resp = client.get('/invite', headers={'Accept': 'application/json'})
assert resp.status_code == 200
assert resp.json is not None
invite = resp.json['invite']
with client.session_transaction() as session:
del session['user']
_id = None
try:
resp = client.post(
'/new-user',
json={
'name': 'Test 2',
'password': 'hunter2',
'invite_code': invite,
'email': None
}
)
assert resp.status_code == 200
assert resp.json is not None
assert resp.json['status'] == 'OK'
with client.session_transaction() as session:
session_data = json.loads(session['user'])
_id = ObjectId(base64_decode(session_data['_id']))
new_user = database.get_user_by_id(_id)
assert new_user is not None
assert new_user._id == _id
assert new_user.name == 'Test 2'
assert new_user.check_password('hunter2')
assert not (new_user.admin or new_user.moderator)
invitation_doc = database.db.invitations.find_one({'invite_code': base64_decode(invite)})
assert invitation_doc is not None
assert invitation_doc['invited_user_id'] == _id
assert invitation_doc['invited_by'] == user._id
finally:
if _id is not None:
database.delete_user(_id)
finally:
if u_id := user._id:
database.delete_user(u_id)
def test_create_user_deny_non_admin(user: User, database: Database, client: FlaskClient):
database.store_user(user)
try:
assert user._id is not None # for mypy
resp = client.post(
'/login', json={
'name': user.name,
'password': 'monkey'
}
)
assert resp.json is not None # for mypy
assert resp.json['status'] == "OK"
with client.session_transaction() as session:
assert session.get('user') is not None
resp = client.get('/invite', headers={'Accept': 'application/json'})
assert resp.status_code == 401
finally:
if _id := user._id:
database.delete_user(_id)

View file

@ -1,9 +1,12 @@
import json
from flask import request, redirect, render_template, g, abort
from flask import request, redirect, render_template, g, abort, make_response, flash, jsonify, session
from roc_fnb.util import log
from roc_fnb.util import log, base64_encode, base64_decode
from roc_fnb.website.server.decorators import require_user, logger_request_bindings
from roc_fnb.website.database import InvitationClaimed, InvitationNotFound
from roc_fnb.website.models.user import User
def setup_user_routes(app, db):
@app.post('/login')
@ -16,7 +19,7 @@ def setup_user_routes(app, db):
log.warn('incorrect password submitted', name=form['name'])
abort(401) # unauthorized
session['user'] = json.dumps(user.public_fields)
return redirect('/me')
return jsonify(status='OK')
@app.get('/login')
def render_login_page():
@ -29,3 +32,71 @@ def setup_user_routes(app, db):
@require_user()
def get_profile():
return render_template('profile.html', user=g.user)
@app.get('/invite')
@require_user(admin=True)
def create_invite():
"""
Two handlers in one: JSON and HTML.
First, a user-agent (browser) makes a request for /invite, and gets the
rendered HTML page. Then they click a button which sends a request
specifically asking for a JSON reply. The invitation is created and
returned in a JSON document.
This allows a user to generate more than one invitation code per visit
to the page and avoids accidentally creating an invite code on page load.
"""
if request.headers['Accept'] == 'application/json':
invite = base64_encode(db.invite_new_user(g.user))
log.info('new invitation created', inviter=g.user, invitation_code=invite)
return jsonify(invite=invite, status='OK')
return render_template(
'new_invite.html', user=g.user, app_hostname=g.app_hostname
)
@app.post('/new-user')
def create_new_user():
decoded_invite_code = base64_decode(request.json['invite_code'])
invitee = User.create(
request.json['email'], request.json['name'],
request.json['password']
)
try:
db.store_new_invitee(decoded_invite_code, invitee)
log.info('new user created', user=invitee, invite_code=request.json['invite_code'])
except InvitationClaimed as err:
response = make_response(
json.dumps(
{
'type':
'InvitationClaimed',
'invite_code':
base64_encode(err.invitation['invite_code']),
'message':
str(err)
}
)
)
response.headers['Content-Type'] = 'application/json'
response.status_code = 401
abort(response)
except InvitationNotFound as err:
r = make_response(
json.dumps(
{
'type': 'InvitationNotFound',
'invite_code': err.invitation_code,
'message': str(err)
}
)
)
response.headers['Content-Type'] = 'application/json'
response.status_code = 404
abort(response)
session['user'] = json.dumps(invitee.public_fields)
return jsonify(status='OK')
@app.get('/new-user')
def render_signup_page():
return render_template('sign-up.html')

View file

@ -0,0 +1,80 @@
{% extends "base.html" %}
<div></div>
{% block content %}
<script>
let inviteCode
function inviteElements() {
return {
doneView: document.getElementById('done-view'),
inviteDisplay: document.getElementById('invite-code'),
}
}
function copyInviteCodeToClipboard() {
navigator.clipboard
.writeText(inviteCode)
.then(() => alert('invite code copied to clipboard'))
}
async function newInviteCode() {
const { doneView, inviteDisplay } = inviteElements()
const response = await fetch('/invite', {
headers: { Accept: 'application/json' },
})
if (response.ok) {
const { invite } = await response.json()
inviteDisplay.innerText = inviteCode = invite
doneView.style.display = 'inherit'
} else {
console.dir('response')
alert('error creating new invite code')
}
}
async function deleteInvite() {
const { doneView, inviteDisplay } = inviteElements()
const response = await fetch('/invite', {
method: 'DELETE',
body: JSON.stringify({ invite_code: inviteCode }),
headers: { 'Content-Type': 'application/json' },
})
if (response.ok) {
inviteCode = null
inviteDisplay.innerText = ""
doneView.display = 'none'
}
}
document.addEventListener('readystatechange', () => {
if (document.readyState === 'complete') {
const doneView = inviteElements()
doneView.style.display = 'none'
}
})
</script>
<img
id="biglogo"
class="spinny"
onclick="start_animation()"
src="logo.png"
alt="logo"
/>
<h1>Rochester Food Not Bombs!</h1>
<div>
<div>
<label for="invite-description"
>Describe the invitation for future reference</label
>
</div>
<button>Click here to generate a new invite code</button>
</div>
<div id="done-view">
<p>
Tell the new user to visit
<a href="/new-user">https://{{app_hostname}}/new-user</a>.
</p>
<p>They will need this invite code:</p>
<pre id="invite-code"></pre>
<button onclick="copyInviteCodeToClipboard()">Click here to copy it</button>
<button onclick="deleteInvite()">Delete invite</button>
</div>
{% endblock %}

View file

@ -0,0 +1,91 @@
{% extends "base.html" %} {% block content %}
<script>
document.addEventListener("readystatechange", () => {
if (document.readyState === "complete") {
/**
* @type {HTMLInputElement}
*/
const nameInput = document.getElementById("input-name");
/**
* @type {HTMLInputElement}
*/
const passwordInput = document.getElementById("input-password");
/**
* @type {HTMLInputElement}
*/
const emailInput = document.getElementById("input-name");
/**
* @type {HTMLInputElement}
*/
const pwConfInput = document.getElementById("input-password");
/**
* @type {HTMLInputElement}
*/
const inviteCodeInput = document.getElementById("input-password");
/**
* @type {HTMLButtonElement}
*/
const button = document.getElementById("submit-button");
button.addEventListener("click", async (event) => {
const name = nameInput.value;
const password = passwordInput.value;
if (password !== pwConfInput.value) {
// TODO better handle errors
alert("passwords do not match")
return
}
const invite_code = inviteCodeInput.value
const email = emailInput.value
const result = await fetch("/login", {
method: "POST",
body: JSON.stringify({ name, password, invite_code, email }),
headers: {'Content-Type': 'application/json'}
});
if (result.ok) {
window.location = '/me'
} else {
console.dir(result)
// TODO handle error!
}
});
}
});
</script>
<img
id="biglogo"
class="spinny"
onclick="start_animation()"
src="logo.png"
alt="logo"
/>
<h1>Rochester Food Not Bombs!</h1>
<h3>Login:</h3>
<div>
<label for="name">Your name</label>
<input type="text" id="input-name" />
</div>
<div>
<label for="password">Your password</label>
<input type="password" id="input-password" />
</div>
<div>
<label for="password">Confirm your password</label>
<input type="password" id="input-password" />
</div>
<div>
<label for="invite_code">Invitation Code</label>
<input type="password" id="input-invitation" />
</div>
<div>
<label for="email">Entering an email makes web site administration easier</label>
<input type="email" name="email" id="input-email" />
</div>
<div>
<button id="submit-button" type="submit">Log in</button>
</div>
{% endblock %}