Skip to content
This repository was archived by the owner on May 16, 2023. It is now read-only.

Carry dynamodb fix #73

Merged
merged 4 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions aws_lambda_events/src/custom_serde/float_unix_epoch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use serde::{de, ser};
use std::fmt;

use chrono::offset::TimeZone;
use chrono::{DateTime, LocalResult, Utc};

enum SerdeError<V: fmt::Display, D: fmt::Display> {
NonExistent { timestamp: V },
Ambiguous { timestamp: V, min: D, max: D },
}

fn ne_timestamp<T: fmt::Display>(ts: T) -> SerdeError<T, u8> {
SerdeError::NonExistent::<T, u8> { timestamp: ts }
}

impl<V: fmt::Display, D: fmt::Display> fmt::Debug for SerdeError<V, D> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ChronoSerdeError({})", self)
}
}

impl<V: fmt::Display, D: fmt::Display> fmt::Display for SerdeError<V, D> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
SerdeError::NonExistent { ref timestamp } => {
write!(f, "value is not a legal timestamp: {}", timestamp)
}
SerdeError::Ambiguous {
ref timestamp,
ref min,
ref max,
} => write!(
f,
"value is an ambiguous timestamp: {}, could be either of {}, {}",
timestamp, min, max
),
}
}
}

fn serde_from<T, E, V>(me: LocalResult<T>, ts: &V) -> Result<T, E>
where
E: de::Error,
V: fmt::Display,
T: fmt::Display,
{
match me {
LocalResult::None => Err(E::custom(ne_timestamp(ts))),
LocalResult::Ambiguous(min, max) => Err(E::custom(SerdeError::Ambiguous {
timestamp: ts,
min,
max,
})),
LocalResult::Single(val) => Ok(val),
}
}

struct SecondsFloatTimestampVisitor;

/// Serialize a UTC datetime into an float number of seconds since the epoch
/// ```
pub fn serialize<S>(dt: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
where
S: ser::Serializer,
{
serializer.serialize_i64(dt.timestamp_millis() / 1000)
}

/// Deserialize a `DateTime` from a float seconds timestamp
pub fn deserialize<'de, D>(d: D) -> Result<DateTime<Utc>, D::Error>
where
D: de::Deserializer<'de>,
{
d.deserialize_f64(SecondsFloatTimestampVisitor)
}

impl<'de> de::Visitor<'de> for SecondsFloatTimestampVisitor {
type Value = DateTime<Utc>;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a unix timestamp as a float")
}

/// Deserialize a timestamp in seconds since the epoch
fn visit_u64<E>(self, value: u64) -> Result<DateTime<Utc>, E>
where
E: de::Error,
{
serde_from(Utc.timestamp_opt(value as i64, 0), &value)
}

/// Deserialize a timestamp in seconds since the epoch
fn visit_i64<E>(self, value: i64) -> Result<DateTime<Utc>, E>
where
E: de::Error,
{
serde_from(Utc.timestamp_opt(value, 0), &value)
}

/// Deserialize a timestamp in seconds since the epoch
fn visit_f64<E>(self, value: f64) -> Result<DateTime<Utc>, E>
where
E: de::Error,
{
let time_ms = (value.fract() * 1_000_000.).floor() as u32;
let time_s = value.trunc() as i64;
serde_from(Utc.timestamp_opt(time_s, time_ms), &value)
}
}
1 change: 1 addition & 0 deletions aws_lambda_events/src/custom_serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(crate) mod codebuild_time;
mod headers;
pub(crate) use self::headers::*;

pub(crate) mod float_unix_epoch;
pub(crate) mod http_method;

fn normalize_timestamp<'de, D>(deserializer: D) -> Result<(u64, u64), D::Error>
Expand Down
11 changes: 8 additions & 3 deletions aws_lambda_events/src/dynamodb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::custom_serde::*;
use chrono::{serde::ts_nanoseconds, DateTime, Utc};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt};

Expand Down Expand Up @@ -178,7 +178,7 @@ pub struct StreamRecord {
/// The approximate date and time when the stream record was created, in UNIX
/// epoch time (http://www.epochconverter.com/) format.
#[serde(rename = "ApproximateCreationDateTime")]
#[serde(with = "ts_nanoseconds")]
#[serde(with = "float_unix_epoch")]
pub approximate_creation_date_time: DateTime<Utc>,
/// The primary key attribute(s) for the DynamoDB item that was modified.
#[serde(deserialize_with = "deserialize_lambda_map")]
Expand Down Expand Up @@ -213,16 +213,21 @@ pub struct StreamRecord {
#[cfg(test)]
mod test {
use super::*;
use chrono::TimeZone;

extern crate serde_json;

#[test]
#[cfg(feature = "dynamodb")]
fn example_dynamodb_event() {
let data = include_bytes!("../generated/fixtures/example-dynamodb-event.json");
let parsed: Event = serde_json::from_slice(data).unwrap();
let mut parsed: Event = serde_json::from_slice(data).unwrap();
let output: String = serde_json::to_string(&parsed).unwrap();
let reparsed: Event = serde_json::from_slice(output.as_bytes()).unwrap();
assert_eq!(parsed, reparsed);

let event = parsed.records.pop().unwrap();
let date = Utc.ymd(2016, 12, 2).and_hms(1, 27, 0);
assert_eq!(date, event.change.approximate_creation_date_time);
}
}