• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 11488091084

23 Oct 2024 09:02PM UTC coverage: 81.967% (-0.4%) from 82.351%
11488091084

push

github

lucaslorentz
Restore workload during build

2350 of 2867 relevant lines covered (81.97%)

138.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.78
/src/LLL.DurableTask.Worker/Services/WorkerOrchestrationService.cs
1
using System;
2
using System.Collections.Concurrent;
3
using System.Collections.Generic;
4
using System.Linq;
5
using System.Threading;
6
using System.Threading.Tasks;
7
using DurableTask.Core;
8
using LLL.DurableTask.Core;
9
using Microsoft.Extensions.DependencyInjection;
10

11
namespace LLL.DurableTask.Worker.Services;
12

13
public class WorkerOrchestrationService : IOrchestrationService
14
{
15
    private readonly IOrchestrationService _innerOrchestrationService;
16
    private readonly IDistributedOrchestrationService _innerDistributedOrchestrationService;
17
    private readonly IServiceScopeFactory _serviceScopeFactory;
18
    private readonly INameVersionInfo[] _orchestrations;
19
    private readonly INameVersionInfo[] _activities;
20
    private readonly bool _hasAllOrchestrations;
21
    private readonly bool _hasAllActivities;
22

23
    public static ConcurrentDictionary<string, IServiceScope> OrchestrationsServiceScopes { get; } = new ConcurrentDictionary<string, IServiceScope>();
477✔
24

25
    public int TaskOrchestrationDispatcherCount => _orchestrations.Length == 0
119✔
26
        ? 0
119✔
27
        : _innerOrchestrationService.TaskOrchestrationDispatcherCount;
119✔
28
    public int TaskActivityDispatcherCount => _activities.Length == 0
119✔
29
        ? 0
119✔
30
        : _innerOrchestrationService.TaskActivityDispatcherCount;
119✔
31

32
    public WorkerOrchestrationService(
119✔
33
        IOrchestrationService innerOrchestrationService,
119✔
34
        IDistributedOrchestrationService innerDistributedOrchestrationService,
119✔
35
        IServiceScopeFactory serviceScopeFactory,
119✔
36
        IEnumerable<ObjectCreator<TaskOrchestration>> orchestrations,
119✔
37
        IEnumerable<ObjectCreator<TaskActivity>> activities,
119✔
38
        bool hasAllOrchestrations,
119✔
39
        bool hasAllActivities)
119✔
40
    {
41
        _innerOrchestrationService = innerOrchestrationService;
119✔
42
        _innerDistributedOrchestrationService = innerDistributedOrchestrationService;
119✔
43
        _serviceScopeFactory = serviceScopeFactory;
119✔
44
        _orchestrations = orchestrations.OfType<INameVersionInfo>().ToArray();
119✔
45
        _activities = activities.OfType<INameVersionInfo>().ToArray();
119✔
46
        _hasAllOrchestrations = hasAllOrchestrations;
119✔
47
        _hasAllActivities = hasAllActivities;
119✔
48
    }
49

50
    public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
51
    {
52
        var workItem = await (_hasAllOrchestrations
232✔
53
            ? _innerOrchestrationService
232✔
54
                .LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken)
232✔
55
            : (_innerDistributedOrchestrationService ?? throw DistributedWorkersNotSupported())
232✔
56
                .LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, _orchestrations, cancellationToken)
232✔
57
        );
232✔
58

59
        if (workItem != null)
113✔
60
        {
61
            OrchestrationsServiceScopes.TryAdd(workItem.InstanceId, _serviceScopeFactory.CreateScope());
113✔
62
        }
63

64
        return workItem;
113✔
65
    }
66

67
    public Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
68
    {
69
        if (OrchestrationsServiceScopes.TryRemove(workItem.InstanceId, out var serviceScope))
×
70
            serviceScope.Dispose();
×
71

72
        return _innerOrchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
×
73
    }
74

75
    public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
76
    {
77
        if (OrchestrationsServiceScopes.TryRemove(workItem.InstanceId, out var serviceScope))
113✔
78
            serviceScope.Dispose();
113✔
79

80
        return _innerOrchestrationService.ReleaseTaskOrchestrationWorkItemAsync(workItem);
113✔
81
    }
82

83
    public int MaxConcurrentTaskOrchestrationWorkItems => _innerOrchestrationService.MaxConcurrentTaskOrchestrationWorkItems;
119✔
84

85
    public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => _innerOrchestrationService.EventBehaviourForContinueAsNew;
168✔
86

87
    public int MaxConcurrentTaskActivityWorkItems => _innerOrchestrationService.MaxConcurrentTaskActivityWorkItems;
119✔
88

89
    public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
90
    {
91
        return _innerOrchestrationService.AbandonTaskActivityWorkItemAsync(workItem);
×
92
    }
93

94
    public Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage)
95
    {
96
        return _innerOrchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseMessage);
104✔
97
    }
98

99
    public Task CompleteTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem, OrchestrationRuntimeState newOrchestrationRuntimeState, IList<TaskMessage> outboundMessages, IList<TaskMessage> orchestratorMessages, IList<TaskMessage> timerMessages, TaskMessage continuedAsNewMessage, OrchestrationState orchestrationState)
100
    {
101
        return _innerOrchestrationService.CompleteTaskOrchestrationWorkItemAsync(workItem, newOrchestrationRuntimeState, outboundMessages, orchestratorMessages, timerMessages, continuedAsNewMessage, orchestrationState);
195✔
102
    }
103

104
    public Task CreateAsync()
105
    {
106
        return _innerOrchestrationService.CreateAsync();
×
107
    }
108

109
    public Task CreateAsync(bool recreateInstanceStore)
110
    {
111
        return _innerOrchestrationService.CreateAsync(recreateInstanceStore);
×
112
    }
113

114
    public Task CreateIfNotExistsAsync()
115
    {
116
        return _innerOrchestrationService.CreateIfNotExistsAsync();
×
117
    }
118

119
    public Task DeleteAsync()
120
    {
121
        return _innerOrchestrationService.DeleteAsync();
×
122
    }
123

124
    public Task DeleteAsync(bool deleteInstanceStore)
125
    {
126
        return _innerOrchestrationService.DeleteAsync(deleteInstanceStore);
×
127
    }
128

129
    public int GetDelayInSecondsAfterOnFetchException(Exception exception)
130
    {
131
        return _innerOrchestrationService.GetDelayInSecondsAfterOnFetchException(exception);
3✔
132
    }
133

134
    public int GetDelayInSecondsAfterOnProcessException(Exception exception)
135
    {
136
        return _innerOrchestrationService.GetDelayInSecondsAfterOnProcessException(exception);
×
137
    }
138

139
    public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState)
140
    {
141
        return _innerOrchestrationService.IsMaxMessageCountExceeded(currentMessageCount, runtimeState);
304✔
142
    }
143

144
    public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
145
    {
146
        return await (_hasAllActivities
199✔
147
            ? _innerOrchestrationService
199✔
148
                .LockNextTaskActivityWorkItem(receiveTimeout, cancellationToken)
199✔
149
            : (_innerDistributedOrchestrationService ?? throw DistributedWorkersNotSupported())
199✔
150
                .LockNextTaskActivityWorkItem(receiveTimeout, _activities, cancellationToken)
199✔
151
        );
199✔
152
    }
153

154
    public Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
155
    {
156
        return _innerOrchestrationService.RenewTaskActivityWorkItemLockAsync(workItem);
×
157
    }
158

159
    public Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
160
    {
161
        return _innerOrchestrationService.RenewTaskOrchestrationWorkItemLockAsync(workItem);
×
162
    }
163

164
    public Task StartAsync()
165
    {
166
        return _innerOrchestrationService.StartAsync();
119✔
167
    }
168

169
    public Task StopAsync()
170
    {
171
        return _innerOrchestrationService.StopAsync();
×
172
    }
173

174
    public Task StopAsync(bool isForced)
175
    {
176
        return _innerOrchestrationService.StopAsync(isForced);
119✔
177
    }
178

179
    private Exception DistributedWorkersNotSupported()
180
    {
181
        return new NotSupportedException("Distributed workers is not supported by storage implementation");
×
182
    }
183
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc