• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 5835585770

pending completion
5835585770

push

github

lucaslorentz
Add husky and apply some code fixes

2502 of 2502 new or added lines in 91 files covered. (100.0%)

2298 of 2792 relevant lines covered (82.31%)

142.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.67
/src/LLL.DurableTask.EFCore.PostgreSQL/PostgreOrchestrationDbContextExtensions.cs
1
using System;
2
using System.Data;
3
using System.Linq;
4
using System.Threading.Tasks;
5
using LLL.DurableTask.EFCore.Entities;
6
using Microsoft.EntityFrameworkCore;
7

8
namespace LLL.DurableTask.EFCore.PostgreSQL;
9

10
/// <summary>
/// PostgreSQL-specific implementation of <see cref="OrchestrationDbContextExtensions"/>.
/// Uses raw SQL with <c>FOR UPDATE</c> / <c>FOR UPDATE SKIP LOCKED</c> to lock the rows it selects.
/// </summary>
public class PostgreOrchestrationDbContextExtensions : OrchestrationDbContextExtensions
{
    // Isolation level used for every transaction opened by WithinTransaction.
    private const IsolationLevel TransactionIsolationLevel = IsolationLevel.ReadCommitted;

    /// <summary>
    /// Applies pending EF Core migrations to the database behind <paramref name="dbContext"/>.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    public override async Task Migrate(OrchestrationDbContext dbContext)
    {
        await dbContext.Database.MigrateAsync();
    }

    /// <summary>
    /// Runs <paramref name="action"/> inside a transaction and commits it when the action completes.
    /// </summary>
    /// <param name="dbContext">Context on which the transaction is started.</param>
    /// <param name="action">Work to execute within the transaction.</param>
    /// <remarks>
    /// If <paramref name="action"/> throws, the commit is never reached and the transaction is
    /// disposed without being committed.
    /// </remarks>
    public override async Task WithinTransaction(OrchestrationDbContext dbContext, Func<Task> action)
    {
        // Begin and dispose asynchronously so this async method never blocks on database I/O.
        await using var transaction = await dbContext.Database.BeginTransactionAsync(TransactionIsolationLevel);

        await action();

        await transaction.CommitAsync();
    }

    /// <summary>
    /// Selects the instance with the given id using <c>FOR UPDATE</c>.
    /// </summary>
    /// <param name="dbContext">Context used to run the query.</param>
    /// <param name="instanceId">Id of the instance to select; sent as a query parameter.</param>
    /// <returns>The matching instance, or <c>null</c> when no row matches.</returns>
    public override async Task<Instance> LockInstanceForUpdate(OrchestrationDbContext dbContext, string instanceId)
    {
        // The raw query is materialized as-is and the first row is picked in memory,
        // so no LINQ operator is composed on top of the locking SQL.
        return (await dbContext.Instances.FromSqlRaw(@"
                SELECT * FROM ""Instances""
                WHERE ""Instances"".""InstanceId"" = {0}
                FOR UPDATE
            ", instanceId).ToArrayAsync()).FirstOrDefault();
    }

    /// <summary>
    /// Picks one instance that has an available orchestration message and whose lock has expired,
    /// then stamps it with a new lock id and expiration and saves the change.
    /// </summary>
    /// <param name="dbContext">Context used to query and save.</param>
    /// <param name="lockTimeout">Added to the current UTC time to compute the new <c>LockedUntil</c>.</param>
    /// <returns>The locked instance, or <c>null</c> when the query returns no row.</returns>
    public override async Task<Instance> TryLockNextInstanceAsync(
        OrchestrationDbContext dbContext,
        TimeSpan lockTimeout)
    {
        var instance = (await dbContext.Instances.FromSqlRaw(@"
                SELECT ""Instances"".* FROM ""OrchestrationMessages""
                    INNER JOIN ""Instances"" ON ""OrchestrationMessages"".""InstanceId"" = ""Instances"".""InstanceId""
                WHERE
                    ""OrchestrationMessages"".""AvailableAt"" <= {0}
                    AND ""Instances"".""LockedUntil"" <= {0}
                ORDER BY ""OrchestrationMessages"".""AvailableAt""
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            ", DateTime.UtcNow).ToArrayAsync()).FirstOrDefault();

        if (instance == null)
            return null;

        instance.LockId = Guid.NewGuid().ToString();
        instance.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
        await dbContext.SaveChangesAsync();

        return instance;
    }

    /// <summary>
    /// Same as the overload without queues, but only considers orchestration messages whose
    /// <c>Queue</c> is one of <paramref name="queues"/>.
    /// </summary>
    /// <param name="dbContext">Context used to query and save.</param>
    /// <param name="queues">Queue names to match; each one is sent as a query parameter.</param>
    /// <param name="lockTimeout">Added to the current UTC time to compute the new <c>LockedUntil</c>.</param>
    /// <returns>
    /// The locked instance, or <c>null</c> when <paramref name="queues"/> is empty or the query returns no row.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="queues"/> is <c>null</c>.</exception>
    public override async Task<Instance> TryLockNextInstanceAsync(
        OrchestrationDbContext dbContext,
        string[] queues,
        TimeSpan lockTimeout)
    {
        if (queues == null)
            throw new ArgumentNullException(nameof(queues));

        // An empty list would render "IN ()", which PostgreSQL rejects; nothing can match anyway.
        if (queues.Length == 0)
            return null;

        var (queuesParams, utcNowParam, parameters) = BuildQueueFilter(queues);

        // Only positional placeholders ({0}, {1}, ...) are interpolated into the SQL text;
        // the actual values travel in `parameters`.
        var instance = (await dbContext.Instances.FromSqlRaw($@"
                SELECT ""Instances"".* FROM ""OrchestrationMessages""
                    INNER JOIN ""Instances"" ON ""OrchestrationMessages"".""InstanceId"" = ""Instances"".""InstanceId""
                WHERE
                    ""OrchestrationMessages"".""AvailableAt"" <= {utcNowParam}
                    AND ""OrchestrationMessages"".""Queue"" IN ({queuesParams})
                    AND ""Instances"".""LockedUntil"" <= {utcNowParam}
                ORDER BY ""OrchestrationMessages"".""AvailableAt""
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            ", parameters).ToArrayAsync()).FirstOrDefault();

        if (instance == null)
            return null;

        instance.LockId = Guid.NewGuid().ToString();
        instance.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
        await dbContext.SaveChangesAsync();

        return instance;
    }

    /// <summary>
    /// Picks one activity message whose lock has expired, stamps it with a new lock id and
    /// expiration, and saves the change.
    /// </summary>
    /// <param name="dbContext">Context used to query and save.</param>
    /// <param name="lockTimeout">Added to the current UTC time to compute the new <c>LockedUntil</c>.</param>
    /// <returns>The locked activity message, or <c>null</c> when the query returns no row.</returns>
    public override async Task<ActivityMessage> TryLockNextActivityMessageAsync(
        OrchestrationDbContext dbContext,
        TimeSpan lockTimeout)
    {
        var activityMessage = (await dbContext.ActivityMessages.FromSqlRaw(@"
                SELECT * FROM ""ActivityMessages""
                WHERE ""LockedUntil"" <= {0}
                ORDER BY ""LockedUntil""
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            ", DateTime.UtcNow).ToArrayAsync()).FirstOrDefault();

        if (activityMessage == null)
            return null;

        activityMessage.LockId = Guid.NewGuid().ToString();
        activityMessage.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
        await dbContext.SaveChangesAsync();

        return activityMessage;
    }

    /// <summary>
    /// Same as the overload without queues, but only considers activity messages whose
    /// <c>Queue</c> is one of <paramref name="queues"/>.
    /// </summary>
    /// <param name="dbContext">Context used to query and save.</param>
    /// <param name="queues">Queue names to match; each one is sent as a query parameter.</param>
    /// <param name="lockTimeout">Added to the current UTC time to compute the new <c>LockedUntil</c>.</param>
    /// <returns>
    /// The locked activity message, or <c>null</c> when <paramref name="queues"/> is empty or the query returns no row.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="queues"/> is <c>null</c>.</exception>
    public override async Task<ActivityMessage> TryLockNextActivityMessageAsync(
        OrchestrationDbContext dbContext,
        string[] queues,
        TimeSpan lockTimeout)
    {
        if (queues == null)
            throw new ArgumentNullException(nameof(queues));

        // An empty list would render "IN ()", which PostgreSQL rejects; nothing can match anyway.
        if (queues.Length == 0)
            return null;

        var (queuesParams, utcNowParam, parameters) = BuildQueueFilter(queues);

        // Only positional placeholders ({0}, {1}, ...) are interpolated into the SQL text;
        // the actual values travel in `parameters`.
        var activityMessage = (await dbContext.ActivityMessages.FromSqlRaw($@"
                SELECT * FROM ""ActivityMessages""
                WHERE ""Queue"" IN ({queuesParams})
                    AND ""LockedUntil"" <= {utcNowParam}
                ORDER BY ""LockedUntil""
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            ", parameters).ToArrayAsync()).FirstOrDefault();

        if (activityMessage == null)
            return null;

        activityMessage.LockId = Guid.NewGuid().ToString();
        activityMessage.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
        await dbContext.SaveChangesAsync();

        return activityMessage;
    }

    /// <summary>
    /// Builds the positional placeholders and the matching parameter array for a queue-filtered query.
    /// </summary>
    /// <param name="queues">Non-empty list of queue names.</param>
    /// <returns>
    /// <c>QueuesParams</c>: comma-separated placeholders <c>{0},{1},...</c>, one per queue;
    /// <c>UtcNowParam</c>: the placeholder following the queues, bound to the current UTC time;
    /// <c>Parameters</c>: the queue names followed by <see cref="DateTime.UtcNow"/>.
    /// </returns>
    private static (string QueuesParams, string UtcNowParam, object[] Parameters) BuildQueueFilter(string[] queues)
    {
        var queuesParams = string.Join(",", queues.Select((_, i) => $"{{{i}}}"));
        var utcNowParam = $"{{{queues.Length}}}";
        var parameters = queues.Cast<object>().Concat(new object[] { DateTime.UtcNow }).ToArray();

        return (queuesParams, utcNowParam, parameters);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc