• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 5835751495

pending completion
5835751495

push

github

lucaslorentz
Add husky and apply some code fixes

2502 of 2502 new or added lines in 91 files covered. (100.0%)

2286 of 2792 relevant lines covered (81.88%)

143.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.89
/src/LLL.DurableTask.Server.Grpc.Client/GrpcClientOrchestrationSession.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTaskGrpc;
8
using Microsoft.Extensions.Logging;
9
using TaskOrchestrationStream = Grpc.Core.AsyncDuplexStreamingCall<DurableTaskGrpc.TaskOrchestrationRequest, DurableTaskGrpc.TaskOrchestrationResponse>;
10
using TaskOrchestrationWorkItem = DurableTask.Core.TaskOrchestrationWorkItem;
11

12
namespace LLL.DurableTask.Server.Client;
13

14
/// <summary>
/// Orchestration session that talks to the server over a single gRPC duplex stream.
/// Every operation writes one request to the stream and then waits, with an
/// operation-specific timeout, for the response of the matching kind.
/// </summary>
public class GrpcClientOrchestrationSession : IOrchestrationSession
{
    private static readonly TimeSpan _renewResponseTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan _completeResponseTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan _fetchResponseTimeout = TimeSpan.FromHours(1);
    private static readonly TimeSpan _releaseResponseTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan _abandonResponseTimeout = TimeSpan.FromSeconds(20);

    private readonly GrpcClientOrchestrationServiceOptions _options;
    private readonly TaskOrchestrationStream _stream;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates a session bound to an already opened orchestration stream.
    /// </summary>
    /// <param name="options">Options providing the data converter used to (de)serialize payloads.</param>
    /// <param name="stream">Duplex stream used for all requests and responses of this session.</param>
    /// <param name="logger">Logger kept for the session.</param>
    public GrpcClientOrchestrationSession(
        GrpcClientOrchestrationServiceOptions options,
        TaskOrchestrationStream stream,
        ILogger logger)
    {
        _options = options;
        _stream = stream;
        _logger = logger;
    }

    /// <summary>
    /// Sends a renew request and stores the lock expiration returned by the server
    /// in <see cref="TaskOrchestrationWorkItem.LockedUntilUtc"/>.
    /// </summary>
    /// <param name="workItem">Work item whose <c>LockedUntilUtc</c> is updated.</param>
    /// <exception cref="Exception">The stream ended or a response of another kind arrived.</exception>
    public async Task Renew(TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            RenewRequest = new RenewTaskOrchestrationWorkItemLockRequest()
        };

        await _stream.RequestStream.WriteAsync(request);

        // The timeout starts only after the request was written; disposing the source
        // releases its timer as soon as the response has been handled.
        using var cts = new CancellationTokenSource(_renewResponseTimeout);

        var response = await ReadResponse(
            TaskOrchestrationResponse.MessageOneofCase.RenewResponse,
            cts.Token);

        workItem.LockedUntilUtc = response.RenewResponse.LockedUntilUtc.ToDateTime();
    }

    /// <summary>
    /// Sends the outcome of an orchestration execution (new events, statuses, produced
    /// messages and serialized orchestration state) and waits for the server to acknowledge it.
    /// </summary>
    /// <param name="workItem">Work item whose runtime state supplies the new events and status.</param>
    /// <param name="newOrchestrationRuntimeState">
    /// Runtime state to report as the new orchestration. When it is the same instance as
    /// <c>workItem.OrchestrationRuntimeState</c>, no new-orchestration events or status are sent.
    /// </param>
    /// <param name="outboundMessages">Messages serialized into <c>OutboundMessages</c>.</param>
    /// <param name="orchestratorMessages">Messages serialized into <c>OrchestratorMessages</c>.</param>
    /// <param name="timerMessages">Messages serialized into <c>TimerMessages</c>.</param>
    /// <param name="continuedAsNewMessage">Optional message; sent as an empty string when <c>null</c>.</param>
    /// <param name="orchestrationState">State serialized into <c>OrchestrationState</c>.</param>
    /// <exception cref="Exception">The stream ended or a response of another kind arrived.</exception>
    public async Task Complete(
        TaskOrchestrationWorkItem workItem,
        OrchestrationRuntimeState newOrchestrationRuntimeState,
        IList<TaskMessage> outboundMessages,
        IList<TaskMessage> orchestratorMessages,
        IList<TaskMessage> timerMessages,
        TaskMessage continuedAsNewMessage,
        OrchestrationState orchestrationState)
    {
        var request = new TaskOrchestrationRequest
        {
            CompleteRequest = new CompleteTaskOrchestrationWorkItemRequest
            {
                NewEvents = { workItem.OrchestrationRuntimeState.NewEvents.Select(_options.DataConverter.Serialize) },
                NewStatus = workItem.OrchestrationRuntimeState.Status,
                NewOrchestrationEvents = { newOrchestrationRuntimeState == workItem.OrchestrationRuntimeState
                    ? Enumerable.Empty<string>()
                    : newOrchestrationRuntimeState.NewEvents.Select(_options.DataConverter.Serialize) },
                NewOrchestrationStatus = newOrchestrationRuntimeState == workItem.OrchestrationRuntimeState
                    ? null
                    : newOrchestrationRuntimeState.Status,
                OutboundMessages = { outboundMessages.Select(_options.DataConverter.Serialize) },
                OrchestratorMessages = { orchestratorMessages.Select(_options.DataConverter.Serialize) },
                TimerMessages = { timerMessages.Select(_options.DataConverter.Serialize) },
                ContinuedAsNewMessage = continuedAsNewMessage == null
                    ? string.Empty
                    : _options.DataConverter.Serialize(continuedAsNewMessage),
                OrchestrationState = _options.DataConverter.Serialize(orchestrationState)
            }
        };

        await _stream.RequestStream.WriteAsync(request);

        using var cts = new CancellationTokenSource(_completeResponseTimeout);

        await ReadResponse(
            TaskOrchestrationResponse.MessageOneofCase.CompleteResponse,
            cts.Token);
    }

    /// <summary>
    /// Asks the server for new messages of this session and waits up to one hour for the answer.
    /// </summary>
    /// <param name="workItem">Work item of the session; not read by this method.</param>
    /// <returns>
    /// The deserialized messages, or <c>null</c> when the response carries no
    /// <c>NewMessages</c> value.
    /// </returns>
    /// <exception cref="Exception">The stream ended or a response of another kind arrived.</exception>
    public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
        TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            FetchRequest = new FetchNewOrchestrationMessagesRequest()
        };

        await _stream.RequestStream.WriteAsync(request);

        // With a one hour timeout an undisposed source would keep its timer alive
        // long after the response arrived, so dispose it deterministically.
        using var cts = new CancellationTokenSource(_fetchResponseTimeout);

        var response = await ReadResponse(
            TaskOrchestrationResponse.MessageOneofCase.FetchResponse,
            cts.Token);

        var fetchResponse = response.FetchResponse;
        if (fetchResponse.NewMessages == null)
            return null;

        return fetchResponse.NewMessages.Messages
            .Select(x => _options.DataConverter.Deserialize<TaskMessage>(x))
            .ToArray();
    }

    /// <summary>
    /// Sends an abandon request and waits for the server to acknowledge it.
    /// </summary>
    /// <param name="workItem">Work item of the session; not read by this method.</param>
    /// <exception cref="Exception">The stream ended or a response of another kind arrived.</exception>
    public async Task Abandon(TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            AbandonRequest = new AbandonTaskOrchestrationWorkItemLockRequest()
        };

        await _stream.RequestStream.WriteAsync(request);

        using var cts = new CancellationTokenSource(_abandonResponseTimeout);

        await ReadResponse(
            TaskOrchestrationResponse.MessageOneofCase.AbandonResponse,
            cts.Token);
    }

    /// <summary>
    /// Sends a release request, waits for its acknowledgement, completes the request
    /// stream, drains the response stream and disposes the underlying call.
    /// </summary>
    /// <param name="workItem">Work item of the session; not read by this method.</param>
    /// <exception cref="Exception">The stream ended or a response of another kind arrived.</exception>
    public async Task Release(TaskOrchestrationWorkItem workItem)
    {
        var request = new TaskOrchestrationRequest
        {
            ReleaseRequest = new ReleaseTaskOrchestrationWorkItemRequest()
        };

        try
        {
            await _stream.RequestStream.WriteAsync(request);

            // A single timeout covers both the release response and the final read below.
            using var cts = new CancellationTokenSource(_releaseResponseTimeout);

            await ReadResponse(
                TaskOrchestrationResponse.MessageOneofCase.ReleaseResponse,
                cts.Token);

            await _stream.RequestStream.CompleteAsync();

            // Last read to close stream
            await _stream.ResponseStream.MoveNext(cts.Token);
        }
        finally
        {
            // Dispose the call even when the release handshake fails, otherwise the
            // duplex call would stay open after a timeout or protocol error.
            _stream.Dispose();
        }
    }

    /// <summary>
    /// Reads the next message from the response stream and verifies that it is of the expected kind.
    /// </summary>
    /// <param name="expectedCase">Message kind the caller is waiting for.</param>
    /// <param name="cancellationToken">Token that bounds how long the read may take.</param>
    /// <returns>The response message that was read.</returns>
    /// <exception cref="Exception">
    /// "Session aborted" when the stream has no further message; "Unexpected response"
    /// when the message kind differs from <paramref name="expectedCase"/>.
    /// </exception>
    private async Task<TaskOrchestrationResponse> ReadResponse(
        TaskOrchestrationResponse.MessageOneofCase expectedCase,
        CancellationToken cancellationToken)
    {
        if (!await _stream.ResponseStream.MoveNext(cancellationToken))
            throw new Exception("Session aborted");

        var response = _stream.ResponseStream.Current;
        if (response.MessageCase != expectedCase)
            throw new Exception("Unexpected response");

        return response;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc