• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 16703267150

03 Aug 2025 08:57AM UTC coverage: 83.056% (-0.4%) from 83.438%
16703267150

push

github

web-flow
Merge 45c308ddd into 8babccded

2392 of 2880 relevant lines covered (83.06%)

141.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.78
/src/LLL.DurableTask.Server.Grpc.Client/GrpcClientOrchestrationSession.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTaskGrpc;
8
using Microsoft.Extensions.Logging;
9
using TaskOrchestrationStream = Grpc.Core.AsyncDuplexStreamingCall<DurableTaskGrpc.TaskOrchestrationRequest, DurableTaskGrpc.TaskOrchestrationResponse>;
10
using TaskOrchestrationWorkItem = DurableTask.Core.TaskOrchestrationWorkItem;
11

12
namespace LLL.DurableTask.Server.Client;
13

14
/// <summary>
/// Client-side <see cref="IOrchestrationSession"/> that operates on a duplex gRPC stream:
/// every operation writes a single request to the stream and then reads the matching response.
/// </summary>
public class GrpcClientOrchestrationSession : IOrchestrationSession
{
    // Maximum time to wait for the server's response to each kind of request.
    private static readonly TimeSpan _renewResponseTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan _completeResponseTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan _fetchResponseTimeout = TimeSpan.FromHours(1);
    private static readonly TimeSpan _releaseResponseTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan _abandonResponseTimeout = TimeSpan.FromSeconds(20);

    private readonly GrpcClientOrchestrationServiceOptions _options;
    private readonly TaskOrchestrationStream _stream;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates a session bound to an already opened orchestration stream.
    /// </summary>
    /// <param name="options">Options providing the data converter used to (de)serialize payloads.</param>
    /// <param name="stream">Duplex stream used for all requests and responses of this session.</param>
    /// <param name="logger">Logger stored for this session.</param>
    public GrpcClientOrchestrationSession(
        GrpcClientOrchestrationServiceOptions options,
        TaskOrchestrationStream stream,
        ILogger logger)
    {
        _options = options;
        _stream = stream;
        _logger = logger;
    }

    /// <summary>
    /// Sends a renew request and copies the returned expiration into
    /// <paramref name="workItem"/>'s <c>LockedUntilUtc</c>.
    /// </summary>
    /// <param name="workItem">Work item whose <c>LockedUntilUtc</c> is updated from the response.</param>
    /// <exception cref="InvalidOperationException">
    /// The response stream ended, or the server replied with a different message kind.
    /// </exception>
    public async Task Renew(TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            RenewRequest = new RenewTaskOrchestrationWorkItemLockRequest()
        };

        var response = await SendAndReceive(
            request,
            TaskOrchestrationResponse.MessageOneofCase.RenewResponse,
            _renewResponseTimeout);

        workItem.LockedUntilUtc = response.RenewResponse.LockedUntilUtc.ToDateTime();
    }

    /// <summary>
    /// Sends the outcome of an orchestration execution (new events, statuses and all produced
    /// messages, serialized with the configured data converter) and waits for the acknowledgement.
    /// </summary>
    /// <param name="workItem">Work item whose runtime state supplies the new events and status.</param>
    /// <param name="newOrchestrationRuntimeState">
    /// Runtime state to report as the "new orchestration" state; when it is the same instance as
    /// the work item's runtime state, no new-orchestration events or status are sent.
    /// </param>
    /// <param name="outboundMessages">Outbound messages; <c>null</c> is sent as an empty list.</param>
    /// <param name="orchestratorMessages">Orchestrator messages; <c>null</c> is sent as an empty list.</param>
    /// <param name="timerMessages">Timer messages; <c>null</c> is sent as an empty list.</param>
    /// <param name="continuedAsNewMessage">Continue-as-new message; <c>null</c> is sent as an empty string.</param>
    /// <param name="orchestrationState">Orchestration state to serialize into the request.</param>
    /// <exception cref="InvalidOperationException">
    /// The response stream ended, or the server replied with a different message kind.
    /// </exception>
    public async Task Complete(
        TaskOrchestrationWorkItem workItem,
        OrchestrationRuntimeState newOrchestrationRuntimeState,
        IList<TaskMessage> outboundMessages,
        IList<TaskMessage> orchestratorMessages,
        IList<TaskMessage> timerMessages,
        TaskMessage continuedAsNewMessage,
        OrchestrationState orchestrationState)
    {
        var currentRuntimeState = workItem.OrchestrationRuntimeState;

        // When the "new" state is the very same state there is nothing extra to report.
        var isSameRuntimeState = newOrchestrationRuntimeState == currentRuntimeState;

        var request = new TaskOrchestrationRequest
        {
            CompleteRequest = new CompleteTaskOrchestrationWorkItemRequest
            {
                NewEvents = { currentRuntimeState.NewEvents.Select(_options.DataConverter.Serialize) },
                NewStatus = currentRuntimeState.Status,
                NewOrchestrationEvents = { isSameRuntimeState
                    ? Enumerable.Empty<string>()
                    : newOrchestrationRuntimeState.NewEvents.Select(_options.DataConverter.Serialize) },
                NewOrchestrationStatus = isSameRuntimeState
                    ? null
                    : newOrchestrationRuntimeState.Status,
                OutboundMessages = { SerializeMessages(outboundMessages) },
                OrchestratorMessages = { SerializeMessages(orchestratorMessages) },
                TimerMessages = { SerializeMessages(timerMessages) },
                ContinuedAsNewMessage = continuedAsNewMessage is null
                    ? string.Empty
                    : _options.DataConverter.Serialize(continuedAsNewMessage),
                OrchestrationState = _options.DataConverter.Serialize(orchestrationState)
            }
        };

        await SendAndReceive(
            request,
            TaskOrchestrationResponse.MessageOneofCase.CompleteResponse,
            _completeResponseTimeout);
    }

    /// <summary>
    /// Requests new messages for the orchestration and waits (up to one hour) for the response.
    /// </summary>
    /// <param name="workItem">Not read by this implementation.</param>
    /// <returns>
    /// The deserialized messages, or <c>null</c> when the response carries no message list.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// The response stream ended, or the server replied with a different message kind.
    /// </exception>
    public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
        TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            FetchRequest = new FetchNewOrchestrationMessagesRequest()
        };

        var response = await SendAndReceive(
            request,
            TaskOrchestrationResponse.MessageOneofCase.FetchResponse,
            _fetchResponseTimeout);

        var fetchResponse = response.FetchResponse;

        // NOTE(review): null is passed through to the caller on purpose; presumably it signals
        // that no further messages will arrive for this session - confirm against the caller.
        if (fetchResponse.NewMessages is null)
            return null;

        return fetchResponse.NewMessages.Messages
            .Select(x => _options.DataConverter.Deserialize<TaskMessage>(x))
            .ToArray();
    }

    /// <summary>
    /// Sends an abandon request and waits for the acknowledgement.
    /// </summary>
    /// <param name="workItem">Not read by this implementation.</param>
    /// <exception cref="InvalidOperationException">
    /// The response stream ended, or the server replied with a different message kind.
    /// </exception>
    public async Task Abandon(TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            AbandonRequest = new AbandonTaskOrchestrationWorkItemLockRequest()
        };

        await SendAndReceive(
            request,
            TaskOrchestrationResponse.MessageOneofCase.AbandonResponse,
            _abandonResponseTimeout);
    }

    /// <summary>
    /// Sends a release request, waits for the acknowledgement, completes the request stream,
    /// drains the response stream and disposes the underlying call.
    /// </summary>
    /// <param name="workItem">Not read by this implementation.</param>
    /// <exception cref="InvalidOperationException">
    /// The response stream ended, or the server replied with a different message kind.
    /// </exception>
    public async Task Release(TaskOrchestrationWorkItem workItem)
    {
        try
        {
            var request = new TaskOrchestrationRequest
            {
                ReleaseRequest = new ReleaseTaskOrchestrationWorkItemRequest()
            };

            await _stream.RequestStream.WriteAsync(request);

            // A single timeout budget covers both the release response and the final read below.
            using var cts = new CancellationTokenSource(_releaseResponseTimeout);

            await ReceiveExpected(
                TaskOrchestrationResponse.MessageOneofCase.ReleaseResponse,
                cts.Token);

            await _stream.RequestStream.CompleteAsync();

            // Last read to close stream
            await _stream.ResponseStream.MoveNext(cts.Token);
        }
        finally
        {
            // Release is the last operation of the session: dispose the call even when the
            // exchange above failed, otherwise the stream would never be cleaned up.
            _stream.Dispose();
        }
    }

    // Serializes each message with the configured data converter, treating a null list as empty.
    private IEnumerable<string> SerializeMessages(IEnumerable<TaskMessage> messages)
    {
        return (messages ?? Enumerable.Empty<TaskMessage>())
            .Select(_options.DataConverter.Serialize);
    }

    // Writes one request, then waits up to responseTimeout for a response of the expected kind.
    private async Task<TaskOrchestrationResponse> SendAndReceive(
        TaskOrchestrationRequest request,
        TaskOrchestrationResponse.MessageOneofCase expectedCase,
        TimeSpan responseTimeout)
    {
        await _stream.RequestStream.WriteAsync(request);

        // The timeout starts only after the request has been written; disposing the source
        // releases its timer as soon as the response arrives.
        using var cts = new CancellationTokenSource(responseTimeout);

        return await ReceiveExpected(expectedCase, cts.Token);
    }

    // Reads the next response from the stream and verifies it is of the expected kind.
    private async Task<TaskOrchestrationResponse> ReceiveExpected(
        TaskOrchestrationResponse.MessageOneofCase expectedCase,
        CancellationToken cancellationToken)
    {
        if (!await _stream.ResponseStream.MoveNext(cancellationToken))
            throw new InvalidOperationException("Session aborted");

        var response = _stream.ResponseStream.Current;

        if (response.MessageCase != expectedCase)
            throw new InvalidOperationException("Unexpected response");

        return response;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc