forked from microsoft/kernel-memory
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRabbitMQPipeline.cs
153 lines (129 loc) · 5.37 KB
/
RabbitMQPipeline.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.KernelMemory.Diagnostics;
using Microsoft.KernelMemory.Pipeline.Queue;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Microsoft.KernelMemory.Orchestration.RabbitMQ;
public sealed class RabbitMQPipeline : IQueue
{
private readonly ILogger<RabbitMQPipeline> _log;
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly AsyncEventingBasicConsumer _consumer;
private string _queueName = string.Empty;
/// <summary>
/// Create a new RabbitMQ queue instance
/// </summary>
public RabbitMQPipeline(RabbitMqConfig config, ILogger<RabbitMQPipeline>? log = null)
{
this._log = log ?? DefaultLogger<RabbitMQPipeline>.Instance;
// see https://www.rabbitmq.com/dotnet-api-guide.html#consuming-async
var factory = new ConnectionFactory
{
HostName = config.Host,
Port = config.Port,
UserName = config.Username,
Password = config.Password,
VirtualHost = !string.IsNullOrWhiteSpace(config.VirtualHost) ? config.VirtualHost : "/",
DispatchConsumersAsync = true
};
this._connection = factory.CreateConnection();
this._channel = this._connection.CreateModel();
this._channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
this._consumer = new AsyncEventingBasicConsumer(this._channel);
}
/// <inheritdoc />
public Task<IQueue> ConnectToQueueAsync(string queueName, QueueOptions options = default, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(queueName))
{
throw new ArgumentOutOfRangeException(nameof(queueName), "The queue name is empty");
}
if (!string.IsNullOrEmpty(this._queueName))
{
throw new InvalidOperationException($"The queue is already connected to `{this._queueName}`");
}
this._queueName = queueName;
this._channel.QueueDeclare(
queue: queueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
if (options.DequeueEnabled)
{
this._channel.BasicConsume(queue: this._queueName,
autoAck: false,
consumer: this._consumer);
}
return Task.FromResult<IQueue>(this);
}
/// <inheritdoc />
public Task EnqueueAsync(string message, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
}
if (string.IsNullOrEmpty(this._queueName))
{
throw new InvalidOperationException("The client must be connected to a queue first");
}
this._log.LogDebug("Sending message...");
this._channel.BasicPublish(
routingKey: this._queueName,
body: Encoding.UTF8.GetBytes(message),
exchange: string.Empty,
basicProperties: null);
this._log.LogDebug("Message sent");
return Task.CompletedTask;
}
/// <inheritdoc />
public void OnDequeue(Func<string, Task<bool>> processMessageAction)
{
this._consumer.Received += async (object sender, BasicDeliverEventArgs args) =>
{
try
{
this._log.LogDebug("Message '{0}' received, expires at {1}", args.BasicProperties.MessageId, args.BasicProperties.Expiration);
byte[] body = args.Body.ToArray();
string message = Encoding.UTF8.GetString(body);
bool success = await processMessageAction.Invoke(message).ConfigureAwait(false);
if (success)
{
this._log.LogTrace("Message '{0}' successfully processed, deleting message", args.BasicProperties.MessageId);
this._channel.BasicAck(args.DeliveryTag, multiple: false);
}
else
{
this._log.LogWarning("Message '{0}' failed to process, putting message back in the queue", args.BasicProperties.MessageId);
this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);
}
}
#pragma warning disable CA1031 // Must catch all to handle queue properly
catch (Exception e)
{
// Exceptions caught by this block:
// - message processing failed with exception
// - failed to delete message from queue
// - failed to unlock message in the queue
this._log.LogWarning(e, "Message '{0}' processing failed with exception, putting message back in the queue", args.BasicProperties.MessageId);
// TODO: verify and document what happens if this fails. RabbitMQ should automatically unlock messages.
this._channel.BasicNack(args.DeliveryTag, multiple: false, requeue: true);
}
#pragma warning restore CA1031
};
}
public void Dispose()
{
this._channel.Close();
this._connection.Close();
this._channel.Dispose();
this._connection.Dispose();
}
}