diff --git a/Cargo.lock b/Cargo.lock
index 9a592ff44..5325dc803 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3043,6 +3043,18 @@ dependencies = [
  "tempfile",
 ]
 
+[[package]]
+name = "nix"
+version = "0.29.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
+dependencies = [
+ "bitflags 2.8.0",
+ "cfg-if",
+ "cfg_aliases",
+ "libc",
+]
+
 [[package]]
 name = "nom"
 version = "7.1.3"
@@ -4591,7 +4603,7 @@ dependencies = [
 
 [[package]]
 name = "server"
-version = "0.4.203"
+version = "0.4.204"
 dependencies = [
  "ahash 0.8.11",
  "anyhow",
@@ -4620,6 +4632,7 @@ dependencies = [
  "mimalloc",
  "mockall",
  "moka",
+ "nix",
  "openssl",
  "opentelemetry",
  "opentelemetry-appender-tracing",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index de0f58c8d..0f2df4291 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "server"
-version = "0.4.203"
+version = "0.4.204"
 edition = "2021"
 build = "src/build.rs"
 license = "Apache-2.0"
@@ -45,6 +45,7 @@ iggy = { path = "../sdk" }
 jsonwebtoken = "9.3.1"
 mimalloc = { version = "0.1", optional = true }
 moka = { version = "0.12.10", features = ["future"] }
+nix = { version = "0.29", features = ["fs"] }
 openssl = { version = "0.10.70", features = ["vendored"] }
 opentelemetry = { version = "0.28.0", features = ["trace", "logs"] }
 opentelemetry-appender-tracing = { version = "0.28.1", features = ["log"] }
diff --git a/server/src/compat/index_rebuilding/index_rebuilder.rs b/server/src/compat/index_rebuilding/index_rebuilder.rs
index b7c632526..ce24f72ca 100644
--- a/server/src/compat/index_rebuilding/index_rebuilder.rs
+++ b/server/src/compat/index_rebuilding/index_rebuilder.rs
@@ -1,6 +1,6 @@
 use crate::streaming::utils::file;
 use crate::{
-    server_error::CompatError, streaming::batching::message_batch::RETAINED_BATCH_OVERHEAD,
+    server_error::CompatError, streaming::batching::message_batch::RETAINED_BATCH_HEADER_LEN,
 };
 use std::io::SeekFrom;
 use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter};
@@ -73,7 +73,7 @@ impl IndexRebuilder {
             match Self::read_batch_header(&mut reader).await {
                 Ok(header) => {
                     // Calculate next position before writing current entry
-                    next_position = position + RETAINED_BATCH_OVERHEAD as u32 + header.length;
+                    next_position = position + RETAINED_BATCH_HEADER_LEN as u32 + header.length;
 
                     // Write index entry using current position
                     Self::write_index_entry(&mut writer, &header, position, self.start_offset)
diff --git a/server/src/streaming/batching/batch_accumulator.rs b/server/src/streaming/batching/batch_accumulator.rs
index eb9b6f25c..4af306be5 100644
--- a/server/src/streaming/batching/batch_accumulator.rs
+++ b/server/src/streaming/batching/batch_accumulator.rs
@@ -1,4 +1,4 @@
-use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD};
+use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_HEADER_LEN};
 use crate::streaming::local_sizeable::LocalSizeable;
 use crate::streaming::models::messages::RetainedMessage;
 use bytes::BytesMut;
@@ -109,6 +109,6 @@ impl BatchAccumulator {
 
 impl Sizeable for BatchAccumulator {
     fn get_size_bytes(&self) -> IggyByteSize {
-        self.current_size + RETAINED_BATCH_OVERHEAD.into()
+        self.current_size + RETAINED_BATCH_HEADER_LEN.into()
     }
 }
diff --git a/server/src/streaming/batching/message_batch.rs b/server/src/streaming/batching/message_batch.rs
index 7a7155e4d..29ab21197 100644
--- a/server/src/streaming/batching/message_batch.rs
+++ b/server/src/streaming/batching/message_batch.rs
@@ -4,7 +4,7 @@ use crate::streaming::models::messages::RetainedMessage;
 use bytes::Bytes;
 use iggy::utils::{byte_size::IggyByteSize, sizeable::Sizeable};
 
-pub const RETAINED_BATCH_OVERHEAD: u64 = 8 + 8 + 4 + 4;
+pub const RETAINED_BATCH_HEADER_LEN: u64 = 8 + 8 + 4 + 4;
 
 #[derive(Debug)]
 pub struct RetainedMessageBatch {
@@ -81,6 +81,6 @@ where
 
 impl Sizeable for RetainedMessageBatch {
     fn get_size_bytes(&self) -> IggyByteSize {
-        self.length + RETAINED_BATCH_OVERHEAD.into()
+        self.length + RETAINED_BATCH_HEADER_LEN.into()
     }
 }
diff --git a/server/src/streaming/segments/indexes/index_reader.rs b/server/src/streaming/segments/indexes/index_reader.rs
index 478b0bf0c..a6bd1eb69 100644
--- a/server/src/streaming/segments/indexes/index_reader.rs
+++ b/server/src/streaming/segments/indexes/index_reader.rs
@@ -4,22 +4,23 @@ use super::{
 };
 use error_set::ErrContext;
 use iggy::error::IggyError;
-use std::sync::{
-    atomic::{AtomicU64, Ordering},
-    Arc,
-};
-use tokio::{
+use std::{
     fs::{File, OpenOptions},
-    io::{AsyncReadExt, AsyncSeekExt, SeekFrom},
-    sync::RwLock,
+    io::ErrorKind,
+    os::unix::fs::FileExt,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
 };
+use tokio::task::spawn_blocking;
 use tracing::{error, trace, warn};
 
 /// A dedicated struct for reading from the index file.
 #[derive(Debug)]
 pub struct SegmentIndexReader {
     file_path: String,
-    file: RwLock<Option<File>>,
+    file: Arc<File>,
     index_size_bytes: Arc<AtomicU64>,
 }
 
@@ -29,12 +30,11 @@ impl SegmentIndexReader {
         let file = OpenOptions::new()
             .read(true)
             .open(file_path)
-            .await
+            .with_error_context(|e| format!("Failed to open index file: {file_path}, error: {e}"))
             .map_err(|_| IggyError::CannotReadFile)?;
 
         let actual_index_size = file
             .metadata()
-            .await
             .with_error_context(|e| {
                 format!("Failed to get metadata of index file: {file_path}, error: {e}")
             })
             .map_err(|_| IggyError::CannotReadFileMetadata)?
             .len();
@@ -46,60 +46,38 @@
         trace!("Opened index file for reading: {file_path}, size: {actual_index_size}",);
 
         Ok(Self {
             file_path: file_path.to_string(),
-            file: RwLock::new(Some(file)),
+            file: Arc::new(file),
             index_size_bytes,
         })
     }
 
     /// Loads all indexes from the index file.
     pub async fn load_all_indexes_impl(&self) -> Result<Vec<Index>, IggyError> {
-        let file_size = self.index_size_bytes.load(Ordering::Acquire);
+        let file_size = self.file_size();
         if file_size == 0 {
             warn!("Index {} file is empty.", self.file_path);
             return Ok(Vec::new());
         }
-        let mut file = self.file.write().await;
-        let file = file
-            .as_mut()
-            .unwrap_or_else(|| panic!("File {} should be open", self.file_path));
-
-        file.seek(SeekFrom::Start(0))
-            .await
-            .with_error_context(|e| format!("Failed to seek to start of index file: {e}"))
-            .map_err(|_| IggyError::CannotReadIndexOffset)?;
-        let mut buf = vec![0u8; file_size as usize];
-        file.read_exact(&mut buf)
-            .await
-            .with_error_context(|e| format!("Failed to read index file {}: {e}", self.file_path))
-            .map_err(|_| IggyError::CannotReadIndexOffset)?;
-        let indexes_count = (file_size / INDEX_SIZE) as usize;
-        let mut indexes = Vec::with_capacity(indexes_count);
-        for chunk in buf.chunks_exact(INDEX_SIZE as usize) {
-            let offset = u32::from_le_bytes(
-                chunk[0..4]
-                    .try_into()
-                    .with_error_context(|e| format!("Failed to parse index offset: {e}"))
-                    .map_err(|_| IggyError::CannotReadIndexOffset)?,
-            );
-            let position = u32::from_le_bytes(
-                chunk[4..8]
-                    .try_into()
-                    .with_error_context(|e| format!("Failed to parse index position: {e}"))
-                    .map_err(|_| IggyError::CannotReadIndexOffset)?,
-            );
-            let timestamp = u64::from_le_bytes(
-                chunk[8..16]
-                    .try_into()
-                    .with_error_context(|e| format!("Failed to parse index timestamp: {e}"))
-                    .map_err(|_| IggyError::CannotReadIndexOffset)?,
-            );
-            let current_index = Index {
-                offset,
-                position,
-                timestamp,
-            };
-            indexes.push(current_index);
-        }
+
+        let buf = match self.read_at(0, file_size).await {
+            Ok(buf) => buf,
+            Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(Vec::new()),
+            Err(e) => {
+                error!(
+                    "Error reading batch header at offset 0 in file {}: {e}",
+                    self.file_path
+                );
+                return Err(IggyError::CannotReadFile);
+            }
+        };
+
+        let indexes: Vec<Index> = buf
+            .chunks_exact(INDEX_SIZE as usize)
+            .map(parse_index)
+            .collect::<Result<Vec<Index>, IggyError>>()
+            .with_error_context(|e| {
+                format!("Failed to parse indexes in file {}: {e}", self.file_path)
+            })?;
         if indexes.len() as u64 != file_size / INDEX_SIZE {
             error!(
                 "Loaded {} indexes from disk, expected {}, file {} is probably corrupted!",
@@ -125,61 +103,37 @@
             );
             return Ok(None);
         }
-        let file_length = self.index_size_bytes.load(Ordering::Acquire);
-        if file_length == 0 {
+        let file_size = self.file_size();
+        if file_size == 0 {
             warn!("Index {} file is empty.", self.file_path);
             return Ok(None);
         }
-        trace!("Index file length: {} bytes.", file_length);
+        trace!("Index file length: {} bytes.", file_size);
 
         let relative_start_offset = (index_start_offset - segment_start_offset) as u32;
         let relative_end_offset = (index_end_offset - segment_start_offset) as u32;
         let mut index_range = IndexRange::default();
-        let mut file = self.file.write().await;
-
-        let file = file
-            .as_mut()
-            .unwrap_or_else(|| panic!("File {} should be open", self.file_path));
-
-        file.seek(SeekFrom::Start(0))
-            .await
-            .with_error_context(|e| format!("Failed to seek to start of index file: {e}"))
-            .map_err(|_| IggyError::CannotSeekFile)?;
-
-        let mut buf = vec![0u8; file_length as usize];
-        file.read_exact(&mut buf)
-            .await
-            .with_error_context(|e| format!("Failed to read index file: {e}"))
-            .map_err(|_| IggyError::CannotReadFile)?;
+        let buf = match self.read_at(0, file_size).await {
+            Ok(buf) => buf,
+            Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
+            Err(e) => {
+                error!(
+                    "Error reading batch header at offset 0 in file {}: {e}",
+                    self.file_path
+                );
+                return Err(IggyError::CannotReadFile);
+            }
+        };
 
         for chunk in buf.chunks_exact(INDEX_SIZE as usize) {
-            let offset = u32::from_le_bytes(
-                chunk[0..4]
-                    .try_into()
-                    .with_error_context(|e| format!("Failed to parse index offset: {e}"))
-                    .map_err(|_| IggyError::CannotReadIndexOffset)?,
-            );
-            let position = u32::from_le_bytes(
-                chunk[4..8]
-                    .try_into()
-                    .with_error_context(|e| format!("Failed to parse index position: {e}"))
-                    .map_err(|_| IggyError::CannotReadIndexOffset)?,
-            );
-            let timestamp = u64::from_le_bytes(
-                chunk[8..16]
-                    .try_into()
-                    .with_error_context(|e| format!("Failed to parse index timestamp: {e}"))
-                    .map_err(|_| IggyError::CannotReadIndexOffset)?,
-            );
-            let current_index = Index {
-                offset,
-                position,
-                timestamp,
-            };
-            if offset >= relative_start_offset && index_range.start == Index::default() {
+            let current_index = parse_index(chunk)
+                .with_error_context(|e| format!("Failed to parse index {}: {e}", self.file_path))?;
+            if current_index.offset >= relative_start_offset
+                && index_range.start == Index::default()
+            {
                 index_range.start = current_index;
             }
-            if offset >= relative_end_offset {
+            if current_index.offset >= relative_end_offset {
                 index_range.end = current_index;
                 break;
             }
@@ -194,52 +148,26 @@
         &self,
         timestamp: u64,
     ) -> Result<Option<Index>, IggyError> {
-        let file_size = self.index_size_bytes.load(Ordering::Acquire);
+        let file_size = self.file_size();
         if file_size == 0 {
             trace!("Index file {} is empty.", self.file_path);
             return Ok(Some(Index::default()));
         }
-        let mut file = self.file.write().await;
-        let file = file
-            .as_mut()
-            .unwrap_or_else(|| panic!("File {} should be open", self.file_path));
-
-        file.seek(SeekFrom::Start(0))
-            .await
-            .with_error_context(|e| format!("Failed to seek to start of index file: {e}"))
-            .map_err(|_| IggyError::CannotSeekFile)?;
-        let mut buf = vec![0u8; file_size as usize];
-        file.read_exact(&mut buf)
-            .await
-            .with_error_context(|e| format!("Failed to read index file: {e}"))
-            .map_err(|_| IggyError::CannotReadFile)?;
-
+        let buf = match self.read_at(0, file_size).await {
+            Ok(buf) => buf,
+            Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
+            Err(e) => {
+                error!(
+                    "Error reading batch header at offset 0 in file {}: {e}",
+                    self.file_path
+                );
+                return Err(IggyError::CannotReadFile);
+            }
+        };
         let mut last_index: Option<Index> = None;
 
         for chunk in buf.chunks_exact(INDEX_SIZE as usize) {
-            let offset = u32::from_le_bytes(
-                chunk[0..4]
-                    .try_into()
-                    .with_error_context(|e| format!("Failed to parse index offset: {e}"))
-                    .map_err(|_| IggyError::CannotReadIndexOffset)?,
-            );
-            let position = u32::from_le_bytes(
-                chunk[4..8]
-                    .try_into()
-                    .with_error_context(|e| format!("Failed to parse index position: {e}"))
-                    .map_err(|_| IggyError::CannotReadIndexPosition)?,
-            );
-            let time = u64::from_le_bytes(
-                chunk[8..16]
-                    .try_into()
-                    .with_error_context(|e| format!("Failed to parse index timestamp: {e}"))
-                    .map_err(|_| IggyError::CannotReadIndexTimestamp)?,
-            );
-            let current = Index {
-                offset,
-                position,
-                timestamp: time,
-            };
+            let current = parse_index(chunk)?;
             if current.timestamp >= timestamp {
                 return Ok(last_index.or(Some(current)));
             }
@@ -247,4 +175,44 @@
         }
         Ok(None)
     }
+
+    fn file_size(&self) -> u64 {
+        self.index_size_bytes.load(Ordering::Acquire)
+    }
+
+    async fn read_at(&self, offset: u64, len: u64) -> Result<Vec<u8>, std::io::Error> {
+        let file = self.file.clone();
+        spawn_blocking(move || {
+            let mut buf = vec![0u8; len as usize];
+            file.read_exact_at(&mut buf, offset)?;
+            Ok(buf)
+        })
+        .await?
+    }
 }
+
+fn parse_index(chunk: &[u8]) -> Result<Index, IggyError> {
+    let offset = u32::from_le_bytes(
+        chunk[0..4]
+            .try_into()
+            .with_error_context(|e| format!("Failed to parse index offset: {e}"))
+            .map_err(|_| IggyError::CannotReadIndexOffset)?,
+    );
+    let position = u32::from_le_bytes(
+        chunk[4..8]
+            .try_into()
+            .with_error_context(|e| format!("Failed to parse index position: {e}"))
+            .map_err(|_| IggyError::CannotReadIndexPosition)?,
+    );
+    let timestamp = u64::from_le_bytes(
+        chunk[8..16]
+            .try_into()
+            .with_error_context(|e| format!("Failed to parse index timestamp: {e}"))
+            .map_err(|_| IggyError::CannotReadIndexTimestamp)?,
+    );
+    Ok(Index {
+        offset,
+        position,
+        timestamp,
+    })
+}
diff --git a/server/src/streaming/segments/indexes/index_writer.rs b/server/src/streaming/segments/indexes/index_writer.rs
index 9e1800ede..740d8f56d 100644
--- a/server/src/streaming/segments/indexes/index_writer.rs
+++ b/server/src/streaming/segments/indexes/index_writer.rs
@@ -8,7 +8,6 @@ use std::sync::{
 };
 use tokio::{
     fs::{File, OpenOptions},
     io::AsyncWriteExt,
-    sync::RwLock,
 };
 use tracing::trace;
 
@@ -16,7 +15,7 @@ use tracing::trace;
 #[derive(Debug)]
 pub struct SegmentIndexWriter {
     file_path: String,
-    file: RwLock<Option<File>>,
+    file: File,
     index_size_bytes: Arc<AtomicU64>,
     fsync: bool,
 }
@@ -56,25 +55,22 @@ impl SegmentIndexWriter {
 
         Ok(Self {
             file_path: file_path.to_string(),
-            file: RwLock::new(Some(file)),
+            file,
             index_size_bytes,
             fsync,
         })
     }
 
     /// Append the given index record to the index file.
-    pub async fn save_index(&self, index: Index) -> Result<(), IggyError> {
+    pub async fn save_index(&mut self, index: Index) -> Result<(), IggyError> {
         let mut buf = [0u8; INDEX_SIZE as usize];
         buf[0..4].copy_from_slice(&index.offset.to_le_bytes());
         buf[4..8].copy_from_slice(&index.position.to_le_bytes());
         buf[8..16].copy_from_slice(&index.timestamp.to_le_bytes());
         {
-            let mut file = self.file.write().await;
-            let file = file
-                .as_mut()
-                .unwrap_or_else(|| panic!("File {} should be open", self.file_path));
-            file.write_all(&buf)
+            self.file
+                .write_all(&buf)
                 .await
                 .with_error_context(|e| {
                     format!(
@@ -93,11 +89,8 @@ impl SegmentIndexWriter {
     }
 
     pub async fn fsync(&self) -> Result<(), IggyError> {
-        let mut file = self.file.write().await;
-        let file = file
-            .as_mut()
-            .unwrap_or_else(|| panic!("File {} should be open", self.file_path));
-        file.sync_all()
+        self.file
+            .sync_all()
             .await
             .with_error_context(|e| {
                 format!("Failed to fsync index file: {}, error: {e}", self.file_path)
diff --git a/server/src/streaming/segments/logs/log_reader.rs b/server/src/streaming/segments/logs/log_reader.rs
index 2ae195c86..2eaaa8bb3 100644
--- a/server/src/streaming/segments/logs/log_reader.rs
+++ b/server/src/streaming/segments/logs/log_reader.rs
@@ -1,35 +1,32 @@
-use crate::{
-    state::file::BUF_READER_CAPACITY_BYTES,
-    streaming::{
-        batching::{
-            iterator::IntoMessagesIterator,
-            message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD},
-        },
-        segments::indexes::IndexRange,
+use crate::streaming::{
+    batching::{
+        iterator::IntoMessagesIterator,
+        message_batch::{RetainedMessageBatch, RETAINED_BATCH_HEADER_LEN},
     },
+    segments::indexes::IndexRange,
 };
 use bytes::BytesMut;
 use error_set::ErrContext;
 use iggy::{error::IggyError, utils::byte_size::IggyByteSize};
 use std::{
-    io::SeekFrom,
+    fs::{File, OpenOptions},
+    os::{fd::AsRawFd, unix::prelude::FileExt},
+};
+use std::{
+    io::ErrorKind,
     sync::{
         atomic::{AtomicU64, Ordering},
         Arc,
     },
 };
-use tokio::{
-    fs::{File, OpenOptions},
-    io::{AsyncReadExt, AsyncSeekExt, BufReader},
-    sync::RwLock,
-};
-use tracing::{trace, warn};
+use tokio::task::spawn_blocking;
+use tracing::{error, trace, warn};
 
 /// A dedicated struct for reading from the log file.
 #[derive(Debug)]
 pub struct SegmentLogReader {
     file_path: String,
-    file: RwLock<Option<File>>,
+    file: Arc<File>,
     log_size_bytes: Arc<AtomicU64>,
 }
 
@@ -39,13 +36,27 @@ impl SegmentLogReader {
         let file = OpenOptions::new()
             .read(true)
             .open(file_path)
-            .await
             .with_error_context(|e| format!("Failed to open log file: {file_path}, error: {e}"))
             .map_err(|_| IggyError::CannotReadFile)?;
+        let fd = file.as_raw_fd();
+
+        // posix_fadvise() doesn't exist on MacOS
+        #[cfg(not(target_os = "macos"))]
+        let _ = nix::fcntl::posix_fadvise(
+            fd,
+            0,
+            0,
+            nix::fcntl::PosixFadviseAdvice::POSIX_FADV_SEQUENTIAL,
+        )
+        .with_info_context(|e| {
+            format!("Failed to set sequential access pattern on log file: {file_path}, error: {e}")
+        });
 
         let actual_log_size = file
             .metadata()
-            .await
+            .with_error_context(|e| {
+                format!("Failed to get metadata of log file: {file_path}, error: {e}")
+            })
             .map_err(|_| IggyError::CannotReadFileMetadata)?
             .len();
 
@@ -53,7 +64,7 @@ impl SegmentLogReader {
 
         Ok(Self {
             file_path: file_path.to_string(),
-            file: RwLock::new(Some(file)),
+            file: Arc::new(file),
             log_size_bytes,
         })
     }
@@ -63,41 +74,32 @@ impl SegmentLogReader {
         &self,
         index_range: &IndexRange,
     ) -> Result<Vec<RetainedMessageBatch>, IggyError> {
-        let mut file_guard = self.file.write().await;
-        let file = file_guard
-            .as_mut()
-            .unwrap_or_else(|| panic!("File {} should be open", self.file_path));
-
-        // TODO(hubcio): perhaps we could read size in below loop,
-        // this way if someone is writing, we can read in parallel
-        let file_size = self.log_size_bytes.load(Ordering::Acquire);
+        let mut file_size = self.file_size();
         if file_size == 0 {
             warn!("Log file {} is empty.", self.file_path);
             return Ok(Vec::new());
         }
 
-        file.seek(SeekFrom::Start(index_range.start.position as u64))
-            .await
-            .map_err(|_| IggyError::CannotSeekFile)?;
-
-        let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file);
-        let mut read_bytes = index_range.start.position as u64;
+        let mut offset = index_range.start.position as u64;
         let mut batches = Vec::new();
         let mut last_batch_to_read = false;
 
-        while !last_batch_to_read {
-            match read_next_batch(&mut reader, &self.file_path).await? {
+        while !last_batch_to_read && offset < file_size {
+            file_size = self.file_size();
+            match self.read_next_batch(offset, file_size).await? {
                 Some((batch, bytes_read)) => {
-                    read_bytes += bytes_read;
+                    offset += bytes_read;
                     let last_offset_in_batch = batch.base_offset + batch.last_offset_delta as u64;
-                    if last_offset_in_batch >= index_range.end.offset as u64
-                        || read_bytes >= file_size
+
+                    if last_offset_in_batch >= index_range.end.offset as u64 || offset >= file_size
                     {
                         last_batch_to_read = true;
                     }
                     batches.push(batch);
                 }
-                None => break,
+                None => {
+                    break;
+                }
             }
         }
 
@@ -107,41 +109,28 @@ impl SegmentLogReader {
     }
 
     /// Loads and returns all message IDs from the log file.
     pub async fn load_message_ids_impl(&self) -> Result<Vec<u128>, IggyError> {
-        let file_size = self.log_size_bytes.load(Ordering::Acquire);
-
+        let mut file_size = self.file_size();
         if file_size == 0 {
             warn!("Log file {} is empty.", self.file_path);
             return Ok(Vec::new());
         }
 
-        let mut file = self.file.write().await;
-        let file = file
-            .as_mut()
-            .unwrap_or_else(|| panic!("File {} should be open", self.file_path));
-
-        file.seek(SeekFrom::Start(0))
-            .await
-            .with_error_context(|e| {
-                format!(
-                    "Failed to seek to position 0 in file {}, error: {e}",
-                    self.file_path
-                )
-            })
-            .map_err(|_| IggyError::CannotSeekFile)?;
-
-        let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, &mut *file);
+        let mut offset = 0_u64;
         let mut message_ids = Vec::new();
-        let mut read_bytes = 0_u64;
 
-        while read_bytes < file_size {
-            match read_next_batch(&mut reader, &self.file_path).await? {
+        while offset < file_size {
+            file_size = self.file_size();
+            match self.read_next_batch(offset, file_size).await? {
                 Some((batch, bytes_read)) => {
-                    read_bytes += bytes_read;
+                    offset += bytes_read;
                     for msg in batch.into_messages_iter() {
                         message_ids.push(msg.id);
                     }
                 }
-                None => break, // reached EOF or encountered a truncated batch
+                None => {
+                    // Possibly reached EOF or truncated
+                    break;
+                }
             }
         }
 
@@ -158,54 +147,30 @@ impl SegmentLogReader {
     where
         F: FnMut(RetainedMessageBatch) -> Result<(), IggyError>,
     {
-        let file_size = self.log_size_bytes.load(Ordering::Acquire);
-
-        trace!(
-            "Loading message batches by index range: {} [{}] - {} [{}], file size: {}",
-            index_range.start.position,
-            index_range.start.offset,
-            index_range.end.position,
-            index_range.end.offset,
-            file_size
-        );
-
+        let mut file_size = self.file_size();
         if file_size == 0 {
             warn!("Log file {} is empty.", self.file_path);
             return Ok(());
         }
 
-        let mut file_guard = self.file.write().await;
-        let file = file_guard
-            .as_mut()
-            .unwrap_or_else(|| panic!("File {} should be open", self.file_path));
-
-        file.seek(SeekFrom::Start(index_range.start.position as u64))
-            .await
-            .with_error_context(|e| {
-                format!(
-                    "Failed to seek to position {} in file {}, error: {e}",
-                    index_range.start.position, self.file_path
-                )
-            })
-            .map_err(|_| IggyError::CannotSeekFile)?;
-
-        let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file);
-        let mut read_bytes = index_range.start.position as u64;
+        let mut offset = index_range.start.position as u64;
         let mut last_batch_to_read = false;
 
-        while !last_batch_to_read {
-            match read_next_batch(&mut reader, &self.file_path).await? {
+        while !last_batch_to_read && offset < file_size {
+            file_size = self.file_size();
+            match self.read_next_batch(offset, file_size).await? {
                 Some((batch, bytes_read)) => {
-                    read_bytes += bytes_read;
+                    offset += bytes_read;
                     let last_offset_in_batch = batch.base_offset + batch.last_offset_delta as u64;
-                    if read_bytes >= file_size
-                        || last_offset_in_batch >= index_range.end.offset as u64
+                    if offset >= file_size || last_offset_in_batch >= index_range.end.offset as u64
                     {
                         last_batch_to_read = true;
                     }
                     on_batch(batch)?;
                 }
-                None => break,
+                None => {
+                    break;
+                }
             }
         }
@@ -219,111 +184,154 @@ impl SegmentLogReader {
         bytes_to_load: u64,
         mut on_batch: impl FnMut(RetainedMessageBatch) -> Result<(), IggyError>,
     ) -> Result<(), IggyError> {
-        let mut file = self.file.write().await;
-        let file = file
-            .as_mut()
-            .unwrap_or_else(|| panic!("File {} should be open", self.file_path));
-
-        let file_size = self.log_size_bytes.load(Ordering::Acquire);
+        let mut file_size = self.file_size();
         if file_size == 0 {
             warn!("Log file {} is empty.", self.file_path);
             return Ok(());
         }
 
-        file.seek(SeekFrom::Start(0))
-            .await
-            .with_error_context(|e| {
-                format!("Failed to seek to the beginning of the file, error: {e}")
-            })
-            .map_err(|_| IggyError::CannotSeekFile)?;
-
-        let threshold = file_size.saturating_sub(bytes_to_load);
+        let mut offset = 0_u64;
         let mut accumulated_size = 0_u64;
 
-        let mut reader = BufReader::with_capacity(BUF_READER_CAPACITY_BYTES, file);
-        while accumulated_size < file_size {
-            match read_next_batch(&mut reader, &self.file_path).await? {
+        while offset < file_size {
+            file_size = self.file_size();
+            let threshold = file_size.saturating_sub(bytes_to_load);
+            match self.read_next_batch(offset, file_size).await? {
                 Some((batch, bytes_read)) => {
                     if accumulated_size >= threshold {
                         on_batch(batch)?;
                     }
+                    offset += bytes_read;
                     accumulated_size += bytes_read;
                 }
-                None => break, // reached EOF or encountered a truncated batch
+                None => {
+                    break;
+                }
             }
         }
 
         Ok(())
     }
-}
 
-/// Helper function that reads one batch (header and payload) from the provided BufReader.
-/// It returns `Ok(Some((batch, bytes_read)))` when a full batch is read,
-/// and returns `Ok(None)` when the reader cannot read a full header or payload (EOF or a truncated file).
-async fn read_next_batch<R>(
-    reader: &mut BufReader<R>,
-    file_path: &str,
-) -> Result<Option<(RetainedMessageBatch, u64)>, IggyError>
-where
-    R: AsyncReadExt + Unpin,
-{
-    let mut header = [0u8; RETAINED_BATCH_OVERHEAD as usize];
-    if let Err(e) = reader.read_exact(&mut header).await {
-        trace!("Cannot read batch header in file {file_path}, error: {e}");
-        return Ok(None);
-    }
+    async fn read_next_batch(
+        &self,
+        offset: u64,
+        file_size: u64,
+    ) -> Result<Option<(RetainedMessageBatch, u64)>, IggyError> {
+        let batch_header_size = RETAINED_BATCH_HEADER_LEN;
+        if offset + batch_header_size > file_size {
+            return Ok(None);
+        }
+
+        let header_buf = match self.read_at(offset, batch_header_size).await {
+            Ok(buf) => buf,
+            Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
+            Err(e) => {
+                error!(
+                    "Error reading batch header at offset {} in file {}: {e}",
+                    offset, self.file_path
+                );
+                return Err(IggyError::CannotReadBatchBaseOffset);
+            }
+        };
+        if header_buf.len() < batch_header_size as usize {
+            warn!(
Possibly truncated.", + offset, self.file_path + ); + return Ok(None); + } + + let batch_base_offset = u64::from_le_bytes( + header_buf[0..8] + .try_into() + .with_error_context(|e| { + format!( + "Failed to parse batch base offset at offset {offset} in file {}: {e}", + self.file_path + ) + }) + .map_err(|_| IggyError::CannotReadBatchBaseOffset)?, + ); + let batch_length = u32::from_le_bytes( + header_buf[8..12] + .try_into() + .with_error_context(|e| { + format!( + "Failed to parse batch length at offset {offset} in file {}: {e}", + self.file_path + ) + }) + .map_err(|_| IggyError::CannotReadBatchLength)?, + ); + let last_offset_delta = u32::from_le_bytes( + header_buf[12..16] + .try_into() + .with_error_context(|e| { + format!( + "Failed to parse last offset delta at offset {offset} in file {}: {e}", + self.file_path + ) + }) + .map_err(|_| IggyError::CannotReadLastOffsetDelta)?, + ); + let max_timestamp = u64::from_le_bytes( + header_buf[16..24] + .try_into() + .with_error_context(|e| { + format!( + "Failed to parse max timestamp at offset {offset} in file {}: {e}", + self.file_path + ) + }) + .map_err(|_| IggyError::CannotReadMaxTimestamp)?, + ); + + let payload_len = batch_length as usize; + let payload_offset = offset + batch_header_size; + if payload_offset + payload_len as u64 > file_size { + warn!( + "It's not possible to read the full batch payload ({} bytes) at offset {} in file {} of size {}. Possibly truncated.", + payload_len, payload_offset, self.file_path, file_size + ); + return Ok(None); + } - let batch_base_offset = u64::from_le_bytes( - header[0..8] - .try_into() - .with_error_context(|e| format!("Failed to read batch base offset, error: {e}")) - .map_err(|_| IggyError::CannotReadBatchBaseOffset)?, - ); - let batch_length = u32::from_le_bytes( - header[8..12] - .try_into() - .with_error_context(|e| format!("Failed to read batch length, error: {e}")) - .map_err(|_| IggyError::CannotReadBatchLength)?, - ); - let last_offset_delta = u32::from_le_bytes( - header[12..16] - .try_into() - .with_error_context(|e| format!("Failed to read last offset delta, error: {e}")) - .map_err(|_| IggyError::CannotReadLastOffsetDelta)?, - ); - let max_timestamp = u64::from_le_bytes( - header[16..24] - .try_into() - .with_error_context(|e| format!("Failed to read max timestamp, error: {e}")) - .map_err(|_| IggyError::CannotReadMaxTimestamp)?, - ); - - let payload_len = batch_length as usize; - let mut payload = BytesMut::with_capacity(payload_len); - unsafe { payload.set_len(payload_len) }; - if let Err(error) = reader.read_exact(&mut payload).await { - warn!( - "Cannot read batch payload for base offset: {}, delta: {}, timestamp: {}. \ - Batch length: {}, payload length: {} in file {}. 
Possibly truncated file, error: {:?}", + let payload_buf = match self.read_at(payload_offset, payload_len as u64).await { + Ok(buf) => buf, + Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None), + Err(e) => { + error!( + "Error reading batch payload at offset {} in file {}: {e}", + payload_offset, self.file_path + ); + return Err(IggyError::CannotReadBatchPayload); + } + }; + + let bytes_read = batch_header_size + payload_len as u64; + let batch = RetainedMessageBatch::new( batch_base_offset, last_offset_delta, max_timestamp, - batch_length, - payload_len, - file_path, - error + IggyByteSize::from(payload_len as u64), + BytesMut::from(&payload_buf[..]).freeze(), ); - return Ok(None); + + Ok(Some((batch, bytes_read))) } - let bytes_read = RETAINED_BATCH_OVERHEAD + payload_len as u64; - let batch = RetainedMessageBatch::new( - batch_base_offset, - last_offset_delta, - max_timestamp, - IggyByteSize::from(payload_len as u64), - payload.freeze(), - ); + fn file_size(&self) -> u64 { + self.log_size_bytes.load(Ordering::Acquire) + } - Ok(Some((batch, bytes_read))) + async fn read_at(&self, offset: u64, len: u64) -> Result, std::io::Error> { + let file = self.file.clone(); + spawn_blocking(move || { + let mut buf = vec![0u8; len as usize]; + file.read_exact_at(&mut buf, offset)?; + Ok(buf) + }) + .await? + } } diff --git a/server/src/streaming/segments/logs/log_writer.rs b/server/src/streaming/segments/logs/log_writer.rs index 27b372505..77f625e2d 100644 --- a/server/src/streaming/segments/logs/log_writer.rs +++ b/server/src/streaming/segments/logs/log_writer.rs @@ -1,3 +1,4 @@ +use super::PersisterTask; use crate::streaming::batching::message_batch::RetainedMessageBatch; use error_set::ErrContext; use iggy::{ @@ -15,17 +16,15 @@ use std::{ use tokio::{ fs::{File, OpenOptions}, io::AsyncWriteExt, - sync::RwLock, }; use tracing::{error, trace}; -use super::PersisterTask; /// A dedicated struct for writing to the log file. #[derive(Debug)] pub struct SegmentLogWriter { file_path: String, /// Holds the file for synchronous writes; when asynchronous persistence is enabled, this will be None. - file: RwLock>, + file: Option, /// When set, asynchronous writes are handled by this persister task. persister_task: Option, log_size_bytes: Arc, @@ -68,7 +67,7 @@ impl SegmentLogWriter { trace!("Opened log file for writing: {file_path}, size: {actual_log_size}"); - let (file_option, persister_task) = match server_confirmation { + let (file, persister_task) = match server_confirmation { Confirmation::NoWait => { let persister = PersisterTask::new( file, @@ -85,7 +84,7 @@ impl SegmentLogWriter { Ok(Self { file_path: file_path.to_string(), - file: RwLock::new(file_option), + file, persister_task, log_size_bytes, fsync, @@ -94,7 +93,7 @@ impl SegmentLogWriter { /// Append a message batch to the log file. pub async fn save_batches( - &self, + &mut self, batch: RetainedMessageBatch, confirmation: Confirmation, ) -> Result { @@ -128,9 +127,8 @@ impl SegmentLogWriter { } /// Write a batch of bytes to the log file and return the new file position. 
-    async fn write_batch(&self, batch_to_write: RetainedMessageBatch) -> Result<(), IggyError> {
-        let mut file_guard = self.file.write().await;
-        if let Some(ref mut file) = *file_guard {
+    async fn write_batch(&mut self, batch_to_write: RetainedMessageBatch) -> Result<(), IggyError> {
+        if let Some(ref mut file) = self.file {
             let header = batch_to_write.header_as_bytes();
             let batch_bytes = batch_to_write.bytes;
             let slices = [IoSlice::new(&header), IoSlice::new(&batch_bytes)];
@@ -150,8 +148,7 @@ impl SegmentLogWriter {
     }
 
     pub async fn fsync(&self) -> Result<(), IggyError> {
-        let mut file = self.file.write().await;
-        if let Some(file) = file.as_mut() {
+        if let Some(file) = self.file.as_ref() {
             file.sync_all()
                 .await
                 .with_error_context(|e| {
diff --git a/server/src/streaming/segments/logs/persister_task.rs b/server/src/streaming/segments/logs/persister_task.rs
index 5fd831d15..a95037402 100644
--- a/server/src/streaming/segments/logs/persister_task.rs
+++ b/server/src/streaming/segments/logs/persister_task.rs
@@ -1,4 +1,4 @@
-use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD};
+use crate::streaming::batching::message_batch::{RetainedMessageBatch, RETAINED_BATCH_HEADER_LEN};
 use flume::{unbounded, Receiver};
 use iggy::{error::IggyError, utils::duration::IggyDuration};
 use std::{
@@ -222,7 +222,7 @@ impl PersisterTask {
         let header = batch_to_write.header_as_bytes();
         let batch_bytes = batch_to_write.bytes;
         let slices = [IoSlice::new(&header), IoSlice::new(&batch_bytes)];
-        let bytes_written = RETAINED_BATCH_OVERHEAD + batch_bytes.len() as u64;
+        let bytes_written = RETAINED_BATCH_HEADER_LEN + batch_bytes.len() as u64;
 
         let mut attempts = 0;
         loop {
diff --git a/server/src/streaming/segments/writing_messages.rs b/server/src/streaming/segments/writing_messages.rs
index d72229b51..9ac5ec133 100644
--- a/server/src/streaming/segments/writing_messages.rs
+++ b/server/src/streaming/segments/writing_messages.rs
@@ -1,6 +1,6 @@
 use super::indexes::*;
 use crate::streaming::batching::batch_accumulator::BatchAccumulator;
-use crate::streaming::batching::message_batch::RETAINED_BATCH_OVERHEAD;
+use crate::streaming::batching::message_batch::RETAINED_BATCH_HEADER_LEN;
 use crate::streaming::models::messages::RetainedMessage;
 use crate::streaming::segments::segment::Segment;
 use error_set::ErrContext;
@@ -112,7 +112,7 @@ impl Segment {
         };
         let saved_bytes = self
             .log_writer
-            .as_ref()
+            .as_mut()
             .unwrap()
             .save_batches(batch, confirmation)
             .await
@@ -124,20 +124,20 @@ impl Segment {
             })?;
 
         self.index_writer
-            .as_ref()
+            .as_mut()
             .unwrap()
             .save_index(index)
             .await
             .with_error_context(|e| format!("Failed to save index, error: {e} for {}", self))?;
 
         self.last_index_position += batch_size.as_bytes_u64() as u32;
-        self.size_bytes += IggyByteSize::from(RETAINED_BATCH_OVERHEAD);
+        self.size_bytes += IggyByteSize::from(RETAINED_BATCH_HEADER_LEN);
         self.size_of_parent_stream
-            .fetch_add(RETAINED_BATCH_OVERHEAD, Ordering::AcqRel);
+            .fetch_add(RETAINED_BATCH_HEADER_LEN, Ordering::AcqRel);
         self.size_of_parent_topic
-            .fetch_add(RETAINED_BATCH_OVERHEAD, Ordering::AcqRel);
+            .fetch_add(RETAINED_BATCH_HEADER_LEN, Ordering::AcqRel);
         self.size_of_parent_partition
-            .fetch_add(RETAINED_BATCH_OVERHEAD, Ordering::AcqRel);
+            .fetch_add(RETAINED_BATCH_HEADER_LEN, Ordering::AcqRel);
 
         trace!(
             "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.",