• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 5835662074

pending completion
5835662074

Pull #28

github

lucaslorentz
Add husky and apply some code fixes
Pull Request #28: Add husky and apply code fixes

2502 of 2502 new or added lines in 91 files covered. (100.0%)

2297 of 2792 relevant lines covered (82.27%)

141.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.48
/src/LLL.DurableTask.EFCore/EFCoreOrchestrationServiceClient.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.Exceptions;
8
using DurableTask.Core.History;
9
using DurableTask.Core.Query;
10
using LLL.DurableTask.Core;
11
using LLL.DurableTask.EFCore.Extensions;
12
using LLL.DurableTask.EFCore.Mappers;
13
using LLL.DurableTask.EFCore.Polling;
14
using Microsoft.EntityFrameworkCore;
15

16
namespace LLL.DurableTask.EFCore;
17

18
public partial class EFCoreOrchestrationService :
19
    IOrchestrationServiceClient,
20
    IOrchestrationServiceFeaturesClient,
21
    IOrchestrationServiceQueryClient,
22
    IOrchestrationServicePurgeClient,
23
    IOrchestrationServiceRewindClient
24
{
25
    public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
26
    {
27
        return CreateTaskOrchestrationAsync(creationMessage, null);
×
28
    }
29

30
    public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
31
    {
32
        using var dbContext = _dbContextFactory.CreateDbContext();
186✔
33
        var executionStartedEvent = creationMessage.Event as ExecutionStartedEvent;
186✔
34

35
        var instanceId = creationMessage.OrchestrationInstance.InstanceId;
186✔
36
        var executionId = creationMessage.OrchestrationInstance.ExecutionId;
186✔
37

38
        var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);
186✔
39

40
        if (instance != null)
186✔
41
        {
42
            var lastExecution = await dbContext.Executions.FindAsync(instance.LastExecutionId);
×
43

44
            // Dedupe dedupeStatuses silently
45
            if (dedupeStatuses != null && dedupeStatuses.Contains(lastExecution.Status))
×
46
                return;
×
47

48
            // Otherwise, dedupe to avoid multile runnning instances
49
            if (!IsFinalInstanceStatus(lastExecution.Status))
×
50
                throw new OrchestrationAlreadyExistsException("Orchestration already has a running execution");
×
51
        }
52

53
        var runtimeState = new OrchestrationRuntimeState(new[] { executionStartedEvent });
186✔
54

55
        if (instance == null)
186✔
56
        {
57
            instance = _instanceMapper.CreateInstance(executionStartedEvent);
186✔
58
            await dbContext.Instances.AddAsync(instance);
186✔
59
        }
60
        else
61
        {
62
            _instanceMapper.UpdateInstance(instance, runtimeState);
×
63
        }
64

65
        var execution = _executionMapper.CreateExecution(runtimeState);
186✔
66
        await dbContext.Executions.AddAsync(execution);
186✔
67

68
        var knownQueues = new Dictionary<string, string>
186✔
69
        {
186✔
70
            [instance.InstanceId] = QueueMapper.ToQueue(runtimeState.Name, runtimeState.Version)
186✔
71
        };
186✔
72

73
        await SendTaskOrchestrationMessagesAsync(dbContext, new[] { creationMessage }, knownQueues);
186✔
74

75
        await dbContext.SaveChangesAsync();
186✔
76
    }
77

78
    public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
79
    {
80
        var taskMessage = new TaskMessage
1✔
81
        {
1✔
82
            OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
1✔
83
            Event = new ExecutionTerminatedEvent(-1, reason)
1✔
84
        };
1✔
85

86
        return SendTaskOrchestrationMessageAsync(taskMessage);
1✔
87
    }
88

89
    public async Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId)
90
    {
91
        using var dbContext = _dbContextFactory.CreateDbContext();
×
92
        var events = await dbContext.Events
×
93
            .Where(e => e.ExecutionId == executionId)
×
94
            .OrderBy(e => e.SequenceNumber)
×
95
            .ToArrayAsync();
×
96

97
        return $"[{string.Join(",", events.Select(e => e.Content))}]";
×
98
    }
99

100
    public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions)
101
    {
102
        var state = await GetOrchestrationStateAsync(instanceId, null);
14✔
103
        if (state == null)
14✔
104
            return Array.Empty<OrchestrationState>();
6✔
105

106
        return new[] { state };
8✔
107
    }
108

109
    public async Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId)
110
    {
111
        using var dbContext = _dbContextFactory.CreateDbContext();
417✔
112
        var instance = await dbContext.Executions
417✔
113
            .Where(e => e.InstanceId == instanceId && (executionId == null || e.ExecutionId == executionId))
417✔
114
            .OrderByDescending(e => e.CreatedTime)
417✔
115
            .FirstOrDefaultAsync();
417✔
116

117
        if (instance == null)
417✔
118
            return null;
6✔
119

120
        return _executionMapper.MapToState(instance);
411✔
121
    }
122

123
    public async Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
124
    {
125
        using var dbContext = _dbContextFactory.CreateDbContext();
24✔
126
        await _dbContextExtensions.PurgeOrchestrationHistoryAsync(dbContext, thresholdDateTimeUtc, timeRangeFilterType);
24✔
127
    }
128

129
    public async Task<PurgeResult> PurgeInstanceStateAsync(string instanceId)
130
    {
131
        using var dbContext = _dbContextFactory.CreateDbContext();
6✔
132
        var deletedRows = await _dbContextExtensions.PurgeInstanceHistoryAsync(dbContext, instanceId);
6✔
133

134
        return new PurgeResult(deletedRows);
6✔
135
    }
136

137
    public async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
138
    {
139
        using var dbContext = _dbContextFactory.CreateDbContext();
×
140
        var deletedRows = await _dbContextExtensions.PurgeInstanceHistoryAsync(dbContext, purgeInstanceFilter);
×
141

142
        return new PurgeResult(deletedRows);
×
143
    }
144

145
    public Task SendTaskOrchestrationMessageAsync(TaskMessage message)
146
    {
147
        return SendTaskOrchestrationMessageBatchAsync(message);
24✔
148
    }
149

150
    public async Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
151
    {
152
        using var dbContext = _dbContextFactory.CreateDbContext();
26✔
153
        await SendTaskOrchestrationMessagesAsync(dbContext, messages);
26✔
154

155
        await dbContext.SaveChangesAsync();
26✔
156
    }
157

158
    public async Task<OrchestrationState> WaitForOrchestrationAsync(
159
        string instanceId,
160
        string executionId,
161
        TimeSpan timeout,
162
        CancellationToken cancellationToken)
163
    {
164
        if (string.IsNullOrWhiteSpace(instanceId))
137✔
165
        {
166
            throw new ArgumentException(nameof(instanceId));
×
167
        }
168

169
        var stoppableCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(
137✔
170
            cancellationToken, _stopCts.Token).Token;
137✔
171

172
        var state = await BackoffPollingHelper.PollAsync(async () =>
137✔
173
        {
137✔
174
            return await GetOrchestrationStateAsync(instanceId, executionId);
534✔
175
        },
137✔
176
        s => IsFinalExecutionStatus(s.OrchestrationStatus),
534✔
177
        timeout,
137✔
178
        _options.PollingInterval,
137✔
179
        stoppableCancellationToken);
137✔
180

181
        if (!IsFinalExecutionStatus(state.OrchestrationStatus))
137✔
182
            return null;
×
183

184
        return state;
137✔
185
    }
186

187
    public Task<OrchestrationFeature[]> GetFeatures()
188
    {
189
        return Task.FromResult(new OrchestrationFeature[]
1✔
190
        {
1✔
191
            OrchestrationFeature.SearchByInstanceId,
1✔
192
            OrchestrationFeature.SearchByName,
1✔
193
            OrchestrationFeature.SearchByCreatedTime,
1✔
194
            OrchestrationFeature.SearchByStatus,
1✔
195
            OrchestrationFeature.Rewind,
1✔
196
            OrchestrationFeature.Tags,
1✔
197
            OrchestrationFeature.StatePerExecution
1✔
198
        });
1✔
199
    }
200

201
    public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken)
202
    {
203
        using var dbContext = _dbContextFactory.CreateDbContext();
7✔
204
        var instances = await _dbContextExtensions.CreateFilteredQueryable(dbContext, query)
7✔
205
            .OrderByDescending(x => x.CreatedTime)
7✔
206
            .ThenByDescending(x => x.InstanceId)
7✔
207
            .Take(query.PageSize + 1)
7✔
208
            .ToArrayAsync();
7✔
209

210
        var pageInstances = instances
7✔
211
            .Take(query.PageSize)
7✔
212
            .ToArray();
7✔
213

214
        var mappedPageInstances = pageInstances
7✔
215
            .Select(_executionMapper.MapToState)
7✔
216
            .ToArray();
7✔
217

218
        var newContinuationToken = instances.Length > pageInstances.Length
7✔
219
            ? new EFCoreContinuationToken
7✔
220
            {
7✔
221
                CreatedTime = pageInstances.Last().CreatedTime,
7✔
222
                InstanceId = pageInstances.Last().InstanceId,
7✔
223
            }.Serialize()
7✔
224
            : null;
7✔
225

226
        return new OrchestrationQueryResult(mappedPageInstances, newContinuationToken);
7✔
227
    }
228

229
    public async Task RewindTaskOrchestrationAsync(string instanceId, string reason)
230
    {
231
        using var dbContext = _dbContextFactory.CreateDbContext();
1✔
232
        await RewindInstanceAsync(dbContext, instanceId, reason, true, FindLastErrorOrCompletionRewindPoint);
1✔
233
        await dbContext.SaveChangesAsync();
1✔
234
    }
235

236
    private async Task RewindInstanceAsync(OrchestrationDbContext dbContext, string instanceId, string reason, bool rewindParents, Func<IList<HistoryEvent>, HistoryEvent> findRewindPoint)
237
    {
238
        var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);
1✔
239

240
        var execution = await dbContext.Executions
1✔
241
            .Where(e => e.ExecutionId == instance.LastExecutionId)
1✔
242
            .Include(e => e.Events)
1✔
243
            .SingleAsync();
1✔
244

245
        var deserializedEvents = execution.Events
1✔
246
            .OrderBy(e => e.SequenceNumber)
1✔
247
            .Select(e => _options.DataConverter.Deserialize<HistoryEvent>(e.Content))
1✔
248
            .ToArray();
1✔
249

250
        var rewindPoint = findRewindPoint(deserializedEvents);
1✔
251
        if (rewindPoint == null)
1✔
252
        {
253
            return;
1✔
254
        }
255

256
        var rewindResult = deserializedEvents.Rewind(rewindPoint, reason, _options.DataConverter);
×
257

258
        // Rewind suborchestrations
259
        foreach (var instanceIdToRewind in rewindResult.SubOrchestrationsInstancesToRewind)
×
260
        {
261
            await RewindInstanceAsync(dbContext, instanceIdToRewind, reason, false, FindLastErrorOrCompletionRewindPoint);
×
262
        }
263

264
        var orchestrationQueueName = QueueMapper.ToQueue(rewindResult.NewRuntimeState.Name, rewindResult.NewRuntimeState.Version);
×
265

266
        // Write activity messages
267
        var activityMessages = rewindResult.OutboundMessages
×
268
            .Select(m => _activityMessageMapper.CreateActivityMessage(m, orchestrationQueueName))
×
269
            .ToArray();
×
270

271
        await dbContext.ActivityMessages.AddRangeAsync(activityMessages);
×
272

273
        // Write orchestration messages
274
        var knownQueues = new Dictionary<string, string>
×
275
        {
×
276
            [rewindResult.NewRuntimeState.OrchestrationInstance.InstanceId] = orchestrationQueueName
×
277
        };
×
278

279
        if (rewindResult.NewRuntimeState.ParentInstance != null)
×
280
            knownQueues[rewindResult.NewRuntimeState.ParentInstance.OrchestrationInstance.InstanceId] = QueueMapper.ToQueue(rewindResult.NewRuntimeState.ParentInstance.Name, rewindResult.NewRuntimeState.ParentInstance.Version);
×
281

282
        var allOrchestrationMessages = rewindResult.OrchestratorMessages
×
283
            .Concat(rewindResult.TimerMessages)
×
284
            .ToArray();
×
285

286
        await SendTaskOrchestrationMessagesAsync(dbContext, allOrchestrationMessages, knownQueues);
×
287

288
        // Update instance
289
        _instanceMapper.UpdateInstance(instance, rewindResult.NewRuntimeState);
×
290

291
        // Update current execution
292
        execution = await SaveExecutionAsync(dbContext, rewindResult.NewRuntimeState, execution);
×
293

294
        // Rewind parents
295
        if (rewindParents && rewindResult.NewRuntimeState.ParentInstance != null)
×
296
        {
297
            await RewindInstanceAsync(dbContext, rewindResult.NewRuntimeState.ParentInstance.OrchestrationInstance.InstanceId, reason, true, historyEvents =>
×
298
            {
×
299
                return historyEvents
×
300
                    .OfType<SubOrchestrationInstanceCompletedEvent>()
×
301
                    .FirstOrDefault(h => h.TaskScheduledId == rewindResult.NewRuntimeState.ParentInstance.TaskScheduleId);
×
302
            });
×
303
        }
304
    }
305

306
    private HistoryEvent FindLastErrorOrCompletionRewindPoint(IList<HistoryEvent> historyEvents)
307
    {
308
        var completedEvent = historyEvents.OfType<ExecutionCompletedEvent>().FirstOrDefault();
1✔
309
        if (completedEvent != null)
1✔
310
        {
311
            if (completedEvent.OrchestrationStatus == OrchestrationStatus.Failed)
×
312
            {
313
                // For failed executions, rewind to last failed task or suborchestration
314
                var lastFailedTask = historyEvents.LastOrDefault(h => h is TaskFailedEvent || h is SubOrchestrationInstanceFailedEvent);
×
315
                if (lastFailedTask != null)
×
316
                    return lastFailedTask;
×
317
            }
318

319
            // Fallback to just reopenning orchestration, because error could have happened inside orchestrator function
320
            return completedEvent;
×
321
        }
322

323
        // For terminated executions, only rewing the termination
324
        var terminatedEvent = historyEvents.OfType<ExecutionTerminatedEvent>().FirstOrDefault();
1✔
325
        if (terminatedEvent != null)
1✔
326
            return terminatedEvent;
×
327

328
        return null;
1✔
329
    }
330

331
    private bool IsFinalInstanceStatus(OrchestrationStatus status)
332
    {
333
        return IsFinalExecutionStatus(status) &&
×
334
            status != OrchestrationStatus.ContinuedAsNew;
×
335
    }
336

337
    private bool IsFinalExecutionStatus(OrchestrationStatus status)
338
    {
339
        return status != OrchestrationStatus.Running &&
534✔
340
            status != OrchestrationStatus.Pending;
534✔
341
    }
342
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc