• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mt89vein / Sstv.Outbox / 10439200421

18 Aug 2024 08:32AM UTC coverage: 66.428% (-0.7%) from 67.084%
10439200421

push

github

mt89vein
feat: outbox with autopartitioning

199 of 376 branches covered (52.93%)

Branch coverage included in aggregate %.

245 of 367 new or added lines in 18 files covered. (66.76%)

4 existing lines in 2 files now uncovered.

822 of 1161 relevant lines covered (70.8%)

69.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.36
/src/Sstv.Outbox.Npgsql/Partitioner.cs
1
using System.Diagnostics.CodeAnalysis;
2
using Microsoft.Extensions.Logging;
3
using Microsoft.Extensions.Options;
4
using Npgsql;
5
using Sstv.Outbox.Features.Partitions;
6

7
namespace Sstv.Outbox.Npgsql;
8

9
/// <summary>
10
/// Table partitioner.
11
/// </summary>
12
public sealed partial class Partitioner<TOutboxItem> : IPartitioner<TOutboxItem>
13
    where TOutboxItem : class, IOutboxItem
14
{
15
    /// <summary>
16
    /// Options.
17
    /// </summary>
18
    private readonly OutboxOptions _options;
19

20
    /// <summary>
21
    /// Logger.
22
    /// </summary>
23
    private readonly ILogger<Partitioner<TOutboxItem>> _logger;
24

25
    /// <summary>
26
    /// Creates new instance of <see cref="Partitioner{TOutboxItem}"/>.
27
    /// </summary>
28
    /// <param name="options">Options.</param>
29
    /// <param name="logger">Logger.</param>
30
    public Partitioner(
16✔
31
        IOptionsMonitor<OutboxOptions> options,
16✔
32
        ILogger<Partitioner<TOutboxItem>> logger
16✔
33
    )
16✔
34
    {
35
        ArgumentNullException.ThrowIfNull(options);
16✔
36

37
        _options = options.Get(typeof(TOutboxItem).Name);
16✔
38
        _logger = logger;
16✔
39
    }
16✔
40

41
    /// <summary>
42
    /// Precreates partitions for entity.
43
    /// </summary>
44
    /// <param name="ct">Token for cancel operation.</param>
45
    [SuppressMessage("Security", "CA2100:Risk of vulnerability to SQL injection.",
46
        Justification = "There is no user input.")]
47
    public async Task CreatePartitionsAsync(CancellationToken ct = default)
48
    {
49
        try
50
        {
51
            var m = _options.GetDbMapping();
16✔
52
            var connection = await _options
16✔
53
                .GetNpgsqlDataSource()
16✔
54
                .OpenConnectionAsync(ct);
16✔
55

56
            await using var cmd = connection.CreateCommand();
16✔
57

58
            foreach (var partition in _options.PartitionSettings.GetPartitions(m.TableName, DateTimeOffset.UtcNow))
256✔
59
            {
60
                var from = _options.PartitionSettings.UuidV7Generator.ForDate(partition.DateFrom.UtcDateTime);
112✔
61
                var to = _options.PartitionSettings.UuidV7Generator.ForDate(partition.DateTo.UtcDateTime);
112✔
62

63
                var sql = $"""
112✔
64
                           CREATE TABLE IF NOT EXISTS {m.SchemaName}.{partition.PartitionTableName} PARTITION OF {m.QualifiedTableName} FOR values
112✔
65
                           FROM (overlay('{from}'::text placing '0000-0000-000000000000' from 15)::uuid)
112✔
66
                           TO   (overlay('{to}'::text placing '0000-0000-000000000000' from 15)::uuid)
112✔
67
                           WITH (fillfactor = 90);
112✔
68
                           """;
112✔
69

70
                cmd.CommandText = sql;
112✔
71

72
                await cmd.ExecuteNonQueryAsync(ct);
112✔
73
            }
74
        }
16✔
NEW
75
        catch (PostgresException e) when (e.Message.Contains("is not partitioned"))
×
76
        {
NEW
77
            PartitioningNotConfigured(e, typeof(TOutboxItem).Name);
×
78

NEW
79
            throw;
×
80
        }
NEW
81
        catch (Exception e)
×
82
        {
NEW
83
            CreatePartitionFailed(e, typeof(TOutboxItem).Name);
×
84

NEW
85
            throw;
×
86
        }
87
    }
16✔
88

89
    /// <summary>
90
    /// Removes old partitions for <typeparamref name="TOutboxItem"/>.
91
    /// </summary>
92
    /// <param name="ct">Token for cancel operation.</param>
93
    [SuppressMessage("Security", "CA2100:Risk of vulnerability to SQL injection.",
94
        Justification = "There is no user input.")]
95
    public async Task DeleteOldPartitionsAsync(CancellationToken ct = default)
96
    {
97
        try
98
        {
99
            var m = _options.GetDbMapping();
16✔
100
            var connection = await _options
16✔
101
                .GetNpgsqlDataSource()
16✔
102
                .OpenConnectionAsync(ct);
16✔
103

104
            await using var cmd = connection.CreateCommand();
16✔
105

106
            var partitionsForDelete = _options
16✔
107
                .PartitionSettings
16✔
108
                .GetPartitions(
16✔
109
                    m.TableName,
16✔
110
                    startFrom: DateTimeOffset.UtcNow.AddDays(-_options.PartitionSettings.PrecreatePartitionCount))
16✔
111
                .Reverse()
16✔
112
                .Skip(_options.PartitionSettings.PartitionRetentionCount);
16✔
113

114
            foreach (var partition in partitionsForDelete)
160✔
115
            {
116
                try
117
                {
118
                    var sql = $"ALTER TABLE {m.QualifiedTableName} DETACH PARTITION {m.SchemaName}.{partition.PartitionTableName} CONCURRENTLY;";
64✔
119
                    cmd.CommandText = sql;
64✔
120

121
                    await cmd.ExecuteNonQueryAsync(ct);
64✔
122

NEW
123
                    sql = $"DROP TABLE {m.SchemaName}.{partition.PartitionTableName};";
×
NEW
124
                    cmd.CommandText = sql;
×
125

NEW
126
                    await cmd.ExecuteNonQueryAsync(ct);
×
NEW
127
                }
×
128
                catch (PostgresException e) when (e.Message.Contains("does not exist"))
64✔
129
                {
130
                    continue;
64✔
131
                }
NEW
132
            }
×
133
        }
16✔
NEW
134
        catch (PostgresException e) when (e.Message.Contains("is not partitioned"))
×
135
        {
NEW
136
            PartitioningNotConfigured(e, typeof(TOutboxItem).Name);
×
137

NEW
138
            throw;
×
139
        }
NEW
140
        catch (Exception e)
×
141
        {
NEW
142
            DropPartitionFailed(
×
NEW
143
                e,
×
NEW
144
                outboxItem: typeof(TOutboxItem).Name
×
NEW
145
            );
×
146

NEW
147
            throw;
×
148
        }
149
    }
16✔
150

151
    [LoggerMessage(
152
        eventId: 0,
153
        level: LogLevel.Error,
154
        message: "Partitioning for {OutboxItem} not configured!"
155
    )]
156
    private partial void PartitioningNotConfigured(Exception e, string outboxItem);
157

158
    [LoggerMessage(
159
        eventId: 1,
160
        level: LogLevel.Error,
161
        message: "Error occured while precreating partitions for {OutboxItem}"
162
    )]
163
    private partial void CreatePartitionFailed(Exception e, string outboxItem);
164

165
    [LoggerMessage(
166
        eventId: 2,
167
        level: LogLevel.Error,
168
        message: "Error occured while dropping partitions for {OutboxItem}"
169
    )]
170
    private partial void DropPartitionFailed(Exception e, string outboxItem);
171
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc