• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 16703104561

03 Aug 2025 08:37AM UTC coverage: 83.438%. First build
16703104561

push

github

web-flow
Merge 2719b0f3f into 5c28b612b

86 of 97 new or added lines in 27 files covered. (88.66%)

2403 of 2880 relevant lines covered (83.44%)

141.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.06
/src/LLL.DurableTask.EFCore.SqlServer/SqlServerOrchestrationDbContextExtensions.cs
1
using System;
2
using System.Data;
3
using System.Linq;
4
using System.Threading.Tasks;
5
using DurableTask.Core;
6
using LLL.DurableTask.Core;
7
using LLL.DurableTask.EFCore.Entities;
8
using Microsoft.EntityFrameworkCore;
9

10
namespace LLL.DurableTask.EFCore.SqlServer;
11

12
public class SqlServerOrchestrationDbContextExtensions : OrchestrationDbContextExtensions
13
{
14
    private const IsolationLevel TransactionIsolationLevel = IsolationLevel.ReadCommitted;
15

16
    /// <summary>
    /// Brings the database schema up to date by running
    /// <c>MigrateAsync</c> on the context's database facade.
    /// </summary>
    /// <param name="dbContext">Context whose database is migrated.</param>
    /// <returns>A task that completes once the migration call has finished.</returns>
    public override async Task Migrate(OrchestrationDbContext dbContext)
        => await dbContext.Database.MigrateAsync();
20

21
    /// <summary>
    /// Runs <paramref name="action"/> inside a database transaction opened with
    /// <see cref="TransactionIsolationLevel"/> and commits it when the action completes.
    /// </summary>
    /// <param name="dbContext">Context whose database connection hosts the transaction.</param>
    /// <param name="action">Asynchronous work to execute while the transaction is open.</param>
    /// <returns>A task that completes after the transaction has been committed.</returns>
    /// <remarks>
    /// If <paramref name="action"/> or the commit throws, the transaction is disposed
    /// without being committed and the exception propagates to the caller.
    /// </remarks>
    public override async Task WithinTransaction(OrchestrationDbContext dbContext, Func<Task> action)
    {
        // Open and dispose the transaction asynchronously so this async method never
        // blocks a thread on database round-trips.
        await using var transaction = await dbContext.Database.BeginTransactionAsync(TransactionIsolationLevel);

        await action();

        await transaction.CommitAsync();
    }
28

29
    /// <summary>
    /// Loads the instance row with the given id using an <c>UPDLOCK</c> table hint.
    /// </summary>
    /// <param name="dbContext">Context used to run the query.</param>
    /// <param name="instanceId">Identifier of the instance to select.</param>
    /// <returns>The first matching instance, or <c>null</c> when no row matches.</returns>
    public override async Task<Instance> LockInstanceForUpdate(OrchestrationDbContext dbContext, string instanceId)
    {
        const string sql = @"
                SELECT * FROM Instances WITH (UPDLOCK)
                WHERE InstanceId = {0}
            ";

        // The raw query is materialized in full before picking the first row.
        var matches = await dbContext.Instances
            .FromSqlRaw(sql, instanceId)
            .ToArrayAsync();

        return matches.FirstOrDefault();
    }
36

37
    /// <summary>
    /// Selects one instance that has an orchestration message available now and whose
    /// lock has expired, then stamps it with a new lock id and expiry and saves the change.
    /// </summary>
    /// <param name="dbContext">Context used to query and persist the lock.</param>
    /// <param name="lockTimeout">How long, from the current UTC time, the new lock lasts.</param>
    /// <returns>The locked instance, or <c>null</c> when the query returns no row.</returns>
    public override async Task<Instance> TryLockNextInstanceAsync(
        OrchestrationDbContext dbContext,
        TimeSpan lockTimeout)
    {
        // READPAST skips rows locked by other transactions; UPDLOCK takes an update
        // lock on the selected row.
        const string sql = @"
                    SELECT TOP 1 Instances.*
                    FROM OrchestrationMessages WITH (INDEX(IX_OrchestrationMessages_AvailableAt_Queue_InstanceId))
                        INNER JOIN Instances WITH (UPDLOCK, READPAST, INDEX(IX_Instances_InstanceId_LockedUntil))
                            ON OrchestrationMessages.InstanceId = Instances.InstanceId
                    WHERE
                        OrchestrationMessages.AvailableAt <= {0}
                        AND Instances.LockedUntil <= {0}
                ";

        var candidates = await dbContext.Instances
            .FromSqlRaw(sql, DateTime.UtcNow)
            .ToArrayAsync();
        var locked = candidates.FirstOrDefault();

        if (locked is not null)
        {
            locked.LockId = Guid.NewGuid().ToString();
            locked.LockedUntil = DateTime.UtcNow + lockTimeout;
            await dbContext.SaveChangesAsync();
        }

        return locked;
    }
60

61
    /// <summary>
    /// Selects one instance that has an orchestration message available now in one of
    /// the given queues and whose lock has expired, then stamps it with a new lock id
    /// and expiry and saves the change.
    /// </summary>
    /// <param name="dbContext">Context used to query and persist the lock.</param>
    /// <param name="queues">Queue names to restrict the search to.</param>
    /// <param name="lockTimeout">How long, from the current UTC time, the new lock lasts.</param>
    /// <returns>
    /// The locked instance, or <c>null</c> when <paramref name="queues"/> is empty or
    /// the query returns no row.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="queues"/> is <c>null</c>.</exception>
    public override async Task<Instance> TryLockNextInstanceAsync(
        OrchestrationDbContext dbContext,
        string[] queues,
        TimeSpan lockTimeout)
    {
        if (queues is null)
            throw new ArgumentNullException(nameof(queues));

        // No queues means nothing can match. Without this guard the query would
        // contain "IN ()", which SQL Server rejects as a syntax error.
        if (queues.Length == 0)
            return null;

        // Placeholders {0}..{n-1} bind the queue names; {n} binds the current UTC time.
        var queuesParams = string.Join(",", queues.Select((_, i) => $"{{{i}}}"));
        var utcNowParam = $"{{{queues.Length}}}";
        var parameters = queues.Cast<object>().Concat(new object[] { DateTime.UtcNow }).ToArray();

        var instance = (await dbContext.Instances.FromSqlRaw($@"
                    SELECT TOP 1 Instances.*
                    FROM OrchestrationMessages WITH (INDEX(IX_OrchestrationMessages_AvailableAt_Queue_InstanceId))
                        INNER JOIN Instances WITH (UPDLOCK, READPAST, INDEX(IX_Instances_InstanceId_LockedUntil))
                            ON OrchestrationMessages.InstanceId = Instances.InstanceId
                    WHERE
                        OrchestrationMessages.AvailableAt <= {utcNowParam}
                        AND OrchestrationMessages.Queue IN ({queuesParams})
                        AND Instances.LockedUntil <= {utcNowParam}
                ", parameters).ToArrayAsync()).FirstOrDefault();

        if (instance is null)
            return null;

        instance.LockId = Guid.NewGuid().ToString();
        instance.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
        await dbContext.SaveChangesAsync();

        return instance;
    }
90

91
    /// <summary>
    /// Selects one activity message whose lock has expired, then stamps it with a new
    /// lock id and expiry and saves the change.
    /// </summary>
    /// <param name="dbContext">Context used to query and persist the lock.</param>
    /// <param name="lockTimeout">How long, from the current UTC time, the new lock lasts.</param>
    /// <returns>The locked activity message, or <c>null</c> when the query returns no row.</returns>
    public override async Task<ActivityMessage> TryLockNextActivityMessageAsync(
        OrchestrationDbContext dbContext,
        TimeSpan lockTimeout)
    {
        // READPAST skips rows locked by other transactions; UPDLOCK takes an update
        // lock on the selected row.
        const string sql = @"
                    SELECT TOP 1 *
                    FROM ActivityMessages WITH (UPDLOCK, READPAST, INDEX(IX_ActivityMessages_LockedUntil_Queue))
                    WHERE LockedUntil <= {0}
                ";

        var candidates = await dbContext.ActivityMessages
            .FromSqlRaw(sql, DateTime.UtcNow)
            .ToArrayAsync();
        var message = candidates.FirstOrDefault();

        if (message is not null)
        {
            message.LockId = Guid.NewGuid().ToString();
            message.LockedUntil = DateTime.UtcNow + lockTimeout;
            await dbContext.SaveChangesAsync();
        }

        return message;
    }
110

111
    /// <summary>
    /// Selects one activity message in one of the given queues whose lock has expired,
    /// then stamps it with a new lock id and expiry and saves the change.
    /// </summary>
    /// <param name="dbContext">Context used to query and persist the lock.</param>
    /// <param name="queues">Queue names to restrict the search to.</param>
    /// <param name="lockTimeout">How long, from the current UTC time, the new lock lasts.</param>
    /// <returns>
    /// The locked activity message, or <c>null</c> when <paramref name="queues"/> is
    /// empty or the query returns no row.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="queues"/> is <c>null</c>.</exception>
    public override async Task<ActivityMessage> TryLockNextActivityMessageAsync(
        OrchestrationDbContext dbContext,
        string[] queues,
        TimeSpan lockTimeout)
    {
        if (queues is null)
            throw new ArgumentNullException(nameof(queues));

        // No queues means nothing can match. Without this guard the query would
        // contain "IN ()", which SQL Server rejects as a syntax error.
        if (queues.Length == 0)
            return null;

        // Placeholders {0}..{n-1} bind the queue names; {n} binds the current UTC time.
        var queuesParams = string.Join(",", queues.Select((_, i) => $"{{{i}}}"));
        var utcNowParam = $"{{{queues.Length}}}";
        var parameters = queues.Cast<object>().Concat(new object[] { DateTime.UtcNow }).ToArray();

        var message = (await dbContext.ActivityMessages.FromSqlRaw($@"
                SELECT TOP 1 *
                FROM ActivityMessages
                WITH (UPDLOCK, READPAST, INDEX(IX_ActivityMessages_LockedUntil_Queue))
                WHERE Queue IN ({queuesParams})
                    AND LockedUntil <= {utcNowParam}
            ", parameters).ToArrayAsync()).FirstOrDefault();

        if (message is null)
            return null;

        message.LockId = Guid.NewGuid().ToString();
        message.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
        await dbContext.SaveChangesAsync();

        return message;
    }
137

138
    /// <summary>
    /// Deletes executions matching <paramref name="filter"/>: created after
    /// <c>CreatedTimeFrom</c>, optionally created before <c>CreatedTimeTo</c>, and
    /// optionally restricted to the given runtime statuses.
    /// </summary>
    /// <param name="dbContext">Context used to execute the delete statement.</param>
    /// <param name="filter">
    /// Purge criteria. When it is a <see cref="PurgeInstanceFilterExtended"/> with a
    /// non-null <c>Limit</c>, the <c>Limit</c> oldest matching executions (by
    /// <c>CreatedTime</c>) are selected for deletion.
    /// </param>
    /// <returns>The value returned by <c>ExecuteSqlRawAsync</c> for the delete statement.</returns>
    public override async Task<int> PurgeInstanceHistoryAsync(OrchestrationDbContext dbContext, PurgeInstanceFilter filter)
    {
        var limit = filter is PurgeInstanceFilterExtended filterExtended
            ? filterExtended.Limit
            : null;

        // Enumerate the status filter once, and treat a missing one as "no restriction".
        // NOTE(review): RuntimeStatus is assumed to be nullable on PurgeInstanceFilter; confirm.
        var statuses = filter.RuntimeStatus?.Select(s => s.ToString()).ToArray()
            ?? Array.Empty<string>();

        var parameters = new ParametersCollection();

        // Each clause embeds the placeholder returned by parameters.Add for its value.
        var topClause = limit is not null
            ? $"TOP ({parameters.Add(limit)})"
            : "";

        // SQL Server rejects ORDER BY inside a subquery unless TOP is also present,
        // and the ordering only matters for choosing which rows TOP keeps.
        var orderByClause = limit is not null
            ? "ORDER BY Executions.CreatedTime"
            : "";

        var createdFromParam = parameters.Add(filter.CreatedTimeFrom);

        var createdToClause = filter.CreatedTimeTo is not null
            ? $"AND Executions.CreatedTime < {parameters.Add(filter.CreatedTimeTo)}"
            : "";

        var statusClause = statuses.Length > 0
            ? $"AND Executions.Status IN ({string.Join(",", statuses.Select(s => parameters.Add(s)))})"
            : "";

        return await dbContext.Database.ExecuteSqlRawAsync($@"
            DELETE FROM Executions
            WHERE ExecutionId IN(
                SELECT {topClause} Executions.ExecutionId
                FROM Executions WITH (UPDLOCK, READPAST)
                    INNER JOIN Instances WITH (UPDLOCK, READPAST) ON Executions.InstanceId = Instances.InstanceId
                WHERE Executions.CreatedTime > {createdFromParam}
                {createdToClause}
                {statusClause}
                {orderByClause}
            );
        ", parameters);
    }
159
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc