• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 5835751495

pending completion
5835751495

push

github

lucaslorentz
Add husky and apply some code fixes

2502 of 2502 new or added lines in 91 files covered. (100.0%)

2286 of 2792 relevant lines covered (81.88%)

143.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.05
/src/LLL.DurableTask.EFCore.InMemory/InMemoryOrchestrationDbContextExtensions.cs
1
using System;
2
using System.Collections.Concurrent;
3
using System.Collections.Generic;
4
using System.Linq;
5
using System.Threading;
6
using System.Threading.Tasks;
7
using LLL.DurableTask.EFCore.Entities;
8
using Microsoft.EntityFrameworkCore;
9

10
namespace LLL.DurableTask.EFCore.InMemory;
11

12
/// <summary>
/// <see cref="OrchestrationDbContextExtensions"/> implementation that performs locking
/// with in-process primitives (a monitor and per-instance semaphores) instead of
/// database-level locks.
/// </summary>
public class InMemoryOrchestrationDbContextExtensions : OrchestrationDbContextExtensions
{
    // Serializes the "find next unlocked row, then mark it locked" sequences below.
    private readonly object _lock = new();

    // One binary semaphore per instance id; held from LockInstanceForUpdate until the
    // owning context's next save completes or fails.
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _instancesSemaphores = new();

    // Instance ids currently locked by each context, so that a context locking the same
    // instance twice does not wait on its own semaphore.
    private readonly ConcurrentDictionary<OrchestrationDbContext, HashSet<string>> _lockedInstances = new();

    /// <summary>
    /// Does nothing and returns an already completed task.
    /// </summary>
    /// <param name="dbContext">Unused.</param>
    /// <returns>A completed task.</returns>
    public override Task Migrate(OrchestrationDbContext dbContext)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Awaits <paramref name="action"/> directly; no transaction is opened around it.
    /// </summary>
    /// <param name="dbContext">Unused.</param>
    /// <param name="action">The work to run.</param>
    public override async Task WithinTransaction(OrchestrationDbContext dbContext, Func<Task> action)
    {
        await action();
    }

    /// <summary>
    /// Finds the instance with the given id and acquires its per-instance semaphore on
    /// behalf of <paramref name="dbContext"/>. The semaphore is released the next time
    /// the context raises <c>SavedChanges</c> or <c>SaveChangesFailed</c>.
    /// </summary>
    /// <param name="dbContext">The context that will own the lock.</param>
    /// <param name="instanceId">Id of the instance to lock.</param>
    /// <returns>
    /// The instance, or <c>null</c> when it is not found (in which case nothing is locked).
    /// </returns>
    public override async Task<Instance> LockInstanceForUpdate(OrchestrationDbContext dbContext, string instanceId)
    {
        var instance = dbContext.Instances.Find(instanceId);

        if (instance == null)
            return null;

        var lockedInstances = _lockedInstances.GetOrAdd(dbContext, _ => new HashSet<string>());

        // This context already holds the lock for this instance: do not wait on ourselves.
        if (!lockedInstances.Add(instanceId))
            return instance;

        // maxCount of 1 keeps the semaphore strictly binary.
        var semaphore = _instancesSemaphores.GetOrAdd(instanceId, _ => new SemaphoreSlim(1, 1));
        await semaphore.WaitAsync();

        dbContext.SaveChangesFailed += OnSaveChangesFailed;
        dbContext.SavedChanges += OnSavedChanges;

        return instance;

        void OnSaveChangesFailed(object sender, SaveChangesFailedEventArgs e) => Unlock();

        void OnSavedChanges(object sender, SavedChangesEventArgs e) => Unlock();

        void Unlock()
        {
            // Detach first so the lock is released exactly once. Handlers left attached
            // would release the semaphore again on every later save of this context,
            // pushing its count above 1 and letting several contexts hold the same
            // instance at the same time.
            dbContext.SaveChangesFailed -= OnSaveChangesFailed;
            dbContext.SavedChanges -= OnSavedChanges;

            lockedInstances.Remove(instanceId);

            // Drop the bookkeeping entry once the context holds no locks, so this
            // object does not keep a reference to every context it has ever seen.
            if (lockedInstances.Count == 0)
                _lockedInstances.TryRemove(dbContext, out _);

            semaphore.Release();
        }
    }

    /// <summary>
    /// Picks the instance of the oldest available orchestration message whose instance
    /// lock has expired, assigns it a new lock id and expiry, and saves the change.
    /// </summary>
    /// <param name="dbContext">The context to query and save through.</param>
    /// <param name="lockTimeout">How long, from now (UTC), the lock is valid.</param>
    /// <returns>The locked instance, or <c>null</c> when no message qualifies.</returns>
    public override Task<Instance> TryLockNextInstanceAsync(
        OrchestrationDbContext dbContext,
        TimeSpan lockTimeout)
    {
        lock (_lock)
        {
            var instance = (
                from b in dbContext.OrchestrationMessages
                where b.AvailableAt <= DateTime.UtcNow
                && b.Instance.LockedUntil <= DateTime.UtcNow
                orderby b.AvailableAt
                select b.Instance
            ).FirstOrDefault();

            if (instance == null)
                return Task.FromResult(default(Instance));

            instance.LockId = Guid.NewGuid().ToString();
            instance.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
            dbContext.SaveChanges();

            return Task.FromResult(instance);
        }
    }

    /// <summary>
    /// Same as the overload without queues, but only considers orchestration messages
    /// whose queue is contained in <paramref name="queues"/>.
    /// </summary>
    /// <param name="dbContext">The context to query and save through.</param>
    /// <param name="queues">Queues a message must belong to in order to be considered.</param>
    /// <param name="lockTimeout">How long, from now (UTC), the lock is valid.</param>
    /// <returns>The locked instance, or <c>null</c> when no message qualifies.</returns>
    public override Task<Instance> TryLockNextInstanceAsync(
        OrchestrationDbContext dbContext,
        string[] queues,
        TimeSpan lockTimeout)
    {
        lock (_lock)
        {
            var instance = (
                from b in dbContext.OrchestrationMessages
                where b.AvailableAt <= DateTime.UtcNow
                && queues.Contains(b.Queue)
                && b.Instance.LockedUntil <= DateTime.UtcNow
                orderby b.AvailableAt
                select b.Instance
            ).FirstOrDefault();

            if (instance == null)
                return Task.FromResult(default(Instance));

            instance.LockId = Guid.NewGuid().ToString();
            instance.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
            dbContext.SaveChanges();

            return Task.FromResult(instance);
        }
    }

    /// <summary>
    /// Picks the activity message with the earliest expired lock, assigns it a new lock
    /// id and expiry, and saves the change.
    /// </summary>
    /// <param name="dbContext">The context to query and save through.</param>
    /// <param name="lockTimeout">How long, from now (UTC), the lock is valid.</param>
    /// <returns>The locked message, or <c>null</c> when no message qualifies.</returns>
    public override Task<ActivityMessage> TryLockNextActivityMessageAsync(
        OrchestrationDbContext dbContext,
        TimeSpan lockTimeout)
    {
        lock (_lock)
        {
            var activityMessage = (
                from message in dbContext.ActivityMessages
                where message.LockedUntil <= DateTime.UtcNow
                orderby message.LockedUntil
                select message
            ).FirstOrDefault();

            if (activityMessage == null)
                return Task.FromResult(default(ActivityMessage));

            activityMessage.LockId = Guid.NewGuid().ToString();
            activityMessage.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
            dbContext.SaveChanges();

            return Task.FromResult(activityMessage);
        }
    }

    /// <summary>
    /// Same as the overload without queues, but only considers activity messages whose
    /// queue is contained in <paramref name="queues"/>.
    /// </summary>
    /// <param name="dbContext">The context to query and save through.</param>
    /// <param name="queues">Queues a message must belong to in order to be considered.</param>
    /// <param name="lockTimeout">How long, from now (UTC), the lock is valid.</param>
    /// <returns>The locked message, or <c>null</c> when no message qualifies.</returns>
    public override Task<ActivityMessage> TryLockNextActivityMessageAsync(
        OrchestrationDbContext dbContext,
        string[] queues,
        TimeSpan lockTimeout)
    {
        lock (_lock)
        {
            var activityMessage = (
                from message in dbContext.ActivityMessages
                where message.LockedUntil <= DateTime.UtcNow
                && queues.Contains(message.Queue)
                orderby message.LockedUntil
                select message
            ).FirstOrDefault();

            if (activityMessage == null)
                return Task.FromResult(default(ActivityMessage));

            activityMessage.LockId = Guid.NewGuid().ToString();
            activityMessage.LockedUntil = DateTime.UtcNow.Add(lockTimeout);
            dbContext.SaveChanges();

            return Task.FromResult(activityMessage);
        }
    }

    /// <summary>
    /// Deletes the entities matched by <paramref name="query"/> by materializing them,
    /// removing them from the context's set, and saving.
    /// </summary>
    /// <typeparam name="T">The entity type.</typeparam>
    /// <param name="dbContext">The context to delete through.</param>
    /// <param name="query">Query selecting the entities to delete.</param>
    /// <returns>The number of entities the query returned.</returns>
    protected override async Task<int> ExecuteDeleteAsync<T>(OrchestrationDbContext dbContext, IQueryable<T> query)
        where T : class
    {
        var entities = await query.ToArrayAsync();
        dbContext.Set<T>().RemoveRange(entities);
        await dbContext.SaveChangesAsync();
        return entities.Length;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc