• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Jericho / Picton.Messaging / 189

29 Jan 2024 11:11PM UTC coverage: 43.678% (+17.7%) from 26.023%
189

push

appveyor

Jericho
Merge branch 'release/9.0.0'

152 of 348 relevant lines covered (43.68%)

15353.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/Source/Picton.Messaging/AsyncMultiTenantMessagePump.cs
1
using App.Metrics;
2
using Azure;
3
using Azure.Storage.Queues;
4
using Azure.Storage.Queues.Models;
5
using Microsoft.Extensions.Logging;
6
using Picton.Messaging.Utilities;
7
using System;
8
using System.Threading;
9
using System.Threading.Tasks;
10

11
namespace Picton.Messaging
12
{
13
        /// <summary>
        /// High performance message processor (also known as a message "pump") for Azure storage queues.
        ///
        /// Designed to monitor multiple Azure storage queues that follow the following naming convention:
        /// a common prefix followed by a unique tenant identifier. For example, if the prefix is "myqueue",
        /// this message pump will monitor queues such as "myqueue001", "myqueue002" and "myqueueabc".
        ///
        /// Please note that the message pump specifically ignores queues that follow the following naming convention:
        /// - The common prefix without a postfix. For example "myqueue". Notice the absence of a tenant identifier
        /// after the "myqueue" part in the name.
        /// - The common prefix followed by "-poison". For example "myqueue-poison".
        ///
        /// Furthermore, the list of queues matching the naming convention is refreshed at regular intervals in order
        /// to discover new tenant queues that might have been created in the Azure storage account.
        /// </summary>
        public class AsyncMultiTenantMessagePump
        {
                #region FIELDS

                private static readonly TimeSpan _defaultDiscoverQueuesInterval = TimeSpan.FromSeconds(30);

                private readonly MessagePumpOptions _messagePumpOptions;
                private readonly string _queueNamePrefix;
                private readonly TimeSpan _discoverQueuesInterval;
                private readonly TimeSpan? _visibilityTimeout;
                private readonly int _maxDequeueCount;
                private readonly ILogger _logger;
                private readonly AsyncMessagePump _messagePump;

                #endregion

                #region PROPERTIES

                /// <summary>
                /// Gets or sets the logic to execute when a message is retrieved from the queue.
                /// </summary>
                /// <remarks>
                /// The first argument is the name of the queue with the common prefix trimmed from its start
                /// (in other words: the tenant identifier).
                /// If exception is thrown when calling OnMessage, it will regard this queue message as failed.
                /// </remarks>
                public Action<string, CloudMessage, CancellationToken> OnMessage { get; set; }

                /// <summary>
                /// Gets or sets the logic to execute when an error occurs.
                /// </summary>
                /// <example>
                /// <code>
                /// OnError = (tenantId, message, exception, isPoison) => Trace.TraceError("An error occurred: {0}", exception);
                /// </code>
                /// </example>
                /// <remarks>
                /// The first argument is the name of the queue with the common prefix trimmed from its start.
                /// When isPoison is set to true, you should copy this message to a poison queue because it will be deleted from the original queue.
                /// </remarks>
                public Action<string, CloudMessage, Exception, bool> OnError { get; set; }

                /// <summary>
                /// Gets or sets the logic to execute when all queues are empty.
                /// </summary>
                /// <example>
                /// <code>
                /// OnEmpty = cancellationToken => Task.Delay(2500, cancellationToken).Wait();
                /// </code>
                /// </example>
                /// <remarks>
                /// If this property is not set, the default logic is to do nothing.
                /// </remarks>
                public Action<CancellationToken> OnEmpty { get; set; }

                #endregion

                #region CONSTRUCTOR

                /// <summary>
                /// Initializes a new instance of the <see cref="AsyncMultiTenantMessagePump"/> class.
                /// </summary>
                /// <param name="options">Options for the message pump.</param>
                /// <param name="queueNamePrefix">The common prefix in the naming convention.</param>
                /// <param name="discoverQueuesInterval">The frequency we check for queues in the Azure storage account matching the naming convention. Default is 30 seconds.</param>
                /// <param name="visibilityTimeout">The visibility timeout.</param>
                /// <param name="maxDequeueCount">The maximum dequeue count.</param>
                /// <param name="logger">The logger.</param>
                /// <param name="metrics">The system where metrics are published.</param>
                /// <exception cref="System.ArgumentNullException"><paramref name="options"/> or <paramref name="queueNamePrefix"/> is null.</exception>
                /// <exception cref="System.ArgumentException"><paramref name="queueNamePrefix"/> is empty or consists only of white-space.</exception>
                /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="discoverQueuesInterval"/> is specified but is not greater than zero.</exception>
                public AsyncMultiTenantMessagePump(MessagePumpOptions options, string queueNamePrefix, TimeSpan? discoverQueuesInterval = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, ILogger logger = null, IMetrics metrics = null)
                {
                        // Fail fast: the options are dereferenced every time queues are discovered and the prefix
                        // is used to build the names of the shared "poison" queue and "oversize messages" storage.
                        if (options == null) throw new ArgumentNullException(nameof(options));
                        if (queueNamePrefix == null) throw new ArgumentNullException(nameof(queueNamePrefix));
                        if (string.IsNullOrWhiteSpace(queueNamePrefix)) throw new ArgumentException("The queue name prefix cannot be empty.", nameof(queueNamePrefix));
                        if (discoverQueuesInterval != null && discoverQueuesInterval <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(discoverQueuesInterval), "The 'discover queues' interval must be greater than zero.");

                        _messagePumpOptions = options;
                        _queueNamePrefix = queueNamePrefix;
                        _discoverQueuesInterval = discoverQueuesInterval ?? _defaultDiscoverQueuesInterval;
                        _visibilityTimeout = visibilityTimeout;
                        _maxDequeueCount = maxDequeueCount;
                        _logger = logger;
                        _messagePump = new AsyncMessagePump(options, logger, metrics);
                }

                #endregion

                #region PUBLIC METHODS

                /// <summary>
                /// Starts the message pump.
                /// </summary>
                /// <remarks>
                /// The queues matching the naming convention are discovered once before the underlying message pump
                /// is started, and are subsequently re-discovered at the interval specified in the constructor.
                /// </remarks>
                /// <param name="cancellationToken">The cancellation token.</param>
                /// <exception cref="System.ArgumentNullException">The <see cref="OnMessage"/> property has not been set.</exception>
                /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
                public async Task StartAsync(CancellationToken cancellationToken)
                {
                        if (OnMessage == null) throw new ArgumentNullException(nameof(OnMessage));

                        // Relay the events raised by the underlying message pump. The common prefix is trimmed from
                        // the queue name before the name is handed to the delegates configured by the caller.
                        _messagePump.OnEmpty = OnEmpty;
                        _messagePump.OnError = (queueName, message, exception, isPoison) => OnError?.Invoke(queueName.TrimStart(_queueNamePrefix), message, exception, isPoison);
                        _messagePump.OnMessage = (queueName, message, messageCancellationToken) => OnMessage?.Invoke(queueName.TrimStart(_queueNamePrefix), message, messageCancellationToken);

                        // All tenants share the same "poison" queue and the same "oversize messages" blob storage
                        var poisonQueueName = $"{_queueNamePrefix}-poison";
                        var oversizeMessagesStorageName = $"{_queueNamePrefix}-oversize-messages";

                        // Discovers the queues that follow the naming convention and adds them to the message pump
                        async Task DiscoverQueuesAsync()
                        {
                                try
                                {
                                        var queueServiceClient = new QueueServiceClient(_messagePumpOptions.ConnectionString);
                                        var response = queueServiceClient.GetQueuesAsync(QueueTraits.None, _queueNamePrefix, cancellationToken);
                                        await foreach (Page<QueueItem> page in response.AsPages().ConfigureAwait(false))
                                        {
                                                foreach (var queue in page.Values)
                                                {
                                                        // Ignore the queue named exactly like the prefix (no tenant identifier) as well as the shared "poison" queue
                                                        if (queue.Name.Equals(_queueNamePrefix, StringComparison.OrdinalIgnoreCase)) continue;
                                                        if (queue.Name.Equals(poisonQueueName, StringComparison.OrdinalIgnoreCase)) continue;

                                                        // AddQueue will make sure to add the queue only if it's not already in the round-robin list of queues.
                                                        _messagePump.AddQueue(
                                                                queue.Name,
                                                                poisonQueueName,
                                                                _visibilityTimeout,
                                                                _maxDequeueCount,
                                                                oversizeMessagesStorageName);
                                                }
                                        }

                                        // Please note there is no need to remove queues that no longer exist from the message
                                        // pump round-robin list. The reason is: message pump will get a RequestFailedException
                                        // with ErrorCode == "QueueNotFound" next time the message pump tries to query those
                                        // queues and it will automatically remove them at that time.
                                }
                                catch (OperationCanceledException)
                                {
                                        // The message pump is shutting down.
                                        // This exception can be safely ignored.
                                        // (TaskCanceledException derives from OperationCanceledException and is therefore caught here as well)
                                }
                                catch (Exception e)
                                {
                                        _logger?.LogError(e.GetBaseException(), "An error occured while fetching the Azure queues that match the naming convention. The error was caught and ignored.");
                                }
                        }

                        // Discover the queues once, and wait for the discovery to complete, before we start processing
                        // messages. Awaiting the first pass (as opposed to pausing for an arbitrary amount of time)
                        // guarantees the discovery has been attempted no matter how slow the storage account is to respond.
                        await DiscoverQueuesAsync().ConfigureAwait(false);

                        // Refresh the list of queues at regular interval in order to pick up new tenant queues
                        RecurrentCancellableTask.StartNew(
                                () => DiscoverQueuesAsync(),
                                _discoverQueuesInterval,
                                cancellationToken,
                                TaskCreationOptions.LongRunning);

                        await _messagePump.StartAsync(cancellationToken).ConfigureAwait(false);
                }

                #endregion
        }
178
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc