• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 5835581151

pending completion
5835581151

Pull #27

github

lucaslorentz
Add ExtendedOrchestrationContext to simplify workers API
Pull Request #27: Add ExtendedOrchestrationContext to simplify workers API

71 of 71 new or added lines in 5 files covered. (100.0%)

2290 of 2798 relevant lines covered (81.84%)

142.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.78
/src/LLL.DurableTask.Worker/Services/WorkerOrchestrationService.cs
1
using System;
2
using System.Collections.Concurrent;
3
using System.Collections.Generic;
4
using System.Linq;
5
using System.Threading;
6
using System.Threading.Tasks;
7
using DurableTask.Core;
8
using LLL.DurableTask.Core;
9
using Microsoft.Extensions.DependencyInjection;
10

11
namespace LLL.DurableTask.Worker.Services
{
    /// <summary>
    /// <see cref="IOrchestrationService"/> decorator used by workers. It forwards every call to an
    /// inner orchestration service, optionally restricts the fetched work items to the
    /// orchestrations/activities registered in this worker, and keeps one
    /// <see cref="IServiceScope"/> per locked orchestration work item.
    /// </summary>
    public class WorkerOrchestrationService : IOrchestrationService
    {
        private readonly IOrchestrationService _innerOrchestrationService;
        private readonly IDistributedOrchestrationService _innerDistributedOrchestrationService;
        private readonly IServiceScopeFactory _serviceScopeFactory;
        private readonly INameVersionInfo[] _orchestrations;
        private readonly INameVersionInfo[] _activities;
        private readonly bool _hasAllOrchestrations;
        private readonly bool _hasAllActivities;

        /// <summary>
        /// Service scopes keyed by orchestration instance id. An entry is added when an orchestration
        /// work item is locked and is removed and disposed when that work item is abandoned or released.
        /// </summary>
        public static ConcurrentDictionary<string, IServiceScope> OrchestrationsServiceScopes { get; } = new ConcurrentDictionary<string, IServiceScope>();

        /// <summary>
        /// Zero when no orchestrations are registered in this worker; otherwise the value reported by
        /// the inner orchestration service.
        /// </summary>
        public int TaskOrchestrationDispatcherCount => _orchestrations.Length == 0
            ? 0
            : _innerOrchestrationService.TaskOrchestrationDispatcherCount;

        /// <summary>
        /// Zero when no activities are registered in this worker; otherwise the value reported by
        /// the inner orchestration service.
        /// </summary>
        public int TaskActivityDispatcherCount => _activities.Length == 0
            ? 0
            : _innerOrchestrationService.TaskActivityDispatcherCount;

        /// <summary>
        /// Creates the worker orchestration service.
        /// </summary>
        /// <param name="innerOrchestrationService">Service that every call is ultimately forwarded to.</param>
        /// <param name="innerDistributedOrchestrationService">
        /// Service used to fetch work items filtered by name/version. May be null; in that case fetching
        /// with a filter throws <see cref="NotSupportedException"/>.
        /// </param>
        /// <param name="serviceScopeFactory">Factory used to create a scope for each locked orchestration work item.</param>
        /// <param name="orchestrations">Registered orchestration creators; only those implementing <see cref="INameVersionInfo"/> are retained.</param>
        /// <param name="activities">Registered activity creators; only those implementing <see cref="INameVersionInfo"/> are retained.</param>
        /// <param name="hasAllOrchestrations">When true, orchestration work items are fetched from the inner service without a name/version filter.</param>
        /// <param name="hasAllActivities">When true, activity work items are fetched from the inner service without a name/version filter.</param>
        public WorkerOrchestrationService(
            IOrchestrationService innerOrchestrationService,
            IDistributedOrchestrationService innerDistributedOrchestrationService,
            IServiceScopeFactory serviceScopeFactory,
            IEnumerable<ObjectCreator<TaskOrchestration>> orchestrations,
            IEnumerable<ObjectCreator<TaskActivity>> activities,
            bool hasAllOrchestrations,
            bool hasAllActivities)
        {
            _innerOrchestrationService = innerOrchestrationService;
            _innerDistributedOrchestrationService = innerDistributedOrchestrationService;
            _serviceScopeFactory = serviceScopeFactory;
            _orchestrations = orchestrations.OfType<INameVersionInfo>().ToArray();
            _activities = activities.OfType<INameVersionInfo>().ToArray();
            _hasAllOrchestrations = hasAllOrchestrations;
            _hasAllActivities = hasAllActivities;
        }

        /// <summary>
        /// Locks the next orchestration work item and, when one is returned, registers a service scope
        /// for its instance id in <see cref="OrchestrationsServiceScopes"/>.
        /// </summary>
        /// <param name="receiveTimeout">Timeout forwarded to the inner service.</param>
        /// <param name="cancellationToken">Cancellation token forwarded to the inner service.</param>
        /// <returns>The locked work item, or null when the inner service returned none.</returns>
        /// <exception cref="NotSupportedException">
        /// Filtering is required (not all orchestrations are registered) but no distributed
        /// orchestration service was supplied.
        /// </exception>
        public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
        {
            var workItem = await (_hasAllOrchestrations
                ? _innerOrchestrationService
                    .LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, cancellationToken)
                : (_innerDistributedOrchestrationService ?? throw DistributedWorkersNotSupported())
                    .LockNextTaskOrchestrationWorkItemAsync(receiveTimeout, _orchestrations, cancellationToken)
            );

            if (workItem != null)
            {
                var serviceScope = _serviceScopeFactory.CreateScope();

                if (!OrchestrationsServiceScopes.TryAdd(workItem.InstanceId, serviceScope))
                {
                    // A scope is already registered for this instance id: keep the existing one
                    // and dispose the scope just created so it is not leaked.
                    serviceScope.Dispose();
                }
            }

            return workItem;
        }

        /// <summary>
        /// Disposes the service scope registered for the work item's instance (if any) and abandons
        /// the work item on the inner service.
        /// </summary>
        public Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
        {
            if (OrchestrationsServiceScopes.TryRemove(workItem.InstanceId, out var serviceScope))
            {
                serviceScope.Dispose();
            }

            return _innerOrchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
        }

        /// <summary>
        /// Disposes the service scope registered for the work item's instance (if any) and releases
        /// the work item on the inner service.
        /// </summary>
        public Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
        {
            if (OrchestrationsServiceScopes.TryRemove(workItem.InstanceId, out var serviceScope))
            {
                serviceScope.Dispose();
            }

            return _innerOrchestrationService.ReleaseTaskOrchestrationWorkItemAsync(workItem);
        }

        /// <summary>Value reported by the inner orchestration service.</summary>
        public int MaxConcurrentTaskOrchestrationWorkItems => _innerOrchestrationService.MaxConcurrentTaskOrchestrationWorkItems;

        /// <summary>Value reported by the inner orchestration service.</summary>
        public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => _innerOrchestrationService.EventBehaviourForContinueAsNew;

        /// <summary>Value reported by the inner orchestration service.</summary>
        public int MaxConcurrentTaskActivityWorkItems => _innerOrchestrationService.MaxConcurrentTaskActivityWorkItems;

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
        {
            return _innerOrchestrationService.AbandonTaskActivityWorkItemAsync(workItem);
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage)
        {
            return _innerOrchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseMessage);
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task CompleteTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem, OrchestrationRuntimeState newOrchestrationRuntimeState, IList<TaskMessage> outboundMessages, IList<TaskMessage> orchestratorMessages, IList<TaskMessage> timerMessages, TaskMessage continuedAsNewMessage, OrchestrationState orchestrationState)
        {
            return _innerOrchestrationService.CompleteTaskOrchestrationWorkItemAsync(workItem, newOrchestrationRuntimeState, outboundMessages, orchestratorMessages, timerMessages, continuedAsNewMessage, orchestrationState);
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task CreateAsync()
        {
            return _innerOrchestrationService.CreateAsync();
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task CreateAsync(bool recreateInstanceStore)
        {
            return _innerOrchestrationService.CreateAsync(recreateInstanceStore);
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task CreateIfNotExistsAsync()
        {
            return _innerOrchestrationService.CreateIfNotExistsAsync();
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task DeleteAsync()
        {
            return _innerOrchestrationService.DeleteAsync();
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task DeleteAsync(bool deleteInstanceStore)
        {
            return _innerOrchestrationService.DeleteAsync(deleteInstanceStore);
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public int GetDelayInSecondsAfterOnFetchException(Exception exception)
        {
            return _innerOrchestrationService.GetDelayInSecondsAfterOnFetchException(exception);
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public int GetDelayInSecondsAfterOnProcessException(Exception exception)
        {
            return _innerOrchestrationService.GetDelayInSecondsAfterOnProcessException(exception);
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState)
        {
            return _innerOrchestrationService.IsMaxMessageCountExceeded(currentMessageCount, runtimeState);
        }

        /// <summary>
        /// Locks the next activity work item, either from the inner service (all activities registered)
        /// or from the distributed service, passing the activities registered in this worker.
        /// </summary>
        /// <param name="receiveTimeout">Timeout forwarded to the inner service.</param>
        /// <param name="cancellationToken">Cancellation token forwarded to the inner service.</param>
        /// <exception cref="NotSupportedException">
        /// Filtering is required (not all activities are registered) but no distributed
        /// orchestration service was supplied.
        /// </exception>
        public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
        {
            return await (_hasAllActivities
                ? _innerOrchestrationService
                    .LockNextTaskActivityWorkItem(receiveTimeout, cancellationToken)
                : (_innerDistributedOrchestrationService ?? throw DistributedWorkersNotSupported())
                    .LockNextTaskActivityWorkItem(receiveTimeout, _activities, cancellationToken)
            );
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
        {
            return _innerOrchestrationService.RenewTaskActivityWorkItemLockAsync(workItem);
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
        {
            return _innerOrchestrationService.RenewTaskOrchestrationWorkItemLockAsync(workItem);
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task StartAsync()
        {
            return _innerOrchestrationService.StartAsync();
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task StopAsync()
        {
            return _innerOrchestrationService.StopAsync();
        }

        /// <summary>Forwards to the inner orchestration service.</summary>
        public Task StopAsync(bool isForced)
        {
            return _innerOrchestrationService.StopAsync(isForced);
        }

        /// <summary>
        /// Builds the exception thrown when filtered work item fetching is needed but no
        /// distributed orchestration service is available.
        /// </summary>
        private static Exception DistributedWorkersNotSupported()
        {
            return new NotSupportedException("Distributed workers is not supported by storage implementation");
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc