remove the concept of target list
matthieugouel committed Feb 8, 2025
1 parent 6425afa commit f528eb2
Showing 11 changed files with 78 additions and 177 deletions.
22 changes: 5 additions & 17 deletions README.md
@@ -9,37 +9,25 @@
## Architecture

Right now the probing system is composed of two main components: the **client** and the **agent**. Those components send and receive messages from Kafka topics, respectively. The results are stored in a ClickHouse table.
The probing system is composed of two main components: the **client** and the **agent**. These components respectively send and receive messages through Kafka topics. The measurement results can be handled in any way, such as storing them in a ClickHouse database.

Check the [testbed](testbed/README.md) for a quick setup to test the system.
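
A minimal configuration sketch is shown below for orientation. Only the `kafka.in_topics` key is visible in this change set; every other key is a hypothetical placeholder, and the authoritative example lives in the testbed configuration (`testbed/config/saimiris/saimiris.yml`):

```yml
# Hypothetical sketch only; real key names may differ.
kafka:
  brokers: "localhost:9092"      # placeholder broker address
  in_topics: "saimiris-probes"   # comma-separated topics carrying probes (client -> agent)
  out_topics: "saimiris-results" # placeholder for the results topic (agent -> results consumer)
```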

### Agent

The agent performs the measurements. It listens for measurements to be made from a Kafka topic, performs the measurements, and then produces the results to another Kafka topic. The results will eventually be inserted into a ClickHouse table.
The agent performs the measurements: it consumes the probes to send from a Kafka topic, continuously listens for replies, and produces the results to another Kafka topic.

```sh
saimiris agent --config=saimiris.yml
```

### Client

The client is the agent that sends the measurements to the agent. It sends a message to a Kafka topic, which represents a set of probes to be sent consecutively. A measurement can be composed of multiple messages.
The client is the component that sends the measurements to the agent. It sends messages to a Kafka topic, each message representing a set of probes to be sent consecutively. A measurement can be composed of multiple messages.


```sh
samiris client --config=saimiris.yml <comma-separated-agent-id> <target>
cat probes.txt | saimiris client --config=saimiris.yml <comma-separated-agent-ids>
```

A target is a network to probe. It must follow this format:

```
network,protocol,min_ttl,max_ttl,n_flows
```

where:
- the network is a IPv4/IPv6 prefix.
- the prococol can be `icmp` or `udp`.
- the min_ttl and max_ttl are the minimum and maximum TTL values to probe.
- the n_flows is the number of flows to use.


The probes to send are in the [caracal](https://dioptra-io.github.io/caracal/usage/) format.
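
For reference, here is a sketch of what a `probes.txt` input could look like, matching the five comma-separated fields parsed by `decode_probe` (destination address, source port, destination port, TTL, protocol); the concrete values are illustrative only:

```
2606:4700:4700::1111,24000,33434,32,icmp
2606:4700:4700::1111,24000,33434,64,udp
```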
7 changes: 5 additions & 2 deletions src/agent/handler.rs
@@ -13,7 +13,7 @@ use crate::agent::receiver::ReceiveLoop;
use crate::agent::sender::send;
use crate::auth::{KafkaAuth, SaslAuth};
use crate::config::AppConfig;
use crate::probe::decode_probes;
use crate::probe::decode_probe;
use crate::utils::test_id;

pub async fn handle(config: &AppConfig) -> Result<()> {
@@ -101,7 +101,10 @@ pub async fn handle(config: &AppConfig) -> Result<()> {
}

// Decode probes
let probes_to_send = decode_probes(probes)?;
let probes_to_send = probes
.split('\n')
.map(|probe| decode_probe(probe))
.collect::<Result<Vec<_>>>()?;

// Probing
let config_clone = config.clone();
63 changes: 0 additions & 63 deletions src/client/generate.rs

This file was deleted.

17 changes: 10 additions & 7 deletions src/client/handler.rs
@@ -1,13 +1,13 @@
use anyhow::Result;
use log::trace;
use std::io::{stdin, BufRead};

use crate::auth::{KafkaAuth, SaslAuth};
use crate::client::generate::generate_probes;
use crate::client::producer::produce;
use crate::client::target::decode_target;
use crate::config::AppConfig;
use crate::probe::decode_probe;

pub async fn handle(config: &AppConfig, agents: &str, target: &str) -> Result<()> {
pub async fn handle(config: &AppConfig, agents: &str) -> Result<()> {
trace!("Client handler");
trace!("{:?}", config);

@@ -26,13 +26,16 @@ pub async fn handle(config: &AppConfig, agents: &str, target: &str) -> Result<()
}
};

// Get probes from stdin
let mut probes = Vec::new();
for line in stdin().lock().lines() {
let probe = line?;
probes.push(decode_probe(&probe)?);
}

// Split agents
let agents = agents.split(',').collect::<Vec<&str>>();

// Probe Generation
let target = decode_target(target)?;
let probes = generate_probes(&target)?;

produce(config, auth, agents, probes).await;

Ok(())
2 changes: 0 additions & 2 deletions src/client/mod.rs
@@ -1,6 +1,4 @@
mod generate;
mod handler;
mod producer;
mod target;

pub use handler::handle;
26 changes: 16 additions & 10 deletions src/client/producer.rs
@@ -9,9 +9,6 @@ use crate::auth::KafkaAuth;
use crate::config::AppConfig;
use crate::probe::encode_probe;

// TODO
// - check target format

pub async fn produce(config: &AppConfig, auth: KafkaAuth, agents: Vec<&str>, probes: Vec<Probe>) {
let producer: &FutureProducer = match auth {
KafkaAuth::PlainText => &ClientConfig::new()
@@ -31,7 +28,6 @@
};

let topic = config.kafka.in_topics.split(',').collect::<Vec<&str>>()[0];
info!("Producing {} probes to {}", probes.len(), topic);

// Construct headers
let mut headers = OwnedHeaders::new();
@@ -42,22 +38,32 @@
});
}

// Bucket probes into Kafka messages
// Place probes into Kafka messages
let mut messages = Vec::new();
let mut current_message = String::new();
for probe in probes {
for (i, probe) in probes.iter().enumerate() {
// Format probe
let probe_str = encode_probe(&probe);
let probe_str = encode_probe(probe);
// Max message size is 1048576 bytes
if current_message.len() + probe_str.len() > 1048576 {
if current_message.len() + probe_str.len() + 1 > 1048576 {
messages.push(current_message);
current_message = String::new();
}
current_message.push_str(&probe_str);
if i < probes.len() - 1 {
current_message.push_str("\n");
}
}
if !current_message.is_empty() {
messages.push(current_message);
}
messages.push(current_message);

info!("Sending {} messages", messages.len());
info!(
"topic={},messages={},probes={}",
topic,
messages.len(),
probes.len(),
);

// Send to Kafka
for message in messages {
43 changes: 0 additions & 43 deletions src/client/target.rs

This file was deleted.

21 changes: 9 additions & 12 deletions src/main.rs
@@ -7,11 +7,11 @@ mod utils;

use anyhow::Result;
use chrono::Local;
use clap::{Args, Parser, Subcommand};
use clap::{Args, CommandFactory, Parser, Subcommand};
use clap_verbosity_flag::{InfoLevel, Verbosity};
use env_logger::Builder;
use log::error;
use std::io::Write;
use std::io::{stdin, IsTerminal, Write};

use crate::config::app_config;

@@ -41,10 +41,6 @@ enum Command {
/// Agent IDs (comma separated)
#[arg(index = 1)]
agents: String,

/// Target (eg., 2606:4700:4700::1111/128,icmp,1,32,1)
#[arg(index = 2)]
target: String,
},
}

@@ -83,13 +79,14 @@ async fn main() -> Result<()> {
Err(e) => error!("Error: {}", e),
}
}
Command::Client {
config,
agents,
target,
} => {
Command::Client { config, agents } => {
if stdin().is_terminal() {
App::command().print_help().unwrap();
::std::process::exit(2);
}

let app_config = app_config(&config);
match client::handle(&app_config, &agents, &target).await {
match client::handle(&app_config, &agents).await {
Ok(_) => (),
Err(e) => error!("Error: {}", e),
}
30 changes: 12 additions & 18 deletions src/probe.rs
@@ -20,7 +20,7 @@ pub fn decode_protocol(protocol: &str) -> Result<caracat::models::L4> {

pub fn encode_probe(probe: &Probe) -> String {
format!(
"{},{},{},{},{}\n",
"{},{},{},{},{}",
probe.dst_addr,
probe.src_port,
probe.dst_port,
@@ -29,23 +29,17 @@
)
}

pub fn decode_probes(probes: &str) -> Result<Vec<Probe>> {
let mut decoded_probes = vec![];

for probe in probes.lines() {
let fields: Vec<&str> = probe.split(',').collect();
if fields.len() != 5 {
return Err(anyhow::anyhow!("Invalid probe format: {}", probe));
}

decoded_probes.push(Probe {
dst_addr: fields[0].parse()?,
src_port: fields[1].parse()?,
dst_port: fields[2].parse()?,
ttl: fields[3].parse()?,
protocol: decode_protocol(fields[4])?,
});
pub fn decode_probe(probe: &str) -> Result<Probe> {
let fields: Vec<&str> = probe.split(',').collect();
if fields.len() != 5 {
return Err(anyhow::anyhow!("Invalid probe format: {}", probe));
}

Ok(decoded_probes)
Ok(Probe {
dst_addr: fields[0].parse()?,
src_port: fields[1].parse()?,
dst_port: fields[2].parse()?,
ttl: fields[3].parse()?,
protocol: decode_protocol(fields[4])?,
})
}
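
To illustrate the new per-line API, here is a hedged test sketch (not part of this commit); it assumes `decode_protocol` accepts the lowercase protocol names used in the README:

```rust
// Hypothetical test sketch; field order follows decode_probe above:
// dst_addr, src_port, dst_port, ttl, protocol.
#[cfg(test)]
mod tests {
    use super::decode_probe;

    #[test]
    fn decodes_a_single_probe_line() {
        let probe = decode_probe("2606:4700:4700::1111,24000,33434,32,icmp").unwrap();
        assert_eq!(probe.src_port, 24000);
        assert_eq!(probe.dst_port, 33434);
        assert_eq!(probe.ttl, 32);
    }
}
```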
4 changes: 1 addition & 3 deletions testbed/README.md
@@ -4,8 +4,6 @@ Docker Compose setup to facilitate the tests of Saimiris.

The testbed consists of a Redpanda and a ClickHouse instance. Required ClickHouse [tables](config/clickhouse/docker-entrypoint-initdb.d/init.sql) are created on startup. The `saimiris.from_kafka` table uses the ClickHouse [Kafka engine](https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka) to fetch the results from Redpanda. The `saimiris.results` table is used to store the results.

As an example, Redpanda is configured with SASL authentication, and uses the default Saimiris SASL credentials.

## Usage

* Start the testbed
@@ -23,7 +21,7 @@ cargo run -- agent --config=testbed/config/saimiris/saimiris.yml
* Run Saimiris Client (from the root of the repository)

```sh
cargo run -- client --config=testbed/config/saimiris/saimiris.yml wbmwwp9vna 2606:4700:4700::1111/128,icmp,1,32,1
cat testbed/probes.txt | cargo run -- client --config=testbed/config/saimiris/saimiris.yml wbmwwp9vna
```

* Stop the testbed