Skip to content

Commit

Permalink
hopefully making it more readable
Browse files Browse the repository at this point in the history
  • Loading branch information
zbrox committed May 3, 2020
1 parent 1b0a885 commit 3462179
Showing 1 changed file with 51 additions and 20 deletions.
71 changes: 51 additions & 20 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub fn check_trigger_applicability(trigger: &Trigger, queue_name: &str, stat: &Q

type QueueTriggerType = String;
type UnixTimestamp = u64;
type MsgExpirationLog = HashMap<QueueTriggerType, UnixTimestamp>;

fn get_unix_timestamp() -> Result<UnixTimestamp> {
Ok(SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs())
Expand Down Expand Up @@ -79,37 +80,35 @@ pub async fn check_loop(
for msg in msgs {
let queue_trigger_type =
queue_trigger_name(&msg.metadata.queue_name, &msg.metadata.trigger_type);

if !sent_msgs_registry.contains_key(&queue_trigger_type) {
log::debug!(
"Haven't yet sent a message for queue {} of type {} has expired. Logging in registry.",
&msg.metadata.queue_name,
&msg.metadata.trigger_type
);
sent_msgs_registry.insert(
queue_trigger_type,
get_unix_timestamp().context("Cannot get UNIX timestamp")?,
);
} else if let Some(ts) = sent_msgs_registry.get(&queue_trigger_type) {
let current_ts = get_unix_timestamp().context("Cannot get UNIX timestamp")?;
if ts + (60 * 10) < current_ts {
let current_ts = get_unix_timestamp().context("Cannot get UNIX timestamp")?;
match has_msg_expired(&mut sent_msgs_registry, &queue_trigger_type, current_ts) {
Ok(ExpirationStatus::Expired) => {
log::debug!(
"Time since last message for queue {} of type {} has expired. Valid for sending again.",
"Message for queue {} of type {} has expired. Resending...",
&msg.metadata.queue_name,
&msg.metadata.trigger_type
);
*sent_msgs_registry
.get_mut(&queue_trigger_type)
.expect("No such entry in sent msgs log") = current_ts;
} else {
}
Ok(ExpirationStatus::NotExpired) => {
log::debug!(
"Last message for queue {} of type {} was sent too recently. Skipping sending this one...",
&msg.metadata.queue_name,
&msg.metadata.trigger_type
);
continue;
}
}
Ok(ExpirationStatus::NotSentYet) => {
log::debug!(
"Haven't yet sent a message for queue {} of type {} has expired. Saved in log.",
&msg.metadata.queue_name,
&msg.metadata.trigger_type
);
}
Err(error) => {
log::error!("Unexpected error with msgs: {}", error);
continue;
}
};

match send_slack_msg(&slack_config.webhook_url, &msg).await {
Ok(_) => {
Expand All @@ -129,6 +128,38 @@ pub async fn check_loop(
Ok(())
}

enum ExpirationStatus {
Expired,
NotSentYet,
NotExpired,
}

fn has_msg_expired(
msg_expiration_log: &mut MsgExpirationLog,
queue_trigger_type: &QueueTriggerType,
current_ts: UnixTimestamp,
) -> Result<ExpirationStatus> {
match msg_expiration_log.get(queue_trigger_type) {
Some(ts) => {
if ts + (60 * 10) < current_ts {
*msg_expiration_log
.get_mut(queue_trigger_type)
.expect("No such entry in sent msgs log") = current_ts;
return Ok(ExpirationStatus::Expired);
} else {
return Ok(ExpirationStatus::NotExpired);
}
}
None => {
msg_expiration_log.insert(
queue_trigger_type.into(),
get_unix_timestamp().context("Cannot get UNIX timestamp")?,
);
return Ok(ExpirationStatus::NotSentYet);
}
}
}

fn build_msgs_for_trigger(
queue_info: &Vec<QueueInfo>,
trigger: &Trigger,
Expand Down

0 comments on commit 3462179

Please # to comment.