Skip to content

Reduce memory usage by using the body directly instead of copying in BasicPublishAsync #1445

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
10 changes: 5 additions & 5 deletions projects/Benchmarks/WireFormatting/MethodFraming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class MethodFramingBasicAck
public ushort Channel { get; set; }

[Benchmark]
internal RentedMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
internal RentedOutgoingMemory BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
}

[Config(typeof(Config))]
Expand All @@ -41,13 +41,13 @@ public class MethodFramingBasicPublish
public int FrameMax { get; set; }

[Benchmark]
internal RentedMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);
internal RentedOutgoingMemory BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);

[Benchmark]
internal RentedMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
internal RentedOutgoingMemory BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);

[Benchmark]
internal RentedMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
internal RentedOutgoingMemory BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
}

[Config(typeof(Config))]
Expand All @@ -60,6 +60,6 @@ public class MethodFramingChannelClose
public ushort Channel { get; set; }

[Benchmark]
internal RentedMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
internal RentedOutgoingMemory ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
}
}
17 changes: 12 additions & 5 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ abstract RabbitMQ.Client.Exceptions.ProtocolException.ReplyCode.get -> ushort
const RabbitMQ.Client.AmqpTcpEndpoint.DefaultAmqpSslPort = 5671 -> int
const RabbitMQ.Client.AmqpTcpEndpoint.UseDefaultPort = -1 -> int
const RabbitMQ.Client.ConnectionFactory.DefaultChannelMax = 2047 -> ushort
const RabbitMQ.Client.ConnectionFactory.DefaultCopyBodyToMemoryThreshold = 2147483647 -> int
const RabbitMQ.Client.ConnectionFactory.DefaultFrameMax = 0 -> uint
const RabbitMQ.Client.ConnectionFactory.DefaultMaxMessageSize = 134217728 -> uint
const RabbitMQ.Client.ConnectionFactory.DefaultPass = "guest" -> string
Expand Down Expand Up @@ -212,6 +213,8 @@ RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> int
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.set -> void
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.set -> void
RabbitMQ.Client.ConnectionFactory.CopyBodyToMemoryThreshold.get -> int
RabbitMQ.Client.ConnectionFactory.CopyBodyToMemoryThreshold.set -> void
RabbitMQ.Client.ConnectionFactory.CreateConnection() -> RabbitMQ.Client.IConnection
RabbitMQ.Client.ConnectionFactory.CreateConnection(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName) -> RabbitMQ.Client.IConnection
RabbitMQ.Client.ConnectionFactory.CreateConnection(string clientProvidedName) -> RabbitMQ.Client.IConnection
Expand Down Expand Up @@ -507,8 +510,10 @@ RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool r
RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
RabbitMQ.Client.IChannel.BasicPublish<TProperties>(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> void
RabbitMQ.Client.IChannel.BasicPublish<TProperties>(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> void
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, in TProperties basicProperties, System.Buffers.ReadOnlySequence<byte> body = default(System.Buffers.ReadOnlySequence<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string! exchange, string! routingKey, in TProperties basicProperties, System.Buffers.ReadOnlySequence<byte> body = default(System.Buffers.ReadOnlySequence<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string! exchange, string! routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicQos(uint prefetchSize, ushort prefetchCount, bool global) -> void
RabbitMQ.Client.IChannel.BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicReject(ulong deliveryTag, bool requeue) -> void
Expand Down Expand Up @@ -583,6 +588,7 @@ RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler<Rabbi
RabbitMQ.Client.IConnection.ConnectionShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler<System.EventArgs>
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs>
RabbitMQ.Client.IConnection.CopyBodyToMemoryThreshold.get -> int
RabbitMQ.Client.IConnection.CreateChannel() -> RabbitMQ.Client.IChannel
RabbitMQ.Client.IConnection.CreateChannelAsync() -> System.Threading.Tasks.ValueTask<RabbitMQ.Client.IChannel>
RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint
Expand Down Expand Up @@ -880,6 +886,7 @@ readonly RabbitMQ.Client.ConnectionConfig.AuthMechanisms -> System.Collections.G
readonly RabbitMQ.Client.ConnectionConfig.ClientProperties -> System.Collections.Generic.IDictionary<string, object>
readonly RabbitMQ.Client.ConnectionConfig.ClientProvidedName -> string
readonly RabbitMQ.Client.ConnectionConfig.ContinuationTimeout -> System.TimeSpan
readonly RabbitMQ.Client.ConnectionConfig.CopyBodyToMemoryThreshold -> int
readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumerConcurrency -> int
readonly RabbitMQ.Client.ConnectionConfig.DispatchConsumersAsync -> bool
readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System.TimeSpan
Expand Down Expand Up @@ -953,11 +960,11 @@ static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask<string>
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask<string>
static RabbitMQ.Client.IChannelExtensions.BasicConsumeAsync(this RabbitMQ.Client.IChannel channel, string queue, bool autoAck, string consumerTag, System.Collections.Generic.IDictionary<string, object> arguments, RabbitMQ.Client.IBasicConsumer consumer) -> System.Threading.Tasks.ValueTask<string>
static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> void
static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> void
static RabbitMQ.Client.IChannelExtensions.BasicPublish(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, bool? copyBody = null) -> void
static RabbitMQ.Client.IChannelExtensions.BasicPublish<T>(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, in T basicProperties, System.ReadOnlyMemory<byte> body) -> void
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel channel, string exchange, string routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false, bool? copyBody = null) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Client.IChannel channel, RabbitMQ.Client.PublicationAddress addr, in T basicProperties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.Close(this RabbitMQ.Client.IChannel channel) -> void
static RabbitMQ.Client.IChannelExtensions.Close(this RabbitMQ.Client.IChannel channel, ushort replyCode, string replyText) -> void
Expand Down
117 changes: 117 additions & 0 deletions projects/RabbitMQ.Client/client/RentedOutgoingMemory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#nullable enable

using System;
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
internal sealed class RentedOutgoingMemory : IDisposable
{
private readonly TaskCompletionSource<bool>? _sendCompletionSource;
private bool _disposedValue;
private byte[]? _rentedArray;
private ReadOnlySequence<byte> _data;

public RentedOutgoingMemory(ReadOnlyMemory<byte> data, byte[]? rentedArray = null, bool waitSend = false)
: this(new ReadOnlySequence<byte>(data), rentedArray, waitSend)
{
}

public RentedOutgoingMemory(ReadOnlySequence<byte> data, byte[]? rentedArray = null, bool waitSend = false)
{
_data = data;
_rentedArray = rentedArray;

if (waitSend)
{
_sendCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}
}

internal int Size => (int)Data.Length;

public int RentedArraySize => _rentedArray?.Length ?? 0;

internal ReadOnlySequence<byte> Data
{
get
{
if (_disposedValue)
{
throw new ObjectDisposedException(nameof(RentedOutgoingMemory));
}

return _data;
}
}

/// <summary>
/// Mark the data as sent.
/// </summary>
/// <returns><c>true</c> if the object can be disposed, <c>false</c> if the <see cref="SocketFrameHandler"/> is waiting for the data to be sent.</returns>
public bool DidSend()
{
if (_sendCompletionSource is null)
{
return true;
}

_sendCompletionSource.SetResult(true);
return false;
}

/// <summary>
/// Wait for the data to be sent.
/// </summary>
/// <returns><c>true</c> if the data was sent and the object can be disposed.</returns>
public ValueTask<bool> WaitForDataSendAsync()
{
return _sendCompletionSource is null ? new ValueTask<bool>(false) : WaitForFinishCore();

async ValueTask<bool> WaitForFinishCore()
{
await _sendCompletionSource.Task.ConfigureAwait(false);
return true;
}
}

public void WriteTo(PipeWriter pipeWriter)
{
foreach (ReadOnlyMemory<byte> memory in Data)
{
pipeWriter.Write(memory.Span);
}
}

private void Dispose(bool disposing)
{
if (_disposedValue)
{
return;
}

Debug.Assert(_sendCompletionSource is null or { Task.IsCompleted: true }, "The send task should be completed before disposing.");
_disposedValue = true;

if (disposing)
{
_data = default;

if (_rentedArray != null)
{
ClientArrayPool.Return(_rentedArray);
_rentedArray = null;
}
}
}

public void Dispose()
{
Dispose(disposing: true);
}
}
}
13 changes: 12 additions & 1 deletion projects/RabbitMQ.Client/client/api/ConnectionConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ public sealed class ConnectionConfig
/// </summary>
public readonly int DispatchConsumerConcurrency;

/// <summary>
/// The threshold for when to copy the body to a temporary array.
/// </summary>
/// <remarks>
/// When the body is larger than this threshold it will reuse the same buffer. Because of this
/// the buffer cannot be modified by the application. This causes
/// the socket (<see cref="SocketFrameHandler.WriteAsync"/>) to block until the frame is sent.
/// </remarks>
public readonly int CopyBodyToMemoryThreshold;

internal readonly Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> FrameHandlerFactoryAsync;

internal ConnectionConfig(string virtualHost, string userName, string password,
Expand All @@ -153,7 +163,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
bool dispatchConsumersAsync, int dispatchConsumerConcurrency,
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync, int copyBodyToMemoryThreshold)
{
VirtualHost = virtualHost;
UserName = userName;
Expand All @@ -176,6 +186,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
DispatchConsumersAsync = dispatchConsumersAsync;
DispatchConsumerConcurrency = dispatchConsumerConcurrency;
FrameHandlerFactoryAsync = frameHandlerFactoryAsync;
CopyBodyToMemoryThreshold = copyBodyToMemoryThreshold;
}
}
}
25 changes: 23 additions & 2 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
/// </summary>
public const string DefaultVHost = "/";

/// <summary>
/// Default value for the copy body to memory threshold.
/// </summary>
public const int DefaultCopyBodyToMemoryThreshold = int.MaxValue;

/// <summary>
/// TLS versions enabled by default: TLSv1.2, v1.1, v1.0.
/// </summary>
Expand Down Expand Up @@ -361,6 +366,16 @@ public AmqpTcpEndpoint Endpoint
/// </summary>
public uint MaxMessageSize { get; set; } = DefaultMaxMessageSize;

/// <summary>
/// The threshold for when to copy the body to a temporary array.
/// </summary>
/// <remarks>
/// When the body is larger than this threshold it will reuse the same buffer. Because of this
/// the buffer cannot be modified by the application. This causes
/// the socket (<see cref="SocketFrameHandler.WriteAsync"/>) to block until the frame is sent.
/// </remarks>
public int CopyBodyToMemoryThreshold { get; set; } = DefaultCopyBodyToMemoryThreshold;

/// <summary>
/// The uri to use for the connection.
/// </summary>
Expand Down Expand Up @@ -748,7 +763,12 @@ public async ValueTask<IConnection> CreateConnectionAsync(IEndpointResolver endp
}
}

private ConnectionConfig CreateConfig(string clientProvidedName)
internal ConnectionConfig CreateConfig()
{
return CreateConfig(ClientProvidedName);
}

internal ConnectionConfig CreateConfig(string clientProvidedName)
{
return new ConnectionConfig(
VirtualHost,
Expand All @@ -771,7 +791,8 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
RequestedConnectionTimeout,
DispatchConsumersAsync,
ConsumerDispatchConcurrency,
CreateFrameHandlerAsync);
CreateFrameHandlerAsync,
CopyBodyToMemoryThreshold);
}

internal async Task<IFrameHandler> CreateFrameHandlerAsync(
Expand Down
Loading