• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 4431705642

pending completion
4431705642

push

github

Add support for registering activity methods from interfaces

2302 of 2777 relevant lines covered (82.9%)

142.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.48
/src/LLL.DurableTask.EFCore/EFCoreOrchestrationServiceClient.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.Exceptions;
8
using DurableTask.Core.History;
9
using DurableTask.Core.Query;
10
using LLL.DurableTask.Core;
11
using LLL.DurableTask.EFCore.Extensions;
12
using LLL.DurableTask.EFCore.Mappers;
13
using LLL.DurableTask.EFCore.Polling;
14
using Microsoft.EntityFrameworkCore;
15

16
namespace LLL.DurableTask.EFCore
17
{
18
    public partial class EFCoreOrchestrationService :
19
        IOrchestrationServiceClient,
20
        IOrchestrationServiceFeaturesClient,
21
        IOrchestrationServiceQueryClient,
22
        IOrchestrationServicePurgeClient,
23
        IOrchestrationServiceRewindClient
24
    {
25
        /// <summary>
        /// Creates an orchestration from <paramref name="creationMessage"/> by forwarding to the
        /// two-argument overload with <c>null</c> dedupe statuses.
        /// </summary>
        /// <param name="creationMessage">Message carrying the event that starts the orchestration.</param>
        /// <returns>The task returned by the two-argument overload.</returns>
        public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
            => CreateTaskOrchestrationAsync(creationMessage, null);
29

30
        /// <summary>
        /// Creates a new execution for the orchestration instance referenced by
        /// <paramref name="creationMessage"/>, enqueues the creation message and saves everything
        /// with a single <c>SaveChangesAsync</c> call.
        /// </summary>
        /// <param name="creationMessage">Message whose event must be an <see cref="ExecutionStartedEvent"/>.</param>
        /// <param name="dedupeStatuses">
        /// Statuses of the instance's last execution for which the request is silently ignored.
        /// May be <c>null</c>, in which case nothing is silently ignored.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="creationMessage"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentException">The message event is not an <see cref="ExecutionStartedEvent"/>.</exception>
        /// <exception cref="OrchestrationAlreadyExistsException">
        /// The instance already exists and its last execution is not in a final instance status.
        /// </exception>
        public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
        {
            if (creationMessage == null)
                throw new ArgumentNullException(nameof(creationMessage));

            // Fail fast with a descriptive error instead of passing a null event to the mappers below.
            var executionStartedEvent = creationMessage.Event as ExecutionStartedEvent;
            if (executionStartedEvent == null)
                throw new ArgumentException("Creation message must carry an ExecutionStartedEvent", nameof(creationMessage));

            using (var dbContext = _dbContextFactory.CreateDbContext())
            {
                var instanceId = creationMessage.OrchestrationInstance.InstanceId;

                var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);

                if (instance != null)
                {
                    // NOTE(review): FindAsync returns null when the row is missing; this code assumes
                    // an existing instance always has its last execution stored - confirm.
                    var lastExecution = await dbContext.Executions.FindAsync(instance.LastExecutionId);

                    // Silently ignore the request when the last execution is in one of the dedupe statuses
                    if (dedupeStatuses != null && dedupeStatuses.Contains(lastExecution.Status))
                        return;

                    // Otherwise, reject the request to avoid multiple running executions of the same instance
                    if (!IsFinalInstanceStatus(lastExecution.Status))
                        throw new OrchestrationAlreadyExistsException("Orchestration already has a running execution");
                }

                var runtimeState = new OrchestrationRuntimeState(new[] { executionStartedEvent });

                if (instance == null)
                {
                    instance = _instanceMapper.CreateInstance(executionStartedEvent);
                    await dbContext.Instances.AddAsync(instance);
                }
                else
                {
                    _instanceMapper.UpdateInstance(instance, runtimeState);
                }

                var execution = _executionMapper.CreateExecution(runtimeState);
                await dbContext.Executions.AddAsync(execution);

                // The queue of the new instance is already known here, so it is handed to the sender
                var knownQueues = new Dictionary<string, string>
                {
                    [instance.InstanceId] = QueueMapper.ToQueue(runtimeState.Name, runtimeState.Version)
                };

                await SendTaskOrchestrationMessagesAsync(dbContext, new[] { creationMessage }, knownQueues);

                await dbContext.SaveChangesAsync();
            }
        }
79

80
        /// <summary>
        /// Sends an <see cref="ExecutionTerminatedEvent"/> carrying <paramref name="reason"/> to the
        /// orchestration instance identified by <paramref name="instanceId"/>.
        /// </summary>
        /// <param name="instanceId">Id of the instance that receives the termination message.</param>
        /// <param name="reason">Value passed to the termination event.</param>
        /// <returns>The task returned by <c>SendTaskOrchestrationMessageAsync</c>.</returns>
        public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
        {
            var targetInstance = new OrchestrationInstance { InstanceId = instanceId };
            var terminationEvent = new ExecutionTerminatedEvent(-1, reason);

            return SendTaskOrchestrationMessageAsync(new TaskMessage
            {
                OrchestrationInstance = targetInstance,
                Event = terminationEvent
            });
        }
90

91
        /// <summary>
        /// Builds a bracketed, comma-separated string from the <c>Content</c> of every stored event
        /// of the given execution, ordered by sequence number.
        /// </summary>
        /// <param name="instanceId">
        /// Not used by the query. NOTE(review): events are filtered by execution id only - confirm this is intended.
        /// </param>
        /// <param name="executionId">Execution whose events are read.</param>
        /// <returns>A string of the form <c>[content1,content2,...]</c>; <c>[]</c> when no event matches.</returns>
        public async Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId)
        {
            using (var dbContext = _dbContextFactory.CreateDbContext())
            {
                var orderedEvents = await dbContext.Events
                    .Where(e => e.ExecutionId == executionId)
                    .OrderBy(e => e.SequenceNumber)
                    .ToArrayAsync();

                var joinedContents = string.Join(",", orderedEvents.Select(e => e.Content));

                return "[" + joinedContents + "]";
            }
        }
103

104
        /// <summary>
        /// Gets the state of an orchestration instance, either for its most recently created
        /// execution only or for all of its executions.
        /// </summary>
        /// <param name="instanceId">Id of the instance to look up.</param>
        /// <param name="allExecutions">
        /// <c>true</c> to return the state of every execution of the instance (newest first);
        /// <c>false</c> to return only the most recently created execution.
        /// </param>
        /// <returns>The matching states; an empty list when the instance has no execution.</returns>
        public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions)
        {
            if (!allExecutions)
            {
                var latestState = await GetOrchestrationStateAsync(instanceId, null);
                if (latestState == null)
                    return Array.Empty<OrchestrationState>();

                return new[] { latestState };
            }

            // Previously this flag was ignored and only the latest execution was ever returned.
            using (var dbContext = _dbContextFactory.CreateDbContext())
            {
                var executions = await dbContext.Executions
                    .Where(e => e.InstanceId == instanceId)
                    .OrderByDescending(e => e.CreatedTime)
                    .ToArrayAsync();

                return executions
                    .Select(_executionMapper.MapToState)
                    .ToArray();
            }
        }
112

113
        /// <summary>
        /// Gets the state of one execution of an orchestration instance.
        /// </summary>
        /// <param name="instanceId">Id of the instance to look up.</param>
        /// <param name="executionId">
        /// Execution to look up; when <c>null</c>, the most recently created execution of the instance is used.
        /// </param>
        /// <returns>The mapped state, or <c>null</c> when no execution matches.</returns>
        public async Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId)
        {
            using (var dbContext = _dbContextFactory.CreateDbContext())
            {
                // Newest execution first, so a null executionId resolves to the latest one
                var matchingExecution = await dbContext.Executions
                    .Where(e => e.InstanceId == instanceId && (executionId == null || e.ExecutionId == executionId))
                    .OrderByDescending(e => e.CreatedTime)
                    .FirstOrDefaultAsync();

                return matchingExecution != null
                    ? _executionMapper.MapToState(matchingExecution)
                    : null;
            }
        }
128

129
        /// <summary>
        /// Purges orchestration history by delegating to the database-specific extensions
        /// with a freshly created context.
        /// </summary>
        /// <param name="thresholdDateTimeUtc">Threshold passed through to the purge implementation.</param>
        /// <param name="timeRangeFilterType">Filter type passed through to the purge implementation.</param>
        public async Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
        {
            using (var context = _dbContextFactory.CreateDbContext())
            {
                await _dbContextExtensions.PurgeOrchestrationHistoryAsync(
                    context,
                    thresholdDateTimeUtc,
                    timeRangeFilterType);
            }
        }
136

137
        /// <summary>
        /// Purges the stored history of a single instance through the database-specific extensions.
        /// </summary>
        /// <param name="instanceId">Id of the instance to purge.</param>
        /// <returns>A <see cref="PurgeResult"/> built from the value reported by the purge call.</returns>
        public async Task<PurgeResult> PurgeInstanceStateAsync(string instanceId)
        {
            using (var context = _dbContextFactory.CreateDbContext())
            {
                return new PurgeResult(
                    await _dbContextExtensions.PurgeInstanceHistoryAsync(context, instanceId));
            }
        }
146

147
        /// <summary>
        /// Purges the stored history of the instances selected by <paramref name="purgeInstanceFilter"/>
        /// through the database-specific extensions.
        /// </summary>
        /// <param name="purgeInstanceFilter">Filter passed through to the purge implementation.</param>
        /// <returns>A <see cref="PurgeResult"/> built from the value reported by the purge call.</returns>
        public async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
        {
            using (var context = _dbContextFactory.CreateDbContext())
            {
                return new PurgeResult(
                    await _dbContextExtensions.PurgeInstanceHistoryAsync(context, purgeInstanceFilter));
            }
        }
156

157
        /// <summary>
        /// Sends a single orchestration message by forwarding it to the batch overload.
        /// </summary>
        /// <param name="message">Message to send.</param>
        /// <returns>The task returned by <c>SendTaskOrchestrationMessageBatchAsync</c>.</returns>
        public Task SendTaskOrchestrationMessageAsync(TaskMessage message)
            => SendTaskOrchestrationMessageBatchAsync(message);
161

162
        /// <summary>
        /// Sends a batch of orchestration messages using one context and one <c>SaveChangesAsync</c> call.
        /// </summary>
        /// <param name="messages">Messages to send.</param>
        public async Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
        {
            using (var context = _dbContextFactory.CreateDbContext())
            {
                // Stage all messages first, then persist them together
                await SendTaskOrchestrationMessagesAsync(context, messages);
                await context.SaveChangesAsync();
            }
        }
171

172
        /// <summary>
        /// Polls the state of an orchestration execution until it reaches a final execution status,
        /// the polling helper gives up, or cancellation is requested.
        /// </summary>
        /// <param name="instanceId">Id of the instance to wait for; must not be null or whitespace.</param>
        /// <param name="executionId">Execution to wait for; passed through to the state lookup.</param>
        /// <param name="timeout">Timeout handed to the polling helper.</param>
        /// <param name="cancellationToken">Caller token; linked with the service stop token.</param>
        /// <returns>
        /// The final state, or <c>null</c> when polling ended without the execution reaching a final
        /// status (including the case where no state was ever found).
        /// </returns>
        /// <exception cref="ArgumentException"><paramref name="instanceId"/> is null or whitespace.</exception>
        public async Task<OrchestrationState> WaitForOrchestrationAsync(
            string instanceId,
            string executionId,
            TimeSpan timeout,
            CancellationToken cancellationToken)
        {
            if (string.IsNullOrWhiteSpace(instanceId))
            {
                throw new ArgumentException("Instance id must not be null or whitespace", nameof(instanceId));
            }

            // Dispose the linked source so its registrations on the long-lived stop token are released
            using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
                cancellationToken, _stopCts.Token))
            {
                var state = await BackoffPollingHelper.PollAsync(async () =>
                {
                    return await GetOrchestrationStateAsync(instanceId, executionId);
                },
                // The lookup returns null while no matching execution exists; keep polling in that case
                s => s != null && IsFinalExecutionStatus(s.OrchestrationStatus),
                timeout,
                _options.PollingInterval,
                linkedCts.Token);

                if (state == null || !IsFinalExecutionStatus(state.OrchestrationStatus))
                    return null;

                return state;
            }
        }
200

201
        /// <summary>
        /// Lists the orchestration features reported by this service.
        /// </summary>
        /// <returns>A completed task holding the fixed array of features.</returns>
        public Task<OrchestrationFeature[]> GetFeatures()
        {
            OrchestrationFeature[] supportedFeatures =
            {
                OrchestrationFeature.SearchByInstanceId,
                OrchestrationFeature.SearchByName,
                OrchestrationFeature.SearchByCreatedTime,
                OrchestrationFeature.SearchByStatus,
                OrchestrationFeature.Rewind,
                OrchestrationFeature.Tags,
                OrchestrationFeature.StatePerExecution
            };

            return Task.FromResult(supportedFeatures);
        }
214

215
        /// <summary>
        /// Runs a filtered, paged query over orchestrations, newest first.
        /// </summary>
        /// <param name="query">Filter and page size; the filter is applied by the database-specific extensions.</param>
        /// <param name="cancellationToken">Token used to cancel the database query.</param>
        /// <returns>
        /// One page of mapped states plus a continuation token, which is <c>null</c> when no further
        /// row was found beyond the page.
        /// </returns>
        public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken)
        {
            using (var dbContext = _dbContextFactory.CreateDbContext())
            {
                // Fetch one row more than the page size to detect whether another page exists
                var instances = await _dbContextExtensions.CreateFilteredQueryable(dbContext, query)
                    .OrderByDescending(x => x.CreatedTime)
                    .ThenByDescending(x => x.InstanceId)
                    .Take(query.PageSize + 1)
                    .ToArrayAsync(cancellationToken);

                var pageInstances = instances
                    .Take(query.PageSize)
                    .ToArray();

                var mappedPageInstances = pageInstances
                    .Select(_executionMapper.MapToState)
                    .ToArray();

                // The token records the last row of the current page
                var newContinuationToken = instances.Length > pageInstances.Length
                    ? new EFCoreContinuationToken
                    {
                        CreatedTime = pageInstances.Last().CreatedTime,
                        InstanceId = pageInstances.Last().InstanceId,
                    }.Serialize()
                    : null;

                return new OrchestrationQueryResult(mappedPageInstances, newContinuationToken);
            }
        }
244

245
        /// <summary>
        /// Rewinds an orchestration instance, with parent rewinding enabled, and saves the
        /// resulting changes.
        /// </summary>
        /// <param name="instanceId">Id of the instance to rewind.</param>
        /// <param name="reason">Reason passed through to the rewind.</param>
        public async Task RewindTaskOrchestrationAsync(string instanceId, string reason)
        {
            using (var context = _dbContextFactory.CreateDbContext())
            {
                await RewindInstanceAsync(
                    context,
                    instanceId,
                    reason,
                    rewindParents: true,
                    findRewindPoint: FindLastErrorOrCompletionRewindPoint);

                await context.SaveChangesAsync();
            }
        }
253

254
        /// <summary>
        /// Rewinds the last execution of an instance to the point chosen by
        /// <paramref name="findRewindPoint"/>, then recursively rewinds its sub-orchestrations and,
        /// optionally, its parent. Changes are only staged on <paramref name="dbContext"/>; this
        /// method does not call <c>SaveChangesAsync</c>.
        /// </summary>
        /// <param name="dbContext">Context on which all changes are staged.</param>
        /// <param name="instanceId">Id of the instance to rewind.</param>
        /// <param name="reason">Reason passed through to the rewind.</param>
        /// <param name="rewindParents">Whether the parent instance, if any, is rewound as well.</param>
        /// <param name="findRewindPoint">
        /// Selects the history event to rewind to; returning <c>null</c> makes this method a no-op.
        /// </param>
        /// <exception cref="ArgumentException">No instance with <paramref name="instanceId"/> was found.</exception>
        private async Task RewindInstanceAsync(OrchestrationDbContext dbContext, string instanceId, string reason, bool rewindParents, Func<IList<HistoryEvent>, HistoryEvent> findRewindPoint)
        {
            var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);

            // The lock helper returns null for unknown instances; report that explicitly
            // instead of failing with a NullReferenceException below.
            if (instance == null)
                throw new ArgumentException($"Orchestration instance '{instanceId}' was not found", nameof(instanceId));

            var execution = await dbContext.Executions
                .Where(e => e.ExecutionId == instance.LastExecutionId)
                .Include(e => e.Events)
                .SingleAsync();

            var deserializedEvents = execution.Events
                .OrderBy(e => e.SequenceNumber)
                .Select(e => _options.DataConverter.Deserialize<HistoryEvent>(e.Content))
                .ToArray();

            var rewindPoint = findRewindPoint(deserializedEvents);
            if (rewindPoint == null)
            {
                return;
            }

            var rewindResult = deserializedEvents.Rewind(rewindPoint, reason, _options.DataConverter);
            var newRuntimeState = rewindResult.NewRuntimeState;
            var parentInstance = newRuntimeState.ParentInstance;

            // Rewind suborchestrations
            foreach (var instanceIdToRewind in rewindResult.SubOrchestrationsInstancesToRewind)
            {
                await RewindInstanceAsync(dbContext, instanceIdToRewind, reason, false, FindLastErrorOrCompletionRewindPoint);
            }

            var orchestrationQueueName = QueueMapper.ToQueue(newRuntimeState.Name, newRuntimeState.Version);

            // Write activity messages
            var activityMessages = rewindResult.OutboundMessages
                .Select(m => _activityMessageMapper.CreateActivityMessage(m, orchestrationQueueName))
                .ToArray();

            await dbContext.ActivityMessages.AddRangeAsync(activityMessages);

            // Write orchestration messages; queues of this instance and its parent are already known
            var knownQueues = new Dictionary<string, string>
            {
                [newRuntimeState.OrchestrationInstance.InstanceId] = orchestrationQueueName
            };

            if (parentInstance != null)
                knownQueues[parentInstance.OrchestrationInstance.InstanceId] = QueueMapper.ToQueue(parentInstance.Name, parentInstance.Version);

            var allOrchestrationMessages = rewindResult.OrchestratorMessages
                .Concat(rewindResult.TimerMessages)
                .ToArray();

            await SendTaskOrchestrationMessagesAsync(dbContext, allOrchestrationMessages, knownQueues);

            // Update instance
            _instanceMapper.UpdateInstance(instance, newRuntimeState);

            // Update current execution
            await SaveExecutionAsync(dbContext, newRuntimeState, execution);

            // Rewind parents: the parent is rewound to the completion event of this sub-orchestration
            if (rewindParents && parentInstance != null)
            {
                await RewindInstanceAsync(dbContext, parentInstance.OrchestrationInstance.InstanceId, reason, true, historyEvents =>
                {
                    return historyEvents
                        .OfType<SubOrchestrationInstanceCompletedEvent>()
                        .FirstOrDefault(h => h.TaskScheduledId == parentInstance.TaskScheduleId);
                });
            }
        }
323

324
        /// <summary>
        /// Chooses the history event an execution should be rewound to.
        /// </summary>
        /// <param name="historyEvents">History of the execution, in order.</param>
        /// <returns>
        /// For a failed completion, the last failed task or sub-orchestration event if there is one;
        /// otherwise the first completion event; otherwise the first termination event;
        /// <c>null</c> when the history contains none of these.
        /// </returns>
        private HistoryEvent FindLastErrorOrCompletionRewindPoint(IList<HistoryEvent> historyEvents)
        {
            var completion = historyEvents.OfType<ExecutionCompletedEvent>().FirstOrDefault();

            if (completion == null)
            {
                // No completion recorded: for terminated executions, only rewind the termination
                return historyEvents.OfType<ExecutionTerminatedEvent>().FirstOrDefault();
            }

            if (completion.OrchestrationStatus != OrchestrationStatus.Failed)
                return completion;

            // For failed executions, prefer the last failed task or suborchestration
            var lastFailure = historyEvents.LastOrDefault(h => h is TaskFailedEvent || h is SubOrchestrationInstanceFailedEvent);

            // Fall back to just reopening the orchestration, because the error could have
            // happened inside the orchestrator function itself
            return lastFailure ?? completion;
        }
348

349
        /// <summary>
        /// Tells whether <paramref name="status"/> is final for a whole instance: a final execution
        /// status other than <see cref="OrchestrationStatus.ContinuedAsNew"/>.
        /// </summary>
        /// <param name="status">Status to classify.</param>
        /// <returns><c>true</c> when the status is final for the instance.</returns>
        private bool IsFinalInstanceStatus(OrchestrationStatus status)
        {
            if (status == OrchestrationStatus.ContinuedAsNew)
                return false;

            return IsFinalExecutionStatus(status);
        }
354

355
        /// <summary>
        /// Tells whether <paramref name="status"/> is final for a single execution, i.e. anything
        /// other than <see cref="OrchestrationStatus.Running"/> or <see cref="OrchestrationStatus.Pending"/>.
        /// </summary>
        /// <param name="status">Status to classify.</param>
        /// <returns><c>true</c> when the status is neither running nor pending.</returns>
        private bool IsFinalExecutionStatus(OrchestrationStatus status)
        {
            switch (status)
            {
                case OrchestrationStatus.Running:
                case OrchestrationStatus.Pending:
                    return false;
                default:
                    return true;
            }
        }
360
    }
361
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc