Skip to content

Commit

Permalink
Add support for catching signals
Browse files Browse the repository at this point in the history
This is used if the user ends an Aperf run using Ctrl+c or kill <pid>.
  • Loading branch information
janaknat committed Jul 25, 2024
1 parent 820288a commit cad8541
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 85 deletions.
84 changes: 28 additions & 56 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ name = "aperf"
path = "src/bin/aperf.rs"

[dependencies]
nix = { version = "0.29.0", features = ["signal", "poll"] }
clap = { version = "4.2.5", features = ["derive"] }
serde = { version = "1.0", features = ["derive"] }
chrono = { version = "0.4", features = ["serde"] }
Expand All @@ -29,7 +30,7 @@ thiserror = "1.0"
log = "0.4.21"
env_logger = "0.10.0"
lazy_static = "1.4.0"
timerfd = "1.3.0"
timerfd = "1.6.0"
procfs = "0.12.0"
ctor = "0.2.6"
sysinfo = "0.26.2"
Expand Down
99 changes: 71 additions & 28 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@ use chrono::prelude::*;
use data::TimeEnum;
use flate2::{write::GzEncoder, Compression};
use log::{debug, error, info};
use nix::poll::{poll, PollFd, PollFlags, PollTimeout};
use nix::sys::{
signal,
signalfd::{SfdFlags, SigSet, SignalFd},
};
use serde::{Deserialize, Serialize};
use serde_json::{self};
use std::collections::HashMap;
use std::os::unix::io::AsFd;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::{fs, process, time};
Expand Down Expand Up @@ -243,6 +249,7 @@ impl PerformanceData {
let mut current = time::Instant::now();
let end = current + time::Duration::from_secs(self.init_params.period);

// TimerFd
let mut tfd = TimerFd::new()?;
tfd.set_state(
TimerState::Periodic {
Expand All @@ -251,40 +258,76 @@ impl PerformanceData {
},
SetTimeFlags::Default,
);
let timer_pollfd = PollFd::new(tfd.as_fd(), PollFlags::POLLIN);

// SignalFd
let mut mask = SigSet::empty();
mask.add(signal::SIGINT);
mask.add(signal::SIGTERM);
mask.thread_block()?;
let sfd = SignalFd::with_flags(&mask, SfdFlags::SFD_NONBLOCK)?;
let signal_pollfd = PollFd::new(sfd.as_fd(), PollFlags::POLLIN);

let mut poll_fds = [timer_pollfd, signal_pollfd];

while current <= end {
aperf_collect_data.time = TimeEnum::DateTime(Utc::now());
aperf_collect_data.data = HashMap::new();
let ret = tfd.read();
if ret > 1 {
error!("Missed {} interval(s)", ret - 1);
if poll(&mut poll_fds, PollTimeout::NONE)? <= 0 {
error!("Poll error.");
}
if let Some(ev) = poll_fds[0].revents() {
if ev.contains(PollFlags::POLLIN) {
let ret = tfd.read();
if ret > 1 {
error!("Missed {} interval(s)", ret - 1);
}
debug!("Time elapsed: {:?}", start.elapsed());
current += time::Duration::from_secs(ret * self.init_params.interval);
for (name, datatype) in self.collectors.iter_mut() {
if datatype.is_static {
continue;
}

datatype.collector_params.elapsed_time = start.elapsed().as_secs();

aperf_collect_data.measure(
name.clone() + "-collect",
|| -> Result<()> {
datatype.collect_data()?;
Ok(())
},
)?;
aperf_collect_data.measure(name.clone() + "-print", || -> Result<()> {
datatype.write_to_file()?;
Ok(())
})?;
}
let data_collection_time = time::Instant::now() - current;
aperf_collect_data
.data
.insert("aperf".to_string(), data_collection_time.as_micros() as u64);
debug!("Collection time: {:?}", data_collection_time);
bincode::serialize_into(
self.aperf_stats_handle.as_ref().unwrap(),
&aperf_collect_data,
)?;
}
}
debug!("Time elapsed: {:?}", start.elapsed());
current += time::Duration::from_secs(ret * self.init_params.interval);
for (name, datatype) in self.collectors.iter_mut() {
if datatype.is_static {
continue;
if let Some(ev) = poll_fds[1].revents() {
if ev.contains(PollFlags::POLLIN) {
if let Ok(Some(siginfo)) = sfd.read_signal() {
if siginfo.ssi_signo == signal::SIGINT as u32 {
info!("Caught SIGINT. Exiting...");
} else if siginfo.ssi_signo == signal::SIGTERM as u32 {
info!("Caught SIGTERM. Exiting...");
} else {
error!("Caught unknown signal: {}. Exiting...", siginfo.ssi_signo);
}
break;
}
}

datatype.collector_params.elapsed_time = start.elapsed().as_secs();

aperf_collect_data.measure(name.clone() + "-collect", || -> Result<()> {
datatype.collect_data()?;
Ok(())
})?;
aperf_collect_data.measure(name.clone() + "-print", || -> Result<()> {
datatype.write_to_file()?;
Ok(())
})?;
}
let data_collection_time = time::Instant::now() - current;
aperf_collect_data
.data
.insert("aperf".to_string(), data_collection_time.as_micros() as u64);
debug!("Collection time: {:?}", data_collection_time);
bincode::serialize_into(
self.aperf_stats_handle.as_ref().unwrap(),
&aperf_collect_data,
)?;
}
for (_name, datatype) in self.collectors.iter_mut() {
datatype.finish_data_collection()?;
Expand Down

0 comments on commit cad8541

Please # to comment.