• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Jericho / Picton.Messaging / 206

07 Jan 2025 01:41AM UTC coverage: 46.591% (-7.4%) from 54.007%
206

push

appveyor

Jericho
Fix error due to missing "using" statement

123 of 264 relevant lines covered (46.59%)

7469.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/Source/Picton.Messaging/AsyncMultiTenantMessagePump.cs
1
using Azure;
2
using Azure.Storage.Queues;
3
using Azure.Storage.Queues.Models;
4
using Microsoft.Extensions.Logging;
5
using Microsoft.Extensions.Logging.Abstractions;
6
using Picton.Messaging.Utilities;
7
using System;
8
using System.Diagnostics.Metrics;
9
using System.Threading;
10
using System.Threading.Tasks;
11

12
namespace Picton.Messaging
13
{
14
        /// <summary>
15
        /// High performance message processor (also known as a message "pump") for Azure storage queues.
16
        ///
17
        /// Designed to monitor multiple Azure storage queues that follow the following naming convention:
18
        /// a common prefix followed by a unique tenant identifier. For example, if the prefix is "myqueue",
19
        /// this message pump will monitor queues such as "myqueue001", myqueue002" and "myqueueabc".
20
        ///
21
        /// Please note that the message pump intentionally ignores queues that follow the following naming convention:
22
        /// - the common prefix without a postfix. For example "myqueue". Notice the absence of a tenant identifier
23
        /// after the "myqueue" part in the name.
24
        /// - The common prefix followed by "-poison". For example "myqueue-poison".
25
        ///
26
        /// Furthermore, the list of queues matching the naming convention is refreshed at regular interval in order
27
        /// to discover new tenant queues that might have been created in the Azure storage account.
28
        /// </summary>
29
        public class AsyncMultiTenantMessagePump
30
        {
31
                #region FIELDS
32

33
                private static readonly TimeSpan _defaultDiscoverQueuesInterval = TimeSpan.FromSeconds(30);
×
34

35
                private readonly MessagePumpOptions _messagePumpOptions;
36
                private readonly string _queueNamePrefix;
37
                private readonly TimeSpan _discoverQueuesInterval;
38
                private readonly TimeSpan? _visibilityTimeout;
39
                private readonly int _maxDequeueCount;
40
                private readonly ILogger _logger;
41
                private readonly AsyncMessagePump _messagePump;
42

43
                #endregion
44

45
                #region PROPERTIES
46

47
                /// <summary>
48
                /// Gets or sets the logic to execute when a message is retrieved from the queue.
49
                /// </summary>
50
                /// <remarks>
51
                /// If exception is thrown when calling OnMessage, it will regard this queue message as failed.
52
                /// </remarks>
53
                public Action<string, CloudMessage, CancellationToken> OnMessage { get; set; }
54

55
                /// <summary>
56
                /// Gets or sets the logic to execute when an error occurs.
57
                /// </summary>
58
                /// <example>
59
                /// <code>
60
                /// OnError = (message, exception, isPoison) => Trace.TraceError("An error occured: {0}", exception);
61
                /// </code>
62
                /// </example>
63
                /// <remarks>
64
                /// When isPoison is set to true, you should copy this message to a poison queue because it will be deleted from the original queue.
65
                /// </remarks>
66
                public Action<string, CloudMessage, Exception, bool> OnError { get; set; }
67

68
                /// <summary>
69
                /// Gets or sets the logic to execute when a queue is empty.
70
                /// </summary>
71
                /// <example>
72
                /// <code>
73
                /// OnQueueEmpty = (queueName, cancellationToken) => _logger.LogInformation("Queue {queueName} is empty", queueName);
74
                /// </code>
75
                /// </example>
76
                /// <remarks>
77
                /// If this property is not set, the default logic is to do nothing.
78
                /// </remarks>
79
                public Action<string, CancellationToken> OnQueueEmpty { get; set; }
80

81
                /// <summary>
82
                /// Gets or sets the logic to execute when all queues are empty.
83
                /// </summary>
84
                /// <example>
85
                /// <code>
86
                /// OnAllQueuesEmpty = (cancellationToken) => _logger.LogInformation("All queues are empty");
87
                /// </code>
88
                /// </example>
89
                /// <remarks>
90
                /// If this property is not set, the default logic is to do nothing.
91
                /// </remarks>
92
                public Action<CancellationToken> OnAllQueuesEmpty { get; set; }
93

94
                #endregion
95

96
                #region CONSTRUCTOR
97

98
                /// <summary>
99
                /// Initializes a new instance of the <see cref="AsyncMultiTenantMessagePump"/> class.
100
                /// </summary>
101
                /// <param name="options">Options for the mesage pump.</param>
102
                /// <param name="queueNamePrefix">The common prefix in the naming convention.</param>
103
                /// <param name="discoverQueuesInterval">The frequency we check for queues in the Azure storage account matching the naming convention. Default is 30 seconds.</param>
104
                /// <param name="visibilityTimeout">The visibility timeout.</param>
105
                /// <param name="maxDequeueCount">The maximum dequeue count.</param>
106
                /// <param name="logger">The logger.</param>
107
                /// <param name="meterFactory">The meter factory.</param>
108
                public AsyncMultiTenantMessagePump(MessagePumpOptions options, string queueNamePrefix, TimeSpan? discoverQueuesInterval = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, ILogger logger = null, IMeterFactory meterFactory = null)
109
                {
110
                        if (discoverQueuesInterval != null && discoverQueuesInterval <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(discoverQueuesInterval), "The 'discover queues' interval must be greater than zero.");
×
111

112
                        _messagePumpOptions = options;
×
113
                        _queueNamePrefix = queueNamePrefix;
×
114
                        _discoverQueuesInterval = discoverQueuesInterval ?? _defaultDiscoverQueuesInterval;
×
115
                        _visibilityTimeout = visibilityTimeout;
×
116
                        _maxDequeueCount = maxDequeueCount;
×
117
                        _logger = logger ?? NullLogger<AsyncMultiTenantMessagePump>.Instance;
×
118
                        _messagePump = new AsyncMessagePump(options, logger, meterFactory);
×
119
                }
×
120

121
                #endregion
122

123
                #region PUBLIC METHODS
124

125
                /// <summary>
126
                /// Starts the message pump.
127
                /// </summary>
128
                /// <param name="cancellationToken">The cancellation token.</param>
129
                /// <exception cref="System.ArgumentNullException">OnMessage.</exception>
130
                /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
131
                public async Task StartAsync(CancellationToken cancellationToken)
132
                {
133
                        if (OnMessage == null) throw new Exception($"You must specify a {nameof(OnMessage)} delegate before starting the message pump.");
134

135
                        _messagePump.OnQueueEmpty = (queueName, cancellationToken) => OnQueueEmpty?.Invoke(queueName.TrimStart(_queueNamePrefix), cancellationToken);
136
                        _messagePump.OnAllQueuesEmpty = OnAllQueuesEmpty;
137
                        _messagePump.OnError = (queueName, message, exception, isPoison) => OnError?.Invoke(queueName.TrimStart(_queueNamePrefix), message, exception, isPoison);
138
                        _messagePump.OnMessage = (queueName, message, cancellationToken) => OnMessage?.Invoke(queueName.TrimStart(_queueNamePrefix), message, cancellationToken);
139

140
                        // Define the task that discovers queues that follow the naming convention
141
                        RecurrentCancellableTask.StartNew(
142
                                async () =>
143
                                {
144
                                        try
145
                                        {
146
                                                var queueServiceClient = new QueueServiceClient(_messagePumpOptions.ConnectionString);
147
                                                var response = queueServiceClient.GetQueuesAsync(QueueTraits.None, _queueNamePrefix, cancellationToken);
148
                                                await foreach (Page<QueueItem> queues in response.AsPages())
149
                                                {
150
                                                        foreach (var queue in queues.Values)
151
                                                        {
152
                                                                if (!queue.Name.Equals(_queueNamePrefix, StringComparison.OrdinalIgnoreCase) &&
153
                                                                        !queue.Name.Equals($"{_queueNamePrefix}-poison", StringComparison.OrdinalIgnoreCase))
154
                                                                {
155
                                                                        // AddQueue will make sure to add the queue only if it's not already in the round-robin list of queues.
156
                                                                        _messagePump.AddQueue(
157
                                                                                queue.Name,
158
                                                                                $"{_queueNamePrefix}-poison", // All tenants share the same "poison" queue
159
                                                                                _visibilityTimeout,
160
                                                                                _maxDequeueCount,
161
                                                                                $"{_queueNamePrefix}-oversize-messages"); // All tenants share the same "oversize messages" blob storage
162
                                                                }
163
                                                        }
164
                                                }
165

166
                                                // Please note there is no need to remove queues that no longer exist from the message
167
                                                // pump round-robin list. The reason is: message pump will get a RequestFailedException
168
                                                // with ErrorCode == "QueueNotFound" next time it tries to query those queues and it will
169
                                                // automatically remove them at that time.
170
                                        }
171
                                        catch (Exception e) when (e is TaskCanceledException || e is OperationCanceledException)
172
                                        {
173
                                                // The message pump is shutting down.
174
                                                // This exception can be safely ignored.
175
                                        }
176
                                        catch (Exception e)
177
                                        {
178
                                                _logger.LogError(e.GetBaseException(), "An error occured while fetching the Azure queues that match the naming convention. The error was caught and ignored.");
179
                                        }
180
                                },
181
                                _discoverQueuesInterval,
182
                                cancellationToken,
183
                                TaskCreationOptions.LongRunning);
184

185
                        // Brief pause to ensure the task defined above runs at least once before we start processing messages
186
                        await Task.Delay(500, cancellationToken).ConfigureAwait(false);
187

188
                        await _messagePump.StartAsync(cancellationToken).ConfigureAwait(false);
189
                }
190

191
                #endregion
192
        }
193
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc