-
Notifications
You must be signed in to change notification settings - Fork 119
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
[ISSUE #2336]🤡Implement PopLongPollingService notifyMessageArriving🧑💻 #2337
Conversation
WalkthroughThis pull request introduces enhancements to the RocketMQ broker's long polling service and related components. The changes include adding a new dependency Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2337 +/- ##
==========================================
- Coverage 28.25% 28.22% -0.03%
==========================================
Files 504 504
Lines 72500 72577 +77
==========================================
+ Hits 20485 20486 +1
- Misses 52015 52091 +76 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🔭 Outside diff range comments (1)
rocketmq-broker/src/long_polling/pop_request.rs (1)
Line range hint
34-55
: Correct thecomplete
method implementationThe
complete
method usesunwrap_or_default()
on the result ofcompare_exchange
, which may not accurately reflect whether the atomic exchange was successful. Usingis_ok()
will return a boolean indicating if the state was successfully changed fromfalse
totrue
.Apply this diff to fix the method:
pub fn complete(&self) -> bool { self.complete .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .unwrap_or_default() + .is_ok() }
🧹 Nitpick comments (3)
rocketmq-common/src/common/broker/broker_config.rs (2)
203-203
: Add documentation forpop_polling_map_size
The new field
pop_polling_map_size
has been added toBrokerConfig
. To improve maintainability and clarity, consider adding documentation comments to explain its purpose, default value, and how it affects the system.
309-309
: Consider makingpop_polling_map_size
configurableCurrently,
pop_polling_map_size
is set to100000
. If this value might need to be adjusted based on deployment size or performance considerations, consider exposing it as a configurable parameter through a configuration file or command-line argument.rocketmq-broker/src/processor/pop_message_processor.rs (1)
104-107
: Document the purpose of the boolean parameter.The
false
parameter's purpose is not immediately clear. Consider adding a named parameter or documentation to clarify its intent.Apply this diff to improve clarity:
- pop_long_polling_service: ArcMut::new(PopLongPollingService::new( - broker_runtime_inner.clone(), - false, - )), + pop_long_polling_service: ArcMut::new(PopLongPollingService::new( + broker_runtime_inner.clone(), + is_order_enabled: false, // Document why this is false by default + )),
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock
is excluded by!**/*.lock
📒 Files selected for processing (5)
rocketmq-broker/Cargo.toml
(1 hunks)rocketmq-broker/src/long_polling/long_polling_service/pop_long_polling_service.rs
(3 hunks)rocketmq-broker/src/long_polling/pop_request.rs
(2 hunks)rocketmq-broker/src/processor/pop_message_processor.rs
(2 hunks)rocketmq-common/src/common/broker/broker_config.rs
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: test
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: build
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: auto-approve
🔇 Additional comments (3)
rocketmq-broker/src/long_polling/long_polling_service/pop_long_polling_service.rs (1)
88-113
: Handle unmatched messages appropriately innotify_message_arriving
When
match_result
isfalse
, the code re-inserts thepop_request
intoremoting_commands
. However, sincepop_request
was removed usingpop_front
orpop_back
, re-inserting it might cause inconsistencies or race conditions. Ensure that the concurrency control is properly managed when modifying theremoting_commands
set.rocketmq-broker/src/long_polling/pop_request.rs (1)
30-30
: Verify that derivingClone
forPopRequest
is safeBy adding
#[derive(Clone)]
, all fields withinPopRequest
are now cloned whenclone()
is called. Ensure that this behavior is safe and that deep copies of fields likeArc<AtomicBool>
andArc<Box<dyn MessageFilter>>
do not lead to unintended side effects or performance issues.rocketmq-broker/src/processor/pop_message_processor.rs (1)
85-85
: LGTM! Generic type parameter addition improves type safety.The change to make
PopLongPollingService
generic overMS
aligns with the generic context of the containing struct.
pub fn wake_up(&self, pop_request: PopRequest) -> bool { | ||
unimplemented!("PopLongPollingService::wake_up") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implement the wake_up
method
The wake_up
method is currently unimplemented, but it is essential for notifying pop requests when messages arrive. Please provide an implementation to ensure proper functionality of the long polling service.
loop { | ||
if self.notify_last { | ||
pop_request = remoting_commands | ||
.pop_back() | ||
.map(|entry| entry.value().clone()); | ||
} else { | ||
pop_request = remoting_commands | ||
.pop_front() | ||
.map(|entry| entry.value().clone()); | ||
} | ||
|
||
if let Some(ref request) = pop_request { | ||
self.total_polling_num.fetch_sub(1, Ordering::AcqRel); | ||
} else { | ||
break; | ||
} | ||
} | ||
pop_request | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential infinite loop in poll_remoting_commands
method
The loop in the poll_remoting_commands
method may not exit as expected. If a pop_request
is found (Some
), the loop continues without a break condition, which could lead to an infinite loop and drain all entries from remoting_commands
. Consider adding a break statement after adjusting total_polling_num
to exit the loop appropriately.
Apply this diff to fix the loop:
loop {
if self.notify_last {
pop_request = remoting_commands
.pop_back()
.map(|entry| entry.value().clone());
} else {
pop_request = remoting_commands
.pop_front()
.map(|entry| entry.value().clone());
}
if let Some(ref request) = pop_request {
self.total_polling_num.fetch_sub(1, Ordering::AcqRel);
+ break; // Exit the loop after processing a pop_request
} else {
break;
}
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
loop { | |
if self.notify_last { | |
pop_request = remoting_commands | |
.pop_back() | |
.map(|entry| entry.value().clone()); | |
} else { | |
pop_request = remoting_commands | |
.pop_front() | |
.map(|entry| entry.value().clone()); | |
} | |
if let Some(ref request) = pop_request { | |
self.total_polling_num.fetch_sub(1, Ordering::AcqRel); | |
} else { | |
break; | |
} | |
} | |
pop_request | |
} | |
loop { | |
if self.notify_last { | |
pop_request = remoting_commands | |
.pop_back() | |
.map(|entry| entry.value().clone()); | |
} else { | |
pop_request = remoting_commands | |
.pop_front() | |
.map(|entry| entry.value().clone()); | |
} | |
if let Some(ref request) = pop_request { | |
self.total_polling_num.fetch_sub(1, Ordering::AcqRel); | |
break; // Exit the loop after processing a pop_request | |
} else { | |
break; | |
} | |
} | |
pop_request |
|
||
crossbeam-skiplist = "0.1" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Update crossbeam-skiplist to version 0.1.3
The current version "0.1" should be updated to "0.1.3" to get the latest fixes. The package is actively maintained (last updated January 2024) and has no known security vulnerabilities.
🔗 Analysis chain
Verify the version of crossbeam-skiplist.
The version "0.1" seems outdated. Let's verify if a newer version is available and check for any security advisories.
Run this script to check available versions and security advisories:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for latest versions and security advisories of crossbeam-skiplist
# Check crates.io for latest versions
curl -s https://crates.io/api/v1/crates/crossbeam-skiplist | jq '.crate.max_version'
# Check for security advisories
gh api graphql -f query='
{
securityVulnerabilities(first: 5, ecosystem: RUST, package: "crossbeam-skiplist") {
nodes {
advisory {
summary
severity
publishedAt
}
vulnerableVersionRange
firstPatchedVersion {
identifier
}
}
}
}'
Length of output: 445
Script:
#!/bin/bash
# Check the last update time and repository info for crossbeam-skiplist
curl -s https://crates.io/api/v1/crates/crossbeam-skiplist | jq '{updated_at: .crate.updated_at, repository: .crate.repository}'
Length of output: 246
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which Issue(s) This PR Fixes(Closes)
Fixes #2336
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
Release Notes
New Features
Improvements
crossbeam-skiplist
dependencyPopRequest
to support better shared state managementTechnical Updates