From 7a4dcd9792256479cb85cc1ab2afaa75b0af1f84 Mon Sep 17 00:00:00 2001 From: imbolc Date: Mon, 11 Jul 2022 22:06:41 +0300 Subject: [PATCH 01/10] Not working `signal-shutdown` example --- Cargo.toml | 2 ++ examples/signal-shutdown.rs | 49 +++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 examples/signal-shutdown.rs diff --git a/Cargo.toml b/Cargo.toml index d9d40a7..44df75a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,5 @@ runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls"] dotenv = "0.15.0" pretty_env_logger = "0.4.0" futures = "0.3.13" +signal-hook = "0.3" +tokio = { version = "1", features = ["full"] } diff --git a/examples/signal-shutdown.rs b/examples/signal-shutdown.rs new file mode 100644 index 0000000..daab00f --- /dev/null +++ b/examples/signal-shutdown.rs @@ -0,0 +1,49 @@ +#[tokio::main] +async fn main() -> Result<(), Box> { + dotenv::dotenv().ok(); + let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL").unwrap()).await?; + + sleep.builder().set_json(&10)?.spawn(&db).await?; + + let handle = sqlxmq::JobRegistry::new(&[sleep]).runner(&db).run().await?; + + println!("Press Ctrl+C to send a stop signal"); + wait_signal()?; + println!("Got a stop signal, waiting for jobs to finish ..."); + + // this would just instantly kill the job + // handle.stop().await; + + // this waits for the job to finish, but hangs forever afterwards + handle.into_inner().await?; + + Ok(()) +} + +fn wait_signal() -> std::io::Result<()> { + use signal_hook::consts::signal::*; + use signal_hook::iterator::Signals; + + let mut signals = Signals::new(&[SIGTERM, SIGINT, SIGQUIT])?; + let handle = signals.handle(); + for signal in signals.forever() { + match signal { + SIGTERM | SIGINT | SIGQUIT => break, + _ => (), + } + } + handle.close(); + Ok(()) +} + +#[sqlxmq::job] +pub async fn sleep(mut job: sqlxmq::CurrentJob) -> sqlx::Result<()> { + let second = std::time::Duration::from_secs(1); + let mut to_sleep: u64 = job.json().unwrap().unwrap(); + while to_sleep > 0 { + println!("job#{} {to_sleep} more seconds to sleep ...", job.id()); + tokio::time::sleep(second).await; + to_sleep -= 1; + } + job.complete().await +} From 153e353db66be1fedf076c8782d00e75b176924d Mon Sep 17 00:00:00 2001 From: imbolc Date: Tue, 12 Jul 2022 15:27:38 +0300 Subject: [PATCH 02/10] Wait for running jobs to finish --- src/runner.rs | 42 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/src/runner.rs b/src/runner.rs index a7cc953..c3059c5 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -2,7 +2,7 @@ use std::borrow::Cow; use std::fmt::Debug; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use serde::{Deserialize, Serialize}; use sqlx::postgres::types::PgInterval; @@ -32,6 +32,12 @@ struct JobRunner { notify: Notify, } +/// Job runner handle +pub struct JobRunnerHandle { + runner: Arc, + handle: Option, +} + /// Type used to checkpoint a running job. #[derive(Debug, Clone, Default)] pub struct Checkpoint<'a> { @@ -253,7 +259,7 @@ impl JobRunnerOptions { /// Start the job runner in the background. The job runner will stop when the /// returned handle is dropped. - pub async fn run(&self) -> Result { + pub async fn run(&self) -> Result { let options = self.clone(); let job_runner = Arc::new(JobRunner { options, @@ -261,10 +267,11 @@ impl JobRunnerOptions { notify: Notify::new(), }); let listener_task = start_listener(job_runner.clone()).await?; - Ok(OwnedHandle::new(task::spawn(main_loop( - job_runner, - listener_task, - )))) + let handle = OwnedHandle::new(task::spawn(main_loop(job_runner.clone(), listener_task))); + Ok(JobRunnerHandle { + runner: job_runner, + handle: Some(handle), + }) } /// Run a single job and then return. Intended for use by tests. The job should @@ -320,6 +327,29 @@ impl JobRunnerOptions { } } +impl JobRunnerHandle { + /// Return the number of still running jobs + pub fn num_running_jobs(&self) -> usize { + self.runner.running_jobs.load(Ordering::Relaxed) + } + + /// Wait for the jobs to finish, but not more than `timeout` + pub async fn wait_jobs_finish(&self, timeout: Duration) { + let start = Instant::now(); + let step = Duration::from_millis(1); + while self.num_running_jobs() > 0 && start.elapsed() < timeout { + tokio::time::sleep(step).await; + } + } + + /// Stop the inner task and wait for it to finish. + pub async fn stop(&mut self) { + if let Some(handle) = self.handle.take() { + handle.stop().await + } + } +} + async fn start_listener(job_runner: Arc) -> Result { let mut listener = PgListener::connect_with(&job_runner.options.pool).await?; if let Some(channels) = &job_runner.options.channel_names { From eedd8b6626bf8d62c99d6fdbaafe9ce3c050b300 Mon Sep 17 00:00:00 2001 From: imbolc Date: Tue, 12 Jul 2022 15:27:58 +0300 Subject: [PATCH 03/10] Update example --- Cargo.toml | 1 - examples/graceful-shutdown.rs | 37 +++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 examples/graceful-shutdown.rs diff --git a/Cargo.toml b/Cargo.toml index 44df75a..855f732 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,5 +35,4 @@ runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls"] dotenv = "0.15.0" pretty_env_logger = "0.4.0" futures = "0.3.13" -signal-hook = "0.3" tokio = { version = "1", features = ["full"] } diff --git a/examples/graceful-shutdown.rs b/examples/graceful-shutdown.rs new file mode 100644 index 0000000..1af237e --- /dev/null +++ b/examples/graceful-shutdown.rs @@ -0,0 +1,37 @@ +use sqlxmq::{job, CurrentJob, JobRegistry}; +use std::time::Duration; +use tokio::time; + +#[tokio::main] +async fn main() -> Result<(), Box> { + dotenv::dotenv().ok(); + let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL").unwrap()).await?; + + sleep.builder().set_json(&5u64)?.spawn(&db).await?; + + let mut handle = JobRegistry::new(&[sleep]).runner(&db).run().await?; + + // Let's emulate a stop signal in a couple of seconts after running the job + time::sleep(Duration::from_secs(2)).await; + println!("A stop signal received"); + + // Stop listening for new jobs + handle.stop().await; + + // Wait for the running jobs to stop for maximum 10 seconds + handle.wait_jobs_finish(Duration::from_secs(10)).await; + + Ok(()) +} + +#[job] +pub async fn sleep(mut job: CurrentJob) -> sqlx::Result<()> { + let second = Duration::from_secs(1); + let mut to_sleep: u64 = job.json().unwrap().unwrap(); + while to_sleep > 0 { + println!("job#{} {to_sleep} more seconds to sleep ...", job.id()); + time::sleep(second).await; + to_sleep -= 1; + } + job.complete().await +} From 34bfe624d9c29c72004cd26ed2bf8e7ced4b4e99 Mon Sep 17 00:00:00 2001 From: imbolc Date: Tue, 12 Jul 2022 15:29:22 +0300 Subject: [PATCH 04/10] Remove signal based example --- examples/signal-shutdown.rs | 49 ------------------------------------- 1 file changed, 49 deletions(-) delete mode 100644 examples/signal-shutdown.rs diff --git a/examples/signal-shutdown.rs b/examples/signal-shutdown.rs deleted file mode 100644 index daab00f..0000000 --- a/examples/signal-shutdown.rs +++ /dev/null @@ -1,49 +0,0 @@ -#[tokio::main] -async fn main() -> Result<(), Box> { - dotenv::dotenv().ok(); - let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL").unwrap()).await?; - - sleep.builder().set_json(&10)?.spawn(&db).await?; - - let handle = sqlxmq::JobRegistry::new(&[sleep]).runner(&db).run().await?; - - println!("Press Ctrl+C to send a stop signal"); - wait_signal()?; - println!("Got a stop signal, waiting for jobs to finish ..."); - - // this would just instantly kill the job - // handle.stop().await; - - // this waits for the job to finish, but hangs forever afterwards - handle.into_inner().await?; - - Ok(()) -} - -fn wait_signal() -> std::io::Result<()> { - use signal_hook::consts::signal::*; - use signal_hook::iterator::Signals; - - let mut signals = Signals::new(&[SIGTERM, SIGINT, SIGQUIT])?; - let handle = signals.handle(); - for signal in signals.forever() { - match signal { - SIGTERM | SIGINT | SIGQUIT => break, - _ => (), - } - } - handle.close(); - Ok(()) -} - -#[sqlxmq::job] -pub async fn sleep(mut job: sqlxmq::CurrentJob) -> sqlx::Result<()> { - let second = std::time::Duration::from_secs(1); - let mut to_sleep: u64 = job.json().unwrap().unwrap(); - while to_sleep > 0 { - println!("job#{} {to_sleep} more seconds to sleep ...", job.id()); - tokio::time::sleep(second).await; - to_sleep -= 1; - } - job.complete().await -} From 0afcb05d628fc82be4365814e057395681589b1d Mon Sep 17 00:00:00 2001 From: imbolc Date: Tue, 12 Jul 2022 15:47:30 +0300 Subject: [PATCH 05/10] Fix tests --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 39b7f4f..888ab23 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -317,7 +317,7 @@ mod tests { async fn test_job_runner( pool: &Pool, f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static, - ) -> (OwnedHandle, Arc) + ) -> (JobRunnerHandle, Arc) where F::Output: Send + 'static, { @@ -365,7 +365,7 @@ mod tests { Ok(()) } - async fn named_job_runner(pool: &Pool) -> OwnedHandle { + async fn named_job_runner(pool: &Pool) -> JobRunnerHandle { let mut registry = JobRegistry::new(&[example_job1, example_job2, example_job_with_ctx]); registry.set_context(42).set_context("Hello, world!"); registry.runner(pool).run().await.unwrap() From 1b75f787261e28e83ef5b9b1dad20adf6cd5b06d Mon Sep 17 00:00:00 2001 From: imbolc Date: Tue, 12 Jul 2022 16:34:31 +0300 Subject: [PATCH 06/10] Wait longer --- src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 39b7f4f..1f5bc36 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -578,8 +578,7 @@ mod tests { assert_eq!(counter.load(Ordering::SeqCst), 1); // Second attempt - pause_ms(backoff).await; - pause().await; + pause_ms(backoff + 1000).await; assert_eq!(counter.load(Ordering::SeqCst), 2); // No more attempts From e1821655baf1bfa38dbdf3d21a96c88b4b2b1824 Mon Sep 17 00:00:00 2001 From: imbolc Date: Tue, 12 Jul 2022 16:50:29 +0300 Subject: [PATCH 07/10] Identify github actions --- .github/workflows/toolchain.yml | 2 ++ src/lib.rs | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/.github/workflows/toolchain.yml b/.github/workflows/toolchain.yml index bc46de3..6655ff6 100644 --- a/.github/workflows/toolchain.yml +++ b/.github/workflows/toolchain.yml @@ -54,6 +54,7 @@ jobs: runs-on: ubuntu-latest env: RUST_BACKTRACE: "1" + GITHUB_ACTIONS: "1" steps: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 @@ -76,6 +77,7 @@ jobs: runs-on: ubuntu-latest env: RUST_BACKTRACE: "1" + GITHUB_ACTIONS: "1" steps: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 diff --git a/src/lib.rs b/src/lib.rs index 1f5bc36..65ad8d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -371,6 +371,10 @@ mod tests { registry.runner(pool).run().await.unwrap() } + fn is_github_actions() -> bool { + std::env::var("GITHUB_ACTIONS").ok().is_some() + } + async fn pause() { pause_ms(200).await; } @@ -389,6 +393,9 @@ mod tests { assert_eq!(counter.load(Ordering::SeqCst), 0); JobBuilder::new("foo").spawn(pool).await.unwrap(); pause().await; + if is_github_actions() { + pause_ms(1000).await; + } assert_eq!(counter.load(Ordering::SeqCst), 1); } pause().await; @@ -578,11 +585,17 @@ mod tests { assert_eq!(counter.load(Ordering::SeqCst), 1); // Second attempt - pause_ms(backoff + 1000).await; + pause_ms(backoff).await; + if is_github_actions() { + pause_ms(1000).await; + } assert_eq!(counter.load(Ordering::SeqCst), 2); // No more attempts pause_ms(backoff * 3).await; + if is_github_actions() { + pause_ms(1000).await; + } assert_eq!(counter.load(Ordering::SeqCst), 2); } pause().await; From e5a9cf4ad71be73d7fd50c6a90334a4af8fb3111 Mon Sep 17 00:00:00 2001 From: imbolc Date: Tue, 12 Jul 2022 17:20:13 +0300 Subject: [PATCH 08/10] Refactoring --- src/lib.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 65ad8d0..d613c37 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -375,8 +375,16 @@ mod tests { std::env::var("GITHUB_ACTIONS").ok().is_some() } + fn default_pause() -> u64 { + if is_github_actions() { + 1000 + } else { + 200 + } + } + async fn pause() { - pause_ms(200).await; + pause_ms(default_pause()).await; } async fn pause_ms(ms: u64) { @@ -393,9 +401,6 @@ mod tests { assert_eq!(counter.load(Ordering::SeqCst), 0); JobBuilder::new("foo").spawn(pool).await.unwrap(); pause().await; - if is_github_actions() { - pause_ms(1000).await; - } assert_eq!(counter.load(Ordering::SeqCst), 1); } pause().await; @@ -520,7 +525,7 @@ mod tests { let pool = &*test_pool().await; let (_runner, counter) = test_job_runner(pool, move |_| async {}).await; - let backoff = 500; + let backoff = default_pause() + 300; assert_eq!(counter.load(Ordering::SeqCst), 0); JobBuilder::new("foo") @@ -568,7 +573,7 @@ mod tests { }) .await; - let backoff = 200; + let backoff = default_pause(); assert_eq!(counter.load(Ordering::SeqCst), 0); JobBuilder::new("foo") @@ -586,16 +591,10 @@ mod tests { // Second attempt pause_ms(backoff).await; - if is_github_actions() { - pause_ms(1000).await; - } assert_eq!(counter.load(Ordering::SeqCst), 2); // No more attempts pause_ms(backoff * 3).await; - if is_github_actions() { - pause_ms(1000).await; - } assert_eq!(counter.load(Ordering::SeqCst), 2); } pause().await; From 2197826ef021e0a811ada7e507a5893e312ca29a Mon Sep 17 00:00:00 2001 From: imbolc Date: Wed, 13 Jul 2022 16:49:48 +0300 Subject: [PATCH 09/10] CI env var --- .github/workflows/toolchain.yml | 2 -- src/lib.rs | 6 +++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/.github/workflows/toolchain.yml b/.github/workflows/toolchain.yml index 6655ff6..bc46de3 100644 --- a/.github/workflows/toolchain.yml +++ b/.github/workflows/toolchain.yml @@ -54,7 +54,6 @@ jobs: runs-on: ubuntu-latest env: RUST_BACKTRACE: "1" - GITHUB_ACTIONS: "1" steps: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 @@ -77,7 +76,6 @@ jobs: runs-on: ubuntu-latest env: RUST_BACKTRACE: "1" - GITHUB_ACTIONS: "1" steps: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 diff --git a/src/lib.rs b/src/lib.rs index d613c37..1e79370 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -371,12 +371,12 @@ mod tests { registry.runner(pool).run().await.unwrap() } - fn is_github_actions() -> bool { - std::env::var("GITHUB_ACTIONS").ok().is_some() + fn is_ci() -> bool { + std::env::var("CI").ok().is_some() } fn default_pause() -> u64 { - if is_github_actions() { + if is_ci() { 1000 } else { 200 From 3e8c7b1bc761540c8e15b6936b5cf985b2d48a6e Mon Sep 17 00:00:00 2001 From: imbolc Date: Sun, 17 Jul 2022 00:45:29 +0300 Subject: [PATCH 10/10] Increace waiter step --- src/runner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runner.rs b/src/runner.rs index c3059c5..fcd27ab 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -336,7 +336,7 @@ impl JobRunnerHandle { /// Wait for the jobs to finish, but not more than `timeout` pub async fn wait_jobs_finish(&self, timeout: Duration) { let start = Instant::now(); - let step = Duration::from_millis(1); + let step = Duration::from_millis(10); while self.num_running_jobs() > 0 && start.elapsed() < timeout { tokio::time::sleep(step).await; }