• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 5835581151

pending completion
5835581151

Pull #27

github

lucaslorentz
Add ExtendedOrchestrationContext to simplify workers API
Pull Request #27: Add ExtendedOrchestrationContext to simplify workers API

71 of 71 new or added lines in 5 files covered. (100.0%)

2290 of 2798 relevant lines covered (81.84%)

142.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.92
/src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.History;
8
using LLL.DurableTask.Core;
9
using LLL.DurableTask.EFCore.Entities;
10
using LLL.DurableTask.EFCore.Extensions;
11
using LLL.DurableTask.EFCore.Mappers;
12
using LLL.DurableTask.EFCore.Polling;
13
using Microsoft.EntityFrameworkCore;
14
using Microsoft.Extensions.Logging;
15
using Microsoft.Extensions.Options;
16

17
namespace LLL.DurableTask.EFCore
18
{
19
    public partial class EFCoreOrchestrationService :
20
        IOrchestrationService,
21
        IDistributedOrchestrationService
22
    {
23
        private readonly EFCoreOrchestrationOptions _options;
24
        private readonly IDbContextFactory<OrchestrationDbContext> _dbContextFactory;
25
        private readonly OrchestrationDbContextExtensions _dbContextExtensions;
26
        private readonly OrchestrationMessageMapper _orchestrationMessageMapper;
27
        private readonly ActivityMessageMapper _activityMessageMapper;
28
        private readonly InstanceMapper _instanceMapper;
29
        private readonly ExecutionMapper _executionMapper;
30
        private readonly ILogger<EFCoreOrchestrationService> _logger;
31

32
        private CancellationTokenSource _stopCts = new CancellationTokenSource();
124✔
33

34
        public EFCoreOrchestrationService(
124✔
35
            IOptions<EFCoreOrchestrationOptions> options,
124✔
36
            IDbContextFactory<OrchestrationDbContext> dbContextFactory,
124✔
37
            OrchestrationDbContextExtensions dbContextExtensions,
124✔
38
            OrchestrationMessageMapper orchestrationMessageMapper,
124✔
39
            ActivityMessageMapper activityMessageMapper,
124✔
40
            InstanceMapper instanceMapper,
124✔
41
            ExecutionMapper executionMapper,
124✔
42
            ILogger<EFCoreOrchestrationService> logger)
124✔
43
        {
44
            _options = options.Value;
124✔
45
            _dbContextFactory = dbContextFactory;
124✔
46
            _dbContextExtensions = dbContextExtensions;
124✔
47
            _orchestrationMessageMapper = orchestrationMessageMapper;
124✔
48
            _activityMessageMapper = activityMessageMapper;
124✔
49
            _instanceMapper = instanceMapper;
124✔
50
            _executionMapper = executionMapper;
124✔
51
            _logger = logger;
124✔
52
        }
53

54
        public int TaskOrchestrationDispatcherCount => _options.TaskOrchestrationDispatcherCount;
102✔
55
        public int MaxConcurrentTaskOrchestrationWorkItems => _options.MaxConcurrentTaskOrchestrationWorkItems;
102✔
56
        public int MaxConcurrentTaskActivityWorkItems => _options.MaxConcurrentTaskActivityWorkItems;
110✔
57
        public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => BehaviorOnContinueAsNew.Carryover;
145✔
58
        public int TaskActivityDispatcherCount => _options.TaskActivityDispatcherCount;
78✔
59

60
        public Task CreateAsync()
61
        {
62
            return CreateAsync(false);
×
63
        }
64

65
        public async Task CreateAsync(bool recreateInstanceStore)
66
        {
67
            using (var dbContext = _dbContextFactory.CreateDbContext())
×
68
            {
69
                if (recreateInstanceStore)
×
70
                    await dbContext.Database.EnsureDeletedAsync();
×
71

72
                await _dbContextExtensions.Migrate(dbContext);
×
73
            }
74
        }
75

76
        public async Task CreateIfNotExistsAsync()
77
        {
78
            using (var dbContext = _dbContextFactory.CreateDbContext())
114✔
79
            {
80
                await _dbContextExtensions.Migrate(dbContext);
114✔
81
            }
82
        }
83

84
        public Task DeleteAsync()
85
        {
86
            return DeleteAsync(false);
×
87
        }
88

89
        public async Task DeleteAsync(bool deleteInstanceStore)
90
        {
91
            using (var dbContext = _dbContextFactory.CreateDbContext())
×
92
            {
93
                await dbContext.Database.EnsureDeletedAsync();
×
94
            }
95
        }
96

97
        public int GetDelayInSecondsAfterOnFetchException(Exception exception)
98
        {
99
            return _options.DelayInSecondsAfterFailure;
×
100
        }
101

102
        public int GetDelayInSecondsAfterOnProcessException(Exception exception)
103
        {
104
            return _options.DelayInSecondsAfterFailure;
×
105
        }
106

107
        public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState)
108
        {
109
            return false;
259✔
110
        }
111

112
        public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
113
        {
114
            return await LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, null, cancellationToken);
×
115
        }
116

117
        public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
118
            TimeSpan receiveTimeout,
119
            INameVersionInfo[] orchestrations,
120
            CancellationToken cancellationToken)
121
        {
122
            var stoppableCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(
229✔
123
                cancellationToken, _stopCts.Token).Token;
229✔
124

125
            return await BackoffPollingHelper.PollAsync(async () =>
229✔
126
            {
229✔
127
                using (var dbContext = _dbContextFactory.CreateDbContext())
827✔
128
                {
229✔
129
                    var instance = await LockNextInstanceAsync(dbContext, orchestrations);
827✔
130

229✔
131
                    if (instance == null)
827✔
132
                        return null;
691✔
133

229✔
134
                    var execution = await dbContext.Executions
365✔
135
                        .Where(e => e.ExecutionId == instance.LastExecutionId)
365✔
136
                        .Include(e => e.Events)
365✔
137
                        .AsNoTracking()
365✔
138
                        .SingleAsync();
365✔
139

229✔
140
                    var deserializedEvents = execution.Events
365✔
141
                        .OrderBy(e => e.SequenceNumber)
393✔
142
                        .Select(e => _options.DataConverter.Deserialize<HistoryEvent>(e.Content))
393✔
143
                        .ToArray();
365✔
144

229✔
145
                    var reopenedEvents = deserializedEvents.Reopen(_options.DataConverter);
365✔
146

229✔
147
                    var runtimeState = new OrchestrationRuntimeState(reopenedEvents);
365✔
148

229✔
149
                    var session = new EFCoreOrchestrationSession(
365✔
150
                        _options,
365✔
151
                        _dbContextFactory,
365✔
152
                        instance,
365✔
153
                        execution,
365✔
154
                        runtimeState,
365✔
155
                        _stopCts.Token);
365✔
156

229✔
157
                    var messages = await session.FetchNewMessagesAsync(dbContext);
365✔
158

229✔
159
                    if (messages.Count == 0)
365✔
160
                    {
229✔
161
                        instance.LockId = null;
249✔
162
                        instance.LockedUntil = DateTime.UtcNow;
249✔
163
                        await dbContext.SaveChangesAsync();
249✔
164
                        return null;
249✔
165
                    }
229✔
166

229✔
167
                    await dbContext.SaveChangesAsync();
345✔
168

229✔
169
                    return new TaskOrchestrationWorkItem
345✔
170
                    {
345✔
171
                        InstanceId = instance.InstanceId,
345✔
172
                        LockedUntilUtc = instance.LockedUntil,
345✔
173
                        OrchestrationRuntimeState = runtimeState,
345✔
174
                        NewMessages = messages,
345✔
175
                        Session = session
345✔
176
                    };
345✔
177
                }
229✔
178
            },
229✔
179
            r => r != null,
827✔
180
            receiveTimeout,
229✔
181
            _options.PollingInterval,
229✔
182
            stoppableCancellationToken);
229✔
183
        }
184

185
        public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
186
        {
187
            return await LockNextTaskActivityWorkItem(receiveTimeout, null, cancellationToken);
×
188
        }
189

190
        public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(
191
            TimeSpan receiveTimeout,
192
            INameVersionInfo[] activities,
193
            CancellationToken cancellationToken)
194
        {
195
            var stoppableCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(
194✔
196
                cancellationToken, _stopCts.Token).Token;
194✔
197

198
            return await BackoffPollingHelper.PollAsync(async () =>
194✔
199
            {
194✔
200
                using (var dbContext = _dbContextFactory.CreateDbContext())
713✔
201
                {
194✔
202
                    var activityMessage = await LockActivityMessage(dbContext, activities);
713✔
203

194✔
204
                    if (activityMessage == null)
713✔
205
                        return null;
609✔
206

194✔
207
                    return new TaskActivityWorkItem
298✔
208
                    {
298✔
209
                        Id = CreateTaskActivityWorkItemId(activityMessage.Id, activityMessage.LockId, activityMessage.ReplyQueue),
298✔
210
                        TaskMessage = _options.DataConverter.Deserialize<TaskMessage>(activityMessage.Message),
298✔
211
                        LockedUntilUtc = activityMessage.LockedUntil
298✔
212
                    };
298✔
213
                }
194✔
214
            },
194✔
215
            x => x != null,
713✔
216
            receiveTimeout,
194✔
217
            _options.PollingInterval,
194✔
218
            stoppableCancellationToken);
194✔
219
        }
220

221
        public async Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
222
        {
223
            using (var dbContext = _dbContextFactory.CreateDbContext())
116✔
224
            {
225
                var session = workItem.Session as EFCoreOrchestrationSession;
116✔
226
                if (!session.Released)
116✔
227
                {
228
                    dbContext.Instances.Attach(session.Instance);
116✔
229
                    session.Instance.LockId = null;
116✔
230
                    session.Instance.LockedUntil = DateTime.UtcNow;
116✔
231
                    await dbContext.SaveChangesAsync();
116✔
232
                    session.Released = true;
116✔
233
                }
234
            }
235
        }
236

237
        public async Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
238
        {
239
            using (var dbContext = _dbContextFactory.CreateDbContext())
×
240
            {
241
                var session = workItem.Session as EFCoreOrchestrationSession;
×
242
                if (session.Released)
×
243
                    throw new InvalidOperationException("Session was already released");
×
244

245
                dbContext.Instances.Attach(session.Instance);
×
246
                session.Instance.LockId = null;
×
247
                // TODO: Exponential backoff
248
                session.Instance.LockedUntil = DateTime.UtcNow.AddMinutes(1);
×
249
                await dbContext.SaveChangesAsync();
×
250
                session.Released = true;
×
251
            }
252
        }
253

254
        public async Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
255
        {
256
            using (var dbContext = _dbContextFactory.CreateDbContext())
55✔
257
            {
258
                var session = workItem.Session as EFCoreOrchestrationSession;
55✔
259

260
                var lockedUntilUTC = DateTime.UtcNow.Add(_options.OrchestrationLockTimeout);
55✔
261
                dbContext.Instances.Attach(session.Instance);
55✔
262
                session.Instance.LockedUntil = DateTime.UtcNow.Add(_options.OrchestrationLockTimeout);
55✔
263
                await dbContext.SaveChangesAsync();
55✔
264

265
                workItem.LockedUntilUtc = lockedUntilUTC;
55✔
266
            }
267
        }
268

269
        public async Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
270
        {
271
            using (var dbContext = _dbContextFactory.CreateDbContext())
×
272
            {
273
                var (id, lockId, _) = ParseTaskActivityWorkItemId(workItem.Id);
×
274

275
                var activityMessage = await dbContext.ActivityMessages.FindAsync(id);
×
276

277
                if (activityMessage.LockId != lockId)
×
278
                    throw new Exception("Lost task activity lock");
×
279

280
                activityMessage.LockId = null;
×
281
                activityMessage.LockedUntil = DateTime.UtcNow;
×
282
                await dbContext.SaveChangesAsync();
×
283
            }
284
        }
285

286
        public async Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
287
        {
288
            using (var dbContext = _dbContextFactory.CreateDbContext())
×
289
            {
290
                var (id, lockId, _) = ParseTaskActivityWorkItemId(workItem.Id);
×
291

292
                var activityMessage = await dbContext.ActivityMessages.FindAsync(id);
×
293

294
                if (activityMessage.LockId != lockId)
×
295
                    throw new Exception("Lost task activity lock");
×
296

297
                var lockedUntilUTC = DateTime.UtcNow.Add(_options.ActivtyLockTimeout);
×
298
                activityMessage.LockedUntil = lockedUntilUTC;
×
299
                await dbContext.SaveChangesAsync();
×
300

301
                workItem.LockedUntilUtc = lockedUntilUTC;
×
302

303
                return workItem;
×
304
            }
305
        }
306

307
        public Task StartAsync()
308
        {
309
            if (_stopCts.IsCancellationRequested)
114✔
310
                _stopCts = new CancellationTokenSource();
×
311
            return Task.CompletedTask;
114✔
312
        }
313

314
        public Task StopAsync()
315
        {
316
            return StopAsync(false);
24✔
317
        }
318

319
        public Task StopAsync(bool isForced)
320
        {
321
            _stopCts.Cancel();
126✔
322
            return Task.CompletedTask;
126✔
323
        }
324

325
        public async Task CompleteTaskOrchestrationWorkItemAsync(
326
            TaskOrchestrationWorkItem workItem,
327
            OrchestrationRuntimeState newOrchestrationRuntimeState,
328
            IList<TaskMessage> outboundMessages,
329
            IList<TaskMessage> orchestratorMessages,
330
            IList<TaskMessage> timerMessages,
331
            TaskMessage continuedAsNewMessage,
332
            OrchestrationState orchestrationState)
333
        {
334
            using (var dbContext = _dbContextFactory.CreateDbContext())
197✔
335
            {
336
                var session = workItem.Session as EFCoreOrchestrationSession;
197✔
337

338
                await _dbContextExtensions.WithinTransaction(dbContext, async () =>
197✔
339
                {
197✔
340
                    dbContext.Instances.Attach(session.Instance);
394✔
341

197✔
342
                    // Create child orchestrations
197✔
343
                    foreach (var executionStartedEvent in orchestratorMessages.Select(m => m.Event).OfType<ExecutionStartedEvent>())
655✔
344
                    {
197✔
345
                        var childInstance = _instanceMapper.CreateInstance(executionStartedEvent);
212✔
346
                        await dbContext.Instances.AddAsync(childInstance);
212✔
347

197✔
348
                        var childRuntimeState = new OrchestrationRuntimeState(new[] { executionStartedEvent });
212✔
349
                        var childExecution = _executionMapper.CreateExecution(childRuntimeState);
212✔
350
                        await dbContext.Executions.AddAsync(childExecution);
212✔
351
                    }
197✔
352

197✔
353
                    var orchestrationQueueName = QueueMapper.ToQueue(orchestrationState.Name, orchestrationState.Version);
394✔
354

197✔
355
                    var knownQueues = new Dictionary<string, string>
394✔
356
                    {
394✔
357
                        [orchestrationState.OrchestrationInstance.InstanceId] = orchestrationQueueName
394✔
358
                    };
394✔
359

197✔
360
                    if (orchestrationState.ParentInstance != null)
394✔
361
                        knownQueues[orchestrationState.ParentInstance.OrchestrationInstance.InstanceId] = QueueMapper.ToQueue(orchestrationState.ParentInstance.Name, orchestrationState.ParentInstance.Version);
212✔
362

197✔
363
                    // Write messages
197✔
364
                    var activityMessages = outboundMessages
394✔
365
                        .Select(m => _activityMessageMapper.CreateActivityMessage(m, orchestrationQueueName))
498✔
366
                        .ToArray();
394✔
367

197✔
368
                    await dbContext.ActivityMessages.AddRangeAsync(activityMessages);
394✔
369

197✔
370
                    var allOrchestrationMessages = orchestratorMessages
394✔
371
                        .Concat(timerMessages)
394✔
372
                        .Concat(continuedAsNewMessage != null ? new[] { continuedAsNewMessage } : Enumerable.Empty<TaskMessage>())
394✔
373
                        .ToArray();
394✔
374

197✔
375
                    await SendTaskOrchestrationMessagesAsync(dbContext, allOrchestrationMessages, knownQueues);
394✔
376

197✔
377
                    // Remove executed messages
197✔
378
                    dbContext.AttachRange(session.Messages);
394✔
379
                    dbContext.OrchestrationMessages.RemoveRange(session.Messages);
394✔
380

197✔
381
                    // Update instance
197✔
382
                    _instanceMapper.UpdateInstance(session.Instance, newOrchestrationRuntimeState);
394✔
383

197✔
384
                    // Update current execution
197✔
385
                    EnrichNewEventsInput(workItem.OrchestrationRuntimeState, outboundMessages, orchestratorMessages);
394✔
386
                    session.Execution = await SaveExecutionAsync(dbContext, workItem.OrchestrationRuntimeState, session.Execution);
394✔
387

197✔
388
                    // Update new execution
197✔
389
                    if (newOrchestrationRuntimeState != workItem.OrchestrationRuntimeState)
394✔
390
                    {
197✔
391
                        EnrichNewEventsInput(newOrchestrationRuntimeState, outboundMessages, orchestratorMessages);
232✔
392
                        session.Execution = await SaveExecutionAsync(dbContext, newOrchestrationRuntimeState);
232✔
393
                    }
197✔
394

197✔
395
                    await dbContext.SaveChangesAsync();
394✔
396
                });
197✔
397

398
                session.RuntimeState = newOrchestrationRuntimeState;
197✔
399
                session.ClearMessages();
197✔
400
            }
401
        }
402

403
        public async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage)
404
        {
405
            using (var dbContext = _dbContextFactory.CreateDbContext())
104✔
406
            {
407
                await _dbContextExtensions.WithinTransaction(dbContext, async () =>
104✔
408
                {
104✔
409
                    var (id, lockId, orchestrationQueue) = ParseTaskActivityWorkItemId(workItem.Id);
208✔
410

104✔
411
                    var activityMessage = await dbContext.ActivityMessages
208✔
412
                        .FirstAsync(w => w.Id == id && w.LockId == lockId);
208✔
413

104✔
414
                    dbContext.ActivityMessages.Remove(activityMessage);
208✔
415

104✔
416
                    var knownQueues = new Dictionary<string, string>
208✔
417
                    {
208✔
418
                        [workItem.TaskMessage.OrchestrationInstance.InstanceId] = orchestrationQueue
208✔
419
                    };
208✔
420

104✔
421
                    await SendTaskOrchestrationMessagesAsync(dbContext, new[] { responseMessage }, knownQueues);
208✔
422

104✔
423
                    await dbContext.SaveChangesAsync();
208✔
424
                });
104✔
425
            }
426
        }
427

428
        private async Task SendTaskOrchestrationMessagesAsync(
429
            OrchestrationDbContext dbContext,
430
            TaskMessage[] messages,
431
            IDictionary<string, string> knownQueues = null)
432
        {
433
            var instancesGroups = messages
513✔
434
                .GroupBy(m => m.OrchestrationInstance.InstanceId)
876✔
435
                .ToArray();
513✔
436

437
            foreach (var instanceGroup in instancesGroups)
1,752✔
438
            {
439
                if (knownQueues == null || !knownQueues.TryGetValue(instanceGroup.Key, out var queue))
363✔
440
                {
441
                    var instance = await dbContext.Instances.FindAsync(instanceGroup.Key);
45✔
442

443
                    if (instance == null)
45✔
444
                        throw new Exception($"Instance {instanceGroup.Key} not found");
×
445

446
                    queue = instance.LastQueue;
45✔
447
                }
448

449
                var orchestrationMessages = instanceGroup
363✔
450
                    .Select((m, i) => _orchestrationMessageMapper.CreateOrchestrationMessageAsync(m, i, queue))
726✔
451
                    .ToArray();
363✔
452

453
                await dbContext.OrchestrationMessages.AddRangeAsync(orchestrationMessages);
363✔
454
            }
455
        }
456

457
        private async Task<Execution> SaveExecutionAsync(
458
            OrchestrationDbContext dbContext,
459
            OrchestrationRuntimeState runtimeState,
460
            Execution existingExecution = null)
461
        {
462
            Execution execution;
463

464
            if (existingExecution == null)
232✔
465
            {
466
                execution = _executionMapper.CreateExecution(runtimeState);
35✔
467
                await dbContext.Executions.AddAsync(execution);
35✔
468
            }
469
            else
470
            {
471
                execution = existingExecution;
197✔
472
                dbContext.Executions.Attach(execution);
197✔
473
                dbContext.Events.AttachRange(execution.Events);
197✔
474
                _executionMapper.UpdateExecution(execution, runtimeState);
197✔
475
            }
476

477
            var orderedEvents = execution.Events.OrderBy(e => e.SequenceNumber).ToArray();
744✔
478

479
            var sequenceNumber = 0;
232✔
480
            while (sequenceNumber < runtimeState.Events.Count && sequenceNumber < orderedEvents.Length)
744✔
481
            {
482
                var historyEvent = runtimeState.Events[sequenceNumber];
512✔
483
                var @event = orderedEvents[sequenceNumber];
512✔
484
                _executionMapper.UpdateEvent(@event, historyEvent);
512✔
485
                sequenceNumber++;
512✔
486
            }
487
            while (sequenceNumber < runtimeState.Events.Count)
1,287✔
488
            {
489
                var historyEvent = runtimeState.Events[sequenceNumber];
1,055✔
490
                var @event = _executionMapper.CreateEvent(runtimeState.OrchestrationInstance, sequenceNumber, historyEvent);
1,055✔
491
                execution.Events.Add(@event);
1,055✔
492
                dbContext.Events.Add(@event);
1,055✔
493
                sequenceNumber++;
1,055✔
494
            }
495
            while (sequenceNumber < runtimeState.Events.Count)
232✔
496
            {
497
                var @event = orderedEvents[sequenceNumber];
×
498
                execution.Events.Remove(@event);
×
499
                dbContext.Events.Remove(@event);
×
500
                sequenceNumber++;
×
501
            }
502

503
            return execution;
232✔
504
        }
505

506
        private async Task<Instance> LockNextInstanceAsync(OrchestrationDbContext dbContext, INameVersionInfo[] orchestrations)
507
        {
508
            if (orchestrations == null)
598✔
509
                return await _dbContextExtensions.WithinTransaction(dbContext,
×
510
                    () => _dbContextExtensions.TryLockNextInstanceAsync(dbContext, _options.OrchestrationLockTimeout));
×
511

512
            var queues = orchestrations
598✔
513
                .Select(QueueMapper.ToQueue)
598✔
514
                .ToArray();
598✔
515

516
            var instance = await _dbContextExtensions.WithinTransaction(dbContext,
598✔
517
                () => _dbContextExtensions.TryLockNextInstanceAsync(dbContext, queues, _options.OrchestrationLockTimeout));
1,196✔
518
            if (instance != null)
598✔
519
                return instance;
136✔
520

521
            return null;
462✔
522
        }
523

524
        private async Task<ActivityMessage> LockActivityMessage(OrchestrationDbContext dbContext, INameVersionInfo[] activities)
525
        {
526
            var lockId = Guid.NewGuid().ToString();
519✔
527
            var lockUntilUtc = DateTime.UtcNow.Add(_options.OrchestrationLockTimeout);
519✔
528

529
            if (activities == null)
519✔
530
                return await _dbContextExtensions.WithinTransaction(dbContext,
×
531
                    () => _dbContextExtensions.TryLockNextActivityMessageAsync(dbContext, _options.OrchestrationLockTimeout));
×
532

533
            var queues = activities
519✔
534
                .Select(QueueMapper.ToQueue)
519✔
535
                .ToArray();
519✔
536

537
            var activityMessage = await _dbContextExtensions.WithinTransaction(dbContext,
519✔
538
                () => _dbContextExtensions.TryLockNextActivityMessageAsync(dbContext, queues, _options.OrchestrationLockTimeout));
1,038✔
539
            if (activityMessage != null)
519✔
540
                return activityMessage;
104✔
541

542
            return null;
415✔
543
        }
544

545
        private static string CreateTaskActivityWorkItemId(Guid id, string lockId, string replyQueue)
546
        {
547
            return $"{id}|{lockId}|{replyQueue}";
104✔
548
        }
549

550
        private static (Guid id, string lockId, string replyQueue) ParseTaskActivityWorkItemId(string id)
551
        {
552
            var parts = id.Split('|');
104✔
553
            return (Guid.Parse(parts[0]), parts[1], parts[2]);
104✔
554
        }
555

556
        private static void EnrichNewEventsInput(OrchestrationRuntimeState orchestrationRuntimeState, IList<TaskMessage> outboundMessages, IList<TaskMessage> orchestratorMessages)
557
        {
558
            foreach (var outboundEvent in outboundMessages.Select(e => e.Event))
836✔
559
            {
560
                switch (outboundEvent)
561
                {
562
                    case TaskScheduledEvent outboundTaskScheduledEvent:
563
                        foreach (var taskScheduledEvent in orchestrationRuntimeState.NewEvents.OfType<TaskScheduledEvent>())
1,376✔
564
                        {
565
                            if (taskScheduledEvent.EventId == outboundTaskScheduledEvent.EventId)
564✔
566
                            {
567
                                taskScheduledEvent.Input = outboundTaskScheduledEvent.Input;
104✔
568
                            }
569
                        }
570
                        break;
571
                }
572
            }
573
            foreach (var orchestrationEvent in orchestratorMessages.Select(e => e.Event))
566✔
574
            {
575
                switch (orchestrationEvent)
576
                {
577
                    case ExecutionStartedEvent executionStartedEvent:
578
                        foreach (var subOrchestrationCreatedEvent in orchestrationRuntimeState.NewEvents.OfType<SubOrchestrationInstanceCreatedEvent>())
80✔
579
                        {
580
                            if (subOrchestrationCreatedEvent.InstanceId == executionStartedEvent.OrchestrationInstance.InstanceId)
25✔
581
                            {
582
                                subOrchestrationCreatedEvent.Input = executionStartedEvent.Input;
15✔
583
                            }
584
                        }
585
                        break;
586
                }
587
            }
588
        }
589
    }
590
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc