• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 16703104561

03 Aug 2025 08:37AM UTC coverage: 83.438%. First build
16703104561

push

github

web-flow
Merge 2719b0f3f into 5c28b612b

86 of 97 new or added lines in 27 files covered. (88.66%)

2403 of 2880 relevant lines covered (83.44%)

141.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.15
/src/LLL.DurableTask.Server.Grpc/GrpcServerOrchestrationService.cs
1
using System;
2
using System.Linq;
3
using System.Threading.Tasks;
4
using DurableTask.Core;
5
using DurableTask.Core.Common;
6
using DurableTask.Core.History;
7
using DurableTask.Core.Query;
8
using DurableTaskGrpc;
9
using Google.Protobuf.WellKnownTypes;
10
using Grpc.Core;
11
using LLL.DurableTask.Core;
12
using Microsoft.Extensions.Logging;
13
using Microsoft.Extensions.Options;
14
using static DurableTaskGrpc.OrchestrationService;
15
using TaskActivityWorkItem = DurableTask.Core.TaskActivityWorkItem;
16
using TaskOrchestrationWorkItem = DurableTask.Core.TaskOrchestrationWorkItem;
17

18
namespace LLL.DurableTask.Server.Grpc.Server;
19

20
public class GrpcServerOrchestrationService : OrchestrationServiceBase
21
{
22
    private readonly GrpcServerOrchestrationServiceOptions _options;
23
    private readonly IOrchestrationService _orchestrationService;
24
    private readonly IOrchestrationServiceClient _orchestrationServiceClient;
25
    private readonly ILogger<GrpcServerOrchestrationService> _logger;
26
    private readonly IDistributedOrchestrationService _distributedOrchestrationService;
27
    private readonly IOrchestrationServiceQueryClient _orchestrationServiceQueryClient;
28
    private readonly IOrchestrationServicePurgeClient _orchestrationServicePurgeClient;
29
    private readonly IOrchestrationServiceFeaturesClient _orchestrationServiceFeaturesClient;
30
    private readonly IOrchestrationServiceRewindClient _orchestrationServiceRewindClient;
31

32
    public GrpcServerOrchestrationService(
14✔
33
        IOptions<GrpcServerOrchestrationServiceOptions> options,
14✔
34
        IOrchestrationService orchestrationService,
14✔
35
        IOrchestrationServiceClient orchestrationServiceClient,
14✔
36
        ILogger<GrpcServerOrchestrationService> logger,
14✔
37
        IDistributedOrchestrationService distributedOrchestrationService = null,
14✔
38
        IOrchestrationServiceQueryClient orchestrationServiceQueryClient = null,
14✔
39
        IOrchestrationServicePurgeClient orchestrationServicePurgeClient = null,
14✔
40
        IOrchestrationServiceFeaturesClient orchestrationServiceFeaturesClient = null,
14✔
41
        IOrchestrationServiceRewindClient orchestrationServiceRewindClient = null)
14✔
42
    {
43
        _options = options.Value;
14✔
44
        _orchestrationService = orchestrationService;
14✔
45
        _orchestrationServiceClient = orchestrationServiceClient;
14✔
46
        _logger = logger;
14✔
47
        _distributedOrchestrationService = distributedOrchestrationService;
14✔
48
        _orchestrationServiceQueryClient = orchestrationServiceQueryClient;
14✔
49
        _orchestrationServicePurgeClient = orchestrationServicePurgeClient;
14✔
50
        _orchestrationServiceFeaturesClient = orchestrationServiceFeaturesClient;
14✔
51
        _orchestrationServiceRewindClient = orchestrationServiceRewindClient;
14✔
52
    }
53

54
    public override async Task<GetFeaturesResponse> GetFeatures(Empty request, ServerCallContext context)
55
    {
NEW
56
        var features = _orchestrationServiceFeaturesClient is not null
×
57
            ? await _orchestrationServiceFeaturesClient.GetFeatures()
×
58
            : Array.Empty<OrchestrationFeature>();
×
59

60
        return new GetFeaturesResponse
×
61
        {
×
62
            Features = { features.Select(f => (int)f) }
×
63
        };
×
64
    }
65

66
    public override async Task<Empty> CreateTaskOrchestration(CreateTaskOrchestrationRequest request, ServerCallContext context)
67
    {
68
        var creationMessage = _options.DataConverter.Deserialize<TaskMessage>(request.CreationMessage);
14✔
69
        var dedupeStatuses = request.DedupeStatuses.Count > 0
14✔
70
            ? request.DedupeStatuses.Select(x => (OrchestrationStatus)x).ToArray()
14✔
71
            : null;
14✔
72

73
        await _orchestrationServiceClient.CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses);
14✔
74

75
        return new Empty();
14✔
76
    }
77

78
    public override async Task<Empty> ForceTerminateTaskOrchestration(ForceTerminateTaskOrchestrationRequest request, ServerCallContext context)
79
    {
80
        await _orchestrationServiceClient.ForceTerminateTaskOrchestrationAsync(request.InstanceId, request.Reason);
1✔
81

82
        return new Empty();
1✔
83
    }
84

85
    public override async Task<Empty> RewindTaskOrchestration(RewindTaskOrchestrationRequest request, ServerCallContext context)
86
    {
87
        await (_orchestrationServiceRewindClient ?? throw NotSupported("Rewind"))
×
88
            .RewindTaskOrchestrationAsync(request.InstanceId, request.Reason);
×
89

90
        return new Empty();
×
91
    }
92

93
    public override async Task<GetOrchestrationHistoryResponse> GetOrchestrationHistory(GetOrchestrationHistoryRequest request, ServerCallContext context)
94
    {
95
        var history = await _orchestrationServiceClient.GetOrchestrationHistoryAsync(
1✔
96
            request.InstanceId,
1✔
97
            request.ExecutionId);
1✔
98

99
        var response = new GetOrchestrationHistoryResponse
1✔
100
        {
1✔
101
            History = history
1✔
102
        };
1✔
103

104
        return response;
1✔
105
    }
106

107
    public override async Task<GetOrchestrationInstanceStateResponse> GetOrchestrationInstanceState(GetOrchestrationInstanceStateRequest request, ServerCallContext context)
108
    {
109
        var states = await _orchestrationServiceClient.GetOrchestrationStateAsync(
4✔
110
            request.InstanceId,
4✔
111
            request.AllExecutions);
4✔
112

113
        var response = new GetOrchestrationInstanceStateResponse
4✔
114
        {
4✔
115
            States = { states.Select(s => _options.DataConverter.Serialize(s)) }
6✔
116
        };
4✔
117

118
        return response;
4✔
119
    }
120

121
    public override async Task<GetOrchestrationStateResponse> GetOrchestrationState(GetOrchestrationStateRequest request, ServerCallContext context)
122
    {
123
        var state = await _orchestrationServiceClient.GetOrchestrationStateAsync(
1✔
124
            request.InstanceId,
1✔
125
            request.ExecutionId);
1✔
126

127
        var response = new GetOrchestrationStateResponse
1✔
128
        {
1✔
129
            State = state is null ? null : _options.DataConverter.Serialize(state)
1✔
130
        };
1✔
131

132
        return response;
1✔
133
    }
134

135
    public override async Task<Empty> PurgeOrchestrationHistory(PurgeOrchestrationHistoryRequest request, ServerCallContext context)
136
    {
137
        await _orchestrationServiceClient.PurgeOrchestrationHistoryAsync(
×
138
            request.ThresholdDateTimeUtc.ToDateTime(),
×
139
            (OrchestrationStateTimeRangeFilterType)request.TimeRangeFilterType);
×
140

141
        return new Empty();
×
142
    }
143

144
    public override async Task<WaitForOrchestrationResponse> WaitForOrchestration(WaitForOrchestrationRequest request, ServerCallContext context)
145
    {
146
        var state = await _orchestrationServiceClient.WaitForOrchestrationAsync(
19✔
147
            request.InstanceId,
19✔
148
            request.ExecutionId,
19✔
149
            request.Timeout.ToTimeSpan(),
19✔
150
            context.CancellationToken);
19✔
151

152
        var response = new WaitForOrchestrationResponse
19✔
153
        {
19✔
154
            State = state is null ? null : _options.DataConverter.Serialize(state)
19✔
155
        };
19✔
156

157
        return response;
19✔
158
    }
159

160
    public override async Task<GetOrchestrationWithQueryResponse> GetOrchestrationWithQuery(GetOrchestrationWithQueryRequest request, ServerCallContext context)
161
    {
162
        var query = new OrchestrationQueryExtended();
1✔
163
        query.RuntimeStatus = request.RuntimeStatus.Select(s => (OrchestrationStatus)s).ToArray();
1✔
164
        query.CreatedTimeFrom = request.CreatedTimeFrom?.ToDateTime();
1✔
165
        query.CreatedTimeTo = request.CreatedTimeTo?.ToDateTime();
1✔
166
        query.TaskHubNames = request.TaskHubNames;
1✔
167
        query.PageSize = request.PageSize;
1✔
168
        query.ContinuationToken = request.ContinuationToken;
1✔
169
        query.InstanceIdPrefix = request.InstanceIdPrefix;
1✔
170
        query.FetchInputsAndOutputs = request.FetchInputsAndOutputs;
1✔
171
        query.NamePrefix = request.NamePrefix;
1✔
172
        query.CompletedTimeFrom = request.CompletedTimeFrom?.ToDateTime();
1✔
173
        query.CompletedTimeTo = request.CompletedTimeTo?.ToDateTime();
1✔
174
        query.IncludePreviousExecutions = request.IncludePreviousExecutions;
1✔
175
        foreach (var kv in request.Tags)
6✔
176
            query.Tags.Add(kv.Key, kv.Value);
2✔
177

178
        var queryResult = await _orchestrationServiceQueryClient.GetOrchestrationWithQueryAsync(query, context.CancellationToken);
1✔
179

180
        var response = new GetOrchestrationWithQueryResponse
1✔
181
        {
1✔
182
            OrchestrationState = { queryResult.OrchestrationState.Select(s => _options.DataConverter.Serialize(s)) },
2✔
183
            ContinuationToken = queryResult.ContinuationToken
1✔
184
        };
1✔
185

186
        return response;
1✔
187
    }
188

189
    public override async Task<PurgeInstanceHistoryResponse> PurgeInstanceHistory(PurgeInstanceHistoryRequest request, ServerCallContext context)
190
    {
191
        var client = _orchestrationServicePurgeClient ?? throw NotSupported("PurgeInstanceHistory");
2✔
192

193
        PurgeResult result;
194

195
        if (!string.IsNullOrEmpty(request.InstanceId))
2✔
196
        {
197
            result = await client.PurgeInstanceStateAsync(request.InstanceId);
1✔
198
        }
199
        else
200
        {
201
            var createdTimeFrom = request.CreatedTimeFrom?.ToDateTime() ?? DateTime.MinValue;
1✔
202
            var createdTimeTo = request.CreatedTimeTo?.ToDateTime();
1✔
203
            var runtimeStatus = request.RuntimeStatus.Select(s => (OrchestrationStatus)s).ToArray();
9✔
204

205
            var filter = new PurgeInstanceFilterExtended(createdTimeFrom, createdTimeTo, runtimeStatus)
1✔
206
            {
1✔
207
                Limit = request.HasLimit ? request.Limit : null
1✔
208
            };
1✔
209

210
            result = await client.PurgeInstanceStateAsync(filter);
1✔
211
        }
212

213
        return new PurgeInstanceHistoryResponse
2✔
214
        {
2✔
215
            InstancesDeleted = result.DeletedInstanceCount
2✔
216
        };
2✔
217
    }
218

219
    public override async Task<Empty> SendTaskOrchestrationMessageBatch(SendTaskOrchestrationMessageBatchRequest request, ServerCallContext context)
220
    {
221
        var messages = request.Messages.Select(x => _options.DataConverter.Deserialize<TaskMessage>(x)).ToArray();
4✔
222

223
        await _orchestrationServiceClient.SendTaskOrchestrationMessageBatchAsync(messages);
2✔
224

225
        return new Empty();
2✔
226
    }
227

228
    public override async Task LockNextTaskOrchestrationWorkItem(IAsyncStreamReader<TaskOrchestrationRequest> requestStream, IServerStreamWriter<TaskOrchestrationResponse> responseStream, ServerCallContext context)
229
    {
230
        try
231
        {
232
            TaskOrchestrationWorkItem workItem = null;
29✔
233

234
            // Receive and reply each message
235
            await foreach (var message in requestStream.ReadAllAsync(context.CancellationToken))
208✔
236
            {
237
                switch (message.MessageCase)
82✔
238
                {
239
                    case TaskOrchestrationRequest.MessageOneofCase.LockRequest:
240
                        var lockRequest = message.LockRequest;
29✔
241
                        var orchestrations = lockRequest.Orchestrations.Select(x => new NameVersion(x.Name, x.Version)).ToArray();
261✔
242

243
                        workItem = await (lockRequest.AllOrchestrations
29✔
244
                            ? _orchestrationService
29✔
245
                                .LockNextTaskOrchestrationWorkItemAsync(lockRequest.ReceiveTimeout.ToTimeSpan(), context.CancellationToken)
29✔
246
                            : (_distributedOrchestrationService ?? throw DistributedWorkersNotSupported())
29✔
247
                                .LockNextTaskOrchestrationWorkItemAsync(lockRequest.ReceiveTimeout.ToTimeSpan(), orchestrations, context.CancellationToken)
29✔
248
                        );
29✔
249

250
                        var lockResponse = new TaskOrchestrationResponse
16✔
251
                        {
16✔
252
                            LockResponse = new LockNextTaskOrchestrationWorkItemResponse
16✔
253
                            {
16✔
254
                                WorkItem = workItem is null ? null : new DurableTaskGrpc.TaskOrchestrationWorkItem
16✔
255
                                {
16✔
256
                                    InstanceId = workItem.InstanceId,
16✔
257
                                    LockedUntilUtc = Timestamp.FromDateTime(workItem.LockedUntilUtc),
16✔
258
                                    Events = { workItem.OrchestrationRuntimeState.Events.Select(_options.DataConverter.Serialize) },
16✔
259
                                    NewMessages = { workItem.NewMessages.Select(_options.DataConverter.Serialize) }
16✔
260
                                }
16✔
261
                            }
16✔
262
                        };
16✔
263

264
                        context.CancellationToken.ThrowIfCancellationRequested();
16✔
265

266
                        await responseStream.WriteAsync(lockResponse);
16✔
267
                        break;
16✔
268
                    case TaskOrchestrationRequest.MessageOneofCase.RenewRequest:
269
                        var renewRequest = message.RenewRequest;
×
270
                        await _orchestrationService.RenewTaskOrchestrationWorkItemLockAsync(workItem);
×
271

272
                        var renewResponse = new TaskOrchestrationResponse
×
273
                        {
×
274
                            RenewResponse = new RenewTaskOrchestrationWorkItemLockResponse
×
275
                            {
×
276
                                LockedUntilUtc = Timestamp.FromDateTime(workItem.LockedUntilUtc)
×
277
                            }
×
278
                        };
×
279

280
                        context.CancellationToken.ThrowIfCancellationRequested();
×
281

282
                        await responseStream.WriteAsync(renewResponse);
×
283
                        break;
×
284
                    case TaskOrchestrationRequest.MessageOneofCase.CompleteRequest:
285
                        var completeRequest = message.CompleteRequest;
27✔
286
                        var outboundMessages = completeRequest.OutboundMessages.Select(x => _options.DataConverter.Deserialize<TaskMessage>(x)).ToArray();
45✔
287
                        var timerMessages = completeRequest.TimerMessages.Select(x => _options.DataConverter.Deserialize<TaskMessage>(x)).ToArray();
29✔
288
                        var orchestratorMessages = completeRequest.OrchestratorMessages.Select(x => _options.DataConverter.Deserialize<TaskMessage>(x)).ToArray();
33✔
289
                        var continuedAsNewMessage = string.IsNullOrEmpty(completeRequest.ContinuedAsNewMessage)
27✔
290
                            ? null
27✔
291
                            : _options.DataConverter.Deserialize<TaskMessage>(completeRequest.ContinuedAsNewMessage);
27✔
292

293
                        var newEvents = completeRequest.NewEvents.Select(x => _options.DataConverter.Deserialize<HistoryEvent>(x)).ToArray();
151✔
294
                        workItem.OrchestrationRuntimeState ??= new OrchestrationRuntimeState();
27✔
295
                        foreach (var newEvent in newEvents)
302✔
296
                        {
297
                            workItem.OrchestrationRuntimeState.AddEvent(newEvent);
124✔
298
                        }
299
                        workItem.OrchestrationRuntimeState.Status = completeRequest.NewStatus;
27✔
300

301
                        var newOrchestrationRuntimeState = workItem.OrchestrationRuntimeState;
27✔
302
                        var newOrchestrationRuntimeStateEvents = completeRequest.NewOrchestrationEvents.Select(x => _options.DataConverter.Deserialize<HistoryEvent>(x)).ToArray();
62✔
303
                        if (newOrchestrationRuntimeStateEvents.Length > 0)
27✔
304
                        {
305
                            newOrchestrationRuntimeState = new OrchestrationRuntimeState();
7✔
306
                            foreach (var newEvent in newOrchestrationRuntimeStateEvents)
84✔
307
                            {
308
                                newOrchestrationRuntimeState.AddEvent(newEvent);
35✔
309
                            }
310
                            newOrchestrationRuntimeState.Status = completeRequest.NewOrchestrationStatus;
7✔
311
                        }
312

313
                        var orchestrationState = Utils.BuildOrchestrationState(newOrchestrationRuntimeState);
27✔
314

315
                        await _orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
27✔
316
                            workItem,
27✔
317
                            newOrchestrationRuntimeState,
27✔
318
                            outboundMessages,
27✔
319
                            orchestratorMessages,
27✔
320
                            timerMessages,
27✔
321
                            continuedAsNewMessage,
27✔
322
                            orchestrationState);
27✔
323

324
                        newOrchestrationRuntimeState.NewEvents.Clear();
26✔
325

326
                        workItem.OrchestrationRuntimeState = newOrchestrationRuntimeState;
26✔
327

328
                        context.CancellationToken.ThrowIfCancellationRequested();
26✔
329

330
                        await responseStream.WriteAsync(new TaskOrchestrationResponse
26✔
331
                        {
26✔
332
                            CompleteResponse = new CompleteTaskOrchestrationWorkItemResponse()
26✔
333
                        });
26✔
334
                        break;
26✔
335
                    case TaskOrchestrationRequest.MessageOneofCase.FetchRequest:
336
                        var fetchRequest = message.FetchRequest;
12✔
337
                        if (workItem.Session is null)
12✔
338
                        {
339
                            var fetchResponse = new TaskOrchestrationResponse
×
340
                            {
×
341
                                FetchResponse = new FetchNewOrchestrationMessagesResponse
×
342
                                {
×
343
                                    NewMessages = null
×
344
                                }
×
345
                            };
×
346

347
                            context.CancellationToken.ThrowIfCancellationRequested();
×
348

349
                            await responseStream.WriteAsync(fetchResponse);
×
350
                        }
351
                        else
352
                        {
353
                            var newMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
12✔
354

355
                            var fetchResponse = new TaskOrchestrationResponse
12✔
356
                            {
12✔
357
                                FetchResponse = new FetchNewOrchestrationMessagesResponse
12✔
358
                                {
12✔
359
                                    NewMessages = newMessages is null ? null : new OrchestrationMessages
12✔
360
                                    {
12✔
361
                                        Messages = { newMessages.Select(_options.DataConverter.Serialize) }
12✔
362
                                    }
12✔
363
                                }
12✔
364
                            };
12✔
365

366
                            context.CancellationToken.ThrowIfCancellationRequested();
12✔
367

368
                            await responseStream.WriteAsync(fetchResponse);
12✔
369
                        }
370
                        break;
12✔
371
                    case TaskOrchestrationRequest.MessageOneofCase.ReleaseRequest:
372
                        var releaseRequest = message.ReleaseRequest;
14✔
373
                        await _orchestrationService.ReleaseTaskOrchestrationWorkItemAsync(workItem);
14✔
374
                        context.CancellationToken.ThrowIfCancellationRequested();
14✔
375
                        await responseStream.WriteAsync(new TaskOrchestrationResponse
14✔
376
                        {
14✔
377
                            ReleaseResponse = new ReleaseTaskOrchestrationWorkItemResponse()
14✔
378
                        });
14✔
379
                        break;
14✔
380
                    case TaskOrchestrationRequest.MessageOneofCase.AbandonRequest:
381
                        var abandonRequest = message.AbandonRequest;
×
382
                        await _orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
×
383
                        context.CancellationToken.ThrowIfCancellationRequested();
×
384
                        await responseStream.WriteAsync(new TaskOrchestrationResponse
×
385
                        {
×
386
                            AbandonResponse = new AbandonTaskOrchestrationWorkItemLockResponse()
×
387
                        });
×
388
                        break;
389
                }
390
            }
391
        }
392
        catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
15✔
393
        {
394
            // Avoid exceptions when clients cancel request
395
        }
396
    }
397

398
    public override async Task<LockNextTaskActivityWorkItemResponse> LockNextTaskActivityWorkItem(LockNextTaskActivityWorkItemRequest request, ServerCallContext context)
399
    {
400
        try
401
        {
402
            var activities = request.Activities.Select(x => new NameVersion(x.Name, x.Version)).ToArray();
128✔
403

404
            var workItem = await (request.AllActivities
32✔
405
                ? _orchestrationService
32✔
406
                    .LockNextTaskActivityWorkItem(request.ReceiveTimeout.ToTimeSpan(), context.CancellationToken)
32✔
407
                : (_distributedOrchestrationService ?? throw DistributedWorkersNotSupported())
32✔
408
                    .LockNextTaskActivityWorkItem(request.ReceiveTimeout.ToTimeSpan(), activities, context.CancellationToken));
32✔
409

410
            var response = new LockNextTaskActivityWorkItemResponse
18✔
411
            {
18✔
412
                WorkItem = workItem is null ? null : ToGrpcWorkItem(workItem)
18✔
413
            };
18✔
414

415
            return response;
18✔
416
        }
417
        catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
14✔
418
        {
419
            // Avoid exceptions when clients cancel request
420
            return null;
14✔
421
        }
422
    }
423

424
    public override async Task<RenewTaskActivityWorkItemLockResponse> RenewTaskActivityWorkItemLock(RenewTaskActivityWorkItemLockRequest request, ServerCallContext context)
425
    {
426
        var workItem = ToDurableTaskWorkItem(request.WorkItem);
×
427

428
        var newWorkItem = await _orchestrationService.RenewTaskActivityWorkItemLockAsync(workItem);
×
429

430
        var response = new RenewTaskActivityWorkItemLockResponse
×
431
        {
×
432
            WorkItem = ToGrpcWorkItem(newWorkItem)
×
433
        };
×
434

435
        return response;
×
436
    }
437

438
    public override async Task<Empty> CompleteTaskActivityWorkItem(
439
        CompleteTaskActivityWorkItemRequest request, ServerCallContext context)
440
    {
441
        var workItem = ToDurableTaskWorkItem(request.WorkItem);
18✔
442

443
        var responseMessage = _options.DataConverter.Deserialize<TaskMessage>(request.ResponseMessage);
18✔
444

445
        await _orchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseMessage);
18✔
446

447
        return new Empty();
18✔
448
    }
449

450
    public override async Task<Empty> AbandonTaskActivityWorkItem(AbandonTaskActivityWorkItemRequest request, ServerCallContext context)
451
    {
452
        var workItem = ToDurableTaskWorkItem(request.WorkItem);
×
453

454
        await _orchestrationService.AbandonTaskActivityWorkItemAsync(workItem);
×
455

456
        return new Empty();
×
457
    }
458

459
    private DurableTaskGrpc.TaskActivityWorkItem ToGrpcWorkItem(TaskActivityWorkItem workItem)
460
    {
461
        return new DurableTaskGrpc.TaskActivityWorkItem
18✔
462
        {
18✔
463
            Id = workItem.Id,
18✔
464
            LockedUntilUtc = Timestamp.FromDateTime(workItem.LockedUntilUtc),
18✔
465
            TaskMessage = _options.DataConverter.Serialize(workItem.TaskMessage)
18✔
466
        };
18✔
467
    }
468

469
    private TaskActivityWorkItem ToDurableTaskWorkItem(DurableTaskGrpc.TaskActivityWorkItem grpcWorkItem)
470
    {
471
        return new TaskActivityWorkItem
18✔
472
        {
18✔
473
            Id = grpcWorkItem.Id,
18✔
474
            LockedUntilUtc = grpcWorkItem.LockedUntilUtc.ToDateTime(),
18✔
475
            TaskMessage = _options.DataConverter.Deserialize<TaskMessage>(grpcWorkItem.TaskMessage)
18✔
476
        };
18✔
477
    }
478

479
    private Exception DistributedWorkersNotSupported()
480
    {
481
        return NotSupported("Distributed workers");
×
482
    }
483

484
    private Exception NotSupported(string operation)
485
    {
486
        return new NotSupportedException($"{operation} is not supported by storage implementation");
×
487
    }
488
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc