Skip to content

Commit

Permalink
Wait for running jobs to finish (#40)
Browse files Browse the repository at this point in the history
Wait for running jobs to finish
  • Loading branch information
imbolc authored Jul 16, 2022
1 parent 2fdabd9 commit 75e12ba
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls"]
dotenv = "0.15.0"
pretty_env_logger = "0.4.0"
futures = "0.3.13"
tokio = { version = "1", features = ["full"] }
37 changes: 37 additions & 0 deletions examples/graceful-shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use sqlxmq::{job, CurrentJob, JobRegistry};
use std::time::Duration;
use tokio::time;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv::dotenv().ok();
let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL").unwrap()).await?;

sleep.builder().set_json(&5u64)?.spawn(&db).await?;

let mut handle = JobRegistry::new(&[sleep]).runner(&db).run().await?;

// Let's emulate a stop signal in a couple of seconts after running the job
time::sleep(Duration::from_secs(2)).await;
println!("A stop signal received");

// Stop listening for new jobs
handle.stop().await;

// Wait for the running jobs to stop for maximum 10 seconds
handle.wait_jobs_finish(Duration::from_secs(10)).await;

Ok(())
}

#[job]
pub async fn sleep(mut job: CurrentJob) -> sqlx::Result<()> {
let second = Duration::from_secs(1);
let mut to_sleep: u64 = job.json().unwrap().unwrap();
while to_sleep > 0 {
println!("job#{} {to_sleep} more seconds to sleep ...", job.id());
time::sleep(second).await;
to_sleep -= 1;
}
job.complete().await
}
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ mod tests {
async fn test_job_runner<F: Future + Send + 'static>(
pool: &Pool<Postgres>,
f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static,
) -> (OwnedHandle, Arc<AtomicUsize>)
) -> (JobRunnerHandle, Arc<AtomicUsize>)
where
F::Output: Send + 'static,
{
Expand Down Expand Up @@ -365,7 +365,7 @@ mod tests {
Ok(())
}

async fn named_job_runner(pool: &Pool<Postgres>) -> OwnedHandle {
async fn named_job_runner(pool: &Pool<Postgres>) -> JobRunnerHandle {
let mut registry = JobRegistry::new(&[example_job1, example_job2, example_job_with_ctx]);
registry.set_context(42).set_context("Hello, world!");
registry.runner(pool).run().await.unwrap()
Expand Down
42 changes: 36 additions & 6 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use serde::{Deserialize, Serialize};
use sqlx::postgres::types::PgInterval;
Expand Down Expand Up @@ -32,6 +32,12 @@ struct JobRunner {
notify: Notify,
}

/// Job runner handle
pub struct JobRunnerHandle {
runner: Arc<JobRunner>,
handle: Option<OwnedHandle>,
}

/// Type used to checkpoint a running job.
#[derive(Debug, Clone, Default)]
pub struct Checkpoint<'a> {
Expand Down Expand Up @@ -253,18 +259,19 @@ impl JobRunnerOptions {

/// Start the job runner in the background. The job runner will stop when the
/// returned handle is dropped.
pub async fn run(&self) -> Result<OwnedHandle, sqlx::Error> {
pub async fn run(&self) -> Result<JobRunnerHandle, sqlx::Error> {
let options = self.clone();
let job_runner = Arc::new(JobRunner {
options,
running_jobs: AtomicUsize::new(0),
notify: Notify::new(),
});
let listener_task = start_listener(job_runner.clone()).await?;
Ok(OwnedHandle::new(task::spawn(main_loop(
job_runner,
listener_task,
))))
let handle = OwnedHandle::new(task::spawn(main_loop(job_runner.clone(), listener_task)));
Ok(JobRunnerHandle {
runner: job_runner,
handle: Some(handle),
})
}

/// Run a single job and then return. Intended for use by tests. The job should
Expand Down Expand Up @@ -320,6 +327,29 @@ impl JobRunnerOptions {
}
}

impl JobRunnerHandle {
/// Return the number of still running jobs
pub fn num_running_jobs(&self) -> usize {
self.runner.running_jobs.load(Ordering::Relaxed)
}

/// Wait for the jobs to finish, but not more than `timeout`
pub async fn wait_jobs_finish(&self, timeout: Duration) {
let start = Instant::now();
let step = Duration::from_millis(10);
while self.num_running_jobs() > 0 && start.elapsed() < timeout {
tokio::time::sleep(step).await;
}
}

/// Stop the inner task and wait for it to finish.
pub async fn stop(&mut self) {
if let Some(handle) = self.handle.take() {
handle.stop().await
}
}
}

async fn start_listener(job_runner: Arc<JobRunner>) -> Result<OwnedHandle, sqlx::Error> {
let mut listener = PgListener::connect_with(&job_runner.options.pool).await?;
if let Some(channels) = &job_runner.options.channel_names {
Expand Down

0 comments on commit 75e12ba

Please # to comment.