• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 7291576787

24 Apr 2023 09:55AM UTC coverage: 82.598% (+0.3%) from 82.27%
7291576787

push

github

lucaslorentz
Add nats

71 of 72 new or added lines in 5 files covered. (98.61%)

1 existing line in 1 file now uncovered.

2340 of 2833 relevant lines covered (82.6%)

165.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.05
/src/LLL.DurableTask.EFCore/EFCoreOrchestrationServiceClient.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.Exceptions;
8
using DurableTask.Core.History;
9
using DurableTask.Core.Query;
10
using LLL.DurableTask.Core;
11
using LLL.DurableTask.EFCore.Extensions;
12
using LLL.DurableTask.EFCore.Mappers;
13
using LLL.DurableTask.EFCore.Polling;
14
using Microsoft.EntityFrameworkCore;
15

16
namespace LLL.DurableTask.EFCore
17
{
18
    public partial class EFCoreOrchestrationService :
19
        IOrchestrationServiceClient,
20
        IOrchestrationServiceFeaturesClient,
21
        IOrchestrationServiceQueryClient,
22
        IOrchestrationServicePurgeClient,
23
        IOrchestrationServiceRewindClient
24
    {
25
        public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
26
        {
27
            return CreateTaskOrchestrationAsync(creationMessage, null);
×
28
        }
29

30
        public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
31
        {
32
            var executionStartedEvent = creationMessage.Event as ExecutionStartedEvent;
185✔
33

34
            using (var dbContext = _dbContextFactory.CreateDbContext())
185✔
35
            {
36
                var instanceId = creationMessage.OrchestrationInstance.InstanceId;
185✔
37
                var executionId = creationMessage.OrchestrationInstance.ExecutionId;
185✔
38

39
                var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);
185✔
40

41
                if (instance != null)
185✔
42
                {
43
                    var lastExecution = await dbContext.Executions.FindAsync(instance.LastExecutionId);
×
44

45
                    // Dedupe dedupeStatuses silently
46
                    if (dedupeStatuses != null && dedupeStatuses.Contains(lastExecution.Status))
×
47
                        return;
×
48

49
                    // Otherwise, dedupe to avoid multile runnning instances
50
                    if (!IsFinalInstanceStatus(lastExecution.Status))
×
51
                        throw new OrchestrationAlreadyExistsException("Orchestration already has a running execution");
×
52
                }
53

54
                var runtimeState = new OrchestrationRuntimeState(new[] { executionStartedEvent });
185✔
55

56
                if (instance == null)
185✔
57
                {
58
                    instance = _instanceMapper.CreateInstance(executionStartedEvent);
185✔
59
                    await dbContext.Instances.AddAsync(instance);
185✔
60
                }
61
                else
62
                {
63
                    _instanceMapper.UpdateInstance(instance, runtimeState);
×
64
                }
65

66
                var execution = _executionMapper.CreateExecution(runtimeState);
185✔
67
                await dbContext.Executions.AddAsync(execution);
185✔
68

69
                var knownQueues = new Dictionary<string, string>
185✔
70
                {
185✔
71
                    [instance.InstanceId] = QueueMapper.ToQueue(runtimeState.Name, runtimeState.Version)
185✔
72
                };
185✔
73

74
                await SendTaskOrchestrationMessagesAsync(dbContext, new[] { creationMessage }, knownQueues);
185✔
75

76
                await dbContext.SaveChangesAsync();
185✔
77
            }
78

79
            _connection.Publish($"orchestration.{QueueMapper.ToQueue(executionStartedEvent.Name, executionStartedEvent.Version)}.{executionStartedEvent.OrchestrationInstance.InstanceId}", Array.Empty<byte>());
185✔
80
        }
81

82
        public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
83
        {
84
            var taskMessage = new TaskMessage
1✔
85
            {
1✔
86
                OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
1✔
87
                Event = new ExecutionTerminatedEvent(-1, reason)
1✔
88
            };
1✔
89

90
            return SendTaskOrchestrationMessageAsync(taskMessage);
1✔
91
        }
92

93
        public async Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId)
94
        {
95
            using (var dbContext = _dbContextFactory.CreateDbContext())
×
96
            {
97
                var events = await dbContext.Events
×
98
                    .Where(e => e.ExecutionId == executionId)
×
99
                    .OrderBy(e => e.SequenceNumber)
×
NEW
100
                    .AsNoTracking()
×
UNCOV
101
                    .ToArrayAsync();
×
102

103
                return $"[{string.Join(",", events.Select(e => e.Content))}]";
×
104
            }
105
        }
106

107
        public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions)
108
        {
109
            var state = await GetOrchestrationStateAsync(instanceId, null);
14✔
110
            if (state == null)
14✔
111
                return new OrchestrationState[0];
6✔
112

113
            return new[] { state };
8✔
114
        }
115

116
        public async Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId)
117
        {
118
            using (var dbContext = _dbContextFactory.CreateDbContext())
425✔
119
            {
120
                var instance = await dbContext.Executions
425✔
121
                    .Where(e => e.InstanceId == instanceId && (executionId == null || e.ExecutionId == executionId))
425✔
122
                    .OrderByDescending(e => e.CreatedTime)
425✔
123
                    .FirstOrDefaultAsync();
425✔
124

125
                if (instance == null)
425✔
126
                    return null;
6✔
127

128
                return _executionMapper.MapToState(instance);
419✔
129
            }
130
        }
131

132
        public async Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
133
        {
134
            using (var dbContext = _dbContextFactory.CreateDbContext())
24✔
135
            {
136
                await _dbContextExtensions.PurgeOrchestrationHistoryAsync(dbContext, thresholdDateTimeUtc, timeRangeFilterType);
24✔
137
            }
138
        }
139

140
        public async Task<PurgeResult> PurgeInstanceStateAsync(string instanceId)
141
        {
142
            using (var dbContext = _dbContextFactory.CreateDbContext())
6✔
143
            {
144
                var deletedRows = await _dbContextExtensions.PurgeInstanceHistoryAsync(dbContext, instanceId);
6✔
145

146
                return new PurgeResult(deletedRows);
6✔
147
            }
148
        }
149

150
        public async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFilter purgeInstanceFilter)
151
        {
152
            using (var dbContext = _dbContextFactory.CreateDbContext())
×
153
            {
154
                var deletedRows = await _dbContextExtensions.PurgeInstanceHistoryAsync(dbContext, purgeInstanceFilter);
×
155

156
                return new PurgeResult(deletedRows);
×
157
            }
158
        }
159

160
        public Task SendTaskOrchestrationMessageAsync(TaskMessage message)
161
        {
162
            return SendTaskOrchestrationMessageBatchAsync(message);
24✔
163
        }
164

165
        public async Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
166
        {
167
            using (var dbContext = _dbContextFactory.CreateDbContext())
26✔
168
            {
169
                await SendTaskOrchestrationMessagesAsync(dbContext, messages);
26✔
170

171
                await dbContext.SaveChangesAsync();
26✔
172
            }
173
        }
174

175
        public async Task<OrchestrationState> WaitForOrchestrationAsync(
176
            string instanceId,
177
            string executionId,
178
            TimeSpan timeout,
179
            CancellationToken cancellationToken)
180
        {
181
            if (string.IsNullOrWhiteSpace(instanceId))
135✔
182
            {
183
                throw new ArgumentException(nameof(instanceId));
×
184
            }
185

186
            var stoppableCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(
135✔
187
                cancellationToken, _stopCts.Token).Token;
135✔
188

189
            var state = await BackoffPollingHelper.PollAsync(async () =>
135✔
190
            {
135✔
191
                return await GetOrchestrationStateAsync(instanceId, executionId);
540✔
192
            },
135✔
193
            s => IsFinalExecutionStatus(s.OrchestrationStatus),
540✔
194
            timeout,
135✔
195
            _options.PollingInterval,
135✔
196
            stoppableCancellationToken,
135✔
197
            BackoffPollingHelper.CreateNatsWaitUntilSignal(
135✔
198
                _subscription,
135✔
199
                new HashSet<string> { $"history.{instanceId}" }));
135✔
200

201
            if (!IsFinalExecutionStatus(state.OrchestrationStatus))
135✔
202
                return null;
×
203

204
            return state;
135✔
205
        }
206

207
        public Task<OrchestrationFeature[]> GetFeatures()
208
        {
209
            return Task.FromResult(new OrchestrationFeature[]
1✔
210
            {
1✔
211
                OrchestrationFeature.SearchByInstanceId,
1✔
212
                OrchestrationFeature.SearchByName,
1✔
213
                OrchestrationFeature.SearchByCreatedTime,
1✔
214
                OrchestrationFeature.SearchByStatus,
1✔
215
                OrchestrationFeature.Rewind,
1✔
216
                OrchestrationFeature.Tags,
1✔
217
                OrchestrationFeature.StatePerExecution
1✔
218
            });
1✔
219
        }
220

221
        public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken)
222
        {
223
            using (var dbContext = _dbContextFactory.CreateDbContext())
7✔
224
            {
225
                var instances = await _dbContextExtensions.CreateFilteredQueryable(dbContext, query)
7✔
226
                    .OrderByDescending(x => x.CreatedTime)
7✔
227
                    .ThenByDescending(x => x.InstanceId)
7✔
228
                    .Take(query.PageSize + 1)
7✔
229
                    .AsNoTracking()
7✔
230
                    .ToArrayAsync();
7✔
231

232
                var pageInstances = instances
7✔
233
                    .Take(query.PageSize)
7✔
234
                    .ToArray();
7✔
235

236
                var mappedPageInstances = pageInstances
7✔
237
                    .Select(_executionMapper.MapToState)
7✔
238
                    .ToArray();
7✔
239

240
                var newContinuationToken = instances.Length > pageInstances.Length
7✔
241
                    ? new EFCoreContinuationToken
7✔
242
                    {
7✔
243
                        CreatedTime = pageInstances.Last().CreatedTime,
7✔
244
                        InstanceId = pageInstances.Last().InstanceId,
7✔
245
                    }.Serialize()
7✔
246
                    : null;
7✔
247

248
                return new OrchestrationQueryResult(mappedPageInstances, newContinuationToken);
7✔
249
            }
250
        }
251

252
        public async Task RewindTaskOrchestrationAsync(string instanceId, string reason)
253
        {
254
            using (var dbContext = _dbContextFactory.CreateDbContext())
1✔
255
            {
256
                await RewindInstanceAsync(dbContext, instanceId, reason, true, FindLastErrorOrCompletionRewindPoint);
1✔
257
                await dbContext.SaveChangesAsync();
1✔
258
            }
259
        }
260

261
        private async Task RewindInstanceAsync(OrchestrationDbContext dbContext, string instanceId, string reason, bool rewindParents, Func<IList<HistoryEvent>, HistoryEvent> findRewindPoint)
262
        {
263
            var instance = await _dbContextExtensions.LockInstanceForUpdate(dbContext, instanceId);
1✔
264

265
            var execution = await dbContext.Executions
1✔
266
                .Where(e => e.ExecutionId == instance.LastExecutionId)
1✔
267
                .Include(e => e.Events)
1✔
268
                .SingleAsync();
1✔
269

270
            var deserializedEvents = execution.Events
1✔
271
                .OrderBy(e => e.SequenceNumber)
1✔
272
                .Select(e => _options.DataConverter.Deserialize<HistoryEvent>(e.Content))
1✔
273
                .ToArray();
1✔
274

275
            var rewindPoint = findRewindPoint(deserializedEvents);
1✔
276
            if (rewindPoint == null)
1✔
277
            {
278
                return;
1✔
279
            }
280

281
            var rewindResult = deserializedEvents.Rewind(rewindPoint, reason, _options.DataConverter);
×
282

283
            // Rewind suborchestrations
284
            foreach (var instanceIdToRewind in rewindResult.SubOrchestrationsInstancesToRewind)
×
285
            {
286
                await RewindInstanceAsync(dbContext, instanceIdToRewind, reason, false, FindLastErrorOrCompletionRewindPoint);
×
287
            }
288

289
            var orchestrationQueueName = QueueMapper.ToQueue(rewindResult.NewRuntimeState.Name, rewindResult.NewRuntimeState.Version);
×
290

291
            // Write activity messages
292
            var activityMessages = rewindResult.OutboundMessages
×
293
                .Select(m => _activityMessageMapper.CreateActivityMessage(m, orchestrationQueueName))
×
294
                .ToArray();
×
295

296
            await dbContext.ActivityMessages.AddRangeAsync(activityMessages);
×
297

298
            // Write orchestration messages
299
            var knownQueues = new Dictionary<string, string>
×
300
            {
×
301
                [rewindResult.NewRuntimeState.OrchestrationInstance.InstanceId] = orchestrationQueueName
×
302
            };
×
303

304
            if (rewindResult.NewRuntimeState.ParentInstance != null)
×
305
                knownQueues[rewindResult.NewRuntimeState.ParentInstance.OrchestrationInstance.InstanceId] = QueueMapper.ToQueue(rewindResult.NewRuntimeState.ParentInstance.Name, rewindResult.NewRuntimeState.ParentInstance.Version);
×
306

307
            var allOrchestrationMessages = rewindResult.OrchestratorMessages
×
308
                .Concat(rewindResult.TimerMessages)
×
309
                .ToArray();
×
310

311
            await SendTaskOrchestrationMessagesAsync(dbContext, allOrchestrationMessages, knownQueues);
×
312

313
            // Update instance
314
            _instanceMapper.UpdateInstance(instance, rewindResult.NewRuntimeState);
×
315

316
            // Update current execution
317
            execution = await SaveExecutionAsync(dbContext, rewindResult.NewRuntimeState, execution);
×
318

319
            // Rewind parents
320
            if (rewindParents && rewindResult.NewRuntimeState.ParentInstance != null)
×
321
            {
322
                await RewindInstanceAsync(dbContext, rewindResult.NewRuntimeState.ParentInstance.OrchestrationInstance.InstanceId, reason, true, historyEvents =>
×
323
                {
×
324
                    return historyEvents
×
325
                        .OfType<SubOrchestrationInstanceCompletedEvent>()
×
326
                        .FirstOrDefault(h => h.TaskScheduledId == rewindResult.NewRuntimeState.ParentInstance.TaskScheduleId);
×
327
                });
×
328
            }
329
        }
330

331
        private HistoryEvent FindLastErrorOrCompletionRewindPoint(IList<HistoryEvent> historyEvents)
332
        {
333
            var completedEvent = historyEvents.OfType<ExecutionCompletedEvent>().FirstOrDefault();
1✔
334
            if (completedEvent != null)
1✔
335
            {
336
                if (completedEvent.OrchestrationStatus == OrchestrationStatus.Failed)
×
337
                {
338
                    // For failed executions, rewind to last failed task or suborchestration
339
                    var lastFailedTask = historyEvents.LastOrDefault(h => h is TaskFailedEvent || h is SubOrchestrationInstanceFailedEvent);
×
340
                    if (lastFailedTask != null)
×
341
                        return lastFailedTask;
×
342
                }
343

344
                // Fallback to just reopenning orchestration, because error could have happened inside orchestrator function
345
                return completedEvent;
×
346
            }
347

348
            // For terminated executions, only rewing the termination
349
            var terminatedEvent = historyEvents.OfType<ExecutionTerminatedEvent>().FirstOrDefault();
1✔
350
            if (terminatedEvent != null)
1✔
351
                return terminatedEvent;
×
352

353
            return null;
1✔
354
        }
355

356
        private bool IsFinalInstanceStatus(OrchestrationStatus status)
357
        {
358
            return IsFinalExecutionStatus(status) &&
×
359
                status != OrchestrationStatus.ContinuedAsNew;
×
360
        }
361

362
        private bool IsFinalExecutionStatus(OrchestrationStatus status)
363
        {
364
            return status != OrchestrationStatus.Running &&
540✔
365
                status != OrchestrationStatus.Pending;
540✔
366
        }
367
    }
368
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc