• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 9336690319

02 Jun 2024 07:51AM UTC coverage: 81.942% (-0.4%) from 82.326%
9336690319

Pull #34

github

lucaslorentz
Update to EF Core 8
Pull Request #34: Update to EF Core 8

2346 of 2863 relevant lines covered (81.94%)

138.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.05
/src/LLL.DurableTask.Server.Grpc.Client/GrpcClientOrchestrationService.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.History;
8
using DurableTaskGrpc;
9
using Google.Protobuf.WellKnownTypes;
10
using LLL.DurableTask.Core;
11
using Microsoft.Extensions.Logging;
12
using Microsoft.Extensions.Options;
13
using static DurableTaskGrpc.OrchestrationService;
14
using TaskActivityWorkItem = DurableTask.Core.TaskActivityWorkItem;
15
using TaskOrchestrationWorkItem = DurableTask.Core.TaskOrchestrationWorkItem;
16

17
namespace LLL.DurableTask.Server.Client;
18

19
/// <summary>
/// <see cref="IOrchestrationService"/> / <see cref="IDistributedOrchestrationService"/>
/// implementation that forwards work item operations to a remote server through the
/// generated gRPC <see cref="OrchestrationServiceClient"/>.
/// </summary>
public partial class GrpcClientOrchestrationService :
    IOrchestrationService,
    IDistributedOrchestrationService
{
    private readonly GrpcClientOrchestrationServiceOptions _options;
    private readonly OrchestrationServiceClient _client;
    private readonly ILogger _logger;

    /// <summary>Number of orchestration dispatchers, taken from the configured options.</summary>
    public int TaskOrchestrationDispatcherCount => _options.TaskOrchestrationDispatcherCount;

    /// <summary>Maximum concurrent orchestration work items, taken from the configured options.</summary>
    public int MaxConcurrentTaskOrchestrationWorkItems => _options.MaxConcurrentTaskOrchestrationWorkItems;

    /// <summary>Maximum concurrent activity work items, taken from the configured options.</summary>
    public int MaxConcurrentTaskActivityWorkItems => _options.MaxConcurrentTaskActivityWorkItems;

    /// <summary>Always <see cref="BehaviorOnContinueAsNew.Carryover"/>.</summary>
    public BehaviorOnContinueAsNew EventBehaviourForContinueAsNew => BehaviorOnContinueAsNew.Carryover;

    /// <summary>Number of activity dispatchers, taken from the configured options.</summary>
    public int TaskActivityDispatcherCount => _options.TaskActivityDispatcherCount;

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="options">Options wrapper; its <c>Value</c> is read once and kept.</param>
    /// <param name="client">gRPC client used for every remote call.</param>
    /// <param name="logger">Logger that is handed to each orchestration session created by this service.</param>
    public GrpcClientOrchestrationService(
        IOptions<GrpcClientOrchestrationServiceOptions> options,
        OrchestrationServiceClient client,
        ILogger<GrpcClientOrchestrationSession> logger)
    {
        _options = options.Value;
        _client = client;
        _logger = logger;
    }

    #region Setup
    /// <summary>Equivalent to <c>CreateAsync(false)</c>.</summary>
    public Task CreateAsync()
    {
        return CreateAsync(false);
    }

    /// <summary>No-op on the client: returns an already completed task.</summary>
    /// <param name="recreateInstanceStore">Ignored.</param>
    public Task CreateAsync(bool recreateInstanceStore)
    {
        return Task.CompletedTask;
    }

    /// <summary>No-op on the client: returns an already completed task.</summary>
    public Task CreateIfNotExistsAsync()
    {
        return Task.CompletedTask;
    }

    /// <summary>Equivalent to <c>DeleteAsync(false)</c>.</summary>
    public Task DeleteAsync()
    {
        return DeleteAsync(false);
    }

    /// <summary>No-op on the client: returns an already completed task.</summary>
    /// <param name="deleteInstanceStore">Ignored.</param>
    public Task DeleteAsync(bool deleteInstanceStore)
    {
        return Task.CompletedTask;
    }

    /// <summary>Returns the configured <c>DelayInSecondsAfterFailure</c>, regardless of the exception.</summary>
    /// <param name="exception">Ignored.</param>
    public int GetDelayInSecondsAfterOnFetchException(Exception exception)
    {
        return _options.DelayInSecondsAfterFailure;
    }

    /// <summary>Returns the configured <c>DelayInSecondsAfterFailure</c>, regardless of the exception.</summary>
    /// <param name="exception">Ignored.</param>
    public int GetDelayInSecondsAfterOnProcessException(Exception exception)
    {
        return _options.DelayInSecondsAfterFailure;
    }


    /// <summary>Always returns <c>false</c>; this client enforces no message count limit.</summary>
    public bool IsMaxMessageCountExceeded(int currentMessageCount, OrchestrationRuntimeState runtimeState)
    {
        return false;
    }

    /// <summary>No-op on the client: returns an already completed task.</summary>
    public Task StartAsync()
    {
        return Task.CompletedTask;
    }

    /// <summary>No-op on the client: returns an already completed task.</summary>
    public Task StopAsync()
    {
        return Task.CompletedTask;
    }

    /// <summary>No-op on the client: returns an already completed task.</summary>
    /// <param name="isForced">Ignored.</param>
    public Task StopAsync(bool isForced)
    {
        return Task.CompletedTask;
    }
    #endregion

    #region Orchestration
    /// <summary>
    /// Locks the next orchestration work item without filtering by orchestration
    /// (sends an empty orchestration list with <c>AllOrchestrations = true</c>).
    /// </summary>
    /// <returns>The locked work item, or <c>null</c> when the server returned none.</returns>
    public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
        TimeSpan receiveTimeout, CancellationToken cancellationToken)
    {
        return LockNextTaskOrchestrationWorkItemAsync(
            receiveTimeout,
            Array.Empty<INameVersionInfo>(),
            true,
            cancellationToken
        );
    }

    /// <summary>
    /// Locks the next orchestration work item, sending the given name/version list
    /// with <c>AllOrchestrations = false</c>.
    /// </summary>
    /// <returns>The locked work item, or <c>null</c> when the server returned none.</returns>
    public Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, INameVersionInfo[] orchestrations, CancellationToken cancellationToken)
    {
        return LockNextTaskOrchestrationWorkItemAsync(
            receiveTimeout,
            orchestrations,
            false,
            cancellationToken
        );
    }

    /// <summary>
    /// Opens a duplex streaming call, sends a lock request and converts the lock
    /// response into a <see cref="TaskOrchestrationWorkItem"/> whose session wraps the stream.
    /// </summary>
    /// <param name="receiveTimeout">Sent to the server as the request's receive timeout.</param>
    /// <param name="orchestrations">Name/version pairs copied into the request.</param>
    /// <param name="allOrchestrations">Value of the request's <c>AllOrchestrations</c> flag.</param>
    /// <param name="cancellationToken">Applied to the call and to reading the response.</param>
    /// <returns>The locked work item, or <c>null</c> when the response carries no work item.</returns>
    /// <exception cref="InvalidOperationException">
    /// The response stream ended before a message arrived, or the first message was not a lock response.
    /// </exception>
    private async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(
        TimeSpan receiveTimeout,
        INameVersionInfo[] orchestrations,
        bool allOrchestrations,
        CancellationToken cancellationToken)
    {
        var stream = _client.LockNextTaskOrchestrationWorkItem(cancellationToken: cancellationToken);

        try
        {
            var request = new TaskOrchestrationRequest
            {
                LockRequest = new LockNextTaskOrchestrationWorkItemRequest
                {
                    ReceiveTimeout = Duration.FromTimeSpan(receiveTimeout),
                    Orchestrations = {
                        orchestrations
                            .Select(nv => new NameVersion { Name = nv.Name, Version = nv.Version })
                    },
                    AllOrchestrations = allOrchestrations
                }
            };

            await stream.RequestStream.WriteAsync(request);

            if (!await stream.ResponseStream.MoveNext(cancellationToken))
                throw new InvalidOperationException("Session aborted");

            if (stream.ResponseStream.Current.MessageCase != TaskOrchestrationResponse.MessageOneofCase.LockResponse)
                throw new InvalidOperationException("Didn't receive lock response");

            var lockResponse = stream.ResponseStream.Current.LockResponse;

            if (lockResponse.WorkItem is null)
            {
                // No session is created on this path, so nothing else holds a
                // reference to the call; dispose it here instead of leaking it.
                stream.Dispose();
                return null;
            }

            return new TaskOrchestrationWorkItem
            {
                InstanceId = lockResponse.WorkItem.InstanceId,
                OrchestrationRuntimeState = new OrchestrationRuntimeState(
                    lockResponse.WorkItem.Events
                        .Select(e => _options.DataConverter.Deserialize<HistoryEvent>(e))
                        .ToArray()),
                LockedUntilUtc = lockResponse.WorkItem.LockedUntilUtc.ToDateTime(),
                NewMessages = lockResponse.WorkItem.NewMessages.Select(m => _options.DataConverter.Deserialize<TaskMessage>(m)).ToArray(),
                // The stream is handed over to the session, which is used by the
                // renew/complete/release/abandon operations below.
                Session = new GrpcClientOrchestrationSession(_options, stream, _logger)
            };
        }
        catch
        {
            // Any failure before the session takes the stream: release the call, then rethrow.
            stream.Dispose();
            throw;
        }
    }

    /// <summary>Renews the work item's lock through its gRPC session.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">The work item has no <see cref="GrpcClientOrchestrationSession"/>.</exception>
    public async Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkItem workItem)
    {
        await GetGrpcSession(workItem).Renew(workItem);
    }

    /// <summary>Completes the work item through its gRPC session, forwarding all arguments unchanged.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">The work item has no <see cref="GrpcClientOrchestrationSession"/>.</exception>
    public async Task CompleteTaskOrchestrationWorkItemAsync(
        TaskOrchestrationWorkItem workItem,
        OrchestrationRuntimeState newOrchestrationRuntimeState,
        IList<TaskMessage> outboundMessages,
        IList<TaskMessage> orchestratorMessages,
        IList<TaskMessage> timerMessages,
        TaskMessage continuedAsNewMessage,
        OrchestrationState orchestrationState)
    {
        await GetGrpcSession(workItem).Complete(
            workItem,
            newOrchestrationRuntimeState,
            outboundMessages,
            orchestratorMessages,
            timerMessages,
            continuedAsNewMessage,
            orchestrationState);
    }

    /// <summary>Releases the work item through its gRPC session.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">The work item has no <see cref="GrpcClientOrchestrationSession"/>.</exception>
    public async Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
    {
        await GetGrpcSession(workItem).Release(workItem);
    }

    /// <summary>Abandons the work item through its gRPC session.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">The work item has no <see cref="GrpcClientOrchestrationSession"/>.</exception>
    public async Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
    {
        await GetGrpcSession(workItem).Abandon(workItem);
    }

    /// <summary>
    /// Returns the work item's session as a <see cref="GrpcClientOrchestrationSession"/>,
    /// failing with a descriptive exception instead of a <see cref="NullReferenceException"/>
    /// when the work item or a compatible session is missing.
    /// </summary>
    private static GrpcClientOrchestrationSession GetGrpcSession(TaskOrchestrationWorkItem workItem)
    {
        if (workItem is null)
            throw new ArgumentNullException(nameof(workItem));

        return workItem.Session as GrpcClientOrchestrationSession
            ?? throw new InvalidOperationException(
                $"Work item session is not a {nameof(GrpcClientOrchestrationSession)}.");
    }

    #endregion

    #region Activity
    /// <summary>
    /// Locks the next activity work item without filtering by activity
    /// (sends an empty activity list with <c>AllActivities = true</c>).
    /// </summary>
    /// <returns>The locked work item, or <c>null</c> when the server returned none.</returns>
    public Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
    {
        return LockNextTaskActivityWorkItem(
            receiveTimeout,
            Array.Empty<INameVersionInfo>(),
            true,
            cancellationToken);
    }

    /// <summary>
    /// Locks the next activity work item, sending the given name/version list
    /// with <c>AllActivities = false</c>.
    /// </summary>
    /// <returns>The locked work item, or <c>null</c> when the server returned none.</returns>
    public Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, INameVersionInfo[] activities, CancellationToken cancellationToken)
    {
        return LockNextTaskActivityWorkItem(
            receiveTimeout,
            activities,
            false,
            cancellationToken);
    }

    /// <summary>
    /// Sends a unary lock request for an activity work item and converts the response.
    /// </summary>
    /// <param name="receiveTimeout">Sent to the server as the request's receive timeout.</param>
    /// <param name="activities">Name/version pairs copied into the request.</param>
    /// <param name="allActivities">Value of the request's <c>AllActivities</c> flag.</param>
    /// <param name="cancellationToken">Applied to the gRPC call.</param>
    /// <returns>The locked work item, or <c>null</c> when the response carries no work item.</returns>
    private async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(
        TimeSpan receiveTimeout,
        INameVersionInfo[] activities,
        bool allActivities,
        CancellationToken cancellationToken)
    {
        var request = new LockNextTaskActivityWorkItemRequest
        {
            ReceiveTimeout = Duration.FromTimeSpan(receiveTimeout),
            Activities = {
                activities
                    .Select(nv => new NameVersion { Name = nv.Name, Version = nv.Version })
            },
            AllActivities = allActivities
        };

        var response = await _client.LockNextTaskActivityWorkItemAsync(request, cancellationToken: cancellationToken);

        if (response.WorkItem is null)
            return null;

        return ToDurableTaskWorkItem(response.WorkItem);
    }

    /// <summary>Asks the server to renew the activity lock and returns the work item from the response.</summary>
    public async Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskActivityWorkItem workItem)
    {
        var request = new RenewTaskActivityWorkItemLockRequest
        {
            WorkItem = ToGrpcWorkItem(workItem)
        };

        var response = await _client.RenewTaskActivityWorkItemLockAsync(request);

        return ToDurableTaskWorkItem(response.WorkItem);
    }

    /// <summary>Sends the work item and its serialized response message to the server as completed.</summary>
    public async Task CompleteTaskActivityWorkItemAsync(
        TaskActivityWorkItem workItem,
        TaskMessage responseMessage)
    {
        var request = new CompleteTaskActivityWorkItemRequest
        {
            WorkItem = ToGrpcWorkItem(workItem),
            ResponseMessage = _options.DataConverter.Serialize(responseMessage)
        };

        await _client.CompleteTaskActivityWorkItemAsync(request);
    }

    /// <summary>Tells the server that the activity work item is abandoned.</summary>
    public async Task AbandonTaskActivityWorkItemAsync(TaskActivityWorkItem workItem)
    {
        var request = new AbandonTaskActivityWorkItemRequest
        {
            WorkItem = ToGrpcWorkItem(workItem)
        };

        await _client.AbandonTaskActivityWorkItemAsync(request);
    }

    /// <summary>
    /// Converts a DurableTask activity work item to its gRPC message, serializing the
    /// task message with the configured data converter.
    /// </summary>
    private DurableTaskGrpc.TaskActivityWorkItem ToGrpcWorkItem(TaskActivityWorkItem workItem)
    {
        return new DurableTaskGrpc.TaskActivityWorkItem
        {
            Id = workItem.Id,
            LockedUntilUtc = ToTimestamp(workItem.LockedUntilUtc),
            TaskMessage = _options.DataConverter.Serialize(workItem.TaskMessage)
        };
    }

    /// <summary>
    /// Converts a gRPC activity work item message to a DurableTask work item, deserializing
    /// the task message with the configured data converter.
    /// </summary>
    private TaskActivityWorkItem ToDurableTaskWorkItem(DurableTaskGrpc.TaskActivityWorkItem grpcWorkItem)
    {
        return new TaskActivityWorkItem
        {
            Id = grpcWorkItem.Id,
            LockedUntilUtc = grpcWorkItem.LockedUntilUtc.ToDateTime(),
            TaskMessage = _options.DataConverter.Deserialize<TaskMessage>(grpcWorkItem.TaskMessage)
        };
    }

    #endregion

    /// <summary>Converts a nullable date to a protobuf timestamp; <c>null</c> maps to <c>null</c>.</summary>
    private Timestamp ToTimestamp(DateTime? dateTime)
    {
        if (dateTime is null)
            return null;

        return ToTimestamp(dateTime.Value);
    }

    /// <summary>
    /// Converts a date to a protobuf timestamp. Local values are converted to UTC and
    /// values of unspecified kind are relabelled as UTC before the conversion.
    /// </summary>
    private Timestamp ToTimestamp(DateTime dateTime)
    {
        switch (dateTime.Kind)
        {
            case DateTimeKind.Local:
                dateTime = dateTime.ToUniversalTime();
                break;
            case DateTimeKind.Unspecified:
                // Relabels only: the clock value is kept and treated as already being UTC.
                dateTime = DateTime.SpecifyKind(dateTime, DateTimeKind.Utc);
                break;
        }

        return dateTime.ToTimestamp();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc