diff --git a/news/1746-feature.md b/news/1746-feature.md new file mode 100644 index 000000000..4403fea36 --- /dev/null +++ b/news/1746-feature.md @@ -0,0 +1,6 @@ +- Start to refactor `RabbitMqAdapter` logic into generic interface + - Rename IRabbitMqAdapter -> IMessageBroker + - Move into `Smi.Common.Messaging` namespace + - Add `MessageBrokerType` and `MessageBrokerFactory` + - Create ConnectionFactory directly in `RabbitMQBroker` + - Tidy unused variables and naming diff --git a/src/applications/Applications.DicomDirectoryProcessor/Execution/DicomDirectoryProcessorHost.cs b/src/applications/Applications.DicomDirectoryProcessor/Execution/DicomDirectoryProcessorHost.cs index 59a4ce6f2..02e5ccc57 100644 --- a/src/applications/Applications.DicomDirectoryProcessor/Execution/DicomDirectoryProcessorHost.cs +++ b/src/applications/Applications.DicomDirectoryProcessor/Execution/DicomDirectoryProcessorHost.cs @@ -55,25 +55,25 @@ public DicomDirectoryProcessorHost(GlobalOptions globals, DicomDirectoryProcesso Logger.Info("Creating PACS directory finder"); _ddf = new PacsDirectoryFinder(globals.FileSystemOptions!.FileSystemRoot!, - globals.FileSystemOptions.DicomSearchPattern!, RabbitMqAdapter.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false)); + globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false)); break; case "list": Logger.Info("Creating accession directory lister"); _ddf = new AccessionDirectoryLister(globals.FileSystemOptions!.FileSystemRoot!, - globals.FileSystemOptions.DicomSearchPattern!, RabbitMqAdapter.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false)); + globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false)); break; case "default": Logger.Info("Creating basic directory finder"); _ddf = new BasicDicomDirectoryFinder(globals.FileSystemOptions!.FileSystemRoot!, - globals.FileSystemOptions.DicomSearchPattern!, RabbitMqAdapter.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false)); + globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false)); break; case "zips": Logger.Info("Creating zip directory finder"); _ddf = new ZipDicomDirectoryFinder(globals.FileSystemOptions!.FileSystemRoot!, - globals.FileSystemOptions.DicomSearchPattern!, RabbitMqAdapter.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false)); + globals.FileSystemOptions.DicomSearchPattern!, MessageBroker.SetupProducer(globals.ProcessDirectoryOptions!.AccessionDirectoryProducerOptions!, isBatch: false)); break; default: throw new ArgumentException( diff --git a/src/applications/Applications.ExtractImages/ExtractImagesHost.cs b/src/applications/Applications.ExtractImages/ExtractImagesHost.cs index aeeba7c4f..f26158c13 100644 --- a/src/applications/Applications.ExtractImages/ExtractImagesHost.cs +++ b/src/applications/Applications.ExtractImages/ExtractImagesHost.cs @@ -27,13 +27,13 @@ public ExtractImagesHost( GlobalOptions globals, ExtractImagesCliOptions cliOptions, IExtractionMessageSender? extractionMessageSender = null, - IRabbitMqAdapter? rabbitMqAdapter = null, + IMessageBroker? messageBroker = null, IFileSystem? fileSystem = null, bool threaded = false ) : base( globals, - rabbitMqAdapter, + messageBroker, threaded ) { @@ -64,8 +64,8 @@ public ExtractImagesHost( if (extractionMessageSender == null) { - IProducerModel extractionRequestProducer = RabbitMqAdapter.SetupProducer(options.ExtractionRequestProducerOptions!, isBatch: false); - IProducerModel extractionRequestInfoProducer = RabbitMqAdapter.SetupProducer(options.ExtractionRequestInfoProducerOptions!, isBatch: false); + IProducerModel extractionRequestProducer = MessageBroker.SetupProducer(options.ExtractionRequestProducerOptions!, isBatch: false); + IProducerModel extractionRequestInfoProducer = MessageBroker.SetupProducer(options.ExtractionRequestInfoProducerOptions!, isBatch: false); _extractionMessageSender = new ExtractionMessageSender( options, diff --git a/src/applications/Applications.TriggerUpdates/Execution/TriggerUpdatesHost.cs b/src/applications/Applications.TriggerUpdates/Execution/TriggerUpdatesHost.cs index 2fdf40f21..114d7de35 100644 --- a/src/applications/Applications.TriggerUpdates/Execution/TriggerUpdatesHost.cs +++ b/src/applications/Applications.TriggerUpdates/Execution/TriggerUpdatesHost.cs @@ -11,11 +11,11 @@ public class TriggerUpdatesHost : MicroserviceHost private ITriggerUpdatesSource _source; private IProducerModel _producer; - public TriggerUpdatesHost(GlobalOptions options,ITriggerUpdatesSource source,IRabbitMqAdapter? rabbitMqAdapter = null) - : base(options, rabbitMqAdapter) + public TriggerUpdatesHost(GlobalOptions options,ITriggerUpdatesSource source,IMessageBroker? messageBroker = null) + : base(options, messageBroker) { _source = source; - _producer = RabbitMqAdapter.SetupProducer(options.TriggerUpdatesOptions!, isBatch: false); + _producer = MessageBroker.SetupProducer(options.TriggerUpdatesOptions!, isBatch: false); } public override void Start() diff --git a/src/applications/Setup/EnvironmentProbe.cs b/src/applications/Setup/EnvironmentProbe.cs index a3d062707..ca8d87823 100644 --- a/src/applications/Setup/EnvironmentProbe.cs +++ b/src/applications/Setup/EnvironmentProbe.cs @@ -21,6 +21,7 @@ using Rdmp.Core.ReusableLibraryCode; using Rdmp.Core.ReusableLibraryCode.Checks; using Rdmp.Core.ReusableLibraryCode.Progress; +using Smi.Common.Messaging; namespace Setup { @@ -225,8 +226,7 @@ internal void CheckMicroservices(IDataLoadEventListener? listener = null) try { - var factory = Options.RabbitOptions.CreateConnectionFactory(); - var adapter = new RabbitMqAdapter(factory, "setup"); + var adapter = new RabbitMQBroker(Options.RabbitOptions, "setup"); return new CheckEventArgs("Connected to RabbitMq", CheckResult.Success); } diff --git a/src/common/Smi.Common/ConnectionFactoryExtensions.cs b/src/common/Smi.Common/ConnectionFactoryExtensions.cs deleted file mode 100644 index 7f2ff0ac6..000000000 --- a/src/common/Smi.Common/ConnectionFactoryExtensions.cs +++ /dev/null @@ -1,18 +0,0 @@ -using RabbitMQ.Client; -using Smi.Common.Options; - -namespace Smi.Common -{ - public static class ConnectionFactoryExtensions - { - public static ConnectionFactory CreateConnectionFactory(this RabbitOptions options) - => new() - { - HostName = options.RabbitMqHostName, - VirtualHost = options.RabbitMqVirtualHost, - Port = options.RabbitMqHostPort, - UserName = options.RabbitMqUserName, - Password = options.RabbitMqPassword - }; - } -} diff --git a/src/common/Smi.Common/Execution/MicroserviceHost.cs b/src/common/Smi.Common/Execution/MicroserviceHost.cs index c9525a64a..943a21421 100644 --- a/src/common/Smi.Common/Execution/MicroserviceHost.cs +++ b/src/common/Smi.Common/Execution/MicroserviceHost.cs @@ -1,6 +1,5 @@ using DicomTypeTranslation; using NLog; -using RabbitMQ.Client; using Smi.Common.Events; using Smi.Common.Helpers; using Smi.Common.Messages; @@ -22,7 +21,7 @@ public abstract class MicroserviceHost : IMicroserviceHost protected readonly GlobalOptions Globals; protected readonly ILogger Logger; - protected readonly IRabbitMqAdapter RabbitMqAdapter; + protected readonly IMessageBroker MessageBroker; private readonly object _oAdapterLock = new(); @@ -41,11 +40,11 @@ public abstract class MicroserviceHost : IMicroserviceHost /// Loads logging, sets up fatal behaviour, subscribes rabbit etc. /// /// Settings for the microservice (location of rabbit, queue names etc) - /// + /// /// protected MicroserviceHost( GlobalOptions globals, - IRabbitMqAdapter? rabbitMqAdapter = null, + IMessageBroker? messageBroker = null, bool threaded = false) { if (globals == null || globals.FileSystemOptions == null || globals.RabbitOptions == null || globals.LoggingOptions == null) @@ -81,15 +80,14 @@ protected MicroserviceHost( OnFatal += (sender, args) => Fatal(args.Message, args.Exception); - if (rabbitMqAdapter == null) + if (messageBroker == null) { - ConnectionFactory connectionFactory = globals.RabbitOptions.CreateConnectionFactory(); - rabbitMqAdapter = new RabbitMqAdapter(connectionFactory, HostProcessName + HostProcessID, OnFatal, threaded); + messageBroker = new RabbitMQBroker(globals.RabbitOptions, HostProcessName + HostProcessID, OnFatal, threaded); var controlExchangeName = globals.RabbitOptions.RabbitMqControlExchangeName ?? throw new ArgumentNullException(nameof(globals.RabbitOptions.RabbitMqControlExchangeName)); - _controlMessageConsumer = new ControlMessageConsumer(connectionFactory, HostProcessName, HostProcessID, controlExchangeName, Stop); + _controlMessageConsumer = new ControlMessageConsumer(globals.RabbitOptions, HostProcessName, HostProcessID, controlExchangeName, Stop); } - RabbitMqAdapter = rabbitMqAdapter; + MessageBroker = messageBroker; ObjectFactory = new MicroserviceObjectFactory(); ObjectFactory.FatalHandler = (s, e) => Fatal(e.Message, e.Exception); @@ -118,11 +116,11 @@ public void StartAuxConnections() _auxConnectionsCreated = true; // Ensures no consumers have been started until we explicitly call Start() - if (RabbitMqAdapter.HasConsumers) + if (MessageBroker.HasConsumers) throw new ApplicationException("Rabbit adapter has consumers before aux. connections created"); - _fatalLoggingProducer = RabbitMqAdapter.SetupProducer(_fatalLoggingProducerOptions, isBatch: false); - RabbitMqAdapter.StartConsumer(_controlMessageConsumer.ControlConsumerOptions, _controlMessageConsumer, isSolo: false); + _fatalLoggingProducer = MessageBroker.SetupProducer(_fatalLoggingProducerOptions, isBatch: false); + MessageBroker.StartConsumer(_controlMessageConsumer.ControlConsumerOptions, _controlMessageConsumer, isSolo: false); } } @@ -156,7 +154,7 @@ public virtual void Stop(string reason) lock (_oAdapterLock) { - RabbitMqAdapter.Shutdown(Common.RabbitMqAdapter.DefaultOperationTimeout); + MessageBroker.Shutdown(RabbitMQBroker.DefaultOperationTimeout); } Logger.Info("Host stop completed"); @@ -190,7 +188,7 @@ public void Fatal(string msg, Exception? exception) public void Wait() { - RabbitMqAdapter.Wait(); + MessageBroker.Wait(); } } } diff --git a/src/common/Smi.Common/IRabbitMqAdapter.cs b/src/common/Smi.Common/IMessageBroker.cs similarity index 94% rename from src/common/Smi.Common/IRabbitMqAdapter.cs rename to src/common/Smi.Common/IMessageBroker.cs index 225547660..91109716e 100644 --- a/src/common/Smi.Common/IRabbitMqAdapter.cs +++ b/src/common/Smi.Common/IMessageBroker.cs @@ -5,7 +5,7 @@ namespace Smi.Common { - public interface IRabbitMqAdapter + public interface IMessageBroker { bool HasConsumers { get; } diff --git a/src/common/Smi.Common/Messaging/Consumer.cs b/src/common/Smi.Common/Messaging/Consumer.cs index 864eb283b..567e0df52 100644 --- a/src/common/Smi.Common/Messaging/Consumer.cs +++ b/src/common/Smi.Common/Messaging/Consumer.cs @@ -234,7 +234,7 @@ protected void Ack(IList batchHeaders, ulong latestDeliveryTag) } /// - /// Logs a Fatal in the Logger and triggers the FatalError event which should shutdown the RabbitMQAdapter + /// Logs a Fatal in the Logger and triggers the FatalError event which should shutdown the MessageBroker /// Do not do any further processing after triggering this method /// /// diff --git a/src/common/Smi.Common/Messaging/ControlMessageConsumer.cs b/src/common/Smi.Common/Messaging/ControlMessageConsumer.cs index 26e932e1a..0fd44261f 100644 --- a/src/common/Smi.Common/Messaging/ControlMessageConsumer.cs +++ b/src/common/Smi.Common/Messaging/ControlMessageConsumer.cs @@ -34,7 +34,7 @@ public class ControlMessageConsumer : Consumer public ControlMessageConsumer( - IConnectionFactory connectionFactory, + RabbitOptions rabbitOptions, string processName, int processId, string controlExchangeName, @@ -47,7 +47,14 @@ public ControlMessageConsumer( ControlConsumerOptions.QueueName = $"Control.{_processName}{_processId}"; - _factory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); + _factory = new ConnectionFactory() + { + HostName = rabbitOptions.RabbitMqHostName, + VirtualHost = rabbitOptions.RabbitMqVirtualHost, + Port = rabbitOptions.RabbitMqHostPort, + UserName = rabbitOptions.RabbitMqUserName, + Password = rabbitOptions.RabbitMqPassword + }; if (controlExchangeName == null) throw new ArgumentNullException(nameof(controlExchangeName)); @@ -160,7 +167,7 @@ public override void Shutdown() /// /// Creates a one-time connection to set up the required control queue and bindings on the RabbitMQ server. - /// The connection is disposed and StartConsumer(...) can then be called on the parent RabbitMQAdapter with ControlConsumerOptions + /// The connection is disposed and StartConsumer(...) can then be called on the parent MessageBroker with ControlConsumerOptions /// /// private void SetupControlQueueForHost(string controlExchangeName) diff --git a/src/common/Smi.Common/Messaging/IConsumer.cs b/src/common/Smi.Common/Messaging/IConsumer.cs index 86f7fda3e..15a06848d 100644 --- a/src/common/Smi.Common/Messaging/IConsumer.cs +++ b/src/common/Smi.Common/Messaging/IConsumer.cs @@ -7,7 +7,7 @@ namespace Smi.Common.Messaging { /// - /// Interface for an object which handles messages obtained by a RabbitMQAdapter. + /// Interface for an object which handles messages obtained by a MessageBroker. /// public interface IConsumer { diff --git a/src/common/Smi.Common/Messaging/MessageBrokerFactory.cs b/src/common/Smi.Common/Messaging/MessageBrokerFactory.cs new file mode 100644 index 000000000..0fdd96fba --- /dev/null +++ b/src/common/Smi.Common/Messaging/MessageBrokerFactory.cs @@ -0,0 +1,27 @@ +using Smi.Common.Options; +using System; +using System.Diagnostics.CodeAnalysis; + +namespace Smi.Common.Messaging; + +public static class MessageBrokerFactory +{ + [ExcludeFromCodeCoverage] // NOTE(rkm 2024-02-08) This can be removed after we use the class + public static IMessageBroker Create(GlobalOptions globals, string connectionIdentifier) + { + switch (globals.MessageBrokerType) + { + case MessageBrokerType.RabbitMQ: + { + if (globals.RabbitOptions == null) + throw new ArgumentNullException(nameof(globals), $"{nameof(globals.RabbitOptions)} must not be null"); + + return new RabbitMQBroker(globals.RabbitOptions, connectionIdentifier); + } + case MessageBrokerType.None: + throw new ArgumentOutOfRangeException(nameof(globals.MessageBrokerType), $"A valid {nameof(MessageBrokerType)} must be chosen"); + default: + throw new NotImplementedException($"No case for {globals.MessageBrokerType}"); + } + } +} diff --git a/src/common/Smi.Common/Messaging/MessageBrokerType.cs b/src/common/Smi.Common/Messaging/MessageBrokerType.cs new file mode 100644 index 000000000..4b299262c --- /dev/null +++ b/src/common/Smi.Common/Messaging/MessageBrokerType.cs @@ -0,0 +1,11 @@ +namespace Smi.Common.Messaging; + +public enum MessageBrokerType +{ + /// + /// Unused placeholder value + /// + None = 0, + + RabbitMQ, +} diff --git a/src/common/Smi.Common/RabbitMQAdapter.cs b/src/common/Smi.Common/Messaging/RabbitMQBroker.cs similarity index 94% rename from src/common/Smi.Common/RabbitMQAdapter.cs rename to src/common/Smi.Common/Messaging/RabbitMQBroker.cs index c5d8c13a1..070b8b74c 100644 --- a/src/common/Smi.Common/RabbitMQAdapter.cs +++ b/src/common/Smi.Common/Messaging/RabbitMQBroker.cs @@ -1,4 +1,3 @@ - using System; using System.Collections.Generic; using System.Linq; @@ -9,15 +8,14 @@ using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using Smi.Common.Events; -using Smi.Common.Messaging; using Smi.Common.Options; -namespace Smi.Common +namespace Smi.Common.Messaging { /// /// Adapter for the RabbitMQ API. /// - public class RabbitMqAdapter : IRabbitMqAdapter + public class RabbitMQBroker : IMessageBroker { /// /// Used to ensure we can't create any new connections after we have called Shutdown() @@ -38,12 +36,11 @@ public bool HasConsumers public const string RabbitMqRoutingKey_MatchAnything = "#"; public const string RabbitMqRoutingKey_MatchOneWord = "*"; - public static TimeSpan DefaultOperationTimeout = TimeSpan.FromSeconds(5); + public static readonly TimeSpan DefaultOperationTimeout = TimeSpan.FromSeconds(5); private readonly ILogger _logger = LogManager.GetCurrentClassLogger(); private readonly HostFatalHandler? _hostFatalHandler; - private readonly string _hostId; private readonly IConnection _connection; private readonly Dictionary _rabbitResources = new(); @@ -54,23 +51,16 @@ public bool HasConsumers private const int MinRabbitServerVersionMinor = 7; private const int MinRabbitServerVersionPatch = 0; - private const int MaxSubscriptionAttempts = 5; - - private readonly bool _threaded; - /// /// /// - /// + /// /// Identifier for this host instance /// /// - public RabbitMqAdapter(IConnectionFactory connectionFactory, string hostId, HostFatalHandler? hostFatalHandler = null, bool threaded = false) + public RabbitMQBroker(RabbitOptions rabbitOptions, string hostId, HostFatalHandler? hostFatalHandler = null, bool threaded = false) { - //_threaded = options.ThreadReceivers; - _threaded = threaded; - - if (_threaded) + if (threaded) { ThreadPool.GetMinThreads(out var minWorker, out var minIOC); var workers = Math.Max(50, minWorker); @@ -82,10 +72,17 @@ public RabbitMqAdapter(IConnectionFactory connectionFactory, string hostId, Host if (string.IsNullOrWhiteSpace(hostId)) throw new ArgumentException("RabbitMQ host ID required", nameof(hostId)); - _hostId = hostId; + var connectionFactory = new ConnectionFactory() + { + HostName = rabbitOptions.RabbitMqHostName, + VirtualHost = rabbitOptions.RabbitMqVirtualHost, + Port = rabbitOptions.RabbitMqHostPort, + UserName = rabbitOptions.RabbitMqUserName, + Password = rabbitOptions.RabbitMqPassword + }; _connection = connectionFactory.CreateConnection(hostId); - _connection.ConnectionBlocked += (s, a) => _logger.Warn($"ConnectionBlocked (Reason: {a.Reason.ToString()})"); + _connection.ConnectionBlocked += (s, a) => _logger.Warn($"ConnectionBlocked (Reason: {a.Reason})"); _connection.ConnectionUnblocked += (s, a) => _logger.Warn("ConnectionUnblocked"); if (hostFatalHandler == null) @@ -302,7 +299,7 @@ public void Shutdown(TimeSpan timeout) } _rabbitResources.Clear(); } - lock(_exitLock) + lock (_exitLock) Monitor.PulseAll(_exitLock); } diff --git a/src/common/Smi.Common/Options/GlobalOptions.cs b/src/common/Smi.Common/Options/GlobalOptions.cs index b9ed08f79..36e13bc41 100644 --- a/src/common/Smi.Common/Options/GlobalOptions.cs +++ b/src/common/Smi.Common/Options/GlobalOptions.cs @@ -8,6 +8,7 @@ using Rdmp.Core.Startup; using Smi.Common.Messages; using Smi.Common.Messages.Extraction; +using Smi.Common.Messaging; using System; using System.Collections.Generic; using System.Linq; @@ -44,6 +45,8 @@ public string HostProcessName } } + public MessageBrokerType? MessageBrokerType { get; set; } = Messaging.MessageBrokerType.RabbitMQ; + public LoggingOptions? LoggingOptions { get; set; } = new LoggingOptions(); public RabbitOptions? RabbitOptions { get; set; } = new RabbitOptions(); public FileSystemOptions? FileSystemOptions { get; set; } = new FileSystemOptions(); @@ -603,11 +606,10 @@ public class RabbitOptions : IOptions public string RabbitMqHostName { get; set; } = "localhost"; public int RabbitMqHostPort { get; set; } = 5672; public string? RabbitMqVirtualHost { get; set; } = "/"; - public string? RabbitMqUserName { get; set; } - public string? RabbitMqPassword { get; set; } + public string? RabbitMqUserName { get; set; } = "guest"; + public string? RabbitMqPassword { get; set; } = "guest"; public string? FatalLoggingExchange { get; set; } public string? RabbitMqControlExchangeName { get; set; } - public bool ThreadReceivers { get; set; } public override string ToString() { diff --git a/src/common/Smi.Common/RabbitMQAdapter.cd b/src/common/Smi.Common/RabbitMQAdapter.cd deleted file mode 100644 index 3423e13bc..000000000 --- a/src/common/Smi.Common/RabbitMQAdapter.cd +++ /dev/null @@ -1,72 +0,0 @@ - - - - - - AAAAAAAAAAAAAAAEgAAEAAAAAAAAAIAAAAAAAAAAImA= - Options\ProducerOptions.cs - - - - - - AAAAAEAAAAAAAAAEAAAAAAAAAAAAAQAAAAgAAAAAIAA= - Options\ConsumerOptions.cs - - - - - - - - RabbitMQAdapter.cs - - - - - - RabbitMQAdapter.cs - - - - - RabbitMQAdapter.cs - - - - - AAAAAQAIRJCAAAAEoACwIQAAIAAAAAAIBEACAgAEAAA= - RabbitMQAdapter.cs - - - - - - - - - - AAAAAAAAAAAAAAAAAAAAAAAAAAgAAAAAAAAAgAAAAAA= - Options\CliOptions.cs - - - - - - - - - - AAAAAAAACAAAAAAAAAAAAQAAAAAEAABAAAAAAAAAAAA= - Messaging\IConsumer.cs - - - - - - AAAAAEAAAAAAAAAAACAAAAAAAAAAAABAAAAAAAAAAAA= - Messaging\IProducerModel.cs - - - - diff --git a/src/microservices/Microservices.CohortExtractor/Execution/CohortExtractorHost.cs b/src/microservices/Microservices.CohortExtractor/Execution/CohortExtractorHost.cs index 0fdff33b5..56cdf0910 100644 --- a/src/microservices/Microservices.CohortExtractor/Execution/CohortExtractorHost.cs +++ b/src/microservices/Microservices.CohortExtractor/Execution/CohortExtractorHost.cs @@ -75,14 +75,14 @@ public override void Start() foreach (var args in toMemory.Messages.Where(static m => m.Result == CheckResult.Fail)) Logger.Log(LogLevel.Warn, args.Ex, args.Message); - _fileMessageProducer = RabbitMqAdapter.SetupProducer(Globals.CohortExtractorOptions!.ExtractFilesProducerOptions!, isBatch: true); - var fileMessageInfoProducer = RabbitMqAdapter.SetupProducer(Globals.CohortExtractorOptions.ExtractFilesInfoProducerOptions!, isBatch: false); + _fileMessageProducer = MessageBroker.SetupProducer(Globals.CohortExtractorOptions!.ExtractFilesProducerOptions!, isBatch: true); + var fileMessageInfoProducer = MessageBroker.SetupProducer(Globals.CohortExtractorOptions.ExtractFilesInfoProducerOptions!, isBatch: false); InitializeExtractionSources(repositoryLocator); Consumer = new ExtractionRequestQueueConsumer(Globals.CohortExtractorOptions, _fulfiller!, _auditor!, _pathResolver!, _fileMessageProducer, fileMessageInfoProducer); - RabbitMqAdapter.StartConsumer(_consumerOptions, Consumer, isSolo: false); + MessageBroker.StartConsumer(_consumerOptions, Consumer, isSolo: false); } public override void Stop(string reason) diff --git a/src/microservices/Microservices.CohortPackager/Execution/CohortPackagerHost.cs b/src/microservices/Microservices.CohortPackager/Execution/CohortPackagerHost.cs index 1a0b67e94..ff82c711f 100644 --- a/src/microservices/Microservices.CohortPackager/Execution/CohortPackagerHost.cs +++ b/src/microservices/Microservices.CohortPackager/Execution/CohortPackagerHost.cs @@ -40,7 +40,7 @@ public class CohortPackagerHost : MicroserviceHost /// Globals.CohortPackagerOptions.ReportFormat. That value should not be set if a reporter is passed. /// /// - /// + /// /// public CohortPackagerHost( GlobalOptions globals, @@ -48,10 +48,10 @@ public CohortPackagerHost( IFileSystem? fileSystem = null, IJobReporter? reporter = null, IJobCompleteNotifier? notifier = null, - IRabbitMqAdapter? rabbitMqAdapter = null, + IMessageBroker? messageBroker = null, DateTimeProvider? dateTimeProvider = null ) - : base(globals, rabbitMqAdapter) + : base(globals, messageBroker) { var cohortPackagerOptions = globals.CohortPackagerOptions ?? throw new ArgumentNullException(nameof(globals), "CohortPackagerOptions cannot be null"); @@ -136,10 +136,10 @@ public override void Start() _jobWatcher.Start(); // TODO(rkm 2020-03-02) Once this is transactional, we can have one "master" service which actually does the job checking - RabbitMqAdapter.StartConsumer(Globals.CohortPackagerOptions!.ExtractRequestInfoOptions!, _requestInfoMessageConsumer, isSolo: true); - RabbitMqAdapter.StartConsumer(Globals.CohortPackagerOptions.FileCollectionInfoOptions!, _fileCollectionMessageConsumer, isSolo: true); - RabbitMqAdapter.StartConsumer(Globals.CohortPackagerOptions.NoVerifyStatusOptions!, _anonFailedMessageConsumer, isSolo: true); - RabbitMqAdapter.StartConsumer(Globals.CohortPackagerOptions.VerificationStatusOptions!, _anonVerificationMessageConsumer, isSolo: true); + MessageBroker.StartConsumer(Globals.CohortPackagerOptions!.ExtractRequestInfoOptions!, _requestInfoMessageConsumer, isSolo: true); + MessageBroker.StartConsumer(Globals.CohortPackagerOptions.FileCollectionInfoOptions!, _fileCollectionMessageConsumer, isSolo: true); + MessageBroker.StartConsumer(Globals.CohortPackagerOptions.NoVerifyStatusOptions!, _anonFailedMessageConsumer, isSolo: true); + MessageBroker.StartConsumer(Globals.CohortPackagerOptions.VerificationStatusOptions!, _anonVerificationMessageConsumer, isSolo: true); } public override void Stop(string reason) diff --git a/src/microservices/Microservices.DicomAnonymiser/DicomAnonymiserHost.cs b/src/microservices/Microservices.DicomAnonymiser/DicomAnonymiserHost.cs index c8f71fec7..133dd8998 100644 --- a/src/microservices/Microservices.DicomAnonymiser/DicomAnonymiserHost.cs +++ b/src/microservices/Microservices.DicomAnonymiser/DicomAnonymiserHost.cs @@ -20,7 +20,7 @@ public DicomAnonymiserHost( { _anonymiser = anonymiser ?? AnonymiserFactory.CreateAnonymiser(Globals.DicomAnonymiserOptions!); - var producerModel = RabbitMqAdapter.SetupProducer(options.DicomAnonymiserOptions!.ExtractFileStatusProducerOptions!, isBatch: false); + var producerModel = MessageBroker.SetupProducer(options.DicomAnonymiserOptions!.ExtractFileStatusProducerOptions!, isBatch: false); _consumer = new DicomAnonymiserConsumer( Globals.DicomAnonymiserOptions!, @@ -34,7 +34,7 @@ public DicomAnonymiserHost( public override void Start() { - RabbitMqAdapter.StartConsumer(Globals.DicomAnonymiserOptions!.AnonFileConsumerOptions!, _consumer, isSolo: false); + MessageBroker.StartConsumer(Globals.DicomAnonymiserOptions!.AnonFileConsumerOptions!, _consumer, isSolo: false); } public override void Stop(string reason) diff --git a/src/microservices/Microservices.DicomRelationalMapper/Execution/DicomRelationalMapperHost.cs b/src/microservices/Microservices.DicomRelationalMapper/Execution/DicomRelationalMapperHost.cs index 14922fd19..50e9151ef 100644 --- a/src/microservices/Microservices.DicomRelationalMapper/Execution/DicomRelationalMapperHost.cs +++ b/src/microservices/Microservices.DicomRelationalMapper/Execution/DicomRelationalMapperHost.cs @@ -61,7 +61,7 @@ public override void Start() RunChecks = Globals.DicomRelationalMapperOptions.RunChecks }; - RabbitMqAdapter.StartConsumer(Globals.DicomRelationalMapperOptions, Consumer, isSolo: false); + MessageBroker.StartConsumer(Globals.DicomRelationalMapperOptions, Consumer, isSolo: false); } private void Startup_DatabaseFound(object sender, PlatformDatabaseFoundEventArgs e) diff --git a/src/microservices/Microservices.DicomReprocessor/Execution/DicomReprocessorHost.cs b/src/microservices/Microservices.DicomReprocessor/Execution/DicomReprocessorHost.cs index 9794bdd6e..c72b95f0e 100644 --- a/src/microservices/Microservices.DicomReprocessor/Execution/DicomReprocessorHost.cs +++ b/src/microservices/Microservices.DicomReprocessor/Execution/DicomReprocessorHost.cs @@ -28,7 +28,7 @@ public DicomReprocessorHost(GlobalOptions options, DicomReprocessorCliOptions cl // Set the initial sleep time Globals.DicomReprocessorOptions!.SleepTime = TimeSpan.FromMilliseconds(cliOptions.SleepTimeMs); - IProducerModel reprocessingProducerModel = RabbitMqAdapter.SetupProducer(options.DicomReprocessorOptions!.ReprocessingProducerOptions!, true); + IProducerModel reprocessingProducerModel = MessageBroker.SetupProducer(options.DicomReprocessorOptions!.ReprocessingProducerOptions!, true); Logger.Info("Documents will be reprocessed to " + options.DicomReprocessorOptions.ReprocessingProducerOptions!.ExchangeName + " on vhost " + diff --git a/src/microservices/Microservices.DicomTagReader/Execution/DicomTagReaderHost.cs b/src/microservices/Microservices.DicomTagReader/Execution/DicomTagReaderHost.cs index 320959ae1..e8d0e5553 100644 --- a/src/microservices/Microservices.DicomTagReader/Execution/DicomTagReaderHost.cs +++ b/src/microservices/Microservices.DicomTagReader/Execution/DicomTagReaderHost.cs @@ -32,11 +32,11 @@ public DicomTagReaderHost(GlobalOptions options) { Logger.Debug( $"Creating seriesProducerModel with ExchangeName: {options.DicomTagReaderOptions.SeriesProducerOptions!.ExchangeName}"); - seriesProducerModel = RabbitMqAdapter.SetupProducer(options.DicomTagReaderOptions.SeriesProducerOptions, true); + seriesProducerModel = MessageBroker.SetupProducer(options.DicomTagReaderOptions.SeriesProducerOptions, true); Logger.Debug( $"Creating imageProducerModel with ExchangeName: {options.DicomTagReaderOptions.ImageProducerOptions!.ExchangeName}"); - imageProducerModel = RabbitMqAdapter.SetupProducer(options.DicomTagReaderOptions.ImageProducerOptions, true); + imageProducerModel = MessageBroker.SetupProducer(options.DicomTagReaderOptions.ImageProducerOptions, true); } catch (Exception e) { @@ -59,7 +59,7 @@ public DicomTagReaderHost(GlobalOptions options) public override void Start() { // Start the consumer to await callbacks when messages arrive - RabbitMqAdapter.StartConsumer(Globals.DicomTagReaderOptions!, AccessionDirectoryMessageConsumer, isSolo: false); + MessageBroker.StartConsumer(Globals.DicomTagReaderOptions!, AccessionDirectoryMessageConsumer, isSolo: false); Logger.Debug("Consumer started"); } diff --git a/src/microservices/Microservices.FileCopier/Execution/FileCopierHost.cs b/src/microservices/Microservices.FileCopier/Execution/FileCopierHost.cs index bae4b1fc6..d3c2ba999 100644 --- a/src/microservices/Microservices.FileCopier/Execution/FileCopierHost.cs +++ b/src/microservices/Microservices.FileCopier/Execution/FileCopierHost.cs @@ -21,7 +21,7 @@ public FileCopierHost( { Logger.Debug("Creating FileCopierHost with FileSystemRoot: " + Globals.FileSystemOptions!.FileSystemRoot); - IProducerModel copyStatusProducerModel = RabbitMqAdapter.SetupProducer(Globals.FileCopierOptions!.CopyStatusProducerOptions!, isBatch: false); + IProducerModel copyStatusProducerModel = MessageBroker.SetupProducer(Globals.FileCopierOptions!.CopyStatusProducerOptions!, isBatch: false); var fileCopier = new ExtractionFileCopier( Globals.FileCopierOptions, @@ -35,7 +35,7 @@ public FileCopierHost( public override void Start() { - RabbitMqAdapter.StartConsumer(Globals.FileCopierOptions!, _consumer, isSolo: false); + MessageBroker.StartConsumer(Globals.FileCopierOptions!, _consumer, isSolo: false); } } } diff --git a/src/microservices/Microservices.IdentifierMapper/Execution/IdentifierMapperHost.cs b/src/microservices/Microservices.IdentifierMapper/Execution/IdentifierMapperHost.cs index e0c9b0978..a5a2a2e98 100644 --- a/src/microservices/Microservices.IdentifierMapper/Execution/IdentifierMapperHost.cs +++ b/src/microservices/Microservices.IdentifierMapper/Execution/IdentifierMapperHost.cs @@ -62,7 +62,7 @@ public IdentifierMapperHost(GlobalOptions options, ISwapIdentifiers? swapper = n Logger.Info($"Swapper of type {_swapper.GetType()} created"); // Batching now handled implicitly as backlog demands - _producerModel = RabbitMqAdapter.SetupProducer(options.IdentifierMapperOptions.AnonImagesProducerOptions!, isBatch: true); + _producerModel = MessageBroker.SetupProducer(options.IdentifierMapperOptions.AnonImagesProducerOptions!, isBatch: true); Consumer = new IdentifierMapperQueueConsumer(_producerModel, _swapper) { @@ -75,13 +75,13 @@ public IdentifierMapperHost(GlobalOptions options, ISwapIdentifiers? swapper = n public override void Start() { - _consumerId = RabbitMqAdapter.StartConsumer(_consumerOptions, Consumer, isSolo: false); + _consumerId = MessageBroker.StartConsumer(_consumerOptions, Consumer, isSolo: false); } public override void Stop(string reason) { if (_consumerId != Guid.Empty) - RabbitMqAdapter.StopConsumer(_consumerId, Smi.Common.RabbitMqAdapter.DefaultOperationTimeout); + MessageBroker.StopConsumer(_consumerId, RabbitMQBroker.DefaultOperationTimeout); try { // Wait for any unconfirmed messages before calling stop diff --git a/src/microservices/Microservices.IsIdentifiable/Service/IsIdentifiableHost.cs b/src/microservices/Microservices.IsIdentifiable/Service/IsIdentifiableHost.cs index 3526adb09..9de13665b 100644 --- a/src/microservices/Microservices.IsIdentifiable/Service/IsIdentifiableHost.cs +++ b/src/microservices/Microservices.IsIdentifiable/Service/IsIdentifiableHost.cs @@ -33,14 +33,14 @@ GlobalOptions globals var objectFactory = new MicroserviceObjectFactory(); var classifier = objectFactory.CreateInstance(classifierTypename, typeof(IClassifier).Assembly, new DirectoryInfo(dataDirectory), globals.IsIdentifiableOptions!) ?? throw new TypeLoadException($"Could not find IClassifier Type {classifierTypename}"); - _producerModel = RabbitMqAdapter.SetupProducer(globals.IsIdentifiableServiceOptions.IsIdentifiableProducerOptions!, isBatch: false); + _producerModel = MessageBroker.SetupProducer(globals.IsIdentifiableServiceOptions.IsIdentifiableProducerOptions!, isBatch: false); Consumer = new IsIdentifiableQueueConsumer(_producerModel, globals.FileSystemOptions!.ExtractRoot!, classifier); } public override void Start() { - RabbitMqAdapter.StartConsumer(_consumerOptions, Consumer, isSolo: false); + MessageBroker.StartConsumer(_consumerOptions, Consumer, isSolo: false); } public override void Stop(string reason) diff --git a/src/microservices/Microservices.MongoDbPopulator/Execution/MongoDbPopulatorHost.cs b/src/microservices/Microservices.MongoDbPopulator/Execution/MongoDbPopulatorHost.cs index 02e3735eb..23c76da09 100644 --- a/src/microservices/Microservices.MongoDbPopulator/Execution/MongoDbPopulatorHost.cs +++ b/src/microservices/Microservices.MongoDbPopulator/Execution/MongoDbPopulatorHost.cs @@ -35,7 +35,7 @@ public override void Start() Logger.Info("Starting consumers"); foreach (IMongoDbPopulatorMessageConsumer consumer in Consumers) - RabbitMqAdapter.StartConsumer(consumer.ConsumerOptions, consumer, isSolo: false); + MessageBroker.StartConsumer(consumer.ConsumerOptions, consumer, isSolo: false); Logger.Info("Consumers successfully started"); } diff --git a/src/microservices/Updating/Microservices.UpdateValues/Execution/UpdateValuesHost.cs b/src/microservices/Updating/Microservices.UpdateValues/Execution/UpdateValuesHost.cs index 67ba107ac..00909d98b 100644 --- a/src/microservices/Updating/Microservices.UpdateValues/Execution/UpdateValuesHost.cs +++ b/src/microservices/Updating/Microservices.UpdateValues/Execution/UpdateValuesHost.cs @@ -9,8 +9,8 @@ public class UpdateValuesHost : MicroserviceHost { public UpdateValuesQueueConsumer? Consumer { get; set; } - public UpdateValuesHost(GlobalOptions globals, IRabbitMqAdapter? rabbitMqAdapter = null, bool threaded = false) - : base(globals, rabbitMqAdapter, threaded) + public UpdateValuesHost(GlobalOptions globals, IMessageBroker? messageBroker = null, bool threaded = false) + : base(globals, messageBroker, threaded) { FansiImplementations.Load(); } @@ -21,7 +21,7 @@ public override void Start() IRDMPPlatformRepositoryServiceLocator repositoryLocator = Globals.RDMPOptions!.GetRepositoryProvider(); Consumer = new UpdateValuesQueueConsumer(Globals.UpdateValuesOptions!, repositoryLocator.CatalogueRepository); - RabbitMqAdapter.StartConsumer(Globals.UpdateValuesOptions!, Consumer, isSolo: false); + MessageBroker.StartConsumer(Globals.UpdateValuesOptions!, Consumer, isSolo: false); } } } diff --git a/tests/common/Smi.Common.Tests/HeaderPreservationTest.cs b/tests/common/Smi.Common.Tests/HeaderPreservationTest.cs index 678fc1f44..a3ea58eab 100644 --- a/tests/common/Smi.Common.Tests/HeaderPreservationTest.cs +++ b/tests/common/Smi.Common.Tests/HeaderPreservationTest.cs @@ -34,7 +34,7 @@ public void SendHeader() tester.SendMessage(consumerOptions, header, new TestMessage { Message = "hi" }); consumer = new TestConsumer(); - tester.Adapter.StartConsumer(consumerOptions, consumer); + tester.Broker.StartConsumer(consumerOptions, consumer); TestTimelineAwaiter.Await(() => consumer.Failed || consumer.Passed, "timed out", 5000); diff --git a/tests/common/Smi.Common.Tests/RabbitMqAdapterTests.cs b/tests/common/Smi.Common.Tests/Messaging/RabbitMQBrokerTests.cs similarity index 83% rename from tests/common/Smi.Common.Tests/RabbitMqAdapterTests.cs rename to tests/common/Smi.Common.Tests/Messaging/RabbitMQBrokerTests.cs index 514e3cbb8..234aa2b59 100644 --- a/tests/common/Smi.Common.Tests/RabbitMqAdapterTests.cs +++ b/tests/common/Smi.Common.Tests/Messaging/RabbitMQBrokerTests.cs @@ -15,10 +15,10 @@ using System.Threading; using System.Threading.Channels; -namespace Smi.Common.Tests +namespace Smi.Common.Tests.Messaging { [TestFixture, RequiresRabbit] - public class RabbitMqAdapterTests + public class RabbitMQBrokerTests { private ProducerOptions _testProducerOptions = null!; private ConsumerOptions _testConsumerOptions = null!; @@ -32,7 +32,7 @@ public class RabbitMqAdapterTests public void OneTimeSetUp() { TestLogger.Setup(); - _testOptions = new GlobalOptionsFactory().Load(nameof(RabbitMqAdapterTests)); + _testOptions = new GlobalOptionsFactory().Load(nameof(RabbitMQBrokerTests)); _testProducerOptions = new ProducerOptions { @@ -68,10 +68,10 @@ public void TestSetupProducerThrowsOnNonExistentExchange() ExchangeName = null }; - Assert.Throws(() => _tester.Adapter.SetupProducer(producerOptions)); + Assert.Throws(() => _tester.Broker.SetupProducer(producerOptions)); producerOptions.ExchangeName = "TEST.DoesNotExistExchange"; - Assert.Throws(() => _tester.Adapter.SetupProducer(producerOptions)); + Assert.Throws(() => _tester.Broker.SetupProducer(producerOptions)); } /// @@ -82,7 +82,7 @@ public void TestStartConsumerThrowsOnNonExistentQueue() { var oldq = _testConsumerOptions.QueueName; _testConsumerOptions.QueueName = $"TEST.WrongQueue{new Random().NextInt64()}"; - Assert.Throws(() => _tester.Adapter.StartConsumer(_testConsumerOptions, _mockConsumer)); + Assert.Throws(() => _tester.Broker.StartConsumer(_testConsumerOptions, _mockConsumer)); _testConsumerOptions.QueueName = oldq; } @@ -93,8 +93,8 @@ public void TestStartConsumerThrowsOnNonExistentQueue() public void TestShutdownExitsProperly() { // Setup some consumers/producers so some channels are created - _tester.Adapter.SetupProducer(_testProducerOptions); - _tester.Adapter.StartConsumer(_testConsumerOptions, _mockConsumer); + _tester.Broker.SetupProducer(_testProducerOptions); + _tester.Broker.StartConsumer(_testConsumerOptions, _mockConsumer); } /// @@ -103,7 +103,7 @@ public void TestShutdownExitsProperly() [Test] public void TestShutdownThrowsOnTimeout() { - var testAdapter = new RabbitMqAdapter(_testOptions.RabbitOptions!.CreateConnectionFactory(), "RabbitMqAdapterTests"); + var testAdapter = new RabbitMQBroker(_testOptions.RabbitOptions!, "RabbitMQBrokerTests"); testAdapter.StartConsumer(_testConsumerOptions, _mockConsumer); Assert.Throws(() => testAdapter.Shutdown(TimeSpan.Zero)); } @@ -114,10 +114,10 @@ public void TestShutdownThrowsOnTimeout() [Test] public void TestNoNewConnectionsAfterShutdown() { - var testAdapter = new RabbitMqAdapter(_testOptions.RabbitOptions!.CreateConnectionFactory(), "RabbitMqAdapterTests"); + var testAdapter = new RabbitMQBroker(_testOptions.RabbitOptions!, "RabbitMQBrokerTests"); Assert.False(testAdapter.ShutdownCalled); - testAdapter.Shutdown(RabbitMqAdapter.DefaultOperationTimeout); + testAdapter.Shutdown(RabbitMQBroker.DefaultOperationTimeout); Assert.True(testAdapter.ShutdownCalled); Assert.Throws(() => testAdapter.StartConsumer(_testConsumerOptions, _mockConsumer)); @@ -127,9 +127,9 @@ public void TestNoNewConnectionsAfterShutdown() [Test] public void TestStopConsumer() { - var consumerId = _tester.Adapter.StartConsumer(_testConsumerOptions, _mockConsumer); - Assert.DoesNotThrow(() => _tester.Adapter.StopConsumer(consumerId, RabbitMqAdapter.DefaultOperationTimeout)); - Assert.Throws(() => _tester.Adapter.StopConsumer(consumerId, RabbitMqAdapter.DefaultOperationTimeout)); + var consumerId = _tester.Broker.StartConsumer(_testConsumerOptions, _mockConsumer); + Assert.DoesNotThrow(() => _tester.Broker.StopConsumer(consumerId, RabbitMQBroker.DefaultOperationTimeout)); + Assert.Throws(() => _tester.Broker.StopConsumer(consumerId, RabbitMQBroker.DefaultOperationTimeout)); } [Test] @@ -154,7 +154,7 @@ public void TestGetRabbitServerVersion() [Test] public void TestMultipleConfirmsOk() { - var pm = _tester.Adapter.SetupProducer(_testProducerOptions, true); + var pm = _tester.Broker.SetupProducer(_testProducerOptions, true); pm.SendMessage(new TestMessage(), isInResponseTo: null, routingKey: null); @@ -195,11 +195,11 @@ public void TestMultipleCloseOk() [Test] public void TestWaitAfterChannelClosed() { - var testAdapter = new RabbitMqAdapter(_testOptions.RabbitOptions!.CreateConnectionFactory(), "RabbitMqAdapterTests"); + var testAdapter = new RabbitMQBroker(_testOptions.RabbitOptions!, "RabbitMqAdapterTests"); var model = testAdapter.GetModel("TestConnection"); model.ConfirmSelect(); - testAdapter.Shutdown(RabbitMqAdapter.DefaultOperationTimeout); + testAdapter.Shutdown(RabbitMQBroker.DefaultOperationTimeout); Assert.True(model.IsClosed); Assert.Throws(() => model.WaitForConfirms()); @@ -221,8 +221,8 @@ public void Test_Shutdown(Type consumerType) var consumer = (IConsumer?)Activator.CreateInstance(consumerType); //connect to rabbit with a new consumer - using var tester = new MicroserviceTester(o.RabbitOptions!, new[] { _testConsumerOptions }) {CleanUpAfterTest = false}; - tester.Adapter.StartConsumer(_testConsumerOptions, consumer!, true); + using var tester = new MicroserviceTester(o.RabbitOptions!, new[] { _testConsumerOptions }) { CleanUpAfterTest = false }; + tester.Broker.StartConsumer(_testConsumerOptions, consumer!, true); //send a message to trigger consumer behaviour tester.SendMessage(_testConsumerOptions, new TestMessage()); @@ -231,7 +231,7 @@ public void Test_Shutdown(Type consumerType) Thread.Sleep(3000); //now attempt to shut down adapter - tester.Adapter.Shutdown(RabbitMqAdapter.DefaultOperationTimeout); + tester.Broker.Shutdown(RabbitMQBroker.DefaultOperationTimeout); var expectedErrorMessage = consumer switch { @@ -256,7 +256,7 @@ public void MessageHolds() var consumer = new ThrowingConsumer(); using var tester = new MicroserviceTester(_testOptions.RabbitOptions!, new[] { consumerOptions }); - tester.Adapter.StartConsumer(consumerOptions, consumer!, true); + tester.Broker.StartConsumer(consumerOptions, consumer!, true); tester.SendMessage(consumerOptions, new TestMessage()); Thread.Sleep(500); diff --git a/tests/common/Smi.Common.Tests/MicroserviceTester.cs b/tests/common/Smi.Common.Tests/MicroserviceTester.cs index b21c5368e..8f1c00cd7 100644 --- a/tests/common/Smi.Common.Tests/MicroserviceTester.cs +++ b/tests/common/Smi.Common.Tests/MicroserviceTester.cs @@ -14,7 +14,7 @@ namespace Smi.Common.Tests { public class MicroserviceTester : IDisposable { - public readonly RabbitMqAdapter Adapter; + public readonly RabbitMQBroker Broker; private readonly Dictionary _sendToConsumers = new(); @@ -40,9 +40,9 @@ public MicroserviceTester(RabbitOptions rabbitOptions, params ConsumerOptions[] { CleanUpAfterTest = true; - Adapter = new RabbitMqAdapter(rabbitOptions.CreateConnectionFactory(), "TestHost"); + Broker = new RabbitMQBroker(rabbitOptions, "TestHost"); - using var model = Adapter.GetModel(nameof(MicroserviceTester)); + using var model = Broker.GetModel(nameof(MicroserviceTester)); //setup a sender channel for each of the consumers you want to test sending messages to foreach (ConsumerOptions consumer in peopleYouWantToSendMessagesTo) { @@ -57,7 +57,7 @@ public MicroserviceTester(RabbitOptions rabbitOptions, params ConsumerOptions[] _declaredExchanges.Add(exchangeName); //Create a binding between the exchange and the queue - model.ExchangeDeclare(exchangeName, ExchangeType.Direct, true);//durable seems to be needed because RabbitMQAdapter wants it? + model.ExchangeDeclare(exchangeName, ExchangeType.Direct, true);//durable seems to be needed because MessageBroker wants it? model.QueueDeclare(consumer.QueueName, true, false, false);//shared with other users model.QueueBind(consumer.QueueName, exchangeName, ""); _declaredQueues.Add(consumer.QueueName); @@ -68,13 +68,13 @@ public MicroserviceTester(RabbitOptions rabbitOptions, params ConsumerOptions[] ExchangeName = exchangeName }; - _sendToConsumers.Add(consumer, Adapter.SetupProducer(producerOptions, true)); + _sendToConsumers.Add(consumer, Broker.SetupProducer(producerOptions, true)); } } /// /// Sends the given message to your consumer, you must have passed the consumer into the MicroserviceTester constructor since all adapter setup happens via option - /// at RabbitMQAdapter construction time + /// at MessageBroker construction time /// /// /// @@ -86,7 +86,7 @@ public void SendMessage(ConsumerOptions toConsumer, IMessage msg) /// /// Sends the given message to your consumer, you must have passed the consumer into the MicroserviceTester constructor since all adapter setup happens via option - /// at RabbitMQAdapter construction time + /// at MessageBroker construction time /// /// /// @@ -101,7 +101,7 @@ public void SendMessages(ConsumerOptions toConsumer, IEnumerable messa /// /// Sends the given message to your consumer, you must have passed the consumer into the MicroserviceTester constructor since all adapter setup happens via option - /// at RabbitMQAdapter construction time + /// at MessageBroker construction time /// /// /// @@ -129,7 +129,7 @@ public void CreateExchange(string exchangeName, string? queueName = null, bool i string queueNameToUse = queueName ?? exchangeName.Replace("Exchange", "Queue"); - using var model = Adapter.GetModel(nameof(CreateExchange)); + using var model = Broker.GetModel(nameof(CreateExchange)); //setup a sender channel for each of the consumers you want to test sending messages to //terminate any old queues / exchanges @@ -140,7 +140,7 @@ public void CreateExchange(string exchangeName, string? queueName = null, bool i //Create a binding between the exchange and the queue if (!isSecondaryBinding) - model.ExchangeDeclare(exchangeName, ExchangeType.Direct, true);//durable seems to be needed because RabbitMQAdapter wants it? + model.ExchangeDeclare(exchangeName, ExchangeType.Direct, true);//durable seems to be needed because MessageBroker wants it? model.QueueDeclare(queueNameToUse, true, false, false); //shared with other users model.QueueBind(queueNameToUse, exchangeName, routingKey); @@ -156,7 +156,7 @@ public void CreateExchange(string exchangeName, string? queueName = null, bool i /// public IEnumerable> ConsumeMessages(string queueName) where T : IMessage { - IModel model = Adapter.GetModel($"ConsumeMessages-{queueName}"); + IModel model = Broker.GetModel($"ConsumeMessages-{queueName}"); while (true) { @@ -187,12 +187,12 @@ public void Dispose() if (CleanUpAfterTest) { - using IModel model = Adapter.GetModel(nameof(MicroserviceTester.Dispose)); + using IModel model = Broker.GetModel(nameof(MicroserviceTester.Dispose)); _declaredExchanges.ForEach(x => model.ExchangeDelete(x)); _declaredQueues.ForEach(x => model.QueueDelete(x)); } - Adapter.Shutdown(RabbitMqAdapter.DefaultOperationTimeout); + Broker.Shutdown(RabbitMQBroker.DefaultOperationTimeout); } } } diff --git a/tests/microservices/Microservices.DicomRelationalMapper.Tests/RunMeFirstTests/RunMeFirstMongoServers.cs b/tests/microservices/Microservices.DicomRelationalMapper.Tests/RunMeFirstTests/RunMeFirstMongoServers.cs index 4d164c37e..6d56220ee 100644 --- a/tests/microservices/Microservices.DicomRelationalMapper.Tests/RunMeFirstTests/RunMeFirstMongoServers.cs +++ b/tests/microservices/Microservices.DicomRelationalMapper.Tests/RunMeFirstTests/RunMeFirstMongoServers.cs @@ -1,5 +1,5 @@ using NUnit.Framework; -using Smi.Common; +using Smi.Common.Messaging; using Smi.Common.Options; using Smi.Common.Tests; using System; @@ -22,7 +22,7 @@ public void RabbitAvailable() var options = new GlobalOptionsFactory().Load(nameof(RabbitAvailable)); var rabbitOptions = options.RabbitOptions!; - Assert.DoesNotThrow(()=> _=new RabbitMqAdapter(rabbitOptions.CreateConnectionFactory(),nameof(RabbitAvailable)), $"Rabbit failed with the following configuration:{Environment.NewLine}{rabbitOptions}"); + Assert.DoesNotThrow(()=> _=new RabbitMQBroker(rabbitOptions,nameof(RabbitAvailable)), $"Rabbit failed with the following configuration:{Environment.NewLine}{rabbitOptions}"); } } } diff --git a/tests/microservices/Microservices.FileCopier.Tests/Execution/FileCopierHostTest.cs b/tests/microservices/Microservices.FileCopier.Tests/Execution/FileCopierHostTest.cs index 73553caba..33a8edc42 100644 --- a/tests/microservices/Microservices.FileCopier.Tests/Execution/FileCopierHostTest.cs +++ b/tests/microservices/Microservices.FileCopier.Tests/Execution/FileCopierHostTest.cs @@ -77,7 +77,7 @@ public void Test_FileCopierHost_HappyPath() }; tester.SendMessage(globals.FileCopierOptions, message); - using var model = tester.Adapter.GetModel(nameof(FileCopierHostTest)); + using var model = tester.Broker.GetModel(nameof(FileCopierHostTest)); var consumer = new EventingBasicConsumer(model); ExtractedFileStatusMessage? statusMessage = null; consumer.Received += (_, ea) => statusMessage = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(ea.Body.ToArray()));