Skip to content

Commit

Permalink
Report instance properties and keep alive. (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmjoy authored Jan 19, 2023
1 parent 9d76bc3 commit 384564f
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 83 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ once_cell = "1.17.0"
phper = "0.10.2"
prost = "0.11.6"
serde_json = { version = "1.0.91", features = ["preserve_order"] }
skywalking = "0.5.0"
skywalking = { version = "0.5.0", features = ["management"] }
systemstat = "0.2.2"
thiserror = "1.0.38"
tokio = { version = "1.24.1", features = ["full"] }
Expand Down
31 changes: 23 additions & 8 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ use anyhow::anyhow;
use once_cell::sync::OnceCell;
use skywalking::reporter::{CollectItem, Report};
use std::{
io::Write, mem::size_of, ops::DerefMut, os::unix::net::UnixStream, path::Path, sync::Mutex,
io::Write,
mem::size_of,
ops::DerefMut,
os::unix::net::UnixStream,
path::{Path, PathBuf},
sync::Mutex,
};
use tokio::io::AsyncReadExt;
use tokio::{io::AsyncReadExt, sync::mpsc};
use tracing::error;

fn channel_send<T>(data: CollectItem, mut sender: T) -> anyhow::Result<()>
Expand Down Expand Up @@ -47,15 +52,15 @@ pub async fn channel_receive(receiver: &mut tokio::net::UnixStream) -> anyhow::R
Ok(item)
}

pub struct Reporter<T: AsRef<Path>> {
worker_addr: T,
pub struct Reporter {
worker_addr: PathBuf,
stream: OnceCell<Mutex<UnixStream>>,
}

impl<T: AsRef<Path>> Reporter<T> {
pub fn new(worker_addr: T) -> Self {
impl Reporter {
pub fn new(worker_addr: impl AsRef<Path>) -> Self {
Self {
worker_addr,
worker_addr: worker_addr.as_ref().to_path_buf(),
stream: OnceCell::new(),
}
}
Expand All @@ -71,10 +76,20 @@ impl<T: AsRef<Path>> Reporter<T> {
}
}

impl<T: AsRef<Path>> Report for Reporter<T> {
impl Report for Reporter {
fn report(&self, item: CollectItem) {
if let Err(err) = self.try_report(item) {
error!(?err, "channel send failed");
}
}
}

pub struct TxReporter(pub mpsc::Sender<CollectItem>);

impl Report for TxReporter {
fn report(&self, item: CollectItem) {
if let Err(err) = self.0.try_send(item) {
error!(?err, "Send collect item failed");
}
}
}
11 changes: 10 additions & 1 deletion src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use skywalking::{
trace::tracer::{self, Tracer},
};
use std::{borrow::ToOwned, env, ffi::CStr, path::Path, str::FromStr, time::SystemTime};
use tracing::{info, metadata::LevelFilter};
use tracing::{error, info, metadata::LevelFilter};
use tracing_subscriber::FmtSubscriber;

pub static SERVICE_NAME: Lazy<String> = Lazy::new(|| {
Expand Down Expand Up @@ -72,6 +72,15 @@ pub fn init() {
service_instance, skywalking_version, "Starting skywalking agent"
);

// Skywalking version check
if *skywalking_version < 8 {
error!(
skywalking_version,
"The skywalking agent only supports versions after skywalking 8"
);
return;
}

Lazy::force(&SOCKET_FILE_PATH);
init_worker();

Expand Down
160 changes: 87 additions & 73 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,24 @@
// limitations under the License.

use crate::{
channel, module::SOCKET_FILE_PATH, util::change_permission, SKYWALKING_AGENT_SERVER_ADDR,
SKYWALKING_AGENT_WORKER_THREADS,
channel::{self, TxReporter},
module::{SERVICE_INSTANCE, SERVICE_NAME, SOCKET_FILE_PATH},
util::change_permission,
SKYWALKING_AGENT_SERVER_ADDR, SKYWALKING_AGENT_WORKER_THREADS,
};
use anyhow::anyhow;
use once_cell::sync::Lazy;
use phper::ini::ini_get;
use skywalking::reporter::{
grpc::{CollectItemConsume, GrpcReporter},
CollectItem,
use skywalking::{
management::{instance::Properties, manager::Manager},
reporter::{
grpc::{CollectItemConsume, GrpcReporter},
CollectItem,
},
};
use std::{
cmp::Ordering, error::Error, ffi::CStr, fs, io, num::NonZeroUsize, process::exit,
thread::available_parallelism, time::Duration,
cmp::Ordering, error::Error, ffi::CStr, fs, io, marker::PhantomData, num::NonZeroUsize,
process::exit, thread::available_parallelism, time::Duration,
};
use tokio::{
net::UnixListener,
Expand All @@ -51,7 +57,7 @@ pub fn init_worker() {
unsafe {
// TODO Shutdown previous worker before fork if there is a PHP-FPM reload
// operation.
// TODO Chagne the worker process name.
// TODO Change the worker process name.

let pid = libc::fork();
match pid.cmp(&0) {
Expand All @@ -63,9 +69,17 @@ pub fn init_worker() {
#[cfg(target_os = "linux")]
libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM);

// Run the worker in subprocess.
let rt = new_tokio_runtime(worker_threads);
rt.block_on(start_worker(server_addr));
exit(0);
match rt.block_on(start_worker(server_addr)) {
Ok(_) => {
exit(0);
}
Err(err) => {
error!(?err, "worker exit unexpectedly");
exit(1);
}
}
}
Ordering::Greater => {}
}
Expand All @@ -90,39 +104,25 @@ fn new_tokio_runtime(worker_threads: usize) -> Runtime {
.unwrap()
}

async fn start_worker(server_addr: String) {
async fn start_worker(server_addr: String) -> anyhow::Result<()> {
debug!("Starting worker...");

// Ensure to cleanup resources when worker exits.
let _guard = WorkerExitGuard::default();

// Graceful shutdown signal, put it on the top of program.
let mut sig_term = match signal(SignalKind::terminate()) {
Ok(signal) => signal,
Err(err) => {
error!(?err, "Signal terminate failed");
return;
}
};
let mut sig_int = match signal(SignalKind::interrupt()) {
Ok(signal) => signal,
Err(err) => {
error!(?err, "Signal interrupt failed");
return;
}
};
let mut sig_term = signal(SignalKind::terminate())?;
let mut sig_int = signal(SignalKind::interrupt())?;

let socket_file = SOCKET_FILE_PATH.as_str();

let fut = async move {
debug!(socket_file, "Bind unix stream");
let listener = match UnixListener::bind(socket_file) {
Ok(listener) => listener,
Err(err) => {
error!(?err, "Bind failed");
return;
}
};
let listener = UnixListener::bind(socket_file)?;
change_permission(socket_file, 0o777);

let (tx, rx) = mpsc::channel::<Result<CollectItem, Box<dyn Error + Send>>>(255);
let (tx, rx) = mpsc::channel::<CollectItem>(255);
let tx_ = tx.clone();
tokio::spawn(async move {
loop {
match listener.accept().await {
Expand All @@ -139,13 +139,19 @@ async fn start_worker(server_addr: String) {
debug!("Leaving channel_receive loop");
return;
}
_ => Err(err.into()),
_ => {
error!(?err, "channel_receive failed");
continue;
}
},
Ok(i) => Ok(i),
Ok(i) => i,
};

// Try send here, to prevent the ipc blocking caused by the channel
// bursting (too late to report),
// which affects the pool process of php-fpm.
if let Err(err) = tx.try_send(r) {
error!(?err, "Send failed");
error!(?err, "Send collect item failed");
if !matches!(err, TrySendError::Full(_)) {
return;
}
Expand All @@ -160,48 +166,42 @@ async fn start_worker(server_addr: String) {
}
});

let endpoint = match Endpoint::from_shared(server_addr) {
Ok(endpoint) => endpoint,
Err(err) => {
error!(?err, "Create endpoint failed");
return;
}
};
let endpoint = Endpoint::from_shared(server_addr)?;
let channel = connect(endpoint).await;

report_properties_and_keep_alive(TxReporter(tx_));

let reporter = GrpcReporter::new_with_pc(channel, (), Consumer(rx));

// report_instance_properties(channel.clone()).await;
// mark_ready_for_request();
info!("Worker is ready...");

let handle = reporter
.reporting()
.await
// .with_graceful_shutdown(async move {
// sig.recv().await;
// info!("Shutdown signal received");
// })
.with_status_handle(|status| {
warn!(?status, "Collect failed");
})
.spawn();

if let Err(err) = handle.await {
error!(?err, "Tracer reporting failed");
}
handle
.await
.map_err(|err| anyhow!("Tracer reporting failed: {:?}", err))?;

Ok::<_, anyhow::Error>(())
};

// TODO Do graceful shutdown, and wait 10s then force quit.
select! {
_ = sig_term.recv() => {}
_ = sig_int.recv() => {}
_ = fut => {}
r = fut => {
r?;
}
}

info!("Start to shutdown skywalking grpc reporter");

worker_exit();
Ok(())
}

#[tracing::instrument(skip_all)]
Expand All @@ -222,36 +222,50 @@ async fn connect(endpoint: Endpoint) -> Channel {
channel
}

struct Consumer(mpsc::Receiver<Result<CollectItem, Box<dyn Error + Send>>>);
struct Consumer(mpsc::Receiver<CollectItem>);

#[async_trait]
impl CollectItemConsume for Consumer {
async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
self.0
.recv()
.await
.map(|result| result.map(Some))
.unwrap_or(Ok(None))
Ok(self.0.recv().await)
}

async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
self.0
.try_recv()
.map(|result| result.map(Some))
.unwrap_or(Ok(None))
Ok(self.0.try_recv().ok())
}
}

fn worker_exit() {
match Lazy::get(&SOCKET_FILE_PATH) {
Some(socket_file) => {
info!(socket_file, "Remove socket file");
if let Err(err) = fs::remove_file(socket_file) {
error!(?err, "Remove socket file failed");
#[derive(Default)]
struct WorkerExitGuard(PhantomData<()>);

impl Drop for WorkerExitGuard {
fn drop(&mut self) {
match Lazy::get(&SOCKET_FILE_PATH) {
Some(socket_file) => {
info!(socket_file, "Remove socket file");
if let Err(err) = fs::remove_file(socket_file) {
error!(?err, "Remove socket file failed");
}
}
None => {
warn!("Socket file not created");
}
}
None => {
warn!("Socket file not created");
}
}
}

/// Reports the instance properties once and then starts the keep-alive
/// through a management [`Manager`] built on top of `reporter`.
///
/// The reported properties are the OS information, the language (`"php"`)
/// and the parent process id as the process number. `keep_alive` is called
/// with a 10-second duration.
fn report_properties_and_keep_alive(reporter: TxReporter) {
    // SAFETY: `getppid` takes no arguments and touches no memory owned by
    // this program, so the call itself has no preconditions to uphold.
    let parent_pid = unsafe { libc::getppid() };

    // NOTE: the binding must stay named `props`, because `debug!(?props, ..)`
    // uses the identifier as the emitted field name.
    let mut props = Properties::new();
    props.insert_os_info();
    props.update(Properties::KEY_LANGUAGE, "php");
    props.update(Properties::KEY_PROCESS_NO, parent_pid.to_string());
    debug!(?props, "Report instance properties");

    let manager = Manager::new(&*SERVICE_NAME, &*SERVICE_INSTANCE, reporter);
    manager.report_properties(props);
    manager.keep_alive(Duration::from_secs(10));
}

0 comments on commit 384564f

Please sign in to comment.