• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 16703104561

03 Aug 2025 08:37AM UTC coverage: 83.438%. First build
16703104561

push

github

web-flow
Merge 2719b0f3f into 5c28b612b

86 of 97 new or added lines in 27 files covered. (88.66%)

2403 of 2880 relevant lines covered (83.44%)

141.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.02
/src/LLL.DurableTask.EFCore.PostgreSQL/PostgreOrchestrationDbContextExtensions.cs
1
using System;
2
using System.Data;
3
using System.Linq;
4
using System.Threading.Tasks;
5
using DurableTask.Core;
6
using LLL.DurableTask.Core;
7
using LLL.DurableTask.EFCore.Entities;
8
using Microsoft.EntityFrameworkCore;
9

10
namespace LLL.DurableTask.EFCore.PostgreSQL;
11

12
public class PostgreOrchestrationDbContextExtensions : OrchestrationDbContextExtensions
13
{
14
    private const IsolationLevel TransactionIsolationLevel = IsolationLevel.ReadCommitted;
15

16
    public override async Task Migrate(OrchestrationDbContext dbContext)
17
    {
18
        await dbContext.Database.MigrateAsync();
19✔
19
    }
20

21
    public override async Task WithinTransaction(OrchestrationDbContext dbContext, Func<Task> action)
22
    {
23
        using var transaction = dbContext.Database.BeginTransaction(TransactionIsolationLevel);
243✔
24
        await action();
243✔
25

26
        await transaction.CommitAsync();
243✔
27
    }
28

29
    public override async Task<Instance> LockInstanceForUpdate(OrchestrationDbContext dbContext, string instanceId)
30
    {
31
        return (await dbContext.Instances.FromSqlRaw(@"
33✔
32
                SELECT * FROM ""Instances""
33✔
33
                WHERE ""Instances"".""InstanceId"" = {0}
33✔
34
                FOR UPDATE
33✔
35
            ", instanceId).ToArrayAsync()).FirstOrDefault();
33✔
36
    }
37

38
    public override async Task<Instance> TryLockNextInstanceAsync(
39
        OrchestrationDbContext dbContext,
40
        TimeSpan lockTimeout)
41
    {
42
        var instance = (await dbContext.Instances.FromSqlRaw(@"
5✔
43
                SELECT ""Instances"".* FROM ""OrchestrationMessages""
5✔
44
                    INNER JOIN ""Instances"" ON ""OrchestrationMessages"".""InstanceId"" = ""Instances"".""InstanceId""
5✔
45
                WHERE
5✔
46
                    ""OrchestrationMessages"".""AvailableAt"" <= {0}
5✔
47
                    AND ""Instances"".""LockedUntil"" <= {0}
5✔
48
                ORDER BY ""OrchestrationMessages"".""AvailableAt""
5✔
49
                LIMIT 1
5✔
50
                FOR UPDATE SKIP LOCKED
5✔
51
            ", DateTime.UtcNow).ToArrayAsync()).FirstOrDefault();
5✔
52

53
        if (instance is null)
5✔
54
            return null;
1✔
55

56
        instance.LockId = Guid.NewGuid().ToString();
4✔
57
        instance.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
4✔
58
        await dbContext.SaveChangesAsync();
4✔
59

60
        return instance;
4✔
61
    }
62

63
    public override async Task<Instance> TryLockNextInstanceAsync(
64
        OrchestrationDbContext dbContext,
65
        string[] queues,
66
        TimeSpan lockTimeout)
67
    {
68
        var queuesParams = string.Join(",", queues.Select((_, i) => $"{{{i}}}"));
808✔
69
        var utcNowParam = $"{{{queues.Length}}}";
102✔
70
        var parameters = queues.Cast<object>().Concat(new object[] { DateTime.UtcNow }).ToArray();
102✔
71

72
        var instance = (await dbContext.Instances.FromSqlRaw($@"
102✔
73
                SELECT ""Instances"".* FROM ""OrchestrationMessages""
102✔
74
                    INNER JOIN ""Instances"" ON ""OrchestrationMessages"".""InstanceId"" = ""Instances"".""InstanceId""
102✔
75
                WHERE
102✔
76
                    ""OrchestrationMessages"".""AvailableAt"" <= {utcNowParam}
102✔
77
                    AND ""OrchestrationMessages"".""Queue"" IN ({queuesParams})
102✔
78
                    AND ""Instances"".""LockedUntil"" <= {utcNowParam}
102✔
79
                ORDER BY ""OrchestrationMessages"".""AvailableAt""
102✔
80
                LIMIT 1
102✔
81
                FOR UPDATE SKIP LOCKED
102✔
82
            ", parameters).ToArrayAsync()).FirstOrDefault();
102✔
83

84
        if (instance is null)
102✔
85
            return null;
71✔
86

87
        instance.LockId = Guid.NewGuid().ToString();
31✔
88
        instance.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
31✔
89
        await dbContext.SaveChangesAsync();
31✔
90

91
        return instance;
31✔
92
    }
93

94
    public override async Task<ActivityMessage> TryLockNextActivityMessageAsync(
95
        OrchestrationDbContext dbContext,
96
        TimeSpan lockTimeout)
97
    {
98
        var instance = (await dbContext.ActivityMessages.FromSqlRaw(@"
×
99
                SELECT * FROM ""ActivityMessages""
×
100
                WHERE ""LockedUntil"" <= {0}
×
101
                ORDER BY ""LockedUntil""
×
102
                LIMIT 1
×
103
                FOR UPDATE SKIP LOCKED
×
104
            ", DateTime.UtcNow).ToArrayAsync()).FirstOrDefault();
×
105

NEW
106
        if (instance is null)
×
107
            return null;
×
108

109
        instance.LockId = Guid.NewGuid().ToString();
×
110
        instance.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
×
111
        await dbContext.SaveChangesAsync();
×
112

113
        return instance;
×
114
    }
115

116
    public override async Task<ActivityMessage> TryLockNextActivityMessageAsync(
117
        OrchestrationDbContext dbContext,
118
        string[] queues,
119
        TimeSpan lockTimeout)
120
    {
121
        var queuesParams = string.Join(",", queues.Select((_, i) => $"{{{i}}}"));
356✔
122
        var utcNowParam = $"{{{queues.Length}}}";
89✔
123
        var parameters = queues.Cast<object>().Concat(new object[] { DateTime.UtcNow }).ToArray();
89✔
124

125
        var instance = (await dbContext.ActivityMessages.FromSqlRaw($@"
89✔
126
                SELECT * FROM ""ActivityMessages""
89✔
127
                WHERE ""Queue"" IN ({queuesParams})
89✔
128
                    AND ""LockedUntil"" <= {utcNowParam}
89✔
129
                ORDER BY ""LockedUntil""
89✔
130
                LIMIT 1
89✔
131
                FOR UPDATE SKIP LOCKED
89✔
132
            ", parameters).ToArrayAsync()).FirstOrDefault();
89✔
133

134
        if (instance is null)
89✔
135
            return null;
71✔
136

137
        instance.LockId = Guid.NewGuid().ToString();
18✔
138
        instance.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
18✔
139
        await dbContext.SaveChangesAsync();
18✔
140

141
        return instance;
18✔
142
    }
143

144
    public override async Task<int> PurgeInstanceHistoryAsync(OrchestrationDbContext dbContext, PurgeInstanceFilter filter)
145
    {
146
        var limit = filter is PurgeInstanceFilterExtended filterExtended
1✔
147
            ? filterExtended.Limit
1✔
148
            : null;
1✔
149

150
        var parameters = new ParametersCollection();
1✔
151

152
        return await dbContext.Database.ExecuteSqlRawAsync($@"
1✔
153
            DELETE FROM ""Executions""
1✔
154
            WHERE ""ExecutionId"" IN(
1✔
155
                SELECT ""Executions"".""ExecutionId""
1✔
156
                FROM ""Executions""
1✔
157
                    INNER JOIN ""Instances"" ON ""Executions"".""InstanceId"" = ""Instances"".""InstanceId""
1✔
158
                WHERE ""Executions"".""CreatedTime"" > {parameters.Add(filter.CreatedTimeFrom)}
1✔
159
                {(filter.CreatedTimeTo is not null ? $@"AND ""Executions"".""CreatedTime"" < {parameters.Add(filter.CreatedTimeTo)}" : "")}
1✔
160
                {(filter.RuntimeStatus.Any() ? $@"AND ""Executions"".""Status"" IN ({string.Join(",", filter.RuntimeStatus.Select(s => parameters.Add(s.ToString())))})" : "")}
9✔
161
                ORDER BY ""Executions"".""CreatedTime""
1✔
162
                {(limit is not null ? $"LIMIT {parameters.Add(limit)}" : null)}
1✔
163
                FOR UPDATE SKIP LOCKED
1✔
164
            );
1✔
165
        ", parameters);
1✔
166
    }
167
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc