Skip to content

Commit

Permalink
[ISSUE #1495]🚀Optimize RemotingSerializable methods return type🔥 (#1517)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Dec 3, 2024
1 parent 4e7d5d9 commit 8e81525
Show file tree
Hide file tree
Showing 21 changed files with 162 additions and 69 deletions.
8 changes: 6 additions & 2 deletions rocketmq-broker/src/offset/manager/consumer_offset_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,13 @@ impl ConfigManager for ConsumerOffsetManager {

fn encode_pretty(&self, pretty_format: bool) -> String {
if pretty_format {
self.consumer_offset_wrapper.to_json_pretty()
self.consumer_offset_wrapper
.to_json_pretty()
.expect("encode pretty failed")
} else {
self.consumer_offset_wrapper.to_json()
self.consumer_offset_wrapper
.to_json()
.expect("encode failed")
}
}

Expand Down
7 changes: 5 additions & 2 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ impl BrokerOuterAPI {
queue_datas: vec![queue_data],
..Default::default()
};
let topic_route_body = topic_route_data.encode();
let topic_route_body = topic_route_data
.encode()
.expect("encode topic route data failed");

RemotingCommand::create_request_command(RequestCode::RegisterTopicInNamesrv, request_header)
.set_body(topic_route_body)
Expand Down Expand Up @@ -362,7 +364,8 @@ impl BrokerOuterAPI {
.remark()
.cloned()
.unwrap_or(CheetahString::empty())
.to_json(),
.to_json()
.expect("to json failed"),
"".to_string(),
))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ impl BatchMqHandler {
addr_map.remove(&self.inner.broker_config.broker_identity.broker_id);

let count_down_latch = CountDownLatch::new(addr_map.len() as u32);
let request_body = Bytes::from(request_body.encode());
let request_body =
Bytes::from(request_body.encode().expect("lockBatchMQ encode error"));
let mq_lock_map_arc = Arc::new(Mutex::new(mq_lock_map.clone()));
for (_, broker_addr) in addr_map {
let count_down_latch = count_down_latch.clone();
Expand Down Expand Up @@ -115,7 +116,10 @@ impl BatchMqHandler {
let response_body = LockBatchResponseBody {
lock_ok_mq_set: lock_ok_mqset,
};
Some(RemotingCommand::create_response_command().set_body(response_body.encode()))
Some(
RemotingCommand::create_response_command()
.set_body(response_body.encode().expect("lockBatchMQ encode error")),
)
}

pub async fn unlock_batch_mq(
Expand All @@ -134,7 +138,8 @@ impl BatchMqHandler {
);
} else {
request_body.only_this_broker = true;
let request_body = Bytes::from(request_body.encode());
let request_body =
Bytes::from(request_body.encode().expect("unlockBatchMQ encode error"));
for broker_addr in self.inner.broker_member_group.broker_addrs.values() {
match self
.inner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ impl ConsumerRequestHandler {
connection.set_client_addr(channel.remote_address().to_string().into());
body_data.get_connection_set().insert(connection);
}
let body = body_data.encode();
let body = body_data
.encode()
.expect("consumer connection list encode failed");
response.set_body_mut_ref(body);
Some(response)
}
Expand Down Expand Up @@ -213,7 +215,7 @@ impl ConsumerRequestHandler {
let new_consume_tps = consume_stats.get_consume_tps() + consume_tps;
consume_stats.set_consume_tps(new_consume_tps);
}
let body = consume_stats.encode();
let body = consume_stats.encode().expect("consume stats encode failed");
response.set_body_mut_ref(body);
Some(response)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,9 @@ impl TopicRequestHandler {
},
..Default::default()
};
let content = topic_config_and_mapping_serialize_wrapper.to_json();
let content = topic_config_and_mapping_serialize_wrapper
.to_json()
.expect("encode failed");
if !content.is_empty() {
response.set_body_mut_ref(content);
}
Expand All @@ -421,7 +423,7 @@ impl TopicRequestHandler {
topic_list: topics.into_iter().map(|s| s.into()).collect(),
broker_addr: None,
};
response.set_body_mut_ref(topic_list.encode());
response.set_body_mut_ref(topic_list.encode().expect("encode TopicList failed"));
Some(response)
}

Expand Down Expand Up @@ -482,7 +484,11 @@ impl TopicRequestHandler {
map.insert(message_queue, topic_offset);
}
topic_stats_table.set_offset_table(map);
response.set_body_mut_ref(topic_stats_table.encode());
response.set_body_mut_ref(
topic_stats_table
.encode()
.expect("encode TopicStatsTable failed"),
);
Some(response)
}

Expand Down Expand Up @@ -522,7 +528,11 @@ impl TopicRequestHandler {
}
let topic_config_and_queue_mapping =
TopicConfigAndQueueMapping::new(topic_config.unwrap(), topic_queue_mapping_detail);
response.set_body_mut_ref(topic_config_and_queue_mapping.encode());
response.set_body_mut_ref(
topic_config_and_queue_mapping
.encode()
.expect("encode TopicConfigAndQueueMapping failed"),
);
Some(response)
}

Expand All @@ -545,7 +555,7 @@ impl TopicRequestHandler {
.which_group_by_topic(topic);
groups.extend(group_in_offset.clone());
let group_list = GroupList { group_list: groups };
response.set_body_mut_ref(group_list.encode());
response.set_body_mut_ref(group_list.encode().expect("encode GroupList failed"));
Some(response)
}

Expand All @@ -572,7 +582,7 @@ impl TopicRequestHandler {
topic_list: topics.into_iter().collect(),
broker_addr: Some(broker_addr.into()),
};
response.set_body_mut_ref(topic_list.encode());
response.set_body_mut_ref(topic_list.encode().expect("encode TopicList failed"));
Some(response)
}

Expand Down
5 changes: 4 additions & 1 deletion rocketmq-broker/src/processor/consumer_manage_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ where
};
return Some(
response
.set_body(body.encode())
.set_body(
body.encode()
.expect("GetConsumerListByGroupResponseBody encode error"),
)
.set_code(ResponseCode::Success),
);
} else {
Expand Down
7 changes: 6 additions & 1 deletion rocketmq-broker/src/processor/query_assignment_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,12 @@ impl QueryAssignmentProcessor {
let body = QueryAssignmentResponseBody {
message_queue_assignments: assignments,
};
Some(RemotingCommand::create_response_command().set_body(body.encode()))
Some(
RemotingCommand::create_response_command().set_body(
body.encode()
.expect("encode QueryAssignmentResponseBody failed"),
),
)
}

async fn do_load_balance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,14 @@ impl<MS> ConfigManager for SubscriptionGroupManager<MS> {
.subscription_group_wrapper
.lock()
.clone()
.to_json_pretty(),
false => self.subscription_group_wrapper.lock().clone().to_json(),
.to_json_pretty()
.expect("encode subscription group pretty failed"),
false => self
.subscription_group_wrapper
.lock()
.clone()
.to_json()
.expect("encode subscription group failed"),
}
}

Expand Down
9 changes: 5 additions & 4 deletions rocketmq-broker/src/topic/manager/topic_config_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,10 +566,11 @@ impl ConfigManager for TopicConfigManager {
let version = self.data_version().as_ref().clone();
match pretty_format {
true => TopicConfigSerializeWrapper::new(Some(topic_config_table), Some(version))
.to_json_pretty(),
false => {
TopicConfigSerializeWrapper::new(Some(topic_config_table), Some(version)).to_json()
}
.to_json_pretty()
.expect("Encode TopicConfigSerializeWrapper to json failed"),
false => TopicConfigSerializeWrapper::new(Some(topic_config_table), Some(version))
.to_json()
.expect("Encode TopicConfigSerializeWrapper to json failed"),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,12 @@ impl ConfigManager for TopicQueueMappingManager {
Some(self.data_version.lock().clone()),
);
match pretty_format {
true => wrapper.to_json_pretty(),
false => wrapper.to_json(),
true => wrapper
.to_json_pretty()
.expect("encode topic queue mapping pretty failed"),
false => wrapper
.to_json()
.expect("encode topic queue mapping failed"),
}
}

Expand Down
8 changes: 6 additions & 2 deletions rocketmq-client/src/consumer/store/local_file_offset_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ impl OffsetStoreTrait for LocalFileOffsetStore {
}
}

