Change how cleanup trait fn works using a date cutoff

This commit is contained in:
Ben Grant 2023-05-16 01:37:01 +10:00
parent 3e770a337b
commit 0304f5955d
20 changed files with 237 additions and 131 deletions

2
api/Cargo.lock generated
View file

@ -608,7 +608,7 @@ dependencies = [
] ]
[[package]] [[package]]
name = "crabfit_backend" name = "crabfit-api"
version = "2.0.0" version = "2.0.0"
dependencies = [ dependencies = [
"axum", "axum",

View file

@ -10,6 +10,7 @@ Note, you will need to have the following crates as dependencies in your adaptor
- `common`<br>Includes a trait for implementing your adaptor, as well as structs your adaptor needs to return. - `common`<br>Includes a trait for implementing your adaptor, as well as structs your adaptor needs to return.
- `async-trait`<br>Required because the trait from `common` uses async functions, make sure you include `#[async_trait]` above your trait implementation. - `async-trait`<br>Required because the trait from `common` uses async functions, make sure you include `#[async_trait]` above your trait implementation.
- `chrono`<br>Required to deal with dates in the common structs and trait function signatures.
Once you've created the adaptor, you'll need to make sure it's included as a dependency in the root [`Cargo.toml`](../Cargo.toml), and add a feature flag with the same name. Make sure you also document the new adaptor in the [api readme](../README.md). Once you've created the adaptor, you'll need to make sure it's included as a dependency in the root [`Cargo.toml`](../Cargo.toml), and add a feature flag with the same name. Make sure you also document the new adaptor in the [api readme](../README.md).

View file

@ -2,15 +2,10 @@ use std::{env, error::Error, fmt::Display};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc}; use chrono::{DateTime, NaiveDateTime, Utc};
use common::{ use common::{Adaptor, Event, Person, Stats};
adaptor::Adaptor,
event::{Event, EventDeletion},
person::Person,
stats::Stats,
};
use google_cloud::{ use google_cloud::{
authorize::ApplicationCredentials, authorize::ApplicationCredentials,
datastore::{Client, Filter, FromValue, IntoValue, Key, Query}, datastore::{Client, Filter, FromValue, IntoValue, Key, KeyID, Query},
}; };
use tokio::sync::Mutex; use tokio::sync::Mutex;
@ -95,9 +90,22 @@ impl Adaptor for DatastoreAdaptor {
)) ))
} }
async fn upsert_person(&self, event_id: String, person: Person) -> Result<Person, Self::Error> { async fn upsert_person(
&self,
event_id: String,
person: Person,
) -> Result<Option<Person>, Self::Error> {
let mut client = self.client.lock().await; let mut client = self.client.lock().await;
// Check the event exists
if client
.get::<DatastoreEvent, _>(Key::new(EVENT_KIND).id(event_id.clone()))
.await?
.is_none()
{
return Ok(None);
}
// Check if person exists // Check if person exists
let existing_person = client let existing_person = client
.query( .query(
@ -122,7 +130,7 @@ impl Adaptor for DatastoreAdaptor {
.put((key, DatastorePerson::from_person(person.clone(), event_id))) .put((key, DatastorePerson::from_person(person.clone(), event_id)))
.await?; .await?;
Ok(person) Ok(Some(person))
} }
async fn get_event(&self, id: String) -> Result<Option<Event>, Self::Error> { async fn get_event(&self, id: String) -> Result<Option<Event>, Self::Error> {
@ -151,25 +159,45 @@ impl Adaptor for DatastoreAdaptor {
Ok(event) Ok(event)
} }
async fn delete_event(&self, id: String) -> Result<EventDeletion, Self::Error> { async fn delete_events(&self, cutoff: DateTime<Utc>) -> Result<Stats, Self::Error> {
let mut client = self.client.lock().await; let mut client = self.client.lock().await;
let mut keys_to_delete: Vec<Key> = client let mut keys_to_delete: Vec<Key> = client
.query( .query(Query::new(EVENT_KIND).filter(Filter::LesserThan(
Query::new(PERSON_KIND) "visited".into(),
.filter(Filter::Equal("eventId".into(), id.clone().into_value())), cutoff.timestamp().into_value(),
) )))
.await? .await?
.iter() .iter()
.map(|entity| entity.key().clone()) .map(|entity| entity.key().clone())
.collect(); .collect();
let person_count = keys_to_delete.len().try_into().unwrap(); let event_count = keys_to_delete.len() as i64;
keys_to_delete.insert(0, Key::new(EVENT_KIND).id(id.clone()));
let events_to_delete = keys_to_delete.clone();
for e in events_to_delete.iter() {
if let KeyID::StringID(id) = e.get_id() {
let mut event_people_to_delete: Vec<Key> = client
.query(
Query::new(PERSON_KIND)
.filter(Filter::Equal("eventId".into(), id.clone().into_value())),
)
.await?
.iter()
.map(|entity| entity.key().clone())
.collect();
keys_to_delete.append(&mut event_people_to_delete);
}
}
let person_count = keys_to_delete.len() as i64 - event_count;
client.delete_all(keys_to_delete).await?; client.delete_all(keys_to_delete).await?;
Ok(EventDeletion { id, person_count }) Ok(Stats {
event_count,
person_count,
})
} }
} }

View file

@ -1,13 +1,8 @@
use std::{collections::HashMap, error::Error, fmt::Display}; use std::{collections::HashMap, error::Error, fmt::Display};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::Utc; use chrono::{DateTime, Utc};
use common::{ use common::{Adaptor, Event, Person, Stats};
adaptor::Adaptor,
event::{Event, EventDeletion},
person::Person,
stats::Stats,
};
use tokio::sync::Mutex; use tokio::sync::Mutex;
struct State { struct State {
@ -68,14 +63,23 @@ impl Adaptor for MemoryAdaptor {
)) ))
} }
async fn upsert_person(&self, event_id: String, person: Person) -> Result<Person, Self::Error> { async fn upsert_person(
&self,
event_id: String,
person: Person,
) -> Result<Option<Person>, Self::Error> {
let mut state = self.state.lock().await; let mut state = self.state.lock().await;
// Check event exists
if state.events.get(&event_id).is_none() {
return Ok(None);
}
state state
.people .people
.insert((event_id, person.name.clone()), person.clone()); .insert((event_id, person.name.clone()), person.clone());
Ok(person) Ok(Some(person))
} }
async fn get_event(&self, id: String) -> Result<Option<Event>, Self::Error> { async fn get_event(&self, id: String) -> Result<Option<Event>, Self::Error> {
@ -98,21 +102,38 @@ impl Adaptor for MemoryAdaptor {
Ok(event) Ok(event)
} }
async fn delete_event(&self, id: String) -> Result<EventDeletion, Self::Error> { async fn delete_events(&self, cutoff: DateTime<Utc>) -> Result<Stats, Self::Error> {
let mut state = self.state.lock().await; let mut state = self.state.lock().await;
let mut person_count: u64 = state.people.len() as u64; // Delete events older than cutoff date
let mut deleted_event_ids: Vec<String> = Vec::new();
state.events = state
.events
.clone()
.into_iter()
.filter(|(id, event)| {
if event.visited_at >= cutoff {
true
} else {
deleted_event_ids.push(id.into());
false
}
})
.collect();
let mut person_count = state.people.len() as i64;
state.people = state state.people = state
.people .people
.clone() .clone()
.into_iter() .into_iter()
.filter(|((event_id, _), _)| event_id != &id) .filter(|((event_id, _), _)| deleted_event_ids.contains(event_id))
.collect(); .collect();
person_count -= state.people.len() as u64; person_count -= state.people.len() as i64;
state.events.remove(&id); Ok(Stats {
event_count: deleted_event_ids.len() as i64,
Ok(EventDeletion { id, person_count }) person_count,
})
} }
} }

View file

@ -1,13 +1,8 @@
use std::{env, error::Error}; use std::{env, error::Error};
use async_trait::async_trait; use async_trait::async_trait;
use chrono::{DateTime as ChronoDateTime, Utc}; use chrono::{DateTime, Utc};
use common::{ use common::{Adaptor, Event, Person, Stats};
adaptor::Adaptor,
event::{Event, EventDeletion},
person::Person,
stats::Stats,
};
use entity::{event, person, stats}; use entity::{event, person, stats};
use migration::{Migrator, MigratorTrait}; use migration::{Migrator, MigratorTrait};
use sea_orm::{ use sea_orm::{
@ -70,7 +65,11 @@ impl Adaptor for SqlAdaptor {
}) })
} }
async fn upsert_person(&self, event_id: String, person: Person) -> Result<Person, Self::Error> { async fn upsert_person(
&self,
event_id: String,
person: Person,
) -> Result<Option<Person>, Self::Error> {
let data = person::ActiveModel { let data = person::ActiveModel {
name: Set(person.name.clone()), name: Set(person.name.clone()),
password_hash: Set(person.password_hash), password_hash: Set(person.password_hash),
@ -79,7 +78,16 @@ impl Adaptor for SqlAdaptor {
event_id: Set(event_id.clone()), event_id: Set(event_id.clone()),
}; };
Ok( // Check if the event exists
if event::Entity::find_by_id(event_id.clone())
.one(&self.db)
.await?
.is_none()
{
return Ok(None);
}
Ok(Some(
match person::Entity::find_by_id((person.name, event_id)) match person::Entity::find_by_id((person.name, event_id))
.one(&self.db) .one(&self.db)
.await? .await?
@ -87,7 +95,7 @@ impl Adaptor for SqlAdaptor {
Some(_) => data.update(&self.db).await?.try_into_model()?.into(), Some(_) => data.update(&self.db).await?.try_into_model()?.into(),
None => data.insert(&self.db).await?.try_into_model()?.into(), None => data.insert(&self.db).await?.try_into_model()?.into(),
}, },
) ))
} }
async fn get_event(&self, id: String) -> Result<Option<Event>, Self::Error> { async fn get_event(&self, id: String) -> Result<Option<Event>, Self::Error> {
@ -118,27 +126,43 @@ impl Adaptor for SqlAdaptor {
.into()) .into())
} }
async fn delete_event(&self, id: String) -> Result<EventDeletion, Self::Error> { async fn delete_events(&self, cutoff: DateTime<Utc>) -> Result<Stats, Self::Error> {
let event_id = id.clone(); let (event_count, person_count) = self
let person_count = self
.db .db
.transaction::<_, u64, DbErr>(|t| { .transaction::<_, (i64, i64), DbErr>(|t| {
Box::pin(async move { Box::pin(async move {
// Get events older than the cutoff date
let old_events = event::Entity::find()
.filter(event::Column::VisitedAt.lt(cutoff.naive_utc()))
.all(t)
.await?;
// Delete people // Delete people
let people_delete_result = person::Entity::delete_many() let mut people_deleted: i64 = 0;
.filter(person::Column::EventId.eq(&event_id)) // TODO: run concurrently
for e in old_events.iter() {
let people_delete_result = person::Entity::delete_many()
.filter(person::Column::EventId.eq(&e.id))
.exec(t)
.await?;
people_deleted += people_delete_result.rows_affected as i64;
}
// Delete events
let event_delete_result = event::Entity::delete_many()
.filter(event::Column::VisitedAt.lt(cutoff.naive_utc()))
.exec(t) .exec(t)
.await?; .await?;
// Delete event Ok((event_delete_result.rows_affected as i64, people_deleted))
event::Entity::delete_by_id(event_id).exec(t).await?;
Ok(people_delete_result.rows_affected)
}) })
}) })
.await?; .await?;
Ok(EventDeletion { id, person_count }) Ok(Stats {
event_count,
person_count,
})
} }
} }
@ -190,8 +214,8 @@ impl From<event::Model> for Event {
Self { Self {
id: value.id, id: value.id,
name: value.name, name: value.name,
created_at: ChronoDateTime::<Utc>::from_utc(value.created_at, Utc), created_at: DateTime::<Utc>::from_utc(value.created_at, Utc),
visited_at: ChronoDateTime::<Utc>::from_utc(value.visited_at, Utc), visited_at: DateTime::<Utc>::from_utc(value.visited_at, Utc),
times: serde_json::from_value(value.times).unwrap_or(vec![]), times: serde_json::from_value(value.times).unwrap_or(vec![]),
timezone: value.timezone, timezone: value.timezone,
} }
@ -203,7 +227,7 @@ impl From<person::Model> for Person {
Self { Self {
name: value.name, name: value.name,
password_hash: value.password_hash, password_hash: value.password_hash,
created_at: ChronoDateTime::<Utc>::from_utc(value.created_at, Utc), created_at: DateTime::<Utc>::from_utc(value.created_at, Utc),
availability: serde_json::from_value(value.availability).unwrap_or(vec![]), availability: serde_json::from_value(value.availability).unwrap_or(vec![]),
} }
} }

View file

@ -1,3 +1,3 @@
# Common # Common
This crate contains the [adaptor trait](./src/adaptor.rs), and structs that are used by it. These are separated into their own crate so that the root crate and the adaptors can import from it without causing a circular dependency. This crate contains the adaptor trait, and structs that are used by it. These are separated into their own crate so that the root crate and the adaptors can import from it without causing a circular dependency.

View file

@ -1,30 +0,0 @@
use std::error::Error;
use async_trait::async_trait;
use crate::{
event::{Event, EventDeletion},
person::Person,
stats::Stats,
};
/// Data storage adaptor, all methods on an adaptor can return an error if
/// something goes wrong, or potentially None if the data requested was not found.
#[async_trait]
pub trait Adaptor: Send + Sync {
type Error: Error;
async fn get_stats(&self) -> Result<Stats, Self::Error>;
async fn increment_stat_event_count(&self) -> Result<i64, Self::Error>;
async fn increment_stat_person_count(&self) -> Result<i64, Self::Error>;
async fn get_people(&self, event_id: String) -> Result<Option<Vec<Person>>, Self::Error>;
async fn upsert_person(&self, event_id: String, person: Person) -> Result<Person, Self::Error>;
/// Get an event and update visited date to current time
async fn get_event(&self, id: String) -> Result<Option<Event>, Self::Error>;
async fn create_event(&self, event: Event) -> Result<Event, Self::Error>;
/// Delete an event as well as all related people
async fn delete_event(&self, id: String) -> Result<EventDeletion, Self::Error>;
}

View file

@ -1,19 +0,0 @@
use chrono::{DateTime, Utc};
#[derive(Clone)]
pub struct Event {
pub id: String,
pub name: String,
pub created_at: DateTime<Utc>,
pub visited_at: DateTime<Utc>,
pub times: Vec<String>,
pub timezone: String,
}
#[derive(Clone)]
/// Info about a deleted event
pub struct EventDeletion {
pub id: String,
/// The amount of people that were in this event that were also deleted
pub person_count: u64,
}

View file

@ -1,4 +1,54 @@
pub mod adaptor; use std::error::Error;
pub mod event;
pub mod person; use async_trait::async_trait;
pub mod stats; use chrono::{DateTime, Utc};
/// Data storage adaptor, all methods on an adaptor can return an error if
/// something goes wrong, or potentially None if the data requested was not found.
#[async_trait]
pub trait Adaptor: Send + Sync {
type Error: Error;
async fn get_stats(&self) -> Result<Stats, Self::Error>;
async fn increment_stat_event_count(&self) -> Result<i64, Self::Error>;
async fn increment_stat_person_count(&self) -> Result<i64, Self::Error>;
async fn get_people(&self, event_id: String) -> Result<Option<Vec<Person>>, Self::Error>;
async fn upsert_person(
&self,
event_id: String,
person: Person,
) -> Result<Option<Person>, Self::Error>;
/// Get an event and update visited date to current time
async fn get_event(&self, id: String) -> Result<Option<Event>, Self::Error>;
async fn create_event(&self, event: Event) -> Result<Event, Self::Error>;
/// Delete events older than a cutoff date, as well as any associated people
/// Returns the amount of events and people deleted
async fn delete_events(&self, cutoff: DateTime<Utc>) -> Result<Stats, Self::Error>;
}
#[derive(Clone)]
pub struct Stats {
pub event_count: i64,
pub person_count: i64,
}
#[derive(Clone)]
pub struct Event {
pub id: String,
pub name: String,
pub created_at: DateTime<Utc>,
pub visited_at: DateTime<Utc>,
pub times: Vec<String>,
pub timezone: String,
}
#[derive(Clone)]
pub struct Person {
pub name: String,
pub password_hash: Option<String>,
pub created_at: DateTime<Utc>,
pub availability: Vec<String>,
}

View file

@ -1,9 +0,0 @@
use chrono::{DateTime, Utc};
#[derive(Clone)]
pub struct Person {
pub name: String,
pub password_hash: Option<String>,
pub created_at: DateTime<Utc>,
pub availability: Vec<String>,
}

View file

@ -1,5 +0,0 @@
#[derive(Clone)]
pub struct Stats {
pub event_count: i64,
pub person_count: i64,
}

