• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

JonasMacielWork / SkillLearning / 16431941533

22 Jul 2025 01:09AM UTC coverage: 57.508% (-0.3%) from 57.814%
16431941533

push

github

web-flow
Merge pull request #46 from JonasMacielWork/develop

Develop

95 of 134 branches covered (70.9%)

Branch coverage included in aggregate %.

468 of 845 relevant lines covered (55.38%)

2.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.97
/Backend/SkillLearning.Infrastructure/Services/KafkaConsumerService.cs
1
using Confluent.Kafka;
2
using Microsoft.Extensions.Options;
3
using SkillLearning.Application.Common.Configuration;
4
using SkillLearning.Application.Common.Interfaces;
5
using System.Text.Json;
6

7
namespace SkillLearning.Infrastructure.Services
8
{
9
    public class KafkaConsumerService<TKey, TValue> : IKafkaConsumerService<TKey, TValue>
10
    {
11
        private readonly IConsumer<TKey, string> _consumer;
12
        private CancellationTokenSource? _consumeCancellationTokenSource;
13
        private bool _disposed;
14

15
        public KafkaConsumerService(IOptions<KafkaSettings> kafkaSettings, Action<ConsumerConfig>? configureOverrides = null)
1✔
16
        {
17
            var config = new ConsumerConfig
1✔
18
            {
1✔
19
                BootstrapServers = kafkaSettings.Value.BootstrapServers,
1✔
20
                GroupId = $"skilllearning-email-sender-group",
1✔
21
                AutoOffsetReset = AutoOffsetReset.Earliest,
1✔
22
                EnableAutoCommit = false,
1✔
23
                ClientId = $"skilllearning-worker-{Guid.NewGuid()}",
1✔
24
                SecurityProtocol = SecurityProtocol.Plaintext
1✔
25
            };
1✔
26

27
            configureOverrides?.Invoke(config);
1!
28

29
            _consumer = new ConsumerBuilder<TKey, string>(config)
1✔
30
                .SetKeyDeserializer(KafkaConsumerService<TKey, TValue>.CreateKeyDeserializer())
1✔
31
                .SetValueDeserializer(Deserializers.Utf8)
1✔
32
                .Build();
1✔
33
        }
1✔
34

35
        private static IDeserializer<TKey> CreateKeyDeserializer()
36
        {
37
            if (typeof(TKey) == typeof(Null))
1!
38
            {
39
                return (IDeserializer<TKey>)Deserializers.Null;
1✔
40
            }
41
            else if (typeof(TKey) == typeof(string))
×
42
            {
43
                return (IDeserializer<TKey>)Deserializers.Utf8;
×
44
            }
45
            throw new NotSupportedException($"Key deserializer for type {typeof(TKey).Name} is not supported.");
×
46
        }
47

48
        public async Task StartConsuming(string topic, Func<TValue, Task> handler, CancellationToken cancellationToken)
49
        {
50
            if (_consumeCancellationTokenSource != null && !_consumeCancellationTokenSource.IsCancellationRequested)
1!
51
                return;
×
52

53
            _consumeCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
1✔
54

55
            _consumer.Subscribe(topic);
1✔
56

57
            try
58
            {
59
                while (!_consumeCancellationTokenSource.Token.IsCancellationRequested)
2✔
60
                {
61
                    try
62
                    {
63
                        var consumeResult = _consumer.Consume(TimeSpan.FromSeconds(1));
1✔
64

65
                        if (consumeResult == null || consumeResult.IsPartitionEOF)
1!
66
                            continue;
×
67

68
                        var deserializedValue = JsonSerializer.Deserialize<TValue>(consumeResult.Message.Value, new JsonSerializerOptions
1✔
69
                        {
1✔
70
                            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
1✔
71
                        });
1✔
72

73
                        if (deserializedValue is not null)
1✔
74
                            await handler(deserializedValue);
1✔
75

76
                        _consumer.Commit(consumeResult);
1✔
77
                    }
1✔
78
                    catch (ConsumeException ex) when (ex.Error.Code == ErrorCode.UnknownTopicOrPart)
×
79
                    {
80
                        await Task.Delay(5000, _consumeCancellationTokenSource.Token);
×
81
                    }
×
82
                    catch (OperationCanceledException)
×
83
                    {
84
                        break;
×
85
                    }
86
                    catch (Exception ex)
×
87
                    {
88
                        Console.WriteLine("❌ Erro inesperado no consumo:");
×
89
                        Console.WriteLine(ex);
×
90
                        await Task.Delay(1000);
×
91
                    }
92
                }
93
            }
1✔
94
            finally
95
            {
96
                _consumer.Unsubscribe();
1✔
97
            }
98
        }
1✔
99

100
        public void StopConsuming()
101
        {
102
            _consumeCancellationTokenSource?.Cancel();
×
103
        }
×
104

105
        protected virtual void Dispose(bool disposing)
106
        {
107
            if (!_disposed)
1✔
108
            {
109
                if (disposing)
1✔
110
                {
111
                    _consumer.Close();
1✔
112
                    _consumer.Dispose();
1✔
113
                    _consumeCancellationTokenSource?.Dispose();
1!
114
                }
115

116
                _disposed = true;
1✔
117
            }
118
        }
1✔
119

120
        public void Dispose()
121
        {
122
            Dispose(true);
1✔
123
            GC.SuppressFinalize(this);
1✔
124
        }
1✔
125
    }
126
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc