• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 6132966612

09 Sep 2023 08:15PM UTC coverage: 81.089% (-0.07%) from 81.16%
6132966612

push

github

lucaslorentz
Fix navigation to suborchestration

2264 of 2792 relevant lines covered (81.09%)

144.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.55
/src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.History;
8
using LLL.DurableTask.Core;
9
using LLL.DurableTask.EFCore.Entities;
10
using LLL.DurableTask.EFCore.Extensions;
11
using LLL.DurableTask.EFCore.Mappers;
12
using LLL.DurableTask.EFCore.Polling;
13
using Microsoft.EntityFrameworkCore;
14
using Microsoft.Extensions.Logging;
15
using Microsoft.Extensions.Options;
16

17
namespace LLL.DurableTask.EFCore;
18

19
public partial class EFCoreOrchestrationService :
20
    IOrchestrationService,
21
    IDistributedOrchestrationService
22
{
23
    // Collaborators injected through the constructor.
    private readonly EFCoreOrchestrationOptions _options;
    private readonly IDbContextFactory<OrchestrationDbContext> _dbContextFactory;
    private readonly OrchestrationDbContextExtensions _dbContextExtensions;
    private readonly OrchestrationMessageMapper _orchestrationMessageMapper;
    private readonly ActivityMessageMapper _activityMessageMapper;
    private readonly InstanceMapper _instanceMapper;
    private readonly ExecutionMapper _executionMapper;
    private readonly ILogger<EFCoreOrchestrationService> _logger;

    // Cancelled by StopAsync; StartAsync swaps in a fresh source after a stop.
    private CancellationTokenSource _stopCts = new CancellationTokenSource();

    /// <summary>
    /// Creates the service and stores each injected dependency in its backing field.
    /// </summary>
    /// <param name="options">Options wrapper; its <c>Value</c> is read once here.</param>
    /// <param name="dbContextFactory">Factory used to create a context per operation.</param>
    /// <param name="dbContextExtensions">Helper used for migrations, transactions and locking.</param>
    /// <param name="orchestrationMessageMapper">Mapper that builds orchestration message entities.</param>
    /// <param name="activityMessageMapper">Mapper that builds activity message entities.</param>
    /// <param name="instanceMapper">Mapper that creates and updates instance entities.</param>
    /// <param name="executionMapper">Mapper that creates and updates execution and event entities.</param>
    /// <param name="logger">Logger for this service.</param>
    public EFCoreOrchestrationService(
        IOptions<EFCoreOrchestrationOptions> options,
        IDbContextFactory<OrchestrationDbContext> dbContextFactory,
        OrchestrationDbContextExtensions dbContextExtensions,
        OrchestrationMessageMapper orchestrationMessageMapper,
        ActivityMessageMapper activityMessageMapper,
        InstanceMapper instanceMapper,
        ExecutionMapper executionMapper,
        ILogger<EFCoreOrchestrationService> logger)
    {
        this._options = options.Value;

        this._dbContextFactory = dbContextFactory;
        this._dbContextExtensions = dbContextExtensions;

        this._orchestrationMessageMapper = orchestrationMessageMapper;
        this._activityMessageMapper = activityMessageMapper;
        this._instanceMapper = instanceMapper;
        this._executionMapper = executionMapper;

        this._logger = logger;
    }
53

54
    /// <summary>Orchestration dispatcher count, read from the configured options.</summary>
    public int TaskOrchestrationDispatcherCount
    {
        get { return _options.TaskOrchestrationDispatcherCount; }
    }

    /// <summary>Maximum concurrent orchestration work items, read from the configured options.</summary>
    public int MaxConcurrentTaskOrchestrationWorkItems
    {
        get { return _options.MaxConcurrentTaskOrchestrationWorkItems; }
    }

    /// <summary>Maximum concurrent activity work items, read from the configured options.</summary>
    public int MaxConcurrentTaskActivityWorkItems
    {
        get { return _options.MaxConcurrentTaskActivityWorkItems; }
    }

    /// <summary>Always reports <see cref="BehaviorOnContinueAsNew.Carryover"/>.</summary>
    public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew
    {
        get { return BehaviorOnContinueAsNew.Carryover; }
    }

    /// <summary>Activity dispatcher count, read from the configured options.</summary>
    public int TaskActivityDispatcherCount
    {
        get { return _options.TaskActivityDispatcherCount; }
    }
59

60
    /// <summary>
    /// Creates the store without recreating it; forwards to
    /// <see cref="CreateAsync(bool)"/> with <c>false</c>.
    /// </summary>
    public Task CreateAsync() => CreateAsync(recreateInstanceStore: false);
64

65
    /// <summary>
    /// Applies migrations to the database, optionally deleting the database first.
    /// </summary>
    /// <param name="recreateInstanceStore">When true, the database is deleted before migrating.</param>
    public async Task CreateAsync(bool recreateInstanceStore)
    {
        using (var context = _dbContextFactory.CreateDbContext())
        {
            if (recreateInstanceStore)
            {
                await context.Database.EnsureDeletedAsync();
            }

            await _dbContextExtensions.Migrate(context);
        }
    }
73

74
    /// <summary>
    /// Applies migrations to the database through the context extensions.
    /// </summary>
    public async Task CreateIfNotExistsAsync()
    {
        using (var context = _dbContextFactory.CreateDbContext())
        {
            await _dbContextExtensions.Migrate(context);
        }
    }
79

80
    /// <summary>
    /// Deletes the store; forwards to <see cref="DeleteAsync(bool)"/> with <c>false</c>.
    /// </summary>
    public Task DeleteAsync() => DeleteAsync(deleteInstanceStore: false);
84

85
    /// <summary>
    /// Deletes the whole database.
    /// </summary>
    /// <param name="deleteInstanceStore">
    /// Not consulted: the database is deleted regardless of this value.
    /// </param>
    public async Task DeleteAsync(bool deleteInstanceStore)
    {
        using (var context = _dbContextFactory.CreateDbContext())
        {
            await context.Database.EnsureDeletedAsync();
        }
    }
90

91
    /// <summary>
    /// Returns the configured failure delay; the exception itself is not inspected.
    /// </summary>
    /// <param name="exception">The fetch exception (unused).</param>
    /// <returns><c>DelayInSecondsAfterFailure</c> from the options.</returns>
    public int GetDelayInSecondsAfterOnFetchException(Exception exception)
        => _options.DelayInSecondsAfterFailure;
95

96
    /// <summary>
    /// Returns the configured failure delay; the exception itself is not inspected.
    /// </summary>
    /// <param name="exception">The processing exception (unused).</param>
    /// <returns><c>DelayInSecondsAfterFailure</c> from the options.</returns>
    public int GetDelayInSecondsAfterOnProcessException(Exception exception)
        => _options.DelayInSecondsAfterFailure;
100

101
    /// <summary>
    /// Always reports that the message count limit is not exceeded.
    /// </summary>
    /// <param name="currentMessageCount">Current message count (unused).</param>
    /// <param name="runtimeState">Orchestration runtime state (unused).</param>
    /// <returns>Always <c>false</c>.</returns>
    public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState)
        => false;
105

106
    /// <summary>
    /// Locks the next orchestration work item without an orchestration filter
    /// by forwarding a null orchestration list to the three-argument overload.
    /// </summary>
    /// <param name="receiveTimeout">Timeout passed through to the polling overload.</param>
    /// <param name="cancellationToken">Token passed through to the polling overload.</param>
    /// <returns>The work item produced by the three-argument overload.</returns>
    public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
    {
        INameVersionInfo[] noFilter = null;
        var workItem = await LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, noFilter, cancellationToken);
        return workItem;
    }
110

111
    /// <summary>
    /// Polls until an orchestration instance with pending messages can be locked,
    /// then returns a work item holding its runtime state, new messages and session.
    /// </summary>
    /// <param name="receiveTimeout">Timeout handed to the polling helper.</param>
    /// <param name="orchestrations">
    /// Orchestrations to restrict locking to; null is forwarded to the unfiltered lock.
    /// </param>
    /// <param name="cancellationToken">
    /// Caller token; it is linked with the service stop token so either one ends polling.
    /// </param>
    /// <returns>The locked work item, or whatever the polling helper yields when none is found.</returns>
    public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
        TimeSpan receiveTimeout,
        INameVersionInfo[] orchestrations,
        CancellationToken cancellationToken)
    {
        // The linked source registers callbacks on both parent tokens. Dispose it when
        // polling finishes, otherwise every call leaves a registration on the stop token.
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken, _stopCts.Token);
        var stoppableCancellationToken = linkedCts.Token;

        return await BackoffPollingHelper.PollAsync(async () =>
        {
            using var dbContext = _dbContextFactory.CreateDbContext();
            var instance = await LockNextInstanceAsync(dbContext, orchestrations);

            if (instance == null)
                return null;

            // Load the latest execution with its events, untracked.
            var execution = await dbContext.Executions
                .Where(e => e.ExecutionId == instance.LastExecutionId)
                .Include(e => e.Events)
                .AsNoTracking()
                .SingleAsync();

            var deserializedEvents = execution.Events
                .OrderBy(e => e.SequenceNumber)
                .Select(e => _options.DataConverter.Deserialize<HistoryEvent>(e.Content))
                .ToArray();

            var reopenedEvents = deserializedEvents.Reopen(_options.DataConverter);

            var runtimeState = new OrchestrationRuntimeState(reopenedEvents);

            var session = new EFCoreOrchestrationSession(
                _options,
                _dbContextFactory,
                instance,
                execution,
                runtimeState,
                _stopCts.Token);

            var messages = await session.FetchNewMessagesAsync(dbContext);

            if (messages.Count == 0)
            {
                // Nothing to process: give the lock back right away and keep polling.
                instance.LockId = null;
                instance.LockedUntil = DateTime.UtcNow;
                await dbContext.SaveChangesAsync();
                return null;
            }

            await dbContext.SaveChangesAsync();

            return new TaskOrchestrationWorkItem
            {
                InstanceId = instance.InstanceId,
                LockedUntilUtc = instance.LockedUntil,
                OrchestrationRuntimeState = runtimeState,
                NewMessages = messages,
                Session = session
            };
        },
        r => r != null,
        receiveTimeout,
        _options.PollingInterval,
        stoppableCancellationToken);
    }
176

177
    /// <summary>
    /// Locks the next activity work item without an activity filter by forwarding
    /// a null activity list to the three-argument overload.
    /// </summary>
    /// <param name="receiveTimeout">Timeout passed through to the polling overload.</param>
    /// <param name="cancellationToken">Token passed through to the polling overload.</param>
    /// <returns>The work item produced by the three-argument overload.</returns>
    public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
    {
        INameVersionInfo[] noFilter = null;
        var workItem = await LockNextTaskActivityWorkItem(receiveTimeout, noFilter, cancellationToken);
        return workItem;
    }
181

182
    /// <summary>
    /// Polls until an activity message can be locked and wraps it in a work item.
    /// </summary>
    /// <param name="receiveTimeout">Timeout handed to the polling helper.</param>
    /// <param name="activities">
    /// Activities to restrict locking to; null is forwarded to the unfiltered lock.
    /// </param>
    /// <param name="cancellationToken">
    /// Caller token; it is linked with the service stop token so either one ends polling.
    /// </param>
    /// <returns>The locked work item, or whatever the polling helper yields when none is found.</returns>
    public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(
        TimeSpan receiveTimeout,
        INameVersionInfo[] activities,
        CancellationToken cancellationToken)
    {
        // The linked source registers callbacks on both parent tokens. Dispose it when
        // polling finishes, otherwise every call leaves a registration on the stop token.
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken, _stopCts.Token);
        var stoppableCancellationToken = linkedCts.Token;

        return await BackoffPollingHelper.PollAsync(async () =>
        {
            using var dbContext = _dbContextFactory.CreateDbContext();
            var activityMessage = await LockActivityMessage(dbContext, activities);

            if (activityMessage == null)
                return null;

            return new TaskActivityWorkItem
            {
                // The id carries the message id, lock id and reply queue so later calls can parse them back.
                Id = CreateTaskActivityWorkItemId(activityMessage.Id, activityMessage.LockId, activityMessage.ReplyQueue),
                TaskMessage = _options.DataConverter.Deserialize<TaskMessage>(activityMessage.Message),
                LockedUntilUtc = activityMessage.LockedUntil
            };
        },
        x => x != null,
        receiveTimeout,
        _options.PollingInterval,
        stoppableCancellationToken);
    }
210

211
    /// <summary>
    /// Releases the instance lock held by the work item's session, unless the
    /// session is already marked as released.
    /// </summary>
    /// <param name="workItem">Work item whose session is an <c>EFCoreOrchestrationSession</c>.</param>
    public async Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
    {
        using var context = _dbContextFactory.CreateDbContext();
        var orchestrationSession = workItem.Session as EFCoreOrchestrationSession;

        if (orchestrationSession.Released)
            return;

        // Clear the lock and let it expire immediately.
        context.Instances.Attach(orchestrationSession.Instance);
        orchestrationSession.Instance.LockId = null;
        orchestrationSession.Instance.LockedUntil = DateTime.UtcNow;
        await context.SaveChangesAsync();

        orchestrationSession.Released = true;
    }
224

225
    /// <summary>
    /// Abandons the work item: clears the instance lock id and keeps the instance
    /// unavailable for one more minute before marking the session as released.
    /// </summary>
    /// <param name="workItem">Work item whose session is an <c>EFCoreOrchestrationSession</c>.</param>
    /// <exception cref="InvalidOperationException">The session was already released.</exception>
    public async Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
    {
        using var context = _dbContextFactory.CreateDbContext();
        var orchestrationSession = workItem.Session as EFCoreOrchestrationSession;

        if (orchestrationSession.Released)
        {
            throw new InvalidOperationException("Session was already released");
        }

        context.Instances.Attach(orchestrationSession.Instance);
        orchestrationSession.Instance.LockId = null;

        // TODO: Exponential backoff
        orchestrationSession.Instance.LockedUntil = DateTime.UtcNow.AddMinutes(1);

        await context.SaveChangesAsync();
        orchestrationSession.Released = true;
    }
239

240
    /// <summary>
    /// Extends the instance lock by the configured orchestration lock timeout and
    /// reports the new expiry on the work item.
    /// </summary>
    /// <param name="workItem">Work item whose session is an <c>EFCoreOrchestrationSession</c>.</param>
    public async Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
    {
        using var dbContext = _dbContextFactory.CreateDbContext();
        var session = workItem.Session as EFCoreOrchestrationSession;

        // Compute the expiry once so the stored value and the value reported to the
        // caller are identical (two separate UtcNow reads would differ).
        var lockedUntilUTC = DateTime.UtcNow.Add(_options.OrchestrationLockTimeout);

        dbContext.Instances.Attach(session.Instance);
        session.Instance.LockedUntil = lockedUntilUTC;
        await dbContext.SaveChangesAsync();

        workItem.LockedUntilUtc = lockedUntilUTC;
    }
252

253
    /// <summary>
    /// Abandons an activity work item by clearing its lock so it expires immediately.
    /// </summary>
    /// <param name="workItem">Work item whose id encodes the message id and lock id.</param>
    /// <exception cref="Exception">
    /// The message no longer exists or is held under a different lock id.
    /// </exception>
    public async Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
    {
        using var dbContext = _dbContextFactory.CreateDbContext();
        var (id, lockId, _) = ParseTaskActivityWorkItemId(workItem.Id);

        var activityMessage = await dbContext.ActivityMessages.FindAsync(id);

        // FindAsync yields null when the row is gone; report that as a lost lock
        // instead of failing with a NullReferenceException.
        if (activityMessage == null || activityMessage.LockId != lockId)
            throw new Exception("Lost task activity lock");

        activityMessage.LockId = null;
        activityMessage.LockedUntil = DateTime.UtcNow;
        await dbContext.SaveChangesAsync();
    }
267

268
    /// <summary>
    /// Extends an activity message lock by the configured activity lock timeout.
    /// </summary>
    /// <param name="workItem">Work item whose id encodes the message id and lock id.</param>
    /// <returns>The same work item with <c>LockedUntilUtc</c> updated.</returns>
    /// <exception cref="Exception">
    /// The message no longer exists or is held under a different lock id.
    /// </exception>
    public async Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
    {
        using var dbContext = _dbContextFactory.CreateDbContext();
        var (id, lockId, _) = ParseTaskActivityWorkItemId(workItem.Id);

        var activityMessage = await dbContext.ActivityMessages.FindAsync(id);

        // FindAsync yields null when the row is gone; report that as a lost lock
        // instead of failing with a NullReferenceException.
        if (activityMessage == null || activityMessage.LockId != lockId)
            throw new Exception("Lost task activity lock");

        var lockedUntilUTC = DateTime.UtcNow.Add(_options.ActivtyLockTimeout);
        activityMessage.LockedUntil = lockedUntilUTC;
        await dbContext.SaveChangesAsync();

        workItem.LockedUntilUtc = lockedUntilUTC;

        return workItem;
    }
286

287
    /// <summary>
    /// Prepares the service to run again: if the stop source was already cancelled,
    /// it is replaced with a fresh one.
    /// </summary>
    /// <returns>A completed task.</returns>
    public Task StartAsync()
    {
        var wasStopped = _stopCts.IsCancellationRequested;
        if (wasStopped)
        {
            _stopCts = new CancellationTokenSource();
        }

        return Task.CompletedTask;
    }
293

294
    /// <summary>
    /// Stops the service; forwards to <see cref="StopAsync(bool)"/> with <c>false</c>.
    /// </summary>
    public Task StopAsync() => StopAsync(isForced: false);
298

299
    /// <summary>
    /// Cancels the stop token source.
    /// </summary>
    /// <param name="isForced">Not consulted by this implementation.</param>
    /// <returns>A completed task.</returns>
    public Task StopAsync(bool isForced)
    {
        var stopSource = _stopCts;
        stopSource.Cancel();

        return Task.CompletedTask;
    }
304

305
    /// <summary>
    /// Persists the outcome of an orchestration work item inside one transaction:
    /// creates child instances, writes activity and orchestration messages, removes
    /// the processed messages, and saves the instance and its execution history.
    /// Afterwards the session's runtime state is replaced and its messages cleared.
    /// </summary>
    /// <param name="workItem">Work item whose session is an <c>EFCoreOrchestrationSession</c>.</param>
    /// <param name="newOrchestrationRuntimeState">Runtime state after processing.</param>
    /// <param name="outboundMessages">Messages converted to activity messages.</param>
    /// <param name="orchestratorMessages">Messages sent to orchestrations; execution-started events create child instances.</param>
    /// <param name="timerMessages">Timer messages, sent as orchestration messages.</param>
    /// <param name="continuedAsNewMessage">Optional continue-as-new message, sent as an orchestration message.</param>
    /// <param name="orchestrationState">State used to resolve queue names for this instance and its parent.</param>
    public async Task CompleteTaskOrchestrationWorkItemAsync(
        TaskOrchestrationWorkItem workItem,
        OrchestrationRuntimeState newOrchestrationRuntimeState,
        IList<TaskMessage> outboundMessages,
        IList<TaskMessage> orchestratorMessages,
        IList<TaskMessage> timerMessages,
        TaskMessage continuedAsNewMessage,
        OrchestrationState orchestrationState)
    {
        using var context = _dbContextFactory.CreateDbContext();
        var orchestrationSession = workItem.Session as EFCoreOrchestrationSession;

        await _dbContextExtensions.WithinTransaction(context, async () =>
        {
            context.Instances.Attach(orchestrationSession.Instance);

            // Create child orchestrations
            var childStartEvents = orchestratorMessages
                .Select(m => m.Event)
                .OfType<ExecutionStartedEvent>();

            foreach (var startEvent in childStartEvents)
            {
                var newInstance = _instanceMapper.CreateInstance(startEvent);
                await context.Instances.AddAsync(newInstance);

                var newRuntimeState = new OrchestrationRuntimeState(new[] { startEvent });
                var newExecution = _executionMapper.CreateExecution(newRuntimeState);
                await context.Executions.AddAsync(newExecution);
            }

            // Queues that can be resolved without a database lookup.
            var ownQueue = QueueMapper.ToQueue(orchestrationState.Name, orchestrationState.Version);

            var knownQueues = new Dictionary<string, string>();
            knownQueues[orchestrationState.OrchestrationInstance.InstanceId] = ownQueue;

            if (orchestrationState.ParentInstance != null)
            {
                knownQueues[orchestrationState.ParentInstance.OrchestrationInstance.InstanceId] =
                    QueueMapper.ToQueue(orchestrationState.ParentInstance.Name, orchestrationState.ParentInstance.Version);
            }

            // Write messages
            var newActivityMessages = outboundMessages
                .Select(m => _activityMessageMapper.CreateActivityMessage(m, ownQueue))
                .ToArray();

            await context.ActivityMessages.AddRangeAsync(newActivityMessages);

            IEnumerable<TaskMessage> continuedAsNew = continuedAsNewMessage != null
                ? new[] { continuedAsNewMessage }
                : Enumerable.Empty<TaskMessage>();

            var newOrchestrationMessages = orchestratorMessages
                .Concat(timerMessages)
                .Concat(continuedAsNew)
                .ToArray();

            await SendTaskOrchestrationMessagesAsync(context, newOrchestrationMessages, knownQueues);

            // Remove executed messages
            context.AttachRange(orchestrationSession.Messages);
            context.OrchestrationMessages.RemoveRange(orchestrationSession.Messages);

            // Update instance
            _instanceMapper.UpdateInstance(orchestrationSession.Instance, newOrchestrationRuntimeState);

            // Update current execution
            EnrichNewEventsInput(workItem.OrchestrationRuntimeState, outboundMessages, orchestratorMessages);
            orchestrationSession.Execution = await SaveExecutionAsync(
                context,
                workItem.OrchestrationRuntimeState,
                orchestrationSession.Execution);

            // Update new execution
            var startedNewExecution = newOrchestrationRuntimeState != workItem.OrchestrationRuntimeState;
            if (startedNewExecution)
            {
                EnrichNewEventsInput(newOrchestrationRuntimeState, outboundMessages, orchestratorMessages);
                orchestrationSession.Execution = await SaveExecutionAsync(context, newOrchestrationRuntimeState);
            }

            await context.SaveChangesAsync();
        });

        orchestrationSession.RuntimeState = newOrchestrationRuntimeState;
        orchestrationSession.ClearMessages();
    }
380

381
    /// <summary>
    /// Completes an activity work item inside one transaction: removes the locked
    /// activity message and sends the response message to the owning orchestration.
    /// </summary>
    /// <param name="workItem">Work item whose id encodes message id, lock id and reply queue.</param>
    /// <param name="responseMessage">Response to deliver as an orchestration message.</param>
    public async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage)
    {
        using var context = _dbContextFactory.CreateDbContext();

        await _dbContextExtensions.WithinTransaction(context, async () =>
        {
            var (messageId, heldLockId, replyQueue) = ParseTaskActivityWorkItemId(workItem.Id);

            // Only a message still held under this lock id matches.
            var lockedMessage = await context.ActivityMessages
                .FirstAsync(w => w.Id == messageId && w.LockId == heldLockId);

            context.ActivityMessages.Remove(lockedMessage);

            // The reply queue from the work item id spares a lookup of the instance.
            var knownQueues = new Dictionary<string, string>();
            knownQueues[workItem.TaskMessage.OrchestrationInstance.InstanceId] = replyQueue;

            var responses = new[] { responseMessage };
            await SendTaskOrchestrationMessagesAsync(context, responses, knownQueues);

            await context.SaveChangesAsync();
        });
    }
403

404
    /// <summary>
    /// Adds orchestration message entities for the given messages, grouped by target
    /// instance. The queue comes from <paramref name="knownQueues"/> when present,
    /// otherwise from the stored instance's <c>LastQueue</c>.
    /// </summary>
    /// <param name="dbContext">Context the entities are added to; nothing is saved here.</param>
    /// <param name="messages">Messages to send.</param>
    /// <param name="knownQueues">Optional map from instance id to queue name.</param>
    /// <exception cref="Exception">A target instance is not known and not found in the database.</exception>
    private async Task SendTaskOrchestrationMessagesAsync(
        OrchestrationDbContext dbContext,
        TaskMessage[] messages,
        IDictionary<string, string> knownQueues = null)
    {
        var messagesByInstance = messages
            .GroupBy(m => m.OrchestrationInstance.InstanceId)
            .ToArray();

        foreach (var group in messagesByInstance)
        {
            string queue = null;
            var queueIsKnown = knownQueues != null && knownQueues.TryGetValue(group.Key, out queue);

            if (!queueIsKnown)
            {
                var storedInstance = await dbContext.Instances.FindAsync(group.Key);

                if (storedInstance == null)
                {
                    throw new Exception($"Instance {group.Key} not found");
                }

                queue = storedInstance.LastQueue;
            }

            // The index within the group is passed to the mapper alongside the queue.
            var entities = group
                .Select((message, index) => _orchestrationMessageMapper.CreateOrchestrationMessageAsync(message, index, queue))
                .ToArray();

            await dbContext.OrchestrationMessages.AddRangeAsync(entities);
        }
    }
432

433
    /// <summary>
    /// Creates or updates the execution entity for a runtime state and reconciles its
    /// stored events with the runtime state's events: overlapping positions are
    /// updated, missing ones are created, and surplus stored events are removed.
    /// </summary>
    /// <param name="dbContext">Context that tracks the changes; nothing is saved here.</param>
    /// <param name="runtimeState">Runtime state whose events are the source of truth.</param>
    /// <param name="existingExecution">Execution to update, or null to create a new one.</param>
    /// <returns>The created or updated execution.</returns>
    private async Task<Execution> SaveExecutionAsync(
        OrchestrationDbContext dbContext,
        OrchestrationRuntimeState runtimeState,
        Execution existingExecution = null)
    {
        Execution execution;

        if (existingExecution == null)
        {
            execution = _executionMapper.CreateExecution(runtimeState);
            await dbContext.Executions.AddAsync(execution);
        }
        else
        {
            execution = existingExecution;
            dbContext.Executions.Attach(execution);
            dbContext.Events.AttachRange(execution.Events);
            _executionMapper.UpdateExecution(execution, runtimeState);
        }

        // Snapshot of the stored events before any are added or removed below.
        var orderedEvents = execution.Events.OrderBy(e => e.SequenceNumber).ToArray();

        var sequenceNumber = 0;

        // Positions present on both sides: update the stored event in place.
        while (sequenceNumber < runtimeState.Events.Count && sequenceNumber < orderedEvents.Length)
        {
            var historyEvent = runtimeState.Events[sequenceNumber];
            var @event = orderedEvents[sequenceNumber];
            _executionMapper.UpdateEvent(@event, historyEvent);
            sequenceNumber++;
        }

        // Positions only in the runtime state: create new stored events.
        while (sequenceNumber < runtimeState.Events.Count)
        {
            var historyEvent = runtimeState.Events[sequenceNumber];
            var @event = _executionMapper.CreateEvent(runtimeState.OrchestrationInstance, sequenceNumber, historyEvent);
            execution.Events.Add(@event);
            dbContext.Events.Add(@event);
            sequenceNumber++;
        }

        // Positions only in storage: remove the surplus events. This must be bounded by
        // the stored event count; bounding it by the runtime state count (as the previous
        // loop is) would make it unreachable.
        while (sequenceNumber < orderedEvents.Length)
        {
            var @event = orderedEvents[sequenceNumber];
            execution.Events.Remove(@event);
            dbContext.Events.Remove(@event);
            sequenceNumber++;
        }

        return execution;
    }
481

482
    /// <summary>
    /// Tries to lock the next available instance inside a transaction, optionally
    /// restricted to the queues of the given orchestrations.
    /// </summary>
    /// <param name="dbContext">Context used for the transaction and the lock attempt.</param>
    /// <param name="orchestrations">Orchestrations to restrict to, or null for no restriction.</param>
    /// <returns>The locked instance, or null when the lock attempt returns none.</returns>
    private async Task<Instance> LockNextInstanceAsync(OrchestrationDbContext dbContext, INameVersionInfo[] orchestrations)
    {
        if (orchestrations == null)
        {
            return await _dbContextExtensions.WithinTransaction(
                dbContext,
                () => _dbContextExtensions.TryLockNextInstanceAsync(dbContext, _options.OrchestrationLockTimeout));
        }

        var queueNames = orchestrations.Select(QueueMapper.ToQueue).ToArray();

        var lockedInstance = await _dbContextExtensions.WithinTransaction(
            dbContext,
            () => _dbContextExtensions.TryLockNextInstanceAsync(dbContext, queueNames, _options.OrchestrationLockTimeout));

        return lockedInstance;
    }
499

500
    /// <summary>
    /// Tries to lock the next available activity message inside a transaction,
    /// optionally restricted to the queues of the given activities.
    /// </summary>
    /// <remarks>
    /// The lock duration is the activity lock timeout, the same option used when an
    /// activity lock is renewed, so acquiring and renewing agree on the duration.
    /// </remarks>
    /// <param name="dbContext">Context used for the transaction and the lock attempt.</param>
    /// <param name="activities">Activities to restrict to, or null for no restriction.</param>
    /// <returns>The locked message, or null when the lock attempt returns none.</returns>
    private async Task<ActivityMessage> LockActivityMessage(OrchestrationDbContext dbContext, INameVersionInfo[] activities)
    {
        if (activities == null)
            return await _dbContextExtensions.WithinTransaction(dbContext,
                () => _dbContextExtensions.TryLockNextActivityMessageAsync(dbContext, _options.ActivtyLockTimeout));

        var queues = activities
            .Select(QueueMapper.ToQueue)
            .ToArray();

        return await _dbContextExtensions.WithinTransaction(dbContext,
            () => _dbContextExtensions.TryLockNextActivityMessageAsync(dbContext, queues, _options.ActivtyLockTimeout));
    }
520

521
    /// <summary>
    /// Builds a work item id by joining the message id, lock id and reply queue
    /// with <c>|</c> separators.
    /// </summary>
    /// <param name="id">Activity message id.</param>
    /// <param name="lockId">Lock id held on the message.</param>
    /// <param name="replyQueue">Queue the response should be sent to.</param>
    /// <returns>The combined id in the form <c>id|lockId|replyQueue</c>.</returns>
    private static string CreateTaskActivityWorkItemId(Guid id, string lockId, string replyQueue)
        => string.Concat(id.ToString(), "|", lockId, "|", replyQueue);
525

526
    /// <summary>
    /// Splits a work item id of the form <c>id|lockId|replyQueue</c> back into its parts.
    /// </summary>
    /// <param name="id">Combined work item id.</param>
    /// <returns>The message id, the lock id and the reply queue.</returns>
    private static (Guid id, string lockId, string replyQueue) ParseTaskActivityWorkItemId(string id)
    {
        // Split into at most three parts so a reply queue that itself contains '|'
        // is returned whole instead of being cut at its first separator.
        var parts = id.Split('|', 3);
        return (Guid.Parse(parts[0]), parts[1], parts[2]);
    }
531

532
    /// <summary>
    /// Copies inputs from outgoing messages onto the matching new history events:
    /// task-scheduled events are matched by event id, and sub-orchestration-created
    /// events are matched by the started instance id.
    /// </summary>
    /// <param name="orchestrationRuntimeState">Runtime state whose <c>NewEvents</c> are updated in place.</param>
    /// <param name="outboundMessages">Messages that may carry task-scheduled events.</param>
    /// <param name="orchestratorMessages">Messages that may carry execution-started events.</param>
    private static void EnrichNewEventsInput(OrchestrationRuntimeState orchestrationRuntimeState, IList<TaskMessage> outboundMessages, IList<TaskMessage> orchestratorMessages)
    {
        var scheduledTasks = outboundMessages
            .Select(message => message.Event)
            .OfType<TaskScheduledEvent>();

        foreach (var outgoing in scheduledTasks)
        {
            var matches = orchestrationRuntimeState.NewEvents
                .OfType<TaskScheduledEvent>()
                .Where(candidate => candidate.EventId == outgoing.EventId);

            foreach (var match in matches)
            {
                match.Input = outgoing.Input;
            }
        }

        var startedExecutions = orchestratorMessages
            .Select(message => message.Event)
            .OfType<ExecutionStartedEvent>();

        foreach (var started in startedExecutions)
        {
            var matches = orchestrationRuntimeState.NewEvents
                .OfType<SubOrchestrationInstanceCreatedEvent>()
                .Where(candidate => candidate.InstanceId == started.OrchestrationInstance.InstanceId);

            foreach (var match in matches)
            {
                match.Input = started.Input;
            }
        }
    }
565
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc