Add update subscription endpoint

This commit is contained in:
D. Scott Boggs 2023-06-18 08:11:11 -04:00
parent 3553743a9a
commit c22241008e
5 changed files with 123 additions and 23 deletions

View file

@ -4,25 +4,48 @@ mod groups;
mod import; mod import;
mod ticks; mod ticks;
mod tracks; mod tracks;
pub(crate) mod update;
use std::default::default; use std::{
use std::net::{IpAddr, Ipv4Addr}; default::default,
net::{IpAddr, Ipv4Addr},
sync::Arc,
};
use crate::error::Error; use crate::error::Error;
use crate::rocket::{Build, Rocket}; use rocket::{
use rocket::fs::{FileServer, NamedFile}; fs::{FileServer, NamedFile},
use rocket::{routes, Config}; response::stream::{Event, EventStream},
routes, Build, Config, Rocket, State,
};
use sea_orm::DatabaseConnection; use sea_orm::DatabaseConnection;
pub(crate) use error::ErrorResponder; pub(crate) use error::ErrorResponder;
use tokio::sync::{
broadcast::{self, error::RecvError, Receiver},
RwLock,
};
use self::error::ApiResult; use self::{error::ApiResult, update::Update};
#[get("/status")] #[get("/status")]
fn status() -> &'static str { fn status() -> &'static str {
"Ok" "Ok"
} }
#[get("/updates")]
async fn stream_updates(rx: &State<Arc<RwLock<Receiver<Update>>>>) -> EventStream![Event + '_] {
let rx: Arc<RwLock<Receiver<Update>>> = (rx as &Arc<RwLock<Receiver<Update>>>).clone();
EventStream![loop {
let mut rx = rx.write().await;
match rx.recv().await {
Ok(update) => yield update.to_event(),
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(count)) => yield Update::lagged(count).to_event(),
}
}]
}
#[catch(404)] #[catch(404)]
async fn spa_index_redirect() -> ApiResult<NamedFile> { async fn spa_index_redirect() -> ApiResult<NamedFile> {
Ok(NamedFile::open("/src/public/index.html") Ok(NamedFile::open("/src/public/index.html")
@ -34,6 +57,7 @@ pub(crate) fn start_server(db: DatabaseConnection) -> Rocket<Build> {
use groups::*; use groups::*;
use ticks::*; use ticks::*;
use tracks::*; use tracks::*;
let (tx, rx) = broadcast::channel::<Update>(8);
let it = rocket::build() let it = rocket::build()
.configure(Config { .configure(Config {
address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
@ -41,7 +65,9 @@ pub(crate) fn start_server(db: DatabaseConnection) -> Rocket<Build> {
}) })
.register("/", catchers![spa_index_redirect]) .register("/", catchers![spa_index_redirect])
.manage(db) .manage(db)
.mount("/api/v1", routes![status]) .manage(tx)
.manage(rx)
.mount("/api/v1", routes![status, stream_updates])
.mount( .mount(
"/api/v1/tracks", "/api/v1/tracks",
routes![ routes![
@ -50,7 +76,8 @@ pub(crate) fn start_server(db: DatabaseConnection) -> Rocket<Build> {
ticks_for_track, ticks_for_track,
insert_track, insert_track,
update_track, update_track,
delete_track delete_track,
ticked,
], ],
) )
.mount( .mount(

View file

@ -1,13 +1,14 @@
use either::{Either, Left, Right}; use either::{Either, Left, Right};
use rocket::{http::Status, serde::json::Json, State}; use rocket::{http::Status, serde::json::Json, State};
use sea_orm::{prelude::*, DatabaseConnection}; use sea_orm::{prelude::*, DatabaseConnection};
use tokio::sync::broadcast::Sender;
use crate::{ use crate::{
entities::{prelude::*, *}, entities::{prelude::*, *},
error::Error, error::Error,
}; };
use super::error::ApiResult; use super::{error::ApiResult, update::Update};
#[get("/")] #[get("/")]
pub(super) async fn all_ticks( pub(super) async fn all_ticks(
@ -59,10 +60,18 @@ pub(super) async fn update_tick(
} }
#[delete("/<id>")] #[delete("/<id>")]
pub(super) async fn delete_tick(db: &State<DatabaseConnection>, id: i32) -> ApiResult<Status> { pub(super) async fn delete_tick(
Ticks::delete_by_id(id) db: &State<DatabaseConnection>,
.exec(db as &DatabaseConnection) tx: &State<Sender<Update>>,
.await id: i32,
.map_err(Error::from)?; ) -> ApiResult<Status> {
Ok(Status::Ok) let db = db as &DatabaseConnection;
let tick = Ticks::find_by_id(id).one(db).await.map_err(Error::from)?;
if let Some(tick) = tick {
tick.clone().delete(db).await.map_err(Error::from)?;
tx.send(Update::tick_cancelled(tick)).map_err(Error::from)?;
Ok(Status::Ok)
} else {
Ok(Status::NotFound)
}
} }

View file

@ -6,6 +6,9 @@ use rocket::http::Status;
use rocket::{serde::json::Json, State}; use rocket::{serde::json::Json, State};
use sea_orm::{prelude::*, DatabaseConnection}; use sea_orm::{prelude::*, DatabaseConnection};
use std::default::default; use std::default::default;
use tokio::sync::broadcast::Sender;
use super::update::Update;
#[get("/")] #[get("/")]
pub(super) async fn all_tracks( pub(super) async fn all_tracks(
@ -90,13 +93,16 @@ pub(super) async fn delete_track(db: &State<DatabaseConnection>, id: i32) -> Api
#[patch("/<id>/ticked")] #[patch("/<id>/ticked")]
pub(super) async fn ticked( pub(super) async fn ticked(
db: &State<DatabaseConnection>, db: &State<DatabaseConnection>,
tx: &State<Sender<Update>>,
id: i32, id: i32,
) -> ApiResult<Json<ticks::Model>> { ) -> ApiResult<Json<ticks::Model>> {
let tick = ticks::ActiveModel::now(id); let tick = ticks::ActiveModel::now(id);
Ok(Json( let tick = tick
tick.insert(db as &DatabaseConnection) .insert(db as &DatabaseConnection)
.await .await
.map_err(Error::from)? .map_err(Error::from)?
.to_owned(), .to_owned();
)) tx.send(Update::tick_added(tick.clone()))
.map_err(Error::from)?;
Ok(Json(tick))
} }

56
server/src/api/update.rs Normal file
View file

@ -0,0 +1,56 @@
use rocket::response::stream::Event;
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::entities::ticks;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum Update {
TickChanged {
kind: UpdateType,
tick: ticks::Model,
},
Lagged {
kind: UpdateType,
count: u64,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub(crate) enum UpdateType {
TickAdded,
TickDropped,
Error,
}
impl Update {
pub(crate) fn lagged(count: u64) -> Update {
Update::Lagged {
kind: UpdateType::Error,
count,
}
}
pub(crate) fn tick_added(tick: ticks::Model) -> Self {
Self::TickChanged {
kind: UpdateType::TickAdded,
tick,
}
}
pub(crate) fn tick_cancelled(tick: ticks::Model) -> Self {
Self::TickChanged {
kind: UpdateType::TickDropped,
tick,
}
}
pub(crate) fn to_event(&self) -> Event {
use Update::*;
match self {
TickChanged { kind, tick } => Event::json(tick).event(format!("{kind:?}")),
Lagged { kind, count } => {
Event::json(&json! {{"message": "error: lagged", "count": count}})
.event(format!("{kind:?}"))
}
}
}
}

View file

@ -3,7 +3,7 @@ use std::string;
use derive_builder::UninitializedFieldError; use derive_builder::UninitializedFieldError;
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum Error { pub(crate) enum Error {
#[error(transparent)] #[error(transparent)]
Builder(#[from] UninitializedFieldError), Builder(#[from] UninitializedFieldError),
#[error(transparent)] #[error(transparent)]
@ -16,6 +16,8 @@ pub enum Error {
Unreachable, Unreachable,
#[error(transparent)] #[error(transparent)]
Utf8(#[from] string::FromUtf8Error), Utf8(#[from] string::FromUtf8Error),
#[error(transparent)]
ChannelSendError(#[from] tokio::sync::broadcast::error::SendError<crate::api::update::Update>),
} }
pub type Result<T> = std::result::Result<T, Error>; pub(crate) type Result<T> = std::result::Result<T, Error>;