• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 19714733464

26 Nov 2025 07:08PM UTC coverage: 78.714%. Remained the same
19714733464

push

github

web-flow
Merge 7ae27b4d0 into a154947f6

43 of 50 new or added lines in 4 files covered. (86.0%)

35 existing lines in 2 files now uncovered.

2289 of 2908 relevant lines covered (78.71%)

120.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.51
/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.History;
8
using LLL.DurableTask.EFCore.Entities;
9
using LLL.DurableTask.EFCore.Extensions;
10
using LLL.DurableTask.EFCore.Polling;
11
using Microsoft.EntityFrameworkCore;
12

13
namespace LLL.DurableTask.EFCore;
14

15
/// <summary>
/// EF Core backed implementation of <see cref="IOrchestrationSession"/>.
/// Holds the orchestration <see cref="Instance"/>, its current <see cref="Execution"/> and
/// <see cref="RuntimeState"/>, and the database messages that have already been fetched
/// for this session (<see cref="Messages"/>).
/// </summary>
public class EFCoreOrchestrationSession : IOrchestrationSession
{
    private readonly EFCoreOrchestrationOptions _options;

    private readonly IDbContextFactory<OrchestrationDbContext> _dbContextFactory;
    private readonly CancellationToken _stopCancellationToken;

    /// <summary>
    /// Creates a session for the given instance.
    /// </summary>
    /// <param name="options">Options providing the data converter, polling timeout and polling interval.</param>
    /// <param name="dbContextFactory">Factory used to create a context for each poll attempt.</param>
    /// <param name="instance">Instance whose messages this session fetches.</param>
    /// <param name="execution">Initial value of <see cref="Execution"/>.</param>
    /// <param name="runtimeState">Initial value of <see cref="RuntimeState"/>.</param>
    /// <param name="stopCancellationToken">Token handed to the poller and to the database calls made while polling.</param>
    public EFCoreOrchestrationSession(
        EFCoreOrchestrationOptions options,
        IDbContextFactory<OrchestrationDbContext> dbContextFactory,
        Instance instance,
        Execution execution,
        OrchestrationRuntimeState runtimeState,
        CancellationToken stopCancellationToken)
    {
        _options = options;
        _dbContextFactory = dbContextFactory;
        Instance = instance;
        Execution = execution;
        RuntimeState = runtimeState;
        _stopCancellationToken = stopCancellationToken;
    }

    /// <summary>Instance this session was created for.</summary>
    public Instance Instance { get; }

    /// <summary>Execution associated with the session; replaceable by the session's owner.</summary>
    public Execution Execution { get; set; }

    /// <summary>
    /// Runtime state of the orchestration. <see cref="FetchNewMessagesAsync"/> replaces it with a
    /// reopened state when a completed orchestration receives an <see cref="EventType.EventRaised"/> message.
    /// </summary>
    public OrchestrationRuntimeState RuntimeState { get; set; }

    /// <summary>Database messages fetched so far and kept by this session.</summary>
    public List<OrchestrationMessage> Messages { get; } = new List<OrchestrationMessage>();

    /// <summary>Release flag maintained by the session's owner; this class only stores it.</summary>
    public bool Released { get; set; }

    /// <summary>
    /// Polls the database for new messages using the configured
    /// <c>FetchNewMessagesPollingTimeout</c> and <c>PollingInterval</c>.
    /// The predicate given to the poller matches a <c>null</c> result or a non-empty list.
    /// </summary>
    /// <param name="workItem">Not used by this implementation.</param>
    /// <returns>Whatever the poller returns for the fetch callback.</returns>
    public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
        TaskOrchestrationWorkItem workItem)
    {
        return await BackoffPollingHelper.PollAsync(async () =>
        {
            // Dispose asynchronously: we are inside an async callback and DbContext supports it.
            await using var dbContext = _dbContextFactory.CreateDbContext();

            // Flow the same token the poller receives, so the query and the save
            // (which persists any dropped-message deletions) are cancellable too.
            var messages = await FetchNewMessagesAsync(dbContext, _stopCancellationToken);
            await dbContext.SaveChangesAsync(_stopCancellationToken);
            return messages;
        },
        x => x is null || x.Count > 0,
        _options.FetchNewMessagesPollingTimeout,
        _options.PollingInterval,
        _stopCancellationToken);
    }

    /// <summary>
    /// Loads messages for <see cref="Instance"/> that are available now, whose instance row still has
    /// this session's lock id, and that are not already in <see cref="Messages"/>; ordered by
    /// <c>AvailableAt</c> then <c>SequenceNumber</c>.
    /// Messages rejected by <see cref="ShouldDropNewMessage"/> are marked for deletion on
    /// <paramref name="dbContext"/>; this method does not call <c>SaveChanges</c>, so the caller must.
    /// The remaining messages are appended to <see cref="Messages"/>.
    /// </summary>
    /// <param name="dbContext">Context used for the query and for tracking deletions.</param>
    /// <param name="cancellationToken">Token passed to the database query.</param>
    /// <returns>The deserialized messages that were kept, in query order.</returns>
    public async Task<IList<TaskMessage>> FetchNewMessagesAsync(
        OrchestrationDbContext dbContext,
        CancellationToken cancellationToken = default)
    {
        var newDbMessages = await dbContext.OrchestrationMessages
            .Where(w => w.AvailableAt <= DateTime.UtcNow
                && w.InstanceId == Instance.InstanceId
                && w.Instance.LockId == Instance.LockId // Ensure we still own the lock
                && !Messages.Contains(w))
            .OrderBy(w => w.AvailableAt)
            .ThenBy(w => w.SequenceNumber)
            .AsNoTracking()
            .ToListAsync(cancellationToken);

        // Kept index-aligned with newDbMessages; both lists are pruned together below.
        var deserializedMessages = newDbMessages
            .Select(w => _options.DataConverter.Deserialize<TaskMessage>(w.Message))
            .ToList();

        if (RuntimeState.ExecutionStartedEvent is not null
            && RuntimeState.OrchestrationStatus is OrchestrationStatus.Completed
            && deserializedMessages.Any(m => m.Event.EventType == EventType.EventRaised))
        {
            // Reopen completed orchestrations after receiving an event raised
            RuntimeState = new OrchestrationRuntimeState(
                RuntimeState.Events.Reopen(_options.DataConverter)
            );
        }

        // Evaluated after the possible reopen above, so it reflects the replaced state.
        var isRunning = RuntimeState.ExecutionStartedEvent is null
            || RuntimeState.OrchestrationStatus is OrchestrationStatus.Running
                or OrchestrationStatus.Suspended
                or OrchestrationStatus.Pending;

        // Iterate backwards so RemoveAt does not shift the indexes still to be visited.
        for (var i = newDbMessages.Count - 1; i >= 0; i--)
        {
            var dbMessage = newDbMessages[i];
            var deserializedMessage = deserializedMessages[i];

            if (ShouldDropNewMessage(isRunning, dbMessage, deserializedMessage))
            {
                // The query was AsNoTracking, so attach before marking the row as deleted.
                dbContext.OrchestrationMessages.Attach(dbMessage);
                dbContext.OrchestrationMessages.Remove(dbMessage);
                newDbMessages.RemoveAt(i);
                deserializedMessages.RemoveAt(i);
            }
        }

        Messages.AddRange(newDbMessages);

        return deserializedMessages;
    }

    /// <summary>
    /// Decides whether a freshly fetched message must be discarded instead of delivered.
    /// </summary>
    /// <param name="isRunning">Whether the orchestration is considered running by the caller.</param>
    /// <param name="dbMessage">Database row of the message.</param>
    /// <param name="taskMessage">Deserialized payload of the same message.</param>
    /// <returns><c>true</c> when the message should be dropped.</returns>
    private bool ShouldDropNewMessage(
        bool isRunning,
        OrchestrationMessage dbMessage,
        TaskMessage taskMessage)
    {
        // Drop messages to previous executions
        if (dbMessage.ExecutionId is not null && dbMessage.ExecutionId != Instance.LastExecutionId)
            return true;

        // When not running, drop anything that is not execution rewound
        if (!isRunning && taskMessage.Event.EventType != EventType.ExecutionRewound)
            return true;

        return false;
    }

    /// <summary>Empties <see cref="Messages"/>.</summary>
    public void ClearMessages()
    {
        Messages.Clear();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc