• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 6132933469

09 Sep 2023 08:09PM UTC coverage: 81.196% (+0.04%) from 81.16%
6132933469

Pull #30

github

web-flow
Merge branch 'main' into fix-navigation-suborchestration
Pull Request #30: Fix navigation to suborchestration

2267 of 2792 relevant lines covered (81.2%)

141.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.78
/src/LLL.DurableTask.Server.Grpc.Client/GrpcClientOrchestrationSession.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTaskGrpc;
8
using Microsoft.Extensions.Logging;
9
using TaskOrchestrationStream = Grpc.Core.AsyncDuplexStreamingCall<DurableTaskGrpc.TaskOrchestrationRequest, DurableTaskGrpc.TaskOrchestrationResponse>;
10
using TaskOrchestrationWorkItem = DurableTask.Core.TaskOrchestrationWorkItem;
11

12
namespace LLL.DurableTask.Server.Client;
13

14
/// <summary>
/// <see cref="IOrchestrationSession"/> implementation that drives one orchestration
/// session over a duplex gRPC stream: every operation writes a single
/// <see cref="TaskOrchestrationRequest"/> and then reads the single
/// <see cref="TaskOrchestrationResponse"/> that answers it.
/// </summary>
public class GrpcClientOrchestrationSession : IOrchestrationSession
{
    // Maximum time to wait for the response to each kind of request.
    // The wait starts after the request has been written to the stream.
    private static readonly TimeSpan _renewResponseTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan _completeResponseTimeout = TimeSpan.FromSeconds(20);
    // NOTE(review): fetch waits far longer than the other operations; presumably the
    // server holds the request until new messages arrive — confirm against the server.
    private static readonly TimeSpan _fetchResponseTimeout = TimeSpan.FromHours(1);
    private static readonly TimeSpan _releaseResponseTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan _abandonResponseTimeout = TimeSpan.FromSeconds(20);

    private readonly GrpcClientOrchestrationServiceOptions _options;
    private readonly TaskOrchestrationStream _stream;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates a session bound to an already opened orchestration stream.
    /// </summary>
    /// <param name="options">Options providing the data converter used to (de)serialize payloads.</param>
    /// <param name="stream">Duplex stream used for all requests and responses; disposed by <see cref="Release"/>.</param>
    /// <param name="logger">Logger stored for this session.</param>
    public GrpcClientOrchestrationSession(
        GrpcClientOrchestrationServiceOptions options,
        TaskOrchestrationStream stream,
        ILogger logger)
    {
        _options = options;
        _stream = stream;
        _logger = logger;
    }

    /// <summary>
    /// Sends a renew request and copies the lock expiration from the response
    /// into <paramref name="workItem"/>.
    /// </summary>
    /// <param name="workItem">Work item whose <c>LockedUntilUtc</c> is updated.</param>
    /// <exception cref="Exception">The response stream ended, or the response was not a renew response.</exception>
    public async Task Renew(TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            RenewRequest = new RenewTaskOrchestrationWorkItemLockRequest()
        };

        await _stream.RequestStream.WriteAsync(request);

        // Disposed on exit so the timeout timer does not outlive the call.
        using var cts = new CancellationTokenSource(_renewResponseTimeout);

        var response = await ReadExpectedResponse(
            TaskOrchestrationResponse.MessageOneofCase.RenewResponse,
            cts.Token);

        workItem.LockedUntilUtc = response.RenewResponse.LockedUntilUtc.ToDateTime();
    }

    /// <summary>
    /// Sends a complete request carrying the serialized new events, status and
    /// messages, then waits for the matching complete response.
    /// </summary>
    /// <param name="workItem">Work item whose runtime state supplies the new events and status.</param>
    /// <param name="newOrchestrationRuntimeState">
    /// Runtime state to report as the new orchestration. When it is the same instance as
    /// <c>workItem.OrchestrationRuntimeState</c>, no new-orchestration events or status are sent.
    /// </param>
    /// <param name="outboundMessages">Outbound messages to serialize into the request.</param>
    /// <param name="orchestratorMessages">Orchestrator messages to serialize into the request.</param>
    /// <param name="timerMessages">Timer messages to serialize into the request.</param>
    /// <param name="continuedAsNewMessage">Continue-as-new message; sent as an empty string when <c>null</c>.</param>
    /// <param name="orchestrationState">Orchestration state to serialize into the request.</param>
    /// <exception cref="Exception">The response stream ended, or the response was not a complete response.</exception>
    public async Task Complete(
        TaskOrchestrationWorkItem workItem,
        OrchestrationRuntimeState newOrchestrationRuntimeState,
        IList<TaskMessage> outboundMessages,
        IList<TaskMessage> orchestratorMessages,
        IList<TaskMessage> timerMessages,
        TaskMessage continuedAsNewMessage,
        OrchestrationState orchestrationState)
    {
        var request = new TaskOrchestrationRequest
        {
            CompleteRequest = new CompleteTaskOrchestrationWorkItemRequest
            {
                NewEvents = { workItem.OrchestrationRuntimeState.NewEvents.Select(_options.DataConverter.Serialize) },
                NewStatus = workItem.OrchestrationRuntimeState.Status,
                NewOrchestrationEvents = { newOrchestrationRuntimeState == workItem.OrchestrationRuntimeState
                    ? Enumerable.Empty<string>()
                    : newOrchestrationRuntimeState.NewEvents.Select(_options.DataConverter.Serialize) },
                NewOrchestrationStatus = newOrchestrationRuntimeState == workItem.OrchestrationRuntimeState
                    ? null
                    : newOrchestrationRuntimeState.Status,
                OutboundMessages = { outboundMessages.Select(_options.DataConverter.Serialize) },
                OrchestratorMessages = { orchestratorMessages.Select(_options.DataConverter.Serialize) },
                TimerMessages = { timerMessages.Select(_options.DataConverter.Serialize) },
                ContinuedAsNewMessage = continuedAsNewMessage == null
                    ? string.Empty
                    : _options.DataConverter.Serialize(continuedAsNewMessage),
                OrchestrationState = _options.DataConverter.Serialize(orchestrationState)
            }
        };

        await _stream.RequestStream.WriteAsync(request);

        using var cts = new CancellationTokenSource(_completeResponseTimeout);

        await ReadExpectedResponse(
            TaskOrchestrationResponse.MessageOneofCase.CompleteResponse,
            cts.Token);
    }

    /// <summary>
    /// Sends a fetch request and returns the messages carried by the fetch response.
    /// </summary>
    /// <param name="workItem">Not read by this implementation.</param>
    /// <returns>
    /// The deserialized messages, or <c>null</c> when the response has no
    /// <c>NewMessages</c> value.
    /// </returns>
    /// <exception cref="Exception">The response stream ended, or the response was not a fetch response.</exception>
    public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
        TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            FetchRequest = new FetchNewOrchestrationMessagesRequest()
        };

        await _stream.RequestStream.WriteAsync(request);

        // The fetch timeout is one hour, so disposing here matters most:
        // an undisposed source would keep its timer alive for that long.
        using var cts = new CancellationTokenSource(_fetchResponseTimeout);

        var response = await ReadExpectedResponse(
            TaskOrchestrationResponse.MessageOneofCase.FetchResponse,
            cts.Token);

        var fetchResponse = response.FetchResponse;
        if (fetchResponse.NewMessages == null)
            return null;

        return fetchResponse.NewMessages.Messages
            .Select(x => _options.DataConverter.Deserialize<TaskMessage>(x))
            .ToArray();
    }

    /// <summary>
    /// Sends an abandon request and waits for the matching abandon response.
    /// </summary>
    /// <param name="workItem">Not read by this implementation.</param>
    /// <exception cref="Exception">The response stream ended, or the response was not an abandon response.</exception>
    public async Task Abandon(TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            AbandonRequest = new AbandonTaskOrchestrationWorkItemLockRequest()
        };

        await _stream.RequestStream.WriteAsync(request);

        using var cts = new CancellationTokenSource(_abandonResponseTimeout);

        await ReadExpectedResponse(
            TaskOrchestrationResponse.MessageOneofCase.AbandonResponse,
            cts.Token);
    }

    /// <summary>
    /// Sends a release request, waits for the release response, completes the
    /// request stream, performs one last read, and disposes the stream.
    /// </summary>
    /// <param name="workItem">Not read by this implementation.</param>
    /// <exception cref="Exception">The response stream ended, or the response was not a release response.</exception>
    public async Task Release(TaskOrchestrationWorkItem workItem)
    {
        try
        {
            var request = new TaskOrchestrationRequest
            {
                ReleaseRequest = new ReleaseTaskOrchestrationWorkItemRequest()
            };

            await _stream.RequestStream.WriteAsync(request);

            // A single timeout budget covers both the release response and the final read.
            using var cts = new CancellationTokenSource(_releaseResponseTimeout);

            await ReadExpectedResponse(
                TaskOrchestrationResponse.MessageOneofCase.ReleaseResponse,
                cts.Token);

            await _stream.RequestStream.CompleteAsync();

            // Last read to close stream
            await _stream.ResponseStream.MoveNext(cts.Token);
        }
        finally
        {
            // Release is the final operation on the session, so the call is disposed
            // even when one of the steps above throws; otherwise it would leak.
            _stream.Dispose();
        }
    }

    /// <summary>
    /// Advances the response stream by one message and verifies it is of the expected kind.
    /// </summary>
    /// <param name="expectedCase">Message case the next response must have.</param>
    /// <param name="cancellationToken">Token passed to the stream read; callers use it to bound the wait.</param>
    /// <returns>The response that was read.</returns>
    /// <exception cref="Exception">
    /// The stream ended before a response arrived ("Session aborted"), or the
    /// response has a different message case ("Unexpected response").
    /// </exception>
    private async Task<TaskOrchestrationResponse> ReadExpectedResponse(
        TaskOrchestrationResponse.MessageOneofCase expectedCase,
        CancellationToken cancellationToken)
    {
        if (!await _stream.ResponseStream.MoveNext(cancellationToken))
            throw new Exception("Session aborted");

        var response = _stream.ResponseStream.Current;

        if (response.MessageCase != expectedCase)
            throw new Exception("Unexpected response");

        return response;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc