• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 5835751495

pending completion
5835751495

push

github

lucaslorentz
Add husky and apply some code fixes

2502 of 2502 new or added lines in 91 files covered. (100.0%)

2286 of 2792 relevant lines covered (81.88%)

143.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.67
/src/LLL.DurableTask.EFCore.PostgreSQL/PostgreOrchestrationDbContextExtensions.cs
1
using System;
2
using System.Data;
3
using System.Linq;
4
using System.Threading.Tasks;
5
using LLL.DurableTask.EFCore.Entities;
6
using Microsoft.EntityFrameworkCore;
7

8
namespace LLL.DurableTask.EFCore.PostgreSQL;
9

10
/// <summary>
/// PostgreSQL implementation of <see cref="OrchestrationDbContextExtensions"/>.
/// Row locking is done with raw <c>SELECT ... FOR UPDATE</c> statements.
/// </summary>
public class PostgreOrchestrationDbContextExtensions : OrchestrationDbContextExtensions
{
    // Isolation level used for every transaction opened by WithinTransaction.
    private const IsolationLevel TransactionIsolationLevel = IsolationLevel.ReadCommitted;

    /// <summary>
    /// Applies the context's migrations to the database via EF Core's <c>MigrateAsync</c>.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    public override async Task Migrate(OrchestrationDbContext dbContext)
    {
        await dbContext.Database.MigrateAsync();
    }

    /// <summary>
    /// Runs <paramref name="action"/> inside a database transaction and commits it
    /// when the action completes without throwing.
    /// </summary>
    /// <param name="dbContext">Context on which the transaction is opened.</param>
    /// <param name="action">Work to execute inside the transaction.</param>
    public override async Task WithinTransaction(OrchestrationDbContext dbContext, Func<Task> action)
    {
        // Begin and dispose asynchronously so no thread is blocked on database I/O.
        // If the action throws, the transaction is disposed without being committed.
        await using var transaction = await dbContext.Database.BeginTransactionAsync(TransactionIsolationLevel);

        await action();

        await transaction.CommitAsync();
    }

    /// <summary>
    /// Selects the instance with the given id using <c>FOR UPDATE</c>, so the row
    /// stays locked for the remainder of the current transaction.
    /// </summary>
    /// <param name="dbContext">Context used to run the query.</param>
    /// <param name="instanceId">Id of the instance to lock.</param>
    /// <returns>The matching instance, or <c>null</c> when no row matches.</returns>
    public override async Task<Instance> LockInstanceForUpdate(OrchestrationDbContext dbContext, string instanceId)
    {
        // {0} is turned into a database parameter by FromSqlRaw, not concatenated.
        // NOTE(review): the query is materialized before FirstOrDefault, presumably
        // so EF Core does not compose additional SQL around the FOR UPDATE clause.
        return (await dbContext.Instances.FromSqlRaw(@"
                SELECT * FROM ""Instances""
                WHERE ""Instances"".""InstanceId"" = {0}
                FOR UPDATE
            ", instanceId).ToArrayAsync()).FirstOrDefault();
    }

    /// <summary>
    /// Finds the instance owning the oldest available orchestration message whose
    /// lock has expired, then assigns it a new lock id and expiration.
    /// </summary>
    /// <param name="dbContext">Context used to query and save the lock.</param>
    /// <param name="lockTimeout">How long the new lock is valid, counted from now (UTC).</param>
    /// <returns>The locked instance, or <c>null</c> when none is available.</returns>
    public override async Task<Instance> TryLockNextInstanceAsync(
        OrchestrationDbContext dbContext,
        TimeSpan lockTimeout)
    {
        // SKIP LOCKED makes the query ignore rows already locked by other transactions.
        var instance = (await dbContext.Instances.FromSqlRaw(@"
                SELECT ""Instances"".* FROM ""OrchestrationMessages""
                    INNER JOIN ""Instances"" ON ""OrchestrationMessages"".""InstanceId"" = ""Instances"".""InstanceId""
                WHERE
                    ""OrchestrationMessages"".""AvailableAt"" <= {0}
                    AND ""Instances"".""LockedUntil"" <= {0}
                ORDER BY ""OrchestrationMessages"".""AvailableAt""
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            ", DateTime.UtcNow).ToArrayAsync()).FirstOrDefault();

        if (instance is null)
            return null;

        instance.LockId = Guid.NewGuid().ToString();
        instance.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
        await dbContext.SaveChangesAsync();

        return instance;
    }

    /// <summary>
    /// Same as the overload without queues, but only considers orchestration
    /// messages whose <c>Queue</c> is one of <paramref name="queues"/>.
    /// </summary>
    /// <param name="dbContext">Context used to query and save the lock.</param>
    /// <param name="queues">Queue names to search; an empty array yields <c>null</c>.</param>
    /// <param name="lockTimeout">How long the new lock is valid, counted from now (UTC).</param>
    /// <returns>The locked instance, or <c>null</c> when none is available.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="queues"/> is <c>null</c>.</exception>
    public override async Task<Instance> TryLockNextInstanceAsync(
        OrchestrationDbContext dbContext,
        string[] queues,
        TimeSpan lockTimeout)
    {
        if (queues is null)
            throw new ArgumentNullException(nameof(queues));

        // No queues means nothing can match; it would also render "IN ()",
        // which PostgreSQL rejects as a syntax error.
        if (queues.Length == 0)
            return null;

        // Only positional placeholders ({0}, {1}, ...) are interpolated into the SQL;
        // the queue names and the timestamp travel as parameters.
        var queuesParams = string.Join(",", queues.Select((_, i) => $"{{{i}}}"));
        var utcNowParam = $"{{{queues.Length}}}";
        var parameters = queues.Cast<object>().Concat(new object[] { DateTime.UtcNow }).ToArray();

        var instance = (await dbContext.Instances.FromSqlRaw($@"
                SELECT ""Instances"".* FROM ""OrchestrationMessages""
                    INNER JOIN ""Instances"" ON ""OrchestrationMessages"".""InstanceId"" = ""Instances"".""InstanceId""
                WHERE
                    ""OrchestrationMessages"".""AvailableAt"" <= {utcNowParam}
                    AND ""OrchestrationMessages"".""Queue"" IN ({queuesParams})
                    AND ""Instances"".""LockedUntil"" <= {utcNowParam}
                ORDER BY ""OrchestrationMessages"".""AvailableAt""
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            ", parameters).ToArrayAsync()).FirstOrDefault();

        if (instance is null)
            return null;

        instance.LockId = Guid.NewGuid().ToString();
        instance.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
        await dbContext.SaveChangesAsync();

        return instance;
    }

    /// <summary>
    /// Finds the activity message with the earliest expired lock, then assigns it
    /// a new lock id and expiration.
    /// </summary>
    /// <param name="dbContext">Context used to query and save the lock.</param>
    /// <param name="lockTimeout">How long the new lock is valid, counted from now (UTC).</param>
    /// <returns>The locked activity message, or <c>null</c> when none is available.</returns>
    public override async Task<ActivityMessage> TryLockNextActivityMessageAsync(
        OrchestrationDbContext dbContext,
        TimeSpan lockTimeout)
    {
        // SKIP LOCKED makes the query ignore rows already locked by other transactions.
        var activityMessage = (await dbContext.ActivityMessages.FromSqlRaw(@"
                SELECT * FROM ""ActivityMessages""
                WHERE ""LockedUntil"" <= {0}
                ORDER BY ""LockedUntil""
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            ", DateTime.UtcNow).ToArrayAsync()).FirstOrDefault();

        if (activityMessage is null)
            return null;

        activityMessage.LockId = Guid.NewGuid().ToString();
        activityMessage.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
        await dbContext.SaveChangesAsync();

        return activityMessage;
    }

    /// <summary>
    /// Same as the overload without queues, but only considers activity messages
    /// whose <c>Queue</c> is one of <paramref name="queues"/>.
    /// </summary>
    /// <param name="dbContext">Context used to query and save the lock.</param>
    /// <param name="queues">Queue names to search; an empty array yields <c>null</c>.</param>
    /// <param name="lockTimeout">How long the new lock is valid, counted from now (UTC).</param>
    /// <returns>The locked activity message, or <c>null</c> when none is available.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="queues"/> is <c>null</c>.</exception>
    public override async Task<ActivityMessage> TryLockNextActivityMessageAsync(
        OrchestrationDbContext dbContext,
        string[] queues,
        TimeSpan lockTimeout)
    {
        if (queues is null)
            throw new ArgumentNullException(nameof(queues));

        // No queues means nothing can match; it would also render "IN ()",
        // which PostgreSQL rejects as a syntax error.
        if (queues.Length == 0)
            return null;

        // Only positional placeholders ({0}, {1}, ...) are interpolated into the SQL;
        // the queue names and the timestamp travel as parameters.
        var queuesParams = string.Join(",", queues.Select((_, i) => $"{{{i}}}"));
        var utcNowParam = $"{{{queues.Length}}}";
        var parameters = queues.Cast<object>().Concat(new object[] { DateTime.UtcNow }).ToArray();

        var activityMessage = (await dbContext.ActivityMessages.FromSqlRaw($@"
                SELECT * FROM ""ActivityMessages""
                WHERE ""Queue"" IN ({queuesParams})
                    AND ""LockedUntil"" <= {utcNowParam}
                ORDER BY ""LockedUntil""
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            ", parameters).ToArrayAsync()).FirstOrDefault();

        if (activityMessage is null)
            return null;

        activityMessage.LockId = Guid.NewGuid().ToString();
        activityMessage.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
        await dbContext.SaveChangesAsync();

        return activityMessage;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc