From e82b0582e61b91429907b9118f40ec2d38c2bcff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Thu, 3 Dec 2020 22:45:22 +0100 Subject: [PATCH 1/4] Add functionality to stop processes, which in turn enables restarting on recreated pods. Also contains change to add pod-uid to the config directory name. --- src/provider/mod.rs | 6 ++++- src/provider/states/running.rs | 37 ++++++++++++++++++++----------- src/provider/states/starting.rs | 11 ++++----- src/provider/states/stopping.rs | 3 +-- src/provider/states/terminated.rs | 30 ++++++++++++++++++++++--- 5 files changed, 61 insertions(+), 26 deletions(-) diff --git a/src/provider/mod.rs b/src/provider/mod.rs index cacb5b1..77555ac 100644 --- a/src/provider/mod.rs +++ b/src/provider/mod.rs @@ -40,13 +40,15 @@ pub struct PodState { log_directory: PathBuf, package_download_backoff_strategy: ExponentialBackoffStrategy, service_name: String, + service_uid: String, package: Package, process_handle: Option, } impl PodState { pub fn get_service_config_directory(&self) -> PathBuf { - self.config_directory.join(&self.service_name) + self.config_directory + .join(format!("{}-{}", &self.service_name, &self.service_uid)) } pub fn get_service_package_directory(&self) -> PathBuf { @@ -153,6 +155,7 @@ impl Provider for StackableProvider { async fn initialize_pod_state(&self, pod: &Pod) -> anyhow::Result { let service_name = pod.name(); + let service_uid = String::from(pod.as_kube_pod().metadata.uid.as_ref().unwrap()); let parcel_directory = self.parcel_directory.clone(); // TODO: make this configurable let download_directory = parcel_directory.join("_download"); @@ -175,6 +178,7 @@ impl Provider for StackableProvider { config_directory: self.config_directory.clone(), package_download_backoff_strategy: ExponentialBackoffStrategy::default(), service_name: String::from(service_name), + service_uid, package, process_handle: None, }) diff --git a/src/provider/states/running.rs b/src/provider/states/running.rs index 
8ba6b27..7ea35ed 100644 --- a/src/provider/states/running.rs +++ b/src/provider/states/running.rs @@ -1,5 +1,6 @@ -use std::process::Child; - +use k8s_openapi::api::core::v1::{ + ContainerState, ContainerStateRunning, ContainerStatus as KubeContainerStatus, +}; use kubelet::pod::Pod; use kubelet::state::prelude::*; use kubelet::state::{State, Transition}; @@ -9,15 +10,10 @@ use crate::provider::states::failed::Failed; use crate::provider::states::installing::Installing; use crate::provider::states::stopping::Stopping; use crate::provider::PodState; -use k8s_openapi::api::core::v1::{ - ContainerState, ContainerStateRunning, ContainerStatus as KubeContainerStatus, -}; #[derive(Debug, TransitionTo)] #[transition_to(Stopping, Failed, Running, Installing)] -pub struct Running { - pub process_handle: Option, -} +pub struct Running {} #[async_trait::async_trait] impl State for Running { @@ -26,17 +22,32 @@ impl State for Running { pod_state: &mut PodState, _pod: &Pod, ) -> Transition { - debug!("waiting"); - let mut handle = std::mem::replace(&mut self.process_handle, None).unwrap(); - loop { tokio::select! 
{ _ = tokio::time::delay_for(std::time::Duration::from_secs(1)) => { trace!("Checking if service {} is still running.", &pod_state.service_name); } } - match handle.try_wait() { - Ok(None) => debug!("Service {} is still running", &pod_state.service_name), + + // Obtain a mutable reference to the process handle + let child = if let Some(testproc) = pod_state.process_handle.as_mut() { + testproc + } else { + return Transition::next( + self, + Failed { + message: "Unable to obtain process handle from podstate!".to_string(), + }, + ); + }; + + // Check if an exit code is available for the process - if yes, it exited + match child.try_wait() { + Ok(None) => debug!( + "Service {} is still running with pid {}", + &pod_state.service_name, + child.id() + ), _ => { error!( "Service {} died unexpectedly, moving to failed state", diff --git a/src/provider/states/starting.rs b/src/provider/states/starting.rs index 7c743a1..0b95fe9 100644 --- a/src/provider/states/starting.rs +++ b/src/provider/states/starting.rs @@ -115,13 +115,10 @@ impl State for Starting { ); } } - //pod_state.process_handle = Some(child); - return Transition::next( - self, - Running { - process_handle: Some(child), - }, - ); + // Store the child handle in the podstate so that later states + // can use it + pod_state.process_handle = Some(child); + return Transition::next(self, Running {}); } Err(error) => { let error_message = format!("Failed to start process with error {}", error); diff --git a/src/provider/states/stopping.rs b/src/provider/states/stopping.rs index 75271c2..c1bc04e 100644 --- a/src/provider/states/stopping.rs +++ b/src/provider/states/stopping.rs @@ -15,10 +15,9 @@ pub struct Stopping; impl State for Stopping { async fn next(self: Box, pod_state: &mut PodState, _pod: &Pod) -> Transition { if let Some(child) = &pod_state.process_handle { - let pid = child.id(); info!( "Received stop command for service {}, stopping process with pid {}", - pod_state.service_name, pid + 
pod_state.service_name, &1 ); } Transition::next(self, Stopped) diff --git a/src/provider/states/terminated.rs b/src/provider/states/terminated.rs index 651ef91..528d214 100644 --- a/src/provider/states/terminated.rs +++ b/src/provider/states/terminated.rs @@ -1,5 +1,6 @@ +use anyhow::anyhow; use kubelet::state::prelude::*; -use log::info; +use log::{debug, error, info}; use crate::provider::PodState; @@ -12,8 +13,31 @@ pub struct Terminated { #[async_trait::async_trait] impl State for Terminated { async fn next(self: Box, pod_state: &mut PodState, _pod: &Pod) -> Transition { - info!("Service {} was terminated!", &pod_state.service_name); - Transition::Complete(Ok(())) + info!( + "Pod {} was terminated, stopping process!", + &pod_state.service_name + ); + // Obtain a mutable reference to the process handle + let child = if let Some(testproc) = pod_state.process_handle.as_mut() { + testproc + } else { + return Transition::Complete(Err(anyhow!("Unable to retrieve process handle"))); + }; + + return match child.kill() { + Ok(()) => { + debug!("Successfully killed process {}", pod_state.service_name); + Transition::Complete(Ok(())) + } + Err(e) => { + error!( + "Failed to stop process with pid {} due to: {:?}", + child.id(), + e + ); + Transition::Complete(Err(anyhow::Error::new(e))) + } + }; } async fn json_status( From 4a24e7944ea4a6aa006a8db5d08ff1850d0af0d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Thu, 10 Dec 2020 09:10:41 +0100 Subject: [PATCH 2/4] Addressed review comments from Lars --- src/provider/error.rs | 2 ++ src/provider/mod.rs | 16 ++++++++++++++-- src/provider/states/stopping.rs | 2 +- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/provider/error.rs b/src/provider/error.rs index b7e61b5..ffb3eba 100644 --- a/src/provider/error.rs +++ b/src/provider/error.rs @@ -43,4 +43,6 @@ pub enum StackableError { "The following config maps were specified in a pod but not found: {missing_config_maps:?}" )] 
MissingConfigMapsError { missing_config_maps: Vec }, + #[error("An object received from Kubernetes didn't contain a required field: {field_name}")] + IllegalKubeObject {field_name: String} } diff --git a/src/provider/mod.rs b/src/provider/mod.rs index 77555ac..1c41100 100644 --- a/src/provider/mod.rs +++ b/src/provider/mod.rs @@ -13,7 +13,9 @@ use kubelet::provider::Provider; use log::{debug, error}; use crate::provider::error::StackableError; -use crate::provider::error::StackableError::{CrdMissing, KubeError, PodValidationError}; +use crate::provider::error::StackableError::{ + CrdMissing, IllegalKubeObject, KubeError, PodValidationError, +}; use crate::provider::repository::package::Package; use crate::provider::states::downloading::Downloading; use crate::provider::states::terminated::Terminated; @@ -155,7 +157,17 @@ impl Provider for StackableProvider { async fn initialize_pod_state(&self, pod: &Pod) -> anyhow::Result { let service_name = pod.name(); - let service_uid = String::from(pod.as_kube_pod().metadata.uid.as_ref().unwrap()); + + // Extract uid from pod object, if this fails we return an error - + // this should not happen, as all objects we get from Kubernetes should have + // a uid set! 
+ let service_uid = if let Some(uid) = pod.as_kube_pod().metadata.uid.as_ref() { + uid.to_string() + } else { + return Err(anyhow::Error::new(IllegalKubeObject { + field_name: "uid".to_string(), + })); + }; let parcel_directory = self.parcel_directory.clone(); // TODO: make this configurable let download_directory = parcel_directory.join("_download"); diff --git a/src/provider/states/stopping.rs b/src/provider/states/stopping.rs index c1bc04e..769690c 100644 --- a/src/provider/states/stopping.rs +++ b/src/provider/states/stopping.rs @@ -17,7 +17,7 @@ impl State for Stopping { if let Some(child) = &pod_state.process_handle { info!( "Received stop command for service {}, stopping process with pid {}", - pod_state.service_name, &1 + pod_state.service_name, child.id() ); } Transition::next(self, Stopped) From b7b92c90238ec1053b0849d2453e158d5740564e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Thu, 10 Dec 2020 09:55:48 +0100 Subject: [PATCH 3/4] Missed some rustfmt issues. 
--- src/provider/error.rs | 2 +- src/provider/states/stopping.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/provider/error.rs b/src/provider/error.rs index ffb3eba..d331d66 100644 --- a/src/provider/error.rs +++ b/src/provider/error.rs @@ -44,5 +44,5 @@ pub enum StackableError { )] MissingConfigMapsError { missing_config_maps: Vec }, #[error("An object received from Kubernetes didn't contain a required field: {field_name}")] - IllegalKubeObject {field_name: String} + IllegalKubeObject { field_name: String }, } diff --git a/src/provider/states/stopping.rs b/src/provider/states/stopping.rs index 769690c..65d5a4a 100644 --- a/src/provider/states/stopping.rs +++ b/src/provider/states/stopping.rs @@ -17,7 +17,8 @@ impl State for Stopping { if let Some(child) = &pod_state.process_handle { info!( "Received stop command for service {}, stopping process with pid {}", - pod_state.service_name, child.id() + pod_state.service_name, + child.id() ); } Transition::next(self, Stopped) From caa5f7987eda4ce194d00dc769d660db793900ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B6nke=20Liebau?= Date: Thu, 10 Dec 2020 12:33:53 +0100 Subject: [PATCH 4/4] Changed used error type for missing uid to be in line with what Lars uses in the operator framework (which in turn is copied from the kubernetes client crate). 
--- src/provider/error.rs | 4 ++-- src/provider/mod.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/provider/error.rs b/src/provider/error.rs index d331d66..edd86e5 100644 --- a/src/provider/error.rs +++ b/src/provider/error.rs @@ -43,6 +43,6 @@ pub enum StackableError { "The following config maps were specified in a pod but not found: {missing_config_maps:?}" )] MissingConfigMapsError { missing_config_maps: Vec }, - #[error("An object received from Kubernetes didn't contain a required field: {field_name}")] - IllegalKubeObject { field_name: String }, + #[error("Object is missing key: {key}")] + MissingObjectKey { key: &'static str }, } diff --git a/src/provider/mod.rs b/src/provider/mod.rs index 1c41100..d6b042f 100644 --- a/src/provider/mod.rs +++ b/src/provider/mod.rs @@ -14,7 +14,7 @@ use log::{debug, error}; use crate::provider::error::StackableError; use crate::provider::error::StackableError::{ - CrdMissing, IllegalKubeObject, KubeError, PodValidationError, + CrdMissing, KubeError, MissingObjectKey, PodValidationError, }; use crate::provider::repository::package::Package; use crate::provider::states::downloading::Downloading; @@ -164,8 +164,8 @@ impl Provider for StackableProvider { let service_uid = if let Some(uid) = pod.as_kube_pod().metadata.uid.as_ref() { uid.to_string() } else { - return Err(anyhow::Error::new(IllegalKubeObject { - field_name: "uid".to_string(), + return Err(anyhow::Error::new(MissingObjectKey { + key: ".metadata.uid", })); }; let parcel_directory = self.parcel_directory.clone();