• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

KSP-CKAN / CKAN / 15513928963

08 Jun 2025 02:16AM UTC coverage: 30.329% (+0.004%) from 30.325%
15513928963

push

github

HebaruSan
Revert provisional changes, rethrow inner exceptions with original stack traces

4072 of 14383 branches covered (28.31%)

Branch coverage included in aggregate %.

4 of 8 new or added lines in 4 files covered. (50.0%)

6 existing lines in 4 files now uncovered.

13729 of 44310 relevant lines covered (30.98%)

0.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.0
/Netkan/Processors/QueueHandler.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Globalization;
4
using System.IO;
5
using System.Linq;
6
using System.Text;
7
using System.Threading;
8

9
using Amazon.SQS;
10
using Amazon.SQS.Model;
11
using log4net;
12
using log4net.Core;
13
using log4net.Filter;
14
using log4net.Repository.Hierarchy;
15
using Newtonsoft.Json;
16

17
using CKAN.Versioning;
18
using CKAN.NetKAN.Transformers;
19
using CKAN.NetKAN.Model;
20
using CKAN.Extensions;
21
using CKAN.Games;
22

23
namespace CKAN.NetKAN.Processors
24
{
25
    public class QueueHandler
26
    {
27
        /// <summary>
        /// Create a handler that reads netkans from one SQS queue, inflates them,
        /// and posts the results to another SQS queue.
        /// </summary>
        /// <param name="inputQueueName">Name of the queue to receive messages from</param>
        /// <param name="outputQueueName">Name of the queue to send results to</param>
        /// <param name="cacheDir">Passed through to the <see cref="Inflator"/></param>
        /// <param name="outputDir">Directory used when computing output file names</param>
        /// <param name="overwriteCache">Passed through to the <see cref="Inflator"/></param>
        /// <param name="githubToken">Passed through to the <see cref="Inflator"/></param>
        /// <param name="gitlabToken">Passed through to the <see cref="Inflator"/></param>
        /// <param name="userAgent">Passed through to the <see cref="Inflator"/></param>
        /// <param name="prerelease">Passed through to the <see cref="Inflator"/></param>
        /// <param name="game">Game whose short name is attached to every outgoing message</param>
        public QueueHandler(string  inputQueueName,
                            string  outputQueueName,
                            string? cacheDir,
                            string? outputDir,
                            bool    overwriteCache,
                            string? githubToken,
                            string? gitlabToken,
                            string? userAgent,
                            bool?   prerelease,
                            IGame   game)
        {
            this.game      = game;
            this.outputDir = outputDir;

            // Hook our appender into the root logger so warnings logged
            // during inflation can be attached to the outgoing messages
            warningAppender = GetQueueLogAppender();
            if (LogManager.GetRepository() is Hierarchy hierarchy)
            {
                hierarchy.Root.AddAppender(warningAppender);
            }

            log.Debug("Initializing SQS queue handler");
            inflator = new Inflator(cacheDir, overwriteCache,
                                    githubToken, gitlabToken,
                                    userAgent, prerelease, game);

            // Queues are addressed by URL, so resolve both names up front
            inputQueueURL  = getQueueUrl(inputQueueName);
            outputQueueURL = getQueueUrl(outputQueueName);
            log.DebugFormat("Queue URLs: {0}, {1}", inputQueueURL, outputQueueURL);
        }
×
50

51
        /// <summary>
        /// Detach the warning appender that the constructor added to the root logger.
        /// </summary>
        ~QueueHandler()
        {
            if (LogManager.GetRepository() is Hierarchy hierarchy)
            {
                hierarchy.Root.RemoveAppender(warningAppender);
            }
        }
×
55

56
        /// <summary>
        /// Poll the input queue forever, handling one batch of messages per iteration.
        /// This method never returns normally.
        /// </summary>
        public void Process()
        {
            // 10 messages, 30 minutes to allow time to handle them all
            const int batchSize                = 10;
            const int visibilityTimeoutMinutes = 30;

            for (;;)
            {
                handleMessages(inputQueueURL, batchSize, visibilityTimeoutMinutes);
            }
        }
64

65
        /// <summary>
        /// Build the appender used to collect log output for outgoing messages.
        /// Its filter chain accepts events at exactly the Warn level and denies
        /// everything else.
        /// </summary>
        /// <returns>A new, configured <see cref="QueueAppender"/></returns>
        private static QueueAppender GetQueueLogAppender()
        {
            var onlyWarnings = new LevelMatchFilter()
            {
                LevelToMatch  = Level.Warn,
                AcceptOnMatch = true,
            };
            var appender = new QueueAppender() { Name = "QueueAppender" };
            // Order matters: the accept filter must come before the deny-all
            appender.AddFilter(onlyWarnings);
            appender.AddFilter(new DenyAllFilter());
            return appender;
        }
×
79

80
        /// <summary>
        /// Ask SQS for the URL of a queue, blocking until the answer arrives.
        /// </summary>
        /// <param name="name">Name of the queue to look up</param>
        /// <returns>The URL reported by SQS for that queue</returns>
        private string getQueueUrl(string name)
        {
            log.DebugFormat("Looking up URL for queue {0}", name);
            var request  = new GetQueueUrlRequest() { QueueName = name };
            var response = client.GetQueueUrlAsync(request).Result;
            return response.QueueUrl;
        }
×
85

86
        /// <summary>
        /// Receive one batch of messages from a queue, inflate them, send the
        /// results to the output queue, and delete the received messages.
        /// </summary>
        /// <remarks>
        /// Send and delete are each best-effort: a failure in either is logged
        /// and does not stop the other or propagate to the caller.
        /// Both operations are waited on so that failures actually reach the
        /// catch blocks, and so that messages are not deleted from the input
        /// queue while their results are still in flight.
        /// </remarks>
        /// <param name="url">URL of the queue to receive from</param>
        /// <param name="howMany">Maximum messages to receive, also the size of each outgoing batch</param>
        /// <param name="timeoutMinutes">Visibility timeout for the received messages, in minutes</param>
        private void handleMessages(string url, int howMany, int timeoutMinutes)
        {
            log.DebugFormat("Looking for messages from {0}", url);
            var resp = client.ReceiveMessageAsync(new ReceiveMessageRequest()
            {
                QueueUrl              = url,
                MaxNumberOfMessages   = howMany,
                VisibilityTimeout     = (int)TimeSpan.FromMinutes(timeoutMinutes).TotalSeconds,
                MessageAttributeNames = new List<string>() { "All" },
            }).Result;
            // Newer AWS SDK versions may leave empty collections null
            if (resp.Messages == null || resp.Messages.Count == 0)
            {
                log.Debug("No metadata in queue");
                return;
            }

            try
            {
                // Reset the ids between batches
                responseId = 0;
                // Might be >10 if Releases>1
                var responses = resp.Messages.SelectMany(Inflate).ToList();
                for (int i = 0; i < responses.Count; i += howMany)
                {
                    // Wait for the send to finish so exceptions land in our catch block;
                    // GetAwaiter().GetResult() throws the original exception unwrapped
                    var sendResp = client.SendMessageBatchAsync(new SendMessageBatchRequest()
                    {
                        QueueUrl = outputQueueURL,
                        Entries  = responses.GetRange(i, Math.Min(howMany, responses.Count - i)),
                    }).GetAwaiter().GetResult();
                    // A batch call can succeed overall while individual entries fail
                    if (sendResp.Failed != null)
                    {
                        foreach (var failure in sendResp.Failed)
                        {
                            log.ErrorFormat("Send failed for entry {0}: {1} {2}",
                                            failure.Id, failure.Code, failure.Message);
                        }
                    }
                }
            }
            catch (Exception e)
            {
                log.ErrorFormat("Send failed: {0}\r\n{1}", e.Message, e.StackTrace);
            }

            try
            {
                log.Debug("Deleting messages");
                var deleteResp = client.DeleteMessageBatchAsync(new DeleteMessageBatchRequest()
                {
                    QueueUrl = url,
                    Entries  = resp.Messages.Select(Delete).ToList(),
                }).GetAwaiter().GetResult();
                if (deleteResp.Failed != null)
                {
                    foreach (var failure in deleteResp.Failed)
                    {
                        log.ErrorFormat("Delete failed for entry {0}: {1} {2}",
                                        failure.Id, failure.Code, failure.Message);
                    }
                }
            }
            catch (Exception e)
            {
                log.ErrorFormat("Delete failed: {0}\r\n{1}", e.Message, e.StackTrace);
            }
        }
×
136

137
        /// <summary>
        /// Inflate the netkan(s) contained in one received message and produce
        /// the messages to send in response.
        /// </summary>
        /// <remarks>
        /// This is an iterator, so nothing here runs until the result is enumerated.
        /// On inflation failure a single error message is produced instead of
        /// throwing. If the failure is a throttling error with a known retry
        /// time in the future, the thread sleeps until then.
        /// </remarks>
        /// <param name="msg">
        /// Received message whose body is the netkan YAML; the optional attributes
        /// "Releases", "HighestVersion", and "HighestVersionPrerelease" tune the inflation
        /// </param>
        /// <returns>One entry per inflated module, or one error entry if inflation failed</returns>
        private IEnumerable<SendMessageBatchRequestEntry> Inflate(Message msg)
        {
            log.DebugFormat("Metadata returned: {0}", msg.Body);
            var netkans = YamlExtensions.Parse(msg.Body)
                                        .Select(ymap => new Metadata(ymap))
                                        .ToArray();
            if (netkans.Length == 0)
            {
                // Nothing to inflate and no identifier to report an error against;
                // skip this message rather than abort the whole batch
                log.ErrorFormat("No metadata found in message {0}, skipping", msg.MessageId);
                yield break;
            }
            var netkan = netkans[0];

            // Newer AWS SDK versions may leave empty collections null
            var attrs = msg.MessageAttributes
                        ?? new Dictionary<string, MessageAttributeValue>();

            int releases = 1;
            if (attrs.TryGetValue("Releases", out MessageAttributeValue? releasesAttr))
            {
                // Machine-generated value, so don't let the current culture affect parsing
                releases = int.Parse(releasesAttr.StringValue, CultureInfo.InvariantCulture);
            }

            ModuleVersion? highVer = null;
            if (attrs.TryGetValue("HighestVersion", out MessageAttributeValue? highVerAttr))
            {
                highVer = new ModuleVersion(highVerAttr.StringValue);
            }

            ModuleVersion? highVerPre = null;
            if (attrs.TryGetValue("HighestVersionPrerelease", out MessageAttributeValue? highVerPreAttr))
            {
                highVerPre = new ModuleVersion(highVerPreAttr.StringValue);
            }

            log.InfoFormat("Inflating {0}", netkan.Identifier);
            IEnumerable<Metadata>? ckans = null;
            bool    caught        = false;
            string? caughtMessage = null;
            var     opts          = new TransformOptions(releases,
                                                         null,
                                                         highVer,
                                                         highVerPre,
                                                         netkan.Staged,
                                                         netkan.StagingReason);
            try
            {
                ckans = inflator.Inflate($"{netkan.Identifier}.netkan", netkans, opts)
                    .ToArray();
            }
            catch (Exception e)
            {
                e = e.GetBaseException() ?? e;
                log.InfoFormat("Inflation failed, sending error: {0}", e.Message);
                // If you do this the sensible way, the C# compiler throws:
                // error CS1631: Cannot yield a value in the body of a catch clause
                caught        = true;
                caughtMessage = e.Message;
                if (e is RequestThrottledKraken k && k.retryTime is DateTime dt)
                {
                    var span = dt.Subtract(DateTime.UtcNow);
                    // Thread.Sleep throws for negative spans, which would lose the
                    // whole batch, so only wait if the retry time is still ahead of us
                    if (span > TimeSpan.Zero)
                    {
                        // Let the API credits recharge
                        caughtMessage += $@"; sleeping {Math.Floor(span.TotalMinutes):0}m {span.Seconds}s...";
                        Thread.Sleep(span);
                    }
                }
            }
            if (caught)
            {
                yield return inflationMessage(null, netkan, opts, false, outputDir, caughtMessage);
            }
            if (ckans != null)
            {
                foreach (Metadata ckan in ckans)
                {
                    log.InfoFormat("Sending {0}-{1}", ckan.Identifier, ckan.Version);
                    yield return inflationMessage(ckan, netkan, opts, true, outputDir);
                }
            }
        }
×
206

207
        /// <summary>
        /// Build the outgoing queue message describing the result of one inflation.
        /// </summary>
        /// <remarks>
        /// Any warnings collected by the warning appender so far are attached
        /// to this message and then cleared, and the response id counter is advanced.
        /// </remarks>
        /// <param name="ckan">The inflated module, or null if inflation failed</param>
        /// <param name="netkan">The netkan that was inflated; supplies the mod identifier</param>
        /// <param name="opts">Options used for the inflation; supplies the staging info</param>
        /// <param name="success">Value for the "Success" attribute</param>
        /// <param name="outDir">Directory used to compute the "FileName" attribute</param>
        /// <param name="err">Error text for the "ErrorMessage" attribute, omitted if null or empty</param>
        /// <returns>An entry ready to be included in a send batch request</returns>
        private SendMessageBatchRequestEntry inflationMessage(Metadata?        ckan,
                                                              Metadata         netkan,
                                                              TransformOptions opts,
                                                              bool             success,
                                                              string?          outDir,
                                                              string?          err = null)
        {
            // Every attribute we send is a plain string
            MessageAttributeValue StringAttribute(string? value)
                => new MessageAttributeValue()
                {
                    DataType    = "String",
                    StringValue = value,
                };

            // Attributes present on every message
            var attribs = new Dictionary<string, MessageAttributeValue>()
            {
                ["ModIdentifier"] = StringAttribute(netkan.Identifier),
                ["Staged"]        = StringAttribute(opts.Staged.ToString()),
                ["Success"]       = StringAttribute(success.ToString()),
                ["CheckTime"]     = StringAttribute(DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture)),
                ["GameId"]        = StringAttribute(game.ShortName),
            };

            // Attributes that depend on the outcome
            if (ckan != null)
            {
                attribs.Add("FileName",
                            StringAttribute(Program.CkanFileName(outDir, ckan.AllJson)));
            }
            if (!string.IsNullOrEmpty(err))
            {
                attribs.Add("ErrorMessage", StringAttribute(err));
            }
            if (warningAppender.Warnings.Count != 0)
            {
                attribs.Add("WarningMessages",
                            StringAttribute(string.Join("\r\n", warningAppender.Warnings)));
                // Each warning should only be reported once
                warningAppender.Warnings.Clear();
            }
            if (opts.Staged && opts.StagingReasons.Count > 0)
            {
                attribs.Add("StagingReason",
                            StringAttribute(string.Join("\r\n\r\n", opts.StagingReasons)));
            }

            return new SendMessageBatchRequestEntry()
            {
                Id                     = (responseId++).ToString(),
                MessageGroupId         = "1",
                MessageDeduplicationId = Path.GetRandomFileName(),
                MessageBody            = serializeCkan(ckan),
                MessageAttributes      = attribs,
            };
        }
×
311

312
        /// <summary>
        /// Render a module's JSON as indented text for use as a message body.
        /// </summary>
        /// <param name="ckan">Module to serialize, or null</param>
        /// <returns>
        /// The module's JSON indented with 4 spaces and followed by a newline,
        /// or "{}" if <paramref name="ckan"/> is null
        /// </returns>
        internal static string serializeCkan(Metadata? ckan)
        {
            if (ckan == null)
            {
                // SendMessage doesn't like empty bodies, so send an empty JSON object
                return "{}";
            }

            var buffer = new StringBuilder();
            using (var json = new JsonTextWriter(new StringWriter(buffer))
                {
                    Formatting  = Formatting.Indented,
                    Indentation = 4,
                    IndentChar  = ' ',
                })
            {
                new JsonSerializer().Serialize(json, ckan.AllJson);
            }
            // The writer has been flushed by now, so the buffer holds the full document
            buffer.Append(Environment.NewLine);
            return buffer.ToString();
        }
×
332

333
        /// <summary>
        /// Build the delete-batch entry for a received message.
        /// </summary>
        /// <param name="msg">Message to delete; its id and receipt handle are copied</param>
        /// <returns>An entry ready to be included in a delete batch request</returns>
        private DeleteMessageBatchRequestEntry Delete(Message msg)
            => new DeleteMessageBatchRequestEntry()
            {
                Id            = msg.MessageId,
                ReceiptHandle = msg.ReceiptHandle,
            };
×
341

342
        private readonly string?         outputDir;
343
        private readonly IGame           game;
344
        private readonly Inflator        inflator;
345
        private readonly AmazonSQSClient client = new AmazonSQSClient();
×
346

347
        private readonly string inputQueueURL;
348
        private readonly string outputQueueURL;
349

350
        private int responseId = 0;
×
351

352
        private static readonly ILog log = LogManager.GetLogger(typeof(QueueHandler));
×
353
        private readonly QueueAppender warningAppender;
354
    }
355
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc