• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mt89vein / Sstv.Outbox / 10439138780

18 Aug 2024 08:23AM UTC coverage: 67.144% (+0.06%) from 67.084%
10439138780

push

github

mt89vein
feat: outbox with autopartitioning

199 of 376 branches covered (52.93%)

Branch coverage included in aggregate %.

256 of 367 new or added lines in 18 files covered. (69.75%)

4 existing lines in 2 files now uncovered.

833 of 1161 relevant lines covered (71.75%)

69.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.74
/src/Sstv.Outbox.Npgsql/StrictOrderingOutboxRepository.cs
1
using System.Diagnostics.CodeAnalysis;
2
using Dapper;
3
using Microsoft.Extensions.Options;
4
using Npgsql;
5

6
namespace Sstv.Outbox.Npgsql;
7

8
/// <summary>
/// Implements API for single active worker.
/// </summary>
/// <remarks>
/// A batch is processed inside one connection and one transaction:
/// <see cref="LockAndReturnItemsBatchAsync"/> opens them and selects rows with <c>FOR UPDATE NOWAIT</c>,
/// <see cref="SaveAsync"/> persists the result and commits. The connection and the transaction are kept
/// in fields and released when the repository is disposed.
/// </remarks>
[SuppressMessage("Security", "CA2100:sql injection check", Justification = "There is no user input in sql")]
public sealed class StrictOrderingOutboxRepository<TOutboxItem> : IOutboxRepository<TOutboxItem>
    where TOutboxItem : class, IOutboxItem
{
    private readonly OutboxOptions _options;
    private NpgsqlConnection? _connection;
    private NpgsqlTransaction? _transaction;

    /// <summary>
    /// Creates new instance of <see cref="StrictOrderingOutboxRepository{TOutboxItem}"/>.
    /// </summary>
    /// <param name="options">Outbox Options. The named options instance is resolved by the outbox item type name.</param>
    /// <exception cref="ArgumentNullException">When <paramref name="options"/> is null.</exception>
    public StrictOrderingOutboxRepository(IOptionsMonitor<OutboxOptions> options)
    {
        ArgumentNullException.ThrowIfNull(options);

        _options = options.Get(typeof(TOutboxItem).Name);
    }

    /// <summary>
    /// Lock, fetch and return outbox items.
    /// </summary>
    /// <param name="ct">Token for cancel operation.</param>
    /// <returns>
    /// OutboxItems. An empty collection is returned when the rows are already locked
    /// (PostgreSQL reports <see cref="PostgresErrorCodes.LockNotAvailable"/>).
    /// </returns>
    public async Task<IEnumerable<TOutboxItem>> LockAndReturnItemsBatchAsync(CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();

        _connection = await _options.GetNpgsqlDataSource().OpenConnectionAsync(ct);
        _transaction = await _connection.BeginTransactionAsync(ct);

        var m = _options.GetDbMapping();

        // With partitioning enabled completed rows are kept in the table and only marked as completed
        // (see SaveAsync), so they have to be filtered out here.
        var filter = _options.PartitionSettings.Enabled
            ? $"WHERE {m.Status} <> {(int)OutboxItemStatus.Completed}"
            : string.Empty;

        var order = _options.GetPriorityFeature()
            .Enabled
            ? $"ORDER BY {m.Priority} DESC, {m.Id} ASC"
            : $"ORDER BY {m.Id} ASC";

        var sql = $"""
                   SELECT * FROM {m.QualifiedTableName}
                   {filter}
                   {order}
                   LIMIT {_options.OutboxItemsLimit}
                   FOR UPDATE NOWAIT;
                   """;

        // CommandDefinition is the Dapper way to flow the cancellation token into the query.
        var command = new CommandDefinition(sql, transaction: _transaction, cancellationToken: ct);

        try
        {
            return await _connection.QueryAsync<TOutboxItem>(command);
        }
        catch (PostgresException e) when (e.SqlState == PostgresErrorCodes.LockNotAvailable)
        {
            // NOWAIT fails immediately when the rows are locked by someone else: nothing to process.
            return Array.Empty<TOutboxItem>();
        }
    }

    /// <summary>
    /// Saves results.
    /// </summary>
    /// <remarks>
    /// Completed items are deleted, or - when partitioning is enabled - marked with the completed status.
    /// After that the transaction started by <see cref="LockAndReturnItemsBatchAsync"/> is committed.
    /// </remarks>
    /// <param name="completed">Completed outbox items.</param>
    /// <param name="retried">Retries outbox items. Must be empty.</param>
    /// <param name="ct">Token for cancel operation.</param>
    /// <exception cref="ArgumentNullException">When <paramref name="completed"/> or <paramref name="retried"/> is null.</exception>
    /// <exception cref="InvalidOperationException">When there is no transaction, i.e. the batch was not locked first.</exception>
    /// <exception cref="NotSupportedException">When <paramref name="retried"/> is not empty.</exception>
    public async Task SaveAsync(
        IReadOnlyCollection<TOutboxItem> completed,
        IReadOnlyCollection<TOutboxItem> retried,
        CancellationToken ct
    )
    {
        ArgumentNullException.ThrowIfNull(completed);
        ArgumentNullException.ThrowIfNull(retried);

        // Both fields are assigned together in LockAndReturnItemsBatchAsync; checking them here
        // removes the need for the null-forgiving operator below.
        if (_transaction is null || _connection is null)
        {
            throw new InvalidOperationException("Transaction was null");
        }

        if (retried.Count != 0)
        {
            throw new NotSupportedException("Retry not supported for strict ordering worker");
        }

        var m = _options.GetDbMapping();
        const string ids = "ids";

        string sql;
        if (_options.PartitionSettings.Enabled)
        {
            // Column name is emitted the same way (unquoted) as in the SELECT filter of
            // LockAndReturnItemsBatchAsync, so both statements resolve the same column.
            sql = $"""
                   UPDATE {m.QualifiedTableName}
                   SET {m.Status} = {(int)OutboxItemStatus.Completed}
                   WHERE {m.Id} in (select * from unnest(@{ids}));
                   """;
        }
        else
        {
            sql = $"""
                   DELETE FROM {m.QualifiedTableName}
                   WHERE {m.Id} in (select * from unnest(@{ids}));
                   """;
        }

        await using var cmd = _connection.CreateCommand();
        cmd.CommandText = sql;
        cmd.Parameters.Add(new NpgsqlParameter<Guid[]>(ids, completed
            .Select(o => o.Id)
            .ToArray()));

        await cmd.ExecuteNonQueryAsync(ct);

        await _transaction.CommitAsync(ct);
    }

    /// <summary>
    /// Cleans resources.
    /// </summary>
    public void Dispose()
    {
        // The transaction depends on the connection, so it is released first.
        _transaction?.Dispose();
        _connection?.Dispose();
    }

    /// <summary>
    /// Cleans resources.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // The transaction depends on the connection, so it is released first.
        if (_transaction is not null)
        {
            await _transaction.DisposeAsync();
        }

        if (_connection is not null)
        {
            await _connection.DisposeAsync();
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc