• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 5835751495

pending completion
5835751495

push

github

lucaslorentz
Add husky and apply some code fixes

2502 of 2502 new or added lines in 91 files covered. (100.0%)

2286 of 2792 relevant lines covered (81.88%)

143.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.81
/src/LLL.DurableTask.Server.Grpc/GrpcServerOrchestrationService.cs
1
using System;
2
using System.Linq;
3
using System.Threading.Tasks;
4
using DurableTask.Core;
5
using DurableTask.Core.Common;
6
using DurableTask.Core.History;
7
using DurableTask.Core.Query;
8
using DurableTaskGrpc;
9
using Google.Protobuf.WellKnownTypes;
10
using Grpc.Core;
11
using LLL.DurableTask.Core;
12
using Microsoft.Extensions.Logging;
13
using Microsoft.Extensions.Options;
14
using static DurableTaskGrpc.OrchestrationService;
15
using TaskActivityWorkItem = DurableTask.Core.TaskActivityWorkItem;
16
using TaskOrchestrationWorkItem = DurableTask.Core.TaskOrchestrationWorkItem;
17

18
namespace LLL.DurableTask.Server.Grpc.Server;
19

20
public class GrpcServerOrchestrationService : OrchestrationServiceBase
21
{
22
    private readonly GrpcServerOrchestrationServiceOptions _options;
23
    private readonly IOrchestrationService _orchestrationService;
24
    private readonly IOrchestrationServiceClient _orchestrationServiceClient;
25
    private readonly ILogger<GrpcServerOrchestrationService> _logger;
26
    private readonly IDistributedOrchestrationService _distributedOrchestrationService;
27
    private readonly IOrchestrationServiceQueryClient _orchestrationServiceQueryClient;
28
    private readonly IOrchestrationServicePurgeClient _orchestrationServicePurgeClient;
29
    private readonly IOrchestrationServiceFeaturesClient _orchestrationServiceFeaturesClient;
30
    private readonly IOrchestrationServiceRewindClient _orchestrationServiceRewindClient;
31

32
    /// <summary>
    /// Creates the gRPC service and stores the options and orchestration services
    /// that the RPC handlers of this class forward to.
    /// </summary>
    /// <param name="options">Options wrapper; its <c>Value</c> is read once and kept.</param>
    /// <param name="orchestrationService">Service used for locking, renewing, completing, releasing and abandoning work items.</param>
    /// <param name="orchestrationServiceClient">Client used for creating, terminating, querying state of and waiting for orchestrations.</param>
    /// <param name="logger">Logger stored for this service.</param>
    /// <param name="distributedOrchestrationService">Optional. When null, lock requests restricted to specific orchestrations or activities throw <see cref="NotSupportedException"/>.</param>
    /// <param name="orchestrationServiceQueryClient">Optional. Used by <c>GetOrchestrationWithQuery</c>.</param>
    /// <param name="orchestrationServicePurgeClient">Optional. When null, <c>PurgeInstanceHistory</c> throws <see cref="NotSupportedException"/>.</param>
    /// <param name="orchestrationServiceFeaturesClient">Optional. When null, <c>GetFeatures</c> reports no features.</param>
    /// <param name="orchestrationServiceRewindClient">Optional. When null, <c>RewindTaskOrchestration</c> throws <see cref="NotSupportedException"/>.</param>
    public GrpcServerOrchestrationService(
        IOptions<GrpcServerOrchestrationServiceOptions> options,
        IOrchestrationService orchestrationService,
        IOrchestrationServiceClient orchestrationServiceClient,
        ILogger<GrpcServerOrchestrationService> logger,
        IDistributedOrchestrationService distributedOrchestrationService = null,
        IOrchestrationServiceQueryClient orchestrationServiceQueryClient = null,
        IOrchestrationServicePurgeClient orchestrationServicePurgeClient = null,
        IOrchestrationServiceFeaturesClient orchestrationServiceFeaturesClient = null,
        IOrchestrationServiceRewindClient orchestrationServiceRewindClient = null)
    {
        // Required dependencies.
        _options = options.Value;
        _logger = logger;
        _orchestrationService = orchestrationService;
        _orchestrationServiceClient = orchestrationServiceClient;

        // Optional capabilities; each stays null when the caller does not supply it.
        _distributedOrchestrationService = distributedOrchestrationService;
        _orchestrationServiceFeaturesClient = orchestrationServiceFeaturesClient;
        _orchestrationServicePurgeClient = orchestrationServicePurgeClient;
        _orchestrationServiceQueryClient = orchestrationServiceQueryClient;
        _orchestrationServiceRewindClient = orchestrationServiceRewindClient;
    }
53

54
    /// <summary>
    /// Returns the features reported by the features client, each cast to <c>int</c>.
    /// The response carries no features when no features client was supplied.
    /// </summary>
    public override async Task<GetFeaturesResponse> GetFeatures(Empty request, ServerCallContext context)
    {
        var response = new GetFeaturesResponse();

        if (_orchestrationServiceFeaturesClient != null)
        {
            var supported = await _orchestrationServiceFeaturesClient.GetFeatures();
            response.Features.Add(supported.Select(feature => (int)feature));
        }

        return response;
    }
65

66
    /// <summary>
    /// Deserializes the creation message and asks the orchestration client to create the orchestration.
    /// </summary>
    /// <remarks>
    /// Dedupe statuses are passed as <c>null</c> when the request lists none.
    /// </remarks>
    public override async Task<Empty> CreateTaskOrchestration(CreateTaskOrchestrationRequest request, ServerCallContext context)
    {
        var taskMessage = _options.DataConverter.Deserialize<TaskMessage>(request.CreationMessage);

        OrchestrationStatus[] statusesToDedupe = null;
        if (request.DedupeStatuses.Count > 0)
        {
            statusesToDedupe = request.DedupeStatuses
                .Select(status => (OrchestrationStatus)status)
                .ToArray();
        }

        await _orchestrationServiceClient.CreateTaskOrchestrationAsync(taskMessage, statusesToDedupe);

        return new Empty();
    }
77

78
    /// <summary>
    /// Forwards a force-terminate request (instance id and reason) to the orchestration client.
    /// </summary>
    public override async Task<Empty> ForceTerminateTaskOrchestration(ForceTerminateTaskOrchestrationRequest request, ServerCallContext context)
    {
        var instanceId = request.InstanceId;
        var reason = request.Reason;

        await _orchestrationServiceClient.ForceTerminateTaskOrchestrationAsync(instanceId, reason);

        return new Empty();
    }
84

85
    /// <summary>
    /// Forwards a rewind request (instance id and reason) to the rewind client.
    /// </summary>
    /// <exception cref="NotSupportedException">No rewind client was supplied to this service.</exception>
    public override async Task<Empty> RewindTaskOrchestration(RewindTaskOrchestrationRequest request, ServerCallContext context)
    {
        if (_orchestrationServiceRewindClient == null)
        {
            throw NotSupported("Rewind");
        }

        await _orchestrationServiceRewindClient.RewindTaskOrchestrationAsync(request.InstanceId, request.Reason);

        return new Empty();
    }
92

93
    /// <summary>
    /// Fetches the history of one orchestration execution from the orchestration client
    /// and returns it unchanged in the response.
    /// </summary>
    public override async Task<GetOrchestrationHistoryResponse> GetOrchestrationHistory(GetOrchestrationHistoryRequest request, ServerCallContext context)
    {
        var instanceId = request.InstanceId;
        var executionId = request.ExecutionId;

        return new GetOrchestrationHistoryResponse
        {
            History = await _orchestrationServiceClient.GetOrchestrationHistoryAsync(instanceId, executionId)
        };
    }
106

107
    /// <summary>
    /// Fetches the states of an orchestration instance from the orchestration client
    /// and returns each one serialized with the configured data converter.
    /// </summary>
    public override async Task<GetOrchestrationInstanceStateResponse> GetOrchestrationInstanceState(GetOrchestrationInstanceStateRequest request, ServerCallContext context)
    {
        var instanceStates = await _orchestrationServiceClient.GetOrchestrationStateAsync(
            request.InstanceId,
            request.AllExecutions);

        var result = new GetOrchestrationInstanceStateResponse();
        result.States.Add(instanceStates.Select(state => _options.DataConverter.Serialize(state)));
        return result;
    }
120

121
    /// <summary>
    /// Fetches the state of one orchestration execution and returns it serialized,
    /// or <c>null</c> in the response when the client returned no state.
    /// </summary>
    public override async Task<GetOrchestrationStateResponse> GetOrchestrationState(GetOrchestrationStateRequest request, ServerCallContext context)
    {
        var orchestrationState = await _orchestrationServiceClient.GetOrchestrationStateAsync(
            request.InstanceId,
            request.ExecutionId);

        var serializedState = orchestrationState != null
            ? _options.DataConverter.Serialize(orchestrationState)
            : null;

        return new GetOrchestrationStateResponse { State = serializedState };
    }
134

135
    /// <summary>
    /// Asks the orchestration client to purge history using the threshold date and
    /// time-range filter type carried by the request.
    /// </summary>
    public override async Task<Empty> PurgeOrchestrationHistory(PurgeOrchestrationHistoryRequest request, ServerCallContext context)
    {
        var threshold = request.ThresholdDateTimeUtc.ToDateTime();
        var filterType = (OrchestrationStateTimeRangeFilterType)request.TimeRangeFilterType;

        await _orchestrationServiceClient.PurgeOrchestrationHistoryAsync(threshold, filterType);

        return new Empty();
    }
143

144
    /// <summary>
    /// Waits on the orchestration client for the given orchestration, bounded by the request timeout
    /// and the call's cancellation token, and returns the resulting state serialized.
    /// </summary>
    /// <remarks>
    /// The response state is <c>null</c> when the client returned no state.
    /// </remarks>
    public override async Task<WaitForOrchestrationResponse> WaitForOrchestration(WaitForOrchestrationRequest request, ServerCallContext context)
    {
        var finalState = await _orchestrationServiceClient.WaitForOrchestrationAsync(
            request.InstanceId,
            request.ExecutionId,
            request.Timeout.ToTimeSpan(),
            context.CancellationToken);

        var serializedState = finalState != null
            ? _options.DataConverter.Serialize(finalState)
            : null;

        return new WaitForOrchestrationResponse { State = serializedState };
    }
159

160
    /// <summary>
    /// Translates the gRPC query request into an <c>ExtendedOrchestrationQuery</c>, runs it through
    /// the query client and returns the matching orchestration states serialized, together with
    /// the continuation token produced by the query.
    /// </summary>
    /// <exception cref="NotSupportedException">No query client was supplied to this service.</exception>
    public override async Task<GetOrchestrationWithQueryResponse> GetOrchestrationWithQuery(GetOrchestrationWithQueryRequest request, ServerCallContext context)
    {
        // The query client is an optional constructor dependency; fail with the same
        // NotSupportedException used by the other optional capabilities instead of a
        // NullReferenceException.
        var client = _orchestrationServiceQueryClient ?? throw NotSupported("GetOrchestrationWithQuery");

        var query = new ExtendedOrchestrationQuery
        {
            RuntimeStatus = request.RuntimeStatus.Select(s => (OrchestrationStatus)s).ToArray(),
            CreatedTimeFrom = request.CreatedTimeFrom?.ToDateTime(),
            CreatedTimeTo = request.CreatedTimeTo?.ToDateTime(),
            TaskHubNames = request.TaskHubNames,
            PageSize = request.PageSize,
            ContinuationToken = request.ContinuationToken,
            InstanceIdPrefix = request.InstanceIdPrefix,
            FetchInputsAndOutputs = request.FetchInputsAndOutputs,
            NamePrefix = request.NamePrefix,
            CompletedTimeFrom = request.CompletedTimeFrom?.ToDateTime(),
            CompletedTimeTo = request.CompletedTimeTo?.ToDateTime(),
            IncludePreviousExecutions = request.IncludePreviousExecutions
        };

        // Tags are copied entry by entry into the query's own tag collection.
        foreach (var kv in request.Tags)
        {
            query.Tags.Add(kv.Key, kv.Value);
        }

        var queryResult = await client.GetOrchestrationWithQueryAsync(query, context.CancellationToken);

        return new GetOrchestrationWithQueryResponse
        {
            OrchestrationState = { queryResult.OrchestrationState.Select(s => _options.DataConverter.Serialize(s)) },
            ContinuationToken = queryResult.ContinuationToken
        };
    }
188

189
    /// <summary>
    /// Purges instance state through the purge client and reports how many instances were deleted.
    /// </summary>
    /// <remarks>
    /// When the request names an instance id, only that instance is purged. Otherwise a filter is
    /// built from the request's creation-time range and runtime statuses; a missing lower bound
    /// becomes <see cref="DateTime.MinValue"/>.
    /// </remarks>
    /// <exception cref="NotSupportedException">No purge client was supplied to this service.</exception>
    public override async Task<PurgeInstanceHistoryResponse> PurgeInstanceHistory(PurgeInstanceHistoryRequest request, ServerCallContext context)
    {
        if (_orchestrationServicePurgeClient == null)
        {
            throw NotSupported("PurgeInstanceHistory");
        }

        PurgeResult purgeResult;

        if (string.IsNullOrEmpty(request.InstanceId))
        {
            // No single instance requested: purge by filter.
            var from = request.CreatedTimeFrom?.ToDateTime() ?? DateTime.MinValue;
            var to = request.CreatedTimeTo?.ToDateTime();
            var statuses = request.RuntimeStatus.Select(status => (OrchestrationStatus)status).ToArray();

            purgeResult = await _orchestrationServicePurgeClient.PurgeInstanceStateAsync(
                new PurgeInstanceFilter(from, to, statuses));
        }
        else
        {
            purgeResult = await _orchestrationServicePurgeClient.PurgeInstanceStateAsync(request.InstanceId);
        }

        var response = new PurgeInstanceHistoryResponse();
        response.InstancesDeleted = purgeResult.DeletedInstanceCount;
        return response;
    }
215

216
    /// <summary>
    /// Deserializes every message in the request and sends them to the orchestration client as one batch.
    /// </summary>
    public override async Task<Empty> SendTaskOrchestrationMessageBatch(SendTaskOrchestrationMessageBatchRequest request, ServerCallContext context)
    {
        var batch = new TaskMessage[request.Messages.Count];
        for (var index = 0; index < batch.Length; index++)
        {
            batch[index] = _options.DataConverter.Deserialize<TaskMessage>(request.Messages[index]);
        }

        await _orchestrationServiceClient.SendTaskOrchestrationMessageBatchAsync(batch);

        return new Empty();
    }
224

225
    /// <summary>
    /// Bidirectional stream that drives one orchestration work item. The client sends lock, renew,
    /// complete, fetch, release and abandon requests, and a response is written for each
    /// recognized request.
    /// </summary>
    /// <remarks>
    /// The work item returned by a lock request is held in a local variable for the lifetime of
    /// the stream and reused by the requests that follow. Requests that operate on a work item are
    /// rejected with an <c>RpcException</c> carrying <c>FailedPrecondition</c> when no work item is
    /// held (no lock request was sent, or the lock returned nothing), rather than failing with a
    /// <see cref="NullReferenceException"/>. Cancellation of the call ends the method without
    /// propagating the <see cref="OperationCanceledException"/>.
    /// </remarks>
    /// <param name="requestStream">Incoming client requests.</param>
    /// <param name="responseStream">Outgoing responses.</param>
    /// <param name="context">Call context; its cancellation token is observed throughout.</param>
    /// <exception cref="NotSupportedException">
    /// A lock request targets specific orchestrations but no distributed orchestration service was supplied.
    /// </exception>
    public override async Task LockNextTaskOrchestrationWorkItem(IAsyncStreamReader<TaskOrchestrationRequest> requestStream, IServerStreamWriter<TaskOrchestrationResponse> responseStream, ServerCallContext context)
    {
        try
        {
            TaskOrchestrationWorkItem workItem = null;

            // Every request other than the lock request operates on the work item held by this stream.
            void EnsureWorkItemLocked(string operation)
            {
                if (workItem == null)
                {
                    throw new global::Grpc.Core.RpcException(new global::Grpc.Core.Status(
                        global::Grpc.Core.StatusCode.FailedPrecondition,
                        $"{operation} requires an orchestration work item locked on this stream"));
                }
            }

            // Receive and reply each message
            await foreach (var message in requestStream.ReadAllAsync(context.CancellationToken))
            {
                switch (message.MessageCase)
                {
                    case TaskOrchestrationRequest.MessageOneofCase.LockRequest:
                        var lockRequest = message.LockRequest;
                        var orchestrations = lockRequest.Orchestrations.Select(x => new NameVersion(x.Name, x.Version)).ToArray();

                        // Locking restricted to named orchestrations needs the distributed service.
                        workItem = await (lockRequest.AllOrchestrations
                            ? _orchestrationService
                                .LockNextTaskOrchestrationWorkItemAsync(lockRequest.ReceiveTimeout.ToTimeSpan(), context.CancellationToken)
                            : (_distributedOrchestrationService ?? throw DistributedWorkersNotSupported())
                                .LockNextTaskOrchestrationWorkItemAsync(lockRequest.ReceiveTimeout.ToTimeSpan(), orchestrations, context.CancellationToken)
                        );

                        var lockResponse = new TaskOrchestrationResponse
                        {
                            LockResponse = new LockNextTaskOrchestrationWorkItemResponse
                            {
                                // A null work item is reported to the client as an absent WorkItem.
                                WorkItem = workItem == null ? null : new DurableTaskGrpc.TaskOrchestrationWorkItem
                                {
                                    InstanceId = workItem.InstanceId,
                                    LockedUntilUtc = Timestamp.FromDateTime(workItem.LockedUntilUtc),
                                    Events = { workItem.OrchestrationRuntimeState.Events.Select(_options.DataConverter.Serialize) },
                                    NewMessages = { workItem.NewMessages.Select(_options.DataConverter.Serialize) }
                                }
                            }
                        };

                        context.CancellationToken.ThrowIfCancellationRequested();

                        await responseStream.WriteAsync(lockResponse);
                        break;
                    case TaskOrchestrationRequest.MessageOneofCase.RenewRequest:
                        EnsureWorkItemLocked("Renew");
                        await _orchestrationService.RenewTaskOrchestrationWorkItemLockAsync(workItem);

                        var renewResponse = new TaskOrchestrationResponse
                        {
                            RenewResponse = new RenewTaskOrchestrationWorkItemLockResponse
                            {
                                LockedUntilUtc = Timestamp.FromDateTime(workItem.LockedUntilUtc)
                            }
                        };

                        context.CancellationToken.ThrowIfCancellationRequested();

                        await responseStream.WriteAsync(renewResponse);
                        break;
                    case TaskOrchestrationRequest.MessageOneofCase.CompleteRequest:
                        EnsureWorkItemLocked("Complete");
                        var completeRequest = message.CompleteRequest;
                        var outboundMessages = completeRequest.OutboundMessages.Select(x => _options.DataConverter.Deserialize<TaskMessage>(x)).ToArray();
                        var timerMessages = completeRequest.TimerMessages.Select(x => _options.DataConverter.Deserialize<TaskMessage>(x)).ToArray();
                        var orchestratorMessages = completeRequest.OrchestratorMessages.Select(x => _options.DataConverter.Deserialize<TaskMessage>(x)).ToArray();
                        var continuedAsNewMessage = string.IsNullOrEmpty(completeRequest.ContinuedAsNewMessage)
                            ? null
                            : _options.DataConverter.Deserialize<TaskMessage>(completeRequest.ContinuedAsNewMessage);

                        // Append the events sent by the client to the runtime state held on the work item.
                        var newEvents = completeRequest.NewEvents.Select(x => _options.DataConverter.Deserialize<HistoryEvent>(x)).ToArray();
                        workItem.OrchestrationRuntimeState ??= new OrchestrationRuntimeState();
                        foreach (var newEvent in newEvents)
                        {
                            workItem.OrchestrationRuntimeState.AddEvent(newEvent);
                        }
                        workItem.OrchestrationRuntimeState.Status = completeRequest.NewStatus;

                        // When the client supplies events for a new orchestration runtime state, a fresh
                        // state is built from them and used in place of the work item's current state.
                        var newOrchestrationRuntimeState = workItem.OrchestrationRuntimeState;
                        var newOrchestrationRuntimeStateEvents = completeRequest.NewOrchestrationEvents.Select(x => _options.DataConverter.Deserialize<HistoryEvent>(x)).ToArray();
                        if (newOrchestrationRuntimeStateEvents.Length > 0)
                        {
                            newOrchestrationRuntimeState = new OrchestrationRuntimeState();
                            foreach (var newEvent in newOrchestrationRuntimeStateEvents)
                            {
                                newOrchestrationRuntimeState.AddEvent(newEvent);
                            }
                            newOrchestrationRuntimeState.Status = completeRequest.NewOrchestrationStatus;
                        }

                        var orchestrationState = Utils.BuildOrchestrationState(newOrchestrationRuntimeState);

                        await _orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
                            workItem,
                            newOrchestrationRuntimeState,
                            outboundMessages,
                            orchestratorMessages,
                            timerMessages,
                            continuedAsNewMessage,
                            orchestrationState);

                        // After completion the pending events are cleared and the resulting state is
                        // kept on the work item for any further requests on this stream.
                        newOrchestrationRuntimeState.NewEvents.Clear();

                        workItem.OrchestrationRuntimeState = newOrchestrationRuntimeState;

                        context.CancellationToken.ThrowIfCancellationRequested();

                        await responseStream.WriteAsync(new TaskOrchestrationResponse
                        {
                            CompleteResponse = new CompleteTaskOrchestrationWorkItemResponse()
                        });
                        break;
                    case TaskOrchestrationRequest.MessageOneofCase.FetchRequest:
                        EnsureWorkItemLocked("Fetch");
                        if (workItem.Session == null)
                        {
                            // Without a session there is nothing to fetch from; reply with no messages.
                            var fetchResponse = new TaskOrchestrationResponse
                            {
                                FetchResponse = new FetchNewOrchestrationMessagesResponse
                                {
                                    NewMessages = null
                                }
                            };

                            context.CancellationToken.ThrowIfCancellationRequested();

                            await responseStream.WriteAsync(fetchResponse);
                        }
                        else
                        {
                            var newMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);

                            var fetchResponse = new TaskOrchestrationResponse
                            {
                                FetchResponse = new FetchNewOrchestrationMessagesResponse
                                {
                                    NewMessages = newMessages == null ? null : new OrchestrationMessages
                                    {
                                        Messages = { newMessages.Select(_options.DataConverter.Serialize) }
                                    }
                                }
                            };

                            context.CancellationToken.ThrowIfCancellationRequested();

                            await responseStream.WriteAsync(fetchResponse);
                        }
                        break;
                    case TaskOrchestrationRequest.MessageOneofCase.ReleaseRequest:
                        EnsureWorkItemLocked("Release");
                        await _orchestrationService.ReleaseTaskOrchestrationWorkItemAsync(workItem);
                        context.CancellationToken.ThrowIfCancellationRequested();
                        await responseStream.WriteAsync(new TaskOrchestrationResponse
                        {
                            ReleaseResponse = new ReleaseTaskOrchestrationWorkItemResponse()
                        });
                        break;
                    case TaskOrchestrationRequest.MessageOneofCase.AbandonRequest:
                        EnsureWorkItemLocked("Abandon");
                        await _orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
                        context.CancellationToken.ThrowIfCancellationRequested();
                        await responseStream.WriteAsync(new TaskOrchestrationResponse
                        {
                            AbandonResponse = new AbandonTaskOrchestrationWorkItemLockResponse()
                        });
                        break;
                }
            }
        }
        catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
        {
            // Avoid exceptions when clients cancel request
        }
    }
394

395
    /// <summary>
    /// Locks the next activity work item, either across all activities or restricted to the
    /// activities named in the request, and returns it converted to its gRPC form.
    /// </summary>
    /// <remarks>
    /// The response's work item is <c>null</c> when nothing was locked. When the call is cancelled
    /// the method returns <c>null</c> instead of propagating the cancellation exception.
    /// </remarks>
    /// <exception cref="NotSupportedException">
    /// The request targets specific activities but no distributed orchestration service was supplied.
    /// </exception>
    public override async Task<LockNextTaskActivityWorkItemResponse> LockNextTaskActivityWorkItem(LockNextTaskActivityWorkItemRequest request, ServerCallContext context)
    {
        try
        {
            var requestedActivities = request.Activities
                .Select(activity => new NameVersion(activity.Name, activity.Version))
                .ToArray();

            TaskActivityWorkItem lockedItem;
            if (request.AllActivities)
            {
                lockedItem = await _orchestrationService
                    .LockNextTaskActivityWorkItem(request.ReceiveTimeout.ToTimeSpan(), context.CancellationToken);
            }
            else
            {
                var distributedService = _distributedOrchestrationService ?? throw DistributedWorkersNotSupported();
                lockedItem = await distributedService
                    .LockNextTaskActivityWorkItem(request.ReceiveTimeout.ToTimeSpan(), requestedActivities, context.CancellationToken);
            }

            return new LockNextTaskActivityWorkItemResponse
            {
                WorkItem = lockedItem != null ? ToGrpcWorkItem(lockedItem) : null
            };
        }
        catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
        {
            // Avoid exceptions when clients cancel request
            return null;
        }
    }
420

421
    /// <summary>
    /// Renews the lock of the activity work item carried by the request and returns the
    /// renewed work item converted back to its gRPC form.
    /// </summary>
    public override async Task<RenewTaskActivityWorkItemLockResponse> RenewTaskActivityWorkItemLock(RenewTaskActivityWorkItemLockRequest request, ServerCallContext context)
    {
        var current = ToDurableTaskWorkItem(request.WorkItem);
        var renewed = await _orchestrationService.RenewTaskActivityWorkItemLockAsync(current);

        return new RenewTaskActivityWorkItemLockResponse
        {
            WorkItem = ToGrpcWorkItem(renewed)
        };
    }
434

435
    /// <summary>
    /// Completes the activity work item carried by the request, passing along the
    /// deserialized response message.
    /// </summary>
    public override async Task<Empty> CompleteTaskActivityWorkItem(
        CompleteTaskActivityWorkItemRequest request, ServerCallContext context)
    {
        var activityWorkItem = ToDurableTaskWorkItem(request.WorkItem);
        var activityResponse = _options.DataConverter.Deserialize<TaskMessage>(request.ResponseMessage);

        await _orchestrationService.CompleteTaskActivityWorkItemAsync(activityWorkItem, activityResponse);

        return new Empty();
    }
446

447
    /// <summary>
    /// Abandons the activity work item carried by the request.
    /// </summary>
    public override async Task<Empty> AbandonTaskActivityWorkItem(AbandonTaskActivityWorkItemRequest request, ServerCallContext context)
    {
        await _orchestrationService.AbandonTaskActivityWorkItemAsync(ToDurableTaskWorkItem(request.WorkItem));

        return new Empty();
    }
455

456
    /// <summary>
    /// Converts a DurableTask activity work item into its gRPC message form, serializing
    /// the task message with the configured data converter.
    /// </summary>
    private DurableTaskGrpc.TaskActivityWorkItem ToGrpcWorkItem(TaskActivityWorkItem workItem)
    {
        var grpcWorkItem = new DurableTaskGrpc.TaskActivityWorkItem();
        grpcWorkItem.Id = workItem.Id;
        grpcWorkItem.LockedUntilUtc = Timestamp.FromDateTime(workItem.LockedUntilUtc);
        grpcWorkItem.TaskMessage = _options.DataConverter.Serialize(workItem.TaskMessage);
        return grpcWorkItem;
    }
465

466
    /// <summary>
    /// Converts a gRPC activity work item message into a DurableTask work item, deserializing
    /// the task message with the configured data converter.
    /// </summary>
    private TaskActivityWorkItem ToDurableTaskWorkItem(DurableTaskGrpc.TaskActivityWorkItem grpcWorkItem)
    {
        var workItem = new TaskActivityWorkItem();
        workItem.Id = grpcWorkItem.Id;
        workItem.LockedUntilUtc = grpcWorkItem.LockedUntilUtc.ToDateTime();
        workItem.TaskMessage = _options.DataConverter.Deserialize<TaskMessage>(grpcWorkItem.TaskMessage);
        return workItem;
    }
475

476
    /// <summary>
    /// Builds the exception thrown when distributed workers are requested but unavailable.
    /// </summary>
    private Exception DistributedWorkersNotSupported() => NotSupported("Distributed workers");
480

481
    /// <summary>
    /// Builds a <see cref="NotSupportedException"/> stating that the given operation is not
    /// supported by the storage implementation. The exception is returned, not thrown.
    /// </summary>
    /// <param name="operation">Name of the unsupported operation, placed at the start of the message.</param>
    private Exception NotSupported(string operation)
    {
        var message = $"{operation} is not supported by storage implementation";
        return new NotSupportedException(message);
    }
485
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc