• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 27982538143

22 Jun 2026 08:40PM UTC coverage: 76.52% (-0.6%) from 77.164%
27982538143

push

github

lucaslorentz
perf: cancel an orchestration's pending timers when it finishes

Optimization, not a bug fix: a finished orchestration's pending timers are
already harmless — when they fire they're dropped because it's no longer
running. This discards them earlier, when the execution reaches a terminal
state in CompleteTaskOrchestrationWorkItemAsync, so e.g. an unfired
Task.WhenAny timeout doesn't linger in the queue until its (possibly far-off)
due time.

Also replaces the flaky TimerOrchestration_ShouldTerminateProperly test with
TerminatedOrchestration_ShouldCancelPendingTimers: it schedules many timers
500ms apart, terminates after at least one has fired (while the rest are still
pending), and asserts the orchestration ends Terminated with no pending
messages left. The later timers can't fire within the test window, so an empty
queue proves they were cancelled.

- Reduce TimerOrchestration's timer 2s -> 1s to speed TimerOrchestration_ShouldComplete.
- Ignore *.lscache and test-results/.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

25 of 25 new or added lines in 1 file covered. (100.0%)

26 existing lines in 4 files now uncovered.

2291 of 2994 relevant lines covered (76.52%)

140.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.45
/src/LLL.DurableTask.Server.Grpc.Client/GrpcClientOrchestrationServiceClient.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTask.Core.Query;
8
using DurableTaskGrpc;
9
using Google.Protobuf.WellKnownTypes;
10
using Grpc.Core;
11
using LLL.DurableTask.Core;
12

13
namespace LLL.DurableTask.Server.Client;
14

15
public partial class GrpcClientOrchestrationService :
16
    IOrchestrationServiceClient,
17
    IOrchestrationServiceQueryClient,
18
    IOrchestrationServicePurgeClient,
19
    IOrchestrationServiceFeaturesClient,
20
    IOrchestrationServiceRewindClient
21
{
22
    /// <summary>
    /// Requests the feature list through the gRPC client and converts every returned
    /// value to an <see cref="OrchestrationFeature"/> with an explicit cast.
    /// </summary>
    /// <returns>An array holding one entry per feature in the response, in response order.</returns>
    public async Task<OrchestrationFeature[]> GetFeatures()
    {
        var reply = await _client.GetFeaturesAsync(new Empty());

        var features = new List<OrchestrationFeature>();
        foreach (var feature in reply.Features)
            features.Add((OrchestrationFeature)feature);

        return features.ToArray();
    }
29
    /// <summary>
    /// Creates an orchestration from <paramref name="creationMessage"/> by delegating to the
    /// two-argument overload with an empty <see cref="OrchestrationStatus"/> array.
    /// </summary>
    /// <param name="creationMessage">Message forwarded unchanged to the two-argument overload.</param>
    public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)
    {
        var emptyStatuses = Array.Empty<OrchestrationStatus>();
        await CreateTaskOrchestrationAsync(creationMessage, emptyStatuses);
    }
33

34
    /// <summary>
    /// Serializes <paramref name="creationMessage"/> with the configured data converter and
    /// sends it in a <see cref="CreateTaskOrchestrationRequest"/>.
    /// </summary>
    /// <param name="creationMessage">Message to serialize into the request.</param>
    /// <param name="dedupeStatuses">
    /// Statuses added to the request as integers; when <c>null</c> nothing is added.
    /// </param>
    public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
    {
        var createRequest = new CreateTaskOrchestrationRequest();
        createRequest.CreationMessage = _options.DataConverter.Serialize(creationMessage);

        if (dedupeStatuses is not null)
        {
            // The request carries the statuses as plain integers.
            var statusCodes = dedupeStatuses.Select(status => (int)status);
            createRequest.DedupeStatuses.AddRange(statusCodes);
        }

        await _client.CreateTaskOrchestrationAsync(createRequest);
    }
46

47
    /// <summary>
    /// Requests the history for the given instance and execution through the gRPC client.
    /// </summary>
    /// <param name="instanceId">Value copied to the request's <c>InstanceId</c>.</param>
    /// <param name="executionId">Value copied to the request's <c>ExecutionId</c>.</param>
    /// <returns>The <c>History</c> value of the response, returned as-is.</returns>
    public async Task<string> GetOrchestrationHistoryAsync(string instanceId, string executionId)
    {
        var historyRequest = new GetOrchestrationHistoryRequest();
        historyRequest.InstanceId = instanceId;
        historyRequest.ExecutionId = executionId;

        var historyReply = await _client.GetOrchestrationHistoryAsync(historyRequest);
        return historyReply.History;
    }
59

60
    /// <summary>
    /// Requests the states of an orchestration instance and deserializes each returned
    /// entry with the configured data converter.
    /// </summary>
    /// <param name="instanceId">Value copied to the request's <c>InstanceId</c>.</param>
    /// <param name="allExecutions">Value copied to the request's <c>AllExecutions</c> flag.</param>
    /// <returns>An array with one deserialized state per entry in the response.</returns>
    public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string instanceId, bool allExecutions)
    {
        var stateRequest = new GetOrchestrationInstanceStateRequest();
        stateRequest.InstanceId = instanceId;
        stateRequest.AllExecutions = allExecutions;

        var stateReply = await _client.GetOrchestrationInstanceStateAsync(stateRequest);

        var states = new List<OrchestrationState>();
        foreach (var serializedState in stateReply.States)
            states.Add(_options.DataConverter.Deserialize<OrchestrationState>(serializedState));

        return states.ToArray();
    }
74

75
    /// <summary>
    /// Requests the state of a single orchestration execution through the gRPC client.
    /// </summary>
    /// <param name="instanceId">Value copied to the request's <c>InstanceId</c>.</param>
    /// <param name="executionId">Value copied to the request's <c>ExecutionId</c>.</param>
    /// <returns>
    /// The deserialized state, or <c>null</c> when the response's <c>State</c> is null or empty.
    /// </returns>
    public async Task<OrchestrationState> GetOrchestrationStateAsync(string instanceId, string executionId)
    {
        var stateRequest = new GetOrchestrationStateRequest();
        stateRequest.InstanceId = instanceId;
        stateRequest.ExecutionId = executionId;

        var stateReply = await _client.GetOrchestrationStateAsync(stateRequest);

        // A null or empty payload is reported to the caller as null.
        if (string.IsNullOrEmpty(stateReply.State))
            return null;

        return _options.DataConverter.Deserialize<OrchestrationState>(stateReply.State);
    }
87

88
    /// <summary>
    /// Sends a <see cref="ForceTerminateTaskOrchestrationRequest"/> for the given instance.
    /// </summary>
    /// <param name="instanceId">Value copied to the request's <c>InstanceId</c>.</param>
    /// <param name="reason">Value copied to the request's <c>Reason</c>.</param>
    public async Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
    {
        var terminateRequest = new ForceTerminateTaskOrchestrationRequest();
        terminateRequest.InstanceId = instanceId;
        terminateRequest.Reason = reason;

        await _client.ForceTerminateTaskOrchestrationAsync(terminateRequest);
    }
98

99
    /// <summary>
    /// Sends a <see cref="RewindTaskOrchestrationRequest"/> for the given instance.
    /// </summary>
    /// <param name="instanceId">Value copied to the request's <c>InstanceId</c>.</param>
    /// <param name="reason">Value copied to the request's <c>Reason</c>.</param>
    public async Task RewindTaskOrchestrationAsync(string instanceId, string reason)
    {
        var rewindRequest = new RewindTaskOrchestrationRequest();
        rewindRequest.InstanceId = instanceId;
        rewindRequest.Reason = reason;

        await _client.RewindTaskOrchestrationAsync(rewindRequest);
    }
109

110
    /// <summary>
    /// Sends a <see cref="PurgeOrchestrationHistoryRequest"/> built from the given threshold
    /// and time-range filter type.
    /// </summary>
    /// <param name="thresholdDateTimeUtc">Converted with <c>ToTimestamp</c> before being sent.</param>
    /// <param name="timeRangeFilterType">
    /// Cast to <see cref="OrchestrationTimeFilterType"/> for the request.
    /// </param>
    public async Task PurgeOrchestrationHistoryAsync(DateTime thresholdDateTimeUtc, OrchestrationStateTimeRangeFilterType timeRangeFilterType)
    {
        var purgeRequest = new PurgeOrchestrationHistoryRequest();
        purgeRequest.ThresholdDateTimeUtc = ToTimestamp(thresholdDateTimeUtc);
        purgeRequest.TimeRangeFilterType = (OrchestrationTimeFilterType)timeRangeFilterType;

        await _client.PurgeOrchestrationHistoryAsync(purgeRequest);
    }
120

121
    /// <summary>
    /// Sends a single message by delegating to
    /// <see cref="SendTaskOrchestrationMessageBatchAsync"/> with a one-element batch.
    /// </summary>
    /// <param name="message">The only element of the batch that is sent.</param>
    public async Task SendTaskOrchestrationMessageAsync(TaskMessage message)
    {
        var singleMessageBatch = new TaskMessage[] { message };
        await SendTaskOrchestrationMessageBatchAsync(singleMessageBatch);
    }
125

126
    /// <summary>
    /// Serializes every message with the configured data converter and sends them together
    /// in one <see cref="SendTaskOrchestrationMessageBatchRequest"/>.
    /// </summary>
    /// <param name="messages">Messages to serialize, in the order they are added to the request.</param>
    public async Task SendTaskOrchestrationMessageBatchAsync(params TaskMessage[] messages)
    {
        var batchRequest = new SendTaskOrchestrationMessageBatchRequest();

        var converter = _options.DataConverter;
        var serializedMessages = messages.Select(message => converter.Serialize(message));
        batchRequest.Messages.AddRange(serializedMessages);

        await _client.SendTaskOrchestrationMessageBatchAsync(batchRequest);
    }
135

136
    /// <summary>
    /// Sends a <see cref="WaitForOrchestrationRequest"/> and returns the state carried by the
    /// response.
    /// </summary>
    /// <param name="instanceId">Value copied to the request's <c>InstanceId</c>.</param>
    /// <param name="executionId">Value copied to the request's <c>ExecutionId</c>.</param>
    /// <param name="timeout">Converted to a protobuf <see cref="Duration"/> for the request.</param>
    /// <param name="cancellationToken">Passed to the gRPC call through its <see cref="CallOptions"/>.</param>
    /// <returns>
    /// The deserialized state, or <c>null</c> when the response's <c>State</c> is null or empty.
    /// </returns>
    public async Task<OrchestrationState> WaitForOrchestrationAsync(string instanceId, string executionId, TimeSpan timeout, CancellationToken cancellationToken)
    {
        var waitRequest = new WaitForOrchestrationRequest();
        waitRequest.InstanceId = instanceId;
        waitRequest.ExecutionId = executionId;
        waitRequest.Timeout = Duration.FromTimeSpan(timeout);

        var waitReply = await _client.WaitForOrchestrationAsync(
            waitRequest,
            new CallOptions(cancellationToken: cancellationToken));

        // A null or empty payload is reported to the caller as null.
        if (string.IsNullOrEmpty(waitReply.State))
            return null;

        return _options.DataConverter.Deserialize<OrchestrationState>(waitReply.State);
    }
151

152
    /// <summary>
    /// Copies the filters of <paramref name="query"/> into a
    /// <see cref="GetOrchestrationWithQueryRequest"/>, sends it, and deserializes the
    /// returned orchestration states.
    /// </summary>
    /// <param name="query">
    /// Source of the filters. When it is an <see cref="OrchestrationQueryExtended"/>, its name
    /// prefix, completed-time range, previous-executions flag and tags are copied as well.
    /// </param>
    /// <param name="cancellationToken">Passed to the gRPC call through its <see cref="CallOptions"/>.</param>
    /// <returns>
    /// A result built from the deserialized states and the response's continuation token.
    /// </returns>
    public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(OrchestrationQuery query, CancellationToken cancellationToken)
    {
        var queryRequest = new GetOrchestrationWithQueryRequest();

        if (query.RuntimeStatus is not null)
        {
            // The request carries the statuses as plain integers.
            var statusCodes = query.RuntimeStatus.Select(status => (int)status);
            queryRequest.RuntimeStatus.AddRange(statusCodes);
        }

        queryRequest.CreatedTimeFrom = ToTimestamp(query.CreatedTimeFrom);
        queryRequest.CreatedTimeTo = ToTimestamp(query.CreatedTimeTo);

        if (query.TaskHubNames is not null)
        {
            queryRequest.TaskHubNames.AddRange(query.TaskHubNames);
        }

        queryRequest.PageSize = query.PageSize;
        queryRequest.ContinuationToken = query.ContinuationToken;
        queryRequest.InstanceIdPrefix = query.InstanceIdPrefix;
        queryRequest.FetchInputsAndOutputs = query.FetchInputsAndOutputs;

        // Filters that exist only on the extended query type.
        if (query is OrchestrationQueryExtended extended)
        {
            queryRequest.NamePrefix = extended.NamePrefix;
            queryRequest.CompletedTimeFrom = ToTimestamp(extended.CompletedTimeFrom);
            queryRequest.CompletedTimeTo = ToTimestamp(extended.CompletedTimeTo);
            queryRequest.IncludePreviousExecutions = extended.IncludePreviousExecutions;

            foreach (var tag in extended.Tags)
            {
                queryRequest.Tags.Add(tag.Key, tag.Value);
            }
        }

        var queryReply = await _client.GetOrchestrationWithQueryAsync(
            queryRequest,
            new CallOptions(cancellationToken: cancellationToken));

        var states = new List<OrchestrationState>();
        foreach (var serializedState in queryReply.OrchestrationState)
        {
            states.Add(_options.DataConverter.Deserialize<OrchestrationState>(serializedState));
        }

        return new OrchestrationQueryResult(states.ToArray(), queryReply.ContinuationToken);
    }
190

191
    /// <summary>
    /// Sends a <see cref="PurgeInstanceHistoryRequest"/> that carries only an instance id.
    /// </summary>
    /// <param name="instanceId">Value copied to the request's <c>InstanceId</c>.</param>
    /// <returns>
    /// A <see cref="PurgeResult"/> built from the response's <c>InstancesDeleted</c> value.
    /// </returns>
    public async Task<PurgeResult> PurgeInstanceStateAsync(string instanceId)
    {
        var purgeRequest = new PurgeInstanceHistoryRequest();
        purgeRequest.InstanceId = instanceId;

        var purgeReply = await _client.PurgeInstanceHistoryAsync(purgeRequest);
        return new PurgeResult(purgeReply.InstancesDeleted);
    }
202

203
    /// <summary>
    /// Sends a <see cref="PurgeInstanceHistoryRequest"/> built from <paramref name="filter"/>.
    /// </summary>
    /// <param name="filter">
    /// Supplies the created-time range and, when not <c>null</c>, the runtime statuses. When it
    /// is a <see cref="PurgeInstanceFilterExtended"/> with a non-null <c>Limit</c>, that limit
    /// is copied to the request too.
    /// </param>
    /// <returns>
    /// A <see cref="PurgeResult"/> built from the response's <c>InstancesDeleted</c> value.
    /// </returns>
    public async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFilter filter)
    {
        var purgeRequest = new PurgeInstanceHistoryRequest();
        purgeRequest.CreatedTimeFrom = ToTimestamp(filter.CreatedTimeFrom);
        purgeRequest.CreatedTimeTo = ToTimestamp(filter.CreatedTimeTo);

        if (filter.RuntimeStatus is not null)
        {
            // The request carries the statuses as plain integers.
            var statusCodes = filter.RuntimeStatus.Select(status => (int)status);
            purgeRequest.RuntimeStatus.AddRange(statusCodes);
        }

        // The limit is only available on the extended filter type, and only set when present.
        if (filter is PurgeInstanceFilterExtended extended && extended.Limit is not null)
        {
            purgeRequest.Limit = extended.Limit.Value;
        }

        var purgeReply = await _client.PurgeInstanceHistoryAsync(purgeRequest);
        return new PurgeResult(purgeReply.InstancesDeleted);
    }
226
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc