• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 27982538143

22 Jun 2026 08:40PM UTC coverage: 76.52% (-0.6%) from 77.164%
27982538143

push

github

lucaslorentz
perf: cancel an orchestration's pending timers when it finishes

Optimization, not a bug fix: a finished orchestration's pending timers are
already harmless — when they fire they're dropped because it's no longer
running. This discards them earlier, when the execution reaches a terminal
state in CompleteTaskOrchestrationWorkItemAsync, so e.g. an unfired
Task.WhenAny timeout doesn't linger in the queue until its (possibly far-off)
due time.

Also replaces the flaky TimerOrchestration_ShouldTerminateProperly test with
TerminatedOrchestration_ShouldCancelPendingTimers: it schedules many timers
500ms apart, terminates after at least one has fired (while the rest are still
pending), and asserts the orchestration ends Terminated with no pending
messages left. The later timers can't fire within the test window, so an empty
queue proves they were cancelled.

- Reduce TimerOrchestration's timer 2s -> 1s to speed TimerOrchestration_ShouldComplete.
- Ignore *.lscache and test-results/.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

25 of 25 new or added lines in 1 file covered. (100.0%)

26 existing lines in 4 files now uncovered.

2291 of 2994 relevant lines covered (76.52%)

140.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.34
/src/LLL.DurableTask.EFCore/EFCoreOrchestrationServiceClient.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.Exceptions;
8
using DurableTask.Core.History;
9
using DurableTask.Core.Query;
10
using DurableTask.Core.Tracing;
11
using LLL.DurableTask.Core;
12
using LLL.DurableTask.EFCore.Extensions;
13
using LLL.DurableTask.EFCore.Mappers;
14
using LLL.DurableTask.EFCore.Polling;
15
using Microsoft.EntityFrameworkCore;
16

17
namespace LLL.DurableTask.EFCore;
18

19
public partial class EFCoreOrchestrationService :
20
    IOrchestrationServiceClient,
21
    IOrchestrationServiceFeaturesClient,
22
    IOrchestrationServiceQueryClient,
23
    IOrchestrationServicePurgeClient,
24
    IOrchestrationServiceRewindClient
25
{
26
    /// <summary>
    /// Creates an orchestration from <paramref name="creationMessage"/> by delegating to the
    /// two-parameter overload with no dedupe statuses (<c>null</c>).
    /// </summary>
    /// <param name="creationMessage">The message that starts the orchestration.</param>
    public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
        => CreateTaskOrchestrationAsync(creationMessage, null);
30

31
    /// <summary>
    /// Creates a new execution for an orchestration instance, enqueues its creation message
    /// and saves everything through a single <c>SaveChangesAsync</c> call.
    /// </summary>
    /// <param name="creationMessage">
    /// The message that starts the orchestration; its event must be an <see cref="ExecutionStartedEvent"/>.
    /// </param>
    /// <param name="dedupeStatuses">
    /// Statuses of the instance's last execution for which the request is silently ignored. May be null.
    /// </param>
    /// <exception cref="ArgumentException">
    /// The message does not carry an <see cref="ExecutionStartedEvent"/>.
    /// </exception>
    /// <exception cref="OrchestrationAlreadyExistsException">
    /// The instance already exists and its last execution is not in a final instance status.
    /// </exception>
    public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
    {
        // Fail fast with a clear error instead of feeding a null event into the runtime state below.
        if (creationMessage.Event is not ExecutionStartedEvent executionStartedEvent)
            throw new ArgumentException("The creation message must carry an ExecutionStartedEvent.", nameof(creationMessage));

        using var dbContext = _dbContextFactory.CreateDbContext();

        var instanceId = creationMessage.OrchestrationInstance.InstanceId;

        var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);

        if (instance is not null)
        {
            var lastExecution = await dbContext.Executions.FindAsync(instance.LastExecutionId);

            // Silently ignore the request when the last execution is in one of the dedupe statuses
            if (dedupeStatuses is not null && dedupeStatuses.Contains(lastExecution.Status))
                return;

            // Otherwise, reject the request to avoid multiple running executions
            if (!IsFinalInstanceStatus(lastExecution.Status))
                throw new OrchestrationAlreadyExistsException("Orchestration already has a running execution");
        }

        var runtimeState = new OrchestrationRuntimeState(new[] { executionStartedEvent });

        if (instance is null)
        {
            instance = _instanceMapper.CreateInstance(executionStartedEvent);
            await dbContext.Instances.AddAsync(instance);
        }
        else
        {
            _instanceMapper.UpdateInstance(instance, runtimeState);
        }

        var execution = _executionMapper.CreateExecution(runtimeState);
        await dbContext.Executions.AddAsync(execution);

        // The queue of the new instance is already known, so hand it to the sender up front
        var knownQueues = new Dictionary<string, string>
        {
            [instance.InstanceId] = QueueMapper.ToQueue(runtimeState.Name, runtimeState.Version)
        };

        await SendTaskOrchestrationMessagesAsync(dbContext, new[] { creationMessage }, knownQueues);

        await dbContext.SaveChangesAsync();
    }
78

79
    /// <summary>
    /// Requests termination of an orchestration by sending it an
    /// <see cref="ExecutionTerminatedEvent"/> message.
    /// </summary>
    /// <param name="instanceId">Id of the orchestration instance to terminate.</param>
    /// <param name="reason">Reason passed to the termination event.</param>
    public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
    {
        var target = new OrchestrationInstance { InstanceId = instanceId };
        var terminationEvent = new ExecutionTerminatedEvent(-1, reason);

        return SendTaskOrchestrationMessageAsync(new TaskMessage
        {
            OrchestrationInstance = target,
            Event = terminationEvent
        });
    }
89

90
    /// <summary>
    /// Returns the stored events of an execution, ordered by sequence number, as a single
    /// string: the events' contents joined by commas and wrapped in square brackets.
    /// </summary>
    /// <param name="instanceId">Not used by the query; events are selected by execution id only.</param>
    /// <param name="executionId">Id of the execution whose events are returned.</param>
    public async Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId)
    {
        using var dbContext = _dbContextFactory.CreateDbContext();

        var orderedEvents = await dbContext.Events
            .Where(e => e.ExecutionId == executionId)
            .OrderBy(e => e.SequenceNumber)
            .ToArrayAsync();

        var joinedContents = string.Join(",", orderedEvents.Select(e => e.Content));

        return "[" + joinedContents + "]";
    }
100

101
    /// <summary>
    /// Returns the state of the executions of an orchestration instance.
    /// </summary>
    /// <param name="instanceId">Id of the orchestration instance.</param>
    /// <param name="allExecutions">
    /// When false, only the execution referenced as the instance's last execution is returned.
    /// </param>
    public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions)
    {
        using var dbContext = _dbContextFactory.CreateDbContext();

        var executionsQuery = dbContext.Executions
            .Where(e => e.InstanceId == instanceId);

        if (!allExecutions)
        {
            // Joining on the instance's LastExecutionId keeps only the latest execution
            executionsQuery = executionsQuery.Join(
                dbContext.Instances,
                execution => execution.ExecutionId,
                instance => instance.LastExecutionId,
                (execution, instance) => execution);
        }

        var rows = await executionsQuery.ToArrayAsync();

        return rows.Select(_executionMapper.MapToState).ToArray();
    }
115

116
    /// <summary>
    /// Returns the state of one execution of an orchestration instance.
    /// </summary>
    /// <param name="instanceId">Id of the orchestration instance.</param>
    /// <param name="executionId">
    /// Id of the execution to return; when null, the instance's last execution is used.
    /// </param>
    /// <returns>The mapped state, or null when no matching execution is found.</returns>
    public async Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId)
    {
        using var dbContext = _dbContextFactory.CreateDbContext();

        var candidates = dbContext.Executions
            .Where(e => e.InstanceId == instanceId);

        candidates = executionId is null
            ? candidates.Join(
                dbContext.Instances,
                execution => execution.ExecutionId,
                instance => instance.LastExecutionId,
                (execution, instance) => execution)
            : candidates.Where(e => e.ExecutionId == executionId);

        var match = await candidates.FirstOrDefaultAsync();

        return match is null
            ? null
            : _executionMapper.MapToState(match);
    }
135

136
    /// <summary>
    /// Purges orchestration history by delegating to the database-specific extensions
    /// with a freshly created context.
    /// </summary>
    /// <param name="thresholdDateTimeUtc">Threshold passed through to the purge operation.</param>
    /// <param name="timeRangeFilterType">Time range filter type passed through to the purge operation.</param>
    public async Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
    {
        using (var dbContext = _dbContextFactory.CreateDbContext())
        {
            await _dbContextExtensions.PurgeOrchestrationHistoryAsync(
                dbContext,
                thresholdDateTimeUtc,
                timeRangeFilterType);
        }
    }
141

142
    /// <summary>
    /// Purges the history of a single orchestration instance.
    /// </summary>
    /// <param name="instanceId">Id of the instance to purge.</param>
    /// <returns>A <see cref="PurgeResult"/> built from the value returned by the purge operation.</returns>
    public async Task<PurgeResult> PurgeInstanceStateAsync(string instanceId)
    {
        using (var dbContext = _dbContextFactory.CreateDbContext())
        {
            var purgedCount = await _dbContextExtensions.PurgeInstanceHistoryAsync(dbContext, instanceId);
            return new PurgeResult(purgedCount);
        }
    }
149

150
    /// <summary>
    /// Purges the history of the orchestration instances selected by a filter.
    /// </summary>
    /// <param name="purgeInstanceFilter">Filter passed through to the purge operation.</param>
    /// <returns>A <see cref="PurgeResult"/> built from the value returned by the purge operation.</returns>
    public async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
    {
        using (var dbContext = _dbContextFactory.CreateDbContext())
        {
            var purgedCount = await _dbContextExtensions.PurgeInstanceHistoryAsync(dbContext, purgeInstanceFilter);
            return new PurgeResult(purgedCount);
        }
    }
157

158
    /// <summary>
    /// Sends a single orchestration message by forwarding it to the batch overload.
    /// </summary>
    /// <param name="message">The message to send.</param>
    public Task SendTaskOrchestrationMessageAsync(TaskMessage message)
        => SendTaskOrchestrationMessageBatchAsync(message);
162

163
    /// <summary>
    /// Sends a batch of orchestration messages using a freshly created context and
    /// saves them with one <c>SaveChangesAsync</c> call.
    /// </summary>
    /// <param name="messages">The messages to send.</param>
    public async Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
    {
        using (var dbContext = _dbContextFactory.CreateDbContext())
        {
            await SendTaskOrchestrationMessagesAsync(dbContext, messages);
            await dbContext.SaveChangesAsync();
        }
    }
170

171
    /// <summary>
    /// Polls the orchestration state until the execution reaches a final execution status,
    /// the timeout elapses, or cancellation is requested.
    /// </summary>
    /// <param name="instanceId">Id of the orchestration instance; must not be null or whitespace.</param>
    /// <param name="executionId">Id of the execution to wait for; null selects the instance's last execution.</param>
    /// <param name="timeout">Timeout passed to the polling helper.</param>
    /// <param name="cancellationToken">
    /// Caller token; it is linked with the service's stop token so that either one ends the polling.
    /// </param>
    /// <returns>
    /// The final state, or null when polling ended without the execution being in a final status
    /// (including the case where no matching execution was found).
    /// </returns>
    /// <exception cref="ArgumentException"><paramref name="instanceId"/> is null or whitespace.</exception>
    public async Task<OrchestrationState> WaitForOrchestrationAsync(
        string instanceId,
        string executionId,
        TimeSpan timeout,
        CancellationToken cancellationToken)
    {
        if (string.IsNullOrWhiteSpace(instanceId))
        {
            throw new ArgumentException("Instance id must not be null or whitespace.", nameof(instanceId));
        }

        // The linked source registers callbacks on both parent tokens; dispose it to release them.
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken, _stopCts.Token);

        var state = await BackoffPollingHelper.PollAsync(async () =>
        {
            return await GetOrchestrationStateAsync(instanceId, executionId);
        },
        // GetOrchestrationStateAsync returns null when no execution matches; treat that as "not final yet".
        s => s is not null && IsFinalExecutionStatus(s.OrchestrationStatus),
        timeout,
        _options.PollingInterval,
        linkedCts.Token);

        if (state is null || !IsFinalExecutionStatus(state.OrchestrationStatus))
            return null;

        return state;
    }
199

200
    /// <summary>
    /// Returns the list of features this orchestration service reports as supported.
    /// </summary>
    public Task<OrchestrationFeature[]> GetFeatures()
    {
        OrchestrationFeature[] supportedFeatures =
        {
            OrchestrationFeature.SearchByInstanceId,
            OrchestrationFeature.SearchByName,
            OrchestrationFeature.SearchByCreatedTime,
            OrchestrationFeature.SearchByStatus,
            OrchestrationFeature.Rewind,
            OrchestrationFeature.Tags,
            OrchestrationFeature.StatePerExecution
        };

        return Task.FromResult(supportedFeatures);
    }
213

214
    /// <summary>
    /// Runs a filtered orchestration query and returns one page of results, ordered by
    /// created time and then instance id, both descending.
    /// </summary>
    /// <param name="query">The query; its <c>PageSize</c> determines the page length.</param>
    /// <param name="cancellationToken">Token used to cancel the database query.</param>
    /// <returns>
    /// The mapped page together with a continuation token, which is null when there are no more rows.
    /// </returns>
    public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken)
    {
        using var dbContext = _dbContextFactory.CreateDbContext();

        // Fetch one row beyond the page size to learn whether another page exists.
        var instances = await _dbContextExtensions.CreateFilteredQueryable(dbContext, query)
            .OrderByDescending(x => x.CreatedTime)
            .ThenByDescending(x => x.InstanceId)
            .Take(query.PageSize + 1)
            .ToArrayAsync(cancellationToken);

        var pageInstances = instances
            .Take(query.PageSize)
            .ToArray();

        var mappedPageInstances = pageInstances
            .Select(_executionMapper.MapToState)
            .ToArray();

        // The token records the sort keys of the last row on this page.
        var newContinuationToken = instances.Length > pageInstances.Length
            ? new EFCoreContinuationToken
            {
                CreatedTime = pageInstances.Last().CreatedTime,
                InstanceId = pageInstances.Last().InstanceId,
            }.Serialize()
            : null;

        return new OrchestrationQueryResult(mappedPageInstances, newContinuationToken);
    }
241

242
    /// <summary>
    /// Rewinds an orchestration instance. Depending on the <c>UseDTFxRewind</c> option, this
    /// either sends an <see cref="ExecutionRewoundEvent"/> message to the orchestration or
    /// rewinds the stored state directly through <c>RewindInstanceAsync</c>.
    /// </summary>
    /// <param name="instanceId">Id of the orchestration instance to rewind.</param>
    /// <param name="reason">Reason recorded for the rewind.</param>
    public async Task RewindTaskOrchestrationAsync(string instanceId, string reason)
    {
        if (!_options.UseDTFxRewind)
        {
            using var dbContext = _dbContextFactory.CreateDbContext();
            await RewindInstanceAsync(dbContext, instanceId, reason, true, FindLastErrorOrCompletionRewindPoint);
            await dbContext.SaveChangesAsync();
            return;
        }

        var rewoundEvent = new ExecutionRewoundEvent(-1, reason)
        {
            // Set a dummy trace context to avoid an exception in DTFx
            ParentTraceContext = new DistributedTraceContext($"{instanceId}")
        };

        var rewindMessage = new TaskMessage
        {
            OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
            Event = rewoundEvent
        };

        await SendTaskOrchestrationMessageAsync(rewindMessage);
    }
264

265
    /// <summary>
    /// Rewinds the last execution of an orchestration instance to the point chosen by
    /// <paramref name="findRewindPoint"/>, recursing into sub-orchestrations and, optionally, parents.
    /// Changes are added to <paramref name="dbContext"/>; this method does not call <c>SaveChangesAsync</c>.
    /// </summary>
    /// <param name="dbContext">Context used for all reads and writes.</param>
    /// <param name="instanceId">Id of the orchestration instance to rewind.</param>
    /// <param name="reason">Reason passed to the rewind operation.</param>
    /// <param name="rewindParents">Whether the parent instance, if any, is rewound as well.</param>
    /// <param name="findRewindPoint">
    /// Selects the history event to rewind to; when it returns null the method does nothing.
    /// </param>
    /// <exception cref="ArgumentException">No instance with the given id was found.</exception>
    private async Task RewindInstanceAsync(OrchestrationDbContext dbContext, string instanceId, string reason, bool rewindParents, Func<IList<HistoryEvent>, HistoryEvent> findRewindPoint)
    {
        var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);

        // LockInstanceForUpdate can return null for an unknown instance; report it explicitly
        // instead of failing with a NullReferenceException when reading LastExecutionId.
        if (instance is null)
            throw new ArgumentException($"Orchestration instance '{instanceId}' was not found.", nameof(instanceId));

        var execution = await dbContext.Executions
            .Where(e => e.ExecutionId == instance.LastExecutionId)
            .Include(e => e.Events)
            .SingleAsync();

        var deserializedEvents = execution.Events
            .OrderBy(e => e.SequenceNumber)
            .Select(e => _options.DataConverter.Deserialize<HistoryEvent>(e.Content))
            .ToArray();

        var rewindPoint = findRewindPoint(deserializedEvents);
        if (rewindPoint is null)
        {
            return;
        }

        var rewindResult = deserializedEvents.Rewind(rewindPoint, reason, _options.DataConverter);

        // Rewind suborchestrations
        foreach (var instanceIdToRewind in rewindResult.SubOrchestrationsInstancesToRewind)
        {
            await RewindInstanceAsync(dbContext, instanceIdToRewind, reason, false, FindLastErrorOrCompletionRewindPoint);
        }

        var orchestrationQueueName = QueueMapper.ToQueue(rewindResult.NewRuntimeState.Name, rewindResult.NewRuntimeState.Version);

        // Write activity messages
        var activityMessages = rewindResult.OutboundMessages
            .Select(m => _activityMessageMapper.CreateActivityMessage(m, orchestrationQueueName))
            .ToArray();

        await dbContext.ActivityMessages.AddRangeAsync(activityMessages);

        // Write orchestration messages
        var knownQueues = new Dictionary<string, string>
        {
            [rewindResult.NewRuntimeState.OrchestrationInstance.InstanceId] = orchestrationQueueName
        };

        if (rewindResult.NewRuntimeState.ParentInstance is not null)
            knownQueues[rewindResult.NewRuntimeState.ParentInstance.OrchestrationInstance.InstanceId] = QueueMapper.ToQueue(rewindResult.NewRuntimeState.ParentInstance.Name, rewindResult.NewRuntimeState.ParentInstance.Version);

        var allOrchestrationMessages = rewindResult.OrchestratorMessages
            .Concat(rewindResult.TimerMessages)
            .ToArray();

        await SendTaskOrchestrationMessagesAsync(dbContext, allOrchestrationMessages, knownQueues);

        // Update instance
        _instanceMapper.UpdateInstance(instance, rewindResult.NewRuntimeState);

        // Update current execution (the returned value is not needed afterwards)
        await SaveExecutionAsync(dbContext, rewindResult.NewRuntimeState, execution);

        // Rewind parents
        if (rewindParents && rewindResult.NewRuntimeState.ParentInstance is not null)
        {
            // In the parent, rewind to the completion event of this sub-orchestration's scheduled task
            await RewindInstanceAsync(dbContext, rewindResult.NewRuntimeState.ParentInstance.OrchestrationInstance.InstanceId, reason, true, historyEvents =>
            {
                return historyEvents
                    .OfType<SubOrchestrationInstanceCompletedEvent>()
                    .FirstOrDefault(h => h.TaskScheduledId == rewindResult.NewRuntimeState.ParentInstance.TaskScheduleId);
            });
        }
    }
334

335
    /// <summary>
    /// Chooses the history event an execution should be rewound to.
    /// </summary>
    /// <param name="historyEvents">The execution's history events, in order.</param>
    /// <returns>
    /// For a failed execution, the last task or sub-orchestration failure event if there is one;
    /// otherwise the first completion event; otherwise the first termination event; otherwise null.
    /// </returns>
    private HistoryEvent FindLastErrorOrCompletionRewindPoint(IList<HistoryEvent> historyEvents)
    {
        var completion = historyEvents.OfType<ExecutionCompletedEvent>().FirstOrDefault();

        if (completion is null)
        {
            // For terminated executions, only rewind the termination (null when there is none)
            return historyEvents.OfType<ExecutionTerminatedEvent>().FirstOrDefault();
        }

        if (completion.OrchestrationStatus == OrchestrationStatus.Failed)
        {
            // For failed executions, rewind to the last failed task or suborchestration
            var lastFailure = historyEvents.LastOrDefault(h => h is TaskFailedEvent or SubOrchestrationInstanceFailedEvent);
            if (lastFailure is not null)
                return lastFailure;
        }

        // Fall back to just reopening the orchestration, because the error could have happened inside the orchestrator function
        return completion;
    }
359

360
    /// <summary>
    /// Returns true when <paramref name="status"/> is a final execution status other than
    /// <see cref="OrchestrationStatus.ContinuedAsNew"/>.
    /// </summary>
    private bool IsFinalInstanceStatus(OrchestrationStatus status)
    {
        if (status == OrchestrationStatus.ContinuedAsNew)
            return false;

        return IsFinalExecutionStatus(status);
    }
365

366
    /// <summary>
    /// Returns true when <paramref name="status"/> is neither
    /// <see cref="OrchestrationStatus.Running"/> nor <see cref="OrchestrationStatus.Pending"/>.
    /// </summary>
    private bool IsFinalExecutionStatus(OrchestrationStatus status)
        => status is not (OrchestrationStatus.Running or OrchestrationStatus.Pending);
371
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc