• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Jericho / Picton.Messaging / 189

29 Jan 2024 11:11PM UTC coverage: 43.678% (+17.7%) from 26.023%
189

push

appveyor

Jericho
Merge branch 'release/9.0.0'

152 of 348 relevant lines covered (43.68%)

15353.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/Source/Picton.Messaging/AsyncMessagePump.cs
1
using App.Metrics;
2
using Azure;
3
using Azure.Storage.Queues;
4
using Azure.Storage.Queues.Models;
5
using Microsoft.Extensions.Logging;
6
using Picton.Managers;
7
using Picton.Messaging.Utilities;
8
using System;
9
using System.Collections.Concurrent;
10
using System.Collections.Generic;
11
using System.Collections.Immutable;
12
using System.Linq;
13
using System.Runtime.CompilerServices;
14
using System.Text.RegularExpressions;
15
using System.Threading;
16
using System.Threading.Channels;
17
using System.Threading.Tasks;
18

19
namespace Picton.Messaging
20
{
21
        /// <summary>
22
        /// High performance message processor (also known as a message "pump") for Azure storage queues.
23
        /// Designed to monitor either a single queue or a list of queues and process messages as
24
        /// quickly and efficiently as possible.
25
        /// </summary>
26
        public class AsyncMessagePump
27
        {
28
                #region FIELDS
29

30
                private readonly ConcurrentDictionary<string, (QueueConfig Config, QueueManager QueueManager, QueueManager PoisonQueueManager, DateTime LastFetched, TimeSpan FetchDelay)> _queueManagers = new();
22✔
31
                private readonly RoundRobinList<string> _queueNames = new(Enumerable.Empty<string>());
22✔
32

33
                private readonly MessagePumpOptions _messagePumpOptions;
34
                private readonly ILogger _logger;
35
                private readonly IMetrics _metrics;
36
                private readonly bool _metricsTurnedOff;
37

38
                #endregion
39

40
                #region PROPERTIES
41

42
                /// <summary>
43
                /// Gets or sets the logic to execute when a message is retrieved from the queue.
44
                /// </summary>
45
                /// <remarks>
46
                /// If exception is thrown when calling OnMessage, it will regard this queue message as failed.
47
                /// </remarks>
48
                public Action<string, CloudMessage, CancellationToken> OnMessage { get; set; }
49

50
                /// <summary>
51
                /// Gets or sets the logic to execute when an error occurs.
52
                /// </summary>
53
                /// <example>
54
                /// <code>
55
                /// OnError = (message, exception, isPoison) => Trace.TraceError("An error occured: {0}", exception);
56
                /// </code>
57
                /// </example>
58
                /// <remarks>
59
                /// When isPoison is set to true, you should copy this message to a poison queue because it will be deleted from the original queue.
60
                /// </remarks>
61
                public Action<string, CloudMessage, Exception, bool> OnError { get; set; }
62

63
                /// <summary>
64
                /// Gets or sets the logic to execute when all queues are empty.
65
                /// </summary>
66
                /// <example>
67
                /// <code>
68
                /// OnEmpty = cancellationToken => Task.Delay(2500, cancellationToken).Wait();
69
                /// </code>
70
                /// </example>
71
                /// <remarks>
72
                /// If this property is not set, the default logic is to do nothing.
73
                /// </remarks>
74
                public Action<CancellationToken> OnEmpty { get; set; }
75

76
                #endregion
77

78
                #region CONSTRUCTOR
79

80
                /// <summary>
81
                /// Initializes a new instance of the <see cref="AsyncMessagePump"/> class.
82
                /// </summary>
83
                /// <param name="options">Options for the mesage pump.</param>
84
                /// <param name="logger">The logger.</param>
85
                /// <param name="metrics">The system where metrics are published.</param>
86
                public AsyncMessagePump(MessagePumpOptions options, ILogger logger = null, IMetrics metrics = null)
87
                {
88
                        if (options == null) throw new ArgumentNullException(nameof(options));
24✔
89
                        if (string.IsNullOrEmpty(options.ConnectionString)) throw new ArgumentNullException(nameof(options.ConnectionString));
20✔
90
                        if (options.ConcurrentTasks < 1) throw new ArgumentOutOfRangeException(nameof(options.ConcurrentTasks), "Number of concurrent tasks must be greather than zero");
22✔
91
                        if (options.FetchMessagesInterval <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(options.FetchMessagesInterval), "Fetch messages interval must be greather than zero");
18✔
92
                        if (options.EmptyQueueFetchDelay <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(options.EmptyQueueFetchDelay), "Emnpty queue fetch delay must be greather than zero");
18✔
93
                        if (options.EmptyQueueMaxFetchDelay < options.EmptyQueueFetchDelay) throw new ArgumentOutOfRangeException(nameof(options.EmptyQueueMaxFetchDelay), "Max fetch delay can not be smaller than fetch delay");
18✔
94

95
                        _messagePumpOptions = options;
18✔
96
                        _logger = logger;
18✔
97
                        _metrics = metrics ?? TurnOffMetrics();
18✔
98
                        _metricsTurnedOff = metrics == null;
18✔
99

100
                        InitDefaultActions();
18✔
101
                }
18✔
102

103
                #endregion
104

105
                #region PUBLIC METHODS
106

107
                /// <summary>
108
                /// Add a queue to be monitored.
109
                /// </summary>
110
                /// <param name="queueName">The name of the queue.</param>
111
                /// <param name="poisonQueueName">Optional. The name of the queue where poison messages are automatically moved.</param>
112
                /// <param name="visibilityTimeout">Optional. Specifies the visibility timeout value. The default value is 30 seconds.</param>
113
                /// <param name="maxDequeueCount">Optional. A nonzero integer value that specifies the number of time we try to process a message before giving up and declaring the message to be "poison". The default value is 3.</param>
114
                /// <param name="oversizeMessagesBlobStorageName">Name of the blob storage where messages that exceed the maximum size for a queue message are stored.</param>
115
                public void AddQueue(string queueName, string poisonQueueName = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, string oversizeMessagesBlobStorageName = null)
116
                {
117
                        AddQueue(new QueueConfig(queueName, poisonQueueName, visibilityTimeout, maxDequeueCount, oversizeMessagesBlobStorageName));
×
118
                }
×
119

120
                /// <summary>
121
                /// Add a queue to be monitored.
122
                /// </summary>
123
                /// <param name="queueConfig">Queue configuration.</param>
124
                public void AddQueue(QueueConfig queueConfig)
125
                {
126
                        if (string.IsNullOrEmpty(queueConfig.QueueName)) throw new ArgumentNullException(nameof(queueConfig.QueueName));
×
127
                        if (queueConfig.MaxDequeueCount < 1) throw new ArgumentOutOfRangeException(nameof(queueConfig.MaxDequeueCount), "Number of retries must be greater than zero.");
×
128

129
                        var queueManager = new QueueManager(_messagePumpOptions.ConnectionString, queueConfig.QueueName, queueConfig.OversizedMessagesBlobStorageName, true, _messagePumpOptions.QueueClientOptions, _messagePumpOptions.BlobClientOptions);
×
130
                        var poisonQueueManager = string.IsNullOrEmpty(queueConfig.PoisonQueueName) ? null : new QueueManager(_messagePumpOptions.ConnectionString, queueConfig.PoisonQueueName, queueConfig.OversizedMessagesBlobStorageName, true, _messagePumpOptions.QueueClientOptions, _messagePumpOptions.BlobClientOptions);
×
131

132
                        AddQueue(queueManager, poisonQueueManager, queueConfig.VisibilityTimeout, queueConfig.MaxDequeueCount);
×
133
                }
×
134

135
                /// <summary>
136
                /// Remove a queue from the list of queues that are monitored.
137
                /// </summary>
138
                /// <param name="queueName">The name of the queue.</param>
139
                public void RemoveQueue(string queueName)
140
                {
141
                        /*
142
                         * Do not remove from _queuManagers because there could messages still in the memory queue that need to be processed
143
                         * _queueManagers.TryRemove(queueName, out _);
144
                         */
145

146
                        _queueNames.RemoveItem(queueName);
×
147
                }
×
148

149
                /// <summary>
150
                /// Add queues that meet the specified RegEx pattern.
151
                /// </summary>
152
                /// <remarks>
153
                /// All the queues that match the specified pattern will share the same poison queue if you specify the name of the poison queue.
154
                /// If you omit this value, each queue will get their own poison queue.
155
                ///
156
                /// Similarly, all the queues that match the specified pattern will share the same oversize messages storage if you specify the name of the blob storage container.
157
                /// If you omit this value, each queue will get their own blob container.
158
                /// </remarks>
159
                /// <param name="queueNamePattern">The RegEx pattern.</param>
160
                /// <param name="poisonQueueName">Optional. The name of the queue where poison messages are automatically moved.</param>
161
                /// <param name="visibilityTimeout">Optional. Specifies the visibility timeout value. The default value is 30 seconds.</param>
162
                /// <param name="maxDequeueCount">Optional. A nonzero integer value that specifies the number of time we try to process a message before giving up and declaring the message to be "poison". The default value is 3.</param>
163
                /// <param name="oversizeMessagesBlobStorageName">Name of the blob storage where messages that exceed the maximum size for a queue message are stored.</param>
164
                /// <param name="cancellationToken">The cancellation token.</param>
165
                /// <returns>The async task.</returns>
166
                public async Task AddQueuesByPatternAsync(string queueNamePattern, string poisonQueueName = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, string oversizeMessagesBlobStorageName = null, CancellationToken cancellationToken = default)
167
                {
168
                        var regex = new Regex(queueNamePattern, RegexOptions.Compiled);
169
                        var queueServiceClient = new QueueServiceClient(_messagePumpOptions.ConnectionString);
170
                        var response = queueServiceClient.GetQueuesAsync(QueueTraits.None, null, cancellationToken);
171
                        await foreach (Page<QueueItem> queues in response.AsPages())
172
                        {
173
                                foreach (var queue in queues.Values)
174
                                {
175
                                        if (regex.IsMatch(queue.Name))
176
                                        {
177
                                                AddQueue(new QueueConfig(queue.Name, poisonQueueName, visibilityTimeout, maxDequeueCount, oversizeMessagesBlobStorageName));
178
                                        }
179
                                }
180
                        }
181
                }
182

183
                /// <summary>
184
                /// Gets the names of the queues currently being monitored.
185
                /// </summary>
186
                /// <returns>The read only list of queue names.</returns>
187
                public IReadOnlyList<string> GetMonitoredQueueNames() => ImmutableList.CreateRange(_queueManagers.Keys);
×
188

189
                /// <summary>
190
                /// Starts the message pump.
191
                /// </summary>
192
                /// <param name="cancellationToken">The cancellation token.</param>
193
                /// <exception cref="System.ArgumentNullException">OnMessage.</exception>
194
                /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
195
                public async Task StartAsync(CancellationToken cancellationToken)
196
                {
197
                        if (OnMessage == null) throw new ArgumentNullException(nameof(OnMessage));
198
                        await ProcessMessagesAsync(cancellationToken).ConfigureAwait(false);
199
                }
200

201
                #endregion
202

203
                #region PRIVATE METHODS
204

205
                // This internal method is primarily for unit testing purposes. It allows me to inject mocked queue managers
206
                internal void AddQueue(QueueManager queueManager, QueueManager poisonQueueManager, TimeSpan? visibilityTimeout, int maxDequeueCount)
207
                {
208
                        if (queueManager == null) throw new ArgumentNullException(nameof(queueManager));
16✔
209
                        if (string.IsNullOrEmpty(queueManager.QueueName)) throw new ArgumentNullException(nameof(queueManager.QueueName));
16✔
210
                        if (maxDequeueCount < 1) throw new ArgumentOutOfRangeException(nameof(maxDequeueCount), "Number of retries must be greater than zero.");
18✔
211

212
                        var queueConfig = new QueueConfig(queueManager.QueueName, poisonQueueManager?.QueueName, visibilityTimeout, maxDequeueCount);
14✔
213

214
                        _queueManagers.AddOrUpdate(
14✔
215
                                queueManager.QueueName,
14✔
216
                                (queueName) => (queueConfig, queueManager, poisonQueueManager, DateTime.MinValue, TimeSpan.Zero),
14✔
217
                                (queueName, oldConfig) => (queueConfig, queueManager, poisonQueueManager, oldConfig.LastFetched, oldConfig.FetchDelay));
14✔
218
                        _queueNames.AddItem(queueManager.QueueName);
14✔
219
                }
14✔
220

221
                private void InitDefaultActions()
222
                {
223
                        OnError = (queueName, message, exception, isPoison) => _logger?.LogError(exception, "An error occured when processing a message in {queueName}", queueName);
18✔
224
                }
18✔
225

226
                private IMetrics TurnOffMetrics()
227
                {
228
                        var metricsTurnedOff = new MetricsBuilder();
18✔
229
                        metricsTurnedOff.Configuration.Configure(new MetricsOptions()
18✔
230
                        {
18✔
231
                                Enabled = false,
18✔
232
                                ReportingEnabled = false
18✔
233
                        });
18✔
234
                        return metricsTurnedOff.Build();
18✔
235
                }
236

237
                private async Task ProcessMessagesAsync(CancellationToken cancellationToken)
238
                {
239
                        var runningTasks = new ConcurrentDictionary<Task, Task>();
240
                        var semaphore = new SemaphoreSlim(_messagePumpOptions.ConcurrentTasks, _messagePumpOptions.ConcurrentTasks);
241
                        var channelOptions = new UnboundedChannelOptions() { SingleReader = false, SingleWriter = true };
242
                        var channel = Channel.CreateUnbounded<(string QueueName, CloudMessage Message)>(channelOptions);
243
                        var channelCompleted = false;
244

245
                        // Define the task that fetches messages from the Azure queue
246
                        RecurrentCancellableTask.StartNew(
247
                                async () =>
248
                                {
249
                                        // Fetch messages from Azure when the number of items in the concurrent queue falls below an "acceptable" level.
250
                                        if (!cancellationToken.IsCancellationRequested &&
251
                                                !channelCompleted &&
252
                                                channel.Reader.Count <= _messagePumpOptions.ConcurrentTasks / 2)
253
                                        {
254
                                                await foreach (var message in FetchMessages(cancellationToken).ConfigureAwait(false))
255
                                                {
256
                                                        await channel.Writer.WriteAsync(message).ConfigureAwait(false);
257
                                                }
258
                                        }
259

260
                                        // Mark the channel as "complete" which means that no more messages will be written to it
261
                                        else if (!channelCompleted)
262
                                        {
263
                                                channelCompleted = channel.Writer.TryComplete();
264
                                        }
265
                                },
266
                                _messagePumpOptions.FetchMessagesInterval,
267
                                cancellationToken,
268
                                TaskCreationOptions.LongRunning);
269

270
                        // Define the task that checks how many messages are queued in Azure
271
                        if (!_metricsTurnedOff && _messagePumpOptions.CountAzureMessagesInterval > TimeSpan.Zero)
272
                        {
273
                                RecurrentCancellableTask.StartNew(
274
                                        async () =>
275
                                        {
276
                                                var count = 0;
277
                                                foreach (var kvp in _queueManagers)
278
                                                {
279
                                                        var queueName = kvp.Key;
280
                                                        (var queueConfig, var queueManager, var poisonQueueManager, var lastFetched, var fetchDelay) = kvp.Value;
281

282
                                                        try
283
                                                        {
284
                                                                var properties = await queueManager.GetPropertiesAsync(cancellationToken).ConfigureAwait(false);
285

286
                                                                count += properties.ApproximateMessagesCount;
287
                                                        }
288
                                                        catch (Exception e) when (e is TaskCanceledException || e is OperationCanceledException)
289
                                                        {
290
                                                                // The message pump is shutting down.
291
                                                                // This exception can be safely ignored.
292
                                                        }
293
                                                        catch (RequestFailedException rfe) when (rfe.ErrorCode == "QueueNotFound")
294
                                                        {
295
                                                                // The queue has been deleted
296
                                                                RemoveQueue(queueName);
297
                                                        }
298
                                                        catch (Exception e)
299
                                                        {
300
                                                                _logger?.LogError(e.GetBaseException(), "An error occured while checking how many message are waiting in Azure. The error was caught and ignored.");
301
                                                        }
302
                                                }
303

304
                                                _metrics.Measure.Gauge.SetValue(Metrics.QueuedCloudMessagesGauge, count);
305
                                        },
306
                                        _messagePumpOptions.CountAzureMessagesInterval,
307
                                        cancellationToken,
308
                                        TaskCreationOptions.LongRunning);
309
                        }
310

311
                        // Define the task that checks how many messages are queued in memory
312
                        if (!_metricsTurnedOff && _messagePumpOptions.CountMemoryMessagesInterval > TimeSpan.Zero)
313
                        {
314
                                RecurrentCancellableTask.StartNew(
315
                                        () =>
316
                                        {
317
                                                try
318
                                                {
319
                                                        _metrics.Measure.Gauge.SetValue(Metrics.QueuedMemoryMessagesGauge, channel.Reader.Count);
320
                                                }
321
                                                catch (Exception e)
322
                                                {
323
                                                        _logger?.LogError(e.GetBaseException(), "An error occured while checking how many message are waiting in the memory queue. The error was caught and ignored.");
324
                                                }
325

326
                                                return Task.CompletedTask;
327
                                        },
328
                                        TimeSpan.FromMilliseconds(5000),
329
                                        cancellationToken,
330
                                        TaskCreationOptions.LongRunning);
331
                        }
332

333
                        // Define the task pump
334
                        var pumpTask = Task.Run(async () =>
335
                        {
336
                                // We process messages until cancellation is requested.
337
                                // When cancellation is requested, we continue processing messages until the memory queue is drained.
338
                                while (!cancellationToken.IsCancellationRequested || channel.Reader.Count > 0)
339
                                {
340
                                        await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
341

342
                                        // Retrieved the next message from the queue and process it
343
                                        var runningTask = Task.Run(
344
                                                async () =>
345
                                                {
346
                                                        var messageProcessed = false;
347

348
                                                        if (channel.Reader.TryRead(out (string QueueName, CloudMessage Message) result))
349
                                                        {
350
                                                                if (_queueManagers.TryGetValue(result.QueueName, out (QueueConfig Config, QueueManager QueueManager, QueueManager PoisonQueueManager, DateTime LastFetched, TimeSpan FetchDelay) queueInfo))
351
                                                                {
352
                                                                        using (_metrics.Measure.Timer.Time(Metrics.MessageProcessingTimer))
353
                                                                        {
354
                                                                                try
355
                                                                                {
356
                                                                                        // Process the message
357
                                                                                        OnMessage?.Invoke(result.QueueName, result.Message, cancellationToken);
358

359
                                                                                        // Delete the processed message from the queue
360
                                                                                        // PLEASE NOTE: we use "CancellationToken.None" to ensure a processed message is deleted from the queue even when the message pump is shutting down
361
                                                                                        await queueInfo.QueueManager.DeleteMessageAsync(result.Message, CancellationToken.None).ConfigureAwait(false);
362
                                                                                }
363
                                                                                catch (Exception ex)
364
                                                                                {
365
                                                                                        var isPoison = result.Message.DequeueCount >= queueInfo.Config.MaxDequeueCount;
366

367
                                                                                        try
368
                                                                                        {
369
                                                                                                OnError?.Invoke(result.QueueName, result.Message, ex, isPoison);
370
                                                                                        }
371
                                                                                        catch (Exception e)
372
                                                                                        {
373
                                                                                                _logger?.LogError(e.GetBaseException(), "An error occured when handling an exception for {queueName}. The error was caught and ignored.", result.QueueName);
374
                                                                                        }
375

376
                                                                                        if (isPoison)
377
                                                                                        {
378
                                                                                                // PLEASE NOTE: we use "CancellationToken.None" to ensure a processed message is deleted from the queue and moved to poison queue even when the message pump is shutting down
379
                                                                                                if (queueInfo.PoisonQueueManager != null)
380
                                                                                                {
381
                                                                                                        result.Message.Metadata["PoisonExceptionMessage"] = ex.GetBaseException().Message;
382
                                                                                                        result.Message.Metadata["PoisonExceptionDetails"] = ex.GetBaseException().ToString();
383
                                                                                                        result.Message.Metadata["PoisonOriginalQueue"] = queueInfo.QueueManager.QueueName;
384

385
                                                                                                        await queueInfo.PoisonQueueManager.AddMessageAsync(result.Message.Content, result.Message.Metadata, null, null, CancellationToken.None).ConfigureAwait(false);
386
                                                                                                }
387

388
                                                                                                await queueInfo.QueueManager.DeleteMessageAsync(result.Message, CancellationToken.None).ConfigureAwait(false);
389
                                                                                        }
390
                                                                                }
391

392
                                                                                messageProcessed = true;
393
                                                                        }
394
                                                                }
395
                                                                else
396
                                                                {
397
                                                                        _queueNames.RemoveItem(result.QueueName);
398
                                                                }
399
                                                        }
400

401
                                                        // Increment the counter if we processed a message
402
                                                        if (messageProcessed) _metrics.Measure.Counter.Increment(Metrics.MessagesProcessedCounter);
403

404
                                                        // Return a value indicating whether we processed a message or not
405
                                                        return messageProcessed;
406
                                                },
407
                                                CancellationToken.None);
408

409
                                        // Add the task to the dictionary of tasks (allows us to keep track of the running tasks)
410
                                        runningTasks.TryAdd(runningTask, runningTask);
411

412
                                        // Complete the task
413
                                        runningTask.ContinueWith(
414
                                                t =>
415
                                                {
416
                                                        semaphore.Release();
417
                                                        runningTasks.TryRemove(t, out Task _);
418
                                                },
419
                                                TaskContinuationOptions.ExecuteSynchronously)
420
                                        .IgnoreAwait();
421
                                }
422
                        });
423

424
                        // Run the task pump until canceled
425
                        await pumpTask.UntilCancelled().ConfigureAwait(false);
426

427
                        // Task pump has been canceled, wait for the currently running tasks to complete
428
                        await Task.WhenAll(runningTasks.Values).UntilCancelled().ConfigureAwait(false);
429
                }
430

431
                private async IAsyncEnumerable<(string QueueName, CloudMessage Message)> FetchMessages([EnumeratorCancellation] CancellationToken cancellationToken)
432
                {
433
                        var messageCount = 0;
434

435
                        if (_queueNames.Count == 0)
436
                        {
437
                                _logger?.LogTrace("There are no queues being monitored. Therefore no messages could be fetched.");
438
                                yield break;
439
                        }
440

441
                        var originalQueue = _queueNames.Current;
442

443
                        using (_metrics.Measure.Timer.Time(Metrics.MessagesFetchingTimer))
444
                        {
445
                                do
446
                                {
447
                                        var queueName = _queueNames.MoveToNextItem();
448
                                        originalQueue ??= queueName; // This is important because originalQueue will be null the very first time we fetch messages
449

450
                                        if (_queueManagers.TryGetValue(queueName, out (QueueConfig Config, QueueManager QueueManager, QueueManager PoisonQueueManager, DateTime LastFetched, TimeSpan FetchDelay) queueInfo))
451
                                        {
452
                                                if (!cancellationToken.IsCancellationRequested && queueInfo.LastFetched.Add(queueInfo.FetchDelay) < DateTime.UtcNow)
453
                                                {
454
                                                        IEnumerable<CloudMessage> messages = null;
455

456
                                                        try
457
                                                        {
458
                                                                messages = await queueInfo.QueueManager.GetMessagesAsync(_messagePumpOptions.ConcurrentTasks, queueInfo.Config.VisibilityTimeout, cancellationToken).ConfigureAwait(false);
459
                                                        }
460
                                                        catch (Exception e) when (e is TaskCanceledException || e is OperationCanceledException)
461
                                                        {
462
                                                                // The message pump is shutting down.
463
                                                                // This exception can be safely ignored.
464
                                                        }
465
                                                        catch (RequestFailedException rfe) when (rfe.ErrorCode == "QueueNotFound")
466
                                                        {
467
                                                                // The queue has been deleted
468
                                                                RemoveQueue(queueName);
469
                                                        }
470
                                                        catch (Exception e)
471
                                                        {
472
                                                                _logger?.LogError(e.GetBaseException(), "An error occured while fetching messages from {queueName}. The error was caught and ignored.", queueName);
473
                                                        }
474

475
                                                        if (messages != null && messages.Any())
476
                                                        {
477
                                                                var messagesCount = messages.Count();
478
                                                                _logger?.LogTrace("Fetched {messagesCount} message(s) in {queueName}.", messagesCount, queueName);
479

480
                                                                foreach (var message in messages)
481
                                                                {
482
                                                                        Interlocked.Increment(ref messageCount);
483
                                                                        yield return (queueName, message);
484
                                                                }
485

486
                                                                // Reset the Fetch delay to zero to indicate that we can fetch more messages from this queue as soon as possible
487
                                                                _queueManagers[queueName] = (queueInfo.Config, queueInfo.QueueManager, queueInfo.PoisonQueueManager, DateTime.UtcNow, TimeSpan.Zero);
488
                                                        }
489
                                                        else
490
                                                        {
491
                                                                _logger?.LogTrace("There are no messages in {queueName}.", queueName);
492
                                                                _metrics.Measure.Counter.Increment(Metrics.QueueEmptyCounter);
493

494
                                                                // Set a "reasonable" fetch delay to ensure we don't query an empty queue too often
495
                                                                var delay = queueInfo.FetchDelay.Add(_messagePumpOptions.EmptyQueueFetchDelay);
496
                                                                if (delay > _messagePumpOptions.EmptyQueueMaxFetchDelay) delay = _messagePumpOptions.EmptyQueueMaxFetchDelay;
497

498
                                                                _queueManagers[queueName] = (queueInfo.Config, queueInfo.QueueManager, queueInfo.PoisonQueueManager, DateTime.UtcNow, delay);
499
                                                        }
500
                                                }
501
                                        }
502
                                        else
503
                                        {
504
                                                _queueNames.RemoveItem(queueName);
505
                                        }
506
                                }
507

508
                                // Stop when we either retrieved the desired number of messages OR we have looped through all the queues
509
                                while (messageCount < (_messagePumpOptions.ConcurrentTasks * 2) && originalQueue != _queueNames.Next);
510
                        }
511

512
                        if (messageCount == 0)
513
                        {
514
                                _logger?.LogTrace("All tenant queues are empty, no messages fetched.");
515
                                try
516
                                {
517
                                        // All queues are empty
518
                                        _metrics.Measure.Counter.Increment(Metrics.AllQueuesEmptyCounter);
519
                                        OnEmpty?.Invoke(cancellationToken);
520
                                }
521
                                catch (Exception e) when (e is TaskCanceledException || e is OperationCanceledException)
522
                                {
523
                                        // The message pump is shutting down.
524
                                        // This exception can be safely ignored.
525
                                }
526
                                catch (Exception e)
527
                                {
528
                                        _logger?.LogError(e.GetBaseException(), "An error occured when handling empty queues. The error was caught and ignored.");
529
                                }
530
                        }
531
                }
532

533
                #endregion
534
        }
535
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc