Skip to content

Commit

Permalink
fix(server): resolve deadlock during concurrent segment reads
Browse files Browse the repository at this point in the history
This commit addresses a deadlock issue in file reading operations by
refactoring the code to use `Arc<File>` instead of `RwLock<Option<File>>`.
The changes ensure that blocking file operations are offloaded to the
blocking thread pool via `spawn_blocking` and use positional reads
(`read_exact_at`), so async runtime threads are never stalled. This improves the
concurrency and efficiency of file access, particularly in scenarios
where multiple tasks are reading from the same file concurrently. The
commit also includes error handling improvements for better diagnostics
and robustness.

Additionally, `posix_fadvise(POSIX_FADV_SEQUENTIAL)` is now applied to segment
files to improve page-cache utilization for sequential reads.
  • Loading branch information
hubcio committed Feb 16, 2025
1 parent baf0810 commit 1d837b7
Show file tree
Hide file tree
Showing 11 changed files with 285 additions and 262 deletions.
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.203"
version = "0.4.204"
edition = "2021"
build = "src/build.rs"
license = "Apache-2.0"
Expand Down Expand Up @@ -45,6 +45,7 @@ iggy = { path = "../sdk" }
jsonwebtoken = "9.3.1"
mimalloc = { version = "0.1", optional = true }
moka = { version = "0.12.10", features = ["future"] }
nix = { version = "0.29", features = ["fs"] }
openssl = { version = "0.10.70", features = ["vendored"] }
opentelemetry = { version = "0.28.0", features = ["trace", "logs"] }
opentelemetry-appender-tracing = { version = "0.28.1", features = ["log"] }
Expand Down
4 changes: 2 additions & 2 deletions server/src/compat/index_rebuilding/index_rebuilder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::streaming::utils::file;
use crate::{
server_error::CompatError, streaming::batching::message_batch::RETAINED_BATCH_OVERHEAD,
server_error::CompatError, streaming::batching::message_batch::RETAINED_BATCH_HEADER_LEN,
};
use std::io::SeekFrom;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter};
Expand Down Expand Up @@ -73,7 +73,7 @@ impl IndexRebuilder {
match Self::read_batch_header(&mut reader).await {
Ok(header) => {
// Calculate next position before writing current entry
next_position = position + RETAINED_BATCH_OVERHEAD as u32 + header.length;
next_position = position + RETAINED_BATCH_HEADER_LEN as u32 + header.length;

// Write index entry using current position
Self::write_index_entry(&mut writer, &header, position, self.start_offset)
Expand Down
4 changes: 2 additions & 2 deletions server/src/streaming/batching/batch_accumulator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_OVERHEAD};
use super::message_batch::{RetainedMessageBatch, RETAINED_BATCH_HEADER_LEN};
use crate::streaming::local_sizeable::LocalSizeable;
use crate::streaming::models::messages::RetainedMessage;
use bytes::BytesMut;
Expand Down Expand Up @@ -109,6 +109,6 @@ impl BatchAccumulator {

impl Sizeable for BatchAccumulator {
fn get_size_bytes(&self) -> IggyByteSize {
self.current_size + RETAINED_BATCH_OVERHEAD.into()
self.current_size + RETAINED_BATCH_HEADER_LEN.into()
}
}
4 changes: 2 additions & 2 deletions server/src/streaming/batching/message_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::streaming::models::messages::RetainedMessage;
use bytes::Bytes;
use iggy::utils::{byte_size::IggyByteSize, sizeable::Sizeable};

pub const RETAINED_BATCH_OVERHEAD: u64 = 8 + 8 + 4 + 4;
pub const RETAINED_BATCH_HEADER_LEN: u64 = 8 + 8 + 4 + 4;

#[derive(Debug)]
pub struct RetainedMessageBatch {
Expand Down Expand Up @@ -81,6 +81,6 @@ where

impl Sizeable for RetainedMessageBatch {
fn get_size_bytes(&self) -> IggyByteSize {
self.length + RETAINED_BATCH_OVERHEAD.into()
self.length + RETAINED_BATCH_HEADER_LEN.into()
}
}
128 changes: 67 additions & 61 deletions server/src/streaming/segments/indexes/index_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@ use super::{
};
use error_set::ErrContext;
use iggy::error::IggyError;
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use tokio::{
use std::{
fs::{File, OpenOptions},
io::{AsyncReadExt, AsyncSeekExt, SeekFrom},
sync::RwLock,
io::ErrorKind,
os::unix::fs::FileExt,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};
use tokio::task::spawn_blocking;
use tracing::{error, trace, warn};

/// A dedicated struct for reading from the index file.
#[derive(Debug)]
pub struct SegmentIndexReader {
file_path: String,
file: RwLock<Option<File>>,
file: Arc<File>,
index_size_bytes: Arc<AtomicU64>,
}

Expand All @@ -29,12 +30,11 @@ impl SegmentIndexReader {
let file = OpenOptions::new()
.read(true)
.open(file_path)
.await
.with_error_context(|e| format!("Failed to open index file: {file_path}, error: {e}"))
.map_err(|_| IggyError::CannotReadFile)?;

let actual_index_size = file
.metadata()
.await
.with_error_context(|e| {
format!("Failed to get metadata of index file: {file_path}, error: {e}")
})
Expand All @@ -46,32 +46,31 @@ impl SegmentIndexReader {
trace!("Opened index file for reading: {file_path}, size: {actual_index_size}",);
Ok(Self {
file_path: file_path.to_string(),
file: RwLock::new(Some(file)),
file: Arc::new(file),
index_size_bytes,
})
}

/// Loads all indexes from the index file.
pub async fn load_all_indexes_impl(&self) -> Result<Vec<Index>, IggyError> {
let file_size = self.index_size_bytes.load(Ordering::Acquire);
let file_size = self.file_size();
if file_size == 0 {
warn!("Index {} file is empty.", self.file_path);
return Ok(Vec::new());
}
let mut file = self.file.write().await;
let file = file
.as_mut()
.unwrap_or_else(|| panic!("File {} should be open", self.file_path));

file.seek(SeekFrom::Start(0))
.await
.with_error_context(|e| format!("Failed to seek to start of index file: {e}"))
.map_err(|_| IggyError::CannotReadIndexOffset)?;
let mut buf = vec![0u8; file_size as usize];
file.read_exact(&mut buf)
.await
.with_error_context(|e| format!("Failed to read index file {}: {e}", self.file_path))
.map_err(|_| IggyError::CannotReadIndexOffset)?;
let buf = match self.read_at(0, file_size).await {
Ok(buf) => buf,
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(Vec::new()),
Err(e) => {
error!(
"Error reading batch header at offset 0 in file {}: {e}",
self.file_path
);
return Err(IggyError::CannotReadIndexOffset);
}
};

let indexes_count = (file_size / INDEX_SIZE) as usize;
let mut indexes = Vec::with_capacity(indexes_count);
for chunk in buf.chunks_exact(INDEX_SIZE as usize) {
Expand Down Expand Up @@ -125,38 +124,35 @@ impl SegmentIndexReader {
);
return Ok(None);
}
let file_length = self.index_size_bytes.load(Ordering::Acquire);
if file_length == 0 {
let file_size = self.file_size();
if file_size == 0 {
warn!("Index {} file is empty.", self.file_path);
return Ok(None);
}
trace!("Index file length: {} bytes.", file_length);
trace!("Index file length: {} bytes.", file_size);

let relative_start_offset = (index_start_offset - segment_start_offset) as u32;
let relative_end_offset = (index_end_offset - segment_start_offset) as u32;
let mut index_range = IndexRange::default();
let mut file = self.file.write().await;

let file = file
.as_mut()
.unwrap_or_else(|| panic!("File {} should be open", self.file_path));

file.seek(SeekFrom::Start(0))
.await
.with_error_context(|e| format!("Failed to seek to start of index file: {e}"))
.map_err(|_| IggyError::CannotSeekFile)?;

let mut buf = vec![0u8; file_length as usize];
file.read_exact(&mut buf)
.await
.with_error_context(|e| format!("Failed to read index file: {e}"))
.map_err(|_| IggyError::CannotReadFile)?;

let buf = match self.read_at(0, file_size).await {
Ok(buf) => buf,
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => {
error!(
"Error reading batch header at offset 0 in file {}: {e}",
self.file_path
);
return Err(IggyError::CannotReadIndexOffset);
}
};
for chunk in buf.chunks_exact(INDEX_SIZE as usize) {
let offset = u32::from_le_bytes(
chunk[0..4]
.try_into()
.with_error_context(|e| format!("Failed to parse index offset: {e}"))
.with_error_context(|e: &std::array::TryFromSliceError| {
format!("Failed to parse index offset: {e}")
})
.map_err(|_| IggyError::CannotReadIndexOffset)?,
);
let position = u32::from_le_bytes(
Expand Down Expand Up @@ -194,27 +190,23 @@ impl SegmentIndexReader {
&self,
timestamp: u64,
) -> Result<Option<Index>, IggyError> {
let file_size = self.index_size_bytes.load(Ordering::Acquire);
let file_size = self.file_size();
if file_size == 0 {
trace!("Index file {} is empty.", self.file_path);
return Ok(Some(Index::default()));
}

let mut file = self.file.write().await;
let file = file
.as_mut()
.unwrap_or_else(|| panic!("File {} should be open", self.file_path));

file.seek(SeekFrom::Start(0))
.await
.with_error_context(|e| format!("Failed to seek to start of index file: {e}"))
.map_err(|_| IggyError::CannotSeekFile)?;
let mut buf = vec![0u8; file_size as usize];
file.read_exact(&mut buf)
.await
.with_error_context(|e| format!("Failed to read index file: {e}"))
.map_err(|_| IggyError::CannotReadFile)?;

let buf = match self.read_at(0, file_size).await {
Ok(buf) => buf,
Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None),
Err(e) => {
error!(
"Error reading batch header at offset 0 in file {}: {e}",
self.file_path
);
return Err(IggyError::CannotReadIndexOffset);
}
};
let mut last_index: Option<Index> = None;
for chunk in buf.chunks_exact(INDEX_SIZE as usize) {
let offset = u32::from_le_bytes(
Expand Down Expand Up @@ -247,4 +239,18 @@ impl SegmentIndexReader {
}
Ok(None)
}

/// Returns the current size of the index file in bytes.
///
/// Reads the shared atomic counter instead of querying the file system, so
/// it is cheap and reflects the latest published size. Uses an `Acquire`
/// load; presumably this pairs with a `Release` store in the index writer —
/// TODO(review): confirm the store ordering in `SegmentIndexWriter`.
fn file_size(&self) -> u64 {
    self.index_size_bytes.load(Ordering::Acquire)
}

/// Reads exactly `len` bytes from the index file starting at byte `offset`.
///
/// The positional read (`read_exact_at`) needs no seek and no `&mut File`,
/// which is what allows the shared `Arc<File>` (and concurrent readers)
/// instead of a lock. The blocking syscall is offloaded to tokio's blocking
/// thread pool so the async runtime threads are never stalled.
///
/// Returns the filled buffer, or the underlying I/O error (a panicked or
/// cancelled blocking task also surfaces as an `std::io::Error` via `?`).
async fn read_at(&self, offset: u64, len: u64) -> Result<Vec<u8>, std::io::Error> {
    let handle = self.file.clone();
    let task = spawn_blocking(move || {
        // NOTE(review): `len as usize` could truncate on 32-bit targets if an
        // index file ever exceeded 4 GiB — assumed unreachable; confirm.
        let mut data = vec![0u8; len as usize];
        handle.read_exact_at(&mut data, offset)?;
        Ok::<Vec<u8>, std::io::Error>(data)
    });
    task.await?
}
}
12 changes: 3 additions & 9 deletions server/src/streaming/segments/indexes/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tracing::trace;
#[derive(Debug)]
pub struct SegmentIndexWriter {
file_path: String,
file: RwLock<Option<File>>,
file: RwLock<File>,
index_size_bytes: Arc<AtomicU64>,
fsync: bool,
}
Expand Down Expand Up @@ -56,7 +56,7 @@ impl SegmentIndexWriter {

Ok(Self {
file_path: file_path.to_string(),
file: RwLock::new(Some(file)),
file: RwLock::new(file),
index_size_bytes,
fsync,
})
Expand All @@ -71,9 +71,6 @@ impl SegmentIndexWriter {

{
let mut file = self.file.write().await;
let file = file
.as_mut()
.unwrap_or_else(|| panic!("File {} should be open", self.file_path));
file.write_all(&buf)
.await
.with_error_context(|e| {
Expand All @@ -93,10 +90,7 @@ impl SegmentIndexWriter {
}

pub async fn fsync(&self) -> Result<(), IggyError> {
let mut file = self.file.write().await;
let file = file
.as_mut()
.unwrap_or_else(|| panic!("File {} should be open", self.file_path));
let file = self.file.write().await;
file.sync_all()
.await
.with_error_context(|e| {
Expand Down
Loading

0 comments on commit 1d837b7

Please sign in to comment.