Skip to content

Commit

Permalink
add configuration item for msg expiration in seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
zbrox committed May 3, 2020
1 parent 3462179 commit ba3c860
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 5 deletions.
4 changes: 3 additions & 1 deletion config_sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ password="the_password"
vhost="%2f"

[settings]
poll_seconds=60 # how often to check
poll_seconds=30 # how often to check
msg_expiration_seconds=600 # Default value: 600s (10min); when the message expires it can be resent again, making
# it possible to check more often but not spam Slack with the same messages

[slack]
webhook_url="https://hooks.slack.com/services/xxx/xxxx"
Expand Down
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub struct RabbitMqConfig {
#[derive(Deserialize, Debug)]
pub struct MonitorSettings {
pub poll_seconds: u64,
#[serde(default = "default_expiration")]
pub msg_expiration_seconds: u64,
}

fn default_expiration() -> u64 {
600
}

#[derive(Deserialize, Debug)]
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ fn main() -> Result<()> {
let sleep_time = time::Duration::from_secs(config.settings.poll_seconds);
task::block_on(check_loop(
sleep_time,
config.settings.msg_expiration_seconds,
config.rabbitmq,
config.slack,
config.triggers,
Expand Down
16 changes: 12 additions & 4 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ fn queue_trigger_name(queue_name: &str, trigger_name: &str) -> String {

pub async fn check_loop(
poll_interval: Duration,
expiration_in_seconds: u64,
rmq_config: RabbitMqConfig,
slack_config: SlackConfig,
triggers: Vec<Trigger>,
Expand Down Expand Up @@ -81,12 +82,18 @@ pub async fn check_loop(
let queue_trigger_type =
queue_trigger_name(&msg.metadata.queue_name, &msg.metadata.trigger_type);
let current_ts = get_unix_timestamp().context("Cannot get UNIX timestamp")?;
match has_msg_expired(&mut sent_msgs_registry, &queue_trigger_type, current_ts) {
match has_msg_expired(
&mut sent_msgs_registry,
&queue_trigger_type,
current_ts,
&expiration_in_seconds,
) {
Ok(ExpirationStatus::Expired) => {
log::debug!(
"Message for queue {} of type {} has expired. Resending...",
"Message for queue {} of type {} has expired (expiration time is {}s). Resending...",
&msg.metadata.queue_name,
&msg.metadata.trigger_type
&msg.metadata.trigger_type,
&expiration_in_seconds,
);
}
Ok(ExpirationStatus::NotExpired) => {
Expand Down Expand Up @@ -138,10 +145,11 @@ fn has_msg_expired(
msg_expiration_log: &mut MsgExpirationLog,
queue_trigger_type: &QueueTriggerType,
current_ts: UnixTimestamp,
expiration_in_seconds: &u64,
) -> Result<ExpirationStatus> {
match msg_expiration_log.get(queue_trigger_type) {
Some(ts) => {
if ts + (60 * 10) < current_ts {
if ts + expiration_in_seconds < current_ts {
*msg_expiration_log
.get_mut(queue_trigger_type)
.expect("No such entry in sent msgs log") = current_ts;
Expand Down

0 comments on commit ba3c860

Please # to comment.