• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber-go / cadence-client / 018e9ff9-d8ed-403a-8c3a-5ffe0c57f644

02 Apr 2024 06:02PM UTC coverage: 64.75% (+0.08%) from 64.67%
018e9ff9-d8ed-403a-8c3a-5ffe0c57f644

push

buildkite

web-flow
Update client wrappers with new async APIs (#1327)

182 of 242 new or added lines in 2 files covered. (75.21%)

3 existing lines in 2 files now uncovered.

13389 of 20678 relevant lines covered (64.75%)

314.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.68
/internal/internal_workflow_client.go
1
// Copyright (c) 2017-2020 Uber Technologies Inc.
2
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a copy
5
// of this software and associated documentation files (the "Software"), to deal
6
// in the Software without restriction, including without limitation the rights
7
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8
// copies of the Software, and to permit persons to whom the Software is
9
// furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20
// THE SOFTWARE.
21

22
package internal
23

24
import (
25
        "context"
26
        "encoding/json"
27
        "errors"
28
        "fmt"
29
        "reflect"
30
        "time"
31

32
        "go.uber.org/cadence/internal/common/serializer"
33

34
        "github.com/opentracing/opentracing-go"
35
        "github.com/pborman/uuid"
36
        "github.com/uber-go/tally"
37

38
        "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
39
        s "go.uber.org/cadence/.gen/go/shared"
40
        "go.uber.org/cadence/internal/common"
41
        "go.uber.org/cadence/internal/common/backoff"
42
        "go.uber.org/cadence/internal/common/metrics"
43
)
44

45
// Assert that structs do indeed implement the interfaces
46
var _ Client = (*workflowClient)(nil)
47
var _ DomainClient = (*domainClient)(nil)
48

49
const (
50
        defaultDecisionTaskTimeoutInSecs = 10
51
        defaultGetHistoryTimeoutInSecs   = 25
52
)
53

54
var (
55
        maxListArchivedWorkflowTimeout = time.Minute * 3
56
)
57

58
type (
59
        // workflowClient is the client for starting a workflow execution.
60
        workflowClient struct {
61
                workflowService    workflowserviceclient.Interface
62
                domain             string
63
                registry           *registry
64
                metricsScope       *metrics.TaggedScope
65
                identity           string
66
                dataConverter      DataConverter
67
                contextPropagators []ContextPropagator
68
                tracer             opentracing.Tracer
69
                featureFlags       FeatureFlags
70
        }
71

72
        // domainClient is the client for managing domains.
73
        domainClient struct {
74
                workflowService workflowserviceclient.Interface
75
                metricsScope    tally.Scope
76
                identity        string
77
                featureFlags    FeatureFlags
78
        }
79

80
        // WorkflowRun represents a started non child workflow
81
        WorkflowRun interface {
82
                // GetID return workflow ID, which will be same as StartWorkflowOptions.ID if provided.
83
                GetID() string
84

85
                // GetRunID return the first started workflow run ID (please see below)
86
                GetRunID() string
87

88
                // Get will fill the workflow execution result to valuePtr,
89
                // if workflow execution is a success, or return corresponding,
90
                // error. This is a blocking API.
91
                Get(ctx context.Context, valuePtr interface{}) error
92

93
                // NOTE: if the started workflow return ContinueAsNewError during the workflow execution, the
94
                // return result of GetRunID() will be the started workflow run ID, not the new run ID caused by ContinueAsNewError,
95
                // however, Get(ctx context.Context, valuePtr interface{}) will return result from the run which did not return ContinueAsNewError.
96
                // Say ExecuteWorkflow started a workflow, in its first run, has run ID "run ID 1", and returned ContinueAsNewError,
97
                // the second run has run ID "run ID 2" and return some result other than ContinueAsNewError:
98
                // GetRunID() will always return "run ID 1" and  Get(ctx context.Context, valuePtr interface{}) will return the result of second run.
99
                // NOTE: DO NOT USE client.ExecuteWorkflow API INSIDE A WORKFLOW, USE workflow.ExecuteChildWorkflow instead
100
        }
101

102
        // workflowRunImpl is an implementation of WorkflowRun
103
        workflowRunImpl struct {
104
                workflowFn    interface{}
105
                workflowID    string
106
                firstRunID    string
107
                currentRunID  string
108
                iterFn        func(ctx context.Context, runID string) HistoryEventIterator
109
                dataConverter DataConverter
110
                registry      *registry
111
        }
112

113
        // HistoryEventIterator represents the interface for
114
        // history event iterator
115
        HistoryEventIterator interface {
116
                // HasNext return whether this iterator has next value
117
                HasNext() bool
118
                // Next returns the next history events and error
119
                // The errors it can return:
120
                //        - EntityNotExistsError
121
                //        - BadRequestError
122
                //        - InternalServiceError
123
                Next() (*s.HistoryEvent, error)
124
        }
125

126
        // historyEventIteratorImpl is the implementation of HistoryEventIterator
127
        historyEventIteratorImpl struct {
128
                // whether this iterator is initialized
129
                initialized bool
130
                // local cached history events and corresponding consuming index
131
                nextEventIndex int
132
                events         []*s.HistoryEvent
133
                // token to get next page of history events
134
                nexttoken []byte
135
                // err when getting next page of history events
136
                err error
137
                // func which use a next token to get next page of history events
138
                paginate func(nexttoken []byte) (*s.GetWorkflowExecutionHistoryResponse, error)
139
        }
140
)
141

142
// StartWorkflow starts a workflow execution
143
// The user can use this to start using a functor like.
144
// Either by
145
//
146
//        StartWorkflow(options, "workflowTypeName", arg1, arg2, arg3)
147
//        or
148
//        StartWorkflow(options, workflowExecuteFn, arg1, arg2, arg3)
149
//
150
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
151
// subjected to change in the future.
152
func (wc *workflowClient) StartWorkflow(
153
        ctx context.Context,
154
        options StartWorkflowOptions,
155
        workflowFunc interface{},
156
        args ...interface{},
157
) (*WorkflowExecution, error) {
114✔
158
        startRequest, err := wc.getWorkflowStartRequest(ctx, "StartWorkflow", options, workflowFunc, args...)
114✔
159
        if err != nil {
118✔
160
                return nil, err
4✔
161
        }
4✔
162

163
        var response *s.StartWorkflowExecutionResponse
110✔
164

110✔
165
        // Start creating workflow request.
110✔
166
        err = backoff.Retry(ctx,
110✔
167
                func() error {
236✔
168
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
126✔
169
                        defer cancel()
126✔
170

126✔
171
                        var err1 error
126✔
172
                        response, err1 = wc.workflowService.StartWorkflowExecution(tchCtx, startRequest, opt...)
126✔
173
                        return err1
126✔
174
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
126✔
175

176
        if err != nil {
116✔
177
                return nil, err
6✔
178
        }
6✔
179

180
        if wc.metricsScope != nil {
208✔
181
                scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *startRequest.WorkflowType.Name)
104✔
182
                scope.Counter(metrics.WorkflowStartCounter).Inc(1)
104✔
183
        }
104✔
184

185
        executionInfo := &WorkflowExecution{
104✔
186
                ID:    *startRequest.WorkflowId,
104✔
187
                RunID: response.GetRunId(),
104✔
188
        }
104✔
189
        return executionInfo, nil
104✔
190
}
191

192
// StartWorkflowAsync behaves like StartWorkflow except that the request is queued and processed by Cadence backend asynchronously.
193
// See StartWorkflow for details about inputs and usage.
194
func (wc *workflowClient) StartWorkflowAsync(
195
        ctx context.Context,
196
        options StartWorkflowOptions,
197
        workflowFunc interface{},
198
        args ...interface{},
199
) (*WorkflowExecutionAsync, error) {
5✔
200
        startRequest, err := wc.getWorkflowStartRequest(ctx, "StartWorkflowAsync", options, workflowFunc, args...)
5✔
201
        if err != nil {
6✔
202
                return nil, err
1✔
203
        }
1✔
204

205
        asyncStartRequest := &s.StartWorkflowExecutionAsyncRequest{
4✔
206
                Request: startRequest,
4✔
207
        }
4✔
208

4✔
209
        // Start creating workflow request.
4✔
210
        err = backoff.Retry(ctx,
4✔
211
                func() error {
24✔
212
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
20✔
213
                        defer cancel()
20✔
214

20✔
215
                        var err1 error
20✔
216
                        _, err1 = wc.workflowService.StartWorkflowExecutionAsync(tchCtx, asyncStartRequest, opt...)
20✔
217
                        return err1
20✔
218
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
20✔
219

220
        if err != nil {
5✔
221
                return nil, err
1✔
222
        }
1✔
223

224
        if wc.metricsScope != nil {
6✔
225
                scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *startRequest.WorkflowType.Name)
3✔
226
                scope.Counter(metrics.WorkflowStartAsyncCounter).Inc(1)
3✔
227
        }
3✔
228

229
        executionInfo := &WorkflowExecutionAsync{
3✔
230
                ID: *startRequest.WorkflowId,
3✔
231
        }
3✔
232
        return executionInfo, nil
3✔
233
}
234

235
// ExecuteWorkflow starts a workflow execution and returns a WorkflowRun that will allow you to wait until this workflow
236
// reaches the end state, such as workflow finished successfully or timeout.
237
// The user can use this to start using a functor like below and get the workflow execution result, as Value
238
// Either by
239
//
240
//        ExecuteWorkflow(options, "workflowTypeName", arg1, arg2, arg3)
241
//        or
242
//        ExecuteWorkflow(options, workflowExecuteFn, arg1, arg2, arg3)
243
//
244
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
245
// subjected to change in the future.
246
// NOTE: the context.Context should have a fairly large timeout, since workflow execution may take a while to be finished
247
func (wc *workflowClient) ExecuteWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (WorkflowRun, error) {
104✔
248

104✔
249
        // start the workflow execution
104✔
250
        var runID string
104✔
251
        var workflowID string
104✔
252
        executionInfo, err := wc.StartWorkflow(ctx, options, workflow, args...)
104✔
253
        if err != nil {
109✔
254
                if alreadyStartedErr, ok := err.(*s.WorkflowExecutionAlreadyStartedError); ok {
7✔
255
                        runID = alreadyStartedErr.GetRunId()
2✔
256
                        // Assumption is that AlreadyStarted is never returned when options.ID is empty as UUID generated by
2✔
257
                        // StartWorkflow is not going to collide ever.
2✔
258
                        workflowID = options.ID
2✔
259
                } else {
5✔
260
                        return nil, err
3✔
261
                }
3✔
262
        } else {
99✔
263
                runID = executionInfo.RunID
99✔
264
                workflowID = executionInfo.ID
99✔
265
        }
99✔
266

267
        iterFn := func(fnCtx context.Context, fnRunID string) HistoryEventIterator {
218✔
268
                return wc.GetWorkflowHistory(fnCtx, workflowID, fnRunID, true, s.HistoryEventFilterTypeCloseEvent)
117✔
269
        }
117✔
270

271
        return &workflowRunImpl{
101✔
272
                workflowFn:    workflow,
101✔
273
                workflowID:    workflowID,
101✔
274
                firstRunID:    runID,
101✔
275
                currentRunID:  runID,
101✔
276
                iterFn:        iterFn,
101✔
277
                dataConverter: wc.dataConverter,
101✔
278
                registry:      wc.registry,
101✔
279
        }, nil
101✔
280
}
281

282
// GetWorkflow gets a workflow execution and returns a WorkflowRun that will allow you to wait until this workflow
283
// reaches the end state, such as workflow finished successfully or timeout.
284
// The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is
285
// subjected to change in the future.
286
func (wc *workflowClient) GetWorkflow(ctx context.Context, workflowID string, runID string) WorkflowRun {
4✔
287

4✔
288
        iterFn := func(fnCtx context.Context, fnRunID string) HistoryEventIterator {
8✔
289
                return wc.GetWorkflowHistory(fnCtx, workflowID, fnRunID, true, s.HistoryEventFilterTypeCloseEvent)
4✔
290
        }
4✔
291

292
        return &workflowRunImpl{
4✔
293
                workflowID:    workflowID,
4✔
294
                firstRunID:    runID,
4✔
295
                currentRunID:  runID,
4✔
296
                iterFn:        iterFn,
4✔
297
                dataConverter: wc.dataConverter,
4✔
298
                registry:      wc.registry,
4✔
299
        }
4✔
300
}
301

302
// SignalWorkflow signals a workflow in execution.
303
func (wc *workflowClient) SignalWorkflow(ctx context.Context, workflowID string, runID string, signalName string, arg interface{}) error {
3✔
304
        input, err := encodeArg(wc.dataConverter, arg)
3✔
305
        if err != nil {
3✔
306
                return err
×
307
        }
×
308
        return signalWorkflow(ctx, wc.workflowService, wc.identity, wc.domain, workflowID, runID, signalName, input, wc.featureFlags)
3✔
309
}
310

311
// SignalWithStartWorkflow sends a signal to a running workflow.
312
// If the workflow is not running or not found, it starts the workflow and then sends the signal in transaction.
313
func (wc *workflowClient) SignalWithStartWorkflow(
314
        ctx context.Context,
315
        workflowID, signalName string,
316
        signalArg interface{},
317
        options StartWorkflowOptions,
318
        workflowFunc interface{},
319
        workflowArgs ...interface{},
320
) (*WorkflowExecution, error) {
7✔
321

7✔
322
        signalWithStartRequest, err := wc.getSignalWithStartRequest(ctx, "SignalWithStartWorkflow", workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...)
7✔
323
        if err != nil {
10✔
324
                return nil, err
3✔
325
        }
3✔
326

327
        var response *s.StartWorkflowExecutionResponse
4✔
328

4✔
329
        // Start creating workflow request.
4✔
330
        err = backoff.Retry(ctx,
4✔
331
                func() error {
8✔
332
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
4✔
333
                        defer cancel()
4✔
334

4✔
335
                        var err1 error
4✔
336
                        response, err1 = wc.workflowService.SignalWithStartWorkflowExecution(tchCtx, signalWithStartRequest, opt...)
4✔
337
                        return err1
4✔
338
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
4✔
339

340
        if err != nil {
4✔
UNCOV
341
                return nil, err
×
342
        }
×
343

344
        if wc.metricsScope != nil {
8✔
345
                scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *signalWithStartRequest.WorkflowType.Name)
4✔
346
                scope.Counter(metrics.WorkflowSignalWithStartCounter).Inc(1)
4✔
347
        }
4✔
348

349
        executionInfo := &WorkflowExecution{
4✔
350
                ID:    options.ID,
4✔
351
                RunID: response.GetRunId(),
4✔
352
        }
4✔
353
        return executionInfo, nil
4✔
354
}
355

356
// SignalWithStartWorkflowAsync behaves like SignalWithStartWorkflow except that the request is queued and processed by Cadence backend asynchronously.
357
// See SignalWithStartWorkflow for details about inputs and usage.
358
func (wc *workflowClient) SignalWithStartWorkflowAsync(
359
        ctx context.Context,
360
        workflowID, signalName string,
361
        signalArg interface{},
362
        options StartWorkflowOptions,
363
        workflowFunc interface{},
364
        workflowArgs ...interface{},
365
) (*WorkflowExecutionAsync, error) {
3✔
366

3✔
367
        signalWithStartRequest, err := wc.getSignalWithStartRequest(ctx, "SignalWithStartWorkflow", workflowID, signalName, signalArg, options, workflowFunc, workflowArgs...)
3✔
368
        if err != nil {
4✔
369
                return nil, err
1✔
370
        }
1✔
371

372
        asyncSignalWithStartRequest := &s.SignalWithStartWorkflowExecutionAsyncRequest{
2✔
373
                Request: signalWithStartRequest,
2✔
374
        }
2✔
375

2✔
376
        err = backoff.Retry(ctx,
2✔
377
                func() error {
20✔
378
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
18✔
379
                        defer cancel()
18✔
380

18✔
381
                        var err1 error
18✔
382
                        _, err1 = wc.workflowService.SignalWithStartWorkflowExecutionAsync(tchCtx, asyncSignalWithStartRequest, opt...)
18✔
383
                        return err1
18✔
384
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
18✔
385

386
        if err != nil {
3✔
387
                return nil, err
1✔
388
        }
1✔
389

390
        if wc.metricsScope != nil {
2✔
391
                scope := wc.metricsScope.GetTaggedScope(tagTaskList, options.TaskList, tagWorkflowType, *signalWithStartRequest.WorkflowType.Name)
1✔
392
                scope.Counter(metrics.WorkflowSignalWithStartAsyncCounter).Inc(1)
1✔
393
        }
1✔
394

395
        executionInfo := &WorkflowExecutionAsync{
1✔
396
                ID: options.ID,
1✔
397
        }
1✔
398
        return executionInfo, nil
1✔
399
}
400

401
// CancelWorkflow cancels a workflow in execution.  It allows workflow to properly clean up and gracefully close.
402
// workflowID is required, other parameters are optional.
403
// If runID is omit, it will terminate currently running workflow (if there is one) based on the workflowID.
404
func (wc *workflowClient) CancelWorkflow(ctx context.Context, workflowID string, runID string, opts ...Option) error {
5✔
405
        request := &s.RequestCancelWorkflowExecutionRequest{
5✔
406
                Domain: common.StringPtr(wc.domain),
5✔
407
                WorkflowExecution: &s.WorkflowExecution{
5✔
408
                        WorkflowId: common.StringPtr(workflowID),
5✔
409
                        RunId:      getRunID(runID),
5✔
410
                },
5✔
411
                Identity: common.StringPtr(wc.identity),
5✔
412
        }
5✔
413

5✔
414
        for _, opt := range opts {
6✔
415
                switch o := opt.(type) {
1✔
416
                case CancelReason:
1✔
417
                        cause := string(o)
1✔
418
                        request.Cause = &cause
1✔
419
                }
420
        }
421

422
        return backoff.Retry(ctx,
5✔
423
                func() error {
10✔
424
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
5✔
425
                        defer cancel()
5✔
426
                        return wc.workflowService.RequestCancelWorkflowExecution(tchCtx, request, opt...)
5✔
427
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
5✔
428
}
429

430
// TerminateWorkflow terminates a workflow execution.
431
// workflowID is required, other parameters are optional.
432
// If runID is omit, it will terminate currently running workflow (if there is one) based on the workflowID.
433
func (wc *workflowClient) TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details []byte) error {
×
434
        request := &s.TerminateWorkflowExecutionRequest{
×
435
                Domain: common.StringPtr(wc.domain),
×
436
                WorkflowExecution: &s.WorkflowExecution{
×
437
                        WorkflowId: common.StringPtr(workflowID),
×
438
                        RunId:      getRunID(runID),
×
439
                },
×
440
                Reason:   common.StringPtr(reason),
×
441
                Details:  details,
×
442
                Identity: common.StringPtr(wc.identity),
×
443
        }
×
444

×
445
        err := backoff.Retry(ctx,
×
446
                func() error {
×
447
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
×
448
                        defer cancel()
×
449
                        return wc.workflowService.TerminateWorkflowExecution(tchCtx, request, opt...)
×
450
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
×
451

452
        return err
×
453
}
454

455
// GetWorkflowHistory return a channel which contains the history events of a given workflow
456
func (wc *workflowClient) GetWorkflowHistory(
457
        ctx context.Context,
458
        workflowID string,
459
        runID string,
460
        isLongPoll bool,
461
        filterType s.HistoryEventFilterType,
462
) HistoryEventIterator {
125✔
463

125✔
464
        domain := wc.domain
125✔
465
        paginate := func(nextToken []byte) (*s.GetWorkflowExecutionHistoryResponse, error) {
254✔
466
                request := &s.GetWorkflowExecutionHistoryRequest{
129✔
467
                        Domain: common.StringPtr(domain),
129✔
468
                        Execution: &s.WorkflowExecution{
129✔
469
                                WorkflowId: common.StringPtr(workflowID),
129✔
470
                                RunId:      getRunID(runID),
129✔
471
                        },
129✔
472
                        WaitForNewEvent:        common.BoolPtr(isLongPoll),
129✔
473
                        HistoryEventFilterType: &filterType,
129✔
474
                        NextPageToken:          nextToken,
129✔
475
                        SkipArchival:           common.BoolPtr(isLongPoll),
129✔
476
                }
129✔
477

129✔
478
                var response *s.GetWorkflowExecutionHistoryResponse
129✔
479
                var err error
129✔
480
        Loop:
129✔
481
                for {
260✔
482
                        var isFinalLongPoll bool
131✔
483
                        err = backoff.Retry(ctx,
131✔
484
                                func() error {
262✔
485
                                        var err1 error
131✔
486
                                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags, func(builder *contextBuilder) {
262✔
487
                                                if isLongPoll {
262✔
488
                                                        builder.Timeout = defaultGetHistoryTimeoutInSecs * time.Second
131✔
489
                                                        deadline, ok := ctx.Deadline()
131✔
490
                                                        if ok && deadline.Before(time.Now().Add(builder.Timeout)) {
240✔
491
                                                                // insufficient time for another poll, so this needs to be the last attempt
109✔
492
                                                                isFinalLongPoll = true
109✔
493
                                                        }
109✔
494
                                                }
495
                                        })
496
                                        defer cancel()
131✔
497
                                        response, err1 = wc.workflowService.GetWorkflowExecutionHistory(tchCtx, request, opt...)
131✔
498

131✔
499
                                        if err1 != nil {
132✔
500
                                                return err1
1✔
501
                                        }
1✔
502

503
                                        if response.RawHistory != nil {
135✔
504
                                                history, err := serializer.DeserializeBlobDataToHistoryEvents(response.RawHistory, filterType)
5✔
505
                                                if err != nil {
5✔
506
                                                        return err
×
507
                                                }
×
508
                                                response.History = history
5✔
509
                                        }
510
                                        return err1
130✔
511
                                },
512
                                createDynamicServiceRetryPolicy(ctx),
513
                                func(err error) bool {
1✔
514
                                        return isServiceTransientError(err) || isEntityNonExistFromPassive(err)
1✔
515
                                },
1✔
516
                        )
517

518
                        if err != nil {
132✔
519
                                return nil, err
1✔
520
                        }
1✔
521
                        if isLongPoll && len(response.History.Events) == 0 && len(response.NextPageToken) != 0 {
133✔
522
                                if isFinalLongPoll {
4✔
523
                                        // essentially a deadline exceeded, the last attempt did not get a result.
1✔
524
                                        // this is necessary because the server does not know if we are able to try again,
1✔
525
                                        // so it returns an empty result slightly before a timeout occurs, so the next
1✔
526
                                        // attempt's token can be returned if it wishes to retry.
1✔
527
                                        return nil, fmt.Errorf("timed out waiting for the workflow to finish: %w", context.DeadlineExceeded)
1✔
528
                                }
1✔
529
                                request.NextPageToken = response.NextPageToken
2✔
530
                                continue Loop
2✔
531
                        }
532
                        break Loop
127✔
533
                }
534
                return response, nil
127✔
535
        }
536

537
        return &historyEventIteratorImpl{
125✔
538
                paginate: paginate,
125✔
539
        }
125✔
540
}
541

542
func isEntityNonExistFromPassive(err error) bool {
1✔
543
        if nonExistError, ok := err.(*s.EntityNotExistsError); ok {
2✔
544
                return nonExistError.GetActiveCluster() != "" &&
1✔
545
                        nonExistError.GetCurrentCluster() != "" &&
1✔
546
                        nonExistError.GetActiveCluster() != nonExistError.GetCurrentCluster()
1✔
547
        }
1✔
548

549
        return false
×
550
}
551

552
// CompleteActivity reports activity completed. activity Execute method can return activity.ErrResultPending to
553
// indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method
554
// should be called when that activity is completed with the actual result and error. If err is nil, activity task
555
// completed event will be reported; if err is CanceledError, activity task cancelled event will be reported; otherwise,
556
// activity task failed event will be reported.
557
func (wc *workflowClient) CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error {
6✔
558
        if taskToken == nil {
6✔
559
                return errors.New("invalid task token provided")
×
560
        }
×
561

562
        var data []byte
6✔
563
        if result != nil {
6✔
564
                var err0 error
×
565
                data, err0 = encodeArg(wc.dataConverter, result)
×
566
                if err0 != nil {
×
567
                        return err0
×
568
                }
×
569
        }
570
        request := convertActivityResultToRespondRequest(wc.identity, taskToken, data, err, wc.dataConverter)
6✔
571
        return reportActivityComplete(ctx, wc.workflowService, request, wc.metricsScope, wc.featureFlags)
6✔
572
}
573

574
// CompleteActivityById reports activity completed. Similar to CompleteActivity
575
// It takes domain name, workflowID, runID, activityID as arguments.
576
func (wc *workflowClient) CompleteActivityByID(ctx context.Context, domain, workflowID, runID, activityID string,
577
        result interface{}, err error) error {
3✔
578

3✔
579
        if activityID == "" || workflowID == "" || domain == "" {
3✔
580
                return errors.New("empty activity or workflow id or domainName")
×
581
        }
×
582

583
        var data []byte
3✔
584
        if result != nil {
3✔
585
                var err0 error
×
586
                data, err0 = encodeArg(wc.dataConverter, result)
×
587
                if err0 != nil {
×
588
                        return err0
×
589
                }
×
590
        }
591

592
        request := convertActivityResultToRespondRequestByID(wc.identity, domain, workflowID, runID, activityID, data, err, wc.dataConverter)
3✔
593
        return reportActivityCompleteByID(ctx, wc.workflowService, request, wc.metricsScope, wc.featureFlags)
3✔
594
}
595

596
// RecordActivityHeartbeat records heartbeat for an activity.
597
func (wc *workflowClient) RecordActivityHeartbeat(ctx context.Context, taskToken []byte, details ...interface{}) error {
3✔
598
        data, err := encodeArgs(wc.dataConverter, details)
3✔
599
        if err != nil {
3✔
600
                return err
×
601
        }
×
602
        return recordActivityHeartbeat(ctx, wc.workflowService, wc.identity, taskToken, data, wc.featureFlags)
3✔
603
}
604

605
// RecordActivityHeartbeatByID records heartbeat for an activity.
606
func (wc *workflowClient) RecordActivityHeartbeatByID(ctx context.Context,
607
        domain, workflowID, runID, activityID string, details ...interface{}) error {
2✔
608
        data, err := encodeArgs(wc.dataConverter, details)
2✔
609
        if err != nil {
2✔
610
                return err
×
611
        }
×
612
        return recordActivityHeartbeatByID(ctx, wc.workflowService, wc.identity, domain, workflowID, runID, activityID, data, wc.featureFlags)
2✔
613
}
614

615
// ListClosedWorkflow gets closed workflow executions based on request filters
616
// The errors it can throw:
617
//   - BadRequestError
618
//   - InternalServiceError
619
//   - EntityNotExistError
620
func (wc *workflowClient) ListClosedWorkflow(ctx context.Context, request *s.ListClosedWorkflowExecutionsRequest) (*s.ListClosedWorkflowExecutionsResponse, error) {
×
621
        if len(request.GetDomain()) == 0 {
×
622
                request.Domain = common.StringPtr(wc.domain)
×
623
        }
×
624
        var response *s.ListClosedWorkflowExecutionsResponse
×
625
        err := backoff.Retry(ctx,
×
626
                func() error {
×
627
                        var err1 error
×
628
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
×
629
                        defer cancel()
×
630
                        response, err1 = wc.workflowService.ListClosedWorkflowExecutions(tchCtx, request, opt...)
×
631
                        return err1
×
632
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
×
633
        if err != nil {
×
634
                return nil, err
×
635
        }
×
636
        return response, nil
×
637
}
638

639
// ListOpenWorkflow gets open workflow executions based on request filters
640
// The errors it can throw:
641
//   - BadRequestError
642
//   - InternalServiceError
643
//   - EntityNotExistError
644
func (wc *workflowClient) ListOpenWorkflow(ctx context.Context, request *s.ListOpenWorkflowExecutionsRequest) (*s.ListOpenWorkflowExecutionsResponse, error) {
×
645
        if len(request.GetDomain()) == 0 {
×
646
                request.Domain = common.StringPtr(wc.domain)
×
647
        }
×
648
        var response *s.ListOpenWorkflowExecutionsResponse
×
649
        err := backoff.Retry(ctx,
×
650
                func() error {
×
651
                        var err1 error
×
652
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
×
653
                        defer cancel()
×
654
                        response, err1 = wc.workflowService.ListOpenWorkflowExecutions(tchCtx, request, opt...)
×
655
                        return err1
×
656
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
×
657
        if err != nil {
×
658
                return nil, err
×
659
        }
×
660
        return response, nil
×
661
}
662

663
// ListWorkflow implementation
664
func (wc *workflowClient) ListWorkflow(ctx context.Context, request *s.ListWorkflowExecutionsRequest) (*s.ListWorkflowExecutionsResponse, error) {
2✔
665
        if len(request.GetDomain()) == 0 {
3✔
666
                request.Domain = common.StringPtr(wc.domain)
1✔
667
        }
1✔
668
        var response *s.ListWorkflowExecutionsResponse
2✔
669
        err := backoff.Retry(ctx,
2✔
670
                func() error {
4✔
671
                        var err1 error
2✔
672
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
2✔
673
                        defer cancel()
2✔
674
                        response, err1 = wc.workflowService.ListWorkflowExecutions(tchCtx, request, opt...)
2✔
675
                        return err1
2✔
676
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
2✔
677
        if err != nil {
3✔
678
                return nil, err
1✔
679
        }
1✔
680
        return response, nil
1✔
681
}
682

683
// ListArchivedWorkflow implementation
684
func (wc *workflowClient) ListArchivedWorkflow(ctx context.Context, request *s.ListArchivedWorkflowExecutionsRequest) (*s.ListArchivedWorkflowExecutionsResponse, error) {
2✔
685
        if len(request.GetDomain()) == 0 {
3✔
686
                request.Domain = common.StringPtr(wc.domain)
1✔
687
        }
1✔
688
        var response *s.ListArchivedWorkflowExecutionsResponse
2✔
689
        err := backoff.Retry(ctx,
2✔
690
                func() error {
4✔
691
                        var err1 error
2✔
692
                        timeout := maxListArchivedWorkflowTimeout
2✔
693
                        now := time.Now()
2✔
694
                        if ctx != nil {
4✔
695
                                if expiration, ok := ctx.Deadline(); ok && expiration.After(now) {
4✔
696
                                        timeout = expiration.Sub(now)
2✔
697
                                        if timeout > maxListArchivedWorkflowTimeout {
2✔
698
                                                timeout = maxListArchivedWorkflowTimeout
×
699
                                        } else if timeout < minRPCTimeout {
2✔
700
                                                timeout = minRPCTimeout
×
701
                                        }
×
702
                                }
703
                        }
704
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags, chanTimeout(timeout))
2✔
705
                        defer cancel()
2✔
706
                        response, err1 = wc.workflowService.ListArchivedWorkflowExecutions(tchCtx, request, opt...)
2✔
707
                        return err1
2✔
708
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
709
        if err != nil {
3✔
710
                return nil, err
1✔
711
        }
1✔
712
        return response, nil
1✔
713
}
714

715
// ScanWorkflow implementation
716
func (wc *workflowClient) ScanWorkflow(ctx context.Context, request *s.ListWorkflowExecutionsRequest) (*s.ListWorkflowExecutionsResponse, error) {
2✔
717
        if len(request.GetDomain()) == 0 {
3✔
718
                request.Domain = common.StringPtr(wc.domain)
1✔
719
        }
1✔
720
        var response *s.ListWorkflowExecutionsResponse
2✔
721
        err := backoff.Retry(ctx,
2✔
722
                func() error {
4✔
723
                        var err1 error
2✔
724
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
2✔
725
                        defer cancel()
2✔
726
                        response, err1 = wc.workflowService.ScanWorkflowExecutions(tchCtx, request, opt...)
2✔
727
                        return err1
2✔
728
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
2✔
729
        if err != nil {
3✔
730
                return nil, err
1✔
731
        }
1✔
732
        return response, nil
1✔
733
}
734

735
// CountWorkflow implementation
736
func (wc *workflowClient) CountWorkflow(ctx context.Context, request *s.CountWorkflowExecutionsRequest) (*s.CountWorkflowExecutionsResponse, error) {
2✔
737
        if len(request.GetDomain()) == 0 {
3✔
738
                request.Domain = common.StringPtr(wc.domain)
1✔
739
        }
1✔
740
        var response *s.CountWorkflowExecutionsResponse
2✔
741
        err := backoff.Retry(ctx,
2✔
742
                func() error {
4✔
743
                        var err1 error
2✔
744
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
2✔
745
                        defer cancel()
2✔
746
                        response, err1 = wc.workflowService.CountWorkflowExecutions(tchCtx, request, opt...)
2✔
747
                        return err1
2✔
748
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
2✔
749
        if err != nil {
3✔
750
                return nil, err
1✔
751
        }
1✔
752
        return response, nil
1✔
753
}
754

755
// ResetWorkflow implementation
756
func (wc *workflowClient) ResetWorkflow(ctx context.Context, request *s.ResetWorkflowExecutionRequest) (*s.ResetWorkflowExecutionResponse, error) {
×
757
        if len(request.GetDomain()) == 0 {
×
758
                request.Domain = common.StringPtr(wc.domain)
×
759
        }
×
760
        var response *s.ResetWorkflowExecutionResponse
×
761
        err := backoff.Retry(ctx,
×
762
                func() error {
×
763
                        var err1 error
×
764
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
×
765
                        defer cancel()
×
766
                        response, err1 = wc.workflowService.ResetWorkflowExecution(tchCtx, request, opt...)
×
767
                        return err1
×
768
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
×
769
        if err != nil {
×
770
                return nil, err
×
771
        }
×
772
        return response, nil
×
773
}
774

775
// GetSearchAttributes implementation
776
func (wc *workflowClient) GetSearchAttributes(ctx context.Context) (*s.GetSearchAttributesResponse, error) {
2✔
777
        var response *s.GetSearchAttributesResponse
2✔
778
        err := backoff.Retry(ctx,
2✔
779
                func() error {
4✔
780
                        var err1 error
2✔
781
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
2✔
782
                        defer cancel()
2✔
783
                        response, err1 = wc.workflowService.GetSearchAttributes(tchCtx, opt...)
2✔
784
                        return err1
2✔
785
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
2✔
786
        if err != nil {
3✔
787
                return nil, err
1✔
788
        }
1✔
789
        return response, nil
1✔
790
}
791

792
// DescribeWorkflowExecution returns information about the specified workflow execution.
793
// The errors it can return:
794
//   - BadRequestError
795
//   - InternalServiceError
796
//   - EntityNotExistError
797
func (wc *workflowClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*s.DescribeWorkflowExecutionResponse, error) {
9✔
798
        request := &s.DescribeWorkflowExecutionRequest{
9✔
799
                Domain: common.StringPtr(wc.domain),
9✔
800
                Execution: &s.WorkflowExecution{
9✔
801
                        WorkflowId: common.StringPtr(workflowID),
9✔
802
                        RunId:      common.StringPtr(runID),
9✔
803
                },
9✔
804
        }
9✔
805
        var response *s.DescribeWorkflowExecutionResponse
9✔
806
        err := backoff.Retry(ctx,
9✔
807
                func() error {
18✔
808
                        var err1 error
9✔
809
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
9✔
810
                        defer cancel()
9✔
811
                        response, err1 = wc.workflowService.DescribeWorkflowExecution(tchCtx, request, opt...)
9✔
812
                        return err1
9✔
813
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
9✔
814
        if err != nil {
9✔
815
                return nil, err
×
816
        }
×
817
        return response, nil
9✔
818
}
819

820
// QueryWorkflow queries a given workflow execution
821
// workflowID and queryType are required, other parameters are optional.
822
// - workflow ID of the workflow.
823
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
824
// - taskList can be default(empty string). If empty string then it will pick the taskList of the running execution of that workflow ID.
825
// - queryType is the type of the query.
826
// - args... are the optional query parameters.
827
// The errors it can return:
828
//   - BadRequestError
829
//   - InternalServiceError
830
//   - EntityNotExistError
831
//   - QueryFailError
832
func (wc *workflowClient) QueryWorkflow(ctx context.Context, workflowID string, runID string, queryType string, args ...interface{}) (Value, error) {
9✔
833
        queryWorkflowWithOptionsRequest := &QueryWorkflowWithOptionsRequest{
9✔
834
                WorkflowID: workflowID,
9✔
835
                RunID:      runID,
9✔
836
                QueryType:  queryType,
9✔
837
                Args:       args,
9✔
838
        }
9✔
839
        result, err := wc.QueryWorkflowWithOptions(ctx, queryWorkflowWithOptionsRequest)
9✔
840
        if err != nil {
12✔
841
                return nil, err
3✔
842
        }
3✔
843
        return result.QueryResult, nil
6✔
844
}
845

846
// QueryWorkflowWithOptionsRequest is the request to QueryWorkflowWithOptions
847
type QueryWorkflowWithOptionsRequest struct {
848
        // WorkflowID is a required field indicating the workflow which should be queried.
849
        WorkflowID string
850

851
        // RunID is an optional field used to identify a specific run of the queried workflow.
852
        // If RunID is not provided the latest run will be used.
853
        RunID string
854

855
        // QueryType is a required field which specifies the query you want to run.
856
        // By default, cadence supports "__stack_trace" as a standard query type, which will return string value
857
        // representing the call stack of the target workflow. The target workflow could also setup different query handler to handle custom query types.
858
        // See comments at workflow.SetQueryHandler(ctx Context, queryType string, handler interface{}) for more details on how to setup query handler within the target workflow.
859
        QueryType string
860

861
        // Args is an optional field used to identify the arguments passed to the query.
862
        Args []interface{}
863

864
        // QueryRejectCondition is an optional field used to reject queries based on workflow state.
865
        // QueryRejectConditionNotOpen will reject queries to workflows which are not open
866
        // QueryRejectConditionNotCompletedCleanly will reject queries to workflows which completed in any state other than completed (e.g. terminated, canceled timeout etc...)
867
        QueryRejectCondition *s.QueryRejectCondition
868

869
        // QueryConsistencyLevel is an optional field used to control the consistency level.
870
        // QueryConsistencyLevelEventual means that query will eventually reflect up to date state of a workflow.
871
        // QueryConsistencyLevelStrong means that query will reflect a workflow state of having applied all events which came before the query.
872
        QueryConsistencyLevel *s.QueryConsistencyLevel
873
}
874

875
// QueryWorkflowWithOptionsResponse is the response to QueryWorkflowWithOptions
876
type QueryWorkflowWithOptionsResponse struct {
877
        // QueryResult contains the result of executing the query.
878
        // This will only be set if the query was completed successfully and not rejected.
879
        QueryResult Value
880

881
        // QueryRejected contains information about the query rejection.
882
        QueryRejected *s.QueryRejected
883
}
884

885
// QueryWorkflowWithOptions queries a given workflow execution and returns the query result synchronously.
886
// See QueryWorkflowWithOptionsRequest and QueryWorkflowWithOptionsResult for more information.
887
// The errors it can return:
888
//   - BadRequestError
889
//   - InternalServiceError
890
//   - EntityNotExistError
891
//   - QueryFailError
892
func (wc *workflowClient) QueryWorkflowWithOptions(ctx context.Context, request *QueryWorkflowWithOptionsRequest) (*QueryWorkflowWithOptionsResponse, error) {
12✔
893
        var input []byte
12✔
894
        if len(request.Args) > 0 {
12✔
895
                var err error
×
896
                if input, err = encodeArgs(wc.dataConverter, request.Args); err != nil {
×
897
                        return nil, err
×
898
                }
×
899
        }
900
        req := &s.QueryWorkflowRequest{
12✔
901
                Domain: common.StringPtr(wc.domain),
12✔
902
                Execution: &s.WorkflowExecution{
12✔
903
                        WorkflowId: common.StringPtr(request.WorkflowID),
12✔
904
                        RunId:      getRunID(request.RunID),
12✔
905
                },
12✔
906
                Query: &s.WorkflowQuery{
12✔
907
                        QueryType: common.StringPtr(request.QueryType),
12✔
908
                        QueryArgs: input,
12✔
909
                },
12✔
910
                QueryRejectCondition:  request.QueryRejectCondition,
12✔
911
                QueryConsistencyLevel: request.QueryConsistencyLevel,
12✔
912
        }
12✔
913

12✔
914
        var resp *s.QueryWorkflowResponse
12✔
915
        err := backoff.Retry(ctx,
12✔
916
                func() error {
24✔
917
                        tchCtx, cancel, opt := newChannelContextForQuery(ctx, wc.featureFlags)
12✔
918
                        defer cancel()
12✔
919
                        var err error
12✔
920
                        resp, err = wc.workflowService.QueryWorkflow(tchCtx, req, opt...)
12✔
921
                        return err
12✔
922
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
12✔
923
        if err != nil {
15✔
924
                return nil, err
3✔
925
        }
3✔
926

927
        if resp.QueryRejected != nil {
9✔
928
                return &QueryWorkflowWithOptionsResponse{
×
929
                        QueryRejected: resp.QueryRejected,
×
930
                        QueryResult:   nil,
×
931
                }, nil
×
932
        }
×
933
        return &QueryWorkflowWithOptionsResponse{
9✔
934
                QueryRejected: nil,
9✔
935
                QueryResult:   newEncodedValue(resp.QueryResult, wc.dataConverter),
9✔
936
        }, nil
9✔
937
}
938

939
// DescribeTaskList returns information about the target tasklist, right now this API returns the
940
// pollers which polled this tasklist in last few minutes.
941
// - tasklist name of tasklist
942
// - tasklistType type of tasklist, can be decision or activity
943
// The errors it can return:
944
//   - BadRequestError
945
//   - InternalServiceError
946
//   - EntityNotExistError
947
func (wc *workflowClient) DescribeTaskList(ctx context.Context, tasklist string, tasklistType s.TaskListType) (*s.DescribeTaskListResponse, error) {
×
948
        request := &s.DescribeTaskListRequest{
×
949
                Domain:       common.StringPtr(wc.domain),
×
950
                TaskList:     &s.TaskList{Name: common.StringPtr(tasklist)},
×
951
                TaskListType: &tasklistType,
×
952
        }
×
953

×
954
        var resp *s.DescribeTaskListResponse
×
955
        err := backoff.Retry(ctx,
×
956
                func() error {
×
957
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
×
958
                        defer cancel()
×
959
                        var err error
×
960
                        resp, err = wc.workflowService.DescribeTaskList(tchCtx, request, opt...)
×
961
                        return err
×
962
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
×
963
        if err != nil {
×
964
                return nil, err
×
965
        }
×
966

967
        return resp, nil
×
968
}
969

970
// RefreshWorkflowTasks refreshes all the tasks of a given workflow.
971
// - workflow ID of the workflow.
972
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
973
// The errors it can return:
974
//   - BadRequestError
975
//   - DomainNotActiveError
976
//   - ServiceBusyError
977
//   - EntityNotExistError
978
func (wc *workflowClient) RefreshWorkflowTasks(ctx context.Context, workflowID, runID string) error {
×
979
        request := &s.RefreshWorkflowTasksRequest{
×
980
                Domain: common.StringPtr(wc.domain),
×
981
                Execution: &s.WorkflowExecution{
×
982
                        WorkflowId: common.StringPtr(workflowID),
×
983
                        RunId:      getRunID(runID),
×
984
                },
×
985
        }
×
986

×
987
        return backoff.Retry(ctx,
×
988
                func() error {
×
989
                        tchCtx, cancel, opt := newChannelContext(ctx, wc.featureFlags)
×
990
                        defer cancel()
×
991
                        return wc.workflowService.RefreshWorkflowTasks(tchCtx, request, opt...)
×
992
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
×
993
}
994

995
func (wc *workflowClient) getWorkflowHeader(ctx context.Context) *s.Header {
121✔
996
        header := &s.Header{
121✔
997
                Fields: make(map[string][]byte),
121✔
998
        }
121✔
999
        writer := NewHeaderWriter(header)
121✔
1000
        for _, ctxProp := range wc.contextPropagators {
215✔
1001
                ctxProp.Inject(ctx, writer)
94✔
1002
        }
94✔
1003
        return header
121✔
1004
}
1005

1006
func (wc *workflowClient) getWorkflowStartRequest(
1007
        ctx context.Context,
1008
        tracePrefix string,
1009
        options StartWorkflowOptions,
1010
        workflowFunc interface{},
1011
        args ...interface{},
1012
) (*s.StartWorkflowExecutionRequest, error) {
126✔
1013
        workflowID := options.ID
126✔
1014
        if len(workflowID) == 0 {
128✔
1015
                workflowID = uuid.NewRandom().String()
2✔
1016
        }
2✔
1017

1018
        if options.TaskList == "" {
132✔
1019
                return nil, errors.New("missing TaskList")
6✔
1020
        }
6✔
1021

1022
        executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds())
120✔
1023
        if executionTimeout <= 0 {
121✔
1024
                return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout")
1✔
1025
        }
1✔
1026

1027
        decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds())
119✔
1028
        if decisionTaskTimeout < 0 {
120✔
1029
                return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided")
1✔
1030
        }
1✔
1031
        if decisionTaskTimeout == 0 {
118✔
NEW
1032
                decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs
×
NEW
1033
        }
×
1034

1035
        // Validate type and its arguments.
1036
        workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, args, wc.dataConverter, wc.registry)
118✔
1037
        if err != nil {
119✔
1038
                return nil, err
1✔
1039
        }
1✔
1040

1041
        memo, err := getWorkflowMemo(options.Memo, wc.dataConverter)
117✔
1042
        if err != nil {
117✔
NEW
1043
                return nil, err
×
NEW
1044
        }
×
1045

1046
        searchAttr, err := serializeSearchAttributes(options.SearchAttributes)
117✔
1047
        if err != nil {
117✔
NEW
1048
                return nil, err
×
NEW
1049
        }
×
1050

1051
        delayStartSeconds := common.Int32Ceil(options.DelayStart.Seconds())
117✔
1052
        if delayStartSeconds < 0 {
118✔
1053
                return nil, errors.New("Invalid DelayStart option")
1✔
1054
        }
1✔
1055

1056
        jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds())
116✔
1057
        if jitterStartSeconds < 0 {
117✔
1058
                return nil, errors.New("Invalid JitterStart option")
1✔
1059
        }
1✔
1060

1061
        // create a workflow start span and attach it to the context object.
1062
        // N.B. we need to finish this immediately as jaeger does not give us a way
1063
        // to recreate a span given a span context - which means we will run into
1064
        // issues during replay. we work around this by creating and ending the
1065
        // workflow start span and passing in that context to the workflow. So
1066
        // everything beginning with the StartWorkflowExecutionRequest will be
1067
        // parented by the created start workflow span.
1068
        ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("%s-%s", tracePrefix, workflowType.Name), workflowID)
115✔
1069
        span.Finish()
115✔
1070

115✔
1071
        // get workflow headers from the context
115✔
1072
        header := wc.getWorkflowHeader(ctx)
115✔
1073

115✔
1074
        // run propagators to extract information about tracing and other stuff, store in headers field
115✔
1075
        startRequest := &s.StartWorkflowExecutionRequest{
115✔
1076
                Domain:                              common.StringPtr(wc.domain),
115✔
1077
                RequestId:                           common.StringPtr(uuid.New()),
115✔
1078
                WorkflowId:                          common.StringPtr(workflowID),
115✔
1079
                WorkflowType:                        workflowTypePtr(*workflowType),
115✔
1080
                TaskList:                            common.TaskListPtr(s.TaskList{Name: common.StringPtr(options.TaskList)}),
115✔
1081
                Input:                               input,
115✔
1082
                ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionTimeout),
115✔
1083
                TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(decisionTaskTimeout),
115✔
1084
                Identity:                            common.StringPtr(wc.identity),
115✔
1085
                WorkflowIdReusePolicy:               options.WorkflowIDReusePolicy.toThriftPtr(),
115✔
1086
                RetryPolicy:                         convertRetryPolicy(options.RetryPolicy),
115✔
1087
                CronSchedule:                        common.StringPtr(options.CronSchedule),
115✔
1088
                Memo:                                memo,
115✔
1089
                SearchAttributes:                    searchAttr,
115✔
1090
                Header:                              header,
115✔
1091
                DelayStartSeconds:                   common.Int32Ptr(delayStartSeconds),
115✔
1092
                JitterStartSeconds:                  common.Int32Ptr(jitterStartSeconds),
115✔
1093
        }
115✔
1094

115✔
1095
        return startRequest, nil
115✔
1096
}
1097

1098
func (wc *workflowClient) getSignalWithStartRequest(
1099
        ctx context.Context,
1100
        tracePrefix, workflowID, signalName string,
1101
        signalArg interface{},
1102
        options StartWorkflowOptions,
1103
        workflowFunc interface{},
1104
        workflowArgs ...interface{},
1105
) (*s.SignalWithStartWorkflowExecutionRequest, error) {
10✔
1106

10✔
1107
        signalInput, err := encodeArg(wc.dataConverter, signalArg)
10✔
1108
        if err != nil {
10✔
NEW
1109
                return nil, err
×
NEW
1110
        }
×
1111

1112
        if workflowID == "" {
11✔
1113
                workflowID = uuid.NewRandom().String()
1✔
1114
        }
1✔
1115

1116
        if options.TaskList == "" {
13✔
1117
                return nil, errors.New("missing TaskList")
3✔
1118
        }
3✔
1119

1120
        executionTimeout := common.Int32Ceil(options.ExecutionStartToCloseTimeout.Seconds())
7✔
1121
        if executionTimeout <= 0 {
8✔
1122
                return nil, errors.New("missing or invalid ExecutionStartToCloseTimeout")
1✔
1123
        }
1✔
1124

1125
        decisionTaskTimeout := common.Int32Ceil(options.DecisionTaskStartToCloseTimeout.Seconds())
6✔
1126
        if decisionTaskTimeout < 0 {
6✔
NEW
1127
                return nil, errors.New("negative DecisionTaskStartToCloseTimeout provided")
×
NEW
1128
        }
×
1129
        if decisionTaskTimeout == 0 {
7✔
1130
                decisionTaskTimeout = defaultDecisionTaskTimeoutInSecs
1✔
1131
        }
1✔
1132

1133
        // Validate type and its arguments.
1134
        workflowType, input, err := getValidatedWorkflowFunction(workflowFunc, workflowArgs, wc.dataConverter, wc.registry)
6✔
1135
        if err != nil {
6✔
NEW
1136
                return nil, err
×
NEW
1137
        }
×
1138

1139
        memo, err := getWorkflowMemo(options.Memo, wc.dataConverter)
6✔
1140
        if err != nil {
6✔
NEW
1141
                return nil, err
×
NEW
1142
        }
×
1143

1144
        searchAttr, err := serializeSearchAttributes(options.SearchAttributes)
6✔
1145
        if err != nil {
6✔
NEW
1146
                return nil, err
×
NEW
1147
        }
×
1148

1149
        delayStartSeconds := common.Int32Ceil(options.DelayStart.Seconds())
6✔
1150
        if delayStartSeconds < 0 {
6✔
NEW
1151
                return nil, errors.New("Invalid DelayStart option")
×
NEW
1152
        }
×
1153

1154
        jitterStartSeconds := common.Int32Ceil(options.JitterStart.Seconds())
6✔
1155
        if jitterStartSeconds < 0 {
6✔
NEW
1156
                return nil, errors.New("Invalid JitterStart option")
×
NEW
1157
        }
×
1158

1159
        // create a workflow start span and attach it to the context object. finish it immediately
1160
        ctx, span := createOpenTracingWorkflowSpan(ctx, wc.tracer, time.Now(), fmt.Sprintf("%s-%s", tracePrefix, workflowType.Name), workflowID)
6✔
1161
        span.Finish()
6✔
1162

6✔
1163
        // get workflow headers from the context
6✔
1164
        header := wc.getWorkflowHeader(ctx)
6✔
1165

6✔
1166
        signalWithStartRequest := &s.SignalWithStartWorkflowExecutionRequest{
6✔
1167
                Domain:                              common.StringPtr(wc.domain),
6✔
1168
                RequestId:                           common.StringPtr(uuid.New()),
6✔
1169
                WorkflowId:                          common.StringPtr(workflowID),
6✔
1170
                WorkflowType:                        workflowTypePtr(*workflowType),
6✔
1171
                TaskList:                            common.TaskListPtr(s.TaskList{Name: common.StringPtr(options.TaskList)}),
6✔
1172
                Input:                               input,
6✔
1173
                ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionTimeout),
6✔
1174
                TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(decisionTaskTimeout),
6✔
1175
                SignalName:                          common.StringPtr(signalName),
6✔
1176
                SignalInput:                         signalInput,
6✔
1177
                Identity:                            common.StringPtr(wc.identity),
6✔
1178
                RetryPolicy:                         convertRetryPolicy(options.RetryPolicy),
6✔
1179
                CronSchedule:                        common.StringPtr(options.CronSchedule),
6✔
1180
                Memo:                                memo,
6✔
1181
                SearchAttributes:                    searchAttr,
6✔
1182
                WorkflowIdReusePolicy:               options.WorkflowIDReusePolicy.toThriftPtr(),
6✔
1183
                Header:                              header,
6✔
1184
                DelayStartSeconds:                   common.Int32Ptr(delayStartSeconds),
6✔
1185
                JitterStartSeconds:                  common.Int32Ptr(jitterStartSeconds),
6✔
1186
        }
6✔
1187

6✔
1188
        return signalWithStartRequest, nil
6✔
1189
}
1190

1191
// Register a domain with cadence server
1192
// The errors it can throw:
1193
//   - DomainAlreadyExistsError
1194
//   - BadRequestError
1195
//   - InternalServiceError
1196
func (dc *domainClient) Register(ctx context.Context, request *s.RegisterDomainRequest) error {
3✔
1197
        return backoff.Retry(ctx,
3✔
1198
                func() error {
6✔
1199
                        tchCtx, cancel, opt := newChannelContext(ctx, dc.featureFlags)
3✔
1200
                        defer cancel()
3✔
1201
                        return dc.workflowService.RegisterDomain(tchCtx, request, opt...)
3✔
1202
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
3✔
1203
}
1204

1205
// Describe a domain. The domain has 3 part of information
1206
// DomainInfo - Which has Name, Status, Description, Owner Email
1207
// DomainConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics.
1208
// ReplicationConfiguration - replication config like clusters and active cluster name
1209
// The errors it can throw:
1210
//   - EntityNotExistsError
1211
//   - BadRequestError
1212
//   - InternalServiceError
1213
func (dc *domainClient) Describe(ctx context.Context, name string) (*s.DescribeDomainResponse, error) {
3✔
1214
        request := &s.DescribeDomainRequest{
3✔
1215
                Name: common.StringPtr(name),
3✔
1216
        }
3✔
1217

3✔
1218
        var response *s.DescribeDomainResponse
3✔
1219
        err := backoff.Retry(ctx,
3✔
1220
                func() error {
6✔
1221
                        tchCtx, cancel, opt := newChannelContext(ctx, dc.featureFlags)
3✔
1222
                        defer cancel()
3✔
1223
                        var err error
3✔
1224
                        response, err = dc.workflowService.DescribeDomain(tchCtx, request, opt...)
3✔
1225
                        return err
3✔
1226
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
3✔
1227
        if err != nil {
3✔
1228
                return nil, err
×
1229
        }
×
1230
        return response, nil
3✔
1231
}
1232

1233
// Update a domain.
1234
// The errors it can throw:
1235
//   - EntityNotExistsError
1236
//   - BadRequestError
1237
//   - InternalServiceError
1238
func (dc *domainClient) Update(ctx context.Context, request *s.UpdateDomainRequest) error {
3✔
1239
        return backoff.Retry(ctx,
3✔
1240
                func() error {
6✔
1241
                        tchCtx, cancel, opt := newChannelContext(ctx, dc.featureFlags)
3✔
1242
                        defer cancel()
3✔
1243
                        _, err := dc.workflowService.UpdateDomain(tchCtx, request, opt...)
3✔
1244
                        return err
3✔
1245
                }, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
3✔
1246
}
1247

1248
func getRunID(runID string) *string {
168✔
1249
        if runID == "" {
171✔
1250
                // Cadence Server will pick current runID if provided empty.
3✔
1251
                return nil
3✔
1252
        }
3✔
1253
        return common.StringPtr(runID)
165✔
1254
}
1255

1256
func (iter *historyEventIteratorImpl) HasNext() bool {
260✔
1257
        if iter.nextEventIndex < len(iter.events) || iter.err != nil {
389✔
1258
                return true
129✔
1259
        } else if !iter.initialized || len(iter.nexttoken) != 0 {
389✔
1260
                iter.initialized = true
129✔
1261
                response, err := iter.paginate(iter.nexttoken)
129✔
1262
                iter.nextEventIndex = 0
129✔
1263
                if err == nil {
256✔
1264
                        iter.events = response.History.Events
127✔
1265
                        iter.nexttoken = response.NextPageToken
127✔
1266
                        iter.err = nil
127✔
1267
                } else {
129✔
1268
                        iter.events = nil
2✔
1269
                        iter.nexttoken = nil
2✔
1270
                        iter.err = err
2✔
1271
                }
2✔
1272

1273
                if iter.nextEventIndex < len(iter.events) || iter.err != nil {
258✔
1274
                        return true
129✔
1275
                }
129✔
1276
                return false
×
1277
        }
1278

1279
        return false
2✔
1280
}
1281

1282
func (iter *historyEventIteratorImpl) Next() (*s.HistoryEvent, error) {
129✔
1283
        // if caller call the Next() when iteration is over, just return nil, nil
129✔
1284
        if !iter.HasNext() {
129✔
1285
                panic("HistoryEventIterator Next() called without checking HasNext()")
×
1286
        }
1287

1288
        // we have cached events
1289
        if iter.nextEventIndex < len(iter.events) {
256✔
1290
                index := iter.nextEventIndex
127✔
1291
                iter.nextEventIndex++
127✔
1292
                return iter.events[index], nil
127✔
1293
        } else if iter.err != nil {
131✔
1294
                // we have err, clear that iter.err and return err
2✔
1295
                err := iter.err
2✔
1296
                iter.err = nil
2✔
1297
                return nil, err
2✔
1298
        }
2✔
1299

1300
        panic("HistoryEventIterator Next() should return either a history event or a err")
×
1301
}
1302

1303
func (workflowRun *workflowRunImpl) GetRunID() string {
30✔
1304
        return workflowRun.firstRunID
30✔
1305
}
30✔
1306

1307
func (workflowRun *workflowRunImpl) GetID() string {
12✔
1308
        return workflowRun.workflowID
12✔
1309
}
12✔
1310

1311
func (workflowRun *workflowRunImpl) Get(ctx context.Context, valuePtr interface{}) error {
121✔
1312

121✔
1313
        iter := workflowRun.iterFn(ctx, workflowRun.currentRunID)
121✔
1314
        if !iter.HasNext() {
121✔
1315
                panic("could not get last history event for workflow")
×
1316
        }
1317
        closeEvent, err := iter.Next()
121✔
1318
        if err != nil {
121✔
1319
                return err
×
1320
        }
×
1321

1322
        switch closeEvent.GetEventType() {
121✔
1323
        case s.EventTypeWorkflowExecutionCompleted:
68✔
1324
                attributes := closeEvent.WorkflowExecutionCompletedEventAttributes
68✔
1325
                if valuePtr == nil || attributes.Result == nil {
77✔
1326
                        return nil
9✔
1327
                }
9✔
1328
                rf := reflect.ValueOf(valuePtr)
59✔
1329
                if rf.Type().Kind() != reflect.Ptr {
59✔
1330
                        return errors.New("value parameter is not a pointer")
×
1331
                }
×
1332
                err = deSerializeFunctionResult(workflowRun.workflowFn, attributes.Result, valuePtr, workflowRun.dataConverter, workflowRun.registry)
59✔
1333
        case s.EventTypeWorkflowExecutionFailed:
19✔
1334
                attributes := closeEvent.WorkflowExecutionFailedEventAttributes
19✔
1335
                err = constructError(attributes.GetReason(), attributes.Details, workflowRun.dataConverter)
19✔
1336
        case s.EventTypeWorkflowExecutionCanceled:
4✔
1337
                attributes := closeEvent.WorkflowExecutionCanceledEventAttributes
4✔
1338
                details := newEncodedValues(attributes.Details, workflowRun.dataConverter)
4✔
1339
                err = NewCanceledError(details)
4✔
1340
        case s.EventTypeWorkflowExecutionTerminated:
4✔
1341
                err = newTerminatedError()
4✔
1342
        case s.EventTypeWorkflowExecutionTimedOut:
1✔
1343
                attributes := closeEvent.WorkflowExecutionTimedOutEventAttributes
1✔
1344
                err = NewTimeoutError(attributes.GetTimeoutType())
1✔
1345
        case s.EventTypeWorkflowExecutionContinuedAsNew:
25✔
1346
                attributes := closeEvent.WorkflowExecutionContinuedAsNewEventAttributes
25✔
1347
                workflowRun.currentRunID = attributes.GetNewExecutionRunId()
25✔
1348
                return workflowRun.Get(ctx, valuePtr)
25✔
1349
        default:
×
1350
                err = fmt.Errorf("Unexpected event type %s when handling workflow execution result", closeEvent.GetEventType())
×
1351
        }
1352
        return err
87✔
1353
}
1354

1355
func getWorkflowMemo(input map[string]interface{}, dc DataConverter) (*s.Memo, error) {
208✔
1356
        if input == nil {
398✔
1357
                return nil, nil
190✔
1358
        }
190✔
1359

1360
        memo := make(map[string][]byte)
18✔
1361
        for k, v := range input {
36✔
1362
                memoBytes, err := encodeArg(dc, v)
18✔
1363
                if err != nil {
20✔
1364
                        return nil, fmt.Errorf("encode workflow memo error: %v", err.Error())
2✔
1365
                }
2✔
1366
                memo[k] = memoBytes
16✔
1367
        }
1368
        return &s.Memo{Fields: memo}, nil
16✔
1369
}
1370

1371
func serializeSearchAttributes(input map[string]interface{}) (*s.SearchAttributes, error) {
217✔
1372
        if input == nil {
407✔
1373
                return nil, nil
190✔
1374
        }
190✔
1375

1376
        attr := make(map[string][]byte)
27✔
1377
        for k, v := range input {
54✔
1378
                attrBytes, err := json.Marshal(v)
27✔
1379
                if err != nil {
30✔
1380
                        return nil, fmt.Errorf("encode search attribute [%s] error: %v", k, err)
3✔
1381
                }
3✔
1382
                attr[k] = attrBytes
24✔
1383
        }
1384
        return &s.SearchAttributes{IndexedFields: attr}, nil
24✔
1385
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc