Skip to content

feat(tests): Add integration tests for pg_replicate #121

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 34 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
fc8b8b0
feat(tests): Add integration tests for pg_replicate
iambriccardo May 19, 2025
52b3694
Implement tokio facilities
iambriccardo May 19, 2025
0e32a1b
Use options in pg_replicate
iambriccardo May 19, 2025
97c066f
Add test facilities
iambriccardo May 19, 2025
467aab1
Improve
iambriccardo May 19, 2025
bbcfdbd
Improve
iambriccardo May 20, 2025
4dde136
Improve
iambriccardo May 20, 2025
2fe00aa
Improve
iambriccardo May 20, 2025
1e7c9c7
Fix
iambriccardo May 20, 2025
63cd678
Add facilities
iambriccardo May 20, 2025
c526103
Improve
iambriccardo May 20, 2025
339ea67
Improve
iambriccardo May 20, 2025
b684034
Fix
iambriccardo May 20, 2025
2689ee0
Implement wait mechanism
iambriccardo May 20, 2025
cd130f1
Improve
iambriccardo May 21, 2025
cfdc826
Improve
iambriccardo May 21, 2025
a74bdc7
Remove code
iambriccardo May 21, 2025
e399c3c
Reformat
iambriccardo May 21, 2025
6ff3316
Improve
iambriccardo May 21, 2025
ee7ce40
Update action
iambriccardo May 21, 2025
16a328a
Improve
iambriccardo May 21, 2025
ca0ac1b
Setup wal level in ci
iambriccardo May 21, 2025
b358b4e
Fix
iambriccardo May 21, 2025
f6a7d3a
Fix
iambriccardo May 21, 2025
c99c451
Trying to fix
iambriccardo May 21, 2025
b3977aa
Improve
iambriccardo May 21, 2025
372e6e7
Improve
iambriccardo May 21, 2025
98857d5
Improve
iambriccardo May 21, 2025
e997309
Improve
iambriccardo May 21, 2025
0d35430
Improve
iambriccardo May 21, 2025
12faaec
fix: clippy warnings
imor May 21, 2025
c7cb595
Fix PR comments
iambriccardo May 21, 2025
6bd16bd
Make queries lowercase
iambriccardo May 21, 2025
7068ec0
Improve
iambriccardo May 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions .github/workflows/general.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# copied from Luca Palmieri's gist here: https://gist.github.com/LukeMathWalker/5ae1107432ce283310c3e601fac915f3
name: Rust

on: [push, pull_request]
Expand Down Expand Up @@ -29,7 +28,6 @@ jobs:
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy
# - uses: Swatinem/rust-cache@v2
- name: Linting
run: cargo clippy -- -D warnings

Expand All @@ -51,28 +49,51 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v3

- name: Wait for Postgres to be ready
run: |
until pg_isready -h localhost -p 5430; do
echo "Waiting for Postgres..."
sleep 1
done

- name: Enable logical WAL
run: |
PGPASSWORD=postgres psql -h localhost -p 5430 -U postgres -c "ALTER SYSTEM SET wal_level = 'logical';"

- name: Restart Postgres service container
run: |
docker restart ${{ job.services.postgres.id }}

- name: Install sqlx-cli
run: cargo install sqlx-cli
--features native-tls,postgres
--no-default-features
--locked
run: |
cargo install sqlx-cli \
--features native-tls,postgres \
--no-default-features \
--locked

- name: Migrate database
run: |
sudo apt-get install libpq-dev -y
cd api
SKIP_DOCKER=true ./scripts/init_db.sh

- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov

- name: Generate code coverage
id: coverage
run: |
cargo llvm-cov test --workspace --no-fail-fast --lcov --output-path lcov.info
cargo llvm-cov test \
--workspace --no-fail-fast \
--lcov --output-path lcov.info

- name: Coveralls upload
uses: coverallsapp/github-action@v2
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
path-to-lcov: lcov.info
debug: true

docker:
name: Docker
runs-on: ubuntu-latest
Expand All @@ -96,4 +117,4 @@ jobs:
with:
file: ./replicator/Dockerfile
push: true
tags: ${{ vars.DOCKERHUB_USERNAME }}/replicator:${{ github.head_ref || github.ref_name }}.${{ github.sha }}
tags: ${{ vars.DOCKERHUB_USERNAME }}/replicator:${{ github.head_ref || github.ref_name }}.${{ github.sha }}
16 changes: 13 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
[workspace]

resolver = "2"

members = ["api", "pg_replicate", "replicator", "telemetry"]
members = [
"api",
"pg_replicate",
"postgres",
"replicator",
"telemetry"
]

[workspace.dependencies]
api = { path = "api" }
pg_replicate = { path = "pg_replicate" }
postgres = { path = "postgres" }
replicator = { path = "replicator" }
telemetry = { path = "telemetry" }

actix-web = { version = "4", default-features = false }
actix-web-httpauth = { version = "0.8.2", default-features = false }
anyhow = { version = "1.0", default-features = false }
Expand Down
9 changes: 6 additions & 3 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ path = "src/main.rs"
name = "api"

[dependencies]
postgres = { workspace = true, features = ["sqlx"] }
telemetry = { workspace = true }

actix-web = { workspace = true, features = ["macros", "http2"] }
actix-web-httpauth = { workspace = true }
anyhow = { workspace = true, features = ["std"] }
async-trait = { workspace = true }
aws-lc-rs = { workspace = true, features = ["alloc", "aws-lc-sys"] }
base64 = { workspace = true, features = ["std"] }
bytes = { workspace = true }
config = { workspace = true, features = ["yaml"] }
constant_time_eq = { workspace = true }
k8s-openapi = { workspace = true, features = ["latest"] }
Expand All @@ -30,7 +32,6 @@ kube = { workspace = true, features = [
pg_escape = { workspace = true }
rand = { workspace = true, features = ["std"] }
reqwest = { workspace = true, features = ["json"] }
secrecy = { workspace = true, features = ["serde", "alloc"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, features = ["std"] }
sqlx = { workspace = true, features = [
Expand All @@ -41,10 +42,12 @@ sqlx = { workspace = true, features = [
"migrate",
] }
thiserror = { workspace = true }
telemetry = { path = "../telemetry" }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing = { workspace = true, default-features = false }
tracing-actix-web = { workspace = true, features = ["emit_event_on_error"] }
utoipa = { workspace = true, features = ["actix_extras"] }
utoipa-swagger-ui = { workspace = true, features = ["actix-web", "reqwest"] }
uuid = { version = "1.10.0", features = ["v4"] }

[dev-dependencies]
postgres = { workspace = true, features = ["test-utils", "sqlx"] }
2 changes: 1 addition & 1 deletion api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Before you begin, ensure you have the following installed:
## Database Management

### Initial Setup
To set up and initialize the database, run the following command from the `api` directory:
To set up and initialize the database, run the following command from the main directory:

```bash
./scripts/init_db.sh
Expand Down
52 changes: 2 additions & 50 deletions api/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::fmt::{self, Display};

use base64::{prelude::BASE64_STANDARD, Engine};
use secrecy::{ExposeSecret, Secret};
use postgres::sqlx::options::PgDatabaseOptions;
use serde::{
de::{self, MapAccess, Unexpected, Visitor},
Deserialize, Deserializer,
};
use sqlx::postgres::{PgConnectOptions, PgSslMode};
use thiserror::Error;

#[derive(serde::Deserialize, Clone)]
Expand Down Expand Up @@ -100,59 +99,12 @@ impl<'de> Deserialize<'de> for ApiKey {

#[derive(serde::Deserialize, Clone)]
pub struct Settings {
pub database: DatabaseSettings,
pub database: PgDatabaseOptions,
pub application: ApplicationSettings,
pub encryption_key: EncryptionKey,
pub api_key: String,
}

#[derive(serde::Deserialize, Clone)]
pub struct DatabaseSettings {
/// Host on which Postgres is running
pub host: String,

/// Port on which Postgres is running
pub port: u16,

/// Postgres database name
pub name: String,

/// Postgres database user name
pub username: String,

/// Postgres database user password
pub password: Option<Secret<String>>,

/// Whether to enable ssl or not
pub require_ssl: bool,
}

impl DatabaseSettings {
pub fn without_db(&self) -> PgConnectOptions {
let ssl_mode = if self.require_ssl {
PgSslMode::Require
} else {
PgSslMode::Prefer
};

let options = PgConnectOptions::new_without_pgpass()
.host(&self.host)
.username(&self.username)
.port(self.port)
.ssl_mode(ssl_mode);

if let Some(password) = &self.password {
options.password(password.expose_secret())
} else {
options
}
}

pub fn with_db(&self) -> PgConnectOptions {
self.without_db().database(&self.name)
}
}

#[derive(serde::Deserialize, Clone)]
pub struct ApplicationSettings {
/// host the api listens on
Expand Down
25 changes: 13 additions & 12 deletions api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use std::env;

use anyhow::anyhow;
use api::{
configuration::{get_settings, DatabaseSettings, Settings},
configuration::{get_settings, Settings},
startup::Application,
};
use postgres::sqlx::options::PgDatabaseOptions;
use telemetry::init_tracing;
use tracing::{error, info};

Expand All @@ -23,7 +24,7 @@ pub async fn main() -> anyhow::Result<()> {
// Run the application server
1 => {
let configuration = get_settings::<'_, Settings>()?;
log_database_settings(&configuration.database);
log_pg_database_options(&configuration.database);
let application = Application::build(configuration.clone()).await?;
application.run_until_stopped().await?;
}
Expand All @@ -32,9 +33,9 @@ pub async fn main() -> anyhow::Result<()> {
let command = args.nth(1).unwrap();
match command.as_str() {
"migrate" => {
let configuration = get_settings::<'_, DatabaseSettings>()?;
log_database_settings(&configuration);
Application::migrate_database(configuration).await?;
let options = get_settings::<'_, PgDatabaseOptions>()?;
log_pg_database_options(&options);
Application::migrate_database(options).await?;
info!("database migrated successfully");
}
_ => {
Expand All @@ -54,13 +55,13 @@ pub async fn main() -> anyhow::Result<()> {
Ok(())
}

fn log_database_settings(settings: &DatabaseSettings) {
fn log_pg_database_options(options: &PgDatabaseOptions) {
info!(
host = settings.host,
port = settings.port,
dbname = settings.name,
username = settings.username,
require_ssl = settings.require_ssl,
"database details",
host = options.host,
port = options.port,
dbname = options.name,
username = options.username,
require_ssl = options.require_ssl,
"pg database options",
);
}
13 changes: 6 additions & 7 deletions api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ use actix_web::{dev::Server, web, App, HttpServer};
use actix_web_httpauth::middleware::HttpAuthentication;
use aws_lc_rs::aead::{RandomizedNonceKey, AES_256_GCM};
use base64::{prelude::BASE64_STANDARD, Engine};
use postgres::sqlx::options::PgDatabaseOptions;
use sqlx::{postgres::PgPoolOptions, PgPool};
use tracing_actix_web::TracingLogger;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;

use crate::{
authentication::auth_validator,
configuration::{DatabaseSettings, Settings},
configuration::Settings,
db::publications::Publication,
encryption,
k8s_client::HttpK8sClient,
Expand Down Expand Up @@ -90,10 +91,8 @@ impl Application {
Ok(Self { port, server })
}

pub async fn migrate_database(
database_settings: DatabaseSettings,
) -> Result<(), anyhow::Error> {
let connection_pool = get_connection_pool(&database_settings);
pub async fn migrate_database(options: PgDatabaseOptions) -> Result<(), anyhow::Error> {
let connection_pool = get_connection_pool(&options);

sqlx::migrate!("./migrations").run(&connection_pool).await?;

Expand All @@ -109,8 +108,8 @@ impl Application {
}
}

pub fn get_connection_pool(configuration: &DatabaseSettings) -> PgPool {
PgPoolOptions::new().connect_lazy_with(configuration.with_db())
pub fn get_connection_pool(options: &PgDatabaseOptions) -> PgPool {
PgPoolOptions::new().connect_lazy_with(options.with_db())
}

// HttpK8sClient is wrapped in an option because creating it
Expand Down
53 changes: 11 additions & 42 deletions api/tests/common/database.rs
Original file line number Diff line number Diff line change
@@ -1,51 +1,20 @@
use api::configuration::DatabaseSettings;
use sqlx::{Connection, Executor, PgConnection, PgPool};
use postgres::sqlx::options::PgDatabaseOptions;
use postgres::sqlx::test_utils::create_pg_database;
use sqlx::PgPool;

pub async fn create_and_configure_database(settings: &DatabaseSettings) -> PgPool {
// Create the database via a single connection.
let mut connection = PgConnection::connect_with(&settings.without_db())
.await
.expect("Failed to connect to Postgres");
connection
.execute(&*format!(r#"CREATE DATABASE "{}";"#, settings.name))
.await
.expect("Failed to create database");
/// Creates and configures a new PostgreSQL database for the API.
///
/// Similar to [`create_pg_database`], but additionally runs all database migrations
/// from the "./migrations" directory after creation. Returns a [`PgPool`]
/// connected to the newly created and migrated database. Panics if database
/// creation or migration fails.
Comment on lines +6 to +10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move panic docs to the Panics section

Suggested change
///
/// Similar to [`create_pg_database`], but additionally runs all database migrations
/// from the "./migrations" directory after creation. Returns a [`PgPool`]
/// connected to the newly created and migrated database. Panics if database
/// creation or migration fails.
///
/// Similar to [`create_pg_database`], but additionally runs all database migrations
/// from the "./migrations" directory after creation. Returns a [`PgPool`]
/// connected to the newly created and migrated database.
///
/// # Panics
///
/// Panics if database creation or migration fails.

pub async fn create_pg_replicate_api_database(options: &PgDatabaseOptions) -> PgPool {
let connection_pool = create_pg_database(&options).await;

// Create a connection pool to the database and run the migration.
let connection_pool = PgPool::connect_with(settings.with_db())
.await
.expect("Failed to connect to Postgres");
sqlx::migrate!("./migrations")
.run(&connection_pool)
.await
.expect("Failed to migrate the database");

connection_pool
}

pub async fn destroy_database(settings: &DatabaseSettings) {
// Connect to the default database.
let mut connection = PgConnection::connect_with(&settings.without_db())
.await
.expect("Failed to connect to Postgres");

// Forcefully terminate any remaining connections to the database. This code assumes that those
// connections are not used anymore and do not outlive the `TestApp` instance.
connection
.execute(&*format!(
r#"
SELECT pg_terminate_backend(pg_stat_activity.pid)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = '{}'
AND pid <> pg_backend_pid();"#,
settings.name
))
.await
.expect("Failed to terminate database connections");

// Drop the database.
connection
.execute(&*format!(r#"DROP DATABASE IF EXISTS "{}";"#, settings.name))
.await
.expect("Failed to destroy database");
}
1 change: 0 additions & 1 deletion api/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,5 @@
//! - Provide connection pools for tests
//!
//! These utilities help maintain consistency across tests and reduce code duplication.

pub mod database;
pub mod test_app;
Loading