Skip to content

Commit

Permalink
Start to abstract message brokers (#1746)
Browse files Browse the repository at this point in the history
* rename IRabbitMqAdapter -> IMessageBroker
* rename RabbitMqAdapter -> RabbitMQBroker and fix namespace
* add MessageBrokerType and MessageBrokerFactory
  • Loading branch information
rkm authored Feb 9, 2024
1 parent b577cc5 commit aec0b0d
Show file tree
Hide file tree
Showing 32 changed files with 169 additions and 211 deletions.
6 changes: 6 additions & 0 deletions news/1746-feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- Start to refactor `RabbitMqAdapter` logic into generic interface
- Rename IRabbitMqAdapter -> IMessageBroker
- Move into `Smi.Common.Messaging` namespace
- Add `MessageBrokerType` and `MessageBrokerFactory`
- Create ConnectionFactory directly in `RabbitMQBroker`
- Tidy unused variables and naming
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,25 @@ public DicomDirectoryProcessorHost(GlobalOptions globals, DicomDirectoryProcesso
Logger.Info("Creating PACS directory finder");

_ddf = new PacsDirectoryFinder(globals.FileSystemOptions!.FileSystemRoot!,
globals.FileSystemOptions.DicomSearchPattern!, RabbitMqAdapter.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
break;
case "list":
Logger.Info("Creating accession directory lister");

_ddf = new AccessionDirectoryLister(globals.FileSystemOptions!.FileSystemRoot!,
globals.FileSystemOptions.DicomSearchPattern!, RabbitMqAdapter.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
break;
case "default":
Logger.Info("Creating basic directory finder");

_ddf = new BasicDicomDirectoryFinder(globals.FileSystemOptions!.FileSystemRoot!,
globals.FileSystemOptions.DicomSearchPattern!, RabbitMqAdapter.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
break;
case "zips":
Logger.Info("Creating zip directory finder");

_ddf = new ZipDicomDirectoryFinder(globals.FileSystemOptions!.FileSystemRoot!,
globals.FileSystemOptions.DicomSearchPattern!, RabbitMqAdapter.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false));
break;
default:
throw new ArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ public ExtractImagesHost(
GlobalOptions globals,
ExtractImagesCliOptions cliOptions,
IExtractionMessageSender? extractionMessageSender = null,
IRabbitMqAdapter? rabbitMqAdapter = null,
IMessageBroker? messageBroker = null,
IFileSystem? fileSystem = null,
bool threaded = false
)
: base(
globals,
rabbitMqAdapter,
messageBroker,
threaded
)
{
Expand Down Expand Up @@ -64,8 +64,8 @@ public ExtractImagesHost(

if (extractionMessageSender == null)
{
IProducerModel extractionRequestProducer = RabbitMqAdapter.SetupProducer(options.ExtractionRequestProducerOptions!, isBatch: false);
IProducerModel extractionRequestInfoProducer = RabbitMqAdapter.SetupProducer(options.ExtractionRequestInfoProducerOptions!, isBatch: false);
IProducerModel extractionRequestProducer = MessageBroker.SetupProducer(options.ExtractionRequestProducerOptions!, isBatch: false);
IProducerModel extractionRequestInfoProducer = MessageBroker.SetupProducer(options.ExtractionRequestInfoProducerOptions!, isBatch: false);

_extractionMessageSender = new ExtractionMessageSender(
options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ public class TriggerUpdatesHost : MicroserviceHost
private ITriggerUpdatesSource _source;
private IProducerModel _producer;

public TriggerUpdatesHost(GlobalOptions options,ITriggerUpdatesSource source,IRabbitMqAdapter? rabbitMqAdapter = null)
: base(options, rabbitMqAdapter)
public TriggerUpdatesHost(GlobalOptions options,ITriggerUpdatesSource source,IMessageBroker? messageBroker = null)
: base(options, messageBroker)
{
_source = source;
_producer = RabbitMqAdapter.SetupProducer(options.TriggerUpdatesOptions!, isBatch: false);
_producer = MessageBroker.SetupProducer(options.TriggerUpdatesOptions!, isBatch: false);
}

public override void Start()
Expand Down
4 changes: 2 additions & 2 deletions src/applications/Setup/EnvironmentProbe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using Rdmp.Core.ReusableLibraryCode;
using Rdmp.Core.ReusableLibraryCode.Checks;
using Rdmp.Core.ReusableLibraryCode.Progress;
using Smi.Common.Messaging;

namespace Setup
{
Expand Down Expand Up @@ -225,8 +226,7 @@ internal void CheckMicroservices(IDataLoadEventListener? listener = null)

try
{
var factory = Options.RabbitOptions.CreateConnectionFactory();
var adapter = new RabbitMqAdapter(factory, "setup");
var adapter = new RabbitMQBroker(Options.RabbitOptions, "setup");

return new CheckEventArgs("Connected to RabbitMq", CheckResult.Success);
}
Expand Down
18 changes: 0 additions & 18 deletions src/common/Smi.Common/ConnectionFactoryExtensions.cs

This file was deleted.

26 changes: 12 additions & 14 deletions src/common/Smi.Common/Execution/MicroserviceHost.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using DicomTypeTranslation;
using NLog;
using RabbitMQ.Client;
using Smi.Common.Events;
using Smi.Common.Helpers;
using Smi.Common.Messages;
Expand All @@ -22,7 +21,7 @@ public abstract class MicroserviceHost : IMicroserviceHost
protected readonly GlobalOptions Globals;
protected readonly ILogger Logger;

protected readonly IRabbitMqAdapter RabbitMqAdapter;
protected readonly IMessageBroker MessageBroker;


private readonly object _oAdapterLock = new();
Expand All @@ -41,11 +40,11 @@ public abstract class MicroserviceHost : IMicroserviceHost
/// Loads logging, sets up fatal behaviour, subscribes rabbit etc.
/// </summary>
/// <param name="globals">Settings for the microservice (location of rabbit, queue names etc)</param>
/// <param name="rabbitMqAdapter"></param>
/// <param name="messageBroker"></param>
/// <param name="threaded"></param>
protected MicroserviceHost(
GlobalOptions globals,
IRabbitMqAdapter? rabbitMqAdapter = null,
IMessageBroker? messageBroker = null,
bool threaded = false)
{
if (globals == null || globals.FileSystemOptions == null || globals.RabbitOptions == null || globals.LoggingOptions == null)
Expand Down Expand Up @@ -81,15 +80,14 @@ protected MicroserviceHost(

OnFatal += (sender, args) => Fatal(args.Message, args.Exception);

if (rabbitMqAdapter == null)
if (messageBroker == null)
{
ConnectionFactory connectionFactory = globals.RabbitOptions.CreateConnectionFactory();
rabbitMqAdapter = new RabbitMqAdapter(connectionFactory, HostProcessName + HostProcessID, OnFatal, threaded);
messageBroker = new RabbitMQBroker(globals.RabbitOptions, HostProcessName + HostProcessID, OnFatal, threaded);
var controlExchangeName = globals.RabbitOptions.RabbitMqControlExchangeName
?? throw new ArgumentNullException(nameof(globals.RabbitOptions.RabbitMqControlExchangeName));
_controlMessageConsumer = new ControlMessageConsumer(connectionFactory, HostProcessName, HostProcessID, controlExchangeName, Stop);
_controlMessageConsumer = new ControlMessageConsumer(globals.RabbitOptions, HostProcessName, HostProcessID, controlExchangeName, Stop);
}
RabbitMqAdapter = rabbitMqAdapter;
MessageBroker = messageBroker;

ObjectFactory = new MicroserviceObjectFactory();
ObjectFactory.FatalHandler = (s, e) => Fatal(e.Message, e.Exception);
Expand Down Expand Up @@ -118,11 +116,11 @@ public void StartAuxConnections()
_auxConnectionsCreated = true;

// Ensures no consumers have been started until we explicitly call Start()
if (RabbitMqAdapter.HasConsumers)
if (MessageBroker.HasConsumers)
throw new ApplicationException("Rabbit adapter has consumers before aux. connections created");

_fatalLoggingProducer = RabbitMqAdapter.SetupProducer(_fatalLoggingProducerOptions, isBatch: false);
RabbitMqAdapter.StartConsumer(_controlMessageConsumer.ControlConsumerOptions, _controlMessageConsumer, isSolo: false);
_fatalLoggingProducer = MessageBroker.SetupProducer(_fatalLoggingProducerOptions, isBatch: false);
MessageBroker.StartConsumer(_controlMessageConsumer.ControlConsumerOptions, _controlMessageConsumer, isSolo: false);
}
}

Expand Down Expand Up @@ -156,7 +154,7 @@ public virtual void Stop(string reason)

lock (_oAdapterLock)
{
RabbitMqAdapter.Shutdown(Common.RabbitMqAdapter.DefaultOperationTimeout);
MessageBroker.Shutdown(RabbitMQBroker.DefaultOperationTimeout);
}

Logger.Info("Host stop completed");
Expand Down Expand Up @@ -190,7 +188,7 @@ public void Fatal(string msg, Exception? exception)

public void Wait()
{
RabbitMqAdapter.Wait();
MessageBroker.Wait();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace Smi.Common
{
public interface IRabbitMqAdapter
public interface IMessageBroker
{
bool HasConsumers { get; }

Expand Down
2 changes: 1 addition & 1 deletion src/common/Smi.Common/Messaging/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected void Ack(IList<IMessageHeader> batchHeaders, ulong latestDeliveryTag)
}

/// <summary>
/// Logs a Fatal in the Logger and triggers the FatalError event which should shutdown the RabbitMQAdapter
/// Logs a Fatal in the Logger and triggers the FatalError event which should shutdown the MessageBroker
/// <para>Do not do any further processing after triggering this method</para>
/// </summary>
/// <param name="msg"></param>
Expand Down
13 changes: 10 additions & 3 deletions src/common/Smi.Common/Messaging/ControlMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ControlMessageConsumer : Consumer<IMessage>


public ControlMessageConsumer(
IConnectionFactory connectionFactory,
RabbitOptions rabbitOptions,
string processName,
int processId,
string controlExchangeName,
Expand All @@ -47,7 +47,14 @@ public ControlMessageConsumer(

ControlConsumerOptions.QueueName = $"Control.{_processName}{_processId}";

_factory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_factory = new ConnectionFactory()
{
HostName = rabbitOptions.RabbitMqHostName,
VirtualHost = rabbitOptions.RabbitMqVirtualHost,
Port = rabbitOptions.RabbitMqHostPort,
UserName = rabbitOptions.RabbitMqUserName,
Password = rabbitOptions.RabbitMqPassword
};

if (controlExchangeName == null)
throw new ArgumentNullException(nameof(controlExchangeName));
Expand Down Expand Up @@ -160,7 +167,7 @@ public override void Shutdown()

/// <summary>
/// Creates a one-time connection to set up the required control queue and bindings on the RabbitMQ server.
/// The connection is disposed and StartConsumer(...) can then be called on the parent RabbitMQAdapter with ControlConsumerOptions
/// The connection is disposed and StartConsumer(...) can then be called on the parent MessageBroker with ControlConsumerOptions
/// </summary>
/// <param name="controlExchangeName"></param>
private void SetupControlQueueForHost(string controlExchangeName)
Expand Down
2 changes: 1 addition & 1 deletion src/common/Smi.Common/Messaging/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace Smi.Common.Messaging
{
/// <summary>
/// Interface for an object which handles messages obtained by a RabbitMQAdapter.
/// Interface for an object which handles messages obtained by a MessageBroker.
/// </summary>
public interface IConsumer
{
Expand Down
27 changes: 27 additions & 0 deletions src/common/Smi.Common/Messaging/MessageBrokerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using Smi.Common.Options;
using System;
using System.Diagnostics.CodeAnalysis;

namespace Smi.Common.Messaging;

public static class MessageBrokerFactory
{
[ExcludeFromCodeCoverage] // NOTE(rkm 2024-02-08) This can be removed after we use the class
public static IMessageBroker Create(GlobalOptions globals, string connectionIdentifier)
{
switch (globals.MessageBrokerType)
{
case MessageBrokerType.RabbitMQ:
{
if (globals.RabbitOptions == null)
throw new ArgumentNullException(nameof(globals), $"{nameof(globals.RabbitOptions)} must not be null");

return new RabbitMQBroker(globals.RabbitOptions, connectionIdentifier);
}
case MessageBrokerType.None:
throw new ArgumentOutOfRangeException(nameof(globals.MessageBrokerType), $"A valid {nameof(MessageBrokerType)} must be chosen");
default:
throw new NotImplementedException($"No case for {globals.MessageBrokerType}");
}
}
}
11 changes: 11 additions & 0 deletions src/common/Smi.Common/Messaging/MessageBrokerType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Smi.Common.Messaging;

public enum MessageBrokerType
{
/// <summary>
/// Unused placeholder value
/// </summary>
None = 0,

RabbitMQ,
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -9,15 +8,14 @@
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using Smi.Common.Events;
using Smi.Common.Messaging;
using Smi.Common.Options;

namespace Smi.Common
namespace Smi.Common.Messaging
{
/// <summary>
/// Adapter for the RabbitMQ API.
/// </summary>
public class RabbitMqAdapter : IRabbitMqAdapter
public class RabbitMQBroker : IMessageBroker
{
/// <summary>
/// Used to ensure we can't create any new connections after we have called Shutdown()
Expand All @@ -38,12 +36,11 @@ public bool HasConsumers
public const string RabbitMqRoutingKey_MatchAnything = "#";
public const string RabbitMqRoutingKey_MatchOneWord = "*";

public static TimeSpan DefaultOperationTimeout = TimeSpan.FromSeconds(5);
public static readonly TimeSpan DefaultOperationTimeout = TimeSpan.FromSeconds(5);

private readonly ILogger _logger = LogManager.GetCurrentClassLogger();

private readonly HostFatalHandler? _hostFatalHandler;
private readonly string _hostId;

private readonly IConnection _connection;
private readonly Dictionary<Guid, RabbitResources> _rabbitResources = new();
Expand All @@ -54,23 +51,16 @@ public bool HasConsumers
private const int MinRabbitServerVersionMinor = 7;
private const int MinRabbitServerVersionPatch = 0;

private const int MaxSubscriptionAttempts = 5;

private readonly bool _threaded;

/// <summary>
///
/// </summary>
/// <param name="connectionFactory"></param>
/// <param name="rabbitOptions"></param>
/// <param name="hostId">Identifier for this host instance</param>
/// <param name="hostFatalHandler"></param>
/// <param name="threaded"></param>
public RabbitMqAdapter(IConnectionFactory connectionFactory, string hostId, HostFatalHandler? hostFatalHandler = null, bool threaded = false)
public RabbitMQBroker(RabbitOptions rabbitOptions, string hostId, HostFatalHandler? hostFatalHandler = null, bool threaded = false)
{
//_threaded = options.ThreadReceivers;
_threaded = threaded;

if (_threaded)
if (threaded)
{
ThreadPool.GetMinThreads(out var minWorker, out var minIOC);
var workers = Math.Max(50, minWorker);
Expand All @@ -82,10 +72,17 @@ public RabbitMqAdapter(IConnectionFactory connectionFactory, string hostId, Host

if (string.IsNullOrWhiteSpace(hostId))
throw new ArgumentException("RabbitMQ host ID required", nameof(hostId));
_hostId = hostId;

var connectionFactory = new ConnectionFactory()
{
HostName = rabbitOptions.RabbitMqHostName,
VirtualHost = rabbitOptions.RabbitMqVirtualHost,
Port = rabbitOptions.RabbitMqHostPort,
UserName = rabbitOptions.RabbitMqUserName,
Password = rabbitOptions.RabbitMqPassword
};
_connection = connectionFactory.CreateConnection(hostId);
_connection.ConnectionBlocked += (s, a) => _logger.Warn($"ConnectionBlocked (Reason: {a.Reason.ToString()})");
_connection.ConnectionBlocked += (s, a) => _logger.Warn($"ConnectionBlocked (Reason: {a.Reason})");
_connection.ConnectionUnblocked += (s, a) => _logger.Warn("ConnectionUnblocked");

if (hostFatalHandler == null)
Expand Down Expand Up @@ -302,7 +299,7 @@ public void Shutdown(TimeSpan timeout)
}
_rabbitResources.Clear();
}
lock(_exitLock)
lock (_exitLock)
Monitor.PulseAll(_exitLock);
}

Expand Down
Loading

0 comments on commit aec0b0d

Please # to comment.