• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

lucaslorentz / durabletask-extensions / 19715028186

26 Nov 2025 07:18PM UTC coverage: 78.714%. First build
19715028186

push

github

web-flow
Merge pull request #53 from lucaslorentz/support-dtfx-rewind

Add support for DTFx native rewind

43 of 50 new or added lines in 4 files covered. (86.0%)

2289 of 2908 relevant lines covered (78.71%)

120.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.51
/src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.History;
8
using LLL.DurableTask.EFCore.Entities;
9
using LLL.DurableTask.EFCore.Extensions;
10
using LLL.DurableTask.EFCore.Polling;
11
using Microsoft.EntityFrameworkCore;
12

13
namespace LLL.DurableTask.EFCore;
14

15
/// <summary>
/// <see cref="IOrchestrationSession"/> implementation that reads the pending
/// messages of a single orchestration instance through an
/// <see cref="OrchestrationDbContext"/>.
/// </summary>
/// <remarks>
/// The session remembers every message it has already handed out in
/// <see cref="Messages"/> so that subsequent fetches only return messages that
/// were not seen before.
/// </remarks>
public class EFCoreOrchestrationSession : IOrchestrationSession
{
    private readonly EFCoreOrchestrationOptions _options;

    private readonly IDbContextFactory<OrchestrationDbContext> _dbContextFactory;
    private readonly CancellationToken _stopCancellationToken;

    /// <summary>
    /// Creates a session for the given instance/execution pair.
    /// </summary>
    /// <param name="options">Options providing the data converter and polling settings.</param>
    /// <param name="dbContextFactory">Factory used to create a context for each poll attempt.</param>
    /// <param name="instance">Instance whose messages are fetched by this session.</param>
    /// <param name="execution">Initial value of <see cref="Execution"/>.</param>
    /// <param name="runtimeState">Initial value of <see cref="RuntimeState"/>.</param>
    /// <param name="stopCancellationToken">Token used to stop polling for new messages.</param>
    public EFCoreOrchestrationSession(
        EFCoreOrchestrationOptions options,
        IDbContextFactory<OrchestrationDbContext> dbContextFactory,
        Instance instance,
        Execution execution,
        OrchestrationRuntimeState runtimeState,
        CancellationToken stopCancellationToken)
    {
        _options = options;
        _dbContextFactory = dbContextFactory;
        Instance = instance;
        Execution = execution;
        RuntimeState = runtimeState;
        _stopCancellationToken = stopCancellationToken;
    }

    /// <summary>Instance this session fetches messages for.</summary>
    public Instance Instance { get; }

    /// <summary>Execution associated with this session; not read by this class itself.</summary>
    public Execution Execution { get; set; }

    /// <summary>
    /// Runtime state of the orchestration. Replaced by
    /// <see cref="FetchNewMessagesAsync"/> when a completed orchestration is reopened.
    /// </summary>
    public OrchestrationRuntimeState RuntimeState { get; set; }

    /// <summary>
    /// Messages already fetched (and not dropped) by this session. They are
    /// excluded from later fetches until <see cref="ClearMessages"/> is called.
    /// </summary>
    public List<OrchestrationMessage> Messages { get; } = new List<OrchestrationMessage>();

    /// <summary>Flag maintained by the owner of the session; not read by this class itself.</summary>
    public bool Released { get; set; }

    /// <summary>
    /// Polls the database for messages that this session has not seen yet.
    /// </summary>
    /// <param name="workItem">Not used by this implementation.</param>
    /// <returns>The value produced by <see cref="BackoffPollingHelper"/> for the poll.</returns>
    public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
        TaskOrchestrationWorkItem workItem)
    {
        return await BackoffPollingHelper.PollAsync(async () =>
        {
            // The context is created inside an async lambda, so dispose it asynchronously.
            await using var dbContext = _dbContextFactory.CreateDbContext();

            // Flow the stop token so an in-flight query is cancelled on shutdown
            // instead of only the delay between poll attempts.
            var messages = await FetchNewMessagesAsync(dbContext, _stopCancellationToken);

            // Persists the deletion of dropped messages. Deliberately not cancellable:
            // Messages has already been updated by the fetch above.
            await dbContext.SaveChangesAsync();
            return messages;
        },
        // A null or non-empty result is accepted (presumably ending the poll;
        // see BackoffPollingHelper for the exact contract).
        x => x is null || x.Count > 0,
        _options.FetchNewMessagesPollingTimeout,
        _options.PollingInterval,
        _stopCancellationToken);
    }

    /// <summary>
    /// Loads the available messages of <see cref="Instance"/> that are not yet in
    /// <see cref="Messages"/>, marks the ones that must be dropped for deletion on
    /// <paramref name="dbContext"/>, and records the remaining ones in <see cref="Messages"/>.
    /// </summary>
    /// <param name="dbContext">
    /// Context used for the query. Dropped messages are only marked as removed;
    /// the caller is responsible for saving changes.
    /// </param>
    /// <param name="cancellationToken">Token used to cancel the database query.</param>
    /// <returns>The deserialized task messages that were kept, in fetch order.</returns>
    public async Task<IList<TaskMessage>> FetchNewMessagesAsync(
        OrchestrationDbContext dbContext,
        CancellationToken cancellationToken = default)
    {
        var newDbMessages = await dbContext.OrchestrationMessages
            .Where(w => w.AvailableAt <= DateTime.UtcNow
                && w.InstanceId == Instance.InstanceId
                && w.Instance.LockId == Instance.LockId // Ensure we still own the lock
                && !Messages.Contains(w))
            .OrderBy(w => w.AvailableAt)
            .ThenBy(w => w.SequenceNumber)
            .AsNoTracking()
            .ToListAsync(cancellationToken);

        // Kept index-aligned with newDbMessages so both lists can be pruned together.
        var deserializedMessages = newDbMessages
            .Select(w => _options.DataConverter.Deserialize<TaskMessage>(w.Message))
            .ToList();

        // NOTE(review): this decision is made before stale-execution messages are
        // dropped below, so an EventRaised message that ends up being dropped can
        // still reopen the state. Confirm this is intended.
        if (RuntimeState.ExecutionStartedEvent is not null
            && RuntimeState.OrchestrationStatus is OrchestrationStatus.Completed
            && deserializedMessages.Any(m => m.Event.EventType == EventType.EventRaised))
        {
            // Reopen completed orchestrations after receiving an event raised
            RuntimeState = new OrchestrationRuntimeState(
                RuntimeState.Events.Reopen(_options.DataConverter)
            );
        }

        // An orchestration without a started event is treated as running.
        var isRunning = RuntimeState.ExecutionStartedEvent is null
            || RuntimeState.OrchestrationStatus is OrchestrationStatus.Running
                or OrchestrationStatus.Suspended
                or OrchestrationStatus.Pending;

        // Iterate backwards so RemoveAt does not shift indexes that are still to be visited.
        for (var i = newDbMessages.Count - 1; i >= 0; i--)
        {
            var dbMessage = newDbMessages[i];
            var deserializedMessage = deserializedMessages[i];

            if (ShouldDropNewMessage(isRunning, dbMessage, deserializedMessage))
            {
                // The query used AsNoTracking, so attach the entity before marking it deleted.
                dbContext.OrchestrationMessages.Attach(dbMessage);
                dbContext.OrchestrationMessages.Remove(dbMessage);
                newDbMessages.RemoveAt(i);
                deserializedMessages.RemoveAt(i);
            }
        }

        Messages.AddRange(newDbMessages);

        return deserializedMessages;
    }

    /// <summary>
    /// Decides whether a freshly fetched message must be deleted instead of delivered.
    /// </summary>
    /// <param name="isRunning">Whether the orchestration is currently considered running.</param>
    /// <param name="dbMessage">Stored message, used for its execution id.</param>
    /// <param name="taskMessage">Deserialized message, used for its event type.</param>
    /// <returns><c>true</c> when the message must be dropped.</returns>
    private bool ShouldDropNewMessage(
        bool isRunning,
        OrchestrationMessage dbMessage,
        TaskMessage taskMessage)
    {
        // Drop messages to previous executions
        if (dbMessage.ExecutionId is not null && dbMessage.ExecutionId != Instance.LastExecutionId)
        {
            return true;
        }

        // When not running, drop anything that is not execution rewound
        if (!isRunning && taskMessage.Event.EventType != EventType.ExecutionRewound)
        {
            return true;
        }

        return false;
    }

    /// <summary>
    /// Forgets every message recorded in <see cref="Messages"/>.
    /// </summary>
    public void ClearMessages()
    {
        Messages.Clear();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc