• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 27982538143

22 Jun 2026 08:40PM UTC coverage: 76.52% (-0.6%) from 77.164%
27982538143

push

github

lucaslorentz
perf: cancel an orchestration's pending timers when it finishes

Optimization, not a bug fix: a finished orchestration's pending timers are
already harmless — when they fire they're dropped because it's no longer
running. This discards them earlier, when the execution reaches a terminal
state in CompleteTaskOrchestrationWorkItemAsync, so e.g. an unfired
Task.WhenAny timeout doesn't linger in the queue until its (possibly far-off)
due time.

Also replaces the flaky TimerOrchestration_ShouldTerminateProperly test with
TerminatedOrchestration_ShouldCancelPendingTimers: it schedules many timers
500ms apart, terminates after at least one has fired (while the rest are still
pending), and asserts the orchestration ends Terminated with no pending
messages left. The later timers can't fire within the test window, so an empty
queue proves they were cancelled.

- Reduce TimerOrchestration's timer 2s -> 1s to speed TimerOrchestration_ShouldComplete.
- Ignore *.lscache and test-results/.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

25 of 25 new or added lines in 1 file covered. (100.0%)

26 existing lines in 4 files now uncovered.

2291 of 2994 relevant lines covered (76.52%)

140.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.21
/src/LLL.DurableTask.Server.Grpc/GrpcServerOrchestrationService.cs
1
using System;
2
using System.Linq;
3
using System.Threading.Tasks;
4
using DurableTask.Core;
5
using DurableTask.Core.Common;
6
using DurableTask.Core.History;
7
using DurableTask.Core.Query;
8
using DurableTaskGrpc;
9
using Google.Protobuf.WellKnownTypes;
10
using Grpc.Core;
11
using LLL.DurableTask.Core;
12
using Microsoft.Extensions.Logging;
13
using Microsoft.Extensions.Options;
14
using static DurableTaskGrpc.OrchestrationService;
15
using TaskActivityWorkItem = DurableTask.Core.TaskActivityWorkItem;
16
using TaskOrchestrationWorkItem = DurableTask.Core.TaskOrchestrationWorkItem;
17

18
namespace LLL.DurableTask.Server.Grpc.Server;
19

20
public class GrpcServerOrchestrationService : OrchestrationServiceBase
21
{
22
    private readonly GrpcServerOrchestrationServiceOptions _options;
23
    private readonly IOrchestrationService _orchestrationService;
24
    private readonly IOrchestrationServiceClient _orchestrationServiceClient;
25
    private readonly ILogger<GrpcServerOrchestrationService> _logger;
26
    private readonly IDistributedOrchestrationService _distributedOrchestrationService;
27
    private readonly IOrchestrationServiceQueryClient _orchestrationServiceQueryClient;
28
    private readonly IOrchestrationServicePurgeClient _orchestrationServicePurgeClient;
29
    private readonly IOrchestrationServiceFeaturesClient _orchestrationServiceFeaturesClient;
30
    private readonly IOrchestrationServiceRewindClient _orchestrationServiceRewindClient;
31

32
    public GrpcServerOrchestrationService(
14✔
33
        IOptions<GrpcServerOrchestrationServiceOptions> options,
14✔
34
        IOrchestrationService orchestrationService,
14✔
35
        IOrchestrationServiceClient orchestrationServiceClient,
14✔
36
        ILogger<GrpcServerOrchestrationService> logger,
14✔
37
        IDistributedOrchestrationService distributedOrchestrationService = null,
14✔
38
        IOrchestrationServiceQueryClient orchestrationServiceQueryClient = null,
14✔
39
        IOrchestrationServicePurgeClient orchestrationServicePurgeClient = null,
14✔
40
        IOrchestrationServiceFeaturesClient orchestrationServiceFeaturesClient = null,
14✔
41
        IOrchestrationServiceRewindClient orchestrationServiceRewindClient = null)
14✔
42
    {
43
        _options = options.Value;
14✔
44
        _orchestrationService = orchestrationService;
14✔
45
        _orchestrationServiceClient = orchestrationServiceClient;
14✔
46
        _logger = logger;
14✔
47
        _distributedOrchestrationService = distributedOrchestrationService;
14✔
48
        _orchestrationServiceQueryClient = orchestrationServiceQueryClient;
14✔
49
        _orchestrationServicePurgeClient = orchestrationServicePurgeClient;
14✔
50
        _orchestrationServiceFeaturesClient = orchestrationServiceFeaturesClient;
14✔
51
        _orchestrationServiceRewindClient = orchestrationServiceRewindClient;
14✔
52
    }
53

54
    public override async Task<GetFeaturesResponse> GetFeatures(Empty request, ServerCallContext context)
55
    {
56
        var features = _orchestrationServiceFeaturesClient is not null
×
57
            ? await _orchestrationServiceFeaturesClient.GetFeatures()
×
58
            : Array.Empty<OrchestrationFeature>();
×
59

60
        return new GetFeaturesResponse
×
61
        {
×
62
            Features = { features.Select(f => (int)f) }
×
63
        };
×
64
    }
65

66
    public override async Task<Empty> CreateTaskOrchestration(CreateTaskOrchestrationRequest request, ServerCallContext context)
67
    {
68
        var creationMessage = _options.DataConverter.Deserialize<TaskMessage>(request.CreationMessage);
14✔
69
        var dedupeStatuses = request.DedupeStatuses.Count > 0
14✔
70
            ? request.DedupeStatuses.Select(x => (OrchestrationStatus)x).ToArray()
14✔
71
            : null;
14✔
72

73
        await _orchestrationServiceClient.CreateTaskOrchestrationAsync(creationMessage, dedupeStatuses);
14✔
74

75
        return new Empty();
14✔
76
    }
77

78
    public override async Task<Empty> ForceTerminateTaskOrchestration(ForceTerminateTaskOrchestrationRequest request, ServerCallContext context)
79
    {
80
        await _orchestrationServiceClient.ForceTerminateTaskOrchestrationAsync(request.InstanceId, request.Reason);
1✔
81

82
        return new Empty();
1✔
83
    }
84

85
    public override async Task<Empty> RewindTaskOrchestration(RewindTaskOrchestrationRequest request, ServerCallContext context)
86
    {
87
        await (_orchestrationServiceRewindClient ?? throw NotSupported("Rewind"))
×
88
            .RewindTaskOrchestrationAsync(request.InstanceId, request.Reason);
×
89

90
        return new Empty();
×
91
    }
92

93
    public override async Task<GetOrchestrationHistoryResponse> GetOrchestrationHistory(GetOrchestrationHistoryRequest request, ServerCallContext context)
94
    {
UNCOV
95
        var history = await _orchestrationServiceClient.GetOrchestrationHistoryAsync(
×
UNCOV
96
            request.InstanceId,
×
UNCOV
97
            request.ExecutionId);
×
98

UNCOV
99
        var response = new GetOrchestrationHistoryResponse
×
UNCOV
100
        {
×
UNCOV
101
            History = history
×
UNCOV
102
        };
×
103

UNCOV
104
        return response;
×
105
    }
106

107
    public override async Task<GetOrchestrationInstanceStateResponse> GetOrchestrationInstanceState(GetOrchestrationInstanceStateRequest request, ServerCallContext context)
108
    {
109
        var states = await _orchestrationServiceClient.GetOrchestrationStateAsync(
23✔
110
            request.InstanceId,
23✔
111
            request.AllExecutions);
23✔
112

113
        var response = new GetOrchestrationInstanceStateResponse
23✔
114
        {
23✔
115
            States = { states.Select(s => _options.DataConverter.Serialize(s)) }
44✔
116
        };
23✔
117

118
        return response;
23✔
119
    }
120

121
    public override async Task<GetOrchestrationStateResponse> GetOrchestrationState(GetOrchestrationStateRequest request, ServerCallContext context)
122
    {
123
        var state = await _orchestrationServiceClient.GetOrchestrationStateAsync(
1✔
124
            request.InstanceId,
1✔
125
            request.ExecutionId);
1✔
126

127
        var response = new GetOrchestrationStateResponse
1✔
128
        {
1✔
129
            State = state is null ? null : _options.DataConverter.Serialize(state)
1✔
130
        };
1✔
131

132
        return response;
1✔
133
    }
134

135
    public override async Task<Empty> PurgeOrchestrationHistory(PurgeOrchestrationHistoryRequest request, ServerCallContext context)
136
    {
137
        await _orchestrationServiceClient.PurgeOrchestrationHistoryAsync(
×
138
            request.ThresholdDateTimeUtc.ToDateTime(),
×
139
            (OrchestrationStateTimeRangeFilterType)request.TimeRangeFilterType);
×
140

141
        return new Empty();
×
142
    }
143

144
    public override async Task<WaitForOrchestrationResponse> WaitForOrchestration(WaitForOrchestrationRequest request, ServerCallContext context)
145
    {
146
        var state = await _orchestrationServiceClient.WaitForOrchestrationAsync(
20✔
147
            request.InstanceId,
20✔
148
            request.ExecutionId,
20✔
149
            request.Timeout.ToTimeSpan(),
20✔
150
            context.CancellationToken);
20✔
151

152
        var response = new WaitForOrchestrationResponse
20✔
153
        {
20✔
154
            State = state is null ? null : _options.DataConverter.Serialize(state)
20✔
155
        };
20✔
156

157
        return response;
20✔
158
    }
159

160
    public override async Task<GetOrchestrationWithQueryResponse> GetOrchestrationWithQuery(GetOrchestrationWithQueryRequest request, ServerCallContext context)
161
    {
162
        var query = new OrchestrationQueryExtended();
1✔
163
        query.RuntimeStatus = request.RuntimeStatus.Select(s => (OrchestrationStatus)s).ToArray();
1✔
164
        query.CreatedTimeFrom = request.CreatedTimeFrom?.ToDateTime();
1✔
165
        query.CreatedTimeTo = request.CreatedTimeTo?.ToDateTime();
1✔
166
        query.TaskHubNames = request.TaskHubNames;
1✔
167
        query.PageSize = request.PageSize;
1✔
168
        query.ContinuationToken = request.ContinuationToken;
1✔
169
        query.InstanceIdPrefix = request.InstanceIdPrefix;
1✔
170
        query.FetchInputsAndOutputs = request.FetchInputsAndOutputs;
1✔
171
        query.NamePrefix = request.NamePrefix;
1✔
172
        query.CompletedTimeFrom = request.CompletedTimeFrom?.ToDateTime();
1✔
173
        query.CompletedTimeTo = request.CompletedTimeTo?.ToDateTime();
1✔
174
        query.IncludePreviousExecutions = request.IncludePreviousExecutions;
1✔
175
        foreach (var kv in request.Tags)
6✔
176
            query.Tags.Add(kv.Key, kv.Value);
2✔
177

178
        var queryResult = await _orchestrationServiceQueryClient.GetOrchestrationWithQueryAsync(query, context.CancellationToken);
1✔
179

180
        var response = new GetOrchestrationWithQueryResponse
1✔
181
        {
1✔
182
            OrchestrationState = { queryResult.OrchestrationState.Select(s => _options.DataConverter.Serialize(s)) },
2✔
183
            ContinuationToken = queryResult.ContinuationToken
1✔
184
        };
1✔
185

186
        return response;
1✔
187
    }
188

189
    public override async Task<PurgeInstanceHistoryResponse> PurgeInstanceHistory(PurgeInstanceHistoryRequest request, ServerCallContext context)
190
    {
191
        var client = _orchestrationServicePurgeClient ?? throw NotSupported("PurgeInstanceHistory");
2✔
192

193
        PurgeResult result;
194

195
        if (!string.IsNullOrEmpty(request.InstanceId))
2✔
196
        {
197
            result = await client.PurgeInstanceStateAsync(request.InstanceId);
1✔
198
        }
199
        else
200
        {
201
            var createdTimeFrom = request.CreatedTimeFrom?.ToDateTime() ?? DateTime.MinValue;
1✔
202
            var createdTimeTo = request.CreatedTimeTo?.ToDateTime();
1✔
203
            var runtimeStatus = request.RuntimeStatus.Select(s => (OrchestrationStatus)s).ToArray();
9✔
204

205
            var filter = new PurgeInstanceFilterExtended(createdTimeFrom, createdTimeTo, runtimeStatus)
1✔
206
            {
1✔
207
                Limit = request.HasLimit ? request.Limit : null
1✔
208
            };
1✔
209

210
            result = await client.PurgeInstanceStateAsync(filter);
1✔
211
        }
212

213
        return new PurgeInstanceHistoryResponse
2✔
214
        {
2✔
215
            InstancesDeleted = result.DeletedInstanceCount
2✔
216
        };
2✔
217
    }
218

219
    public override async Task<Empty> SendTaskOrchestrationMessageBatch(SendTaskOrchestrationMessageBatchRequest request, ServerCallContext context)
220
    {
221
        var messages = request.Messages.Select(x => _options.DataConverter.Deserialize<TaskMessage>(x)).ToArray();
4✔
222

223
        await _orchestrationServiceClient.SendTaskOrchestrationMessageBatchAsync(messages);
2✔
224

225
        return new Empty();
2✔
226
    }
227

228
    public override async Task LockNextTaskOrchestrationWorkItem(IAsyncStreamReader<TaskOrchestrationRequest> requestStream, IServerStreamWriter<TaskOrchestrationResponse> responseStream, ServerCallContext context)
229
    {
230
        try
231
        {
232
            TaskOrchestrationWorkItem workItem = null;
29✔
233

234
            // Receive and reply each message
235
            await foreach (var message in requestStream.ReadAllAsync(context.CancellationToken))
212✔
236
            {
237
                switch (message.MessageCase)
84✔
238
                {
239
                    case TaskOrchestrationRequest.MessageOneofCase.LockRequest:
240
                        var lockRequest = message.LockRequest;
29✔
241
                        var orchestrations = lockRequest.Orchestrations.Select(x => new NameVersion(x.Name, x.Version)).ToArray();
290✔
242

243
                        workItem = await (lockRequest.AllOrchestrations
29✔
244
                            ? _orchestrationService
29✔
245
                                .LockNextTaskOrchestrationWorkItemAsync(lockRequest.ReceiveTimeout.ToTimeSpan(), context.CancellationToken)
29✔
246
                            : (_distributedOrchestrationService ?? throw DistributedWorkersNotSupported())
29✔
247
                                .LockNextTaskOrchestrationWorkItemAsync(lockRequest.ReceiveTimeout.ToTimeSpan(), orchestrations, context.CancellationToken)
29✔
248
                        );
29✔
249

250
                        var lockResponse = new TaskOrchestrationResponse
15✔
251
                        {
15✔
252
                            LockResponse = new LockNextTaskOrchestrationWorkItemResponse
15✔
253
                            {
15✔
254
                                WorkItem = workItem is null ? null : new DurableTaskGrpc.TaskOrchestrationWorkItem
15✔
255
                                {
15✔
256
                                    InstanceId = workItem.InstanceId,
15✔
257
                                    LockedUntilUtc = Timestamp.FromDateTime(workItem.LockedUntilUtc),
15✔
258
                                    Events = { workItem.OrchestrationRuntimeState.Events.Select(_options.DataConverter.Serialize) },
15✔
259
                                    NewMessages = { workItem.NewMessages.Select(_options.DataConverter.Serialize) }
15✔
260
                                }
15✔
261
                            }
15✔
262
                        };
15✔
263

264
                        context.CancellationToken.ThrowIfCancellationRequested();
15✔
265

266
                        await responseStream.WriteAsync(lockResponse);
15✔
267
                        break;
15✔
268
                    case TaskOrchestrationRequest.MessageOneofCase.RenewRequest:
269
                        var renewRequest = message.RenewRequest;
×
270
                        await _orchestrationService.RenewTaskOrchestrationWorkItemLockAsync(workItem);
×
271

272
                        var renewResponse = new TaskOrchestrationResponse
×
273
                        {
×
274
                            RenewResponse = new RenewTaskOrchestrationWorkItemLockResponse
×
275
                            {
×
276
                                LockedUntilUtc = Timestamp.FromDateTime(workItem.LockedUntilUtc)
×
277
                            }
×
278
                        };
×
279

280
                        context.CancellationToken.ThrowIfCancellationRequested();
×
281

282
                        await responseStream.WriteAsync(renewResponse);
×
283
                        break;
×
284
                    case TaskOrchestrationRequest.MessageOneofCase.CompleteRequest:
285
                        var completeRequest = message.CompleteRequest;
28✔
286
                        var outboundMessages = completeRequest.OutboundMessages.Select(x => _options.DataConverter.Deserialize<TaskMessage>(x)).ToArray();
46✔
287
                        var timerMessages = completeRequest.TimerMessages.Select(x => _options.DataConverter.Deserialize<TaskMessage>(x)).ToArray();
129✔
288
                        var orchestratorMessages = completeRequest.OrchestratorMessages.Select(x => _options.DataConverter.Deserialize<TaskMessage>(x)).ToArray();
34✔
289
                        var continuedAsNewMessage = string.IsNullOrEmpty(completeRequest.ContinuedAsNewMessage)
28✔
290
                            ? null
28✔
291
                            : _options.DataConverter.Deserialize<TaskMessage>(completeRequest.ContinuedAsNewMessage);
28✔
292

293
                        var newEvents = completeRequest.NewEvents.Select(x => _options.DataConverter.Deserialize<HistoryEvent>(x)).ToArray();
254✔
294
                        workItem.OrchestrationRuntimeState ??= new OrchestrationRuntimeState();
28✔
295
                        foreach (var newEvent in newEvents)
508✔
296
                        {
297
                            workItem.OrchestrationRuntimeState.AddEvent(newEvent);
226✔
298
                        }
299
                        workItem.OrchestrationRuntimeState.Status = completeRequest.NewStatus;
28✔
300

301
                        var newOrchestrationRuntimeState = workItem.OrchestrationRuntimeState;
28✔
302
                        var newOrchestrationRuntimeStateEvents = completeRequest.NewOrchestrationEvents.Select(x => _options.DataConverter.Deserialize<HistoryEvent>(x)).ToArray();
63✔
303
                        if (newOrchestrationRuntimeStateEvents.Length > 0)
28✔
304
                        {
305
                            newOrchestrationRuntimeState = new OrchestrationRuntimeState();
7✔
306
                            foreach (var newEvent in newOrchestrationRuntimeStateEvents)
84✔
307
                            {
308
                                newOrchestrationRuntimeState.AddEvent(newEvent);
35✔
309
                            }
310
                            newOrchestrationRuntimeState.Status = completeRequest.NewOrchestrationStatus;
7✔
311
                        }
312

313
                        var orchestrationState = Utils.BuildOrchestrationState(newOrchestrationRuntimeState);
28✔
314

315
                        await _orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
28✔
316
                            workItem,
28✔
317
                            newOrchestrationRuntimeState,
28✔
318
                            outboundMessages,
28✔
319
                            orchestratorMessages,
28✔
320
                            timerMessages,
28✔
321
                            continuedAsNewMessage,
28✔
322
                            orchestrationState);
28✔
323

324
                        newOrchestrationRuntimeState.NewEvents.Clear();
28✔
325

326
                        workItem.OrchestrationRuntimeState = newOrchestrationRuntimeState;
28✔
327

328
                        context.CancellationToken.ThrowIfCancellationRequested();
28✔
329

330
                        await responseStream.WriteAsync(new TaskOrchestrationResponse
28✔
331
                        {
28✔
332
                            CompleteResponse = new CompleteTaskOrchestrationWorkItemResponse()
28✔
333
                        });
28✔
334
                        break;
28✔
335
                    case TaskOrchestrationRequest.MessageOneofCase.FetchRequest:
336
                        var fetchRequest = message.FetchRequest;
13✔
337
                        if (workItem.Session is null)
13✔
338
                        {
339
                            var fetchResponse = new TaskOrchestrationResponse
×
340
                            {
×
341
                                FetchResponse = new FetchNewOrchestrationMessagesResponse
×
342
                                {
×
343
                                    NewMessages = null
×
344
                                }
×
345
                            };
×
346

347
                            context.CancellationToken.ThrowIfCancellationRequested();
×
348

349
                            await responseStream.WriteAsync(fetchResponse);
×
350
                        }
351
                        else
352
                        {
353
                            var newMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
13✔
354

355
                            var fetchResponse = new TaskOrchestrationResponse
13✔
356
                            {
13✔
357
                                FetchResponse = new FetchNewOrchestrationMessagesResponse
13✔
358
                                {
13✔
359
                                    NewMessages = newMessages is null ? null : new OrchestrationMessages
13✔
360
                                    {
13✔
361
                                        Messages = { newMessages.Select(_options.DataConverter.Serialize) }
13✔
362
                                    }
13✔
363
                                }
13✔
364
                            };
13✔
365

366
                            context.CancellationToken.ThrowIfCancellationRequested();
13✔
367

368
                            await responseStream.WriteAsync(fetchResponse);
13✔
369
                        }
370
                        break;
13✔
371
                    case TaskOrchestrationRequest.MessageOneofCase.ReleaseRequest:
372
                        var releaseRequest = message.ReleaseRequest;
14✔
373
                        await _orchestrationService.ReleaseTaskOrchestrationWorkItemAsync(workItem);
14✔
374
                        context.CancellationToken.ThrowIfCancellationRequested();
14✔
375
                        await responseStream.WriteAsync(new TaskOrchestrationResponse
14✔
376
                        {
14✔
377
                            ReleaseResponse = new ReleaseTaskOrchestrationWorkItemResponse()
14✔
378
                        });
14✔
379
                        break;
14✔
380
                    case TaskOrchestrationRequest.MessageOneofCase.AbandonRequest:
381
                        var abandonRequest = message.AbandonRequest;
×
382
                        await _orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
×
383
                        context.CancellationToken.ThrowIfCancellationRequested();
×
384
                        await responseStream.WriteAsync(new TaskOrchestrationResponse
×
385
                        {
×
386
                            AbandonResponse = new AbandonTaskOrchestrationWorkItemLockResponse()
×
387
                        });
×
388
                        break;
389
                }
390
            }
391
        }
392
        catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
15✔
393
        {
394
            // Avoid exceptions when clients cancel request
395
        }
396
    }
397

398
    public override async Task<LockNextTaskActivityWorkItemResponse> LockNextTaskActivityWorkItem(LockNextTaskActivityWorkItemRequest request, ServerCallContext context)
399
    {
400
        try
401
        {
402
            var activities = request.Activities.Select(x => new NameVersion(x.Name, x.Version)).ToArray();
128✔
403

404
            var workItem = await (request.AllActivities
32✔
405
                ? _orchestrationService
32✔
406
                    .LockNextTaskActivityWorkItem(request.ReceiveTimeout.ToTimeSpan(), context.CancellationToken)
32✔
407
                : (_distributedOrchestrationService ?? throw DistributedWorkersNotSupported())
32✔
408
                    .LockNextTaskActivityWorkItem(request.ReceiveTimeout.ToTimeSpan(), activities, context.CancellationToken));
32✔
409

410
            var response = new LockNextTaskActivityWorkItemResponse
18✔
411
            {
18✔
412
                WorkItem = workItem is null ? null : ToGrpcWorkItem(workItem)
18✔
413
            };
18✔
414

415
            return response;
18✔
416
        }
417
        catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
14✔
418
        {
419
            // Avoid exceptions when clients cancel request
420
            return null;
14✔
421
        }
422
    }
423

424
    public override async Task<RenewTaskActivityWorkItemLockResponse> RenewTaskActivityWorkItemLock(RenewTaskActivityWorkItemLockRequest request, ServerCallContext context)
425
    {
426
        var workItem = ToDurableTaskWorkItem(request.WorkItem);
×
427

428
        var newWorkItem = await _orchestrationService.RenewTaskActivityWorkItemLockAsync(workItem);
×
429

430
        var response = new RenewTaskActivityWorkItemLockResponse
×
431
        {
×
432
            WorkItem = ToGrpcWorkItem(newWorkItem)
×
433
        };
×
434

435
        return response;
×
436
    }
437

438
    public override async Task<Empty> CompleteTaskActivityWorkItem(
439
        CompleteTaskActivityWorkItemRequest request, ServerCallContext context)
440
    {
441
        var workItem = ToDurableTaskWorkItem(request.WorkItem);
18✔
442

443
        var responseMessage = _options.DataConverter.Deserialize<TaskMessage>(request.ResponseMessage);
18✔
444

445
        await _orchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseMessage);
18✔
446

447
        return new Empty();
18✔
448
    }
449

450
    public override async Task<Empty> AbandonTaskActivityWorkItem(AbandonTaskActivityWorkItemRequest request, ServerCallContext context)
451
    {
452
        var workItem = ToDurableTaskWorkItem(request.WorkItem);
×
453

454
        await _orchestrationService.AbandonTaskActivityWorkItemAsync(workItem);
×
455

456
        return new Empty();
×
457
    }
