• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mt89vein / Sstv.Outbox / 10439138780

18 Aug 2024 08:23AM UTC coverage: 67.144% (+0.06%) from 67.084%
10439138780

push

github

mt89vein
feat: outbox with autopartitioning

199 of 376 branches covered (52.93%)

Branch coverage included in aggregate %.

256 of 367 new or added lines in 18 files covered. (69.75%)

4 existing lines in 2 files now uncovered.

833 of 1161 relevant lines covered (71.75%)

69.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.16
/src/Sstv.Outbox.EntityFrameworkCore.Npgsql/Partitioner.cs
1
using System.Diagnostics.CodeAnalysis;
2
using Microsoft.EntityFrameworkCore;
3
using Microsoft.Extensions.Logging;
4
using Microsoft.Extensions.Options;
5
using Npgsql;
6
using Sstv.Outbox.Features.Partitions;
7

8
namespace Sstv.Outbox.EntityFrameworkCore.Npgsql;
9

10
/// <summary>
11
/// Table partitioner.
12
/// </summary>
13
public sealed partial class Partitioner<TDbContext, TOutboxItem> : IPartitioner<TOutboxItem>
14
    where TDbContext : DbContext
15
    where TOutboxItem : class, IOutboxItem
16
{
17
    /// <summary>
18
    /// Options.
19
    /// </summary>
20
    private readonly OutboxOptions _options;
21

22
    /// <summary>
23
    /// DbContext.
24
    /// </summary>
25
    private readonly TDbContext _dbContext;
26

27
    /// <summary>
28
    /// Logger.
29
    /// </summary>
30
    private readonly ILogger<Partitioner<TDbContext, TOutboxItem>> _logger;
31

32
    /// <summary>
33
    /// Creates new instance of <see cref="Partitioner{TDbContext,TOutboxItem}"/>.
34
    /// </summary>
35
    /// <param name="dbContext">DbContext.</param>
36
    /// <param name="options">Options.</param>
37
    /// <param name="logger">Logger.</param>
38
    public Partitioner(
16✔
39
        TDbContext dbContext,
16✔
40
        IOptionsMonitor<OutboxOptions> options,
16✔
41
        ILogger<Partitioner<TDbContext, TOutboxItem>> logger
16✔
42
    )
16✔
43
    {
44
        ArgumentNullException.ThrowIfNull(dbContext);
16✔
45
        ArgumentNullException.ThrowIfNull(options);
16✔
46

47
        _options = options.Get(typeof(TOutboxItem).Name);
16✔
48
        _dbContext = dbContext;
16✔
49
        _logger = logger;
16✔
50
    }
16✔
51

52
    /// <summary>
53
    /// Precreates partitions for entity.
54
    /// </summary>
55
    /// <param name="ct">Token for cancel operation.</param>
56
    [SuppressMessage("Security", "EF1002:Risk of vulnerability to SQL injection.",
57
        Justification = "There is no user input. FromSqlInterpolated incorrectly sets table name")]
58
    public async Task CreatePartitionsAsync(CancellationToken ct = default)
59
    {
60
        try
61
        {
62
            var m = _options.GetDbMapping();
16✔
63

64
            await _dbContext.Database.BeginTransactionAsync(ct);
16✔
65

66
            foreach (var partition in _options.PartitionSettings.GetPartitions(m.TableName, DateTimeOffset.UtcNow.AddDays(-5)))
256✔
67
            {
68
                var from = _options.PartitionSettings.UuidV7Generator.ForDate(partition.DateFrom.UtcDateTime);
112✔
69
                var to = _options.PartitionSettings.UuidV7Generator.ForDate(partition.DateTo.UtcDateTime);
112✔
70

71
                var sql = $"""
112✔
72
                           CREATE TABLE IF NOT EXISTS {m.SchemaName}.{partition.PartitionTableName} PARTITION OF {m.QualifiedTableName} FOR values
112✔
73
                           FROM (overlay('{from}'::text placing '0000-0000-000000000000' from 15)::uuid)
112✔
74
                           TO   (overlay('{to}'::text placing '0000-0000-000000000000' from 15)::uuid)
112✔
75
                           WITH (fillfactor = 90);
112✔
76
                           """;
112✔
77
                await _dbContext.Database.ExecuteSqlRawAsync(sql, ct);
112✔
78
            }
79

80
            await _dbContext.Database.CommitTransactionAsync(ct);
16✔
81
        }
16✔
NEW
82
        catch (PostgresException e) when (e.Message.Contains("is not partitioned"))
×
83
        {
NEW
84
            PartitioningNotConfigured(e, typeof(TOutboxItem).Name);
×
85

NEW
86
            throw;
×
87
        }
NEW
88
        catch (Exception e)
×
89
        {
NEW
90
            CreatePartitionFailed(
×
NEW
91
                e,
×
NEW
92
                outboxItem: typeof(TOutboxItem).Name,
×
NEW
93
                dbContext: typeof(TDbContext).Name
×
NEW
94
            );
×
95

NEW
96
            throw;
×
97
        }
98
    }
16✔
99

100
    /// <summary>
101
    /// Removes old partitions for <typeparamref name="TOutboxItem"/>.
102
    /// </summary>
103
    /// <param name="ct">Token for cancel operation.</param>
104
    public async Task DeleteOldPartitionsAsync(CancellationToken ct = default)
105
    {
106
        try
107
        {
108
            var m = _options.GetDbMapping();
16✔
109

110
            var partitionsForDelete = _options
16✔
111
                .PartitionSettings
16✔
112
                .GetPartitions(
16✔
113
                    m.TableName,
16✔
114
                    startFrom: DateTimeOffset.UtcNow.AddDays(-_options.PartitionSettings.PrecreatePartitionCount))
16✔
115
                .Reverse()
16✔
116
                .Skip(_options.PartitionSettings.PartitionRetentionCount);
16✔
117

118
            foreach (var partition in partitionsForDelete)
159✔
119
            {
120
                try
121
                {
122
                    var sql = $"ALTER TABLE {m.QualifiedTableName} DETACH PARTITION {m.SchemaName}.{partition.PartitionTableName} CONCURRENTLY;";
64✔
123
                    await _dbContext.Database.ExecuteSqlRawAsync(sql, ct);
64✔
124

125
                    sql = $"DROP TABLE {m.SchemaName}.{partition.PartitionTableName};";
32✔
126
                    await _dbContext.Database.ExecuteSqlRawAsync(sql, ct);
32✔
127
                }
32✔
128
                catch (PostgresException e) when (e.Message.Contains("does not exist"))
31✔
129
                {
130
                    continue;
31✔
131
                }
132
            }
32✔
133
        }
15✔
NEW
134
        catch (PostgresException e) when (e.Message.Contains("is not partitioned"))
×
135
        {
NEW
136
            PartitioningNotConfigured(e, typeof(TOutboxItem).Name);
×
137

NEW
138
            throw;
×
139
        }
140
        catch (Exception e)
1✔
141
        {
142
            DropPartitionFailed(
1✔
143
                e,
1✔
144
                outboxItem: typeof(TOutboxItem).Name,
1✔
145
                dbContext: typeof(TDbContext).Name
1✔
146
            );
1✔
147

148
            throw;
1✔
149
        }
150
    }
15✔
151

152
    [LoggerMessage(
153
        eventId: 0,
154
        level: LogLevel.Error,
155
        message: "Partitioning for {OutboxItem} not configured!"
156
    )]
157
    private partial void PartitioningNotConfigured(Exception e, string outboxItem);
158

159
    [LoggerMessage(
160
        eventId: 1,
161
        level: LogLevel.Error,
162
        message: "Error occured while precreating partitions for {OutboxItem} in {DbContext}"
163
    )]
164
    private partial void CreatePartitionFailed(Exception e, string outboxItem, string dbContext);
165

166
    [LoggerMessage(
167
        eventId: 2,
168
        level: LogLevel.Error,
169
        message: "Error occured while dropping partitions for {OutboxItem} in {DbContext}"
170
    )]
171
    private partial void DropPartitionFailed(Exception e, string outboxItem, string dbContext);
172
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc