• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 5836038789

pending completion
5836038789

push

github

lucaslorentz
Add husky and apply some code fixes

2502 of 2502 new or added lines in 91 files covered. (100.0%)

2295 of 2792 relevant lines covered (82.2%)

143.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.44
/src/LLL.DurableTask.Server.Grpc.Client/GrpcClientOrchestrationSession.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTaskGrpc;
8
using Microsoft.Extensions.Logging;
9
using TaskOrchestrationStream = Grpc.Core.AsyncDuplexStreamingCall<DurableTaskGrpc.TaskOrchestrationRequest, DurableTaskGrpc.TaskOrchestrationResponse>;
10
using TaskOrchestrationWorkItem = DurableTask.Core.TaskOrchestrationWorkItem;
11

12
namespace LLL.DurableTask.Server.Client;
13

14
/// <summary>
/// <see cref="IOrchestrationSession"/> implementation that talks to the server over a single
/// duplex gRPC stream: each operation writes one <see cref="TaskOrchestrationRequest"/> and then
/// waits, for a bounded time, for the next <see cref="TaskOrchestrationResponse"/> on the stream.
/// </summary>
/// <remarks>
/// Requests and responses are paired purely by order on the stream; every operation validates
/// that the response it reads is of the kind it expects and fails otherwise.
/// </remarks>
public class GrpcClientOrchestrationSession : IOrchestrationSession
{
    private static readonly TimeSpan _renewResponseTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan _completeResponseTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan _fetchResponseTimeout = TimeSpan.FromHours(1);
    private static readonly TimeSpan _releaseResponseTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan _abandonResponseTimeout = TimeSpan.FromSeconds(20);

    private readonly GrpcClientOrchestrationServiceOptions _options;
    private readonly TaskOrchestrationStream _stream;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates a session bound to an already opened orchestration stream.
    /// </summary>
    /// <param name="options">Client options; its <c>DataConverter</c> is used to (de)serialize payloads.</param>
    /// <param name="stream">The duplex call used for every request/response exchange of this session.</param>
    /// <param name="logger">Logger stored for the session.</param>
    public GrpcClientOrchestrationSession(
        GrpcClientOrchestrationServiceOptions options,
        TaskOrchestrationStream stream,
        ILogger logger)
    {
        _options = options;
        _stream = stream;
        _logger = logger;
    }

    /// <summary>
    /// Sends a renew request and stores the lock expiration returned by the server on the work item.
    /// </summary>
    /// <param name="workItem">Work item whose <c>LockedUntilUtc</c> is updated from the response.</param>
    /// <exception cref="Exception">The stream ended ("Session aborted") or a different response kind arrived ("Unexpected response").</exception>
    /// <exception cref="OperationCanceledException">May be raised when no response arrives within the renew timeout (depends on the stream implementation).</exception>
    public async Task Renew(TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            RenewRequest = new RenewTaskOrchestrationWorkItemLockRequest()
        };

        var response = await Exchange(
            request,
            _renewResponseTimeout,
            TaskOrchestrationResponse.MessageOneofCase.RenewResponse);

        workItem.LockedUntilUtc = response.RenewResponse.LockedUntilUtc.ToDateTime();
    }

    /// <summary>
    /// Sends the outcome of an orchestration execution to the server and waits for the acknowledgement.
    /// </summary>
    /// <param name="workItem">Work item whose runtime state supplies the new events and status.</param>
    /// <param name="newOrchestrationRuntimeState">
    /// Runtime state to report as the new orchestration; when it is the same instance as the work item's
    /// runtime state, no new-orchestration events and a null new-orchestration status are sent.
    /// </param>
    /// <param name="outboundMessages">Outbound messages, serialized one by one.</param>
    /// <param name="orchestratorMessages">Orchestrator messages, serialized one by one.</param>
    /// <param name="timerMessages">Timer messages, serialized one by one.</param>
    /// <param name="continuedAsNewMessage">Optional message; sent as an empty string when null.</param>
    /// <param name="orchestrationState">Orchestration state, sent serialized.</param>
    /// <exception cref="Exception">The stream ended ("Session aborted") or a different response kind arrived ("Unexpected response").</exception>
    public async Task Complete(
        TaskOrchestrationWorkItem workItem,
        OrchestrationRuntimeState newOrchestrationRuntimeState,
        IList<TaskMessage> outboundMessages,
        IList<TaskMessage> orchestratorMessages,
        IList<TaskMessage> timerMessages,
        TaskMessage continuedAsNewMessage,
        OrchestrationState orchestrationState)
    {
        var request = new TaskOrchestrationRequest
        {
            CompleteRequest = new CompleteTaskOrchestrationWorkItemRequest
            {
                NewEvents = { workItem.OrchestrationRuntimeState.NewEvents.Select(_options.DataConverter.Serialize) },
                NewStatus = workItem.OrchestrationRuntimeState.Status,
                NewOrchestrationEvents = { newOrchestrationRuntimeState == workItem.OrchestrationRuntimeState
                    ? Enumerable.Empty<string>()
                    : newOrchestrationRuntimeState.NewEvents.Select(_options.DataConverter.Serialize) },
                NewOrchestrationStatus = newOrchestrationRuntimeState == workItem.OrchestrationRuntimeState
                    ? null
                    : newOrchestrationRuntimeState.Status,
                OutboundMessages = { outboundMessages.Select(_options.DataConverter.Serialize) },
                OrchestratorMessages = { orchestratorMessages.Select(_options.DataConverter.Serialize) },
                TimerMessages = { timerMessages.Select(_options.DataConverter.Serialize) },
                ContinuedAsNewMessage = continuedAsNewMessage == null
                    ? string.Empty
                    : _options.DataConverter.Serialize(continuedAsNewMessage),
                OrchestrationState = _options.DataConverter.Serialize(orchestrationState)
            }
        };

        await Exchange(
            request,
            _completeResponseTimeout,
            TaskOrchestrationResponse.MessageOneofCase.CompleteResponse);
    }

    /// <summary>
    /// Asks the server for new messages for this session and waits (up to the fetch timeout) for the answer.
    /// </summary>
    /// <param name="workItem">Not read by this implementation.</param>
    /// <returns>
    /// The deserialized messages, or <c>null</c> when the response carries no message list.
    /// The null result is preserved on purpose: callers can tell it apart from an empty list.
    /// </returns>
    /// <exception cref="Exception">The stream ended ("Session aborted") or a different response kind arrived ("Unexpected response").</exception>
    public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
        TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            FetchRequest = new FetchNewOrchestrationMessagesRequest()
        };

        var response = await Exchange(
            request,
            _fetchResponseTimeout,
            TaskOrchestrationResponse.MessageOneofCase.FetchResponse);

        var fetchResponse = response.FetchResponse;
        if (fetchResponse.NewMessages == null)
            return null;

        return fetchResponse.NewMessages.Messages
            .Select(x => _options.DataConverter.Deserialize<TaskMessage>(x))
            .ToArray();
    }

    /// <summary>
    /// Sends an abandon request and waits for the acknowledgement.
    /// </summary>
    /// <param name="workItem">Not read by this implementation.</param>
    /// <exception cref="Exception">The stream ended ("Session aborted") or a different response kind arrived ("Unexpected response").</exception>
    public async Task Abandon(TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            AbandonRequest = new AbandonTaskOrchestrationWorkItemLockRequest()
        };

        await Exchange(
            request,
            _abandonResponseTimeout,
            TaskOrchestrationResponse.MessageOneofCase.AbandonResponse);
    }

    /// <summary>
    /// Sends a release request, waits for the acknowledgement, completes the request stream,
    /// drains the response stream and disposes the call.
    /// </summary>
    /// <param name="workItem">Not read by this implementation.</param>
    /// <exception cref="Exception">The stream ended ("Session aborted") or a different response kind arrived ("Unexpected response").</exception>
    public async Task Release(TaskOrchestrationWorkItem workItem)
    {
        try
        {
            var request = new TaskOrchestrationRequest
            {
                ReleaseRequest = new ReleaseTaskOrchestrationWorkItemRequest()
            };

            await _stream.RequestStream.WriteAsync(request);

            // A single timeout covers both the acknowledgement and the final draining read.
            using var cts = new CancellationTokenSource(_releaseResponseTimeout);

            await ReadExpectedResponse(
                TaskOrchestrationResponse.MessageOneofCase.ReleaseResponse,
                cts.Token);

            await _stream.RequestStream.CompleteAsync();

            // Last read to close stream
            await _stream.ResponseStream.MoveNext(cts.Token);
        }
        finally
        {
            // Dispose even when the exchange fails, so a broken session does not leak the call.
            _stream.Dispose();
        }
    }

    /// <summary>
    /// Writes <paramref name="request"/> and returns the next response, which must be of kind
    /// <paramref name="expectedCase"/>. The timeout starts counting only after the write completed.
    /// </summary>
    private async Task<TaskOrchestrationResponse> Exchange(
        TaskOrchestrationRequest request,
        TimeSpan responseTimeout,
        TaskOrchestrationResponse.MessageOneofCase expectedCase)
    {
        await _stream.RequestStream.WriteAsync(request);

        // Disposed on exit so the underlying timer is released as soon as the response arrives.
        using var cts = new CancellationTokenSource(responseTimeout);

        return await ReadExpectedResponse(expectedCase, cts.Token);
    }

    /// <summary>
    /// Reads the next response from the stream and verifies that it is of kind <paramref name="expectedCase"/>.
    /// </summary>
    private async Task<TaskOrchestrationResponse> ReadExpectedResponse(
        TaskOrchestrationResponse.MessageOneofCase expectedCase,
        CancellationToken cancellationToken)
    {
        if (!await _stream.ResponseStream.MoveNext(cancellationToken))
            throw new Exception("Session aborted");

        var response = _stream.ResponseStream.Current;
        if (response.MessageCase != expectedCase)
            throw new Exception("Unexpected response");

        return response;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc