Skip to content

Commit

Permalink
clickhouse logging
Browse files Browse the repository at this point in the history
  • Loading branch information
blind-oracle committed May 17, 2024
1 parent 90dfa51 commit c53d4ca
Show file tree
Hide file tree
Showing 12 changed files with 334 additions and 58 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bytes = "1.5"
candid = "0.10"
clap = { version = "4.5", features = ["derive", "string"] }
clap_derive = "4.5"
clickhouse = { version = "0.11", features = ["uuid", "time"] }
ctrlc = { version = "3.4", features = ["termination"] }
# cloudflare v0.11 is broken, master is fixed but unreleased yet.
# see https://github.com/cloudflare/cloudflare-rs/issues/222
Expand All @@ -32,6 +33,7 @@ hickory-resolver = { version = "0.24", features = [
"webpki-roots",
"dnssec-ring",
] }
hostname = "0.4"
http = "1.1"
http-body = "1.0"
http-body-util = "0.1"
Expand Down Expand Up @@ -74,6 +76,7 @@ strum = "0.26"
strum_macros = "0.26"
sync_wrapper = "1.0"
thiserror = "1.0"
time = { version = "0.3", features = ["macros", "serde"] }
tempfile = "3.10"
tokio = { version = "1.36", features = ["full"] }
tokio-util = { version = "0.7", features = ["full"] }
Expand Down
47 changes: 46 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ pub struct Acme {
#[clap(long = "acme-dns-backend")]
pub acme_dns_backend: Option<acme::dns::DnsBackend>,

/// File from which to read API token if DNS backend is Cloudflare
/// Cloudflare API URL
#[clap(
long = "acme-dns-cloudflare-url",
default_value = "https://api.cloudflare.com/client/v4/"
Expand Down Expand Up @@ -257,22 +257,67 @@ pub struct Log {
/// Maximum logging level
#[clap(long = "log-level", default_value = "info")]
pub log_level: tracing::Level,

/// Enables logging to stdout
#[clap(long = "log-stdout")]
pub log_stdout: bool,

/// Enables logging to stdout in JSON
#[clap(long = "log-stdout-json")]
pub log_stdout_json: bool,

/// Enables logging to Journald
#[clap(long = "log-journald")]
pub log_journald: bool,

/// Enables logging to /dev/null (to benchmark logging)
#[clap(long = "log-null")]
pub log_null: bool,

#[command(flatten, next_help_heading = "Clickhouse")]
pub clickhouse: Clickhouse,
}

// Command-line options that configure logging of HTTP requests to Clickhouse.
// Plain `//` comments are used here on purpose: the `///` comments on the fields below
// are clap help texts, so they are left untouched.
// The URL acts as the on/off switch (see its help text); user, password and database
// are optional, batch size and flush interval have defaults.
// NOTE(review): the table is declared optional here, but the Clickhouse actor constructor
// rejects a missing table ("no table specified"), so it is effectively required once the
// URL is set.
#[derive(Args, Clone)]
pub struct Clickhouse {
/// Setting this enables logging of HTTP requests to Clickhouse DB
#[clap(long = "log-clickhouse-url")]
pub log_clickhouse_url: Option<Url>,

/// Clickhouse username
#[clap(long = "log-clickhouse-user")]
pub log_clickhouse_user: Option<String>,

/// Clickhouse password
#[clap(long = "log-clickhouse-pass")]
pub log_clickhouse_pass: Option<String>,

/// Clickhouse database
#[clap(long = "log-clickhouse-db")]
pub log_clickhouse_db: Option<String>,

/// Clickhouse table
#[clap(long = "log-clickhouse-table")]
pub log_clickhouse_table: Option<String>,

/// Clickhouse batch size
// Passed to the inserter as its maximum number of buffered entries.
#[clap(long = "log-clickhouse-batch", default_value = "250000")]
pub log_clickhouse_batch: u64,

/// Clickhouse flush interval
// Parsed with `parse_duration`; passed to the inserter as its commit period.
#[clap(long = "log-clickhouse-interval", value_parser = parse_duration, default_value = "5s")]
pub log_clickhouse_interval: Duration,
}

#[derive(Args)]
pub struct Misc {
/// Environment we run in to specify in logs
#[clap(long = "env", default_value = "dev")]
pub env: String,
/// Local hostname to identify in e.g. logs.
/// If not specified - tries to obtain it.
#[clap(long = "hostname", default_value = hostname::get().unwrap().into_string().unwrap())]
pub hostname: String,
/// Path to a GeoIP database
#[clap(long = "geoip-db")]
pub geoip_db: Option<PathBuf>,
Expand Down
36 changes: 21 additions & 15 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,15 @@ use std::sync::Arc;

use anyhow::{anyhow, Context, Error};
use prometheus::Registry;

use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::{
cli::Cli,
http, metrics,
routing::{
self,
canister::{CanisterResolver, ResolvesCanister},
},
http, log, metrics,
routing::{self, canister::CanisterResolver},
tasks::TaskManager,
tls::{
self,
cert::{LooksupCustomDomain, Storage},
},
tls::{self, cert::Storage},
};

pub const SERVICE_NAME: &str = "ic_gateway";
Expand All @@ -38,7 +31,7 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
// Install crypto-provider
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.expect("unable to install rustls crypto provider");
.map_err(|_| anyhow!("unable to install Rustls crypto provider"))?;

// Prepare some general stuff
let token = CancellationToken::new();
Expand All @@ -48,6 +41,14 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
(&cli.http_client).into(),
dns_resolver.clone(),
)?);
let clickhouse = if cli.log.clickhouse.log_clickhouse_url.is_some() {
Some(Arc::new(
log::clickhouse::Clickhouse::new(&cli.log.clickhouse)
.context("unable to init Clickhouse")?,
))
} else {
None
};

// List of cancellable tasks to execute & track
let mut tasks = TaskManager::new();
Expand All @@ -64,7 +65,7 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
let canister_resolver = CanisterResolver::new(
domains.clone(),
cli.domain.canister_aliases.clone(),
storage.clone() as Arc<dyn LooksupCustomDomain>,
storage.clone(),
)?;

// Create a router
Expand All @@ -73,7 +74,8 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
&mut tasks,
http_client.clone(),
&registry,
Arc::new(canister_resolver) as Arc<dyn ResolvesCanister>,
Arc::new(canister_resolver),
clickhouse.clone(),
)?;

// Set up HTTP
Expand All @@ -91,8 +93,7 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
&mut tasks,
domains,
http_client.clone(),
storage.clone(),
storage.clone(),
storage,
Arc::new(dns_resolver),
)
.await
Expand Down Expand Up @@ -128,5 +129,10 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
warn!("Shutdown signal received, cleaning up");
tasks.stop().await;

// Clickhouse should stop last to ensure that all requests are finished & flushed
if let Some(v) = clickhouse {
v.stop().await;
}

Ok(())
}
147 changes: 147 additions & 0 deletions src/log/clickhouse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use std::time::Duration;

use anyhow::{anyhow, Context, Error};
use clickhouse::{inserter::Inserter, Client};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{debug, error, warn};

use crate::cli;

/// A single log entry describing one HTTP request, written to Clickhouse as one row.
///
/// NOTE(review): the `clickhouse::Row` derive presumably ties the field names (and their
/// order) to the table columns - confirm against the table schema before renaming or
/// reordering anything here.
#[derive(clickhouse::Row, Serialize, Deserialize)]
pub struct Row {
// Environment and host that produced the entry
// (presumably taken from the `--env` / `--hostname` options - verify against the caller).
pub env: String,
pub hostname: String,
// Timestamp of the entry, (de)serialized through the Clickhouse `datetime` adapter.
#[serde(with = "clickhouse::serde::time::datetime")]
pub date: time::OffsetDateTime,
// Request identifier, (de)serialized through the Clickhouse `uuid` adapter.
#[serde(with = "clickhouse::serde::uuid")]
pub request_id: uuid::Uuid,
// Basic HTTP request/response attributes.
pub method: String,
pub http_version: String,
pub status: u16,
// Routing information for the request.
pub domain: String,
pub host: String,
pub path: String,
pub canister_id: String,
// Presumably empty when the request succeeded - TODO confirm with the code that fills it.
pub error_cause: String,
// TLS session parameters of the connection.
pub tls_version: String,
pub tls_cipher: String,
// Sizes; presumably in bytes - TODO confirm.
pub request_size: u64,
pub response_size: u64,
// Timings; unit (presumably seconds) and exact meaning of each are not visible here -
// TODO confirm with the code that fills them.
pub duration: f64,
pub duration_full: f64,
pub duration_conn: f64,
}

/// Handle to the background Clickhouse logging task.
///
/// Rows are queued through `send()` and written by a `ClickhouseActor` running
/// in a spawned task; `stop()` cancels that task and waits for it to finish.
pub struct Clickhouse {
// Cancelled in `stop()`; the actor runs with a child of this token.
token: CancellationToken,
// Tracks the spawned actor task so that `stop()` can wait for its completion.
tracker: TaskTracker,
// Sending side of the bounded row queue consumed by the actor.
tx: Sender<Row>,
}

impl Clickhouse {
    /// Sets up the row queue, builds the actor from the given options and spawns it
    /// as a tracked background task.
    ///
    /// # Errors
    /// Fails if the actor cannot be constructed from `cli`.
    pub fn new(cli: &cli::Clickhouse) -> Result<Self, Error> {
        let token = CancellationToken::new();

        // Bounded queue between the request path and the actor
        let (tx, rx) = channel(65536);
        let actor = ClickhouseActor::new(cli.clone(), rx)?;

        // The actor gets a child token so that cancelling ours stops it
        let tracker = TaskTracker::new();
        let actor_token = token.child_token();
        tracker.spawn(async move {
            let outcome = actor.run(actor_token).await;
            if let Err(e) = outcome {
                error!("Clickhouse: error during run: {e}");
            }
        });

        Ok(Self { token, tracker, tx })
    }

    /// Queues a row for insertion without ever blocking the caller.
    pub fn send(&self, r: Row) {
        // Best-effort by design: when the queue is full (or closed) the row is discarded,
        // which is preferable to blocking the caller or buffering without bound.
        drop(self.tx.try_send(r));
    }

    /// Signals the background task to shut down and waits until it has finished.
    pub async fn stop(&self) {
        // Ask the actor to stop
        self.token.cancel();

        // No more tasks will be spawned; wait for the running one to complete
        self.tracker.close();
        self.tracker.wait().await;
    }
}

/// Background worker that receives rows from the queue and writes them to Clickhouse.
pub struct ClickhouseActor {
// Batching inserter; configured in `new()` with the batch size and flush period.
inserter: Inserter<Row>,
// Receiving side of the row queue.
rx: Receiver<Row>,
}

impl ClickhouseActor {
pub fn new(c: cli::Clickhouse, rx: Receiver<Row>) -> Result<Self, Error> {
let mut client = Client::default().with_url(
c.log_clickhouse_url
.ok_or_else(|| anyhow!("no URL specified"))?,
);
if let Some(v) = c.log_clickhouse_user {
client = client.with_user(v);
}
if let Some(v) = c.log_clickhouse_pass {
client = client.with_password(v);
}
if let Some(v) = c.log_clickhouse_db {
client = client.with_database(v);
}

let inserter = client
.inserter(
&c.log_clickhouse_table
.ok_or_else(|| anyhow!("no table specified"))?,
)?
.with_max_entries(c.log_clickhouse_batch)
.with_period(Some(c.log_clickhouse_interval))
.with_period_bias(0.1); // add 10% random variance to interval

Ok(Self { inserter, rx })
}

async fn run(mut self, token: CancellationToken) -> Result<(), Error> {
let mut interval = tokio::time::interval(Duration::from_secs(1));

warn!("Clickhouse: started");
loop {
tokio::select! {
biased;

() = token.cancelled() => {
// Close the channel
self.rx.close();

// Drain remaining rows
while let Some(v) = self.rx.recv().await {
self.inserter.write(&v).await.context("unable insert row")?;
}

// Flush the buffer
self.inserter.end().await.context("unable to flush buffer")?;
warn!("Clickhouse: stopped");
return Ok(());
},

// Periodically poke inserter to commit if time has come.
// If the thresholds are not reached - it doesn't do anything.
_ = interval.tick() => {
match self.inserter.commit().await {
Ok(v) => debug!("Clickhouse: {} rows inserted", v.entries),
Err(e) => error!("Clickhouse: unable to commit: {e}"),
}
}

row = self.rx.recv() => {
if let Some(v) = row {
if let Err(e) = self.inserter.write(&v).await {
error!("Clickhouse: unable to insert row: {e}");
}
}
}
}
}
}
}
2 changes: 2 additions & 0 deletions src/log.rs → src/log/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod clickhouse;

use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{Context, Error};
Expand Down
4 changes: 3 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use anyhow::{Context, Error};
use clap::Parser;
use jemallocator::Jemalloc;
use tracing::warn;

use crate::cli::Cli;

Expand All @@ -25,7 +26,8 @@ static GLOBAL: Jemalloc = Jemalloc;
/// Binary entry point: parses the command line, sets up logging, reports the
/// environment and hostname, then hands control over to `core::main`.
#[tokio::main]
async fn main() -> Result<(), Error> {
let cli = Cli::parse();

// Logging is set up before anything else so that the messages below are not lost
log::setup_logging(&cli.log).context("unable to setup logging")?;
warn!("Env: {}, Hostname: {}", cli.misc.env, cli.misc.hostname);

core::main(&cli).await
}
Loading

0 comments on commit c53d4ca

Please sign in to comment.