From b5565327e875e91fa3c7c9759fe9ee24e92dcbe0 Mon Sep 17 00:00:00 2001 From: Hien Pham Date: Sun, 5 Nov 2023 09:43:39 +0700 Subject: [PATCH 1/2] feat: impl using sqlx instead of diesel for fully async support --- Cargo.toml | 3 +- src/apps/actix/server.rs | 2 +- src/apps/axum/server.rs | 2 +- src/apps/cli/mod.rs | 2 +- src/apps/warp/mod.rs | 9 +++--- src/bin/warp.rs | 2 +- src/container/mod.rs | 13 +++----- src/infra/db.rs | 36 +++++++++++++++++++++ src/infra/mod.rs | 2 ++ src/lib.rs | 1 + src/users/mod.rs | 2 ++ src/users/user_repo.rs | 69 ++++++++++++++++++++++++++++++++++++++++ 12 files changed, 125 insertions(+), 18 deletions(-) create mode 100644 src/infra/db.rs create mode 100644 src/infra/mod.rs create mode 100644 src/users/user_repo.rs diff --git a/Cargo.toml b/Cargo.toml index 3313b02..55fe445 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,8 @@ async-trait = "0.1.74" diesel = { version = "2.1.3", features = ["chrono", "mysql", "postgres", "sqlite", "r2d2"] } r2d2 = "0.8.10" -sqlx = { version = "0.7.2", features = [ "runtime-tokio-rustls" , "postgres", "chrono", "migrate", "uuid", "json"], default-features = false } +sqlx = { version = "0.7.2", features = [ "runtime-tokio" ,"tls-rustls", "postgres", "sqlite", "chrono", "migrate", "uuid", "json", "macros"], default-features = false } + sea-orm = { version = "0.12.4", features = [ "sqlx-postgres", "runtime-tokio-rustls", "macros", "debug-print", "with-chrono" ], default-features = false } [build-dependencies] diff --git a/src/apps/actix/server.rs b/src/apps/actix/server.rs index 264ba0c..90b42bf 100644 --- a/src/apps/actix/server.rs +++ b/src/apps/actix/server.rs @@ -2,7 +2,7 @@ use actix_web::{web, App, HttpServer}; pub async fn serve() -> std::io::Result<()> { // construct di - let user_component = crate::container::UserContainer::new(); + let user_component = crate::container::UserContainer::new().await; let user_security_service = user_component.user_security_service.clone(); let user_service = user_component.user_service.clone(); diff --git a/src/apps/axum/server.rs b/src/apps/axum/server.rs index 0eb5b91..8322235 100644 --- a/src/apps/axum/server.rs +++ b/src/apps/axum/server.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; pub async fn serve() -> std::io::Result<()> { // construct di - let user_component = crate::container::UserContainer::new(); + let user_component = crate::container::UserContainer::new().await; let app = Router::new() .route("/", get(health)) diff --git a/src/apps/cli/mod.rs b/src/apps/cli/mod.rs index e7a9b67..58b0c80 100644 --- a/src/apps/cli/mod.rs +++ b/src/apps/cli/mod.rs @@ -3,7 +3,7 @@ use chrono::Utc; use crate::users::User; pub async fn run() -> std::io::Result<()> { - let cmp = crate::container::UserContainer::new(); + let cmp = crate::container::UserContainer::new().await; let user_service = cmp.user_service; let now = Utc::now().naive_utc(); let user = User { diff --git a/src/apps/warp/mod.rs b/src/apps/warp/mod.rs index faa0ee2..9ae114b 100644 --- a/src/apps/warp/mod.rs +++ b/src/apps/warp/mod.rs @@ -1,14 +1,13 @@ -use warp::Filter; use crate::core::QueryParamsImpl; +use warp::Filter; +pub mod error; pub mod health; pub mod users; -pub mod error; - -pub fn routes() -> impl Filter + Clone { +pub async fn routes() -> impl Filter + Clone { // construct di - let user_component = crate::container::UserContainer::new(); + let user_component = crate::container::UserContainer::new().await; let user_service = user_component.user_service.clone(); let index = warp::path::end() diff --git a/src/bin/warp.rs b/src/bin/warp.rs index 4e5770d..656253d 100644 --- a/src/bin/warp.rs +++ b/src/bin/warp.rs @@ -1,6 +1,6 @@ #[tokio::main] async fn main() { - let routes = rwebapi::apps::warp::routes(); + let routes = rwebapi::apps::warp::routes().await; let port = 8000; println!("Listenting {}", port); warp::serve(routes).run(([0, 0, 0, 0], port)).await; diff --git a/src/container/mod.rs b/src/container/mod.rs index bd652ca..ccecb6e 100644 --- a/src/container/mod.rs +++ b/src/container/mod.rs @@ -8,7 +8,10 @@ pub struct UserContainer { } impl UserContainer { - pub fn new() -> Self { + pub async fn new() -> Self { + let sqlx_pool = crate::infra::db_conn().await; + let sqlx_user_repo: Arc = + Arc::new(crate::users::UserSqlxRepoImpl::new(sqlx_pool.clone())); let pool = Arc::new(crate::diesel_impl::db_pool()); let user_repo: Arc = Arc::new(crate::diesel_impl::UserDieselImpl::new(pool)); @@ -20,7 +23,7 @@ impl UserContainer { }); let user_service: Arc = Arc::new(UserServiceImpl { - user_repo: user_repo.clone(), + user_repo: sqlx_user_repo.clone(), user_security: user_security_service.clone(), }); let user_auth_service: Arc = Arc::new(UserAuthServiceImpl { @@ -35,9 +38,3 @@ impl UserContainer { } } } - -impl Default for UserContainer { - fn default() -> Self { - Self::new() - } -} diff --git a/src/infra/db.rs b/src/infra/db.rs new file mode 100644 index 0000000..b9171ec --- /dev/null +++ b/src/infra/db.rs @@ -0,0 +1,36 @@ +#[cfg(feature = "postgres")] +pub type DBConn = sqlx::postgres::PgPool; + +#[cfg(feature = "sqlite")] +pub type DBConn = sqlx::sqlite::SqlitePool; + +#[cfg(feature = "mysql")] +pub type DBConn = sqlx::mysql::MySqlPool; + +pub async fn db_conn() -> DBConn { + let database_url = std::env::var("DATABASE_URL").unwrap_or("/tmp/test_examples.db".to_string()); + println!("Using Database {}", database_url); + #[cfg(feature = "sqlite")] + { + sqlx::sqlite::SqlitePoolOptions::new() + .connect(&database_url) + .await + .unwrap() + } + + #[cfg(feature = "postgres")] + { + sqlx::postgres::PgPoolOptions::new() + .connect(&database_url) + .await + .unwrap() + } + + #[cfg(feature = "mysql")] + { + sqlx::mysql::MySqlConnectOptions::new() + .connect(&database_url) + .await + .unwrap() + } +} diff --git a/src/infra/mod.rs b/src/infra/mod.rs new file mode 100644 index 0000000..efedb25 --- /dev/null +++ b/src/infra/mod.rs @@ -0,0 +1,2 @@ +mod db; +pub use db::*; diff --git a/src/lib.rs b/src/lib.rs index c24394b..41971fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,4 +5,5 @@ pub mod container; pub mod core; pub mod diesel_impl; pub mod entity; +pub mod infra; pub mod users; diff --git a/src/users/mod.rs b/src/users/mod.rs index 896897f..a4fb66b 100644 --- a/src/users/mod.rs +++ b/src/users/mod.rs @@ -1,5 +1,7 @@ mod entity; mod services; +mod user_repo; pub use entity::*; pub use services::*; +pub use user_repo::*; diff --git a/src/users/user_repo.rs b/src/users/user_repo.rs new file mode 100644 index 0000000..5ce0459 --- /dev/null +++ b/src/users/user_repo.rs @@ -0,0 +1,69 @@ +use async_trait::async_trait; + +use crate::core::{QueryParams, RepoResult, ResultPaging}; + +use super::entity::{User, UserRepo, UserUpdate}; + +pub struct UserSqlxRepoImpl { + pool: crate::infra::DBConn, +} + +impl UserSqlxRepoImpl { + pub fn new(pool: crate::infra::DBConn) -> Self { + Self { pool } + } +} + +#[async_trait] +impl UserRepo for UserSqlxRepoImpl { + async fn get_all(&self, params: &dyn QueryParams) -> RepoResult> { + let count = sqlx::query!("SELECT COUNT(*) AS count FROM users") + .fetch_one(&self.pool.clone()) + .await + .unwrap() + .count; + let limit = params.limit(); + let offset = params.offset(); + let users = sqlx::query_as!( + User, + r#"SELECT * FROM users ORDER BY id LIMIT ? OFFSET ?"#, + limit, + offset, + ) + .fetch_all(&self.pool.clone()) + .await + .unwrap(); + return Ok(ResultPaging { + total: count as i64, + items: users, + }); + } + + async fn find(&self, user_id: &str) -> RepoResult { + let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = ?", user_id) + .fetch_one(&self.pool.clone()) + .await + .unwrap(); + Ok(user) + } + + async fn find_by_email(&self, email: &str) -> RepoResult { + let user = sqlx::query_as!(User, "SELECT * FROM users WHERE email = ?", email) + .fetch_one(&self.pool.clone()) + .await + .unwrap(); + return Ok(user); + } + + async fn create(&self, user: &User) -> RepoResult { + panic!("impl") + } + + async fn update(&self, id: &str, update_user: &UserUpdate) -> RepoResult { + panic!("impl") + } + + async fn delete(&self, user_id: &str) -> RepoResult<()> { + panic!("impl") + } +} From a225deed3f2e42f546f70e71d2336f6c57372c81 Mon Sep 17 00:00:00 2001 From: Hien Pham Date: Sat, 23 Mar 2024 15:26:26 +0700 Subject: [PATCH 2/2] update user_repo --- src/users/user_repo.rs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/users/user_repo.rs b/src/users/user_repo.rs index 5ce0459..cbccc93 100644 --- a/src/users/user_repo.rs +++ b/src/users/user_repo.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use tokio::join; use crate::core::{QueryParams, RepoResult, ResultPaging}; @@ -17,25 +18,23 @@ impl UserSqlxRepoImpl { #[async_trait] impl UserRepo for UserSqlxRepoImpl { async fn get_all(&self, params: &dyn QueryParams) -> RepoResult> { - let count = sqlx::query!("SELECT COUNT(*) AS count FROM users") - .fetch_one(&self.pool.clone()) - .await - .unwrap() - .count; + let pool = self.pool.clone(); + let count_fut = sqlx::query!("SELECT COUNT(*) AS count FROM users").fetch_one(&pool); + let limit = params.limit(); let offset = params.offset(); - let users = sqlx::query_as!( + let users_fut = sqlx::query_as!( User, r#"SELECT * FROM users ORDER BY id LIMIT ? OFFSET ?"#, limit, offset, ) - .fetch_all(&self.pool.clone()) - .await - .unwrap(); + .fetch_all(&pool); + let (count, users) = join!(count_fut, users_fut); + return Ok(ResultPaging { - total: count as i64, - items: users, + total: count.unwrap().count as i64, + items: users.unwrap(), }); } @@ -64,6 +63,10 @@ impl UserRepo for UserSqlxRepoImpl { } async fn delete(&self, user_id: &str) -> RepoResult<()> { - panic!("impl") + sqlx::query!("DELETE FROM users WHERE id = ?", user_id) + .execute(&self.pool.clone()) + .await + .unwrap(); + Ok(()) } }