-
-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathNebulaNetPacketProcessor.cs
140 lines (122 loc) · 4.11 KB
/
NebulaNetPacketProcessor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
using System;
using System.Collections.Generic;
using System.Diagnostics;
using NebulaAPI.Interfaces;
using NebulaAPI.Networking;
using NebulaAPI.Packets;
using NebulaModel.Logger;
namespace NebulaModel.Networking.Serialization;
/// <summary>
/// Packet processor that buffers incoming raw packets in a queue and dispatches them when
/// <see cref="ProcessPacketQueue" /> is called. In DEBUG builds it can additionally hold packets back
/// for a random delay to simulate network latency.
/// </summary>
/// <remarks>
/// Locking: <see cref="ProcessPacketQueue" /> takes the pending-queue lock and then (DEBUG only) the
/// delayed-list lock; the enqueue methods only ever hold one of the two at a time.
/// </remarks>
public class NebulaNetPacketProcessor : NetPacketProcessor, INetPacketProcessor
{
    // NOTE(review): not referenced anywhere in this class as shown; kept in case it is still needed.
    private readonly Dictionary<ulong, Type> _callbacksDebugInfo = [];

    // Shared serialization buffer used by Write<T>; every access must hold writerLock.
    private readonly NetDataWriter writer = new();
    private readonly object writerLock = new();

    // Packet simulation stuff
    private readonly List<DelayedPacket> delayedPackets = [];
    private readonly Queue<PendingPacket> pendingPackets = new();

#if DEBUG
    private readonly Random simulationRandom = new();

    // Bounds (in milliseconds) handed to Random.Next, so the upper bound is exclusive.
    private const int SimulatedMaxLatency = 50;
    private const int SimulatedMinLatency = 20;
#endif

    /// <summary>
    /// When true, enqueued packets are held back for a random delay before becoming eligible for
    /// processing. Only has an effect in DEBUG builds.
    /// </summary>
    public bool SimulateLatency { get; set; } = false;

    /// <summary>
    /// Whether or not packet processing is enabled
    /// </summary>
    public bool EnablePacketProcessing { get; set; } = true;

    public NebulaNetPacketProcessor()
    {
        _netSerializer = new NebulaNetSerializer();
    }

    /// <summary>
    /// Adds back some functionality that nebula relied on before the update.
    /// This method was removed from LiteNetLib as it was not thread-safe; here, access to the shared
    /// writer is serialized with a lock so concurrent callers cannot interleave on the same buffer.
    /// @TODO: Optimize & move into `NebulaConnection.cs`
    /// </summary>
    /// <typeparam name="T">Packet type to serialize.</typeparam>
    /// <param name="packet">The packet instance to serialize.</param>
    /// <returns>A copy of the serialized bytes, independent of the internal buffer.</returns>
    public byte[] Write<T>(T packet) where T : class, new()
    {
        // Reset + serialize + copy must be atomic with respect to other callers, otherwise two
        // threads writing at once would produce corrupted or mixed packet data.
        lock (writerLock)
        {
            writer.Reset();
            Write(writer, packet);

#if DEBUG
            if (!typeof(T).IsDefined(typeof(HidePacketInDebugLogsAttribute), false))
            {
                Log.Debug($"Packet Sent << {packet.GetType().Name}, Size: {writer.Length}");
            }
#endif

            return writer.CopyData();
        }
    }

    #region DEBUG_PACKET_DELAY

    /// <summary>
    /// Drains the pending packet queue, reading each packet in FIFO order. Stops early if
    /// <see cref="EnablePacketProcessing" /> becomes false; remaining packets stay queued.
    /// </summary>
    public void ProcessPacketQueue()
    {
        lock (pendingPackets)
        {
            // DEBUG only: move delayed packets whose due time has passed into the pending queue.
            ProcessDelayedPackets();

            while (pendingPackets.Count > 0 && EnablePacketProcessing)
            {
                var packet = pendingPackets.Dequeue();

                try
                {
                    ReadPacket(new NetDataReader(packet.Data), packet.UserData);
                }
                catch (Exception ex)
                {
                    // A ParseException is deliberately swallowed here: nothing can be done about it
                    // at this point and nothing is logged at this site.
                    // Every other exception is reported through Log.Error so one faulty packet
                    // does not stop the rest of the queue from being processed.
                    if (ex is not ParseException)
                    {
                        Log.Error(ex);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Moves the leading run of delayed packets whose due time has passed into the pending queue.
    /// Calls to this method are compiled out unless DEBUG is defined.
    /// Called from <see cref="ProcessPacketQueue" /> while the pending-queue lock is held.
    /// </summary>
    [Conditional("DEBUG")]
    private void ProcessDelayedPackets()
    {
        lock (delayedPackets)
        {
            var now = DateTime.UtcNow;

            // Only a contiguous prefix is released: we stop at the first packet that is not due
            // yet to avoid messing up the order of the packets.
            var dueCount = 0;
            while (dueCount < delayedPackets.Count && now >= delayedPackets[dueCount].DueTime)
            {
                pendingPackets.Enqueue(delayedPackets[dueCount].Packet);
                ++dueCount;
            }

            if (dueCount > 0)
            {
                delayedPackets.RemoveRange(0, dueCount);
            }
        }
    }

    /// <summary>
    /// Serializes <paramref name="packet" /> and queues the resulting bytes for processing.
    /// </summary>
    /// <typeparam name="T">Packet type to serialize.</typeparam>
    /// <param name="packet">The packet instance to serialize and enqueue.</param>
    /// <param name="userData">Opaque object passed along to the packet reader when processed.</param>
    public void EnqueuePacketForProcessing<T>(T packet, object userData) where T : class, new()
    {
        EnqueuePacketForProcessing(Write(packet), userData);
    }

    /// <summary>
    /// Queues raw packet bytes for processing on the next <see cref="ProcessPacketQueue" /> call.
    /// In DEBUG builds with <see cref="SimulateLatency" /> enabled, the packet is instead placed in
    /// the delayed list with a random due time.
    /// </summary>
    /// <param name="rawData">Serialized packet bytes.</param>
    /// <param name="userData">Opaque object passed along to the packet reader when processed.</param>
    public void EnqueuePacketForProcessing(byte[] rawData, object userData)
    {
#if DEBUG
        if (SimulateLatency)
        {
            lock (delayedPackets)
            {
                var packet = new PendingPacket(rawData, userData);
                var dueTime = DateTime.UtcNow.AddMilliseconds(simulationRandom.Next(SimulatedMinLatency, SimulatedMaxLatency));
                delayedPackets.Add(new DelayedPacket(packet, dueTime));
                return;
            }
        }
#endif

        lock (pendingPackets)
        {
            pendingPackets.Enqueue(new PendingPacket(rawData, userData));
        }
    }

    #endregion
}