Skip to content

Commit

Permalink
working?
Browse files Browse the repository at this point in the history
  • Loading branch information
john-z-yang committed Dec 17, 2024
1 parent e8cf0f7 commit b10ffc2
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 22 deletions.
9 changes: 7 additions & 2 deletions python/integration_tests/test_consumer_rebalancing.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,13 @@ def test_tasks_written_once_during_rebalancing() -> None:
for log_line_index, line in enumerate(lines):
if "[31mERROR" in line:
# If there is an error in log file, capture 10 lines before and after the error line
consumer_error_logs.append(f"Error found in consumer_{i}. Logging 10 lines before and after the error line:")
for j in range(max(0, log_line_index - 10), min(len(lines) - 1, log_line_index + 10)):
consumer_error_logs.append(
f"Error found in consumer_{i}. Logging 10 lines before and after the error line:"
)
for j in range(
max(0, log_line_index - 10),
min(len(lines) - 1, log_line_index + 10),
):
consumer_error_logs.append(lines[j].strip())
consumer_error_logs.append("")

Expand Down
1 change: 1 addition & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl Config {
"session.timeout.ms",
self.kafka_session_timeout_ms.to_string(),
)
.set("partition.assignment.strategy", "cooperative-sticky")
.set("enable.partition.eof", "false")
.set("enable.auto.commit", "true")
.set(
Expand Down
95 changes: 75 additions & 20 deletions src/consumer/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tokio::{
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
},
task::{self, JoinError, JoinSet},
task::{self, JoinError, JoinHandle, JoinSet},
time::{self, sleep, MissedTickBehavior},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
Expand Down Expand Up @@ -61,14 +61,16 @@ pub async fn start_consumer(
.expect("Can't subscribe to specified topics");

handle_os_signals(event_sender.clone());
poll_consumer_client(consumer.clone(), client_shutdown_receiver);
let rdkafka_driver = poll_consumer_client(consumer.clone(), client_shutdown_receiver);
handle_events(
consumer,
event_receiver,
client_shutdown_sender,
spawn_actors,
)
.await
.await?;
rdkafka_driver.await?;
Ok(())
}

pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>) {
Expand All @@ -85,23 +87,28 @@ pub fn handle_os_signals(event_sender: UnboundedSender<(Event, SyncSender<()>)>)
pub fn poll_consumer_client(
consumer: Arc<StreamConsumer<KafkaContext>>,
shutdown: oneshot::Receiver<()>,
) {
) -> JoinHandle<()> {
task::spawn_blocking(|| {
Handle::current().block_on(async move {
let _guard = elegant_departure::get_shutdown_guard().shutdown_on_drop();
select! {
biased;
_ = shutdown => {
debug!("Received shutdown signal, commiting state in sync mode...");
let _ = consumer.commit_consumer_state(rdkafka::consumer::CommitMode::Sync);
}
_ = shutdown => {}
msg = consumer.recv() => {
error!("Got unexpected message from consumer client: {:?}", msg);
}
}

};

select! {
biased;
_ = consumer.recv() => {}
_ = sleep(Duration::from_secs(9)) => {}
};

debug!("Shutdown complete");
});
});
})
}

#[derive(Debug)]
Expand All @@ -118,8 +125,20 @@ impl KafkaContext {
impl ClientContext for KafkaContext {}

impl ConsumerContext for KafkaContext {
#[instrument(skip_all)]
fn pre_rebalance(&self, _: &BaseConsumer<Self>, rebalance: &Rebalance) {
#[instrument(skip(self, base_consumer))]
fn pre_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
if let Rebalance::Assign(tpl) = rebalance {
if tpl.count() == 0 {
return;
}
}
base_consumer
.pause(
&base_consumer
.assignment()
.expect("Unable to fetch assigned TPL"),
)
.expect("Unable to pause consumer");
let (rendezvous_sender, rendezvous_receiver) = sync_channel(0);
match rebalance {
Rebalance::Assign(tpl) => {
Expand Down Expand Up @@ -149,6 +168,31 @@ impl ConsumerContext for KafkaContext {
}
}

#[instrument(skip(self, base_consumer))]
fn post_rebalance(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
if let Rebalance::Assign(tpl) = rebalance {
if tpl.count() == 0 {
return;
}
}
let assignment = base_consumer
.assignment()
.expect("Failed to get assigned TPL");
if assignment.count() != 0 {
base_consumer
.seek_partitions(
base_consumer
.committed(rdkafka::util::Timeout::Never)
.expect("Failed to get commited TPL"),
rdkafka::util::Timeout::Never,
)
.expect("Failed to seek to commited offset");
base_consumer
.resume(&assignment)
.expect("Failed to resume consumer");
}
}

#[instrument(skip(self))]
fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
debug!("Got commit callback");
Expand Down Expand Up @@ -336,7 +380,7 @@ pub async fn handle_events(

let mut state = ConsumerState::Ready;

while let ConsumerState::Ready { .. } | ConsumerState::Consuming { .. } = state {
while let ConsumerState::Ready | ConsumerState::Consuming { .. } = state {
select! {
res = match state {
ConsumerState::Consuming(ref mut handles, _) => Either::Left(handles.join_next()),
Expand All @@ -352,20 +396,30 @@ pub async fn handle_events(
};
info!("Received event: {:?}", event);
state = match (state, event) {
(ConsumerState::Ready, Event::Assign(assigned)) => {
ConsumerState::Consuming(spawn_actors(consumer.clone(), &assigned), assigned)
(ConsumerState::Ready, Event::Assign(tpl)) => {
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
(ConsumerState::Ready, Event::Revoke(_)) => {
unreachable!("Got partition revocation before the consumer has started")
}
(ConsumerState::Ready, Event::Shutdown) => ConsumerState::Stopped,
(ConsumerState::Consuming(_, _), Event::Assign(_)) => {
unreachable!("Got partition assignment after the consumer has started")
(ConsumerState::Consuming(handles, mut tpl), Event::Assign(mut assigned)) => {
assert!(
tpl.is_disjoint(&assigned),
"Newly assigned TPL should be disjoint from TPL we're consuming from"
);
debug!(
"{} additional topic partitions added after assignment",
assigned.len()
);
tpl.append(&mut assigned);
handles.shutdown(CALLBACK_DURATION).await;
ConsumerState::Consuming(spawn_actors(consumer.clone(), &tpl), tpl)
}
(ConsumerState::Consuming(handles, tpl), Event::Revoke(revoked)) => {
assert!(
tpl == revoked,
"Revoked TPL should be equal to the subset of TPL we're consuming from"
revoked.is_subset(&tpl),
"Revoked TPL should be a subset of TPL we're consuming from"
);
handles.shutdown(CALLBACK_DURATION).await;
ConsumerState::Ready
Expand Down Expand Up @@ -734,7 +788,7 @@ impl CommitClient for StreamConsumer<KafkaContext> {
}
}

#[derive(Default)]
#[derive(Default, Debug)]
struct HighwaterMark {
data: HashMap<(String, i32), i64>,
}
Expand Down Expand Up @@ -779,6 +833,7 @@ pub async fn commit(
while let Some(msgs) = receiver.recv().await {
let mut highwater_mark = HighwaterMark::new();
msgs.0.iter().for_each(|msg| highwater_mark.track(msg));
debug!("Store: {:?}", highwater_mark);
consumer.store_offsets(&highwater_mark.into()).unwrap();
}
debug!("Shutdown complete");
Expand Down

0 comments on commit b10ffc2

Please # to comment.