diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 03919e21c..81a0bc5a8 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -16,6 +16,8 @@ pub enum Error { NotBackup, #[error("Corrupted backup file")] CorruptedFile, + #[error("Empty write buffer")] + NoWrites, } pub struct Storage { @@ -85,6 +87,14 @@ impl Storage { return Ok(None); } + self.flush() + } + + /// Force flush the contents of write buffer onto disk + pub fn flush(&mut self) -> Result, Error> { + if self.current_write_file.is_empty() { + return Err(Error::NoWrites); + } match &mut self.persistence { Some(persistence) => { let hash = hash(&self.current_write_file[..]); diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 43f40a3d4..130690a27 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -3,6 +3,7 @@ mod metrics; use std::collections::{HashMap, VecDeque}; use std::fs::File; use std::io::{self, Read, Write}; +use std::time::Instant; use std::{sync::Arc, time::Duration}; use bytes::{Bytes, BytesMut}; @@ -59,6 +60,8 @@ pub enum Error { EmptyStorage, #[error("Permission denied while accessing persistence directory \"{0}\"")] Persistence(String), + #[error("Serializer has shutdown after handling crash")] + Shutdown, } #[derive(Debug, PartialEq)] @@ -204,6 +207,16 @@ impl StorageHandler { None } + + fn flush_all(&mut self) { + for (stream_name, storage) in self.map.iter_mut() { + match storage.flush() { + Ok(_) => trace!("Force flushed stream = {stream_name} onto disk"), + Err(storage::Error::NoWrites) => {} + Err(e) => error!("Error when force flushing storage = {stream_name}; error = {e}"), + } + } + } } pub struct SerializerShutdown; @@ -299,8 +312,13 @@ impl Serializer { } loop { - // Collect next data packet and write to disk - let data = self.collector_rx.recv_async().await?; + // Collect remaining data packets and write to disk + // NOTE: wait 2s to allow bridge to shutdown and flush leftover data. + let deadline = Instant::now() + Duration::from_secs(2); + let Ok(data) = self.collector_rx.recv_deadline(deadline) else { + self.storage_handler.flush_all(); + return Err(Error::Shutdown); + }; let publish = construct_publish(data)?; let storage = self.storage_handler.select(&publish.topic); match write_to_disk(publish, storage) { diff --git a/uplink/src/main.rs b/uplink/src/main.rs index 3cb23166b..60a430e9b 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -203,7 +203,7 @@ fn main() -> Result<(), Error> { uplink.resolve_on_shutdown().await.unwrap(); info!("Uplink shutting down..."); // NOTE: wait 5s to allow serializer to write to network/disk - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(10)).await; }); Ok(())