• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

lucaslorentz / durabletask-extensions / 4432709013

pending completion
4432709013

push

github

Add support for registering activity methods from interfaces

21 of 21 new or added lines in 1 file covered. (100.0%)

2283 of 2775 relevant lines covered (82.27%)

143.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.89
/src/LLL.DurableTask.Server.Grpc.Client/GrpcClientOrchestrationSession.cs
1
using System;
2
using System.Collections.Generic;
3
using System.Linq;
4
using System.Threading;
5
using System.Threading.Tasks;
6
using DurableTask.Core;
7
using DurableTaskGrpc;
8
using Microsoft.Extensions.Logging;
9
using TaskOrchestrationStream = Grpc.Core.AsyncDuplexStreamingCall<DurableTaskGrpc.TaskOrchestrationRequest, DurableTaskGrpc.TaskOrchestrationResponse>;
10
using TaskOrchestrationWorkItem = DurableTask.Core.TaskOrchestrationWorkItem;
11

12
namespace LLL.DurableTask.Server.Client
13
{
14
    public class GrpcClientOrchestrationSession : IOrchestrationSession
15
    {
16
        private static readonly TimeSpan _renewResponseTimeout = TimeSpan.FromSeconds(20);
1✔
17
        private static readonly TimeSpan _completeResponseTimeout = TimeSpan.FromSeconds(20);
1✔
18
        private static readonly TimeSpan _fetchResponseTimeout = TimeSpan.FromHours(1);
1✔
19
        private static readonly TimeSpan _releaseResponseTimeout = TimeSpan.FromSeconds(20);
1✔
20
        private static readonly TimeSpan _abandonResponseTimeout = TimeSpan.FromSeconds(20);
1✔
21

22
        private readonly GrpcClientOrchestrationServiceOptions _options;
23
        private readonly TaskOrchestrationStream _stream;
24
        private readonly ILogger _logger;
25

26
        public GrpcClientOrchestrationSession(
15✔
27
            GrpcClientOrchestrationServiceOptions options,
15✔
28
            TaskOrchestrationStream stream,
15✔
29
            ILogger logger)
15✔
30
        {
31
            _options = options;
15✔
32
            _stream = stream;
15✔
33
            _logger = logger;
15✔
34
        }
35

36
        public async Task Renew(TaskOrchestrationWorkItem workItem)
37
        {
38
            var request = new TaskOrchestrationRequest
11✔
39
            {
11✔
40
                RenewRequest = new RenewTaskOrchestrationWorkItemLockRequest()
11✔
41
            };
11✔
42

43
            await _stream.RequestStream.WriteAsync(request);
11✔
44

45
            var cts = new CancellationTokenSource(_renewResponseTimeout);
11✔
46

47
            if (!await _stream.ResponseStream.MoveNext(cts.Token))
11✔
48
                throw new Exception("Session aborted");
×
49

50
            if (_stream.ResponseStream.Current.MessageCase != TaskOrchestrationResponse.MessageOneofCase.RenewResponse)
11✔
51
                throw new Exception("Unexpected response");
×
52

53
            var renewResponse = _stream.ResponseStream.Current.RenewResponse;
11✔
54
            workItem.LockedUntilUtc = renewResponse.LockedUntilUtc.ToDateTime();
11✔
55
        }
56

57
        public async Task Complete(
58
            TaskOrchestrationWorkItem workItem,
59
            OrchestrationRuntimeState newOrchestrationRuntimeState,
60
            IList<TaskMessage> outboundMessages,
61
            IList<TaskMessage> orchestratorMessages,
62
            IList<TaskMessage> timerMessages,
63
            TaskMessage continuedAsNewMessage,
64
            OrchestrationState orchestrationState)
65
        {
66
            var request = new TaskOrchestrationRequest
25✔
67
            {
25✔
68
                CompleteRequest = new CompleteTaskOrchestrationWorkItemRequest
25✔
69
                {
25✔
70
                    NewEvents = { workItem.OrchestrationRuntimeState.NewEvents.Select(_options.DataConverter.Serialize) },
25✔
71
                    NewStatus = workItem.OrchestrationRuntimeState.Status,
25✔
72
                    NewOrchestrationEvents = { newOrchestrationRuntimeState == workItem.OrchestrationRuntimeState
25✔
73
                        ? Enumerable.Empty<string>()
25✔
74
                        : newOrchestrationRuntimeState.NewEvents.Select(_options.DataConverter.Serialize) },
25✔
75
                    NewOrchestrationStatus = newOrchestrationRuntimeState == workItem.OrchestrationRuntimeState
25✔
76
                        ? null
25✔
77
                        : newOrchestrationRuntimeState.Status,
25✔
78
                    OutboundMessages = { outboundMessages.Select(_options.DataConverter.Serialize) },
25✔
79
                    OrchestratorMessages = { orchestratorMessages.Select(_options.DataConverter.Serialize) },
25✔
80
                    TimerMessages = { timerMessages.Select(_options.DataConverter.Serialize) },
25✔
81
                    ContinuedAsNewMessage = continuedAsNewMessage == null
25✔
82
                        ? string.Empty
25✔
83
                        : _options.DataConverter.Serialize(continuedAsNewMessage),
25✔
84
                    OrchestrationState = _options.DataConverter.Serialize(orchestrationState)
25✔
85
                }
25✔
86
            };
25✔
87

88
            await _stream.RequestStream.WriteAsync(request);
25✔
89

90
            var cts = new CancellationTokenSource(_completeResponseTimeout);
25✔
91

92
            if (!await _stream.ResponseStream.MoveNext(cts.Token))
25✔
93
                throw new Exception("Session aborted");
×
94

95
            if (_stream.ResponseStream.Current.MessageCase != TaskOrchestrationResponse.MessageOneofCase.CompleteResponse)
25✔
96
                throw new Exception("Unexpected response");
×
97
        }
98

99
        public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
100
            TaskOrchestrationWorkItem workItem)
101
        {
102
            var request = new TaskOrchestrationRequest
11✔
103
            {
11✔
104
                FetchRequest = new FetchNewOrchestrationMessagesRequest()
11✔
105
            };
11✔
106

107
            await _stream.RequestStream.WriteAsync(request);
11✔
108

109
            var cts = new CancellationTokenSource(_fetchResponseTimeout);
11✔
110

111
            if (!await _stream.ResponseStream.MoveNext(cts.Token))
11✔
112
                throw new Exception("Session aborted");
×
113

114
            if (_stream.ResponseStream.Current.MessageCase != TaskOrchestrationResponse.MessageOneofCase.FetchResponse)
11✔
115
                throw new Exception("Unexpected response");
×
116

117
            var fetchResponse = _stream.ResponseStream.Current.FetchResponse;
11✔
118
            if (fetchResponse.NewMessages == null)
11✔
119
                return null;
×
120

121
            return fetchResponse.NewMessages.Messages
11✔
122
                .Select(x => _options.DataConverter.Deserialize<TaskMessage>(x))
33✔
123
                .ToArray();
11✔
124
        }
125

126
        public async Task Abandon(TaskOrchestrationWorkItem workItem)
127
        {
128
            var request = new TaskOrchestrationRequest
×
129
            {
×
130
                AbandonRequest = new AbandonTaskOrchestrationWorkItemLockRequest()
×
131
            };
×
132

133
            await _stream.RequestStream.WriteAsync(request);
×
134

135
            var cts = new CancellationTokenSource(_abandonResponseTimeout);
×
136

137
            if (!await _stream.ResponseStream.MoveNext(cts.Token))
×
138
                throw new Exception("Session aborted");
×
139

140
            if (_stream.ResponseStream.Current.MessageCase != TaskOrchestrationResponse.MessageOneofCase.AbandonResponse)
×
141
                throw new Exception("Unexpected response");
×
142
        }
143

144
        public async Task Release(TaskOrchestrationWorkItem workItem)
145
        {
146
            var request = new TaskOrchestrationRequest
15✔
147
            {
15✔
148
                ReleaseRequest = new ReleaseTaskOrchestrationWorkItemRequest()
15✔
149
            };
15✔
150

151
            await _stream.RequestStream.WriteAsync(request);
15✔
152

153
            var cts = new CancellationTokenSource(_releaseResponseTimeout);
14✔
154

155
            if (!await _stream.ResponseStream.MoveNext(cts.Token))
14✔
156
                throw new Exception("Session aborted");
×
157

158
            if (_stream.ResponseStream.Current.MessageCase != TaskOrchestrationResponse.MessageOneofCase.ReleaseResponse)
14✔
159
                throw new Exception("Unexpected response");
×
160

161
            await _stream.RequestStream.CompleteAsync();
14✔
162

163
            // Last read to close stream
164
            await _stream.ResponseStream.MoveNext(cts.Token);
14✔
165

166
            _stream.Dispose();
14✔
167
        }
168
    }
169
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc