• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 5835581151

pending completion
5835581151

Pull #27

github

lucaslorentz
Add ExtendedOrchestrationContext to simplify workers API
Pull Request #27: Add ExtendedOrchestrationContext to simplify workers API

71 of 71 new or added lines in 5 files covered. (100.0%)

2290 of 2798 relevant lines covered (81.84%)

142.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.05
/src/LLL.DurableTask.Server.Grpc.Client/GrpcClientOrchestrationService.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.History;
8
using DurableTaskGrpc;
9
using Google.Protobuf.WellKnownTypes;
10
using LLL.DurableTask.Core;
11
using Microsoft.Extensions.Logging;
12
using Microsoft.Extensions.Options;
13
using static DurableTaskGrpc.OrchestrationService;
14
using TaskActivityWorkItem = DurableTask.Core.TaskActivityWorkItem;
15
using TaskOrchestrationWorkItem = DurableTask.Core.TaskOrchestrationWorkItem;
16

17
namespace LLL.DurableTask.Server.Client
18
{
19
    public partial class GrpcClientOrchestrationService :
20
        IOrchestrationService,
21
        IDistributedOrchestrationService
22
    {
23
        private readonly GrpcClientOrchestrationServiceOptions _options;
24
        private readonly OrchestrationServiceClient _client;
25
        private readonly ILogger _logger;
26

27
        public int TaskOrchestrationDispatcherCount => _options.TaskOrchestrationDispatcherCount;
12✔
28
        public int MaxConcurrentTaskOrchestrationWorkItems => _options.MaxConcurrentTaskOrchestrationWorkItems;
12✔
29
        public int MaxConcurrentTaskActivityWorkItems => _options.MaxConcurrentTaskActivityWorkItems;
14✔
30
        public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => BehaviorOnContinueAsNew.Carryover;
25✔
31
        public int TaskActivityDispatcherCount => _options.TaskActivityDispatcherCount;
12✔
32

33
        public GrpcClientOrchestrationService(
12✔
34
            IOptions<GrpcClientOrchestrationServiceOptions> options,
12✔
35
            OrchestrationServiceClient client,
12✔
36
            ILogger<GrpcClientOrchestrationSession> logger)
12✔
37
        {
38
            _options = options.Value;
12✔
39
            _client = client;
12✔
40
            _logger = logger;
12✔
41
        }
42

43
        #region Setup
44
        public Task CreateAsync()
45
        {
46
            return CreateAsync(false);
×
47
        }
48

49
        public Task CreateAsync(bool recreateInstanceStore)
50
        {
51
            return Task.CompletedTask;
×
52
        }
53

54
        public Task CreateIfNotExistsAsync()
55
        {
56
            return Task.CompletedTask;
12✔
57
        }
58

59
        public Task DeleteAsync()
60
        {
61
            return DeleteAsync(false);
×
62
        }
63

64
        public Task DeleteAsync(bool deleteInstanceStore)
65
        {
66
            return Task.CompletedTask;
×
67
        }
68

69
        public int GetDelayInSecondsAfterOnFetchException(Exception exception)
70
        {
71
            return _options.DelayInSecondsAfterFailure;
×
72
        }
73

74
        public int GetDelayInSecondsAfterOnProcessException(Exception exception)
75
        {
76
            return _options.DelayInSecondsAfterFailure;
×
77
        }
78

79

80
        public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState)
81
        {
82
            return false;
47✔
83
        }
84

85
        public Task StartAsync()
86
        {
87
            return Task.CompletedTask;
12✔
88
        }
89

90
        public Task StopAsync()
91
        {
92
            return Task.CompletedTask;
×
93
        }
94

95
        public Task StopAsync(bool isForced)
96
        {
97
            return Task.CompletedTask;
12✔
98
        }
99
        #endregion
100

101
        #region Orchestration
102
        public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
103
            TimeSpan receiveTimeout, CancellationToken cancellationToken)
104
        {
105
            return LockNextTaskOrchestrationWorkItemAsync(
×
106
                receiveTimeout,
×
107
                new INameVersionInfo[0],
×
108
                true,
×
109
                cancellationToken
×
110
            );
×
111
        }
112

113
        public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, INameVersionInfo[] orchestrations, CancellationToken cancellationToken)
114
        {
115
            return LockNextTaskOrchestrationWorkItemAsync(
26✔
116
                receiveTimeout,
26✔
117
                orchestrations,
26✔
118
                false,
26✔
119
                cancellationToken
26✔
120
            );
26✔
121
        }
122

123
        private async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
124
            TimeSpan receiveTimeout,
125
            INameVersionInfo[] orchestrations,
126
            bool allOrchesrations,
127
            CancellationToken cancellationToken)
128
        {
129
            var stream = _client.LockNextTaskOrchestrationWorkItem(cancellationToken: cancellationToken);
26✔
130

131
            try
132
            {
133
                var request = new TaskOrchestrationRequest
26✔
134
                {
26✔
135
                    LockRequest = new LockNextTaskOrchestrationWorkItemRequest
26✔
136
                    {
26✔
137
                        ReceiveTimeout = Duration.FromTimeSpan(receiveTimeout),
26✔
138
                        Orchestrations = {
26✔
139
                            orchestrations
26✔
140
                                .Select(nv => new NameVersion { Name = nv.Name, Version = nv.Version })
234✔
141
                        },
26✔
142
                        AllOrchestrations = allOrchesrations
26✔
143
                    }
26✔
144
                };
26✔
145

146
                await stream.RequestStream.WriteAsync(request);
26✔
147

148
                if (!await stream.ResponseStream.MoveNext(cancellationToken))
26✔
149
                    throw new Exception("Session aborted");
×
150

151
                if (stream.ResponseStream.Current.MessageCase != TaskOrchestrationResponse.MessageOneofCase.LockResponse)
14✔
152
                    throw new Exception("Didn't receive lock response");
×
153

154
                var lockResponse = stream.ResponseStream.Current.LockResponse;
14✔
155

156
                if (lockResponse.WorkItem == null)
14✔
157
                    return null;
×
158

159
                return new TaskOrchestrationWorkItem
14✔
160
                {
14✔
161
                    InstanceId = lockResponse.WorkItem.InstanceId,
14✔
162
                    OrchestrationRuntimeState = new OrchestrationRuntimeState(
14✔
163
                        lockResponse.WorkItem.Events
14✔
164
                            .Select(e => _options.DataConverter.Deserialize<HistoryEvent>(e))
18✔
165
                            .ToArray()),
14✔
166
                    LockedUntilUtc = lockResponse.WorkItem.LockedUntilUtc.ToDateTime(),
14✔
167
                    NewMessages = lockResponse.WorkItem.NewMessages.Select(m => _options.DataConverter.Deserialize<TaskMessage>(m)).ToArray(),
29✔
168
                    Session = new GrpcClientOrchestrationSession(_options, stream, _logger)
14✔
169
                };
14✔
170
            }
171
            catch
12✔
172
            {
173
                stream.Dispose();
12✔
174
                throw;
12✔
175
            }
176
        }
177

178
        public async Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
179
        {
180
            await (workItem.Session as GrpcClientOrchestrationSession).Renew(workItem);
11✔
181
        }
182

183
        public async Task CompleteTaskOrchestrationWorkItemAsync(
184
            TaskOrchestrationWorkItem workItem,
185
            OrchestrationRuntimeState newOrchestrationRuntimeState,
186
            IList<TaskMessage> outboundMessages,
187
            IList<TaskMessage> orchestratorMessages,
188
            IList<TaskMessage> timerMessages,
189
            TaskMessage continuedAsNewMessage,
190
            OrchestrationState orchestrationState)
191
        {
192
            await (workItem.Session as GrpcClientOrchestrationSession).Complete(
25✔
193
                workItem,
25✔
194
                newOrchestrationRuntimeState,
25✔
195
                outboundMessages,
25✔
196
                orchestratorMessages,
25✔
197
                timerMessages,
25✔
198
                continuedAsNewMessage,
25✔
199
                orchestrationState);
25✔
200
        }
201

202
        public async Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
203
        {
204
            await (workItem.Session as GrpcClientOrchestrationSession).Release(workItem);
14✔
205
        }
206

207
        public async Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
208
        {
209
            await (workItem.Session as GrpcClientOrchestrationSession).Abandon(workItem);
×
210
        }
211

212
        #endregion
213

214
        #region Activity
215
        public Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
216
        {
217
            return LockNextTaskActivityWorkItem(
×
218
                receiveTimeout,
×
219
                new INameVersionInfo[0],
×
220
                true,
×
221
                cancellationToken);
×
222
        }
223

224
        public Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, INameVersionInfo[] activities, CancellationToken cancellationToken)
225
        {
226
            return LockNextTaskActivityWorkItem(
30✔
227
                receiveTimeout,
30✔
228
                activities,
30✔
229
                false,
30✔
230
                cancellationToken);
30✔
231
        }
232

233
        private async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(
234
            TimeSpan receiveTimeout,
235
            INameVersionInfo[] activities,
236
            bool allActivities,
237
            CancellationToken cancellationToken)
238
        {
239
            var request = new LockNextTaskActivityWorkItemRequest
30✔
240
            {
30✔
241
                ReceiveTimeout = Duration.FromTimeSpan(receiveTimeout),
30✔
242
                Activities = {
30✔
243
                    activities
30✔
244
                        .Select(nv => new NameVersion { Name = nv.Name, Version = nv.Version })
120✔
245
                },
30✔
246
                AllActivities = allActivities
30✔
247
            };
30✔
248

249
            var response = await _client.LockNextTaskActivityWorkItemAsync(request, cancellationToken: cancellationToken);
30✔
250

251
            if (response.WorkItem == null)
18✔
252
                return null;
×
253

254
            return ToDurableTaskWorkItem(response.WorkItem);
18✔
255
        }
256

257
        public async Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
258
        {
259
            var request = new RenewTaskActivityWorkItemLockRequest
×
260
            {
×
261
                WorkItem = ToGrpcWorkItem(workItem)
×
262
            };
×
263

264
            var response = await _client.RenewTaskActivityWorkItemLockAsync(request);
×
265

266
            return ToDurableTaskWorkItem(response.WorkItem);
×
267
        }
268

269
        public async Task CompleteTaskActivityWorkItemAsync(
270
            TaskActivityWorkItem workItem,
271
            TaskMessage responseMessage)
272
        {
273
            var request = new CompleteTaskActivityWorkItemRequest
18✔
274
            {
18✔
275
                WorkItem = ToGrpcWorkItem(workItem),
18✔
276
                ResponseMessage = _options.DataConverter.Serialize(responseMessage)
18✔
277
            };
18✔
278

279
            await _client.CompleteTaskActivityWorkItemAsync(request);
18✔
280
        }
281

282
        public async Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
283
        {
284
            var request = new AbandonTaskActivityWorkItemRequest
×
285
            {
×
286
                WorkItem = ToGrpcWorkItem(workItem)
×
287
            };
×
288

289
            await _client.AbandonTaskActivityWorkItemAsync(request);
×
290
        }
291

292
        private DurableTaskGrpc.TaskActivityWorkItem ToGrpcWorkItem(TaskActivityWorkItem workItem)
293
        {
294
            return new DurableTaskGrpc.TaskActivityWorkItem
18✔
295
            {
18✔
296
                Id = workItem.Id,
18✔
297
                LockedUntilUtc = ToTimestamp(workItem.LockedUntilUtc),
18✔
298
                TaskMessage = _options.DataConverter.Serialize(workItem.TaskMessage)
18✔
299
            };
18✔
300
        }
301

302
        private TaskActivityWorkItem ToDurableTaskWorkItem(DurableTaskGrpc.TaskActivityWorkItem grpcWorkItem)
303
        {
304
            return new TaskActivityWorkItem
18✔
305
            {
18✔
306
                Id = grpcWorkItem.Id,
18✔
307
                LockedUntilUtc = grpcWorkItem.LockedUntilUtc.ToDateTime(),
18✔
308
                TaskMessage = _options.DataConverter.Deserialize<TaskMessage>(grpcWorkItem.TaskMessage)
18✔
309
            };
18✔
310
        }
311

312
        #endregion
313

314
        private Timestamp ToTimestamp(DateTime? dateTime)
315
        {
316
            if (dateTime == null)
4✔
317
                return null;
4✔
318

319
            return ToTimestamp(dateTime.Value);
×
320
        }
321

322
        private Timestamp ToTimestamp(DateTime dateTime)
323
        {
324
            switch (dateTime.Kind)
18✔
325
            {
326
                case DateTimeKind.Local:
327
                    dateTime = dateTime.ToUniversalTime();
×
328
                    break;
×
329
                case DateTimeKind.Unspecified:
330
                    dateTime = DateTime.SpecifyKind(dateTime, DateTimeKind.Utc);
×
331
                    break;
332
            }
333

334
            return dateTime.ToTimestamp();
18✔
335
        }
336
    }
337
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc