• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 19715028186

26 Nov 2025 07:18PM UTC coverage: 78.714%. First build
19715028186

push

github

web-flow
Merge pull request #53 from lucaslorentz/support-dtfx-rewind

Add support for DTFx native rewind

43 of 50 new or added lines in 4 files covered. (86.0%)

2289 of 2908 relevant lines covered (78.71%)

120.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.69
/src/LLL.DurableTask.EFCore/EFCoreOrchestrationServiceClient.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.Exceptions;
8
using DurableTask.Core.History;
9
using DurableTask.Core.Query;
10
using DurableTask.Core.Tracing;
11
using LLL.DurableTask.Core;
12
using LLL.DurableTask.EFCore.Extensions;
13
using LLL.DurableTask.EFCore.Mappers;
14
using LLL.DurableTask.EFCore.Polling;
15
using Microsoft.EntityFrameworkCore;
16

17
namespace LLL.DurableTask.EFCore;
18

19
public partial class EFCoreOrchestrationService :
20
    IOrchestrationServiceClient,
21
    IOrchestrationServiceFeaturesClient,
22
    IOrchestrationServiceQueryClient,
23
    IOrchestrationServicePurgeClient,
24
    IOrchestrationServiceRewindClient
25
{
26
    public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
27
    {
28
        return CreateTaskOrchestrationAsync(creationMessage, null);
×
29
    }
30

31
    public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
32
    {
33
        using var dbContext = _dbContextFactory.CreateDbContext();
163✔
34
        var executionStartedEvent = creationMessage.Event as ExecutionStartedEvent;
163✔
35

36
        var instanceId = creationMessage.OrchestrationInstance.InstanceId;
163✔
37
        var executionId = creationMessage.OrchestrationInstance.ExecutionId;
163✔
38

39
        var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);
163✔
40

41
        if (instance is not null)
163✔
42
        {
43
            var lastExecution = await dbContext.Executions.FindAsync(instance.LastExecutionId);
×
44

45
            // Dedupe dedupeStatuses silently
46
            if (dedupeStatuses is not null && dedupeStatuses.Contains(lastExecution.Status))
×
47
                return;
×
48

49
            // Otherwise, dedupe to avoid multile runnning instances
50
            if (!IsFinalInstanceStatus(lastExecution.Status))
×
51
                throw new OrchestrationAlreadyExistsException("Orchestration already has a running execution");
×
52
        }
53

54
        var runtimeState = new OrchestrationRuntimeState(new[] { executionStartedEvent });
163✔
55

56
        if (instance is null)
163✔
57
        {
58
            instance = _instanceMapper.CreateInstance(executionStartedEvent);
163✔
59
            await dbContext.Instances.AddAsync(instance);
163✔
60
        }
61
        else
62
        {
63
            _instanceMapper.UpdateInstance(instance, runtimeState);
×
64
        }
65

66
        var execution = _executionMapper.CreateExecution(runtimeState);
163✔
67
        await dbContext.Executions.AddAsync(execution);
163✔
68

69
        var knownQueues = new Dictionary<string, string>
163✔
70
        {
163✔
71
            [instance.InstanceId] = QueueMapper.ToQueue(runtimeState.Name, runtimeState.Version)
163✔
72
        };
163✔
73

74
        await SendTaskOrchestrationMessagesAsync(dbContext, new[] { creationMessage }, knownQueues);
163✔
75

76
        await dbContext.SaveChangesAsync();
163✔
77
    }
78

79
    public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
80
    {
81
        var taskMessage = new TaskMessage
5✔
82
        {
5✔
83
            OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
5✔
84
            Event = new ExecutionTerminatedEvent(-1, reason)
5✔
85
        };
5✔
86

87
        return SendTaskOrchestrationMessageAsync(taskMessage);
5✔
88
    }
89

90
    public async Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId)
91
    {
92
        using var dbContext = _dbContextFactory.CreateDbContext();
4✔
93
        var events = await dbContext.Events
4✔
94
            .Where(e => e.ExecutionId == executionId)
4✔
95
            .OrderBy(e => e.SequenceNumber)
4✔
96
            .ToArrayAsync();
4✔
97

98
        return $"[{string.Join(",", events.Select(e => e.Content))}]";
36✔
99
    }
100

101
    public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions)
102
    {
103
        using var dbContext = _dbContextFactory.CreateDbContext();
20✔
104

105
        var query = dbContext.Executions
20✔
106
            .Where(e => e.InstanceId == instanceId);
20✔
107

108
        if (!allExecutions)
20✔
109
            query = query.Join(dbContext.Instances, x => x.ExecutionId, x => x.LastExecutionId, (x, y) => x);
20✔
110

111
        var executions = await query.ToArrayAsync();
20✔
112

113
        return executions.Select(_executionMapper.MapToState).ToArray();
20✔
114
    }
115

116
    public async Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId)
117
    {
118
        using var dbContext = _dbContextFactory.CreateDbContext();
358✔
119

120
        var query = dbContext.Executions
358✔
121
            .Where(e => e.InstanceId == instanceId);
358✔
122

123
        if (executionId is null)
358✔
124
            query = query.Join(dbContext.Instances, x => x.ExecutionId, x => x.LastExecutionId, (x, y) => x);
24✔
125
        else
126
            query = query.Where(e => e.ExecutionId == executionId);
334✔
127

128
        var execution = await query.FirstOrDefaultAsync();
358✔
129

130
        if (execution is null)
358✔
131
            return null;
×
132

133
        return _executionMapper.MapToState(execution);
358✔
134
    }
135

136
    public async Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
137
    {
138
        using var dbContext = _dbContextFactory.CreateDbContext();
18✔
139
        await _dbContextExtensions.PurgeOrchestrationHistoryAsync(dbContext, thresholdDateTimeUtc, timeRangeFilterType);
18✔
140
    }
141

142
    public async Task<PurgeResult> PurgeInstanceStateAsync(string instanceId)
143
    {
144
        using var dbContext = _dbContextFactory.CreateDbContext();
5✔
145
        var deletedRows = await _dbContextExtensions.PurgeInstanceHistoryAsync(dbContext, instanceId);
5✔
146

147
        return new PurgeResult(deletedRows);
5✔
148
    }
149

150
    public async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
151
    {
152
        using var dbContext = _dbContextFactory.CreateDbContext();
4✔
153
        var deletedRows = await _dbContextExtensions.PurgeInstanceHistoryAsync(dbContext, purgeInstanceFilter);
4✔
154

155
        return new PurgeResult(deletedRows);
4✔
156
    }
157

158
    public Task SendTaskOrchestrationMessageAsync(TaskMessage message)
159
    {
160
        return SendTaskOrchestrationMessageBatchAsync(message);
27✔
161
    }
162

163
    public async Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
164
    {
165
        using var dbContext = _dbContextFactory.CreateDbContext();
29✔
166
        await SendTaskOrchestrationMessagesAsync(dbContext, messages);
29✔
167

168
        await dbContext.SaveChangesAsync();
29✔
169
    }
170

171
    public async Task<OrchestrationState> WaitForOrchestrationAsync(
172
        string instanceId,
173
        string executionId,
174
        TimeSpan timeout,
175
        CancellationToken cancellationToken)
176
    {
177
        if (string.IsNullOrWhiteSpace(instanceId))
117✔
178
        {
179
            throw new ArgumentException(nameof(instanceId));
×
180
        }
181

182
        var stoppableCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(
117✔
183
            cancellationToken, _stopCts.Token).Token;
117✔
184

185
        var state = await BackoffPollingHelper.PollAsync(async () =>
117✔
186
        {
117✔
187
            return await GetOrchestrationStateAsync(instanceId, executionId);
470✔
188
        },
117✔
189
        s => IsFinalExecutionStatus(s.OrchestrationStatus),
470✔
190
        timeout,
117✔
191
        _options.PollingInterval,
117✔
192
        stoppableCancellationToken);
117✔
193

194
        if (!IsFinalExecutionStatus(state.OrchestrationStatus))
117✔
195
            return null;
×
196

197
        return state;
117✔
198
    }
199

200
    public Task<OrchestrationFeature[]> GetFeatures()
201
    {
202
        return Task.FromResult(new OrchestrationFeature[]
1✔
203
        {
1✔
204
            OrchestrationFeature.SearchByInstanceId,
1✔
205
            OrchestrationFeature.SearchByName,
1✔
206
            OrchestrationFeature.SearchByCreatedTime,
1✔
207
            OrchestrationFeature.SearchByStatus,
1✔
208
            OrchestrationFeature.Rewind,
1✔
209
            OrchestrationFeature.Tags,
1✔
210
            OrchestrationFeature.StatePerExecution
1✔
211
        });
1✔
212
    }
213

214
    public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken)
215
    {
216
        using var dbContext = _dbContextFactory.CreateDbContext();
6✔
217
        var instances = await _dbContextExtensions.CreateFilteredQueryable(dbContext, query)
6✔
218
            .OrderByDescending(x => x.CreatedTime)
6✔
219
            .ThenByDescending(x => x.InstanceId)
6✔
220
            .Take(query.PageSize + 1)
6✔
221
            .ToArrayAsync();
6✔
222

223
        var pageInstances = instances
6✔
224
            .Take(query.PageSize)
6✔
225
            .ToArray();
6✔
226

227
        var mappedPageInstances = pageInstances
6✔
228
            .Select(_executionMapper.MapToState)
6✔
229
            .ToArray();
6✔
230

231
        var newContinuationToken = instances.Length > pageInstances.Length
6✔
232
            ? new EFCoreContinuationToken
6✔
233
            {
6✔
234
                CreatedTime = pageInstances.Last().CreatedTime,
6✔
235
                InstanceId = pageInstances.Last().InstanceId,
6✔
236
            }.Serialize()
6✔
237
            : null;
6✔
238

239
        return new OrchestrationQueryResult(mappedPageInstances, newContinuationToken);
6✔
240
    }
241

242
    public async Task RewindTaskOrchestrationAsync(string instanceId, string reason)
243
    {
244
        if (_options.UseDTFxRewind)
1✔
245
        {
246
            var taskMessage = new TaskMessage
1✔
247
            {
1✔
248
                OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
1✔
249
                Event = new ExecutionRewoundEvent(-1, reason)
1✔
250
                {
1✔
251
                    // Set a dummy trace context to avoid an exception in DTFx
1✔
252
                    ParentTraceContext = new DistributedTraceContext($"{instanceId}")
1✔
253
                }
1✔
254
            };
1✔
255
            await SendTaskOrchestrationMessageAsync(taskMessage);
1✔
256
        }
257
        else
258
        {
NEW
259
            using var dbContext = _dbContextFactory.CreateDbContext();
×
NEW
260
            await RewindInstanceAsync(dbContext, instanceId, reason, true, FindLastErrorOrCompletionRewindPoint);
×
NEW
261
            await dbContext.SaveChangesAsync();
×
262
        }
263
    }
264

265
    private async Task RewindInstanceAsync(OrchestrationDbContext dbContext, string instanceId, string reason, bool rewindParents, Func<IList<HistoryEvent>, HistoryEvent> findRewindPoint)
266
    {
267
        var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);
×
268

269
        var execution = await dbContext.Executions
×
270
            .Where(e => e.ExecutionId == instance.LastExecutionId)
×
271
            .Include(e => e.Events)
×
272
            .SingleAsync();
×
273

274
        var deserializedEvents = execution.Events
×
275
            .OrderBy(e => e.SequenceNumber)
×
276
            .Select(e => _options.DataConverter.Deserialize<HistoryEvent>(e.Content))
×
277
            .ToArray();
×
278

279
        var rewindPoint = findRewindPoint(deserializedEvents);
×
280
        if (rewindPoint is null)
×
281
        {
282
            return;
×
283
        }
284

285
        var rewindResult = deserializedEvents.Rewind(rewindPoint, reason, _options.DataConverter);
×
286

287
        // Rewind suborchestrations
288
        foreach (var instanceIdToRewind in rewindResult.SubOrchestrationsInstancesToRewind)
×
289
        {
290
            await RewindInstanceAsync(dbContext, instanceIdToRewind, reason, false, FindLastErrorOrCompletionRewindPoint);
×
291
        }
292

293
        var orchestrationQueueName = QueueMapper.ToQueue(rewindResult.NewRuntimeState.Name, rewindResult.NewRuntimeState.Version);
×
294

295
        // Write activity messages
296
        var activityMessages = rewindResult.OutboundMessages
×
297
            .Select(m => _activityMessageMapper.CreateActivityMessage(m, orchestrationQueueName))
×
298
            .ToArray();
×
299

300
        await dbContext.ActivityMessages.AddRangeAsync(activityMessages);
×
301

302
        // Write orchestration messages
303
        var knownQueues = new Dictionary<string, string>
×
304
        {
×
305
            [rewindResult.NewRuntimeState.OrchestrationInstance.InstanceId] = orchestrationQueueName
×
306
        };
×
307

308
        if (rewindResult.NewRuntimeState.ParentInstance is not null)
×
309
            knownQueues[rewindResult.NewRuntimeState.ParentInstance.OrchestrationInstance.InstanceId] = QueueMapper.ToQueue(rewindResult.NewRuntimeState.ParentInstance.Name, rewindResult.NewRuntimeState.ParentInstance.Version);
×
310

311
        var allOrchestrationMessages = rewindResult.OrchestratorMessages
×
312
            .Concat(rewindResult.TimerMessages)
×
313
            .ToArray();
×
314

315
        await SendTaskOrchestrationMessagesAsync(dbContext, allOrchestrationMessages, knownQueues);
×
316

317
        // Update instance
318
        _instanceMapper.UpdateInstance(instance, rewindResult.NewRuntimeState);
×
319

320
        // Update current execution
321
        execution = await SaveExecutionAsync(dbContext, rewindResult.NewRuntimeState, execution);
×
322

323
        // Rewind parents
324
        if (rewindParents && rewindResult.NewRuntimeState.ParentInstance is not null)
×
325
        {
326
            await RewindInstanceAsync(dbContext, rewindResult.NewRuntimeState.ParentInstance.OrchestrationInstance.InstanceId, reason, true, historyEvents =>
×
327
            {
×
328
                return historyEvents
×
329
                    .OfType<SubOrchestrationInstanceCompletedEvent>()
×
330
                    .FirstOrDefault(h => h.TaskScheduledId == rewindResult.NewRuntimeState.ParentInstance.TaskScheduleId);
×
331
            });
×
332
        }
333
    }
334

335
    private HistoryEvent FindLastErrorOrCompletionRewindPoint(IList<HistoryEvent> historyEvents)
336
    {
337
        var completedEvent = historyEvents.OfType<ExecutionCompletedEvent>().FirstOrDefault();
×
338
        if (completedEvent is not null)
×
339
        {
340
            if (completedEvent.OrchestrationStatus == OrchestrationStatus.Failed)
×
341
            {
342
                // For failed executions, rewind to last failed task or suborchestration
343
                var lastFailedTask = historyEvents.LastOrDefault(h => h is TaskFailedEvent || h is SubOrchestrationInstanceFailedEvent);
×
344
                if (lastFailedTask is not null)
×
345
                    return lastFailedTask;
×
346
            }
347

348
            // Fallback to just reopenning orchestration, because error could have happened inside orchestrator function
349
            return completedEvent;
×
350
        }
351

352
        // For terminated executions, only rewing the termination
353
        var terminatedEvent = historyEvents.OfType<ExecutionTerminatedEvent>().FirstOrDefault();
×
354
        if (terminatedEvent is not null)
×
355
            return terminatedEvent;
×
356

357
        return null;
×
358
    }
359

360
    private bool IsFinalInstanceStatus(OrchestrationStatus status)
361
    {
362
        return IsFinalExecutionStatus(status) &&
×
363
            status != OrchestrationStatus.ContinuedAsNew;
×
364
    }
365

366
    private bool IsFinalExecutionStatus(OrchestrationStatus status)
367
    {
368
        return status != OrchestrationStatus.Running &&
470✔
369
            status != OrchestrationStatus.Pending;
470✔
370
    }
371
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc