• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Jericho / Picton.Messaging / 189

29 Jan 2024 11:11PM UTC coverage: 43.678% (+17.7%) from 26.023%
189

push

appveyor

Jericho
Merge branch 'release/9.0.0'

152 of 348 relevant lines covered (43.68%)

15353.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/Source/Picton.Messaging/AsyncMultiTenantMessagePumpWithHandlers.cs
1
using App.Metrics;
2
using Microsoft.Extensions.Logging;
3
using Picton.Messaging.Utilities;
4
using System;
5
using System.Collections.Generic;
6
using System.Threading;
7
using System.Threading.Tasks;
8

9
namespace Picton.Messaging
10
{
11
        /// <summary>
12
        /// High performance message processor (also known as a message "pump") for Azure storage queues.
13
        /// Designed to monitor an Azure storage queue and process the message as quickly and efficiently as possible.
14
        /// </summary>
15
        public class AsyncMultiTenantMessagePumpWithHandlers
16
        {
17
                #region FIELDS
18

19
                private static IDictionary<Type, Type[]> _messageHandlers;
20

21
                private readonly string _queueNamePrefix;
22
                private readonly ILogger _logger;
23

24
                private readonly AsyncMultiTenantMessagePump _messagePump;
25

26
                #endregion
27

28
                #region PROPERTIES
29

30
                /// <summary>
31
                /// Gets or sets the logic to execute when an error occurs.
32
                /// </summary>
33
                /// <example>
34
                /// <code>
35
                /// OnError = (message, exception, isPoison) => Trace.TraceError("An error occured: {0}", exception);
36
                /// </code>
37
                /// </example>
38
                /// <remarks>
39
                /// When isPoison is set to true, you should copy this message to a poison queue because it will be deleted from the original queue.
40
                /// </remarks>
41
                public Action<string, CloudMessage, Exception, bool> OnError { get; set; }
42

43
                /// <summary>
44
                /// Gets or sets the logic to execute when all queues are empty.
45
                /// </summary>
46
                /// <example>
47
                /// <code>
48
                /// OnEmpty = cancellationToken => Task.Delay(2500, cancellationToken).Wait();
49
                /// </code>
50
                /// </example>
51
                /// <remarks>
52
                /// If this property is not set, the default logic is to do nothing.
53
                /// </remarks>
54
                public Action<CancellationToken> OnEmpty { get; set; }
55

56
                #endregion
57

58
                #region CONSTRUCTOR
59

60
                /// <summary>
61
                /// Initializes a new instance of the <see cref="AsyncMultiTenantMessagePumpWithHandlers"/> class.
62
                /// </summary>
63
                /// <param name="options">Options for the mesage pump.</param>
64
                /// <param name="queueNamePrefix">The common prefix in the naming convention.</param>
65
                /// <param name="discoverQueuesInterval">The frequency we check for queues in the Azure storage account matching the naming convention. Default is 30 seconds.</param>
66
                /// <param name="visibilityTimeout">The visibility timeout.</param>
67
                /// <param name="maxDequeueCount">The maximum dequeue count.</param>
68
                /// <param name="logger">The logger.</param>
69
                /// <param name="metrics">The system where metrics are published.</param>
70
                public AsyncMultiTenantMessagePumpWithHandlers(MessagePumpOptions options, string queueNamePrefix, TimeSpan? discoverQueuesInterval = null, TimeSpan? visibilityTimeout = null, int maxDequeueCount = 3, ILogger logger = null, IMetrics metrics = null)
71
                {
72
                        _messageHandlers = MessageHandlersDiscoverer.GetMessageHandlers(logger);
×
73

74
                        _queueNamePrefix = queueNamePrefix;
×
75
                        _logger = logger;
×
76

77
                        _messagePump = new AsyncMultiTenantMessagePump(options, queueNamePrefix, discoverQueuesInterval, visibilityTimeout, maxDequeueCount, logger, metrics);
×
78
                }
×
79

80
                #endregion
81

82
                #region PUBLIC METHODS
83

84
                /// <summary>
85
                /// Starts the message pump.
86
                /// </summary>
87
                /// <param name="cancellationToken">The cancellation token.</param>
88
                /// <exception cref="System.ArgumentNullException">OnMessage.</exception>
89
                /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
90
                public Task StartAsync(CancellationToken cancellationToken)
91
                {
92
                        _messagePump.OnEmpty = OnEmpty;
×
93
                        _messagePump.OnError = (queueName, message, exception, isPoison) => OnError?.Invoke(queueName.TrimStart(_queueNamePrefix), message, exception, isPoison);
×
94
                        _messagePump.OnMessage = (queueName, message, cancellationToken) =>
×
95
                        {
×
96
                                var contentType = message.Content.GetType();
×
97

×
98
                                if (!_messageHandlers.TryGetValue(contentType, out Type[] handlers))
×
99
                                {
×
100
                                        throw new Exception($"Received a message of type {contentType.FullName} but could not find a class implementing IMessageHandler<{contentType.FullName}>");
×
101
                                }
×
102

×
103
                                foreach (var handlerType in handlers)
×
104
                                {
×
105
                                        object handler = null;
×
106
                                        if (handlerType.GetConstructor([typeof(ILogger)]) != null)
×
107
                                        {
×
108
                                                handler = Activator.CreateInstance(handlerType, [(object)_logger]);
×
109
                                        }
×
110
                                        else
×
111
                                        {
×
112
                                                handler = Activator.CreateInstance(handlerType);
×
113
                                        }
×
114

×
115
                                        var handlerMethod = handlerType.GetMethod("Handle", [contentType]);
×
116
                                        handlerMethod.Invoke(handler, [message.Content]);
×
117
                                }
×
118
                        };
×
119

120
                        return _messagePump.StartAsync(cancellationToken);
×
121
                }
122

123
                #endregion
124
        }
125
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc