• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mt89vein / Sstv.Outbox / 10439138780

18 Aug 2024 08:23AM UTC coverage: 67.144% (+0.06%) from 67.084%
10439138780

push

github

mt89vein
feat: outbox with autopartitioning

199 of 376 branches covered (52.93%)

Branch coverage included in aggregate %.

256 of 367 new or added lines in 18 files covered. (69.75%)

4 existing lines in 2 files now uncovered.

833 of 1161 relevant lines covered (71.75%)

69.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.98
/src/Sstv.Outbox.Npgsql/CompetingOutboxRepository.cs
1
using System.Diagnostics.CodeAnalysis;
2
using Dapper;
3
using Microsoft.Extensions.Options;
4
using Npgsql;
5

6
namespace Sstv.Outbox.Npgsql;
7

8
/// <summary>
/// Implements API for concurrent work of multiple workers.
/// </summary>
/// <remarks>
/// A single instance holds one connection and one transaction: they are opened by
/// <see cref="LockAndReturnItemsBatchAsync"/>, committed by <see cref="SaveAsync"/>
/// and released by <see cref="Dispose"/> / <see cref="DisposeAsync"/>.
/// </remarks>
[SuppressMessage("Security", "CA2100:sql injection check", Justification = "There is no user input in sql")]
public sealed class CompetingOutboxRepository<TOutboxItem> : IOutboxRepository<TOutboxItem>
    where TOutboxItem : class, IOutboxItem
{
    private readonly OutboxOptions _options;
    private NpgsqlConnection? _connection;
    private NpgsqlTransaction? _transaction;

    /// <summary>
    /// Creates new instance of <see cref="CompetingOutboxRepository{TOutboxItem}"/>.
    /// </summary>
    /// <param name="options">Outbox options. The named options instance is resolved by the outbox item type name.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
    public CompetingOutboxRepository(IOptionsMonitor<OutboxOptions> options)
    {
        ArgumentNullException.ThrowIfNull(options);

        _options = options.Get(typeof(TOutboxItem).Name);
    }

    /// <summary>
    /// Lock, fetch and return outbox items.
    /// </summary>
    /// <remarks>
    /// Opens a connection, begins a transaction and selects up to
    /// <c>OutboxItemsLimit</c> rows with <c>FOR UPDATE SKIP LOCKED</c>.
    /// The connection and transaction are stored on the instance and stay open
    /// until <see cref="SaveAsync"/> commits or the repository is disposed.
    /// </remarks>
    /// <param name="ct">Token for cancel operation.</param>
    /// <returns>OutboxItems.</returns>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    public async Task<IEnumerable<TOutboxItem>> LockAndReturnItemsBatchAsync(CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();

        // Store each resource in its field as soon as it exists, so Dispose can
        // release it even if a later step throws.
        var connection = await _options.GetNpgsqlDataSource().OpenConnectionAsync(ct);
        _connection = connection;

        var transaction = await connection.BeginTransactionAsync(ct);
        _transaction = transaction;

        var m = _options.GetDbMapping();

        // With partitioning enabled, completed rows are kept in the table (see SaveAsync),
        // so they have to be excluded from the selection explicitly.
        var filter = _options.PartitionSettings.Enabled
            ? $" and {m.Status} <> {(int)OutboxItemStatus.Completed}"
            : string.Empty;

        var order = _options.GetPriorityFeature().Enabled
            ? $"ORDER BY {m.Priority} DESC, {m.Id} ASC, {m.RetryAfter} ASC"
            : $"ORDER BY {m.Id} ASC, {m.RetryAfter} ASC";

        var sql = $"""
                   SELECT * FROM {m.QualifiedTableName}
                   WHERE ({m.RetryAfter} is null or {m.RetryAfter} <= @now::timestamptz){filter}
                   {order}
                   LIMIT {_options.OutboxItemsLimit}
                   FOR UPDATE SKIP LOCKED;
                   """;

        // CommandDefinition is the Dapper entry point that carries a cancellation token,
        // so the query itself can be cancelled, not only the connection setup.
        var command = new CommandDefinition(
            commandText: sql,
            parameters: new { now = DateTimeOffset.UtcNow },
            transaction: transaction,
            cancellationToken: ct);

        return await connection.QueryAsync<TOutboxItem>(command);
    }

    /// <summary>
    /// Saves results.
    /// </summary>
    /// <remarks>
    /// Completed items are deleted, or marked with the completed status when partitioning
    /// is enabled. Retried items get their status, retry count and retry time updated.
    /// The transaction opened by <see cref="LockAndReturnItemsBatchAsync"/> is then committed.
    /// </remarks>
    /// <param name="completed">Completed outbox items.</param>
    /// <param name="retried">Retries outbox items. Each item must implement <c>IHasStatus</c>.</param>
    /// <param name="ct">Token for cancel operation.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="completed"/> or <paramref name="retried"/> is null.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// No connection or transaction is open, i.e. <see cref="LockAndReturnItemsBatchAsync"/> was not called first.
    /// </exception>
    public async Task SaveAsync(
        IReadOnlyCollection<TOutboxItem> completed,
        IReadOnlyCollection<TOutboxItem> retried,
        CancellationToken ct
    )
    {
        ArgumentNullException.ThrowIfNull(completed);
        ArgumentNullException.ThrowIfNull(retried);

        // Narrow the nullable fields to locals once, so no null-forgiving operator is needed below.
        var connection = _connection ?? throw new InvalidOperationException("Connection was null");
        var transaction = _transaction ?? throw new InvalidOperationException("Transaction was null");

        var m = _options.GetDbMapping();

        if (completed.Count > 0)
        {
            const string ids = "ids";

            // With partitioning enabled rows are only marked as completed instead of deleted
            // (presumably so that whole partitions can be dropped later - confirm against the partitioner).
            var sql = _options.PartitionSettings.Enabled
                ? $"""
                   UPDATE {m.QualifiedTableName}
                   SET "{m.Status}" = {(int)OutboxItemStatus.Completed}
                   WHERE {m.Id} in (select * from unnest(@{ids}));
                   """
                : $"""
                   DELETE FROM {m.QualifiedTableName}
                   WHERE {m.Id} in (select * from unnest(@{ids}));
                   """;

            // Commands created from the connection run inside its active transaction.
            await using var cmd = connection.CreateCommand();
            cmd.CommandText = sql;
            cmd.Parameters.Add(new NpgsqlParameter<Guid[]>(ids, completed
                .Select(o => o.Id)
                .ToArray()));

            await cmd.ExecuteNonQueryAsync(ct);
        }

        if (retried.Count > 0)
        {
            // The four arrays are unnested in parallel into a row set that is joined
            // to the table by id, so all retried items are updated in a single statement.
            var sql = $"""
                       UPDATE {m.QualifiedTableName}
                       SET "{m.Status}" = data."{m.Status}",
                           "{m.RetryCount}" = data."{m.RetryCount}",
                           "{m.RetryAfter}"  = data."{m.RetryAfter}"
                       FROM (SELECT * FROM unnest(@{m.Id}, @{m.Status}, @{m.RetryCount}, @{m.RetryAfter}))
                                        AS data("{m.Id}", "{m.Status}", "{m.RetryCount}", "{m.RetryAfter}")
                       WHERE {m.QualifiedTableName}."{m.Id}" = data."{m.Id}";
                       """;

            await using var cmd = connection.CreateCommand();
            cmd.CommandText = sql;
            cmd.Parameters.Add(new NpgsqlParameter<Guid[]>(m.Id, retried.Select(e => e.Id).ToArray()));
            cmd.Parameters.Add(new NpgsqlParameter<int[]>(m.Status, retried.Select(e => (int)((IHasStatus)e).Status).ToArray()));
            cmd.Parameters.Add(new NpgsqlParameter<int?[]>(m.RetryCount, retried.Select(e => ((IHasStatus)e).RetryCount).ToArray()));
            cmd.Parameters.Add(new NpgsqlParameter<DateTimeOffset?[]>(m.RetryAfter, retried.Select(e => ((IHasStatus)e).RetryAfter).ToArray()));
            await cmd.ExecuteNonQueryAsync(ct);
        }

        await transaction.CommitAsync(ct);
    }

    /// <summary>
    /// Cleans resources.
    /// </summary>
    /// <remarks>
    /// The transaction is disposed before the connection it belongs to.
    /// </remarks>
    public void Dispose()
    {
        _transaction?.Dispose();
        _connection?.Dispose();
    }

    /// <summary>
    /// Cleans resources.
    /// </summary>
    /// <remarks>
    /// The transaction is disposed before the connection it belongs to.
    /// </remarks>
    public async ValueTask DisposeAsync()
    {
        if (_transaction is not null)
        {
            await _transaction.DisposeAsync();
        }

        if (_connection is not null)
        {
            await _connection.DisposeAsync();
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc