Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #2268]💫Optimize BrokerRuntime start method🧑‍💻 #2272

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 51 additions & 10 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,36 +976,47 @@ impl BrokerRuntime {
self.register_broker_all(true, false, true).await;
}

let cloned_broker_runtime = self.inner.clone();
let should_start_time = self.inner.should_start_time.clone();
let is_isolated = self.inner.is_isolated.clone();
let broker_config = self.inner.broker_config.clone();
//start register broker to name server scheduled task
let broker_runtime_inner = self.inner.clone();
self.broker_runtime
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {
let period = Duration::from_millis(
10000.max(60000.min(broker_config.register_name_server_period)),
10000.max(
60000.min(
broker_runtime_inner
.broker_config
.register_name_server_period,
),
),
);
let initial_delay = Duration::from_secs(10);
tokio::time::sleep(initial_delay).await;
loop {
let start_time = should_start_time.load(Ordering::Relaxed);
let start_time = broker_runtime_inner
.should_start_time
.load(Ordering::Relaxed);
if get_current_millis() < start_time {
info!("Register to namesrv after {}", start_time);
continue;
}
if is_isolated.load(Ordering::Relaxed) {
if broker_runtime_inner.is_isolated.load(Ordering::Relaxed) {
info!("Skip register for broker is isolated");
continue;
}
// record current execution time
let current_execution_time = tokio::time::Instant::now();
// execute task
let this = cloned_broker_runtime.clone();
cloned_broker_runtime
.register_broker_all_inner(this, true, false, broker_config.force_register)
let this = broker_runtime_inner.clone();
broker_runtime_inner
.register_broker_all_inner(
this,
true,
false,
broker_runtime_inner.broker_config.force_register,
)
.await;
// Calculate the time of the next execution
let next_execution_time = current_execution_time + period;
Expand All @@ -1019,6 +1030,33 @@ impl BrokerRuntime {

if self.inner.broker_config.enable_slave_acting_master {
self.schedule_send_heartbeat();
let broker_runtime_inner = self.inner.clone();
self.broker_runtime
.as_ref()
.unwrap()
.get_handle()
.spawn(async move {
let period = Duration::from_secs(1);
let initial_delay = Duration::from_millis(
broker_runtime_inner
.broker_config
.sync_broker_member_group_period,
);
tokio::time::sleep(initial_delay).await;
loop {
// record current execution time
let current_execution_time = tokio::time::Instant::now();
// execute task
broker_runtime_inner.sync_broker_member_group();
// Calculate the time of the next execution
let next_execution_time = current_execution_time + period;

// Wait until the next execution
let delay = next_execution_time
.saturating_duration_since(tokio::time::Instant::now());
tokio::time::sleep(delay).await;
}
});
}

if self.inner.broker_config.enable_controller_mode {
Expand Down Expand Up @@ -2072,6 +2110,9 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
pub fn get_broker_addr(&self) -> &CheetahString {
&self.broker_addr
}
pub fn sync_broker_member_group(&self) {
warn!("sync_broker_member_group not implemented");
}
Comment on lines +2113 to +2115
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Implement the sync_broker_member_group method.

The method is currently a placeholder that only logs a warning. Implementation is needed to complete the broker member group synchronization feature.

Would you like me to help implement this method? I can provide a solution that:

  1. Retrieves the current broker member group
  2. Synchronizes it with other brokers
  3. Handles any synchronization failures

}

fn need_register(
Expand Down
2 changes: 2 additions & 0 deletions rocketmq-common/src/common/broker/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ pub struct BrokerConfig {
pub skip_when_ck_re_put_reach_max_times: bool,
pub compressed_register: bool,
pub broker_not_active_timeout_millis: i64,
pub sync_broker_member_group_period: u64,
}

impl Default for BrokerConfig {
Expand Down Expand Up @@ -303,6 +304,7 @@ impl Default for BrokerConfig {
skip_when_ck_re_put_reach_max_times: false,
compressed_register: false,
broker_not_active_timeout_millis: 10_000,
sync_broker_member_group_period: 1_000,
}
}
}
Expand Down
Loading