458

459
    private DurableTaskGrpc.TaskActivityWorkItem ToGrpcWorkItem(TaskActivityWorkItem workItem)
460
    {
461
        return new DurableTaskGrpc.TaskActivityWorkItem
18✔
462
        {
18✔
463
            Id = workItem.Id,
18✔
464
            LockedUntilUtc = Timestamp.FromDateTime(workItem.LockedUntilUtc),
18✔
465
            TaskMessage = _options.DataConverter.Serialize(workItem.TaskMessage)
18✔
466
        };
18✔
467
    }
468

469
    private TaskActivityWorkItem ToDurableTaskWorkItem(DurableTaskGrpc.TaskActivityWorkItem grpcWorkItem)
470
    {
471
        return new TaskActivityWorkItem
18✔
472
        {
18✔
473
            Id = grpcWorkItem.Id,
18✔
474
            LockedUntilUtc = grpcWorkItem.LockedUntilUtc.ToDateTime(),
18✔
475
            TaskMessage = _options.DataConverter.Deserialize<TaskMessage>(grpcWorkItem.TaskMessage)
18✔
476
        };
18✔
477
    }
478

479
    private Exception DistributedWorkersNotSupported()
480
    {
481
        return NotSupported("Distributed workers");
×
482
    }
483

484
    private Exception NotSupported(string operation)
485
    {
486
        return new NotSupportedException($"{operation} is not supported by storage implementation");
×
487
    }
488
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc