Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add event replication #754

Merged
merged 3 commits into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

- Removed `DisabledComponent::<C>` in favor of `DisabledComponents` to have more control over
which components are disabled. In particular, it is now possible to express 'disable all components except these'.
- Enabled replicating events directly!
- Add an `Event` to the protocol with `register_event`
- Replicate an event and buffer it in EventWriter with `send_event`
- Replicate an event and trigger it with `trigger_event`


## 0.18.0 - 2024-12-24
2 changes: 1 addition & 1 deletion examples/simple_box/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub(crate) fn send_message(
let message = Message1(5);
info!("Send message: {:?}", message);
server
.send_message_to_target::<Channel1, Message1>(&mut Message1(5), NetworkTarget::All)
.send_message_to_target::<Channel1, Message1>(&message, NetworkTarget::All)
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
Expand Down
140 changes: 101 additions & 39 deletions lightyear/src/client/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Specify how a Client sends/receives messages with a Server
use bevy::ecs::component::Tick as BevyTick;
use bevy::ecs::entity::MapEntities;
use bevy::prelude::{Resource, World};
use bevy::prelude::{Event, Resource, World};
use bevy::utils::{Duration, HashMap};
use bytes::Bytes;
use tracing::{debug, trace, trace_span};
Expand All @@ -23,14 +23,17 @@ use crate::prelude::client::PredictionConfig;
use crate::prelude::{Channel, ChannelKind, ClientId, Message, ReplicationConfig};
use crate::protocol::channel::ChannelRegistry;
use crate::protocol::component::ComponentRegistry;
use crate::protocol::event::EventReplicationMode;
use crate::protocol::message::{MessageRegistry, MessageType};
use crate::protocol::registry::NetId;
use crate::serialize::reader::Reader;
use crate::serialize::writer::Writer;
use crate::serialize::{SerializationError, ToBytes};
use crate::server::error::ServerError;
use crate::shared::events::connection::ConnectionEvents;
use crate::shared::message::MessageSend;
use crate::shared::events::private::InternalEventSend;
use crate::shared::events::EventSend;
use crate::shared::message::{private::InternalMessageSend, MessageSend};
use crate::shared::ping::manager::{PingConfig, PingManager};
use crate::shared::ping::message::{Ping, Pong};
use crate::shared::replication::delta::DeltaManager;
Expand Down Expand Up @@ -80,6 +83,7 @@ pub struct ConnectionManager {
#[cfg(feature = "leafwing")]
pub(crate) received_leafwing_input_messages: HashMap<NetId, Vec<Bytes>>,
/// Used to transfer raw bytes to a system that can convert the bytes to the actual type
pub(crate) received_events: HashMap<NetId, Vec<Bytes>>,
pub(crate) received_messages: HashMap<NetId, Vec<Bytes>>,
pub(crate) writer: Writer,

Expand Down Expand Up @@ -117,6 +121,7 @@ impl Default for ConnectionManager {
events: ConnectionEvents::default(),
#[cfg(feature = "leafwing")]
received_leafwing_input_messages: HashMap::default(),
received_events: HashMap::default(),
received_messages: HashMap::default(),
writer: Writer::with_capacity(0),
messages_to_send: Vec::default(),
Expand Down Expand Up @@ -170,6 +175,7 @@ impl ConnectionManager {
events: ConnectionEvents::default(),
#[cfg(feature = "leafwing")]
received_leafwing_input_messages: HashMap::default(),
received_events: HashMap::default(),
received_messages: HashMap::default(),
writer: Writer::with_capacity(MAX_PACKET_SIZE),
messages_to_send: Vec::default(),
Expand Down Expand Up @@ -235,45 +241,25 @@ impl ConnectionManager {
}

/// Send a [`Message`] to the server using a specific [`Channel`]
pub fn send_message<C: Channel, M: Message>(
&mut self,
message: &mut M,
) -> Result<(), ClientError> {
pub fn send_message<C: Channel, M: Message>(&mut self, message: &M) -> Result<(), ClientError> {
self.send_message_to_target::<C, M>(message, NetworkTarget::None)
}

/// Send a [`Message`] to the server using a specific [`Channel`]
///
/// The message will be sent to the server and re-broadcasted to all clients that match the [`NetworkTarget`]
pub fn send_message_to_target<C: Channel, M: Message>(
/// Send a [`Event`] to the server using a specific [`Channel`].
/// The event will be buffered via EventWriter.
pub fn send_event<C: Channel, E: Event + Message>(
&mut self,
message: &mut M,
target: NetworkTarget,
event: &E,
) -> Result<(), ClientError> {
self.erased_send_message_to_target(message, ChannelKind::of::<C>(), target)
self.send_event_to_target::<C, E>(event, NetworkTarget::None)
}

/// Serialize a message and buffer it internally so that it can be sent later
fn erased_send_message_to_target<M: Message>(
/// Trigger a [`Message`] to the server using a specific [`Channel`]
pub fn trigger_event<C: Channel, E: Event + Message>(
&mut self,
message: &M,
channel_kind: ChannelKind,
target: NetworkTarget,
event: &E,
) -> Result<(), ClientError> {
// write the target first
// NOTE: this is ok to do because most of the time (without rebroadcast, this just adds 1 byte)
target.to_bytes(&mut self.writer)?;
// then write the message
self.message_registry.serialize(
message,
&mut self.writer,
Some(&mut self.replication_receiver.remote_entity_map.local_to_remote),
)?;
let message_bytes = self.writer.split();

// TODO: emit logs/metrics about the message being buffered?
self.messages_to_send.push((message_bytes, channel_kind));
Ok(())
self.trigger_event_to_target::<C, E>(event, NetworkTarget::None)
}

pub(crate) fn buffer_replication_messages(
Expand Down Expand Up @@ -456,6 +442,12 @@ impl ConnectionManager {
.or_default()
.push(single_data);
}
MessageType::Event => {
self.received_events
.entry(net_id)
.or_default()
.push(single_data);
}
}
}
}
Expand Down Expand Up @@ -497,6 +489,12 @@ impl ConnectionManager {
.or_default()
.push(single_data);
}
MessageType::Event => {
self.received_events
.entry(net_id)
.or_default()
.push(single_data);
}
}
Ok(())
}
Expand Down Expand Up @@ -535,23 +533,87 @@ impl ConnectionManager {
}
}

impl MessageSend for ConnectionManager {
impl EventSend for ConnectionManager {}

impl InternalEventSend for ConnectionManager {
type Error = ClientError;
fn send_message_to_target<C: Channel, M: Message>(

fn erased_send_event_to_target<E: Event>(
&mut self,
message: &mut M,
event: &E,
channel_kind: ChannelKind,
target: NetworkTarget,
) -> Result<(), ClientError> {
self.send_message_to_target::<C, M>(message, target)
) -> Result<(), Self::Error> {
// write the target first
// NOTE: this is ok to do because most of the time (without rebroadcast, this just adds 1 byte)
target.to_bytes(&mut self.writer)?;
// then write the message
self.message_registry.serialize_event(
event,
EventReplicationMode::Buffer,
&mut self.writer,
Some(&mut self.replication_receiver.remote_entity_map.local_to_remote),
)?;
let message_bytes = self.writer.split();

// TODO: emit logs/metrics about the message being buffered?
self.messages_to_send.push((message_bytes, channel_kind));
Ok(())
}

fn erased_trigger_event_to_target<E: Event + Message>(
&mut self,
event: &E,
channel_kind: ChannelKind,
target: NetworkTarget,
) -> Result<(), Self::Error> {
// write the target first
// NOTE: this is ok to do because most of the time (without rebroadcast, this just adds 1 byte)
target.to_bytes(&mut self.writer)?;
// then write the message
self.message_registry.serialize_event(
event,
EventReplicationMode::Trigger,
&mut self.writer,
Some(&mut self.replication_receiver.remote_entity_map.local_to_remote),
)?;
let message_bytes = self.writer.split();

// TODO: emit logs/metrics about the message being buffered?
self.messages_to_send.push((message_bytes, channel_kind));
Ok(())
}
}

impl MessageSend for ConnectionManager {}

impl InternalMessageSend for ConnectionManager {
type Error = ClientError;

/// Send a message to the server via a channel.
///
/// The NetworkTarget will be serialized with the message, so that the server knows
/// how to route the message to the correct target.
fn erased_send_message_to_target<M: Message>(
&mut self,
message: &mut M,
message: &M,
channel_kind: ChannelKind,
target: NetworkTarget,
) -> Result<(), ClientError> {
self.erased_send_message_to_target(message, channel_kind, target)
// write the target first
// NOTE: this is ok to do because most of the time (without rebroadcast, this just adds 1 byte)
target.to_bytes(&mut self.writer)?;
// then write the message
self.message_registry.serialize(
message,
&mut self.writer,
Some(&mut self.replication_receiver.remote_entity_map.local_to_remote),
)?;
let message_bytes = self.writer.split();

// TODO: emit logs/metrics about the message being buffered?
self.messages_to_send.push((message_bytes, channel_kind));
Ok(())
}
}

Expand Down
Loading
Loading