Skip to content
This repository was archived by the owner on Dec 21, 2021. It is now read-only.

Add functionality to stop and restart processes #25

Merged
merged 4 commits into from
Dec 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/provider/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ pub enum StackableError {
"The following config maps were specified in a pod but not found: {missing_config_maps:?}"
)]
MissingConfigMapsError { missing_config_maps: Vec<String> },
#[error("Object is missing key: {key}")]
MissingObjectKey { key: &'static str },
}
20 changes: 18 additions & 2 deletions src/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use kubelet::provider::Provider;
use log::{debug, error};

use crate::provider::error::StackableError;
use crate::provider::error::StackableError::{CrdMissing, KubeError, PodValidationError};
use crate::provider::error::StackableError::{
CrdMissing, KubeError, MissingObjectKey, PodValidationError,
};
use crate::provider::repository::package::Package;
use crate::provider::states::downloading::Downloading;
use crate::provider::states::terminated::Terminated;
Expand All @@ -40,13 +42,15 @@ pub struct PodState {
log_directory: PathBuf,
package_download_backoff_strategy: ExponentialBackoffStrategy,
service_name: String,
service_uid: String,
package: Package,
process_handle: Option<Child>,
}

impl PodState {
/// Builds the path of this service's configuration directory beneath
/// `self.config_directory`.
///
/// NOTE(review): this listing is a diff rendered without +/- markers, so two
/// candidate bodies appear back to back. The file header reports 2 deletions for
/// this file; the first expression below appears to be one of them — confirm
/// against the repository before relying on either form.
pub fn get_service_config_directory(&self) -> PathBuf {
// Apparently the pre-change body: the directory is named after the service only.
self.config_directory.join(&self.service_name)
// Apparently the post-change body: the directory name is "<service_name>-<service_uid>",
// so the path also depends on the uid stored in `service_uid`.
self.config_directory
.join(format!("{}-{}", &self.service_name, &self.service_uid))
}

pub fn get_service_package_directory(&self) -> PathBuf {
Expand Down Expand Up @@ -153,6 +157,17 @@ impl Provider for StackableProvider {

async fn initialize_pod_state(&self, pod: &Pod) -> anyhow::Result<Self::PodState> {
let service_name = pod.name();

// Extract the uid from the pod object. If it is missing we return an error;
// this should not happen, as every object we get from Kubernetes should have
// a uid set.
let service_uid = if let Some(uid) = pod.as_kube_pod().metadata.uid.as_ref() {
uid.to_string()
} else {
return Err(anyhow::Error::new(MissingObjectKey {
key: ".metadata.uid",
}));
};
let parcel_directory = self.parcel_directory.clone();
// TODO: make this configurable
let download_directory = parcel_directory.join("_download");
Expand All @@ -175,6 +190,7 @@ impl Provider for StackableProvider {
config_directory: self.config_directory.clone(),
package_download_backoff_strategy: ExponentialBackoffStrategy::default(),
service_name: String::from(service_name),
service_uid,
package,
process_handle: None,
})
Expand Down
37 changes: 24 additions & 13 deletions src/provider/states/running.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::process::Child;

use k8s_openapi::api::core::v1::{
ContainerState, ContainerStateRunning, ContainerStatus as KubeContainerStatus,
};
use kubelet::pod::Pod;
use kubelet::state::prelude::*;
use kubelet::state::{State, Transition};
Expand All @@ -9,15 +10,10 @@ use crate::provider::states::failed::Failed;
use crate::provider::states::installing::Installing;
use crate::provider::states::stopping::Stopping;
use crate::provider::PodState;
use k8s_openapi::api::core::v1::{
ContainerState, ContainerStateRunning, ContainerStatus as KubeContainerStatus,
};

/// State type for a pod whose service process has been started.
///
/// The `transition_to` attribute names `Stopping`, `Failed`, `Running` and
/// `Installing`; presumably these are the states `Running` is allowed to move to
/// (kubelet `TransitionTo` derive) — TODO confirm against the kubelet crate docs.
///
/// NOTE(review): this listing is a diff rendered without +/- markers, so two
/// definitions of `Running` appear. The first one (carrying `process_handle`)
/// appears to be the removed version — confirm against the repository.
#[derive(Debug, TransitionTo)]
#[transition_to(Stopping, Failed, Running, Installing)]
pub struct Running {
// Apparently pre-change: the state itself owned the child process handle.
pub process_handle: Option<Child>,
}
// Apparently post-change: the state carries no data; the `Starting` state stores the
// child handle in `pod_state.process_handle` and then transitions to `Running {}`.
pub struct Running {}

#[async_trait::async_trait]
impl State<PodState> for Running {
Expand All @@ -26,17 +22,32 @@ impl State<PodState> for Running {
pod_state: &mut PodState,
_pod: &Pod,
) -> Transition<PodState> {
debug!("waiting");
let mut handle = std::mem::replace(&mut self.process_handle, None).unwrap();

loop {
tokio::select! {
_ = tokio::time::delay_for(std::time::Duration::from_secs(1)) => {
trace!("Checking if service {} is still running.", &pod_state.service_name);
}
}
match handle.try_wait() {
Ok(None) => debug!("Service {} is still running", &pod_state.service_name),

// Obtain a mutable reference to the process handle
let child = if let Some(testproc) = pod_state.process_handle.as_mut() {
testproc
} else {
return Transition::next(
self,
Failed {
message: "Unable to obtain process handle from podstate!".to_string(),
},
);
};

// Check if an exit code is available for the process - if yes, it exited
match child.try_wait() {
Ok(None) => debug!(
"Service {} is still running with pid {}",
&pod_state.service_name,
child.id()
),
_ => {
error!(
"Service {} died unexpectedly, moving to failed state",
Expand Down
11 changes: 4 additions & 7 deletions src/provider/states/starting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,10 @@ impl State<PodState> for Starting {
);
}
}
//pod_state.process_handle = Some(child);
return Transition::next(
self,
Running {
process_handle: Some(child),
},
);
// Store the child handle in the podstate so that later states
// can use it
pod_state.process_handle = Some(child);
return Transition::next(self, Running {});
}
Err(error) => {
let error_message = format!("Failed to start process with error {}", error);
Expand Down
4 changes: 2 additions & 2 deletions src/provider/states/stopping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ pub struct Stopping;
impl State<PodState> for Stopping {
async fn next(self: Box<Self>, pod_state: &mut PodState, _pod: &Pod) -> Transition<PodState> {
if let Some(child) = &pod_state.process_handle {
let pid = child.id();
info!(
"Received stop command for service {}, stopping process with pid {}",
pod_state.service_name, pid
pod_state.service_name,
child.id()
);
}
Transition::next(self, Stopped)
Expand Down
30 changes: 27 additions & 3 deletions src/provider/states/terminated.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::anyhow;
use kubelet::state::prelude::*;
use log::info;
use log::{debug, error, info};

use crate::provider::PodState;

Expand All @@ -12,8 +13,31 @@ pub struct Terminated {
#[async_trait::async_trait]
impl State<PodState> for Terminated {
/// Handles the `Terminated` state: kills the child process stored in
/// `pod_state.process_handle` and completes the state machine.
///
/// Outcomes of the kill-based body:
/// * no handle stored in `pod_state` -> `Transition::Complete(Err(..))` with the
///   message "Unable to retrieve process handle";
/// * `child.kill()` succeeds -> `Transition::Complete(Ok(()))`;
/// * `child.kill()` fails -> the error is logged together with the pid and returned
///   wrapped in `anyhow::Error`.
///
/// NOTE(review): this listing is a diff rendered without +/- markers. The file header
/// reports 3 deletions; the first `info!` call and the bare
/// `Transition::Complete(Ok(()))` right after it appear to be the removed body —
/// confirm against the repository.
///
/// NOTE(review): nothing in this block calls `wait()`/`try_wait()` after `kill()`,
/// so the exited child is not reaped here — check whether that happens elsewhere.
async fn next(self: Box<Self>, pod_state: &mut PodState, _pod: &Pod) -> Transition<PodState> {
// Apparently the pre-change body: log and complete successfully without touching the process.
info!("Service {} was terminated!", &pod_state.service_name);
Transition::Complete(Ok(()))
// Apparently the post-change body starts here.
info!(
"Pod {} was terminated, stopping process!",
&pod_state.service_name
);
// Obtain a mutable reference to the process handle
// (`Child::kill` takes `&mut self`, hence `as_mut()` rather than `as_ref()`).
let child = if let Some(testproc) = pod_state.process_handle.as_mut() {
testproc
} else {
// No process handle was stored in the pod state: finish with an error.
return Transition::Complete(Err(anyhow!("Unable to retrieve process handle")));
};

// Force the child process to exit and map the result onto the final transition.
return match child.kill() {
Ok(()) => {
// Note: the placeholder is filled with the service name, not a pid.
debug!("Successfully killed process {}", pod_state.service_name);
Transition::Complete(Ok(()))
}
Err(e) => {
error!(
"Failed to stop process with pid {} due to: {:?}",
child.id(),
e
);
// Propagate the underlying I/O error to the caller of the state machine.
Transition::Complete(Err(anyhow::Error::new(e)))
}
};
}

async fn json_status(
Expand Down