View file

@ -17,6 +17,7 @@ use utoipa::{
routes::person::get_people, routes::person::get_people,
routes::person::get_person, routes::person::get_person,
routes::person::update_person, routes::person::update_person,
routes::tasks::cleanup,
), ),
components(schemas( components(schemas(
payloads::StatsResponse, payloads::StatsResponse,

View file

@ -1,5 +1,5 @@
use axum::{http::StatusCode, response::IntoResponse}; use axum::{http::StatusCode, response::IntoResponse};
use common::adaptor::Adaptor; use common::Adaptor;
pub enum ApiError<A: Adaptor> { pub enum ApiError<A: Adaptor> {
AdaptorError(A::Error), AdaptorError(A::Error),

View file

@ -82,6 +82,7 @@ async fn main() {
"/event/:event_id/people/:person_name", "/event/:event_id/people/:person_name",
patch(person::update_person), patch(person::update_person),
) )
.route("/tasks/cleanup", patch(tasks::cleanup))
.with_state(shared_state) .with_state(shared_state)
.layer(cors) .layer(cors)
.layer(rate_limit) .layer(rate_limit)

View file

@ -1,5 +1,5 @@
use axum::Json; use axum::Json;
use common::{event::Event, person::Person, stats::Stats}; use common::{Event, Person, Stats};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use utoipa::ToSchema; use utoipa::ToSchema;

View file

@ -3,7 +3,7 @@ use axum::{
http::StatusCode, http::StatusCode,
Json, Json,
}; };
use common::{adaptor::Adaptor, event::Event}; use common::{Adaptor, Event};
use rand::{seq::SliceRandom, thread_rng, Rng}; use rand::{seq::SliceRandom, thread_rng, Rng};
use regex::Regex; use regex::Regex;

View file

@ -1,3 +1,4 @@
pub mod event; pub mod event;
pub mod person; pub mod person;
pub mod stats; pub mod stats;
pub mod tasks;

View file

@ -4,7 +4,7 @@ use axum::{
Json, TypedHeader, Json, TypedHeader,
}; };
use base64::{engine::general_purpose, Engine}; use base64::{engine::general_purpose, Engine};
use common::{adaptor::Adaptor, person::Person}; use common::{Adaptor, Person};
use crate::{ use crate::{
errors::ApiError, errors::ApiError,
@ -120,6 +120,7 @@ pub async fn get_person<A: Adaptor>(
) )
.await .await
.map_err(ApiError::AdaptorError)? .map_err(ApiError::AdaptorError)?
.unwrap()
.into(), .into(),
)) ))
} }
@ -189,6 +190,7 @@ pub async fn update_person<A: Adaptor>(
) )
.await .await
.map_err(ApiError::AdaptorError)? .map_err(ApiError::AdaptorError)?
.unwrap()
.into(), .into(),
)) ))
} }

View file

@ -1,5 +1,5 @@
use axum::{extract, Json}; use axum::{extract, Json};
use common::adaptor::Adaptor; use common::Adaptor;
use crate::{ use crate::{
errors::ApiError, errors::ApiError,

40
api/src/routes/tasks.rs Normal file
View file

@ -0,0 +1,40 @@
use std::env;
use axum::{extract, http::HeaderMap};
use common::Adaptor;
use tracing::info;
use crate::{errors::ApiError, State};
#[utoipa::path(
get,
path = "/tasks/cleanup",
responses(
(status = 200, description = "Cleanup complete"),
(status = 401, description = "Missing or incorrect X-Cron-Key header"),
(status = 429, description = "Too many requests"),
),
tag = "tasks",
)]
/// Delete events older than 3 months
pub async fn cleanup<A: Adaptor>(
extract::State(state): State<A>,
headers: HeaderMap,
) -> Result<(), ApiError<A>> {
// Check cron key
let cron_key = headers.get("X-Cron-Key").ok_or(ApiError::NotAuthorized)?;
if let Ok(key) = env::var("CRON_KEY") {
if !key.is_empty() && *cron_key != key {
return Err(ApiError::NotAuthorized);
}
}
info!("Running cleanup task");
let adaptor = &state.lock().await.adaptor;
// TODO:
//let stats = adaptor.get_stats().await.map_err(ApiError::AdaptorError)?;
Ok(())
}