let content = OffsetSerialize::from(offset_serialize_wrapper).to_json_pretty();
let content = OffsetSerialize::from(offset_serialize_wrapper)
.to_json_pretty()
.expect("persistAll failed");
if !content.is_empty() {
if let Err(e) = file_utils::string_to_file(&content, &self.store_path) {
error!(
Expand All @@ -219,7 +221,9 @@ impl OffsetStoreTrait for LocalFileOffsetStore {
offset_serialize_wrapper
.offset_table
.insert(mq.clone(), AtomicI64::new(offset.get_offset()));
let content = OffsetSerialize::from(offset_serialize_wrapper).to_json_pretty();
let content = OffsetSerialize::from(offset_serialize_wrapper)
.to_json_pretty()
.expect("persist failed");
if !content.is_empty() {
if let Err(e) = file_utils::string_to_file(&content, &self.store_path) {
error!(
Expand Down
5 changes: 4 additions & 1 deletion rocketmq-client/src/consumer/store/offset_serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ impl From<OffsetSerializeWrapper> for OffsetSerialize {
fn from(wrapper: OffsetSerializeWrapper) -> Self {
let mut offset_table = HashMap::new();
for (k, v) in wrapper.offset_table {
offset_table.insert(k.to_json(), v.load(Ordering::Relaxed));
let result = k
.to_json()
.expect("OffsetSerialize::from OffsetSerializeWrapper");
offset_table.insert(result, v.load(Ordering::Relaxed));
}
OffsetSerialize { offset_table }
}
Expand Down
33 changes: 26 additions & 7 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,11 @@ impl MQClientAPIImpl {
HeartbeatRequestHeader::default(),
)
.set_language(self.client_config.language)
.set_body(heartbeat_data.encode());
.set_body(
heartbeat_data
.encode()
.expect("encode HeartbeatData failed"),
);
let response = self
.remoting_client
.invoke_async(Some(addr), request, timeout_millis)
Expand Down Expand Up @@ -705,7 +709,7 @@ impl MQClientAPIImpl {
consumer_group.to_string(),
subscription_data.clone(),
);
request.set_body_mut_ref(body.encode());
request.set_body_mut_ref(body.encode().expect("encode CheckClientRequestBody failed"));
let response = self
.remoting_client
.invoke_async(
Expand Down Expand Up @@ -1084,7 +1088,11 @@ impl MQClientAPIImpl {
RequestCode::UnlockBatchMq,
UnlockBatchMqRequestHeader::default(),
);
request.set_body_mut_ref(request_body.encode());
request.set_body_mut_ref(
request_body
.encode()
.expect("encode UnlockBatchRequestBody failed"),
);
if oneway {
self.remoting_client
.invoke_oneway(addr, request, timeout_millis)
Expand Down Expand Up @@ -1124,7 +1132,11 @@ impl MQClientAPIImpl {
RequestCode::LockBatchMq,
LockBatchMqRequestHeader::default(),
);
request.set_body_mut_ref(request_body.encode());
request.set_body_mut_ref(
request_body
.encode()
.expect("encode LockBatchRequestBody failed"),
);
let response = self
.remoting_client
.invoke_async(
Expand Down Expand Up @@ -1242,7 +1254,10 @@ impl MQClientAPIImpl {
pop_share_queue_num,
};
let request = RemotingCommand::create_remoting_command(RequestCode::SetMessageRequestMode)
.set_body(body.encode());
.set_body(
body.encode()
.expect("encode SetMessageRequestModeRequestBody failed"),
);
let response = self
.remoting_client
.invoke_async(
Expand Down Expand Up @@ -1280,8 +1295,12 @@ impl MQClientAPIImpl {
strategy_name,
message_model,
};
let request =
RemotingCommand::new_request(RequestCode::QueryAssignment, request_body.encode());
let request = RemotingCommand::new_request(
RequestCode::QueryAssignment,
request_body
.encode()
.expect("encode QueryAssignmentRequestBody failed"),
);
let response = self
.remoting_client
.invoke_async(
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl KVConfigManager {
let table = KVTable {
table: kv_table.clone(),
};
table.encode()
table.encode().expect("encode failed")
})
}

Expand Down
4 changes: 3 additions & 1 deletion rocketmq-namesrv/src/processor/client_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ impl ClientRequestProcessor {
} else {
topic_route_data.encode()
};*/
let content = topic_route_data.encode();
let content = topic_route_data
.encode()
.expect("encode TopicRouteData failed");
RemotingCommand::create_response_command_with_code(RemotingSysResponseCode::Success)
.set_body(content)
}
Expand Down
Loading

0 comments on commit 8e81525

Please # to comment.