Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Wait for running jobs to finish #40

Merged
merged 13 commits into from
Jul 16, 2022
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls"]
dotenv = "0.15.0"
pretty_env_logger = "0.4.0"
futures = "0.3.13"
tokio = { version = "1", features = ["full"] }
37 changes: 37 additions & 0 deletions examples/graceful-shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use sqlxmq::{job, CurrentJob, JobRegistry};
use std::time::Duration;
use tokio::time;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv::dotenv().ok();
let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL").unwrap()).await?;

sleep.builder().set_json(&5u64)?.spawn(&db).await?;

let mut handle = JobRegistry::new(&[sleep]).runner(&db).run().await?;

// Let's emulate a stop signal in a couple of seconts after running the job
time::sleep(Duration::from_secs(2)).await;
println!("A stop signal received");

// Stop listening for new jobs
handle.stop().await;

// Wait for the running jobs to stop for maximum 10 seconds
handle.wait_jobs_finish(Duration::from_secs(10)).await;

Ok(())
}

#[job]
pub async fn sleep(mut job: CurrentJob) -> sqlx::Result<()> {
let second = Duration::from_secs(1);
let mut to_sleep: u64 = job.json().unwrap().unwrap();
while to_sleep > 0 {
println!("job#{} {to_sleep} more seconds to sleep ...", job.id());
time::sleep(second).await;
to_sleep -= 1;
}
job.complete().await
}
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ mod tests {
async fn test_job_runner<F: Future + Send + 'static>(
pool: &Pool<Postgres>,
f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static,
) -> (OwnedHandle, Arc<AtomicUsize>)
) -> (JobRunnerHandle, Arc<AtomicUsize>)
where
F::Output: Send + 'static,
{
Expand Down Expand Up @@ -365,7 +365,7 @@ mod tests {
Ok(())
}

async fn named_job_runner(pool: &Pool<Postgres>) -> OwnedHandle {
async fn named_job_runner(pool: &Pool<Postgres>) -> JobRunnerHandle {
let mut registry = JobRegistry::new(&[example_job1, example_job2, example_job_with_ctx]);
registry.set_context(42).set_context("Hello, world!");
registry.runner(pool).run().await.unwrap()
Expand Down
42 changes: 36 additions & 6 deletions src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use serde::{Deserialize, Serialize};
use sqlx::postgres::types::PgInterval;
Expand Down Expand Up @@ -32,6 +32,12 @@ struct JobRunner {
notify: Notify,
}

/// Job runner handle
pub struct JobRunnerHandle {
runner: Arc<JobRunner>,
handle: Option<OwnedHandle>,
}

/// Type used to checkpoint a running job.
#[derive(Debug, Clone, Default)]
pub struct Checkpoint<'a> {
Expand Down Expand Up @@ -253,18 +259,19 @@ impl JobRunnerOptions {

/// Start the job runner in the background. The job runner will stop when the
/// returned handle is dropped.
pub async fn run(&self) -> Result<OwnedHandle, sqlx::Error> {
pub async fn run(&self) -> Result<JobRunnerHandle, sqlx::Error> {
let options = self.clone();
let job_runner = Arc::new(JobRunner {
options,
running_jobs: AtomicUsize::new(0),
notify: Notify::new(),
});
let listener_task = start_listener(job_runner.clone()).await?;
Ok(OwnedHandle::new(task::spawn(main_loop(
job_runner,
listener_task,
))))
let handle = OwnedHandle::new(task::spawn(main_loop(job_runner.clone(), listener_task)));
Ok(JobRunnerHandle {
runner: job_runner,
handle: Some(handle),
})
}

/// Run a single job and then return. Intended for use by tests. The job should
Expand Down Expand Up @@ -320,6 +327,29 @@ impl JobRunnerOptions {
}
}

impl JobRunnerHandle {
/// Return the number of still running jobs
pub fn num_running_jobs(&self) -> usize {
self.runner.running_jobs.load(Ordering::Relaxed)
}

/// Wait for the jobs to finish, but not more than `timeout`
pub async fn wait_jobs_finish(&self, timeout: Duration) {
let start = Instant::now();
let step = Duration::from_millis(1);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The step doesn't need to be this small, 10ms will give other tasks more time to run.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than this, the changes look great 👍

while self.num_running_jobs() > 0 && start.elapsed() < timeout {
tokio::time::sleep(step).await;
}
}

/// Stop the inner task and wait for it to finish.
pub async fn stop(&mut self) {
if let Some(handle) = self.handle.take() {
handle.stop().await
}
}
}

async fn start_listener(job_runner: Arc<JobRunner>) -> Result<OwnedHandle, sqlx::Error> {
let mut listener = PgListener::connect_with(&job_runner.options.pool).await?;
if let Some(channels) = &job_runner.options.channel_names {
Expand Down