Skip to content

Commit

Permalink
[ISSUE #67]:rocket:Namesrv support get broker member group(request co…
Browse files Browse the repository at this point in the history
…de 901) (#72)
  • Loading branch information
mxsm authored Jan 21, 2024
1 parent 08dd4b2 commit e6e2792
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 7 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Feature list:
| Register Broker | 103 | :heart: | |
| Unregister Broker | 104 | :broken_heart: :x: | |
| Broker Heartbeat | 904 | :sparkling_heart: :white_check_mark: | |
| Get broker member_group | 901 | :broken_heart: :x: | |
| Get broker member_group | 901 | :sparkling_heart: :white_check_mark: | |
| Get broker cluster info | 106 | :broken_heart: :x: | |
| Wipe write perm of boker | 205 | :broken_heart: :x: | |
| Add write perm of brober | 327 | :broken_heart: :x: | |
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-namesrv/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Feature list:
| Register Broker | 103 | :heart: :white_check_mark: | |
| Unregister Broker | 104 | :broken_heart: :x: | |
| Broker Heartbeat | 904 | :sparkling_heart: :white_check_mark: | |
| Get broker member_group | 901 | :broken_heart: :x: | |
| Get broker member_group | 901 | :sparkling_heart: :white_check_mark: | |
| Get broker cluster info | 106 | :broken_heart: :x: | |
| Wipe write perm of boker | 205 | :broken_heart: :x: | |
| Add write perm of brober | 327 | :broken_heart: :x: | |
Expand Down
26 changes: 24 additions & 2 deletions rocketmq-namesrv/src/processor/default_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,17 @@ use rocketmq_remoting::{
code::{request_code::RequestCode, response_code::RemotingSysResponseCode},
protocol::{
body::{
broker_body::register_broker_body::RegisterBrokerBody,
broker_body::{
broker_member_group::GetBrokerMemberGroupResponseBody,
register_broker_body::RegisterBrokerBody,
},
topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper,
},
header::namesrv::{
broker_request::{BrokerHeartbeatRequestHeader, UnRegisterBrokerRequestHeader},
broker_request::{
BrokerHeartbeatRequestHeader, GetBrokerMemberGroupRequestHeader,
UnRegisterBrokerRequestHeader,
},
kv_config_header::{
DeleteKVConfigRequestHeader, GetKVConfigRequestHeader, GetKVConfigResponseHeader,
PutKVConfigRequestHeader,
Expand Down Expand Up @@ -68,6 +74,7 @@ impl RequestProcessor for DefaultRequestProcessor {
Some(RequestCode::RegisterBroker) => self.process_register_broker(request),
Some(RequestCode::UnregisterBroker) => self.process_unregister_broker(request),
Some(RequestCode::BrokerHeartbeat) => self.process_broker_heartbeat(request),
Some(RequestCode::GetBrokerMemberGroup) => self.get_broker_member_group(request),
//handle get broker cluster info
Some(RequestCode::GetBrokerClusterInfo) => {
self.process_get_broker_cluster_info(request)
Expand Down Expand Up @@ -286,6 +293,21 @@ impl DefaultRequestProcessor {
);
RemotingCommand::create_response_command()
}

fn get_broker_member_group(&mut self, request: RemotingCommand) -> RemotingCommand {
let request_header = request
.decode_command_custom_header::<GetBrokerMemberGroupRequestHeader>()
.unwrap();

let broker_member_group = self.route_info_manager.write().get_broker_member_group(
request_header.cluster_name.as_str(),
request_header.broker_name.as_str(),
);
let response_body = GetBrokerMemberGroupResponseBody {
broker_member_group,
};
RemotingCommand::create_response_command().set_body(Some(response_body.encode()))
}
}

impl DefaultRequestProcessor {
Expand Down
18 changes: 17 additions & 1 deletion rocketmq-namesrv/src/route/route_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use rocketmq_remoting::{
code::request_code::RequestCode,
protocol::{
body::{
broker_body::cluster_info::ClusterInfo,
broker_body::{broker_member_group::BrokerMemberGroup, cluster_info::ClusterInfo},
topic_info_wrapper::topic_config_wrapper::TopicConfigAndMappingSerializeWrapper,
},
header::namesrv::brokerid_change_request_header::NotifyMinBrokerIdChangeRequestHeader,
Expand Down Expand Up @@ -627,4 +627,20 @@ impl RouteInfoManager {
value.last_update_timestamp = TimeUtils::get_current_millis() as i64;
}
}

pub(crate) fn get_broker_member_group(
&mut self,
cluster_name: &str,
broker_name: &str,
) -> Option<BrokerMemberGroup> {
if let Some(broker_data) = self.broker_addr_table.get(broker_name) {
let map = broker_data.broker_addrs().clone();
return Some(BrokerMemberGroup::new(
Some(cluster_name.to_string()),
Some(broker_name.to_string()),
Some(map),
));
}
None
}
}
1 change: 1 addition & 0 deletions rocketmq-remoting/src/protocol/body/broker_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
* limitations under the License.
*/

pub mod broker_member_group;
pub mod cluster_info;
pub mod register_broker_body;
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use crate::protocol::RemotingSerializable;

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct BrokerMemberGroup {
pub cluster: Option<String>,
pub broker_name: Option<String>,
pub broker_addrs: Option<HashMap<i64, String>>,
}

impl BrokerMemberGroup {
pub fn new(
cluster: Option<String>,
broker_name: Option<String>,
broker_addrs: Option<HashMap<i64, String>>,
) -> Self {
Self {
cluster,
broker_name,
broker_addrs,
}
}
}

impl RemotingSerializable for BrokerMemberGroup {
type Output = BrokerMemberGroup;
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct GetBrokerMemberGroupResponseBody {
pub broker_member_group: Option<BrokerMemberGroup>,
}

impl RemotingSerializable for GetBrokerMemberGroupResponseBody {
type Output = GetBrokerMemberGroupResponseBody;
}
42 changes: 42 additions & 0 deletions rocketmq-remoting/src/protocol/header/namesrv/broker_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,45 @@ impl FromMap for BrokerHeartbeatRequestHeader {
})
}
}

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct GetBrokerMemberGroupRequestHeader {
pub cluster_name: String,
pub broker_name: String,
}

impl GetBrokerMemberGroupRequestHeader {
const CLUSTER_NAME: &'static str = "clusterName";

const BROKER_NAME: &'static str = "brokerName";

pub fn new(cluster_name: impl Into<String>, broker_name: impl Into<String>) -> Self {
Self {
cluster_name: cluster_name.into(),

broker_name: broker_name.into(),
}
}
}

impl CommandCustomHeader for GetBrokerMemberGroupRequestHeader {
fn to_map(&self) -> Option<HashMap<String, String>> {
Some(HashMap::from([
(Self::CLUSTER_NAME.to_string(), self.cluster_name.clone()),
(Self::BROKER_NAME.to_string(), self.broker_name.clone()),
]))
}
}

impl FromMap for GetBrokerMemberGroupRequestHeader {
type Target = Self;

fn from(map: &HashMap<String, String>) -> Option<Self::Target> {
Some(GetBrokerMemberGroupRequestHeader {
cluster_name: map.get(Self::CLUSTER_NAME).cloned().unwrap_or_default(),

broker_name: map.get(Self::BROKER_NAME).cloned().unwrap_or_default(),
})
}
}
6 changes: 4 additions & 2 deletions rocketmq-remoting/src/protocol/remoting_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,10 @@ impl RemotingCommand {
self
}

pub fn set_body(mut self, body: Option<Bytes>) -> Self {
self.body = body;
pub fn set_body(mut self, body: Option<impl Into<Bytes>>) -> Self {
if let Some(value) = body {
self.body = Some(value.into());
}
self
}
pub fn set_suspended(mut self, suspended: bool) -> Self {
Expand Down

0 comments on commit e6e2792

Please # to comment.