Whoops, didn't commit for a while. Added button toggling logic, doesn't work
This commit is contained in:
parent
da4f4ba151
commit
a60a4c4885
21 changed files with 353 additions and 106 deletions
1
server/Cargo.lock
generated
1
server/Cargo.lock
generated
|
|
@ -1053,6 +1053,7 @@ dependencies = [
|
|||
"derive_builder",
|
||||
"either",
|
||||
"femme",
|
||||
"log",
|
||||
"rocket",
|
||||
"sea-orm",
|
||||
"sea-orm-migration",
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ path = "src/main.rs"
|
|||
[dependencies]
|
||||
chrono = "0.4.26"
|
||||
femme = "2.2.1"
|
||||
log = { version = "0.4.19", features = ["kv_unstable", "kv_unstable_serde"] }
|
||||
sea-orm-migration = "0.11.3"
|
||||
serde_json = "1.0.96"
|
||||
thiserror = "1.0.40"
|
||||
|
|
|
|||
|
|
@ -9,24 +9,21 @@ pub(crate) mod update;
|
|||
use std::{
|
||||
default::default,
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use crate::error::Error;
|
||||
use rocket::{
|
||||
fs::{FileServer, NamedFile},
|
||||
response::stream::{Event, EventStream},
|
||||
response::stream::EventStream,
|
||||
routes, Build, Config, Rocket, State,
|
||||
};
|
||||
use sea_orm::DatabaseConnection;
|
||||
|
||||
pub(crate) use error::ErrorResponder;
|
||||
use tokio::sync::{
|
||||
broadcast::{self, error::RecvError, Receiver},
|
||||
RwLock,
|
||||
};
|
||||
use tokio::sync::broadcast::{self, error::RecvError, Sender};
|
||||
|
||||
use self::{error::ApiResult, update::Update};
|
||||
use log::{as_debug, as_serde, debug, trace};
|
||||
|
||||
#[get("/status")]
|
||||
fn status() -> &'static str {
|
||||
|
|
@ -34,14 +31,25 @@ fn status() -> &'static str {
|
|||
}
|
||||
|
||||
#[get("/updates")]
|
||||
async fn stream_updates(rx: &State<Arc<RwLock<Receiver<Update>>>>) -> EventStream![Event + '_] {
|
||||
let rx: Arc<RwLock<Receiver<Update>>> = (rx as &Arc<RwLock<Receiver<Update>>>).clone();
|
||||
async fn stream_updates(tx: &State<Sender<Update>>) -> EventStream![] {
|
||||
let mut rx = tx.subscribe();
|
||||
EventStream![loop {
|
||||
let mut rx = rx.write().await;
|
||||
match rx.recv().await {
|
||||
Ok(update) => yield update.to_event(),
|
||||
Err(RecvError::Closed) => break,
|
||||
Err(RecvError::Lagged(count)) => yield Update::lagged(count).to_event(),
|
||||
let event = rx.recv().await;
|
||||
match event {
|
||||
Ok(update) => {
|
||||
debug!(update = as_serde!(update); "sending update");
|
||||
let event = update.to_event();
|
||||
trace!(event = as_debug!(event); "this event");
|
||||
yield event;
|
||||
}
|
||||
Err(RecvError::Closed) => {
|
||||
warn!("channel closed, ending update stream");
|
||||
break;
|
||||
}
|
||||
Err(RecvError::Lagged(count)) => {
|
||||
warn!(count = count; "receiver lagged, instructing client to refresh");
|
||||
yield Update::lagged(count).to_event();
|
||||
}
|
||||
}
|
||||
}]
|
||||
}
|
||||
|
|
@ -57,7 +65,7 @@ pub(crate) fn start_server(db: DatabaseConnection) -> Rocket<Build> {
|
|||
use groups::*;
|
||||
use ticks::*;
|
||||
use tracks::*;
|
||||
let (tx, rx) = broadcast::channel::<Update>(8);
|
||||
let (tx, _) = broadcast::channel::<Update>(8);
|
||||
let it = rocket::build()
|
||||
.configure(Config {
|
||||
address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||
|
|
@ -66,7 +74,6 @@ pub(crate) fn start_server(db: DatabaseConnection) -> Rocket<Build> {
|
|||
.register("/", catchers![spa_index_redirect])
|
||||
.manage(db)
|
||||
.manage(tx)
|
||||
.manage(rx)
|
||||
.mount("/api/v1", routes![status, stream_updates])
|
||||
.mount(
|
||||
"/api/v1/tracks",
|
||||
|
|
@ -78,6 +85,8 @@ pub(crate) fn start_server(db: DatabaseConnection) -> Rocket<Build> {
|
|||
update_track,
|
||||
delete_track,
|
||||
ticked,
|
||||
ticked_on_date,
|
||||
clear_all_ticks,
|
||||
],
|
||||
)
|
||||
.mount(
|
||||
|
|
|
|||
|
|
@ -56,11 +56,9 @@ pub(super) async fn insert_track(
|
|||
db: &State<DatabaseConnection>,
|
||||
track: Json<serde_json::Value>,
|
||||
) -> ApiResult<Json<tracks::Model>> {
|
||||
let mut track = track.0;
|
||||
let track = track.0;
|
||||
let db = db as &DatabaseConnection;
|
||||
let mut model: tracks::ActiveModel = default();
|
||||
track["id"] = 0.into(); // dummy value. set_from_json doesn't use this value
|
||||
// but for some reason requires it be set
|
||||
model.set_from_json(track).map_err(Error::from)?;
|
||||
Ok(Json(model.insert(db).await.map_err(Error::from)?))
|
||||
}
|
||||
|
|
@ -106,3 +104,49 @@ pub(super) async fn ticked(
|
|||
.map_err(Error::from)?;
|
||||
Ok(Json(tick))
|
||||
}
|
||||
|
||||
#[patch("/<id>/ticked?<year>&<month>&<day>")]
|
||||
pub(super) async fn ticked_on_date(
|
||||
db: &State<DatabaseConnection>,
|
||||
tx: &State<Sender<Update>>,
|
||||
id: i32,
|
||||
year: i32,
|
||||
month: u32,
|
||||
day: u32,
|
||||
) -> ApiResult<Either<Json<ticks::Model>, Status>> {
|
||||
let Some(date) = Date::from_ymd_opt(year, month, day) else {
|
||||
return Ok(Right(Status::BadRequest));
|
||||
};
|
||||
let tick = ticks::ActiveModel::on(date, id);
|
||||
let tick = tick
|
||||
.insert(db as &DatabaseConnection)
|
||||
.await
|
||||
.map_err(Error::from)?
|
||||
.to_owned();
|
||||
tx.send(Update::tick_added(tick.clone()))
|
||||
.map_err(Error::from)?;
|
||||
Ok(Left(Json(tick)))
|
||||
}
|
||||
|
||||
#[delete("/<id>/all-ticks")]
|
||||
pub(super) async fn clear_all_ticks(
|
||||
db: &State<DatabaseConnection>,
|
||||
tx: &State<Sender<Update>>,
|
||||
id: i32,
|
||||
) -> ApiResult<Either<Status, Json<Vec<ticks::Model>>>> {
|
||||
let db = db as &DatabaseConnection;
|
||||
let Some(track) = Tracks::find_by_id(id).one(db).await.map_err(Error::from)? else {
|
||||
info!(track_id = id; "couldn't drop all ticks for track; track not found");
|
||||
return Ok(Left(Status::NotFound));
|
||||
};
|
||||
let ticks = track
|
||||
.find_related(Ticks)
|
||||
.all(db)
|
||||
.await
|
||||
.map_err(Error::from)?;
|
||||
for tick in ticks.clone() {
|
||||
tick.clone().delete(db).await.map_err(Error::from)?;
|
||||
Update::tick_cancelled(tick).send(&tx)?;
|
||||
}
|
||||
Ok(Right(Json(ticks)))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,13 @@
|
|||
use log::as_serde;
|
||||
use rocket::response::stream::Event;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use tokio::sync::broadcast::Sender;
|
||||
|
||||
use crate::entities::ticks;
|
||||
use crate::{entities::ticks, error::Result};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub(crate) enum Update {
|
||||
pub enum Update {
|
||||
TickChanged {
|
||||
kind: UpdateType,
|
||||
tick: ticks::Model,
|
||||
|
|
@ -18,32 +20,35 @@ pub(crate) enum Update {
|
|||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub(crate) enum UpdateType {
|
||||
pub enum UpdateType {
|
||||
TickAdded,
|
||||
TickDropped,
|
||||
Error,
|
||||
}
|
||||
|
||||
impl Update {
|
||||
pub(crate) fn lagged(count: u64) -> Update {
|
||||
pub fn lagged(count: u64) -> Update {
|
||||
Update::Lagged {
|
||||
kind: UpdateType::Error,
|
||||
count,
|
||||
}
|
||||
}
|
||||
pub(crate) fn tick_added(tick: ticks::Model) -> Self {
|
||||
|
||||
pub fn tick_added(tick: ticks::Model) -> Self {
|
||||
Self::TickChanged {
|
||||
kind: UpdateType::TickAdded,
|
||||
tick,
|
||||
}
|
||||
}
|
||||
pub(crate) fn tick_cancelled(tick: ticks::Model) -> Self {
|
||||
|
||||
pub fn tick_cancelled(tick: ticks::Model) -> Self {
|
||||
Self::TickChanged {
|
||||
kind: UpdateType::TickDropped,
|
||||
tick,
|
||||
}
|
||||
}
|
||||
pub(crate) fn to_event(&self) -> Event {
|
||||
|
||||
pub fn to_event(&self) -> Event {
|
||||
use Update::*;
|
||||
match self {
|
||||
TickChanged { kind, tick } => Event::json(tick).event(format!("{kind:?}")),
|
||||
|
|
@ -53,4 +58,10 @@ impl Update {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send(self, tx: &Sender<Self>) -> Result<()> {
|
||||
let count = tx.send(self.clone())?;
|
||||
trace!(sent_to = count, update = as_serde!(self); "sent update to SSE channel");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ fn get_env_var_or_file<A: AsRef<OsStr>>(key: A) -> Option<String> {
|
|||
|
||||
/// Connect to the database using environment variables for configuration.
|
||||
/// Panics on any failure.
|
||||
pub(crate) fn connection_url() -> String {
|
||||
pub fn connection_url() -> String {
|
||||
let user = get_env_var_or_file("POSTGRES_USER").expect("$POSTGRES_USER");
|
||||
let password = get_env_var_or_file("POSTGRES_PASSWORD").expect("$POSTGRES_PASSWORD");
|
||||
let db = get_env_var_or_file("POSTGRES_DB").expect("$POSTGRES_DB");
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
|
|||
#[sea_orm(table_name = "ticks")]
|
||||
pub struct Model {
|
||||
#[sea_orm(primary_key)]
|
||||
#[serde(skip_deserializing)]
|
||||
pub id: i32,
|
||||
pub track_id: Option<i32>,
|
||||
pub year: Option<i32>,
|
||||
|
|
@ -62,4 +63,24 @@ impl ActiveModel {
|
|||
..default()
|
||||
}
|
||||
}
|
||||
pub(crate) fn on(date: Date, track_id: i32) -> Self {
|
||||
use sea_orm::ActiveValue::Set;
|
||||
let now = Utc::now();
|
||||
Self {
|
||||
track_id: Set(Some(track_id)),
|
||||
year: Set(Some(date.year())),
|
||||
month: Set(date.month().try_into().ok()),
|
||||
/* ^^^^^^^^^^^^^^^^^^^^^^^
|
||||
* I can't imagine a situation where this doesn't fit. This way, at
|
||||
* least, if it fails, you just get a messed up database entry that
|
||||
* doesn't do anything bad
|
||||
*/
|
||||
day: Set(date.day().try_into().ok()),
|
||||
hour: Set(now.hour().try_into().ok()),
|
||||
minute: Set(now.minute().try_into().ok()),
|
||||
second: Set(now.second().try_into().ok()),
|
||||
has_time_info: Set(Some(1)),
|
||||
..default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
|
|||
#[sea_orm(table_name = "tracks")]
|
||||
pub struct Model {
|
||||
#[sea_orm(primary_key)]
|
||||
#[serde(skip_deserializing)]
|
||||
pub id: i32,
|
||||
pub name: String,
|
||||
pub description: String,
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ use std::string;
|
|||
use derive_builder::UninitializedFieldError;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum Error {
|
||||
pub enum Error {
|
||||
#[error(transparent)]
|
||||
Builder(#[from] UninitializedFieldError),
|
||||
#[error(transparent)]
|
||||
|
|
@ -20,4 +20,4 @@ pub(crate) enum Error {
|
|||
ChannelSendError(#[from] tokio::sync::broadcast::error::SendError<crate::api::update::Update>),
|
||||
}
|
||||
|
||||
pub(crate) type Result<T> = std::result::Result<T, Error>;
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue