
Commit

Merge pull request #11 from axiomhq/arne/dx-124-support-rate-limiting-in-axiom-rs
bahlo authored Aug 2, 2022
2 parents 3bb0522 + 5004ed8 commit e623e94
Showing 8 changed files with 590 additions and 61 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
@@ -27,8 +27,9 @@ flate2 = "1"
 http = "0.2"
 backoff = { version = "0.4", features = ["futures"] }
 futures = "0.3"
-tokio = { version = "1", optional = true, features = ["rt"] }
+tokio = { version = "1", optional = true, features = ["rt", "sync"] }
 async-std = { version = "1", optional = true, features = ["tokio1"] }
+url = "2"
 
 [dev-dependencies]
 tokio = { version = "1", features = ["rt", "macros"] }
@@ -37,6 +38,7 @@ serde_test = "1"
 test-context = "0.1"
 async-trait = "0.1"
 futures-util = "0.3"
+httpmock = "0.6"
 
 [features]
 default = ["tokio", "default-tls"]
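The new dev-dependency httpmock = "0.6" suggests the rate-limiting behaviour is exercised against a local mock HTTP server rather than a live Axiom deployment. Below is a minimal sketch of such a test; the X-RateLimit-* header names, the Client::builder()/with_url/with_token/ingest call shapes, and the assumption that a 429 response is mapped to Error::RateLimitExceeded are all inferred rather than taken from this commit.

    use httpmock::prelude::*;
    use serde_json::json;

    #[tokio::test]
    async fn ingest_surfaces_rate_limiting() {
        // Local mock server standing in for the Axiom API.
        let server = MockServer::start_async().await;

        // Always answer ingest requests with 429 plus (assumed) rate-limit headers.
        let mock = server
            .mock_async(|when, then| {
                when.method(POST).path("/v1/datasets/test/ingest");
                then.status(429)
                    .header("X-RateLimit-Limit", "100")
                    .header("X-RateLimit-Remaining", "0")
                    .header("X-RateLimit-Reset", "3600");
            })
            .await;

        // Hypothetical client setup; the real builder/ingest API may differ.
        let client = axiom_rs::Client::builder()
            .with_url(server.base_url())
            .with_token("xaat-some-token")
            .build()
            .unwrap();

        let res = client
            .datasets
            .ingest("test", vec![json!({ "foo": "bar" })])
            .await;

        mock.assert_async().await;
        assert!(matches!(res, Err(axiom_rs::Error::RateLimitExceeded(_))));
    }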
50 changes: 24 additions & 26 deletions src/datasets/client.rs
@@ -17,7 +17,7 @@ use tokio::task::spawn_blocking;
 use crate::{
     datasets::model::*,
     error::{Error, Result},
-    http::{self, Response},
+    http::{self, HeaderMap},
 };
 
 /// Provides methods to work with Axiom datasets, including ingesting and
@@ -64,7 +64,7 @@ impl Client {
         };
 
         let query_params = serde_qs::to_string(&query_params)?;
-        let path = format!("/datasets/_apl?{}", query_params);
+        let path = format!("/v1/datasets/_apl?{}", query_params);
         let res = self.http_client.post(path, &req).await?;
 
         let saved_query_id = res
@@ -91,20 +91,24 @@ impl Client {
             name: dataset_name.into(),
             description: description.into(),
         };
-        self.http_client.post("/datasets", &req).await?.json().await
+        self.http_client
+            .post("/v1/datasets", &req)
+            .await?
+            .json()
+            .await
     }
 
     /// Delete the dataset with the given ID.
     pub async fn delete<N: Into<String>>(&self, dataset_name: N) -> Result<()> {
         self.http_client
-            .delete(format!("/datasets/{}", dataset_name.into()))
+            .delete(format!("/v1/datasets/{}", dataset_name.into()))
             .await
     }
 
     /// Get a dataset by its id.
     pub async fn get<N: Into<String>>(&self, dataset_name: N) -> Result<Dataset> {
         self.http_client
-            .get(format!("/datasets/{}", dataset_name.into()))
+            .get(format!("/v1/datasets/{}", dataset_name.into()))
             .await?
             .json()
             .await
@@ -113,7 +117,7 @@ impl Client {
     /// Retrieve the information of the dataset identified by its id.
     pub async fn info<N: Into<String>>(&self, dataset_name: N) -> Result<Info> {
         self.http_client
-            .get(format!("/datasets/{}/info", dataset_name.into()))
+            .get(format!("/v1/datasets/{}/info", dataset_name.into()))
            .await?
            .json()
            .await
@@ -163,23 +167,17 @@ impl Client {
         N: Into<String>,
         P: Into<Bytes>,
     {
-        let mut request = self
-            .http_client
-            .post_builder(format!("/datasets/{}/ingest", dataset_name.into()))
-            .header(header::CONTENT_TYPE, content_type);
-
-        // Add Content-Encoding header if necessary
-        request = match content_encoding {
-            ContentEncoding::Identity => request,
-            _ => request.header(header::CONTENT_ENCODING, content_encoding),
-        };
-
-        request
-            .body(payload.into())
-            .send()
-            .await
-            .map(Response::new)
-            .map_err(Error::Http)?
+        let mut headers = HeaderMap::new();
+        headers.insert(header::CONTENT_TYPE, content_type.into());
+        headers.insert(header::CONTENT_ENCODING, content_encoding.into());
+
+        self.http_client
+            .post_bytes(
+                format!("/v1/datasets/{}/ingest", dataset_name.into()),
+                payload,
+                headers,
+            )
+            .await?
             .json()
             .await
     }
@@ -207,7 +205,7 @@ impl Client {
 
     /// List all available datasets.
     pub async fn list(&self) -> Result<Vec<Dataset>> {
-        self.http_client.get("/datasets").await?.json().await
+        self.http_client.get("/v1/datasets").await?.json().await
     }
 
     /// Execute the given query on the dataset identified by its id.
@@ -217,7 +215,7 @@ impl Client {
         O: Into<Option<QueryOptions>>,
     {
         let path = format!(
-            "/datasets/{}/query?{}",
+            "/v1/datasets/{}/query?{}",
             dataset_name.into(),
             &opts
                 .into()
@@ -252,7 +250,7 @@ impl Client {
         let duration = duration.try_into()?;
         let req = TrimRequest::new(duration.into());
         self.http_client
-            .post(format!("/datasets/{}/trim", dataset_name.into()), &req)
+            .post(format!("/v1/datasets/{}/trim", dataset_name.into()), &req)
             .await?
             .json()
             .await
@@ -265,7 +263,7 @@ impl Client {
         req: DatasetUpdateRequest,
     ) -> Result<Dataset> {
         self.http_client
-            .put(format!("/datasets/{}", dataset_name.into()), &req)
+            .put(format!("/v1/datasets/{}", dataset_name.into()), &req)
            .await?
            .json()
            .await
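Besides prefixing every route with /v1, the ingest path no longer assembles a request builder inline: it collects Content-Type and Content-Encoding into a HeaderMap and hands the body to a new post_bytes helper on the internal HTTP client, so every request flows through one place where the rate-limit accounting added by this PR can hook in. The crate's own Limits type lives in the limits module, which is not among the loaded diffs; the sketch below only illustrates how such a type could be read from the usual X-RateLimit-* response headers and is not the crate's actual implementation.

    use http::HeaderMap;

    /// Hypothetical stand-in for the crate's `limits::Limits` type.
    #[derive(Debug)]
    pub struct Limits {
        pub limit: u64,
        pub remaining: u64,
        pub reset: u64,
    }

    impl Limits {
        /// Parse the (assumed) `X-RateLimit-*` headers from a response.
        pub fn from_headers(headers: &HeaderMap) -> Option<Self> {
            // Read a header and parse it as an integer, or bail out with None.
            let get = |name: &str| headers.get(name)?.to_str().ok()?.parse::<u64>().ok();
            Some(Limits {
                limit: get("X-RateLimit-Limit")?,
                remaining: get("X-RateLimit-Remaining")?,
                reset: get("X-RateLimit-Reset")?,
            })
        }
    }

Which header names the real limits module reads, and whether it tracks limits per route or per client, is not visible in this commit.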
2 changes: 1 addition & 1 deletion src/datasets/model.rs
@@ -190,7 +190,7 @@ pub struct TrimResult {
 }
 
 /// Returned on event ingestion operation.
-#[derive(Deserialize, Debug, Default)]
+#[derive(Serialize, Deserialize, Debug, Default)]
 #[serde(rename_all = "camelCase")]
 pub struct IngestStatus {
     /// Amount of events that have been ingested.
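Deriving Serialize on IngestStatus is likely what lets the new httpmock-based tests produce a JSON ingest response body from the model type itself. A minimal sketch, assuming the type is re-exported under axiom_rs::datasets (import path assumed):

    use axiom_rs::datasets::IngestStatus; // import path assumed

    fn mock_ingest_body() -> String {
        // Serialize a default IngestStatus; with #[serde(rename_all = "camelCase")]
        // the field names match what the API itself returns.
        serde_json::to_string(&IngestStatus::default()).expect("IngestStatus serializes")
    }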
6 changes: 6 additions & 0 deletions src/error.rs
@@ -4,6 +4,8 @@ use serde::Deserialize;
 use std::fmt;
 use thiserror::Error;
 
+use crate::limits::Limits;
+
 /// A `Result` alias where the `Err` case is `axiom::Error`.
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -40,6 +42,10 @@ pub enum Error {
     #[cfg(feature = "tokio")]
     #[error("Failed to join thread: {0}")]
     JoinError(tokio::task::JoinError),
+    #[error("Rate limit exceeded: {0}")]
+    RateLimitExceeded(Limits),
+    #[error("Invalid URL: {0}")]
+    InvalidUrl(url::ParseError),
 }
 
 /// This is the manual implementation. We don't really care if the error is
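The new RateLimitExceeded variant lets callers distinguish being throttled from other HTTP failures, and it carries the Limits value (whose Display impl is required by the #[error("Rate limit exceeded: {0}")] attribute), so the current limit state can be logged without knowing its fields. A minimal caller-side sketch, assuming a tokio runtime with the time feature, the client.datasets.ingest(...) call shape from the crate's README of this era, and an illustrative fixed pause:

    use axiom_rs::{Client, Error};
    use std::time::Duration;

    // Retry an ingest once after a fixed pause if the client reports a rate limit.
    async fn ingest_with_backoff(
        client: &Client,
        events: Vec<serde_json::Value>,
    ) -> Result<(), Error> {
        match client.datasets.ingest("my-dataset", events.clone()).await {
            Ok(_) => Ok(()),
            Err(Error::RateLimitExceeded(limits)) => {
                // Log the limit state via its Display impl.
                eprintln!("rate limited: {}", limits);
                tokio::time::sleep(Duration::from_secs(1)).await;
                client.datasets.ingest("my-dataset", events).await.map(|_| ())
            }
            Err(other) => Err(other),
        }
    }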