Graceful shutdown #38
This doesn't work because messages may be scheduled arbitrarily far into the future, or be "dead" (attempts = 0). Applications which require zero-downtime upgrades are expected to only make backwards-compatible changes to the message payloads. With serde, this is generally pretty easy to achieve. In cases where it is not possible, you can add a new job instead of modifying the old one, and then run that version of the code at least until all legacy jobs are gone from the database. (This is better than shutdown logic, because there is no time limit on how long you can run this version of the application.)
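For illustration, here is a minimal sketch of the kind of backwards-compatible payload change serde makes easy. It is not from the sqlxmq docs; the `EmailJob` types and their fields are made up. A new field is added with `#[serde(default)]`, so JSON enqueued by the old version of the code still deserializes in the new one:

```rust
use serde::{Deserialize, Serialize};

// Version 1 of the payload, as enqueued by the old deployment.
#[derive(Serialize, Deserialize)]
struct EmailJob {
    to: String,
    subject: String,
}

// Version 2: a new field with a default, so JSON written by version 1
// ({"to": ..., "subject": ...}) still deserializes cleanly.
#[derive(Serialize, Deserialize)]
struct EmailJobV2 {
    to: String,
    subject: String,
    #[serde(default)]
    cc: Vec<String>,
}

fn main() {
    let old_payload = r#"{"to":"a@example.com","subject":"hi"}"#;
    let job: EmailJobV2 = serde_json::from_str(old_payload).unwrap();
    assert!(job.cc.is_empty());
}
```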
Though it's significantly more complicated.
Not in general, but each particular application's developer could settle on a viable strategy. Most apps can tolerate some downtime.
That said, I agree there is a subset of applications for which the effort of maintaining backwards compatibility of messages is unnecessary. The question is whether that subset is large enough, and the extra effort burdensome enough, to warrant this feature. I think for me this falls into the gap of: not important enough (and with enough implementation challenges) that I don't want to implement it myself, but maybe I would accept a PR with some conditions.
Could you explain how to achieve zero downtime while adding a new job? Shouldn't we restart all the runners before spawning the job? Otherwise it won't be in their registries. So shouldn't there be a way to at least wait for the current tasks to finish gracefully before restarting the runners?
You would do a rolling restart of your replicas with the new version.
Oh, you just mean waiting for already-picked-up jobs to complete. This is already mostly possible: when you drop the handle (or call `stop()` on it), the runner stops picking up new jobs; after that you only need to wait for the jobs that are still running to finish. This second part could be made a little easier.
It seems to also cancel all the existing tasks, or maybe I got something wrong. Would you look at the code? https://github.com/imbolc/sqlxmq/blob/signal-shutdown/examples/signal-shutdown.rs
It's only cancelling the existing tasks because you're returning from `main`:

```rust
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    dotenv::dotenv().ok();
    let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL").unwrap()).await?;
    sleep.builder().set_json(&10u64)?.spawn(&db).await?;
    let handle = sqlxmq::JobRegistry::new(&[sleep]).runner(&db).run().await?;
    println!("Waiting 2 seconds...");
    tokio::time::sleep(Duration::from_secs(2)).await;
    println!("Stopping...");
    // Stop the runner so no new jobs are picked up...
    handle.stop().await;
    // ...but keep `main` alive long enough for the job that is already
    // running to finish; returning here would tear down the runtime.
    tokio::time::sleep(Duration::from_secs(10)).await;
    Ok(())
}

#[sqlxmq::job]
pub async fn sleep(mut job: sqlxmq::CurrentJob) -> sqlx::Result<()> {
    let second = std::time::Duration::from_secs(1);
    let mut to_sleep: u64 = job.json().unwrap().unwrap();
    while to_sleep > 0 {
        println!("job#{} {to_sleep} more seconds to sleep ...", job.id());
        tokio::time::sleep(second).await;
        to_sleep -= 1;
    }
    job.complete().await
}
```

You can see that the implementation provided by the `JobRegistry` is not even async (Line 91 in 592b7e6).
Ideally tokio would have some way to "wait until all tasks are done", but it does not.
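For what it's worth, one generic way to approximate "wait until all tasks are done" for tasks you spawn yourself is tokio's `JoinSet`. This is just a sketch of that idea, not something the `JobRegistry` does internally:

```rust
use std::time::Duration;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut tasks = JoinSet::new();
    for i in 0..3 {
        tasks.spawn(async move {
            tokio::time::sleep(Duration::from_secs(i)).await;
            println!("task {i} done");
        });
    }
    // Unlike bare `tokio::spawn`, a JoinSet lets us block here until
    // every task it owns has completed.
    while let Some(res) = tasks.join_next().await {
        res.unwrap();
    }
    println!("all tasks finished, safe to exit");
}
```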
Right, I see now, thank you :) Maybe we add an
Something like that, yeah.
It seems like it's already there: `JobRunner::running_jobs`. But there's no access to
I found that
After changes in the jobs code, previously added payloads may not be valid anymore. A solution could be to stop spawning new jobs and wait until all the remaining jobs are done before restarting the runner. The only way to do it I could find for now is to query the db directly: `.. from mq_msgs where attempts > 0`. Should we add a method for this so users won't rely on the implementation details?
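For reference, a drain-and-wait loop along these lines might look like the sketch below. It relies on the internal `mq_msgs` table and the `attempts > 0` condition quoted above, which is exactly the implementation detail a public method would hide:

```rust
use std::time::Duration;

/// Poll the internal queue table until no live messages remain.
/// Assumes the `mq_msgs` schema referenced in this issue; a public
/// API would replace this query.
async fn wait_for_queue_to_drain(db: &sqlx::PgPool) -> sqlx::Result<()> {
    loop {
        let (remaining,): (i64,) =
            sqlx::query_as("SELECT count(*) FROM mq_msgs WHERE attempts > 0")
                .fetch_one(db)
                .await?;
        if remaining == 0 {
            return Ok(());
        }
        println!("{remaining} jobs still pending...");
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
```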