• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

KSP-CKAN / CKAN / 15833572481

23 Jun 2025 07:42PM UTC coverage: 42.239% (+0.1%) from 42.099%
15833572481

push

github

HebaruSan
Merge #4398 Exception handling revamp, parallel multi-host inflation

3882 of 9479 branches covered (40.95%)

Branch coverage included in aggregate %.

48 of 137 new or added lines in 30 files covered. (35.04%)

12 existing lines in 6 files now uncovered.

8334 of 19442 relevant lines covered (42.87%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.95
/Netkan/Processors/QueueHandler.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Globalization;
4
using System.IO;
5
using System.Linq;
6
using System.Text;
7
using System.Threading;
8

9
using Amazon.SQS;
10
using Amazon.SQS.Model;
11
using log4net;
12
using log4net.Core;
13
using log4net.Filter;
14
using log4net.Repository.Hierarchy;
15
using Newtonsoft.Json;
16

17
using CKAN.Versioning;
18
using CKAN.NetKAN.Transformers;
19
using CKAN.NetKAN.Model;
20
using CKAN.Extensions;
21
using CKAN.Games;
22

23
namespace CKAN.NetKAN.Processors
24
{
25
    public class QueueHandler
26
    {
27
        public QueueHandler(string  inputQueueName,
×
28
                            string  outputQueueName,
29
                            string? cacheDir,
30
                            string? outputDir,
31
                            bool    overwriteCache,
32
                            string? githubToken,
33
                            string? gitlabToken,
34
                            string? userAgent,
35
                            bool?   prerelease,
36
                            IGame   game)
37
        {
×
38
            this.outputDir = outputDir;
×
39
            warningAppender = GetQueueLogAppender();
×
40
            (LogManager.GetRepository() as Hierarchy)?.Root.AddAppender(warningAppender);
×
41
            this.game = game;
×
42

43
            log.Debug("Initializing SQS queue handler");
×
44
            inflator = new Inflator(cacheDir, overwriteCache, githubToken, gitlabToken, userAgent, prerelease, game);
×
45

46
            inputQueueURL  = getQueueUrl(inputQueueName);
×
47
            outputQueueURL = getQueueUrl(outputQueueName);
×
48
            log.DebugFormat("Queue URLs: {0}, {1}", inputQueueURL, outputQueueURL);
×
49
        }
×
50

51
        ~QueueHandler()
52
        {
×
53
            (LogManager.GetRepository() as Hierarchy)?.Root.RemoveAppender(warningAppender);
×
54
        }
×
55

56
        public void Process()
57
        {
×
58
            while (true)
×
59
            {
×
60
                // 10 messages, 30 minutes to allow time to handle them all
61
                handleMessages(inputQueueURL, 10, 30);
×
62
            }
×
63
        }
64

65
        private static QueueAppender GetQueueLogAppender()
66
        {
×
67
            var qap = new QueueAppender()
×
68
            {
69
                Name = "QueueAppender",
70
            };
71
            qap.AddFilter(new LevelMatchFilter()
×
72
            {
73
                LevelToMatch  = Level.Warn,
74
                AcceptOnMatch = true,
75
            });
76
            qap.AddFilter(new DenyAllFilter());
×
77
            return qap;
×
78
        }
×
79

80
        private string getQueueUrl(string name)
81
        {
×
82
            log.DebugFormat("Looking up URL for queue {0}", name);
×
83
            return client.GetQueueUrlAsync(new GetQueueUrlRequest() { QueueName = name }).Result.QueueUrl;
×
84
        }
×
85

86
        private void handleMessages(string url, int howMany, int timeoutMinutes)
87
        {
×
88
            log.DebugFormat("Looking for messages from {0}", url);
×
89
            var resp = client.ReceiveMessageAsync(new ReceiveMessageRequest()
×
90
            {
91
                QueueUrl              = url,
92
                MaxNumberOfMessages   = howMany,
93
                VisibilityTimeout     = (int)TimeSpan.FromMinutes(timeoutMinutes).TotalSeconds,
94
                MessageAttributeNames = new List<string>() { "All" },
95
            }).Result;
96
            if (resp.Messages.Count == 0)
×
97
            {
×
98
                log.Debug("No metadata in queue");
×
99
            }
×
100
            else
101
            {
×
102
                try
103
                {
×
104
                    // Reset the ids between batches
105
                    responseId = 0;
×
106
                    // Might be >10 if Releases>1
107
                    var responses = resp.Messages.SelectMany(Inflate).ToList();
×
108
                    for (int i = 0; i < responses.Count; i += howMany)
×
109
                    {
×
110
                        client.SendMessageBatchAsync(new SendMessageBatchRequest()
×
111
                        {
112
                            QueueUrl = outputQueueURL,
113
                            Entries  = responses.GetRange(i, Math.Min(howMany, responses.Count - i)),
114
                        });
115
                    }
×
116
                }
×
117
                catch (Exception e)
×
118
                {
×
119
                    log.ErrorFormat("Send failed: {0}\r\n{1}", e.Message, e.StackTrace);
×
120
                }
×
121
                try
122
                {
×
123
                    log.Debug("Deleting messages");
×
124
                    client.DeleteMessageBatchAsync(new DeleteMessageBatchRequest()
×
125
                    {
126
                        QueueUrl = url,
127
                        Entries  = resp.Messages.Select(Delete).ToList(),
128
                    });
129
                }
×
130
                catch (Exception e)
×
131
                {
×
132
                    log.ErrorFormat("Delete failed: {0}\r\n{1}", e.Message, e.StackTrace);
×
133
                }
×
134
            }
×
135
        }
×
136

137
        private IEnumerable<SendMessageBatchRequestEntry> Inflate(Message msg)
138
        {
×
139
            log.DebugFormat("Metadata returned: {0}", msg.Body);
×
140
            var netkans = YamlExtensions.Parse(msg.Body)
×
141
                                        .Select(ymap => new Metadata(ymap))
×
142
                                        .ToArray();
143

144
            int releases = 1;
×
145
            if (msg.MessageAttributes.TryGetValue("Releases", out MessageAttributeValue? releasesAttr))
×
146
            {
×
147
                releases = int.Parse(releasesAttr.StringValue);
×
148
            }
×
149

150
            ModuleVersion? highVer = null;
×
151
            if (msg.MessageAttributes.TryGetValue("HighestVersion", out MessageAttributeValue? highVerAttr))
×
152
            {
×
153
                highVer = new ModuleVersion(highVerAttr.StringValue);
×
154
            }
×
155

156
            ModuleVersion? highVerPre = null;
×
157
            if (msg.MessageAttributes.TryGetValue("HighestVersionPrerelease", out MessageAttributeValue? highVerPreAttr))
×
158
            {
×
159
                highVerPre = new ModuleVersion(highVerPreAttr.StringValue);
×
160
            }
×
161

162
            log.InfoFormat("Inflating {0}", netkans.First().Identifier);
×
163
            IEnumerable<Metadata>? ckans = null;
×
164
            bool    caught        = false;
×
165
            string? caughtMessage = null;
×
166
            var     opts          = new TransformOptions(releases,
×
167
                                                         null,
168
                                                         highVer,
169
                                                         highVerPre,
170
                                                         netkans.First().Staged,
171
                                                         netkans.First().StagingReason);
172
            try
173
            {
×
174
                ckans = inflator.Inflate($"{netkans.First().Identifier}.netkan", netkans, opts)
×
175
                    .ToArray();
176
            }
×
177
            catch (Exception e)
×
178
            {
×
179
                e = e.GetBaseException() ?? e;
×
180
                log.InfoFormat("Inflation failed, sending error: {0}", e.Message);
×
181
                // If you do this the sensible way, the C# compiler throws:
182
                // error CS1631: Cannot yield a value in the body of a catch clause
183
                caught        = true;
×
NEW
184
                caughtMessage = e switch
×
185
                {
186
                    // Short error message for problems with mods
NEW
187
                    Kraken k => k.Message,
×
188
                    // Full message with stack trace for problems with code
NEW
189
                    _ => e.ToString(),
×
190
                };
NEW
191
                if (e is RequestThrottledKraken rtk && rtk.retryTime is DateTime dt)
×
UNCOV
192
                {
×
193
                    // Let the API credits recharge
194
                    var span = dt.Subtract(DateTime.UtcNow);
×
195
                    caughtMessage += $@"; sleeping {Math.Floor(span.TotalMinutes):0}m {span.Seconds}s...";
×
196
                    Thread.Sleep(span);
×
197
                }
×
198
            }
×
199
            if (caught)
×
200
            {
×
201
                yield return inflationMessage(null, netkans.First(), opts, false, outputDir, caughtMessage);
×
202
            }
×
203
            if (ckans != null)
×
204
            {
×
205
                foreach (Metadata ckan in ckans)
×
206
                {
×
207
                    log.InfoFormat("Sending {0}-{1}", ckan.Identifier, ckan.Version);
×
208
                    yield return inflationMessage(ckan, netkans.First(), opts, true, outputDir);
×
209
                }
×
210
            }
×
211
        }
×
212

213
        private SendMessageBatchRequestEntry inflationMessage(Metadata?        ckan,
214
                                                              Metadata         netkan,
215
                                                              TransformOptions opts,
216
                                                              bool             success,
217
                                                              string?          outDir,
218
                                                              string?          err = null)
219
        {
×
220
            var attribs = new Dictionary<string, MessageAttributeValue>()
×
221
            {
222
                {
223
                    "ModIdentifier",
224
                    new MessageAttributeValue()
225
                    {
226
                        DataType    = "String",
227
                        StringValue = netkan.Identifier
228
                    }
229
                },
230
                {
231
                    "Staged",
232
                    new MessageAttributeValue()
233
                    {
234
                        DataType    = "String",
235
                        StringValue = opts.Staged.ToString()
236
                    }
237
                },
238
                {
239
                    "Success",
240
                    new MessageAttributeValue()
241
                    {
242
                        DataType    = "String",
243
                        StringValue = success.ToString()
244
                    }
245
                },
246
                {
247
                    "CheckTime",
248
                    new MessageAttributeValue()
249
                    {
250
                        DataType    = "String",
251
                        StringValue = DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture)
252
                    }
253
                },
254
                {
255
                    "GameId",
256
                    new MessageAttributeValue()
257
                    {
258
                        DataType    = "String",
259
                        StringValue = game.ShortName
260
                    }
261
                }
262
            };
263
            if (ckan != null)
×
264
            {
×
265
                attribs.Add(
×
266
                    "FileName",
267
                    new MessageAttributeValue()
268
                    {
269
                        DataType    = "String",
270
                        StringValue = Program.CkanFileName(outDir, ckan.AllJson)
271
                    }
272
                );
273
            }
×
274
            if (!string.IsNullOrEmpty(err))
×
275
            {
×
276
                attribs.Add(
×
277
                    "ErrorMessage",
278
                    new MessageAttributeValue()
279
                    {
280
                        DataType    = "String",
281
                        StringValue = err
282
                    }
283
                );
284
            }
×
285
            if (warningAppender.Warnings.Count != 0)
×
286
            {
×
287
                attribs.Add(
×
288
                    "WarningMessages",
289
                    new MessageAttributeValue()
290
                    {
291
                        DataType    = "String",
292
                        StringValue = string.Join("\r\n", warningAppender.Warnings),
293
                    }
294
                );
295
                warningAppender.Warnings.Clear();
×
296
            }
×
297
            if (opts.Staged && opts.StagingReasons.Count > 0)
×
298
            {
×
299
                attribs.Add(
×
300
                    "StagingReason",
301
                    new MessageAttributeValue()
302
                    {
303
                        DataType    = "String",
304
                        StringValue = string.Join("\r\n\r\n", opts.StagingReasons),
305
                    }
306
                );
307
            }
×
308
            return new SendMessageBatchRequestEntry()
×
309
            {
310
                Id                     = (responseId++).ToString(),
311
                MessageGroupId         = "1",
312
                MessageDeduplicationId = Path.GetRandomFileName(),
313
                MessageBody            = serializeCkan(ckan),
314
                MessageAttributes      = attribs,
315
            };
316
        }
×
317

318
        internal static string serializeCkan(Metadata? ckan)
319
        {
×
320
            if (ckan == null)
×
321
            {
×
322
                // SendMessage doesn't like empty bodies, so send an empty JSON object
323
                return "{}";
×
324
            }
325
            var sw = new StringWriter(new StringBuilder());
×
326
            using (var writer = new JsonTextWriter(sw)
×
327
                {
328
                    Formatting  = Formatting.Indented,
329
                    Indentation = 4,
330
                    IndentChar  = ' ',
331
                })
332
            {
×
333
                var serializer = new JsonSerializer();
×
334
                serializer.Serialize(writer, ckan.AllJson);
×
335
            }
×
336
            return sw + Environment.NewLine;
×
337
        }
×
338

339
        private DeleteMessageBatchRequestEntry Delete(Message msg)
340
        {
×
341
            return new DeleteMessageBatchRequestEntry()
×
342
            {
343
                Id            = msg.MessageId,
344
                ReceiptHandle = msg.ReceiptHandle,
345
            };
346
        }
×
347

348
        private readonly string?         outputDir;
349
        private readonly IGame           game;
350
        private readonly Inflator        inflator;
351
        private readonly AmazonSQSClient client = new AmazonSQSClient();
×
352

353
        private readonly string inputQueueURL;
354
        private readonly string outputQueueURL;
355

356
        private int responseId = 0;
×
357

358
        private static readonly ILog log = LogManager.GetLogger(typeof(QueueHandler));
×
359
        private readonly QueueAppender warningAppender;
360
    }
361
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc