Skip to content

Commit

Permalink
chore(sdk): update SendMessages Display, update deps
Browse files Browse the repository at this point in the history
This commit modifies the `Display` implementation for the
`SendMessages` struct to include batch length and batch size
in the output instead of raw payload data of message, so
stdout isn't overflown with useless data.
  • Loading branch information
hubcio committed Feb 17, 2025
1 parent f16afc8 commit 5874a85
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 46 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/test_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ jobs:
- name: Build binary
run: cargo build --verbose --target aarch64-apple-darwin

- name: Run cargo clippy
run: cargo clippy --all-targets --all-features -- -D warnings

- name: Run tests
run: cargo test --verbose --target aarch64-apple-darwin

Expand Down
45 changes: 27 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ atomic-time = "0.1.5"
bytes = "1.10.0"
charming = "0.4.0"
chrono = "0.4.39"
clap = { version = "4.5.29", features = ["derive"] }
clap = { version = "4.5.30", features = ["derive"] }
figlet-rs = "0.1.5"
futures = "0.3.31"
hostname = "0.4.0"
Expand Down
4 changes: 2 additions & 2 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ login-session = ["dep:keyring"]
[dependencies]
ahash = { version = "0.8.11", features = ["serde"] }
anyhow = "1.0.95"
clap = { version = "4.5.29", features = ["derive"] }
clap_complete = "4.5.44"
clap = { version = "4.5.30", features = ["derive"] }
clap_complete = "4.5.45"
figlet-rs = "0.1.5"
iggy = { path = "../sdk", features = ["iggy-cli"], version = "0.6.90" }
keyring = { version = "3.6.1", features = [
Expand Down
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ path = "src/stream-builder/stream-producer-config/main.rs"
ahash = { version = "0.8.11", features = ["serde"] }
anyhow = "1.0.95"
bytes = "1.10.0"
clap = { version = "4.5.29", features = ["derive"] }
clap = { version = "4.5.30", features = ["derive"] }
futures-util = "0.3.31"
iggy = { path = "../sdk" }
rand = "0.9.0"
Expand Down
4 changes: 2 additions & 2 deletions integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ assert_cmd = "2.0.16"
async-trait = "0.1.86"
bytes = "1.10.0"
chrono = "0.4.39"
ctor = "0.3.3"
ctor = "0.3.4"
derive_more = "2.0.1"
env_logger = "0.11.6"
futures = "0.3.31"
Expand All @@ -24,7 +24,7 @@ predicates = "3.1.3"
regex = "1.11.1"
serial_test = "3.2.0"
server = { path = "../server" }
tempfile = "3.16.0"
tempfile = "3.17.1"
tokio = { version = "1.43.0", features = ["full"] }
tracing-subscriber = { version = "0.3.19", features = ["fmt", "env-filter"] }
twox-hash = { version = "2.1.0", features = ["xxhash32"] }
Expand Down
4 changes: 2 additions & 2 deletions sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ byte-unit = { version = "5.1.6", default-features = false, features = [
] }
bytes = "1.10.0"
chrono = { version = "0.4.39" }
clap = { version = "4.5.29", features = ["derive"] }
clap = { version = "4.5.30", features = ["derive"] }
comfy-table = { version = "7.1.4", optional = true }
crc32fast = "1.4.2"
dashmap = "6.1.0"
Expand All @@ -54,7 +54,7 @@ rustls = { version = "0.23.23", features = ["ring"] }
serde = { version = "1.0.217", features = ["derive", "rc"] }
serde_json = "1.0.138"
serde_with = { version = "3.12.0", features = ["base64"] }
strum = { version = "0.27.0", features = ["derive"] }
strum = { version = "0.27.1", features = ["derive"] }
thiserror = "2.0.11"
tokio = { version = "1.43.0", features = ["full"] }
tokio-rustls = { version = "0.26.1" }
Expand Down
8 changes: 4 additions & 4 deletions sdk/src/messages/send_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,15 +521,15 @@ impl Display for SendMessages {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}|{}|{}|{}",
"{}|{}|{}|batch_len:{}|batch_size:{}",
self.stream_id,
self.topic_id,
self.partitioning,
self.messages.len(),
self.messages
.iter()
.map(std::string::ToString::to_string)
.collect::<Vec<String>>()
.join("|")
.map(Message::get_size_bytes)
.sum::<IggyByteSize>(),
)
}
}
Expand Down
8 changes: 4 additions & 4 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ bincode = "1.3.3"
blake3 = "1.5.5"
bytes = "1.10.0"
chrono = "0.4.39"
clap = { version = "4.5.29", features = ["derive"] }
clap = { version = "4.5.30", features = ["derive"] }
console-subscriber = { version = "0.4.1", optional = true }
dashmap = "6.1.0"
derive_more = "2.0.1"
Expand All @@ -46,7 +46,7 @@ jsonwebtoken = "9.3.1"
mimalloc = { version = "0.1", optional = true }
moka = { version = "0.12.10", features = ["future"] }
nix = { version = "0.29", features = ["fs"] }
openssl = { version = "0.10.70", features = ["vendored"] }
openssl = { version = "0.10.71", features = ["vendored"] }
opentelemetry = { version = "0.28.0", features = ["trace", "logs"] }
opentelemetry-appender-tracing = { version = "0.28.1", features = ["log"] }
opentelemetry-otlp = { version = "0.28.0", features = [
Expand Down Expand Up @@ -80,9 +80,9 @@ serde = { version = "1.0.217", features = ["derive", "rc"] }
serde_json = "1.0.138"
serde_with = { version = "3.12.0", features = ["base64", "macros"] }
static-toml = "1.3.0"
strum = { version = "0.27.0", features = ["derive"] }
strum = { version = "0.27.1", features = ["derive"] }
sysinfo = "0.33.1"
tempfile = "3.16"
tempfile = "3.17"
thiserror = "2.0.11"
tokio = { version = "1.43.0", features = ["full"] }
tokio-native-tls = "0.3.1"
Expand Down
27 changes: 16 additions & 11 deletions server/src/streaming/segments/logs/log_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use error_set::ErrContext;
use iggy::{error::IggyError, utils::byte_size::IggyByteSize};
use std::{
fs::{File, OpenOptions},
os::{fd::AsRawFd, unix::prelude::FileExt},
os::unix::prelude::FileExt,
};
use std::{
io::ErrorKind,
Expand Down Expand Up @@ -38,19 +38,24 @@ impl SegmentLogReader {
.open(file_path)
.with_error_context(|e| format!("Failed to open log file: {file_path}, error: {e}"))
.map_err(|_| IggyError::CannotReadFile)?;
let fd = file.as_raw_fd();

// posix_fadvise() doesn't exist on MacOS
#[cfg(not(target_os = "macos"))]
let _ = nix::fcntl::posix_fadvise(
fd,
0,
0,
nix::fcntl::PosixFadviseAdvice::POSIX_FADV_SEQUENTIAL,
)
.with_info_context(|e| {
format!("Failed to set sequential access pattern on log file: {file_path}, error: {e}")
});
{
use std::os::unix::io::AsRawFd;
let fd = file.as_raw_fd();
let _ = nix::fcntl::posix_fadvise(
fd,
0,
0,
nix::fcntl::PosixFadviseAdvice::POSIX_FADV_SEQUENTIAL,
)
.with_info_context(|e| {
format!(
"Failed to set sequential access pattern on log file: {file_path}, error: {e}"
)
});
}

let actual_log_size = file
.metadata()
Expand Down
2 changes: 1 addition & 1 deletion tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ path = "src/data-seeder/main.rs"

[dependencies]
anyhow = "1.0.95"
clap = { version = "4.5.29", features = ["derive"] }
clap = { version = "4.5.30", features = ["derive"] }
iggy = { path = "../sdk" }
rand = "0.9.0"
tokio = { version = "1.43.0", features = ["full"] }
Expand Down

0 comments on commit 5874a85

Please # to comment.