From a1b2c7a9678c768f4b38f90775fd8c10793b5dd4 Mon Sep 17 00:00:00 2001 From: spetz Date: Thu, 20 Feb 2025 20:42:11 +0100 Subject: [PATCH] fix(server): fix & remove unnecessary error logs on the server, update deps --- Cargo.lock | 58 +++++++------- bench/Cargo.toml | 2 +- cli/Cargo.toml | 2 +- examples/Cargo.toml | 6 +- sdk/Cargo.toml | 10 +-- server/Cargo.toml | 10 +-- .../delete_consumer_group_handler.rs | 4 +- .../get_consumer_group_handler.rs | 10 +-- .../delete_consumer_offset_handler.rs | 4 +- .../get_consumer_offset_handler.rs | 11 +-- .../partitions/delete_partitions_handler.rs | 3 +- .../handlers/streams/delete_stream_handler.rs | 4 +- .../handlers/streams/get_stream_handler.rs | 12 ++- .../handlers/system/get_client_handler.rs | 6 +- .../binary/handlers/system/get_me_handler.rs | 10 ++- .../binary/handlers/system/ping_handler.rs | 18 ++--- .../handlers/topics/delete_topic_handler.rs | 4 +- .../handlers/topics/get_topic_handler.rs | 12 ++- .../handlers/users/delete_user_handler.rs | 5 +- .../binary/handlers/users/get_user_handler.rs | 7 +- .../channels/commands/maintain_messages.rs | 8 +- server/src/http/consumer_groups.rs | 26 +++--- server/src/http/consumer_offsets.rs | 11 +-- server/src/http/partitions.rs | 2 +- server/src/http/streams.rs | 22 +++--- server/src/http/system.rs | 9 +-- server/src/http/topics.rs | 30 +++---- server/src/http/users.rs | 23 +++--- .../src/streaming/clients/client_manager.rs | 9 +-- .../src/streaming/partitions/persistence.rs | 6 +- server/src/streaming/streams/topics.rs | 11 +++ server/src/streaming/systems/clients.rs | 11 +-- .../src/streaming/systems/consumer_groups.rs | 19 ++--- .../src/streaming/systems/consumer_offsets.rs | 22 ++++-- server/src/streaming/systems/partitions.rs | 2 +- server/src/streaming/systems/streams.rs | 40 +++++++++- server/src/streaming/systems/topics.rs | 79 ++++++++++++------- server/src/streaming/systems/users.rs | 47 ++++++----- .../src/streaming/topics/consumer_groups.rs | 18 +++++ .../src/streaming/topics/consumer_offsets.rs | 11 ++- server/src/streaming/topics/partitions.rs | 4 +- server/src/streaming/topics/persistence.rs | 4 +- tools/Cargo.toml | 2 +- 43 files changed, 335 insertions(+), 279 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8ef6c2f0c..fcac94388 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -153,9 +153,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.95" +version = "1.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" +checksum = "6b964d184e89d9b6b67dd2715bc8e74cf3107fb2b529990c90cf517326150bf4" [[package]] name = "arbitrary" @@ -408,9 +408,9 @@ dependencies = [ [[package]] name = "aws-lc-rs" -version = "1.12.2" +version = "1.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c2b7ddaa2c56a367ad27a094ad8ef4faacf8a617c2575acb2ba88949df999ca" +checksum = "3c6a895b664295a4ba0c2c0203c7075ea585dd75cd5c37a8efac829e13e460ef" dependencies = [ "aws-lc-sys", "paste", @@ -419,9 +419,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.25.1" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54ac4f13dad353b209b34cbec082338202cbc01c8f00336b55c750c13ac91f8f" +checksum = "0f9dd2e03ee80ca2822dd6ea431163d2ef259f2066a4d6ccaca6d9dcb386aa43" dependencies = [ "bindgen", "cc", @@ -1968,9 +1968,9 @@ checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" [[package]] name = "h2" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccae279728d634d083c00f6099cb58f01cc99c145b84b8be2f6c74618d79922e" +checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2" dependencies = [ "atomic-waker", "bytes", @@ -3048,9 +3048,9 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.13" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dab59f8e050d5df8e4dd87d9206fb6f65a483e20ac9fda365ade4fab353196c" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" dependencies = [ "libc", "log", @@ -3904,7 +3904,7 @@ checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.1", - "zerocopy 0.8.18", + "zerocopy 0.8.20", ] [[package]] @@ -3943,7 +3943,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a88e0da7a2c97baa202165137c158d0a2e824ac465d13d81046727b34cb247d3" dependencies = [ "getrandom 0.3.1", - "zerocopy 0.8.18", + "zerocopy 0.8.20", ] [[package]] @@ -4505,18 +4505,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.217" +version = "1.0.218" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +checksum = "e8dfc9d19bdbf6d17e22319da49161d5d0108e4188e8b680aef6299eed22df60" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.217" +version = "1.0.218" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +checksum = "f09503e191f4e797cb8aac08e9a4a4695c5edf6a2e70e376d961ddd5c969f82b" dependencies = [ "proc-macro2", "quote", @@ -4525,9 +4525,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.138" +version = "1.0.139" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949" +checksum = "44f86c3acccc9c65b153fe1b85a3be07fe5515274ec9f0653b4a0875731c72a6" dependencies = [ "itoa", "memchr", @@ -4623,7 +4623,7 @@ dependencies = [ [[package]] name = "server" -version = "0.4.209" +version = "0.4.210" dependencies = [ "ahash 0.8.11", "anyhow", @@ -5497,9 +5497,9 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034" +checksum = "00e2473a93778eb0bad35909dff6a10d28e63f792f16ed15e404fca9d5eeedbe" [[package]] name = "unicode-segmentation" @@ -5579,7 +5579,7 @@ dependencies = [ "getrandom 0.3.1", "rand 0.9.0", "serde", - "zerocopy 0.8.18", + "zerocopy 0.8.20", ] [[package]] @@ -6150,9 +6150,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59690dea168f2198d1a3b0cac23b8063efcd11012f10ae4698f284808c8ef603" +checksum = "0e7f4ea97f6f78012141bcdb6a216b2609f0979ada50b20ca5b52dde2eac2bb1" dependencies = [ "memchr", ] @@ -6247,11 +6247,11 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.18" +version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79386d31a42a4996e3336b0919ddb90f81112af416270cff95b5f5af22b839c2" +checksum = "dde3bb8c68a8f3f1ed4ac9221aad6b10cece3e60a8e2ea54a6a2dec806d0084c" dependencies = [ - "zerocopy-derive 0.8.18", + "zerocopy-derive 0.8.20", ] [[package]] @@ -6267,9 +6267,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.8.18" +version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76331675d372f91bf8d17e13afbd5fe639200b73d01f0fc748bb059f9cca2db7" +checksum = "eea57037071898bf96a6da35fd626f4f27e9cee3ead2a6c703cf09d472b2e700" dependencies = [ "proc-macro2", "quote", diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 08049836a..e31482b39 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -21,7 +21,7 @@ iggy = { path = "../sdk" } iggy-bench-report = { path = "report" } integration = { path = "../integration" } nonzero_lit = "0.1.2" -serde = { version = "1.0.217", features = ["derive"] } +serde = { version = "1.0.218", features = ["derive"] } sysinfo = "0.33.1" tokio = { version = "1.43.0", features = ["full"] } toml = "0.8.20" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 27eb2589c..8ea92caef 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -16,7 +16,7 @@ login-session = ["dep:keyring"] [dependencies] ahash = { version = "0.8.11", features = ["serde"] } -anyhow = "1.0.95" +anyhow = "1.0.96" clap = { version = "4.5.30", features = ["derive"] } clap_complete = "4.5.45" figlet-rs = "0.1.5" diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 0abecc892..925e39c9d 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -74,14 +74,14 @@ path = "src/stream-builder/stream-producer-config/main.rs" [dependencies] ahash = { version = "0.8.11", features = ["serde"] } -anyhow = "1.0.95" +anyhow = "1.0.96" bytes = "1.10.0" clap = { version = "4.5.30", features = ["derive"] } futures-util = "0.3.31" iggy = { path = "../sdk" } rand = "0.9.0" -serde = { version = "1.0.217", features = ["derive", "rc"] } -serde_json = "1.0.138" +serde = { version = "1.0.218", features = ["derive", "rc"] } +serde_json = "1.0.139" tokio = { version = "1.43.0", features = ["full"] } tracing = { version = "0.1.41" } tracing-subscriber = { version = "0.3.19", features = ["fmt", "env-filter"] } diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index eecb7286a..623260c44 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -14,7 +14,7 @@ readme = "../README.md" [dependencies] aes-gcm = "0.10.3" ahash = { version = "0.8.11", features = ["serde"] } -anyhow = "1.0.95" +anyhow = "1.0.96" async-broadcast = { version = "0.7.2" } async-dropper = { version = "0.3.1", features = ["tokio", "simple"] } async-trait = "0.1.86" @@ -51,8 +51,8 @@ reqwest = { version = "0.12.12", default-features = false, features = [ reqwest-middleware = { version = "0.4.0", features = ["json"] } reqwest-retry = "0.7.0" rustls = { version = "0.23.23", features = ["ring"] } -serde = { version = "1.0.217", features = ["derive", "rc"] } -serde_json = "1.0.138" +serde = { version = "1.0.218", features = ["derive", "rc"] } +serde_json = "1.0.139" serde_with = { version = "3.12.0", features = ["base64"] } strum = { version = "0.27.1", features = ["derive"] } thiserror = "2.0.11" @@ -66,8 +66,8 @@ webpki-roots = { version = "0.26.8" } [build-dependencies] convert_case = "0.7.1" -serde = { version = "1.0.217", features = ["derive", "rc"] } -serde_derive = "1.0.217" +serde = { version = "1.0.218", features = ["derive", "rc"] } +serde_derive = "1.0.218" [features] default = ["tokio_lock"] diff --git a/server/Cargo.toml b/server/Cargo.toml index e612986fa..76a896a67 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.4.209" +version = "0.4.210" edition = "2021" build = "src/build.rs" license = "Apache-2.0" @@ -13,7 +13,7 @@ mimalloc = ["dep:mimalloc"] [dependencies] ahash = { version = "0.8.11" } -anyhow = "1.0.95" +anyhow = "1.0.96" async_zip = { version = "0.0.17", features = [ "tokio", "lzma", @@ -76,8 +76,8 @@ ring = "0.17.9" rust-s3 = { version = "0.35.1", features = ["default"] } rustls = { version = "0.23.23" } rustls-pemfile = "2.2.0" -serde = { version = "1.0.217", features = ["derive", "rc"] } -serde_json = "1.0.138" +serde = { version = "1.0.218", features = ["derive", "rc"] } +serde_json = "1.0.139" serde_with = { version = "3.12.0", features = ["base64", "macros"] } static-toml = "1.3.0" strum = { version = "0.27.1", features = ["derive"] } @@ -106,7 +106,7 @@ mockall = "0.13.1" [build-dependencies] figment = { version = "0.10.19", features = ["json", "toml", "env"] } -serde_json = "1.0.138" +serde_json = "1.0.139" vergen-git2 = { version = "1.0.5", features = [ "build", "cargo", diff --git a/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs index a58c491ef..0526bde95 100644 --- a/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs +++ b/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs @@ -26,8 +26,8 @@ pub async fn handle( &command.group_id, ) .await.with_error_context(|_| format!( - "{COMPONENT} - failed to delete consumer group for stream_id: {}, topic_id: {}, group_id: {:?}, session: {}", - command.stream_id, command.topic_id, command.group_id, session + "{COMPONENT} - failed to delete consumer group with ID: {} for topic with ID: {} in stream with ID: {} for session: {}", + command.group_id, command.topic_id, command.stream_id, session ))?; } diff --git a/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs b/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs index e30296527..6c5ef0ecb 100644 --- a/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs +++ b/server/src/binary/handlers/consumer_groups/get_consumer_group_handler.rs @@ -15,18 +15,18 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let consumer_group = system.get_consumer_group( + let Some(consumer_group) = system.get_consumer_group( session, &command.stream_id, &command.topic_id, &command.group_id, - ); - if consumer_group.is_err() { + )? + else { sender.send_empty_ok_response().await?; return Ok(()); - } + }; - let consumer_group = consumer_group?.read().await; + let consumer_group = consumer_group.read().await; let consumer_group = mapper::map_consumer_group(&consumer_group).await; sender.send_ok_response(&consumer_group).await?; Ok(()) diff --git a/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs b/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs index 7b44bce3c..16959bcc2 100644 --- a/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs +++ b/server/src/binary/handlers/consumer_offsets/delete_consumer_offset_handler.rs @@ -25,8 +25,8 @@ pub async fn handle( command.partition_id, ) .await - .with_error_context(|_| format!("{COMPONENT} - failed to delete consumer offset for stream_id: {}, topic_id: {}, partition_id: {:?}, session: {}", - command.stream_id, command.topic_id, command.partition_id, session + .with_error_context(|_| format!("{COMPONENT} - failed to delete consumer offset for topic with ID: {} in stream with ID: {} partition ID: {:#?}, session: {}", + command.topic_id, command.stream_id, command.partition_id, session ))?; sender.send_empty_ok_response().await?; Ok(()) diff --git a/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs b/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs index a5fc21e06..215e78fe5 100644 --- a/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs +++ b/server/src/binary/handlers/consumer_offsets/get_consumer_offset_handler.rs @@ -15,7 +15,7 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let offset = system + let Some(offset) = system .get_consumer_offset( session, &command.consumer, @@ -23,13 +23,8 @@ pub async fn handle( &command.topic_id, command.partition_id, ) - .await; - if offset.is_err() { - sender.send_empty_ok_response().await?; - return Ok(()); - } - - let Some(offset) = offset? else { + .await? + else { sender.send_empty_ok_response().await?; return Ok(()); }; diff --git a/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/server/src/binary/handlers/partitions/delete_partitions_handler.rs index e8de4f750..16b2b42de 100644 --- a/server/src/binary/handlers/partitions/delete_partitions_handler.rs +++ b/server/src/binary/handlers/partitions/delete_partitions_handler.rs @@ -31,8 +31,7 @@ pub async fn handle( .await .with_error_context(|_| { format!( - "{COMPONENT} - failed to delete partitions for stream_id: {}, topic_id: {}, session: {}", - stream_id, topic_id, session + "{COMPONENT} - failed to delete partitions for topic with ID: {topic_id} in stream with ID: {stream_id}, session: {session}", ) })?; } diff --git a/server/src/binary/handlers/streams/delete_stream_handler.rs b/server/src/binary/handlers/streams/delete_stream_handler.rs index dd507527d..731576927 100644 --- a/server/src/binary/handlers/streams/delete_stream_handler.rs +++ b/server/src/binary/handlers/streams/delete_stream_handler.rs @@ -23,7 +23,7 @@ pub async fn handle( .delete_stream(session, &command.stream_id) .await .with_error_context(|_| { - format!("{COMPONENT} - failed to delete stream with id: {stream_id}, session: {session}") + format!("{COMPONENT} - failed to delete stream with ID: {stream_id}, session: {session}") })?; } @@ -33,7 +33,7 @@ pub async fn handle( .apply(session.get_user_id(), EntryCommand::DeleteStream(command)) .await .with_error_context(|_| { - format!("{COMPONENT} - failed to apply delete stream with id: {stream_id}, session: {session}") + format!("{COMPONENT} - failed to apply delete stream with ID: {stream_id}, session: {session}") })?; sender.send_empty_ok_response().await?; Ok(()) diff --git a/server/src/binary/handlers/streams/get_stream_handler.rs b/server/src/binary/handlers/streams/get_stream_handler.rs index a50395ee3..d689f6f3e 100644 --- a/server/src/binary/handlers/streams/get_stream_handler.rs +++ b/server/src/binary/handlers/streams/get_stream_handler.rs @@ -15,13 +15,17 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let stream = system.find_stream(session, &command.stream_id); - if stream.is_err() { + let Ok(stream) = system.try_find_stream(session, &command.stream_id) else { sender.send_empty_ok_response().await?; return Ok(()); - } + }; - let response = mapper::map_stream(stream?); + let Some(stream) = stream else { + sender.send_empty_ok_response().await?; + return Ok(()); + }; + + let response = mapper::map_stream(stream); sender.send_ok_response(&response).await?; Ok(()) } diff --git a/server/src/binary/handlers/system/get_client_handler.rs b/server/src/binary/handlers/system/get_client_handler.rs index 047075d61..b52ed5462 100644 --- a/server/src/binary/handlers/system/get_client_handler.rs +++ b/server/src/binary/handlers/system/get_client_handler.rs @@ -17,14 +17,12 @@ pub async fn handle( let bytes; { let system = system.read().await; - let client = system.get_client(session, command.client_id).await; - if client.is_err() { + let Some(client) = system.get_client(session, command.client_id).await? else { sender.send_empty_ok_response().await?; return Ok(()); - } + }; { - let client = client?; let client = client.read().await; bytes = mapper::map_client(&client); } diff --git a/server/src/binary/handlers/system/get_me_handler.rs b/server/src/binary/handlers/system/get_me_handler.rs index b603fae66..9cc55f1a5 100644 --- a/server/src/binary/handlers/system/get_me_handler.rs +++ b/server/src/binary/handlers/system/get_me_handler.rs @@ -19,12 +19,16 @@ pub async fn handle( let bytes; { let system = system.read().await; - let client = system + let Some(client) = system .get_client(session, session.client_id) .await .with_error_context(|_| { - format!("{COMPONENT} - failed to get client, session: {session}") - })?; + format!("{COMPONENT} - failed to get current client for session: {session}") + })? + else { + return Err(IggyError::ClientNotFound(session.client_id)); + }; + { let client = client.read().await; bytes = mapper::map_client(&client); diff --git a/server/src/binary/handlers/system/ping_handler.rs b/server/src/binary/handlers/system/ping_handler.rs index 97c07326c..8c6ebcf61 100644 --- a/server/src/binary/handlers/system/ping_handler.rs +++ b/server/src/binary/handlers/system/ping_handler.rs @@ -1,9 +1,7 @@ -use crate::binary::handlers::system::COMPONENT; use crate::binary::sender::SenderKind; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; use anyhow::Result; -use error_set::ErrContext; use iggy::error::IggyError; use iggy::locking::IggySharedMutFn; use iggy::system::ping::Ping; @@ -19,15 +17,13 @@ pub async fn handle( debug!("session: {session}, command: {command}"); let system = system.read().await; let client_manager = system.client_manager.read().await; - let client = client_manager - .get_client(session.client_id) - .with_error_context(|_| { - format!("{COMPONENT} - failed to get clients, session: {session}") - })?; - let mut client = client.write().await; - let now = IggyTimestamp::now(); - client.last_heartbeat = now; - debug!("Updated last heartbeat to: {now} for session: {session}"); + if let Some(client) = client_manager.try_get_client(session.client_id) { + let mut client = client.write().await; + let now = IggyTimestamp::now(); + client.last_heartbeat = now; + debug!("Updated last heartbeat to: {now} for session: {session}"); + } + sender.send_empty_ok_response().await?; Ok(()) } diff --git a/server/src/binary/handlers/topics/delete_topic_handler.rs b/server/src/binary/handlers/topics/delete_topic_handler.rs index 01c05520e..1e9141e3a 100644 --- a/server/src/binary/handlers/topics/delete_topic_handler.rs +++ b/server/src/binary/handlers/topics/delete_topic_handler.rs @@ -25,7 +25,7 @@ pub async fn handle( .delete_topic(session, &command.stream_id, &command.topic_id) .await .with_error_context(|_| format!( - "{COMPONENT} - failed to delete topic for stream_id: {stream_id}, topic_id: {topic_id}", + "{COMPONENT} - failed to delete topic with ID: {topic_id} in stream with ID: {stream_id}, session: {session}", ))?; } @@ -35,7 +35,7 @@ pub async fn handle( .apply(session.get_user_id(), EntryCommand::DeleteTopic(command)) .await .with_error_context(|_| format!( - "{COMPONENT} - failed to apply delete topic for stream_id: {stream_id}, topic_id: {topic_id}", + "{COMPONENT} - failed to apply delete topic with ID: {topic_id} in stream with ID: {stream_id}, session: {session}", ))?; sender.send_empty_ok_response().await?; Ok(()) diff --git a/server/src/binary/handlers/topics/get_topic_handler.rs b/server/src/binary/handlers/topics/get_topic_handler.rs index fe7c1b4fe..a62a518ea 100644 --- a/server/src/binary/handlers/topics/get_topic_handler.rs +++ b/server/src/binary/handlers/topics/get_topic_handler.rs @@ -15,13 +15,17 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let topic = system.find_topic(session, &command.stream_id, &command.topic_id); - if topic.is_err() { + let Ok(topic) = system.try_find_topic(session, &command.stream_id, &command.topic_id) else { sender.send_empty_ok_response().await?; return Ok(()); - } + }; - let topic = mapper::map_topic(topic?).await; + let Some(topic) = topic else { + sender.send_empty_ok_response().await?; + return Ok(()); + }; + + let topic = mapper::map_topic(topic).await; sender.send_ok_response(&topic).await?; Ok(()) } diff --git a/server/src/binary/handlers/users/delete_user_handler.rs b/server/src/binary/handlers/users/delete_user_handler.rs index 13b1a782e..a764d6599 100644 --- a/server/src/binary/handlers/users/delete_user_handler.rs +++ b/server/src/binary/handlers/users/delete_user_handler.rs @@ -23,7 +23,7 @@ pub async fn handle( .await .with_error_context(|_| { format!( - "{COMPONENT} - failed to delete user with id: {}, session: {session}", + "{COMPONENT} - failed to delete user with ID: {}, session: {session}", command.user_id ) })?; @@ -37,8 +37,7 @@ pub async fn handle( .await .with_error_context(|_| { format!( - "{COMPONENT} - failed to apply delete user with id: {}, session: {session}", - user_id + "{COMPONENT} - failed to apply delete user with ID: {user_id}, session: {session}", ) })?; sender.send_empty_ok_response().await?; diff --git a/server/src/binary/handlers/users/get_user_handler.rs b/server/src/binary/handlers/users/get_user_handler.rs index 38584662e..bceddf796 100644 --- a/server/src/binary/handlers/users/get_user_handler.rs +++ b/server/src/binary/handlers/users/get_user_handler.rs @@ -14,13 +14,12 @@ pub async fn handle( ) -> Result<(), IggyError> { debug!("session: {session}, command: {command}"); let system = system.read().await; - let user = system.find_user(session, &command.user_id); - if user.is_err() { + let Some(user) = system.find_user(session, &command.user_id)? else { sender.send_empty_ok_response().await?; return Ok(()); - } + }; - let bytes = mapper::map_user(user?); + let bytes = mapper::map_user(user); sender.send_ok_response(&bytes).await?; Ok(()) } diff --git a/server/src/channels/commands/maintain_messages.rs b/server/src/channels/commands/maintain_messages.rs index 95e038027..0fe04a5b1 100644 --- a/server/src/channels/commands/maintain_messages.rs +++ b/server/src/channels/commands/maintain_messages.rs @@ -496,7 +496,7 @@ async fn delete_segments( let mut last_end_offset = 0; for start_offset in &segment_to_delete.start_offsets { let deleted_segment = partition.delete_segment(*start_offset).await.with_error_context(|_| { - format!("CHANNEL_COMMAND - failed to delete segment for stream ID: {}, topic ID: {}", topic.stream_id, topic.topic_id) + format!("CHANNEL_COMMAND - failed to delete segment for stream with ID: {}, topic with ID: {}", topic.stream_id, topic.topic_id) })?; last_end_offset = deleted_segment.end_offset; segments_count += 1; @@ -506,14 +506,14 @@ async fn delete_segments( if partition.get_segments().is_empty() { let start_offset = last_end_offset + 1; partition.add_persisted_segment(start_offset).await.with_error_context(|_| { - format!("CHANNEL_COMMAND - failed to add persisted segment for stream ID: {}, topic ID: {}", topic.stream_id, topic.topic_id) + format!("CHANNEL_COMMAND - failed to add persisted segment for stream with ID: {}, topic with ID: {}", topic.stream_id, topic.topic_id) })?; } } Err(error) => { error!( - "Partition with ID: {} not found for stream ID: {}, topic ID: {}. Error: {}", - segment_to_delete.partition_id, topic.stream_id, topic.topic_id, error + "Partition with ID: {} was not found for stream with ID: {}, topic with ID: {}. Error: {error}", + segment_to_delete.partition_id, topic.stream_id, topic.topic_id ); continue; } diff --git a/server/src/http/consumer_groups.rs b/server/src/http/consumer_groups.rs index 0df42918d..8320192d6 100644 --- a/server/src/http/consumer_groups.rs +++ b/server/src/http/consumer_groups.rs @@ -40,24 +40,16 @@ async fn get_consumer_group( let identifier_topic_id = Identifier::from_str_value(&topic_id)?; let identifier_group_id = Identifier::from_str_value(&group_id)?; let system = state.system.read().await; - let consumer_group = system - .get_consumer_group( - &Session::stateless(identity.user_id, identity.ip_address), - &identifier_stream_id, - &identifier_topic_id, - &identifier_group_id, - ) - .with_error_context(|_| { - format!( - "{COMPONENT} - failed to get consumer group, stream ID: {}, topic ID: {}, group ID: {}", - stream_id, topic_id, group_id, - ) - }); - if consumer_group.is_err() { + let Some(consumer_group) = system.get_consumer_group( + &Session::stateless(identity.user_id, identity.ip_address), + &identifier_stream_id, + &identifier_topic_id, + &identifier_group_id, + )? + else { return Err(CustomError::ResourceNotFound); - } + }; - let consumer_group = consumer_group?; let consumer_group = consumer_group.read().await; let consumer_group = mapper::map_consumer_group(&consumer_group).await; Ok(Json(consumer_group)) @@ -135,7 +127,7 @@ async fn delete_consumer_group( &identifier_group_id, ) .await - .with_error_context(|_| format!("{COMPONENT} - failed to delete consumer group, stream ID: {}, topic ID: {}, group ID: {}", stream_id, topic_id, group_id))?; + .with_error_context(|_| format!("{COMPONENT} - failed to delete consumer group with ID: {group_id} for topic with ID: {topic_id} in stream with ID: {stream_id}"))?; } let system = state.system.read().await; diff --git a/server/src/http/consumer_offsets.rs b/server/src/http/consumer_offsets.rs index 72f9a9d5e..a342818da 100644 --- a/server/src/http/consumer_offsets.rs +++ b/server/src/http/consumer_offsets.rs @@ -41,7 +41,7 @@ async fn get_consumer_offset( query.validate()?; let consumer = Consumer::new(query.0.consumer.id); let system = state.system.read().await; - let offset = system + let Some(offset) = system .get_consumer_offset( &Session::stateless(identity.user_id, identity.ip_address), &consumer, @@ -49,13 +49,8 @@ async fn get_consumer_offset( &query.0.topic_id, query.0.partition_id, ) - .await - .with_error_context(|_| format!("{COMPONENT} - failed to get consumer offset, stream ID: {}, topic ID: {}, patition ID: {:?}", stream_id, topic_id, query.0.partition_id)); - if offset.is_err() { - return Err(CustomError::ResourceNotFound); - } - - let Some(offset) = offset? else { + .await? + else { return Err(CustomError::ResourceNotFound); }; diff --git a/server/src/http/partitions.rs b/server/src/http/partitions.rs index 6dedc8d39..8a1fa8049 100644 --- a/server/src/http/partitions.rs +++ b/server/src/http/partitions.rs @@ -89,7 +89,7 @@ async fn delete_partitions( .await .with_error_context(|_| { format!( - "{COMPONENT} - failed to delete partitions, stream ID: {}, topic ID: {}", + "{COMPONENT} - failed to delete partitions for topic with ID: {} in stream with ID: {}", stream_id, topic_id ) })?; diff --git a/server/src/http/streams.rs b/server/src/http/streams.rs index 18c553536..8ab9cbce1 100644 --- a/server/src/http/streams.rs +++ b/server/src/http/streams.rs @@ -39,15 +39,17 @@ async fn get_stream( ) -> Result, CustomError> { let system = state.system.read().await; let stream_id = Identifier::from_str_value(&stream_id)?; - let stream = system.find_stream( + let Ok(stream) = system.try_find_stream( &Session::stateless(identity.user_id, identity.ip_address), &stream_id, - ); - if stream.is_err() { + ) else { return Err(CustomError::ResourceNotFound); - } + }; + let Some(stream) = stream else { + return Err(CustomError::ResourceNotFound); + }; - let stream = mapper::map_stream(stream?); + let stream = mapper::map_stream(stream); Ok(Json(stream)) } @@ -165,10 +167,7 @@ async fn delete_stream( ) .await .with_error_context(|_| { - format!( - "{COMPONENT} - failed to delete stream, stream ID: {}", - stream_id - ) + format!("{COMPONENT} - failed to delete stream with ID: {stream_id}",) })?; } @@ -183,10 +182,7 @@ async fn delete_stream( ) .await .with_error_context(|_| { - format!( - "{COMPONENT} - failed to apply delete stream, stream ID: {}", - stream_id - ) + format!("{COMPONENT} - failed to apply delete stream with ID: {stream_id}",) })?; Ok(StatusCode::NO_CONTENT) } diff --git a/server/src/http/system.rs b/server/src/http/system.rs index 21d67a248..9037b5f24 100644 --- a/server/src/http/system.rs +++ b/server/src/http/system.rs @@ -59,7 +59,7 @@ async fn get_client( Path(client_id): Path, ) -> Result, CustomError> { let system = state.system.read().await; - let client = system + let Some(client) = system .get_client( &Session::stateless(identity.user_id, identity.ip_address), client_id, @@ -70,12 +70,11 @@ async fn get_client( "{COMPONENT} - failed to get client, user ID: {}", identity.user_id ) - }); - if client.is_err() { + })? + else { return Err(CustomError::ResourceNotFound); - } + }; - let client = client?; let client = client.read().await; let client = mapper::map_client(&client); Ok(Json(client)) diff --git a/server/src/http/topics.rs b/server/src/http/topics.rs index 8d8783296..b1e003ebe 100644 --- a/server/src/http/topics.rs +++ b/server/src/http/topics.rs @@ -45,23 +45,18 @@ async fn get_topic( let system = state.system.read().await; let identity_stream_id = Identifier::from_str_value(&stream_id)?; let identity_topic_id = Identifier::from_str_value(&topic_id)?; - let topic = system - .find_topic( - &Session::stateless(identity.user_id, identity.ip_address), - &identity_stream_id, - &identity_topic_id, - ) - .with_error_context(|_| { - format!( - "{COMPONENT} - failed to find topic, stream ID: {}, topic ID: {}", - stream_id, topic_id - ) - }); - if topic.is_err() { + let Ok(topic) = system.try_find_topic( + &Session::stateless(identity.user_id, identity.ip_address), + &identity_stream_id, + &identity_topic_id, + ) else { return Err(CustomError::ResourceNotFound); - } + }; + let Some(topic) = topic else { + return Err(CustomError::ResourceNotFound); + }; - let topic = mapper::map_topic(topic?).await; + let topic = mapper::map_topic(topic).await; Ok(Json(topic)) } @@ -79,7 +74,7 @@ async fn get_topics( ) .with_error_context(|_| { format!( - "{COMPONENT} - failed to find topic, stream ID: {}", + "{COMPONENT} - failed to find topics for stream with ID: {}", stream_id ) })?; @@ -204,8 +199,7 @@ async fn delete_topic( .await .with_error_context(|_| { format!( - "{COMPONENT} - failed to delete topic, stream ID: {}, topic ID: {}", - stream_id, topic_id + "{COMPONENT} - failed to delete topic with ID: {topic_id} in stream with ID: {stream_id}", ) })?; } diff --git a/server/src/http/users.rs b/server/src/http/users.rs index 028002435..029088e4b 100644 --- a/server/src/http/users.rs +++ b/server/src/http/users.rs @@ -48,17 +48,15 @@ async fn get_user( ) -> Result, CustomError> { let identifier_user_id = Identifier::from_str_value(&user_id)?; let system = state.system.read().await; - let user = system - .find_user( - &Session::stateless(identity.user_id, identity.ip_address), - &identifier_user_id, - ) - .with_error_context(|_| format!("{COMPONENT} - failed to find user, user ID: {}", user_id)); - if user.is_err() { + let Some(user) = system.find_user( + &Session::stateless(identity.user_id, identity.ip_address), + &identifier_user_id, + )? + else { return Err(CustomError::ResourceNotFound); - } + }; - let user = mapper::map_user(user?); + let user = mapper::map_user(user); Ok(Json(user)) } @@ -275,7 +273,7 @@ async fn delete_user( ) .await .with_error_context(|_| { - format!("{COMPONENT} - failed to delete user, user ID: {}", user_id) + format!("{COMPONENT} - failed to delete user with ID: {user_id}") })?; } @@ -290,10 +288,7 @@ async fn delete_user( ) .await .with_error_context(|_| { - format!( - "{COMPONENT} - failed to apply delete user, user ID: {}", - user_id - ) + format!("{COMPONENT} - failed to apply delete user with ID: {user_id}") })?; Ok(StatusCode::NO_CONTENT) } diff --git a/server/src/streaming/clients/client_manager.rs b/server/src/streaming/clients/client_manager.rs index 3d40c0ba1..d015f2f73 100644 --- a/server/src/streaming/clients/client_manager.rs +++ b/server/src/streaming/clients/client_manager.rs @@ -83,13 +83,8 @@ impl ClientManager { Ok(()) } - pub fn get_client(&self, client_id: u32) -> Result, IggyError> { - let client = self.clients.get(&client_id); - if client.is_none() { - return Err(IggyError::ClientNotFound(client_id)); - } - - Ok(client.unwrap().clone()) + pub fn try_get_client(&self, client_id: u32) -> Option> { + self.clients.get(&client_id).cloned() } pub fn get_clients(&self) -> Vec> { diff --git a/server/src/streaming/partitions/persistence.rs b/server/src/streaming/partitions/persistence.rs index fc50817b5..fc9335e96 100644 --- a/server/src/streaming/partitions/persistence.rs +++ b/server/src/streaming/partitions/persistence.rs @@ -52,19 +52,19 @@ impl Partition { .delete_consumer_offsets(&self.consumer_offsets_path) .await .with_error_context(|_| { - format!("{COMPONENT} - failed to delete consumer offsets, partition: {self}") + format!("{COMPONENT} - failed to delete consumer offsets in partition: {self}") })?; self.storage .partition .delete_consumer_offsets(&self.consumer_group_offsets_path) .await .with_error_context(|_| { - format!("{COMPONENT} - failed to delete consumer offsets, partition: {self}") + format!("{COMPONENT} - failed to delete consumer offsets in partition: {self}") })?; self.add_persisted_segment(0) .await .with_error_context(|_| { - format!("{COMPONENT} - failed to add persisted segment, partition: {self}",) + format!("{COMPONENT} - failed to add persisted segment in partition: {self}",) })?; if !Path::new(&self.consumer_offsets_path).exists() diff --git a/server/src/streaming/streams/topics.rs b/server/src/streaming/streams/topics.rs index 9f1143ff8..c4f10f9de 100644 --- a/server/src/streaming/streams/topics.rs +++ b/server/src/streaming/streams/topics.rs @@ -158,6 +158,17 @@ impl Stream { self.topics.values().collect() } + pub fn try_get_topic(&self, identifier: &Identifier) -> Result, IggyError> { + match identifier.kind { + IdKind::Numeric => Ok(self.topics.get(&identifier.get_u32_value()?)), + IdKind::String => Ok(self.try_get_topic_by_name(&identifier.get_cow_str_value()?)), + } + } + + fn try_get_topic_by_name(&self, name: &str) -> Option<&Topic> { + self.topics_ids.get(name).and_then(|id| self.topics.get(id)) + } + pub fn get_topic(&self, identifier: &Identifier) -> Result<&Topic, IggyError> { match identifier.kind { IdKind::Numeric => self.get_topic_by_id(identifier.get_u32_value()?), diff --git a/server/src/streaming/systems/clients.rs b/server/src/streaming/systems/clients.rs index 6809bd407..97ed569af 100644 --- a/server/src/streaming/systems/clients.rs +++ b/server/src/streaming/systems/clients.rs @@ -58,28 +58,23 @@ impl System { } } - // TODO change errir message for permissiioner pub async fn get_client( &self, session: &Session, client_id: u32, - ) -> Result, IggyError> { + ) -> Result>, IggyError> { self.ensure_authenticated(session)?; self.permissioner .get_client(session.get_user_id()) .with_error_context(|_| { format!( - "{COMPONENT} - failed to get client by user ID: {}", + "{COMPONENT} - permission denied to get client with ID: {client_id} by user ID: {}", session.get_user_id() ) })?; let client_manager = self.client_manager.read().await; - client_manager - .get_client(client_id) - .with_error_context(|_| { - format!("{COMPONENT} - failed to get client with ID {}", client_id) - }) + Ok(client_manager.try_get_client(client_id)) } pub async fn get_clients( diff --git a/server/src/streaming/systems/consumer_groups.rs b/server/src/streaming/systems/consumer_groups.rs index 4e72d9172..9ccf63de6 100644 --- a/server/src/streaming/systems/consumer_groups.rs +++ b/server/src/streaming/systems/consumer_groups.rs @@ -15,25 +15,22 @@ impl System { stream_id: &Identifier, topic_id: &Identifier, group_id: &Identifier, - ) -> Result<&RwLock, IggyError> { + ) -> Result>, IggyError> { self.ensure_authenticated(session)?; - let topic = self.find_topic(session, stream_id, topic_id) - .with_error_context(|_| format!("{COMPONENT} - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?; + let Some(topic) = self.try_find_topic(session, stream_id, topic_id)? else { + return Ok(None); + }; self.permissioner .get_consumer_group(session.get_user_id(), topic.stream_id, topic.topic_id) .with_error_context(|_| { format!( - "{COMPONENT} - permission denied to get consumer group for user {} on stream_id: {}, topic_id: {}", + "{COMPONENT} - permission denied to get consumer group with ID: {group_id} for user with ID: {} in topic with ID: {topic_id} and stream with ID: {stream_id}", session.get_user_id(), - topic.stream_id, - topic.topic_id ) })?; - topic.get_consumer_group(group_id).with_error_context(|_| { - format!("{COMPONENT} - consumer group not found for group_id: {group_id}") - }) + topic.try_get_consumer_group(group_id) } pub fn get_consumer_groups( @@ -127,7 +124,7 @@ impl System { consumer_group = topic.delete_consumer_group(consumer_group_id) .await - .with_error_context(|_| format!("{COMPONENT} - failed to delete consumer group for consumer_group_id: {consumer_group_id}"))?; + .with_error_context(|_| format!("{COMPONENT} - failed to delete consumer group with ID: {consumer_group_id}"))? } let client_manager = self.client_manager.read().await; @@ -273,7 +270,7 @@ impl System { { let stream = self.get_stream(stream_id).with_error_context(|_| { - format!("{COMPONENT} - failed to get stream with id: {stream_id}") + format!("{COMPONENT} - failed to get stream with ID: {stream_id}") })?; let topic = stream.get_topic(topic_id) .with_error_context(|_| { diff --git a/server/src/streaming/systems/consumer_offsets.rs b/server/src/streaming/systems/consumer_offsets.rs index 38f84c847..e78fb0aea 100644 --- a/server/src/streaming/systems/consumer_offsets.rs +++ b/server/src/streaming/systems/consumer_offsets.rs @@ -18,7 +18,8 @@ impl System { offset: u64, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - let topic = self.find_topic(session, stream_id, topic_id).with_error_context(|_| format!("{COMPONENT} - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?; + let topic = self.find_topic(session, stream_id, topic_id) + .with_error_context(|_| format!("{COMPONENT} - topic with ID: {topic_id} was not found in stream with ID: {stream_id}"))?; self.permissioner.store_consumer_offset( session.get_user_id(), topic.stream_id, @@ -39,17 +40,18 @@ impl System { partition_id: Option, ) -> Result, IggyError> { self.ensure_authenticated(session)?; - let topic = self.find_topic(session, stream_id, topic_id).with_error_context(|_| format!("{COMPONENT} - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?; + let Some(topic) = self.try_find_topic(session, stream_id, topic_id)? else { + return Ok(None); + }; + self.permissioner.get_consumer_offset( session.get_user_id(), topic.stream_id, topic.topic_id, ).with_error_context(|_| { format!( - "{COMPONENT} - permission denied to get consumer group for user {} on stream_id: {}, topic_id: {}", + "{COMPONENT} - permission denied to get consumer offset for user with ID: {}, consumer: {consumer} in topic with ID: {topic_id} and stream with ID: {stream_id}", session.get_user_id(), - topic.stream_id, - topic.topic_id ) })?; @@ -67,12 +69,18 @@ impl System { partition_id: Option, ) -> Result<(), IggyError> { self.ensure_authenticated(session)?; - let topic = self.find_topic(session, stream_id, topic_id).with_error_context(|_| format!("{COMPONENT} - topic not found for stream_id: {stream_id}, topic_id: {topic_id}"))?; + let topic = self.find_topic(session, stream_id, topic_id) + .with_error_context(|_| format!("{COMPONENT} - topic with ID: {topic_id} was not found in stream with ID: {stream_id}"))?; self.permissioner.delete_consumer_offset( session.get_user_id(), topic.stream_id, topic.topic_id, - )?; + ).with_error_context(|_| { + format!( + "{COMPONENT} - permission denied to delete consumer offset for user with ID: {}, consumer: {consumer} in topic with ID: {topic_id} and stream with ID: {stream_id}", + session.get_user_id(), + ) + })?; topic .delete_consumer_offset(consumer, partition_id, session.client_id) diff --git a/server/src/streaming/systems/partitions.rs b/server/src/streaming/systems/partitions.rs index 7d2e3eb04..7a53fda74 100644 --- a/server/src/streaming/systems/partitions.rs +++ b/server/src/streaming/systems/partitions.rs @@ -82,7 +82,7 @@ impl System { .delete_persisted_partitions(partitions_count) .await .with_error_context(|_| { - format!("{COMPONENT} - failed to delete persisted partitions, topic: {topic}") + format!("{COMPONENT} - failed to delete persisted partitions for topic: {topic}") })?; topic.reassign_consumer_groups().await; if let Some(partitions) = partitions { diff --git a/server/src/streaming/systems/streams.rs b/server/src/streaming/systems/streams.rs index 6f96681d5..f3f09abca 100644 --- a/server/src/streaming/systems/streams.rs +++ b/server/src/streaming/systems/streams.rs @@ -180,6 +180,40 @@ impl System { stream } + pub fn try_find_stream( + &self, + session: &Session, + identifier: &Identifier, + ) -> Result, IggyError> { + self.ensure_authenticated(session)?; + let Some(stream) = self.try_get_stream(identifier)? else { + return Ok(None); + }; + + self.permissioner + .get_stream(session.get_user_id(), stream.stream_id) + .with_error_context(|_| { + format!( + "{COMPONENT} - permission denied to get stream with ID: {identifier} for user with ID: {}", + session.get_user_id(), + ) + })?; + Ok(Some(stream)) + } + + pub fn try_get_stream(&self, identifier: &Identifier) -> Result, IggyError> { + match identifier.kind { + IdKind::Numeric => Ok(self.streams.get(&identifier.get_u32_value()?)), + IdKind::String => Ok(self.try_get_stream_by_name(&identifier.get_cow_str_value()?)), + } + } + + fn try_get_stream_by_name(&self, name: &str) -> Option<&Stream> { + self.streams_ids + .get(name) + .and_then(|id| self.streams.get(id)) + } + pub fn get_stream(&self, identifier: &Identifier) -> Result<&Stream, IggyError> { match identifier.kind { IdKind::Numeric => self.get_stream_by_id(identifier.get_u32_value()?), @@ -287,7 +321,7 @@ impl System { let stream_id; { let stream = self.get_stream(id).with_error_context(|_| { - format!("{COMPONENT} - failed to get stream with id: {id}") + format!("{COMPONENT} - failed to get stream with ID: {id}") })?; stream_id = stream.stream_id; } @@ -337,7 +371,7 @@ impl System { self.ensure_authenticated(session)?; let stream = self .get_stream(id) - .with_error_context(|_| format!("{COMPONENT} - failed to get stream with id: {id}"))?; + .with_error_context(|_| format!("{COMPONENT} - failed to get stream with ID: {id}"))?; let stream_id = stream.stream_id; self.permissioner .delete_stream(session.get_user_id(), stream_id) @@ -379,7 +413,7 @@ impl System { stream_id: &Identifier, ) -> Result<(), IggyError> { let stream = self.get_stream(stream_id).with_error_context(|_| { - format!("{COMPONENT} - failed to get stream with id: {stream_id}") + format!("{COMPONENT} - failed to get stream with ID: {stream_id}") })?; self.permissioner .purge_stream(session.get_user_id(), stream.stream_id) diff --git a/server/src/streaming/systems/topics.rs b/server/src/streaming/systems/topics.rs index 04082fee0..e06bcf70c 100644 --- a/server/src/streaming/systems/topics.rs +++ b/server/src/streaming/systems/topics.rs @@ -21,7 +21,7 @@ impl System { let stream = self .find_stream(session, stream_id) .with_error_context(|_| { - format!("{COMPONENT} - failed to find stream with id: {stream_id}") + format!("{COMPONENT} - failed to find stream with ID: {stream_id}") })?; let topic = stream.get_topic(topic_id); if let Ok(topic) = topic { @@ -29,10 +29,8 @@ impl System { .get_topic(session.get_user_id(), stream.stream_id, topic.topic_id) .with_error_context(|_| { format!( - "{COMPONENT} - permission denied to get topic for user with id: {}, stream ID: {}, topic ID: {}", + "{COMPONENT} - permission denied to get topic with ID: {topic_id} in stream with ID: {stream_id} for user with ID: {}", session.get_user_id(), - stream.stream_id, - topic.topic_id, ) })?; return Ok(topic); @@ -48,20 +46,50 @@ impl System { ) -> Result, IggyError> { self.ensure_authenticated(session)?; let stream = self.get_stream(stream_id).with_error_context(|_| { - format!("{COMPONENT} - failed to get stream with id: {stream_id}") + format!("{COMPONENT} - failed to get stream with ID: {stream_id}") })?; self.permissioner .get_topics(session.get_user_id(), stream.stream_id) .with_error_context(|_| { format!( - "{COMPONENT} - permission denied to get topics for user with id: {}, stream ID: {}", + "{COMPONENT} - permission denied to get topics in stream with ID: {stream_id} for user with ID: {}", session.get_user_id(), - stream.stream_id, ) })?; Ok(stream.get_topics()) } + pub fn try_find_topic( + &self, + session: &Session, + stream_id: &Identifier, + topic_id: &Identifier, + ) -> Result, IggyError> { + self.ensure_authenticated(session)?; + let Some(stream) = self + .try_find_stream(session, stream_id) + .with_error_context(|_| { + format!("{COMPONENT} - failed to find stream with ID: {stream_id}") + })? + else { + return Ok(None); + }; + + let Some(topic) = stream.try_get_topic(topic_id)? else { + return Ok(None); + }; + + self.permissioner + .get_topic(session.get_user_id(), stream.stream_id, topic.topic_id) + .with_error_context(|_| { + format!( + "{COMPONENT} - permission denied to get topic with ID: {topic_id} in stream with ID: {stream_id} for user with ID: {}", + session.get_user_id(), + ) + })?; + Ok(Some(topic)) + } + #[allow(clippy::too_many_arguments)] pub async fn create_topic( &mut self, @@ -78,15 +106,14 @@ impl System { self.ensure_authenticated(session)?; { let stream = self.get_stream(stream_id).with_error_context(|_| { - format!("{COMPONENT} - failed to get stream with id: {stream_id}") + format!("{COMPONENT} - failed to get stream with ID: {stream_id}") })?; self.permissioner .create_topic(session.get_user_id(), stream.stream_id) .with_error_context(|_| { format!( - "{COMPONENT} - permission denied to create topic for user with id: {}, stream ID: {}", + "{COMPONENT} - permission denied to create topic with name: {name} in stream with ID: {stream_id} for user with ID: {}", session.get_user_id(), - stream.stream_id, ) })?; } @@ -104,7 +131,7 @@ impl System { ) .await .with_error_context(|_| { - format!("{COMPONENT} - failed to create topic, stream ID: {stream_id}") + format!("{COMPONENT} - failed to create topic with name: {name} in stream ID: {stream_id}") })?; self.metrics.increment_topics(1); @@ -113,13 +140,12 @@ impl System { self.get_stream(stream_id) .with_error_context(|_| { - format!("{COMPONENT} - failed to get stream with id: {stream_id}") + format!("{COMPONENT} - failed to get stream with ID: {stream_id}") })? .get_topic(&created_topic_id.try_into()?) .with_error_context(|_| { format!( - "{COMPONENT} - failed to get topic with id: {}", - created_topic_id + "{COMPONENT} - failed to get created topic with ID: {created_topic_id} in stream with ID: {stream_id}", ) }) } @@ -141,7 +167,7 @@ impl System { let topic = self .find_topic(session, stream_id, topic_id) .with_error_context(|_| { - format!("{COMPONENT} - failed to find topic with id: {topic_id}") + format!("{COMPONENT} - failed to find topic with ID: {topic_id}") })?; self.permissioner.update_topic( session.get_user_id(), @@ -169,8 +195,7 @@ impl System { .await .with_error_context(|_| { format!( - "{COMPONENT} - failed to update topic, stream ID: {}, topic ID: {}", - stream_id, topic_id + "{COMPONENT} - failed to update topic with ID: {topic_id} in stream with ID: {stream_id}", ) })?; @@ -179,11 +204,11 @@ impl System { // TODO: if replication_factor is changed, we need to do `something` self.get_stream(stream_id) .with_error_context(|_| { - format!("{COMPONENT} - failed to get stream with id: {stream_id}") + format!("{COMPONENT} - failed to get stream with ID: {stream_id}") })? .get_topic(topic_id) .with_error_context(|_| { - format!("{COMPONENT} - failed to get topic with id: {topic_id}") + format!("{COMPONENT} - failed to get topic with ID: {topic_id} in stream with ID: {stream_id}") }) } @@ -199,7 +224,7 @@ impl System { let topic = self .find_topic(session, stream_id, topic_id) .with_error_context(|_| { - format!("{COMPONENT} - failed to find topic with id: {topic_id}") + format!("{COMPONENT} - failed to find topic with ID: {topic_id} in stream with ID: {stream_id}") })?; self.permissioner.delete_topic( session.get_user_id(), @@ -207,10 +232,8 @@ impl System { topic.topic_id, ).with_error_context(|_| { format!( - "{COMPONENT} - permission denied to delete topic for user with id: {}, stream ID: {}, topic ID: {}", + "{COMPONENT} - permission denied to delete topic with ID: {topic_id} in stream with ID: {stream_id} for user with ID: {}", session.get_user_id(), - topic.stream_id, - topic.topic_id, ) })?; stream_id_value = topic.stream_id; @@ -220,7 +243,7 @@ impl System { .get_stream_mut(stream_id)? .delete_topic(topic_id) .await - .with_error_context(|_| format!("{COMPONENT} - failed to delete topic, stream ID: {stream_id}, topic ID: {topic_id}"))?; + .with_error_context(|_| format!("{COMPONENT} - failed to delete topic with ID: {topic_id} in stream with ID: {stream_id}"))?; self.metrics.decrement_topics(1); self.metrics @@ -244,20 +267,18 @@ impl System { let topic = self .find_topic(session, stream_id, topic_id) .with_error_context(|_| { - format!("{COMPONENT} - failed to find topic with id: {topic_id}") + format!("{COMPONENT} - failed to find topic with ID: {topic_id} in stream with ID: {stream_id}") })?; self.permissioner .purge_topic(session.get_user_id(), topic.stream_id, topic.topic_id) .with_error_context(|_| { format!( - "{COMPONENT} - permission denied to purge topic for user with id: {}, stream ID: {}, topic ID: {}", + "{COMPONENT} - permission denied to purge topic with ID: {topic_id} in stream with ID: {stream_id} for user with ID: {}", session.get_user_id(), - topic.stream_id, - topic.topic_id, ) })?; topic.purge().await.with_error_context(|_| { - format!("{COMPONENT} - failed to purge topic with id: {topic_id}") + format!("{COMPONENT} - failed to purge topic with ID: {topic_id} in stream with ID: {stream_id}") }) } } diff --git a/server/src/streaming/systems/users.rs b/server/src/streaming/systems/users.rs index e2541348d..4b9537f26 100644 --- a/server/src/streaming/systems/users.rs +++ b/server/src/streaming/systems/users.rs @@ -118,38 +118,43 @@ impl System { User::root(&username, &password) } - pub fn find_user(&self, session: &Session, user_id: &Identifier) -> Result<&User, IggyError> { + pub fn find_user( + &self, + session: &Session, + user_id: &Identifier, + ) -> Result, IggyError> { self.ensure_authenticated(session)?; - let user = self.get_user(user_id); - if let Ok(user) = user { - let session_user_id = session.get_user_id(); - if user.id != session_user_id { - self.permissioner.get_user(session_user_id).with_error_context(|_| { - format!( - "{COMPONENT} - permission denied to get user for user with id: {session_user_id}" - ) - })?; - } + let Some(user) = self.try_get_user(user_id)? else { + return Ok(None); + }; - return Ok(user); + let session_user_id = session.get_user_id(); + if user.id != session_user_id { + self.permissioner.get_user(session_user_id).with_error_context(|_| { + format!( + "{COMPONENT} - permission denied to get user with ID: {user_id} for current user with ID: {session_user_id}" + ) + })?; } - user + Ok(Some(user)) } pub fn get_user(&self, user_id: &Identifier) -> Result<&User, IggyError> { + self.try_get_user(user_id)? + .ok_or(IggyError::ResourceNotFound(user_id.to_string())) + } + + pub fn try_get_user(&self, user_id: &Identifier) -> Result, IggyError> { match user_id.kind { - IdKind::Numeric => self - .users - .get(&user_id.get_u32_value()?) - .ok_or(IggyError::ResourceNotFound(user_id.to_string())), + IdKind::Numeric => Ok(self.users.get(&user_id.get_u32_value()?)), IdKind::String => { let username = user_id.get_cow_str_value()?; - self.users + Ok(self + .users .iter() .find(|(_, user)| user.username == username) - .map(|(_, user)| user) - .ok_or(IggyError::ResourceNotFound(user_id.to_string())) + .map(|(_, user)| user)) } } } @@ -266,7 +271,7 @@ impl System { .await .with_error_context(|_| { format!( - "{COMPONENT} - failed to delete clients for user with id: {existing_user_id}" + "{COMPONENT} - failed to delete clients for user with ID: {existing_user_id}" ) })?; info!("Deleted user: {existing_username} with ID: {user_id}."); diff --git a/server/src/streaming/topics/consumer_groups.rs b/server/src/streaming/topics/consumer_groups.rs index 7169b3ed2..275f0d1c8 100644 --- a/server/src/streaming/topics/consumer_groups.rs +++ b/server/src/streaming/topics/consumer_groups.rs @@ -40,6 +40,24 @@ impl Topic { } } + pub fn try_get_consumer_group( + &self, + identifier: &Identifier, + ) -> Result>, IggyError> { + match identifier.kind { + IdKind::Numeric => Ok(self.consumer_groups.get(&identifier.get_u32_value()?)), + IdKind::String => { + Ok(self.try_get_consumer_group_by_name(&identifier.get_cow_str_value()?)) + } + } + } + + fn try_get_consumer_group_by_name(&self, name: &str) -> Option<&RwLock> { + self.consumer_groups_ids + .get(name) + .and_then(|id| self.consumer_groups.get(id)) + } + pub fn get_consumer_group_by_name( &self, name: &str, diff --git a/server/src/streaming/topics/consumer_offsets.rs b/server/src/streaming/topics/consumer_offsets.rs index c5f9d9e1b..292828148 100644 --- a/server/src/streaming/topics/consumer_offsets.rs +++ b/server/src/streaming/topics/consumer_offsets.rs @@ -54,19 +54,21 @@ impl Topic { let Some((polling_consumer, partition_id)) = self .resolve_consumer_with_partition_id(consumer, client_id, partition_id, false) .await - .with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer with partition id, consumer: {consumer}, client ID: {client_id}, partition ID: {:?}", partition_id))? else { + .with_error_context(|_| format!("{COMPONENT} - failed to resolve consumer offset for consumer: {consumer}, client ID: {client_id}, partition ID: {:#?}", partition_id))? else { return Ok(None); }; let partition = self.get_partition(partition_id).with_error_context(|_| { - format!("{COMPONENT} - failed to get partition with id: {partition_id}") + format!("{COMPONENT} - failed to get partition with ID: {partition_id}") })?; let partition = partition.read().await; let offset = partition .get_consumer_offset(polling_consumer) .await .with_error_context(|_| { - format!("{COMPONENT} - failed to get consumer offset, consumer: {polling_consumer}") + format!( + "{COMPONENT} - failed to get consumer offset for consumer: {polling_consumer}" + ) })?; let Some(offset) = offset else { return Ok(None); @@ -101,7 +103,8 @@ impl Topic { .await .with_error_context(|_| { format!( - "{COMPONENT} - failed to delete consumer offset, consumer: {polling_consumer}" + "{COMPONENT} - failed to delete consumer offset for consumer: {polling_consumer}, in topic with ID: {}, partition ID: {partition_id}", + self.topic_id ) }) } diff --git a/server/src/streaming/topics/partitions.rs b/server/src/streaming/topics/partitions.rs index 6eab79820..bca77baf9 100644 --- a/server/src/streaming/topics/partitions.rs +++ b/server/src/streaming/topics/partitions.rs @@ -94,8 +94,8 @@ impl Topic { messages_count += partition_messages_count; partition.delete().await.with_error_context(|_| { format!( - "{COMPONENT} - failed to delete partition with id: {}", - partition.partition_id + "{COMPONENT} - failed to delete partition with ID: {partition_id} in topic with ID: {}", + self.topic_id ) })?; } diff --git a/server/src/streaming/topics/persistence.rs b/server/src/streaming/topics/persistence.rs index e2d91ae3e..95deaf6c2 100644 --- a/server/src/streaming/topics/persistence.rs +++ b/server/src/streaming/topics/persistence.rs @@ -21,8 +21,8 @@ impl Topic { let mut partition = partition.write().await; partition.delete().await.with_error_context(|_| { format!( - "{COMPONENT} - failed to delete partition with id: {}", - partition.partition_id + "{COMPONENT} - failed to delete partition with ID: {} in topic with ID: {}", + partition.partition_id, self.topic_id ) })?; } diff --git a/tools/Cargo.toml b/tools/Cargo.toml index 9efb7a1b7..1e193bc9b 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -9,7 +9,7 @@ name = "data-seeder-tool" path = "src/data-seeder/main.rs" [dependencies] -anyhow = "1.0.95" +anyhow = "1.0.96" clap = { version = "4.5.30", features = ["derive"] } iggy = { path = "../sdk" } rand = "0.9.0"