• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 19714769363

26 Nov 2025 07:09PM UTC coverage: 78.714%. Remained the same
19714769363

push

github

web-flow
Merge 986b8b97b into a154947f6

43 of 50 new or added lines in 4 files covered. (86.0%)

39 existing lines in 2 files now uncovered.

2289 of 2908 relevant lines covered (78.71%)

120.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.51
/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.History;
8
using LLL.DurableTask.EFCore.Entities;
9
using LLL.DurableTask.EFCore.Extensions;
10
using LLL.DurableTask.EFCore.Polling;
11
using Microsoft.EntityFrameworkCore;
12

13
namespace LLL.DurableTask.EFCore;
14

15
public class EFCoreOrchestrationSession : IOrchestrationSession
16
{
17
    private readonly EFCoreOrchestrationOptions _options;
18

19
    private readonly IDbContextFactory<OrchestrationDbContext> _dbContextFactory;
20
    private readonly CancellationToken _stopCancellationToken;
21

22
    public EFCoreOrchestrationSession(
105✔
23
        EFCoreOrchestrationOptions options,
105✔
24
        IDbContextFactory<OrchestrationDbContext> dbContextFactory,
105✔
25
        Instance instance,
105✔
26
        Execution execution,
105✔
27
        OrchestrationRuntimeState runtimeState,
105✔
28
        CancellationToken stopCancellationToken)
105✔
29
    {
30
        _options = options;
105✔
31
        _dbContextFactory = dbContextFactory;
105✔
32
        Instance = instance;
105✔
33
        Execution = execution;
105✔
34
        RuntimeState = runtimeState;
105✔
35
        _stopCancellationToken = stopCancellationToken;
105✔
36
    }
37

38
    public Instance Instance { get; }
1,576✔
39
    public Execution Execution { get; set; }
481✔
40
    public OrchestrationRuntimeState RuntimeState { get; set; }
1,578✔
41
    public List<OrchestrationMessage> Messages { get; } = new List<OrchestrationMessage>();
1,319✔
42

43
    public bool Released { get; set; }
202✔
44

45
    public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
46
        TaskOrchestrationWorkItem workItem)
47
    {
48
        return await BackoffPollingHelper.PollAsync(async () =>
73✔
49
        {
73✔
50
            using var dbContext = _dbContextFactory.CreateDbContext();
314✔
51
            var messages = await FetchNewMessagesAsync(dbContext);
314✔
52
            await dbContext.SaveChangesAsync();
314✔
53
            return messages;
314✔
54
        },
73✔
55
        x => x is null || x.Count > 0,
314✔
56
        _options.FetchNewMessagesPollingTimeout,
73✔
57
        _options.PollingInterval,
73✔
58
        _stopCancellationToken);
73✔
59
    }
60

61
    public async Task<IList<TaskMessage>> FetchNewMessagesAsync(
62
        OrchestrationDbContext dbContext,
63
        CancellationToken cancellationToken = default)
64
    {
65
        var newDbMessages = await dbContext.OrchestrationMessages
346✔
66
            .Where(w => w.AvailableAt <= DateTime.UtcNow
346✔
67
                && w.InstanceId == Instance.InstanceId
346✔
68
                && w.Instance.LockId == Instance.LockId // Ensure we still own the lock
346✔
69
                && !Messages.Contains(w))
346✔
70
            .OrderBy(w => w.AvailableAt)
346✔
71
            .ThenBy(w => w.SequenceNumber)
346✔
72
            .AsNoTracking()
346✔
73
            .ToListAsync(cancellationToken);
346✔
74

75
        var deserializedMessages = newDbMessages
346✔
76
            .Select(w => _options.DataConverter.Deserialize<TaskMessage>(w.Message))
586✔
77
            .ToList();
346✔
78

79
        if (RuntimeState.ExecutionStartedEvent is not null
346✔
80
            && RuntimeState.OrchestrationStatus is OrchestrationStatus.Completed
346✔
81
            && deserializedMessages.Any(m => m.Event.EventType == EventType.EventRaised))
350✔
82
        {
83
            // Reopen completed orchestrations after receiving an event raised
84
            RuntimeState = new OrchestrationRuntimeState(
4✔
85
                RuntimeState.Events.Reopen(_options.DataConverter)
4✔
86
            );
4✔
87
        }
88

89
        var isRunning = RuntimeState.ExecutionStartedEvent is null
346✔
90
            || RuntimeState.OrchestrationStatus is OrchestrationStatus.Running
346✔
91
                or OrchestrationStatus.Suspended
346✔
92
                or OrchestrationStatus.Pending;
346✔
93

94
        for (var i = newDbMessages.Count - 1; i >= 0; i--)
1,172✔
95
        {
96
            var dbMessage = newDbMessages[i];
240✔
97
            var deserializedMessage = deserializedMessages[i];
240✔
98

99
            if (ShouldDropNewMessage(isRunning, dbMessage, deserializedMessage))
240✔
100
            {
101
                dbContext.OrchestrationMessages.Attach(dbMessage);
4✔
102
                dbContext.OrchestrationMessages.Remove(dbMessage);
4✔
103
                newDbMessages.RemoveAt(i);
4✔
104
                deserializedMessages.RemoveAt(i);
4✔
105
            }
106
        }
107

108
        Messages.AddRange(newDbMessages);
346✔
109

110
        return deserializedMessages;
346✔
111
    }
112

113
    private bool ShouldDropNewMessage(
114
        bool isRunning,
115
        OrchestrationMessage dbMessage,
116
        TaskMessage taskMessage)
117
    {
118
        // Drop messages to previous executions
119
        if (dbMessage.ExecutionId is not null && dbMessage.ExecutionId != Instance.LastExecutionId)
240✔
NEW
120
            return true;
×
121

122
        // When not running, drop anything that is not execution rewound
123
        if (!isRunning && taskMessage.Event.EventType != EventType.ExecutionRewound)
240✔
124
            return true;
4✔
125

126
        return false;
236✔
127
    }
128

129
    public void ClearMessages()
130
    {
131
        Messages.Clear();
174✔
132
    }
133
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc