• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 4432709013

pending completion
4432709013

push

github

Add support for registering activity methods from interfaces

21 of 21 new or added lines in 1 file covered. (100.0%)

2283 of 2775 relevant lines covered (82.27%)

143.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.37
/src/LLL.DurableTask.Worker/Services/WorkerOrchestrationService.cs
1
using System;
2
using System.Collections.Concurrent;
3
using System.Collections.Generic;
4
using System.Linq;
5
using System.Threading;
6
using System.Threading.Tasks;
7
using DurableTask.Core;
8
using LLL.DurableTask.Core;
9
using Microsoft.Extensions.DependencyInjection;
10

11
namespace LLL.DurableTask.Worker.Services
12
{
13
    /// <summary>
    /// <see cref="IOrchestrationService"/> wrapper used by the worker.
    /// Most members forward directly to the inner orchestration service. In addition, this class:
    /// <list type="bullet">
    /// <item><description>reports a dispatcher count of zero when no orchestrations/activities were supplied;</description></item>
    /// <item><description>fetches work items through the <see cref="IDistributedOrchestrationService"/> (passing the
    /// known orchestrations/activities) when the worker was not flagged as having all of them;</description></item>
    /// <item><description>creates an <see cref="IServiceScope"/> for every locked orchestration work item and
    /// disposes it when the work item is released or abandoned.</description></item>
    /// </list>
    /// </summary>
    public class WorkerOrchestrationService : IOrchestrationService
    {
        private readonly IOrchestrationService _innerOrchestrationService;
        private readonly IDistributedOrchestrationService _innerDistributedOrchestrationService;
        private readonly IServiceScopeFactory _serviceScopeFactory;
        private readonly INameVersionInfo[] _orchestrations;
        private readonly INameVersionInfo[] _activities;
        private readonly bool _hasAllOrchestrations;
        private readonly bool _hasAllActivities;

        /// <summary>
        /// Service scopes of the orchestration work items currently locked, keyed by orchestration instance id.
        /// The dictionary is static, so it is shared by every instance of this class.
        /// </summary>
        public static ConcurrentDictionary<string, IServiceScope> OrchestrationsServiceScopes { get; } = new ConcurrentDictionary<string, IServiceScope>();

        /// <summary>
        /// Zero when no orchestrations were supplied to this worker; otherwise the inner service's value.
        /// </summary>
        public int TaskOrchestrationDispatcherCount => _orchestrations.Length == 0
            ? 0
            : _innerOrchestrationService.TaskOrchestrationDispatcherCount;

        /// <summary>
        /// Zero when no activities were supplied to this worker; otherwise the inner service's value.
        /// </summary>
        public int TaskActivityDispatcherCount => _activities.Length == 0
            ? 0
            : _innerOrchestrationService.TaskActivityDispatcherCount;

        /// <summary>
        /// Creates the wrapper.
        /// </summary>
        /// <param name="innerOrchestrationService">Service that all calls are ultimately forwarded to.</param>
        /// <param name="innerDistributedOrchestrationService">
        /// Service used to fetch work items filtered by name/version. May be null; in that case fetching
        /// without <paramref name="hasAllOrchestrations"/>/<paramref name="hasAllActivities"/> set fails with
        /// <see cref="NotSupportedException"/>.
        /// </param>
        /// <param name="serviceScopeFactory">Factory used to create one scope per locked orchestration work item.</param>
        /// <param name="orchestrations">Orchestration creators; only those implementing <see cref="INameVersionInfo"/> are kept.</param>
        /// <param name="activities">Activity creators; only those implementing <see cref="INameVersionInfo"/> are kept.</param>
        /// <param name="hasAllOrchestrations">When true, orchestration work items are fetched from the inner service without filtering.</param>
        /// <param name="hasAllActivities">When true, activity work items are fetched from the inner service without filtering.</param>
        public WorkerOrchestrationService(
            IOrchestrationService innerOrchestrationService,
            IDistributedOrchestrationService innerDistributedOrchestrationService,
            IServiceScopeFactory serviceScopeFactory,
            IEnumerable<ObjectCreator<TaskOrchestration>> orchestrations,
            IEnumerable<ObjectCreator<TaskActivity>> activities,
            bool hasAllOrchestrations,
            bool hasAllActivities)
        {
            _innerOrchestrationService = innerOrchestrationService;
            _innerDistributedOrchestrationService = innerDistributedOrchestrationService;
            _serviceScopeFactory = serviceScopeFactory;
            _orchestrations = orchestrations.OfType<INameVersionInfo>().ToArray();
            _activities = activities.OfType<INameVersionInfo>().ToArray();
            _hasAllOrchestrations = hasAllOrchestrations;
            _hasAllActivities = hasAllActivities;
        }

        /// <summary>
        /// Locks the next orchestration work item and, when one is returned, registers a new service scope
        /// for its instance id in <see cref="OrchestrationsServiceScopes"/>.
        /// </summary>
        /// <param name="receiveTimeout">Timeout forwarded to the underlying service.</param>
        /// <param name="cancellationToken">Cancellation token forwarded to the underlying service.</param>
        /// <returns>The locked work item, or null when the underlying service returned none.</returns>
        /// <exception cref="NotSupportedException">
        /// The worker does not have all orchestrations and no distributed orchestration service was provided.
        /// </exception>
        public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
        {
            var workItem = await (_hasAllOrchestrations
                ? _innerOrchestrationService
                    .LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken)
                : (_innerDistributedOrchestrationService ?? throw DistributedWorkersNotSupported())
                    .LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, _orchestrations, cancellationToken)
            );

            if (workItem != null)
            {
                var serviceScope = _serviceScopeFactory.CreateScope();

                if (!OrchestrationsServiceScopes.TryAdd(workItem.InstanceId, serviceScope))
                {
                    // A scope is already registered for this instance id. The existing entry is kept
                    // (same as before), so the scope just created would never be reachable again:
                    // dispose it here instead of leaking it.
                    serviceScope.Dispose();
                }
            }

            return workItem;
        }

        /// <summary>
        /// Disposes the service scope registered for the work item's instance (if any) and abandons
        /// the work item on the inner service.
        /// </summary>
        public Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
        {
            if (OrchestrationsServiceScopes.TryRemove(workItem.InstanceId, out var serviceScope))
            {
                serviceScope.Dispose();
            }

            return _innerOrchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
        }

        /// <summary>
        /// Disposes the service scope registered for the work item's instance (if any) and releases
        /// the work item on the inner service.
        /// </summary>
        public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
        {
            if (OrchestrationsServiceScopes.TryRemove(workItem.InstanceId, out var serviceScope))
            {
                serviceScope.Dispose();
            }

            return _innerOrchestrationService.ReleaseTaskOrchestrationWorkItemAsync(workItem);
        }

        /// <summary>Forwarded from the inner orchestration service.</summary>
        public int MaxConcurrentTaskOrchestrationWorkItems => _innerOrchestrationService.MaxConcurrentTaskOrchestrationWorkItems;

        /// <summary>Forwarded from the inner orchestration service.</summary>
        public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => _innerOrchestrationService.EventBehaviourForContinueAsNew;

        /// <summary>Forwarded from the inner orchestration service.</summary>
        public int MaxConcurrentTaskActivityWorkItems => _innerOrchestrationService.MaxConcurrentTaskActivityWorkItems;

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
        {
            return _innerOrchestrationService.AbandonTaskActivityWorkItemAsync(workItem);
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage)
        {
            return _innerOrchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseMessage);
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task CompleteTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem, OrchestrationRuntimeState newOrchestrationRuntimeState, IList<TaskMessage> outboundMessages, IList<TaskMessage> orchestratorMessages, IList<TaskMessage> timerMessages, TaskMessage continuedAsNewMessage, OrchestrationState orchestrationState)
        {
            return _innerOrchestrationService.CompleteTaskOrchestrationWorkItemAsync(workItem, newOrchestrationRuntimeState, outboundMessages, orchestratorMessages, timerMessages, continuedAsNewMessage, orchestrationState);
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task CreateAsync()
        {
            return _innerOrchestrationService.CreateAsync();
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task CreateAsync(bool recreateInstanceStore)
        {
            return _innerOrchestrationService.CreateAsync(recreateInstanceStore);
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task CreateIfNotExistsAsync()
        {
            return _innerOrchestrationService.CreateIfNotExistsAsync();
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task DeleteAsync()
        {
            return _innerOrchestrationService.DeleteAsync();
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task DeleteAsync(bool deleteInstanceStore)
        {
            return _innerOrchestrationService.DeleteAsync(deleteInstanceStore);
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public int GetDelayInSecondsAfterOnFetchException(Exception exception)
        {
            return _innerOrchestrationService.GetDelayInSecondsAfterOnFetchException(exception);
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public int GetDelayInSecondsAfterOnProcessException(Exception exception)
        {
            return _innerOrchestrationService.GetDelayInSecondsAfterOnProcessException(exception);
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState)
        {
            return _innerOrchestrationService.IsMaxMessageCountExceeded(currentMessageCount, runtimeState);
        }

        /// <summary>
        /// Locks the next activity work item, either from the inner service (when the worker has all
        /// activities) or from the distributed service filtered by the known activities.
        /// </summary>
        /// <param name="receiveTimeout">Timeout forwarded to the underlying service.</param>
        /// <param name="cancellationToken">Cancellation token forwarded to the underlying service.</param>
        /// <returns>The work item returned by the underlying service.</returns>
        /// <exception cref="NotSupportedException">
        /// The worker does not have all activities and no distributed orchestration service was provided.
        /// </exception>
        public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
        {
            return await (_hasAllActivities
                ? _innerOrchestrationService
                    .LockNextTaskActivityWorkItem(receiveTimeout, cancellationToken)
                : (_innerDistributedOrchestrationService ?? throw DistributedWorkersNotSupported())
                    .LockNextTaskActivityWorkItem(receiveTimeout, _activities, cancellationToken)
            );
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
        {
            return _innerOrchestrationService.RenewTaskActivityWorkItemLockAsync(workItem);
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
        {
            return _innerOrchestrationService.RenewTaskOrchestrationWorkItemLockAsync(workItem);
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task StartAsync()
        {
            return _innerOrchestrationService.StartAsync();
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task StopAsync()
        {
            return _innerOrchestrationService.StopAsync();
        }

        /// <summary>Forwards the call unchanged to the inner orchestration service.</summary>
        public Task StopAsync(bool isForced)
        {
            return _innerOrchestrationService.StopAsync(isForced);
        }

        /// <summary>
        /// Builds the exception thrown when filtered fetching is required but no
        /// <see cref="IDistributedOrchestrationService"/> was provided.
        /// </summary>
        private static Exception DistributedWorkersNotSupported()
        {
            return new NotSupportedException("Distributed workers is not supported by storage implementation");
        }
    }
184
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc