Skip to content

Commit

Permalink
Handle GitLab job cancellation
Browse files Browse the repository at this point in the history
When a job is cancelled on GitLab, the corresponding handler futures
will be dropped. If the handler was already created, the cleanup()
method will still be called, giving the opportunity to clean up after
anything that may have been interrupted.

Signed-off-by: Ryan Gonzalez <ryan.gonzalez@collabora.com>
  • Loading branch information
refi64 authored and sjoerdsimons committed Oct 17, 2022
1 parent ad2e41a commit 9107456
Show file tree
Hide file tree
Showing 7 changed files with 312 additions and 48 deletions.
14 changes: 9 additions & 5 deletions gitlab-runner-mock/src/api/trace.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use wiremock::ResponseTemplate;
use wiremock::{Request, Respond};

use crate::GitlabRunnerMock;
use crate::{GitlabRunnerMock, MockJobState};

pub(crate) struct JobTraceResponder {
mock: GitlabRunnerMock,
Expand Down Expand Up @@ -43,12 +43,16 @@ impl Respond for JobTraceResponder {
if let Some(job) = self.mock.get_job(id) {
if token != job.token() {
ResponseTemplate::new(403)
} else if job.state() != MockJobState::Running {
ResponseTemplate::new(403).insert_header("Job-Status", &*job.state().to_string())
} else {
match job.append_log(request.body.clone(), start, end) {
Ok(()) => ResponseTemplate::new(202).insert_header(
"X-GitLab-Trace-Update-Interval",
&*self.mock.update_interval().to_string(),
),
Ok(()) => ResponseTemplate::new(202)
.insert_header(
"X-GitLab-Trace-Update-Interval",
&*self.mock.update_interval().to_string(),
)
.insert_header("Job-Status", &*job.state().to_string()),
Err(e) => ResponseTemplate::new(416).set_body_string(format!("{:?}", e)),
}
}
Expand Down
9 changes: 7 additions & 2 deletions gitlab-runner-mock/src/api/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl Respond for JobUpdateResponder {
if r.token != job.token() {
ResponseTemplate::new(403)
} else {
match (job.state(), r.state) {
let r = match (job.state(), r.state) {
(MockJobState::Running, MockJobState::Success) => {
job.update_state(r.state);
ResponseTemplate::new(200)
Expand All @@ -50,8 +50,13 @@ impl Respond for JobUpdateResponder {
job.update_state(r.state);
ResponseTemplate::new(200)
}
(current_state, _) if current_state != MockJobState::Running => {
ResponseTemplate::new(403)
}
_ => panic!("Invalid state change"),
}
};

r.append_header("Job-Status", &*job.state().to_string())
}
} else {
ResponseTemplate::new(404)
Expand Down
25 changes: 25 additions & 0 deletions gitlab-runner-mock/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum MockJobState {
Running,
Success,
Failed,
Cancelled,
}

impl MockJobState {
Expand All @@ -22,6 +23,23 @@ impl MockJobState {
}
}

impl ToString for MockJobState {
fn to_string(&self) -> String {
match *self {
MockJobState::Pending => "pending",
MockJobState::Running => "running",
MockJobState::Success => "success",
MockJobState::Failed => "failed",
// The spelling mismatch of "cancelled" vs "canceled" is
// intentional: this crate, as well as tokio_util, already use
// "cancelled", so using it here keeps the spelling consistent, even
// if it's not *identical* to the exact GitLab job status.
MockJobState::Cancelled => "canceled",
}
.to_owned()
}
}

#[derive(Debug, Error)]
pub enum LogError {
#[error("Incorrect range start")]
Expand Down Expand Up @@ -198,6 +216,13 @@ impl MockJob {
inner.artifact.clone()
}

pub fn cancel(&self) {
let mut inner = self.inner.lock().unwrap();
assert!(!inner.state.finished(), "Job is already finished");
inner.state_updates += 1;
inner.state = MockJobState::Cancelled;
}

pub(crate) fn update_state(&self, state: MockJobState) {
let mut inner = self.inner.lock().unwrap();
inner.state_updates += 1;
Expand Down
15 changes: 15 additions & 0 deletions gitlab-runner/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ where
}

const GITLAB_TRACE_UPDATE_INTERVAL: &str = "X-GitLab-Trace-Update-Interval";
const JOB_STATUS: &str = "Job-Status";

#[derive(Debug, Clone, Serialize)]
struct FeaturesInfo {
Expand Down Expand Up @@ -193,6 +194,8 @@ impl JobResponse {
pub enum Error {
#[error("Unexpected reply code {0}")]
UnexpectedStatus(StatusCode),
#[error("Job cancelled")]
JobCancelled,
#[error("Request failure {0}")]
Request(#[from] reqwest::Error),
#[error("Failed to write to destination {0}")]
Expand Down Expand Up @@ -248,6 +251,13 @@ impl Client {
}
}

fn check_for_job_cancellation(&self, response: &reqwest::Response) -> Result<(), Error> {
match response.headers().get(JOB_STATUS) {
Some(header) if header == "canceled" => Err(Error::JobCancelled),
_ => Ok(()),
}
}

pub async fn update_job(
&self,
id: u64,
Expand All @@ -263,6 +273,9 @@ impl Client {
let update = JobUpdate { token, state };

let r = self.client.put(url).json(&update).send().await?;

self.check_for_job_cancellation(&r)?;

let trace_update_interval = r
.headers()
.get(GITLAB_TRACE_UPDATE_INTERVAL)
Expand Down Expand Up @@ -307,6 +320,8 @@ impl Client {
.send()
.await?;

self.check_for_job_cancellation(&r)?;

let trace_update_interval = r
.headers()
.get(GITLAB_TRACE_UPDATE_INTERVAL)
Expand Down
88 changes: 82 additions & 6 deletions gitlab-runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub mod job;
use job::{Job, JobLog};
pub mod uploader;
pub use logging::GitlabLayer;
use tokio_util::sync::CancellationToken;
use tracing::instrument::WithSubscriber;
mod runlist;
use crate::runlist::RunList;
Expand Down Expand Up @@ -105,34 +106,109 @@ macro_rules! outputln {
pub type JobResult = Result<(), ()>;
pub use client::Phase;

/// Async trait for handling a single Job that handles its own cancellation
///
/// This trait is largely identical to [`JobHandler`], but its methods explicitly take a
/// `CancellationToken`, which will be triggered if GitLab cancels the job, after which the method
/// will be responsible for cancellation appropriately. In most cases, the entire execution should
/// simply be cancelled, in which case [`JobHandler`]'s default behavior is desirable instead. (Even
/// when cancelled, `cleanup` will still be called, allowing any cleanup tasks to be performed.)
///
/// Note that this is an asynchronous trait which should be implemented by using the [`async_trait`]
/// crate. However this also means the rustdoc documentation is interesting...
#[async_trait::async_trait]
pub trait CancellableJobHandler: Send {
/// Do a single step of a job
///
/// This gets called for each phase of the job (e.g. script and after_script). The passed
/// string array is the same array as was passed for a given step in the job definition. If the
/// job is cancelled while this is running, the given `cancel_token` will be triggered.
///
/// Note that gitlab concatinates the `before_script` and `script` arrays into a single
/// [Phase::Script] step
async fn step(
&mut self,
script: &[String],
phase: Phase,
cancel_token: &CancellationToken,
) -> JobResult;
/// Upload artifacts to gitlab
///
/// This gets called depending on whether the job definition calls for artifacts to be uploaded
/// based on the result of the script run. If the job is cancelled while this is running, it
/// will still be run to completion. If the job was already cancelled, this will not be called.
async fn upload_artifacts(&mut self, _uploader: &mut Uploader) -> JobResult {
Ok(())
}
/// Cleanup after the job is finished
///
/// This method always get called whether or not the job succeeded or was acancelled, allowing
/// the job handler to clean up as necessary.
async fn cleanup(&mut self) {}
}

/// Async trait for handling a single Job
///
/// In the event of being cancelled by GitLab, the `step` function will have its future dropped
/// instantly. If manual handling of cancellation is required, use `CancellableJobHandler` instead.
/// (Even when cancelled, `cleanup` will still be called, allowing any cleanup tasks to be
/// performed.)
///
/// Note that this is an asynchronous trait which should be implemented by using the [`async_trait`]
/// crate. However this also means the rustdoc documentation is interesting...
#[async_trait::async_trait]
pub trait JobHandler: Send {
/// Do a single step of a job
///
/// This gets called for each phase of the job (e.g. script and after_script). The passed
/// string array is the same array as was passed for a given step in the job definition.cold
/// string array is the same array as was passed for a given step in the job definition. If the
/// job is cancelled while this is running, its future will dropped, resulting in the function's
/// termination.
///
/// Note that gitlab concatinates the `before_script` and `script` arrays into a single
/// [Phase::Script] step
async fn step(&mut self, script: &[String], phase: Phase) -> JobResult;
/// Upload artifacts to gitlab
///
/// This gets called depending on whether the job definition calls for artifacts to be uploaded
/// based on the result of the script run
/// based on the result of the script run. If the job is cancelled while this is running, it
/// will still be run to completion. If the job was already cancelled, this will not be called.
async fn upload_artifacts(&mut self, _uploader: &mut Uploader) -> JobResult {
Ok(())
}
/// Cleanup after the job is finished
///
/// This method always get called whether or not the job succeeded, allowing the job handler to
/// clean up as necessary.
/// This method always get called whether or not the job succeeded or was cancelled, allowing
/// the job handler to clean up as necessary.
async fn cleanup(&mut self) {}
}

#[async_trait::async_trait]
impl<J> CancellableJobHandler for J
where
J: JobHandler,
{
async fn step(
&mut self,
script: &[String],
phase: Phase,
cancel_token: &CancellationToken,
) -> JobResult {
tokio::select! {
r = self.step(script, phase) => r,
_ = cancel_token.cancelled() => Ok(()),
}
}

async fn upload_artifacts(&mut self, uploader: &mut Uploader) -> JobResult {
self.upload_artifacts(uploader).await
}

async fn cleanup(&mut self) {
self.cleanup().await;
}
}

/// Runner for gitlab
///
/// The runner is responsible for communicating with gitlab to request new job and spawn them.
Expand Down Expand Up @@ -216,7 +292,7 @@ impl Runner {
pub async fn request_job<F, J, Ret>(&mut self, process: F) -> Result<bool, client::Error>
where
F: FnOnce(Job) -> Ret + Sync + Send + 'static,
J: JobHandler + Send + 'static,
J: CancellableJobHandler + Send + 'static,
Ret: Future<Output = Result<J, ()>> + Send + 'static,
{
let response = self.client.request_job().await?;
Expand Down Expand Up @@ -250,7 +326,7 @@ impl Runner {
pub async fn run<F, J, Ret>(&mut self, process: F, maximum: usize) -> Result<(), client::Error>
where
F: Fn(Job) -> Ret + Sync + Send + 'static + Clone,
J: JobHandler + Send + 'static,
J: CancellableJobHandler + Send + 'static,
Ret: Future<Output = Result<J, ()>> + Send + 'static,
{
loop {
Expand Down
Loading

0 comments on commit 9107456

Please # to comment.