Skip to content

Performance counters (metrics) #1027

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 2 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Logging;
using RabbitMQ.Util;

namespace RabbitMQ.Client.Impl
Expand Down Expand Up @@ -91,6 +92,7 @@ public bool HandleFrame(in InboundFrame frame, out IncomingCommand command)
return true;
}

RabbitMqClientEventSource.Log.CommandReceived();
command = new IncomingCommand(_method, _header, _body, _bodyBytes);
Reset();
return shallReturn;
Expand Down
2 changes: 2 additions & 0 deletions projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;
using RabbitMQ.Client.Logging;
using RabbitMQ.Util;

namespace RabbitMQ.Client.Framing.Impl
Expand Down Expand Up @@ -71,6 +72,7 @@ internal void HandleConnectionUnblocked()

private void Open()
{
RabbitMqClientEventSource.Log.ConnectionOpened();
StartAndTune();
_model0.ConnectionOpen(_factory.VirtualHost);
}
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ private void FinishClose()
_frameHandler.Close();
_model0.SetCloseReason(CloseReason);
_model0.FinishClose();
RabbitMqClientEventSource.Log.ConnectionClosed();
}

///<summary>Broadcasts notification of the final shutdown of the connection.</summary>
Expand Down
39 changes: 26 additions & 13 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
using System.Runtime.ExceptionServices;

using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Logging;
using RabbitMQ.Util;

namespace RabbitMQ.Client.Impl
Expand All @@ -49,9 +50,6 @@ internal static class Framing
* | 1 byte | 2 bytes | 4 bytes | x bytes | 1 byte |
* +------------+---------+----------------+---------+------------------+ */
internal const int BaseFrameSize = 1 + 2 + 4 + 1;
internal const int StartFrameType = 0;
internal const int StartChannel = 1;
internal const int StartPayloadSize = 3;
private const int StartPayload = 7;

internal static class Method
Expand Down Expand Up @@ -199,10 +197,7 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
{
try
{
if (reader.Read(frameHeaderBuffer, 0, frameHeaderBuffer.Length) == 0)
{
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
}
ReadFromStream(reader, frameHeaderBuffer, frameHeaderBuffer.Length);
}
catch (IOException ioe)
{
Expand Down Expand Up @@ -234,19 +229,15 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
// Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration
int readSize = payloadSize + EndMarkerLength;
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(readSize);
int bytesRead = 0;
try
{
while (bytesRead < readSize)
{
bytesRead += reader.Read(payloadBytes, bytesRead, readSize - bytesRead);
}
ReadFromStream(reader, payloadBytes, readSize);
}
catch (Exception)
{
// Early EOF.
ArrayPool<byte>.Shared.Return(payloadBytes);
throw new MalformedFrameException($"Short frame - expected to read {readSize} bytes, only got {bytesRead} bytes");
throw new MalformedFrameException($"Short frame - expected to read {readSize} bytes");
}

if (payloadBytes[payloadSize] != Constants.FrameEnd)
Expand All @@ -255,9 +246,31 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
throw new MalformedFrameException($"Bad frame end marker: {payloadBytes[payloadSize]}");
}

RabbitMqClientEventSource.Log.DataReceived(payloadSize + Framing.BaseFrameSize);
return new InboundFrame(type, channel, new Memory<byte>(payloadBytes, 0, payloadSize), payloadBytes);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void ReadFromStream(Stream reader, byte[] buffer, int toRead)
{
int bytesRead = 0;
do
{
int read = reader.Read(buffer, bytesRead, toRead - bytesRead);
if (read == 0)
{
ThrowEndOfStream();
}

bytesRead += read;
} while (bytesRead != toRead);

static void ThrowEndOfStream()
{
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
}
}

public byte[] TakeoverPayload()
{
return _rentedArray;
Expand Down
11 changes: 8 additions & 3 deletions projects/RabbitMQ.Client/client/impl/SessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@

using System;
using System.Threading;

using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;
using RabbitMQ.Client.Logging;

namespace RabbitMQ.Client.Impl
{
Expand All @@ -50,6 +51,7 @@ protected SessionBase(Connection connection, ushort channelNumber)
{
connection.ConnectionShutdown += OnConnectionShutdown;
}
RabbitMqClientEventSource.Log.ChannelOpened();
}

public event EventHandler<ShutdownEventArgs> SessionShutdown
Expand Down Expand Up @@ -102,7 +104,10 @@ public void Close(ShutdownEventArgs reason)

public void Close(ShutdownEventArgs reason, bool notify)
{
Interlocked.CompareExchange(ref _closeReason, reason, null);
if (Interlocked.CompareExchange(ref _closeReason, reason, null) is null)
{
RabbitMqClientEventSource.Log.ChannelClosed();
}
if (notify)
{
OnSessionShutdown(CloseReason);
Expand All @@ -126,7 +131,7 @@ public void Notify()

public virtual void Transmit<T>(in T cmd) where T : struct, IOutgoingCommand
{
if (!IsOpen && cmd.Method.ProtocolCommandId != client.framing.ProtocolCommandId.ChannelCloseOk)
if (!IsOpen && cmd.Method.ProtocolCommandId != ProtocolCommandId.ChannelCloseOk)
{
throw new AlreadyClosedException(CloseReason);
}
Expand Down
4 changes: 3 additions & 1 deletion projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
using System.Threading.Tasks;

using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Logging;

namespace RabbitMQ.Client.Impl
{
Expand All @@ -59,7 +60,7 @@ public static async Task TimeoutAfter(this Task task, TimeSpan timeout)
}
}

internal class SocketFrameHandler : IFrameHandler
internal sealed class SocketFrameHandler : IFrameHandler
{
private readonly ITcpClient _socket;
private readonly Stream _reader;
Expand Down Expand Up @@ -282,6 +283,7 @@ private async Task WriteLoop()
#else
await _writer.WriteAsync(memory).ConfigureAwait(false);
#endif
RabbitMqClientEventSource.Log.CommandSent(segment.Count);
ArrayPool<byte>.Shared.Return(segment.Array);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2020 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Diagnostics.Tracing;
using System.Threading;

namespace RabbitMQ.Client.Logging
{
#nullable enable
internal sealed partial class RabbitMqClientEventSource
{
private static int ConnectionsOpened;
private static int ConnectionsClosed;
private static int ChannelsOpened;
private static int ChannelsClosed;
private static long BytesSent;
private static long BytesReceived;
private static long CommandsSent;
private static long CommandsReceived;

#if !NETSTANDARD
private PollingCounter? _connectionOpenedCounter;
private PollingCounter? _openConnectionCounter;
private PollingCounter? _channelOpenedCounter;
private PollingCounter? _openChannelCounter;
private IncrementingPollingCounter? _bytesSentCounter;
private IncrementingPollingCounter? _bytesReceivedCounter;
private IncrementingPollingCounter? _commandSentCounter;
private IncrementingPollingCounter? _commandReceivedCounter;

protected override void OnEventCommand(EventCommandEventArgs command)
{
if (command.Command == EventCommand.Enable)
{
_connectionOpenedCounter ??= new PollingCounter("total-connections-opened", this, () => ConnectionsOpened) { DisplayName = "Total connections opened" };
_openConnectionCounter ??= new PollingCounter("current-open-connections", this, () => ConnectionsOpened - ConnectionsClosed) { DisplayName = "Current open connections count" };

_channelOpenedCounter ??= new PollingCounter("total-channels-opened", this, () => ChannelsOpened) { DisplayName = "Total channels opened" };
_openChannelCounter ??= new PollingCounter("current-open-channels", this, () => ChannelsOpened - ChannelsClosed) { DisplayName = "Current open channels count" };

_bytesSentCounter ??= new IncrementingPollingCounter("bytes-sent-rate", this, () => Interlocked.Read(ref BytesSent)) { DisplayName = "Byte sending rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
_bytesReceivedCounter ??= new IncrementingPollingCounter("bytes-received-rate", this, () => Interlocked.Read(ref BytesReceived)) { DisplayName = "Byte receiving rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };

_commandSentCounter ??= new IncrementingPollingCounter("AMQP-method-sent-rate", this, () => Interlocked.Read(ref CommandsSent)) { DisplayName = "AMQP method sending rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
_commandReceivedCounter ??= new IncrementingPollingCounter("AMQP-method-received-rate", this, () => Interlocked.Read(ref CommandsReceived)) { DisplayName = "AMQP method receiving rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
}
}
#endif
[NonEvent]
public void ConnectionOpened()
{
Interlocked.Increment(ref ConnectionsOpened);
}

[NonEvent]
public void ConnectionClosed()
{
Interlocked.Increment(ref ConnectionsClosed);
}

[NonEvent]
public void ChannelOpened()
{
Interlocked.Increment(ref ChannelsOpened);
}

[NonEvent]
public void ChannelClosed()
{
Interlocked.Increment(ref ChannelsClosed);
}

[NonEvent]
public void DataReceived(int byteCount)
{
Interlocked.Add(ref BytesReceived, byteCount);
}

[NonEvent]
public void CommandSent(int byteCount)
{
Interlocked.Increment(ref CommandsSent);
Interlocked.Add(ref BytesSent, byteCount);
}

[NonEvent]
public void CommandReceived()
{
Interlocked.Increment(ref CommandsReceived);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,20 @@

namespace RabbitMQ.Client.Logging
{
[EventSource(Name="rabbitmq-dotnet-client")]
public sealed class RabbitMqClientEventSource : EventSource
#nullable enable
internal sealed partial class RabbitMqClientEventSource : EventSource
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to internal, as others shouldn't be able to do anything with these methods here

{
public class Keywords
public static readonly RabbitMqClientEventSource Log = new RabbitMqClientEventSource();

public RabbitMqClientEventSource()
: base("rabbitmq-client")
{
public const EventKeywords Log = (EventKeywords)1;
}
#if NET452
public RabbitMqClientEventSource() : base()
{

}
#else
public RabbitMqClientEventSource() : base(EventSourceSettings.EtwSelfDescribingEventFormat)
public class Keywords
{
public const EventKeywords Log = (EventKeywords)1;
}
#endif

public static RabbitMqClientEventSource Log = new RabbitMqClientEventSource ();

[Event(1, Message = "INFO", Keywords = Keywords.Log, Level = EventLevel.Informational)]
public void Info(string message)
Expand Down
19 changes: 0 additions & 19 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -808,25 +808,6 @@ namespace RabbitMQ.Client.Exceptions
}
namespace RabbitMQ.Client.Logging
{
[System.Diagnostics.Tracing.EventSource(Name="rabbitmq-dotnet-client")]
public sealed class RabbitMqClientEventSource : System.Diagnostics.Tracing.EventSource
{
public static RabbitMQ.Client.Logging.RabbitMqClientEventSource Log;
public RabbitMqClientEventSource() { }
[System.Diagnostics.Tracing.Event(3, Keywords=System.Diagnostics.Tracing.EventKeywords.None | System.Diagnostics.Tracing.EventKeywords.All, Level=System.Diagnostics.Tracing.EventLevel.Error, Message="ERROR")]
public void Error(string message, RabbitMQ.Client.Logging.RabbitMqExceptionDetail ex) { }
[System.Diagnostics.Tracing.NonEvent]
public void Error(string message, System.Exception ex) { }
[System.Diagnostics.Tracing.Event(1, Keywords=System.Diagnostics.Tracing.EventKeywords.None | System.Diagnostics.Tracing.EventKeywords.All, Level=System.Diagnostics.Tracing.EventLevel.Informational, Message="INFO")]
public void Info(string message) { }
[System.Diagnostics.Tracing.Event(2, Keywords=System.Diagnostics.Tracing.EventKeywords.None | System.Diagnostics.Tracing.EventKeywords.All, Level=System.Diagnostics.Tracing.EventLevel.Warning, Message="WARN")]
public void Warn(string message) { }
public class Keywords
{
public const System.Diagnostics.Tracing.EventKeywords Log = 1;
public Keywords() { }
}
}
[System.Diagnostics.Tracing.EventData]
public class RabbitMqExceptionDetail
{
Expand Down