Skip to content

Commit ebfecbc

Browse files
committed
Added radio_address config option
1 parent 80bcb0b commit ebfecbc

File tree

4 files changed

+40
-10
lines changed

4 files changed

+40
-10
lines changed

myceli/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub struct Config {
1414
pub window_size: u32,
1515
pub block_size: u32,
1616
pub chunk_transmit_throttle: Option<u32>,
17+
pub radio_address: Option<String>,
1718
}
1819

1920
impl Default for Config {
@@ -33,6 +34,8 @@ impl Default for Config {
3334
block_size: 1024 * 3,
3435
// Default to no throttling of chunks
3536
chunk_transmit_throttle: None,
37+
// Default to no set radio address
38+
radio_address: None,
3639
}
3740
}
3841
}

myceli/src/listener.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub struct Listener<T> {
1818
storage: Rc<Storage>,
1919
transport: Arc<T>,
2020
connected: Arc<Mutex<bool>>,
21+
radio_address: Option<String>,
2122
}
2223

2324
impl<T: Transport + Send + 'static> Listener<T> {
@@ -26,6 +27,7 @@ impl<T: Transport + Send + 'static> Listener<T> {
2627
storage_path: &str,
2728
transport: Arc<T>,
2829
block_size: u32,
30+
radio_address: Option<String>,
2931
) -> Result<Listener<T>> {
3032
let provider = SqliteStorageProvider::new(storage_path)?;
3133
provider.setup()?;
@@ -36,6 +38,7 @@ impl<T: Transport + Send + 'static> Listener<T> {
3638
storage,
3739
transport,
3840
connected: Arc::new(Mutex::new(true)),
41+
radio_address,
3942
})
4043
}
4144

@@ -51,6 +54,7 @@ impl<T: Transport + Send + 'static> Listener<T> {
5154
let shipper_sender_clone = shipper_sender.clone();
5255
let shipper_transport = Arc::clone(&self.transport);
5356
let initial_connected = Arc::clone(&self.connected);
57+
let shipper_radio = self.radio_address.clone();
5458
spawn(move || {
5559
let mut shipper = Shipper::new(
5660
&shipper_storage_path,
@@ -61,6 +65,7 @@ impl<T: Transport + Send + 'static> Listener<T> {
6165
shipper_transport,
6266
initial_connected,
6367
block_size,
68+
shipper_radio,
6469
)
6570
.expect("Shipper creation failed");
6671
shipper.receive_msg_loop();
@@ -69,16 +74,21 @@ impl<T: Transport + Send + 'static> Listener<T> {
6974
loop {
7075
match self.transport.receive() {
7176
Ok((message, sender_addr)) => {
72-
match self.handle_message(message, &sender_addr, shipper_sender.clone()) {
77+
let target_addr = if let Some(radio_address) = &self.radio_address {
78+
radio_address.to_owned()
79+
} else {
80+
sender_addr.to_owned()
81+
};
82+
match self.handle_message(message, &target_addr, shipper_sender.clone()) {
7383
Ok(Some(resp)) => {
74-
if let Err(e) = self.transmit_response(resp, &sender_addr) {
84+
if let Err(e) = self.transmit_response(resp, &target_addr) {
7585
error!("TransmitResponse error: {e}");
7686
}
7787
}
7888
Ok(None) => {}
7989
Err(e) => {
8090
if let Err(e) =
81-
self.transmit_response(Message::Error(e.to_string()), &sender_addr)
91+
self.transmit_response(Message::Error(e.to_string()), &target_addr)
8292
{
8393
error!("TransmitResponse error: {e}");
8494
}
@@ -205,15 +215,15 @@ impl<T: Transport + Send + 'static> Listener<T> {
205215
}
206216
// Default case for valid messages which don't have handling code implemented yet
207217
message => {
208-
info!("Received unhandled message: {:?}", message);
218+
info!("Received message: {:?}", message);
209219
None
210220
}
211221
};
212222
Ok(resp)
213223
}
214224

215-
fn transmit_response(&self, message: Message, sender_addr: &str) -> Result<()> {
216-
self.transport.send(message, sender_addr)?;
225+
fn transmit_response(&self, message: Message, target_addr: &str) -> Result<()> {
226+
self.transport.send(message, target_addr)?;
217227
Ok(())
218228
}
219229
}

myceli/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ fn main() -> Result<()> {
4242
&db_path,
4343
Arc::new(udp_transport),
4444
cfg.block_size,
45+
cfg.radio_address,
4546
)
4647
.expect("Listener creation failed");
4748
listener

myceli/src/shipper.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ pub struct Shipper<T> {
4242
window_size: u32,
4343
// Current connection status
4444
connected: Arc<Mutex<bool>>,
45+
// Radio address
46+
radio_address: Option<String>,
4547
}
4648

4749
impl<T: Transport + Send + 'static> Shipper<T> {
@@ -55,6 +57,7 @@ impl<T: Transport + Send + 'static> Shipper<T> {
5557
transport: Arc<T>,
5658
connected: Arc<Mutex<bool>>,
5759
block_size: u32,
60+
radio_address: Option<String>,
5861
) -> Result<Shipper<T>> {
5962
let provider = SqliteStorageProvider::new(storage_path)?;
6063
provider.setup()?;
@@ -68,6 +71,7 @@ impl<T: Transport + Send + 'static> Shipper<T> {
6871
window_size,
6972
transport,
7073
connected,
74+
radio_address,
7175
})
7276
}
7377

@@ -116,22 +120,34 @@ impl<T: Transport + Send + 'static> Shipper<T> {
116120
blocks,
117121
Rc::clone(&self.storage),
118122
)?;
119-
self.transmit_msg(missing_blocks_msg, sender_addr)?;
123+
let target_addr = if let Some(radio_address) = &self.radio_address {
124+
radio_address.to_owned()
125+
} else {
126+
sender_addr.to_owned()
127+
};
128+
self.transmit_msg(missing_blocks_msg, &target_addr)?;
120129
}
121130
}
122131
DataProtocol::RequestMissingDagBlocks { cid } => {
123132
if *self.connected.lock().unwrap() {
124133
let missing_blocks_msg =
125134
handlers::get_missing_dag_blocks(&cid, Rc::clone(&self.storage))?;
126-
self.transmit_msg(missing_blocks_msg, sender_addr)?;
135+
let target_addr = if let Some(radio_address) = &self.radio_address {
136+
radio_address.to_owned()
137+
} else {
138+
sender_addr.to_owned()
139+
};
140+
self.transmit_msg(missing_blocks_msg, &target_addr)?;
127141
}
128142
}
129143
DataProtocol::MissingDagBlocks { cid, blocks } => {
130144
if *self.connected.lock().unwrap() {
131145
let target_addr = if let Some(session) = self.window_sessions.get(&cid) {
132-
session.target_addr.to_string()
146+
session.target_addr.to_owned()
147+
} else if let Some(radio_address) = &self.radio_address {
148+
radio_address.to_owned()
133149
} else {
134-
sender_addr.to_string()
150+
sender_addr.to_owned()
135151
};
136152
// If no blocks are missing, then attempt to move to next window
137153
if blocks.is_empty() {

0 commit comments

Comments
 (0)