Skip to content

Commit

Permalink
Merge pull request #33 from axiomhq/arne/dx-311-rust-ingest_stream-sh…
Browse files Browse the repository at this point in the history
…ould-flush-every

Use chunks_timeout for {try_,}ingest_stream
  • Loading branch information
bahlo authored Oct 26, 2022
2 parents a8d5b48 + f86ac11 commit 3fd3697
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tokio = { version = "1", optional = true, features = ["rt", "sync"] }
async-std = { version = "1", optional = true, features = ["tokio1"] }
url = "2"
tracing = { version = "0.1" }
tokio-stream = "0.1"

[dev-dependencies]
tokio = { version = "1", features = ["rt", "macros"] }
Expand Down
7 changes: 4 additions & 3 deletions src/datasets/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use async_std::task::spawn_blocking;
use bytes::Bytes;
use flate2::{write::GzEncoder, Compression};
use futures::{Stream, StreamExt};
use futures::Stream;
use reqwest::header;
use serde::Serialize;
use std::{
Expand All @@ -14,6 +14,7 @@ use std::{
};
#[cfg(feature = "tokio")]
use tokio::task::spawn_blocking;
use tokio_stream::StreamExt;
use tracing::instrument;

use crate::{
Expand Down Expand Up @@ -216,7 +217,7 @@ impl Client {
E: Serialize,
{
let dataset_name = dataset_name.into();
let mut chunks = Box::pin(stream.chunks(1000));
let mut chunks = Box::pin(stream.chunks_timeout(1000, StdDuration::from_secs(1)));
let mut ingest_status = IngestStatus::default();
while let Some(events) = chunks.next().await {
let new_ingest_status = self.ingest(dataset_name.clone(), events).await?;
Expand All @@ -239,7 +240,7 @@ impl Client {
E: std::error::Error + Send + Sync + 'static,
{
let dataset_name = dataset_name.into();
let mut chunks = Box::pin(stream.chunks(1000));
let mut chunks = Box::pin(stream.chunks_timeout(1000, StdDuration::from_secs(1)));
let mut ingest_status = IngestStatus::default();
while let Some(events) = chunks.next().await {
let events: StdResult<Vec<I>, E> = events.into_iter().collect();
Expand Down

0 comments on commit 3fd3697

Please # to comment.