• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 16703104561

03 Aug 2025 08:37AM UTC coverage: 83.438%. First build
16703104561

push

github

web-flow
Merge 2719b0f3f into 5c28b612b

86 of 97 new or added lines in 27 files covered. (88.66%)

2403 of 2880 relevant lines covered (83.44%)

141.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.01
/src/LLL.DurableTask.EFCore/EFCoreOrchestrationServiceClient.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.Exceptions;
8
using DurableTask.Core.History;
9
using DurableTask.Core.Query;
10
using LLL.DurableTask.Core;
11
using LLL.DurableTask.EFCore.Extensions;
12
using LLL.DurableTask.EFCore.Mappers;
13
using LLL.DurableTask.EFCore.Polling;
14
using Microsoft.EntityFrameworkCore;
15

16
namespace LLL.DurableTask.EFCore;
17

18
public partial class EFCoreOrchestrationService :
19
    IOrchestrationServiceClient,
20
    IOrchestrationServiceFeaturesClient,
21
    IOrchestrationServiceQueryClient,
22
    IOrchestrationServicePurgeClient,
23
    IOrchestrationServiceRewindClient
24
{
25
    public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
26
    {
27
        return CreateTaskOrchestrationAsync(creationMessage, null);
×
28
    }
29

30
    public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
31
    {
32
        using var dbContext = _dbContextFactory.CreateDbContext();
196✔
33
        var executionStartedEvent = creationMessage.Event as ExecutionStartedEvent;
196✔
34

35
        var instanceId = creationMessage.OrchestrationInstance.InstanceId;
196✔
36
        var executionId = creationMessage.OrchestrationInstance.ExecutionId;
196✔
37

38
        var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);
196✔
39

40
        if (instance is not null)
196✔
41
        {
42
            var lastExecution = await dbContext.Executions.FindAsync(instance.LastExecutionId);
×
43

44
            // Dedupe dedupeStatuses silently
NEW
45
            if (dedupeStatuses is not null && dedupeStatuses.Contains(lastExecution.Status))
×
46
                return;
×
47

48
            // Otherwise, dedupe to avoid multile runnning instances
49
            if (!IsFinalInstanceStatus(lastExecution.Status))
×
50
                throw new OrchestrationAlreadyExistsException("Orchestration already has a running execution");
×
51
        }
52

53
        var runtimeState = new OrchestrationRuntimeState(new[] { executionStartedEvent });
196✔
54

55
        if (instance is null)
196✔
56
        {
57
            instance = _instanceMapper.CreateInstance(executionStartedEvent);
196✔
58
            await dbContext.Instances.AddAsync(instance);
196✔
59
        }
60
        else
61
        {
62
            _instanceMapper.UpdateInstance(instance, runtimeState);
×
63
        }
64

65
        var execution = _executionMapper.CreateExecution(runtimeState);
196✔
66
        await dbContext.Executions.AddAsync(execution);
196✔
67

68
        var knownQueues = new Dictionary<string, string>
196✔
69
        {
196✔
70
            [instance.InstanceId] = QueueMapper.ToQueue(runtimeState.Name, runtimeState.Version)
196✔
71
        };
196✔
72

73
        await SendTaskOrchestrationMessagesAsync(dbContext, new[] { creationMessage }, knownQueues);
196✔
74

75
        await dbContext.SaveChangesAsync();
196✔
76
    }
77

78
    public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
79
    {
80
        var taskMessage = new TaskMessage
6✔
81
        {
6✔
82
            OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
6✔
83
            Event = new ExecutionTerminatedEvent(-1, reason)
6✔
84
        };
6✔
85

86
        return SendTaskOrchestrationMessageAsync(taskMessage);
6✔
87
    }
88

89
    public async Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId)
90
    {
91
        using var dbContext = _dbContextFactory.CreateDbContext();
5✔
92
        var events = await dbContext.Events
5✔
93
            .Where(e => e.ExecutionId == executionId)
5✔
94
            .OrderBy(e => e.SequenceNumber)
5✔
95
            .ToArrayAsync();
5✔
96

97
        return $"[{string.Join(",", events.Select(e => e.Content))}]";
45✔
98
    }
99

100
    public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions)
101
    {
102
        var state = await GetOrchestrationStateAsync(instanceId, null);
24✔
103
        if (state is null)
24✔
104
            return Array.Empty<OrchestrationState>();
11✔
105

106
        return new[] { state };
13✔
107
    }
108

109
    public async Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId)
110
    {
111
        using var dbContext = _dbContextFactory.CreateDbContext();
440✔
112
        var instance = await dbContext.Executions
440✔
113
            .Where(e => e.InstanceId == instanceId && (executionId == null || e.ExecutionId == executionId))
440✔
114
            .OrderByDescending(e => e.CreatedTime)
440✔
115
            .FirstOrDefaultAsync();
440✔
116

117
        if (instance is null)
440✔
118
            return null;
11✔
119

120
        return _executionMapper.MapToState(instance);
429✔
121
    }
122

123
    public async Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
124
    {
125
        using var dbContext = _dbContextFactory.CreateDbContext();
24✔
126
        await _dbContextExtensions.PurgeOrchestrationHistoryAsync(dbContext, thresholdDateTimeUtc, timeRangeFilterType);
24✔
127
    }
128

129
    public async Task<PurgeResult> PurgeInstanceStateAsync(string instanceId)
130
    {
131
        using var dbContext = _dbContextFactory.CreateDbContext();
6✔
132
        var deletedRows = await _dbContextExtensions.PurgeInstanceHistoryAsync(dbContext, instanceId);
6✔
133

134
        return new PurgeResult(deletedRows);
6✔
135
    }
136

137
    public async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
138
    {
139
        using var dbContext = _dbContextFactory.CreateDbContext();
5✔
140
        var deletedRows = await _dbContextExtensions.PurgeInstanceHistoryAsync(dbContext, purgeInstanceFilter);
5✔
141

142
        return new PurgeResult(deletedRows);
5✔
143
    }
144

145
    public Task SendTaskOrchestrationMessageAsync(TaskMessage message)
146
    {
147
        return SendTaskOrchestrationMessageBatchAsync(message);
29✔
148
    }
149

150
    public async Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
151
    {
152
        using var dbContext = _dbContextFactory.CreateDbContext();
31✔
153
        await SendTaskOrchestrationMessagesAsync(dbContext, messages);
31✔
154

155
        await dbContext.SaveChangesAsync();
31✔
156
    }
157

158
    public async Task<OrchestrationState> WaitForOrchestrationAsync(
159
        string instanceId,
160
        string executionId,
161
        TimeSpan timeout,
162
        CancellationToken cancellationToken)
163
    {
164
        if (string.IsNullOrWhiteSpace(instanceId))
136✔
165
        {
166
            throw new ArgumentException(nameof(instanceId));
×
167
        }
168

169
        var stoppableCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(
136✔
170
            cancellationToken, _stopCts.Token).Token;
136✔
171

172
        var state = await BackoffPollingHelper.PollAsync(async () =>
136✔
173
        {
136✔
174
            return await GetOrchestrationStateAsync(instanceId, executionId);
546✔
175
        },
136✔
176
        s => IsFinalExecutionStatus(s.OrchestrationStatus),
546✔
177
        timeout,
136✔
178
        _options.PollingInterval,
136✔
179
        stoppableCancellationToken);
136✔
180

181
        if (!IsFinalExecutionStatus(state.OrchestrationStatus))
136✔
182
            return null;
×
183

184
        return state;
136✔
185
    }
186

187
    public Task<OrchestrationFeature[]> GetFeatures()
188
    {
189
        return Task.FromResult(new OrchestrationFeature[]
1✔
190
        {
1✔
191
            OrchestrationFeature.SearchByInstanceId,
1✔
192
            OrchestrationFeature.SearchByName,
1✔
193
            OrchestrationFeature.SearchByCreatedTime,
1✔
194
            OrchestrationFeature.SearchByStatus,
1✔
195
            OrchestrationFeature.Rewind,
1✔
196
            OrchestrationFeature.Tags,
1✔
197
            OrchestrationFeature.StatePerExecution
1✔
198
        });
1✔
199
    }
200

201
    public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken)
202
    {
203
        using var dbContext = _dbContextFactory.CreateDbContext();
7✔
204
        var instances = await _dbContextExtensions.CreateFilteredQueryable(dbContext, query)
7✔
205
            .OrderByDescending(x => x.CreatedTime)
7✔
206
            .ThenByDescending(x => x.InstanceId)
7✔
207
            .Take(query.PageSize + 1)
7✔
208
            .ToArrayAsync();
7✔
209

210
        var pageInstances = instances
7✔
211
            .Take(query.PageSize)
7✔
212
            .ToArray();
7✔
213

214
        var mappedPageInstances = pageInstances
7✔
215
            .Select(_executionMapper.MapToState)
7✔
216
            .ToArray();
7✔
217

218
        var newContinuationToken = instances.Length > pageInstances.Length
7✔
219
            ? new EFCoreContinuationToken
7✔
220
            {
7✔
221
                CreatedTime = pageInstances.Last().CreatedTime,
7✔
222
                InstanceId = pageInstances.Last().InstanceId,
7✔
223
            }.Serialize()
7✔
224
            : null;
7✔
225

226
        return new OrchestrationQueryResult(mappedPageInstances, newContinuationToken);
7✔
227
    }
228

229
    public async Task RewindTaskOrchestrationAsync(string instanceId, string reason)
230
    {
231
        using var dbContext = _dbContextFactory.CreateDbContext();
1✔
232
        await RewindInstanceAsync(dbContext, instanceId, reason, true, FindLastErrorOrCompletionRewindPoint);
1✔
233
        await dbContext.SaveChangesAsync();
1✔
234
    }
235

236
    private async Task RewindInstanceAsync(OrchestrationDbContext dbContext, string instanceId, string reason, bool rewindParents, Func<IList<HistoryEvent>, HistoryEvent> findRewindPoint)
237
    {
238
        var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);
1✔
239

240
        var execution = await dbContext.Executions
1✔
241
            .Where(e => e.ExecutionId == instance.LastExecutionId)
1✔
242
            .Include(e => e.Events)
1✔
243
            .SingleAsync();
1✔
244

245
        var deserializedEvents = execution.Events
1✔
246
            .OrderBy(e => e.SequenceNumber)
1✔
247
            .Select(e => _options.DataConverter.Deserialize<HistoryEvent>(e.Content))
1✔
248
            .ToArray();
1✔
249

250
        var rewindPoint = findRewindPoint(deserializedEvents);
1✔
251
        if (rewindPoint is null)
1✔
252
        {
253
            return;
1✔
254
        }
255

256
        var rewindResult = deserializedEvents.Rewind(rewindPoint, reason, _options.DataConverter);
×
257

258
        // Rewind suborchestrations
259
        foreach (var instanceIdToRewind in rewindResult.SubOrchestrationsInstancesToRewind)
×
260
        {
261
            await RewindInstanceAsync(dbContext, instanceIdToRewind, reason, false, FindLastErrorOrCompletionRewindPoint);
×
262
        }
263

264
        var orchestrationQueueName = QueueMapper.ToQueue(rewindResult.NewRuntimeState.Name, rewindResult.NewRuntimeState.Version);
×
265

266
        // Write activity messages
267
        var activityMessages = rewindResult.OutboundMessages
×
268
            .Select(m => _activityMessageMapper.CreateActivityMessage(m, orchestrationQueueName))
×
269
            .ToArray();
×
270

271
        await dbContext.ActivityMessages.AddRangeAsync(activityMessages);
×
272

273
        // Write orchestration messages
274
        var knownQueues = new Dictionary<string, string>
×
275
        {
×
276
            [rewindResult.NewRuntimeState.OrchestrationInstance.InstanceId] = orchestrationQueueName
×
277
        };
×
278

NEW
279
        if (rewindResult.NewRuntimeState.ParentInstance is not null)
×
280
            knownQueues[rewindResult.NewRuntimeState.ParentInstance.OrchestrationInstance.InstanceId] = QueueMapper.ToQueue(rewindResult.NewRuntimeState.ParentInstance.Name, rewindResult.NewRuntimeState.ParentInstance.Version);
×
281

282
        var allOrchestrationMessages = rewindResult.OrchestratorMessages
×
283
            .Concat(rewindResult.TimerMessages)
×
284
            .ToArray();
×
285

286
        await SendTaskOrchestrationMessagesAsync(dbContext, allOrchestrationMessages, knownQueues);
×
287

288
        // Update instance
289
        _instanceMapper.UpdateInstance(instance, rewindResult.NewRuntimeState);
×
290

291
        // Update current execution
292
        execution = await SaveExecutionAsync(dbContext, rewindResult.NewRuntimeState, execution);
×
293

294
        // Rewind parents
NEW
295
        if (rewindParents && rewindResult.NewRuntimeState.ParentInstance is not null)
×
296
        {
297
            await RewindInstanceAsync(dbContext, rewindResult.NewRuntimeState.ParentInstance.OrchestrationInstance.InstanceId, reason, true, historyEvents =>
×
298
            {
×
299
                return historyEvents
×
300
                    .OfType<SubOrchestrationInstanceCompletedEvent>()
×
301
                    .FirstOrDefault(h => h.TaskScheduledId == rewindResult.NewRuntimeState.ParentInstance.TaskScheduleId);
×
302
            });
×
303
        }
304
    }
305

306
    private HistoryEvent FindLastErrorOrCompletionRewindPoint(IList<HistoryEvent> historyEvents)
307
    {
308
        var completedEvent = historyEvents.OfType<ExecutionCompletedEvent>().FirstOrDefault();
1✔
309
        if (completedEvent is not null)
1✔
310
        {
311
            if (completedEvent.OrchestrationStatus == OrchestrationStatus.Failed)
×
312
            {
313
                // For failed executions, rewind to last failed task or suborchestration
314
                var lastFailedTask = historyEvents.LastOrDefault(h => h is TaskFailedEvent || h is SubOrchestrationInstanceFailedEvent);
×
NEW
315
                if (lastFailedTask is not null)
×
316
                    return lastFailedTask;
×
317
            }
318

319
            // Fallback to just reopenning orchestration, because error could have happened inside orchestrator function
320
            return completedEvent;
×
321
        }
322

323
        // For terminated executions, only rewing the termination
324
        var terminatedEvent = historyEvents.OfType<ExecutionTerminatedEvent>().FirstOrDefault();
1✔
325
        if (terminatedEvent is not null)
1✔
326
            return terminatedEvent;
×
327

328
        return null;
1✔
329
    }
330

331
    private bool IsFinalInstanceStatus(OrchestrationStatus status)
332
    {
333
        return IsFinalExecutionStatus(status) &&
×
334
            status != OrchestrationStatus.ContinuedAsNew;
×
335
    }
336

337
    private bool IsFinalExecutionStatus(OrchestrationStatus status)
338
    {
339
        return status != OrchestrationStatus.Running &&
546✔
340
            status != OrchestrationStatus.Pending;
546✔
341
    }
342
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc