From 880b72653026955891e6951406a1175d34424d4d Mon Sep 17 00:00:00 2001 From: mxsm Date: Thu, 5 Dec 2024 16:24:47 +0800 Subject: [PATCH 1/2] =?UTF-8?q?[ISSUE=20#1566]=F0=9F=9A=80Add=20QueryConsu?= =?UTF-8?q?meTimeSpanRequestHeader=20struct=F0=9F=8D=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-remoting/src/protocol/header.rs | 1 + .../query_consume_time_span_request_header.rs | 79 +++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 rocketmq-remoting/src/protocol/header/query_consume_time_span_request_header.rs diff --git a/rocketmq-remoting/src/protocol/header.rs b/rocketmq-remoting/src/protocol/header.rs index fbc8bb1f..20013153 100644 --- a/rocketmq-remoting/src/protocol/header.rs +++ b/rocketmq-remoting/src/protocol/header.rs @@ -43,6 +43,7 @@ pub mod namesrv; pub mod notify_consumer_ids_changed_request_header; pub mod pull_message_request_header; pub mod pull_message_response_header; +pub mod query_consume_time_span_request_header; pub mod query_consumer_offset_request_header; pub mod query_consumer_offset_response_header; pub mod query_message_request_header; diff --git a/rocketmq-remoting/src/protocol/header/query_consume_time_span_request_header.rs b/rocketmq-remoting/src/protocol/header/query_consume_time_span_request_header.rs new file mode 100644 index 00000000..30001ac3 --- /dev/null +++ b/rocketmq-remoting/src/protocol/header/query_consume_time_span_request_header.rs @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use cheetah_string::CheetahString; +use rocketmq_macros::RequestHeaderCodec; +use serde::Deserialize; +use serde::Serialize; + +use crate::rpc::rpc_request_header::RpcRequestHeader; + +#[derive(Clone, Debug, Serialize, Deserialize, Default, RequestHeaderCodec)] +#[serde(rename_all = "camelCase")] +pub struct QueryConsumeTimeSpanRequestHeader { + #[required] + pub topic: CheetahString, + + #[required] + pub group: CheetahString, + + #[serde(flatten)] + pub rpc_request_header: Option, +} + +#[cfg(test)] +mod tests { + use cheetah_string::CheetahString; + + use super::*; + + #[test] + fn query_consume_time_span_request_header_serializes_correctly() { + let header = QueryConsumeTimeSpanRequestHeader { + topic: CheetahString::from_static_str("test_topic"), + group: CheetahString::from_static_str("test_group"), + rpc_request_header: None, + }; + let serialized = serde_json::to_string(&header).unwrap(); + let expected = r#"{"topic":"test_topic","group":"test_group"}"#; + assert_eq!(serialized, expected); + } + + #[test] + fn query_consume_time_span_request_header_deserializes_correctly() { + let data = r#"{"topic":"test_topic","group":"test_group"}"#; + let header: QueryConsumeTimeSpanRequestHeader = serde_json::from_str(data).unwrap(); + assert_eq!(header.topic, CheetahString::from_static_str("test_topic")); + assert_eq!(header.group, CheetahString::from_static_str("test_group")); + assert!(!header.rpc_request_header.is_none()); + } + + #[test] + fn query_consume_time_span_request_header_handles_missing_optional_fields() { + let data = r#"{"topic":"test_topic","group":"test_group"}"#; + let header: QueryConsumeTimeSpanRequestHeader = serde_json::from_str(data).unwrap(); + assert_eq!(header.topic, CheetahString::from_static_str("test_topic")); + assert_eq!(header.group, CheetahString::from_static_str("test_group")); + assert!(!header.rpc_request_header.is_none()); + } + + #[test] + fn query_consume_time_span_request_header_handles_invalid_data() { + let data = r#"{"topic":12345,"group":"test_group"}"#; + let result: Result = serde_json::from_str(data); + assert!(result.is_err()); + } +} From d92252b79aca79eab4445b36a6986ab8a16a3564 Mon Sep 17 00:00:00 2001 From: mxsm Date: Thu, 5 Dec 2024 16:29:55 +0800 Subject: [PATCH 2/2] fix code --- .../header/query_consume_time_span_request_header.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rocketmq-remoting/src/protocol/header/query_consume_time_span_request_header.rs b/rocketmq-remoting/src/protocol/header/query_consume_time_span_request_header.rs index 30001ac3..1cd2130e 100644 --- a/rocketmq-remoting/src/protocol/header/query_consume_time_span_request_header.rs +++ b/rocketmq-remoting/src/protocol/header/query_consume_time_span_request_header.rs @@ -19,7 +19,7 @@ use rocketmq_macros::RequestHeaderCodec; use serde::Deserialize; use serde::Serialize; -use crate::rpc::rpc_request_header::RpcRequestHeader; +use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader; #[derive(Clone, Debug, Serialize, Deserialize, Default, RequestHeaderCodec)] #[serde(rename_all = "camelCase")] @@ -31,7 +31,7 @@ pub struct QueryConsumeTimeSpanRequestHeader { pub group: CheetahString, #[serde(flatten)] - pub rpc_request_header: Option, + pub topic_request_header: Option, } #[cfg(test)] @@ -45,7 +45,7 @@ mod tests { let header = QueryConsumeTimeSpanRequestHeader { topic: CheetahString::from_static_str("test_topic"), group: CheetahString::from_static_str("test_group"), - rpc_request_header: None, + topic_request_header: None, }; let serialized = serde_json::to_string(&header).unwrap(); let expected = r#"{"topic":"test_topic","group":"test_group"}"#; @@ -58,7 +58,7 @@ mod tests { let header: QueryConsumeTimeSpanRequestHeader = serde_json::from_str(data).unwrap(); assert_eq!(header.topic, CheetahString::from_static_str("test_topic")); assert_eq!(header.group, CheetahString::from_static_str("test_group")); - assert!(!header.rpc_request_header.is_none()); + assert!(!header.topic_request_header.is_none()); } #[test] @@ -67,7 +67,7 @@ mod tests { let header: QueryConsumeTimeSpanRequestHeader = serde_json::from_str(data).unwrap(); assert_eq!(header.topic, CheetahString::from_static_str("test_topic")); assert_eq!(header.group, CheetahString::from_static_str("test_group")); - assert!(!header.rpc_request_header.is_none()); + assert!(!header.topic_request_header.is_none()); } #[test]