-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expose the real-time internal state of the batcher through SSE #3065
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
base: main
Are you sure you want to change the base?
Conversation
@@ -112,6 +136,10 @@ impl Backend for BackendV3 { | |||
.is_ok() | |||
} | |||
|
|||
fn events(&self) -> BroadcastReceiver<EngineState> { | |||
self.state_events.0.subscribe() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should send the current state as soon as a new connection is open (new subscribe)
let stream = infer.events(); | ||
let sse = | ||
Sse::new(BroadcastStream::from(stream).map(|state| { | ||
Event::default().json_data(state.map_err(|err| axum::Error::new(err))?) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name of the event ?
// Dispatch new state to the proxy | ||
{ | ||
// Lock free operation (read) | ||
let num_queued_tokens = engine_state.read().await.in_queue; | ||
{ | ||
// Critical section, doing as little as possible here | ||
let mut engine_state = engine_state.write().await; | ||
engine_state.in_queue = num_queued_tokens + entry_num_tokens; | ||
} | ||
|
||
// Send new state to the channel for broadcasting | ||
if let Err(err) = queue_events.send(*engine_state.read().await) { | ||
tracing::warn!( | ||
"Failed to send BatchEvent::QueueChanged({}): {err}", | ||
num_queued_tokens + entry_num_tokens | ||
) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Dispatch new state to the proxy | |
{ | |
// Lock free operation (read) | |
let num_queued_tokens = engine_state.read().await.in_queue; | |
{ | |
// Critical section, doing as little as possible here | |
let mut engine_state = engine_state.write().await; | |
engine_state.in_queue = num_queued_tokens + entry_num_tokens; | |
} | |
// Send new state to the channel for broadcasting | |
if let Err(err) = queue_events.send(*engine_state.read().await) { | |
tracing::warn!( | |
"Failed to send BatchEvent::QueueChanged({}): {err}", | |
num_queued_tokens + entry_num_tokens | |
) | |
} | |
} | |
engine_state.modify(|state| state.in_queue += entry_num_tokens); |
Function modify then send the new state in the SSE
No description provided.