Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat: wrapper connection implementation #400

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@ diesel = { git = "https://github.com/juspay/diesel.git", branch = "dynamic-schem
"chrono",
"uuid",
"postgres_backend",
"i-implement-a-third-party-backend-and-opt-into-breaking-changes"
] }
fred = { version = "9.2.1" }
futures-util = "0.3.28"
19 changes: 6 additions & 13 deletions crates/context_aware_config/src/api/default_config/handlers.rs
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHel
use jsonschema::{Draft, JSONSchema, ValidationError};
use serde_json::Value;
use service_utils::{
db::types::ConnectionImpl,
helpers::{parse_config_tags, validation_err_to_str},
service::types::{AppHeader, AppState, CustomHeaders, DbConnection, SchemaName},
};
@@ -317,38 +318,30 @@ fn fetch_default_key(

#[get("")]
async fn get(
db_conn: DbConnection,
mut db_conn: ConnectionImpl,
filters: Query<PaginationParams>,
schema_name: SchemaName,
) -> superposition::Result<Json<PaginatedResponse<DefaultConfig>>> {
let DbConnection(mut conn) = db_conn;

if let Some(true) = filters.all {
let result: Vec<DefaultConfig> = dsl::default_configs
.schema_name(&schema_name)
.get_results(&mut conn)?;
let result: Vec<DefaultConfig> =
dsl::default_configs.get_results(&mut db_conn)?;
return Ok(Json(PaginatedResponse {
total_pages: 1,
total_items: result.len() as i64,
data: result,
}));
}

let n_default_configs: i64 = dsl::default_configs
.count()
.schema_name(&schema_name)
.get_result(&mut conn)?;
let n_default_configs: i64 = dsl::default_configs.count().get_result(&mut db_conn)?;
let limit = filters.count.unwrap_or(10);
let mut builder = dsl::default_configs
.order(dsl::created_at.desc())
.limit(limit)
.schema_name(&schema_name)
.into_boxed();
if let Some(page) = filters.page {
let offset = (page - 1) * limit;
builder = builder.offset(offset);
}
let result: Vec<DefaultConfig> = builder.load(&mut conn)?;
let result: Vec<DefaultConfig> = builder.load(&mut db_conn)?;
let total_pages = (n_default_configs as f64 / limit as f64).ceil() as i64;
Ok(Json(PaginatedResponse {
total_pages,
1 change: 1 addition & 0 deletions crates/service_utils/src/db.rs
Original file line number Diff line number Diff line change
@@ -4,5 +4,6 @@ use diesel::{
};

pub mod utils;
pub mod types;

pub type PgSchemaConnectionPool = Pool<ConnectionManager<PgConnection>>;
219 changes: 219 additions & 0 deletions crates/service_utils/src/db/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
use actix_web::web::Data;
use actix_web::{FromRequest, HttpMessage};
use derive_more::{Deref, DerefMut};
use diesel::connection::{
AnsiTransactionManager, Connection, ConnectionSealed, DefaultLoadingMode,
LoadConnection, SimpleConnection, TransactionManager,
};
use diesel::pg::{Pg, PgQueryBuilder};
use diesel::r2d2::{ConnectionManager, PooledConnection};
use diesel::PgConnection;
use diesel::RunQueryDsl;

use crate::service::types::{AppState, SchemaName};

pub struct TransactionManagerImpl;

impl TransactionManager<ConnectionImpl> for TransactionManagerImpl {
type TransactionStateData = <AnsiTransactionManager as TransactionManager<
PgConnection,
>>::TransactionStateData;

fn begin_transaction(conn: &mut ConnectionImpl) -> diesel::prelude::QueryResult<()> {
AnsiTransactionManager::begin_transaction(&mut *conn.conn)?;
let result = diesel::sql_query("SELECT set_config('search_path', $1, true)")
.bind::<diesel::sql_types::Text, _>(&conn.namespace)
.execute(&mut *conn.conn)?;
log::info!("{:?}", result);
Ok(())
}

fn rollback_transaction(
conn: &mut ConnectionImpl,
) -> diesel::prelude::QueryResult<()> {
AnsiTransactionManager::rollback_transaction(&mut *conn.conn)
}

fn commit_transaction(conn: &mut ConnectionImpl) -> diesel::prelude::QueryResult<()> {
AnsiTransactionManager::commit_transaction(&mut *conn.conn)
}

fn transaction_manager_status_mut(
conn: &mut ConnectionImpl,
) -> &mut diesel::connection::TransactionManagerStatus {
AnsiTransactionManager::transaction_manager_status_mut(&mut *conn.conn)
}
}

pub struct ConnectionImpl {
namespace: String,
conn: PooledConnection<ConnectionManager<PgConnection>>,
}

impl ConnectionImpl {
pub fn new(
namespace: String,
mut conn: PooledConnection<ConnectionManager<PgConnection>>,
) -> Self {
conn.set_prepared_statement_cache_size(diesel::connection::CacheSize::Disabled);
ConnectionImpl { namespace, conn }
}

pub fn set_namespace(&mut self, namespace: String) {
self.namespace = namespace;
}

pub fn from_request_override(
req: &actix_web::HttpRequest,
schema_name: String,
) -> Result<Self, actix_web::Error> {
let app_state = match req.app_data::<Data<AppState>>() {
Some(state) => state,
None => {
log::info!(
"DbConnection-FromRequest: Unable to get app_data from request"
);
return Err(actix_web::error::ErrorInternalServerError(""));
}
};

match app_state.db_pool.get() {
Ok(conn) => Ok(ConnectionImpl::new(schema_name, conn)),
Err(e) => {
log::info!("Unable to get db connection from pool, error: {e}");
Err(actix_web::error::ErrorInternalServerError(""))
}
}
}
}

impl ConnectionSealed for ConnectionImpl {}

impl SimpleConnection for ConnectionImpl {
fn batch_execute(&mut self, query: &str) -> diesel::prelude::QueryResult<()> {
self.conn.batch_execute(query)
}
}

impl Connection for ConnectionImpl {
type Backend = Pg;
type TransactionManager = TransactionManagerImpl;

// NOTE: this function will never be used, so namespace here doesn't matter
fn establish(database_url: &str) -> diesel::prelude::ConnectionResult<Self> {
let conn = PooledConnection::establish(database_url)?;
Ok(ConnectionImpl {
namespace: String::new(),
conn,
})
}

fn execute_returning_count<T>(
&mut self,
source: &T,
) -> diesel::prelude::QueryResult<usize>
where
T: diesel::query_builder::QueryFragment<Self::Backend>
+ diesel::query_builder::QueryId,
{
log::info!("{:?}", source.to_sql(&mut PgQueryBuilder::default(), &Pg));
self.transaction::<usize, diesel::result::Error, _>(|conn| {
(*conn.conn).execute_returning_count(source)
})
}

fn transaction_state(&mut self,) -> &mut<Self::TransactionManager as diesel::connection::TransactionManager<Self>>::TransactionStateData{
self.conn.transaction_state()
}

fn set_prepared_statement_cache_size(&mut self, size: diesel::connection::CacheSize) {
self.conn.set_prepared_statement_cache_size(size)
}

fn set_instrumentation(
&mut self,
instrumentation: impl diesel::connection::Instrumentation,
) {
self.conn.set_instrumentation(instrumentation)
}

fn instrumentation(&mut self) -> &mut dyn diesel::connection::Instrumentation {
self.conn.instrumentation()
}
}

impl LoadConnection<DefaultLoadingMode> for ConnectionImpl {
type Cursor<'conn, 'query> =
<PgConnection as LoadConnection<DefaultLoadingMode>>::Cursor<'conn, 'query>;
type Row<'conn, 'query> =
<PgConnection as LoadConnection<DefaultLoadingMode>>::Row<'conn, 'query>;

fn load<'conn, 'query, T>(
&'conn mut self,
source: T,
) -> diesel::prelude::QueryResult<Self::Cursor<'conn, 'query>>
where
T: diesel::query_builder::Query
+ diesel::query_builder::QueryFragment<Self::Backend>
+ diesel::query_builder::QueryId
+ 'query,
Self::Backend: diesel::expression::QueryMetadata<T::SqlType>,
{
self.transaction::<Self::Cursor<'conn, 'query>, diesel::result::Error, _>(
|conn| {
log::info!("{:?}", source.to_sql(&mut PgQueryBuilder::default(), &Pg));
<PgConnection as LoadConnection<DefaultLoadingMode>>::load::<T>(
&mut *conn.conn,
source,
)
},
)
}
}

impl FromRequest for ConnectionImpl {
type Error = actix_web::Error;
type Future = std::future::Ready<Result<ConnectionImpl, Self::Error>>;

fn from_request(
req: &actix_web::HttpRequest,
_: &mut actix_web::dev::Payload,
) -> Self::Future {
let schema_name = req.extensions().get::<SchemaName>().cloned().unwrap().0;
std::future::ready(ConnectionImpl::from_request_override(req, schema_name))
}
}

#[derive(Deref, DerefMut)]
pub struct PublicConnection(pub ConnectionImpl);
impl FromRequest for PublicConnection {
type Error = actix_web::Error;
type Future = std::future::Ready<Result<PublicConnection, Self::Error>>;

fn from_request(
req: &actix_web::HttpRequest,
_: &mut actix_web::dev::Payload,
) -> Self::Future {
std::future::ready(
ConnectionImpl::from_request_override(req, String::from("public"))
.map(|conn| PublicConnection(conn)),
)
}
}

#[derive(Deref, DerefMut)]
pub struct SuperpositionConnection(pub ConnectionImpl);
impl FromRequest for SuperpositionConnection {
type Error = actix_web::Error;
type Future = std::future::Ready<Result<SuperpositionConnection, Self::Error>>;

fn from_request(
req: &actix_web::HttpRequest,
_: &mut actix_web::dev::Payload,
) -> Self::Future {
std::future::ready(
ConnectionImpl::from_request_override(req, String::from("superposition"))
.map(|conn| SuperpositionConnection(conn)),
)
}
}
3 changes: 0 additions & 3 deletions crates/service_utils/src/service/types.rs
Original file line number Diff line number Diff line change
@@ -171,7 +171,6 @@ pub struct DbConnection(pub PooledConnection<ConnectionManager<PgConnection>>);
impl FromRequest for DbConnection {
type Error = Error;
type Future = Ready<Result<DbConnection, Self::Error>>;

fn from_request(
req: &actix_web::HttpRequest,
_: &mut actix_web::dev::Payload,
@@ -185,15 +184,13 @@ impl FromRequest for DbConnection {
return ready(Err(error::ErrorInternalServerError("")));
}
};

let result = match app_state.db_pool.get() {
Ok(conn) => Ok(DbConnection(conn)),
Err(e) => {
log::info!("Unable to get db connection from pool, error: {e}");
Err(error::ErrorInternalServerError(""))
}
};

ready(result)
}
}
Loading